T2 — Streaming event demultiplexer
Effect.Stream, sau đó demultiplex từng event theo type vào handler riêng — cho phép UI update real-time, cancel mid-stream, và composable retry.Tổng quan Overview
LLM streaming API trả về một chuỗi events không đồng nhất: có lúc là text delta, có lúc là reasoning fragment, có lúc là tool call bắt đầu, có lúc là finish signal. Naive implementation thường xử lý tất cả như nhau hoặc chỉ quan tâm đến text.
opencode dùng mẫu demultiplexer: một switch lớn phân loại từng event
theo value.type, rồi route vào handler state riêng. Mỗi handler mutate
một phần của ctx (context object) — tách biệt hoàn toàn state của
reasoning, text, tool calls, và finish reason.
reasoning-start, reasoning-delta, text-delta,
tool-input-start, tool-input-delta, tool-call,
finish. opencode handle toàn bộ.
takeUntil, tap,
retry). Async iterator thuần không có interrupt semantics — phải tự quản lý
AbortController.
Phân tích code chi tiết Anatomy
handleEvent — switch theo event type
Hàm handleEvent được wrap trong Effect.fnUntraced —
không trace từng event (quá nhiều), nhưng vẫn chạy trong Effect runtime:
session/processor.ts:216-300 — handleEvent core
{`
const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) {
switch (value.type) {
case "reasoning-start":
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
text: "",
time: { start: Date.now() }
}
yield* session.updatePart(ctx.reasoningMap[value.id])
return
case "reasoning-delta":
ctx.reasoningMap[value.id].text += value.text
yield* session.updatePartDelta({ delta: value.text })
return
case "text-delta":
ctx.currentText = (ctx.currentText ?? "") + value.text
yield* session.updatePartDelta({ delta: value.text })
return
case "tool-input-start":
const part = yield* session.updatePart({ status: "pending", input: {} })
ctx.toolcalls[value.id] = {
done: yield* Deferred.make<void>(), // ← T3: Deferred sync point
partID: part.id
}
return
case "tool-call":
yield* updateToolCall(value.toolCallId, (m) => ({
...m, status: "running", input: value.input
}))
// Doom loop check — T6
yield* checkDoomLoop(value.toolName, value.input)
return
case "finish":
ctx.finish = value.finishReason
return
}
})
`}Effect.Stream wrapping — interrupt và takeUntil
Stream được bọc với Stream.tap (side-effect mỗi event) và
Stream.takeUntil (dừng khi cần compact):
session/processor.ts:544-554 — stream pipeline
{`
// llm.stream() trả Effect.Stream từ AI SDK async iterable
const stream = llm.stream(streamInput)
yield* stream.pipe(
Stream.tap((event) => handleEvent(event)), // route mỗi event
Stream.takeUntil(() => ctx.needsCompaction), // stop sớm nếu overflow
Stream.runDrain, // consume toàn bộ
)
`}Context object — mutable state per-stream
ctx là object được tạo mới mỗi lần stream bắt đầu, chứa tất cả
mutable state cho iteration đó:
Context object structure
{`
const ctx = {
currentText: undefined as string | undefined,
reasoningMap: {} as Record<string, ReasoningPart>,
toolcalls: {} as Record<string, ToolCall>, // T3 Deferred entries
finish: undefined as string | undefined,
needsCompaction: false,
assistantMessage: ...,
}
`}Tương tác với các kỹ thuật khác Interaction
T2 là điểm hội tụ của nhiều kỹ thuật — nó kết nối hầu hết nhóm A:
Khi case tool-input-start được trigger, T2 tạo một Deferred
entry mới trong ctx.toolcalls — đây là sync point cho T3. Khi stream finish,
T3 sẽ await tất cả Deferred này. T4 bọc toàn bộ pipeline với
Effect.ensuring, đảm bảo cleanup dù stream bị interrupt hay lỗi.
Failure modes Failures
Failure 1: Buffer toàn bộ response — mất real-time
Nhiều harness không stream — chờ full response rồi mới xử lý. Điều này mất hoàn toàn khả năng hiển thị progress cho user:
Anti-pattern: buffer toàn bộ
{`
// WRONG: không streaming
const response = await llm.generate(messages) // block đến khi done
// Hậu quả:
// - User thấy màn hình trắng trong 10-30 giây
// - Không thể cancel giữa chừng
// - Reasoning text bị mất nếu error xảy ra giữa stream
`}Failure 2: Không handle tool-input-start riêng
Nếu không tách tool-input-start khỏi tool-call, không thể
hiển thị tool đang "loading" (trước khi có đủ input). UI sẽ nhảy đột ngột từ không
có gì sang "tool complete".
Thiếu tool-input-start handling
{`
// Nhiều SDK chỉ expose:
case "tool_call": // đã có đủ name + input
execute(tc)
// opencode tách làm 2 phase:
case "tool-input-start": // UI hiển thị "pending"
ctx.toolcalls[id] = { done: Deferred.make(), status: "pending" }
case "tool-call": // UI chuyển sang "running"
updateToolCall(id, { status: "running" })
`}Failure 3: Không cooperative cancel
Nếu stream không có cancel mechanism, user phải đợi toàn bộ response xong
mới thoát được. opencode dùng Stream.takeUntil và Effect interrupt.
Thiếu cancel — stream chạy đến hết dù user đã thoát
{`
// WRONG: không có cancel mechanism
for await (const event of stream) {
handle(event)
// nếu user cancel, vòng lặp vẫn tiếp tục
// cho đến khi stream kết thúc tự nhiên
}
// CORRECT: cooperative cancel
for await (const event of stream) {
if (abortSignal.aborted) break // check mỗi event
handle(event)
}
`}So sánh với các harness khác Compare
| Harness | Streaming mode | Event demux | Cancel support |
|---|---|---|---|
| opencode | |||
| Aider | |||
| Cursor Composer | |||
| Cline | |||
| Claude Code |
opencode là harness duy nhất trong danh sách handle tool-input-start
như một event riêng biệt, cho phép UI hiển thị "pending" state trước khi tool
có đủ input để chạy.
Implementation recipe Recipe
Minimal TypeScript demultiplexer không cần Effect dependency — dùng async iterator và AbortController:
stream-demux.ts
{`
type StreamEvent =
| { type: "text-delta"; text: string }
| { type: "tool-input-start"; id: string; toolName: string }
| { type: "tool-call"; toolCallId: string; toolName: string; input: unknown }
| { type: "reasoning-start"; id: string }
| { type: "reasoning-delta"; id: string; text: string }
| { type: "finish"; finishReason: string }
interface DemuxContext {
currentText: string
reasoningMap: Record<string, { text: string }>
pendingTools: Map<string, { resolve: () => void; promise: Promise<void> }>
finishReason: string | null
needsStop: boolean
}
async function demuxStream(
stream: AsyncIterable<StreamEvent>,
ctx: DemuxContext,
signal?: AbortSignal
) {
for await (const event of stream) {
// Cooperative cancel
if (signal?.aborted || ctx.needsStop) break
switch (event.type) {
case "reasoning-start":
ctx.reasoningMap[event.id] = { text: "" }
break
case "reasoning-delta":
ctx.reasoningMap[event.id].text += event.text
break
case "text-delta":
ctx.currentText += event.text
break
case "tool-input-start": {
// Track deferred cho T3 pattern
let resolve!: () => void
const promise = new Promise<void>((r) => { resolve = r })
ctx.pendingTools.set(event.id, { resolve, promise })
break
}
case "tool-call":
// Execute async, resolve deferred khi done
const deferred = ctx.pendingTools.get(event.toolCallId)
executeToolAsync(event).then(() => deferred?.resolve())
break
case "finish":
ctx.finishReason = event.finishReason
break
}
}
}
// Tách execute async — không block stream consumer
async function executeToolAsync(tc: Extract<StreamEvent, { type: "tool-call" }>) {
// tool execution logic
}
`}Tham khảo Refs
- Vercel AI SDK — streamText lifecycle events · Danh sách đầy đủ event types
- Vercel AI SDK 5 blog · onInputStart/onInputDelta/onInputAvailable hooks mới
- Effect — Stream introduction · Effect.Stream concepts và operators
- TDS — Building a Streaming Agent · Demux pattern trong practice
- MDN — Streams API · Web native streaming primitives
- anomalyco/opencode — session/processor.ts · Source tham chiếu chính