← opencode report

T2 — Streaming event demultiplexer

opencode bọc async iterable stream từ AI SDK trong Effect.Stream, sau đó demultiplex từng event theo type vào handler riêng — cho phép UI update real-time, cancel mid-stream, và composable retry.
Nhóm: A — Agent Loop & StreamingFile: session/processor.tsHàm: handleEvent()Lines: 216–461ID: A.2

Tổng quan Overview

LLM streaming API trả về một chuỗi events không đồng nhất: có lúc là text delta, có lúc là reasoning fragment, có lúc là tool call bắt đầu, có lúc là finish signal. Naive implementation thường xử lý tất cả như nhau hoặc chỉ quan tâm đến text.

opencode dùng mẫu demultiplexer: một switch lớn phân loại từng event theo value.type, rồi route vào handler state riêng. Mỗi handler mutate một phần của ctx (context object) — tách biệt hoàn toàn state của reasoning, text, tool calls, và finish reason.

Tại sao quan trọng: Nếu không demux, bạn không thể: (1) hiển thị reasoning riêng với text chính, (2) track từng tool call độc lập, (3) cancel chỉ một loại event mà không ảnh hưởng loại khác. opencode dùng pattern này để drive UI update real-time với granularity cao nhất có thể.
AI SDK event types: Vercel AI SDK 5 trả 7+ event types trong stream: reasoning-start, reasoning-delta, text-delta, tool-input-start, tool-input-delta, tool-call, finish. opencode handle toàn bộ.
Effect.Stream khác gì async iterator? Effect.Stream có built-in interrupt, backpressure, composable operators (takeUntil, tap, retry). Async iterator thuần không có interrupt semantics — phải tự quản lý AbortController.

Phân tích code chi tiết Anatomy

handleEvent — switch theo event type

Hàm handleEvent được wrap trong Effect.fnUntraced — không trace từng event (quá nhiều), nhưng vẫn chạy trong Effect runtime:

session/processor.ts:216-300 — handleEvent core

TS
{`
const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) {
  switch (value.type) {

    case "reasoning-start":
      ctx.reasoningMap[value.id] = {
        id: PartID.ascending(),
        text: "",
        time: { start: Date.now() }
      }
      yield* session.updatePart(ctx.reasoningMap[value.id])
      return

    case "reasoning-delta":
      ctx.reasoningMap[value.id].text += value.text
      yield* session.updatePartDelta({ delta: value.text })
      return

    case "text-delta":
      ctx.currentText = (ctx.currentText ?? "") + value.text
      yield* session.updatePartDelta({ delta: value.text })
      return

    case "tool-input-start":
      const part = yield* session.updatePart({ status: "pending", input: {} })
      ctx.toolcalls[value.id] = {
        done: yield* Deferred.make<void>(),  // ← T3: Deferred sync point
        partID: part.id
      }
      return

    case "tool-call":
      yield* updateToolCall(value.toolCallId, (m) => ({
        ...m, status: "running", input: value.input
      }))
      // Doom loop check — T6
      yield* checkDoomLoop(value.toolName, value.input)
      return

    case "finish":
      ctx.finish = value.finishReason
      return
  }
})
`}

Effect.Stream wrapping — interrupt và takeUntil

Stream được bọc với Stream.tap (side-effect mỗi event) và Stream.takeUntil (dừng khi cần compact):

session/processor.ts:544-554 — stream pipeline

TS
{`
// llm.stream() trả Effect.Stream từ AI SDK async iterable
const stream = llm.stream(streamInput)

yield* stream.pipe(
  Stream.tap((event) => handleEvent(event)),     // route mỗi event
  Stream.takeUntil(() => ctx.needsCompaction),   // stop sớm nếu overflow
  Stream.runDrain,                                // consume toàn bộ
)
`}

Context object — mutable state per-stream

ctx là object được tạo mới mỗi lần stream bắt đầu, chứa tất cả mutable state cho iteration đó:

Context object structure

TS
{`
const ctx = {
  currentText: undefined as string | undefined,
  reasoningMap: {} as Record<string, ReasoningPart>,
  toolcalls: {} as Record<string, ToolCall>,   // T3 Deferred entries
  finish: undefined as string | undefined,
  needsCompaction: false,
  assistantMessage: ...,
}
`}

Tương tác với các kỹ thuật khác Interaction

T2 là điểm hội tụ của nhiều kỹ thuật — nó kết nối hầu hết nhóm A:

AI SDK stream (async iterable) │ ▼ llm.stream() — Effect.Stream wrapper │ Stream.tap(handleEvent) │ ┌──────┴──────────────────────────────────────┐ │ switch (value.type) │ │ │ │ "reasoning-*" ──► ctx.reasoningMap update │ │ "text-delta" ──► ctx.currentText update │ │ "tool-input-start" ──► T3: Deferred.make │ │ "tool-call" ──► T6: doom loop check │ │ ──► tool execute trigger │ │ "finish" ──► ctx.finish = reason │ └──────────────────────────────────────────────┘ │ Stream.takeUntil(ctx.needsCompaction) ← T1: compaction signal │ T4: Effect.ensuring(cleanup()) ← bọc bên ngoài toàn bộ stream │ T3: await all Deferreds (tools done) │ T1: loop tiếp iteration tiếp theo

Khi case tool-input-start được trigger, T2 tạo một Deferred entry mới trong ctx.toolcalls — đây là sync point cho T3. Khi stream finish, T3 sẽ await tất cả Deferred này. T4 bọc toàn bộ pipeline với Effect.ensuring, đảm bảo cleanup dù stream bị interrupt hay lỗi.

Failure modes Failures

Failure 1: Buffer toàn bộ response — mất real-time

Nhiều harness không stream — chờ full response rồi mới xử lý. Điều này mất hoàn toàn khả năng hiển thị progress cho user:

Anti-pattern: buffer toàn bộ

TS
{`
// WRONG: không streaming
const response = await llm.generate(messages)  // block đến khi done

// Hậu quả:
// - User thấy màn hình trắng trong 10-30 giây
// - Không thể cancel giữa chừng
// - Reasoning text bị mất nếu error xảy ra giữa stream
`}

Failure 2: Không handle tool-input-start riêng

Nếu không tách tool-input-start khỏi tool-call, không thể hiển thị tool đang "loading" (trước khi có đủ input). UI sẽ nhảy đột ngột từ không có gì sang "tool complete".

Thiếu tool-input-start handling

JSON
{`
// Nhiều SDK chỉ expose:
case "tool_call":  // đã có đủ name + input
  execute(tc)

// opencode tách làm 2 phase:
case "tool-input-start":  // UI hiển thị "pending"
  ctx.toolcalls[id] = { done: Deferred.make(), status: "pending" }
case "tool-call":          // UI chuyển sang "running"
  updateToolCall(id, { status: "running" })
`}

Failure 3: Không cooperative cancel

Nếu stream không có cancel mechanism, user phải đợi toàn bộ response xong mới thoát được. opencode dùng Stream.takeUntil và Effect interrupt.

Thiếu cancel — stream chạy đến hết dù user đã thoát

TS
{`
// WRONG: không có cancel mechanism
for await (const event of stream) {
  handle(event)
  // nếu user cancel, vòng lặp vẫn tiếp tục
  // cho đến khi stream kết thúc tự nhiên
}

// CORRECT: cooperative cancel
for await (const event of stream) {
  if (abortSignal.aborted) break  // check mỗi event
  handle(event)
}
`}

So sánh với các harness khác Compare

HarnessStreaming modeEvent demuxCancel support
opencodeFull streaming + Effect.Stream7 event types, per-type stateStream.takeUntil + interrupt
AiderBuffer full responseKhông — xử lý sau khi doneKhông
Cursor ComposerStream text delta onlyChỉ text, không tool-input-startAbortController cơ bản
ClineStream nhưng forEachKhông wrap trong cancellable streamPartial
Claude CodeStreamingKhông rõ per-type stateAbortController

opencode là harness duy nhất trong danh sách handle tool-input-start như một event riêng biệt, cho phép UI hiển thị "pending" state trước khi tool có đủ input để chạy.

Implementation recipe Recipe

Minimal TypeScript demultiplexer không cần Effect dependency — dùng async iterator và AbortController:

stream-demux.ts

TS
{`
type StreamEvent =
  | { type: "text-delta"; text: string }
  | { type: "tool-input-start"; id: string; toolName: string }
  | { type: "tool-call"; toolCallId: string; toolName: string; input: unknown }
  | { type: "reasoning-start"; id: string }
  | { type: "reasoning-delta"; id: string; text: string }
  | { type: "finish"; finishReason: string }

interface DemuxContext {
  currentText: string
  reasoningMap: Record<string, { text: string }>
  pendingTools: Map<string, { resolve: () => void; promise: Promise<void> }>
  finishReason: string | null
  needsStop: boolean
}

async function demuxStream(
  stream: AsyncIterable<StreamEvent>,
  ctx: DemuxContext,
  signal?: AbortSignal
) {
  for await (const event of stream) {
    // Cooperative cancel
    if (signal?.aborted || ctx.needsStop) break

    switch (event.type) {
      case "reasoning-start":
        ctx.reasoningMap[event.id] = { text: "" }
        break

      case "reasoning-delta":
        ctx.reasoningMap[event.id].text += event.text
        break

      case "text-delta":
        ctx.currentText += event.text
        break

      case "tool-input-start": {
        // Track deferred cho T3 pattern
        let resolve!: () => void
        const promise = new Promise<void>((r) => { resolve = r })
        ctx.pendingTools.set(event.id, { resolve, promise })
        break
      }

      case "tool-call":
        // Execute async, resolve deferred khi done
        const deferred = ctx.pendingTools.get(event.toolCallId)
        executeToolAsync(event).then(() => deferred?.resolve())
        break

      case "finish":
        ctx.finishReason = event.finishReason
        break
    }
  }
}

// Tách execute async — không block stream consumer
async function executeToolAsync(tc: Extract<StreamEvent, { type: "tool-call" }>) {
  // tool execution logic
}
`}
Recipe trên là simplified. Production cần thêm error handling per-event, reasoning text accumulation, và integration với T3 (Deferred coordination) và T4 (cleanup).

Tham khảo Refs

Nguồn chính