← opencode report

T3 — Deferred tool-call coordination

Mỗi tool call được gán một Deferred — sync point giữa stream consumer và async tool executor. Loop chỉ proceed khi TẤT CẢ deferred resolved, với timeout 250ms mỗi cái.
Nhóm: A — Agent Loop & StreamingFile: session/processor.tsLines: 134–195, 273–278, 544–582ID: A.3

Tổng quan Overview

Vấn đề cốt lõi của agent streaming: stream events đến tuần tự và nhanh (text delta mỗi vài ms), nhưng tool execution là async và chậm (có thể mất nhiều giây). Stream consumer không thể block — nó phải đọc tiếp event kế tiếp. Nhưng loop không thể proceed đến iteration mới khi tool vẫn đang chạy.

opencode giải quyết bằng Deferred — một promise-like object được tạo khi stream nhận tool-input-start event, và được resolved khi tool executor hoàn thành. Cleanup phase sau stream drain sẽ await tất cả Deferred trước khi loop tiếp.

Tại sao quan trọng: Nếu stream end trước khi tool done, có hai tình huống xấu: (1) loop proceed với incomplete tool results — model thấy output thiếu, (2) tool kết thúc sau khi state đã được reset — kết quả bị lost. Deferred là "bridge" chính xác giữa hai timeline không đồng bộ này.
Deferred vs Promise: Effect Deferred<void> về mặt khái niệm giống Promise<void> với resolve external. Điểm khác: Deferred integrate với Effect fiber scheduler — có thể interrupt, timeout, và compose với operators khác.
Timeout 250ms: Giá trị 250ms là deliberate — đủ để tool "fast" (file read, ls, echo) hoàn thành, nhưng không block quá lâu nếu tool bị stuck. Tool nặng (compile, network request) thường không bị ảnh hưởng vì chúng resolve trước timeout.

Phân tích code chi tiết Anatomy

Bước 1: Tạo Deferred khi stream nhận tool-input-start

Trong handleEvent (T2), khi nhận tool-input-start, một entry mới được thêm vào ctx.toolcalls với Deferred chưa resolved:

session/processor.ts — tạo Deferred

JSON
{`
// Trong handleEvent switch:
case "tool-input-start": {
  const part = yield* session.updatePart({
    status: "pending",
    input: {}
  })
  ctx.toolcalls[value.id] = {
    done: yield* Deferred.make<void>(),  // ← sync point, chưa resolved
    partID: part.id,
    messageID: ...,
    sessionID: ...
  }
  return
}
`}

Bước 2: Tool executor resolve Deferred khi done

Khi tool execution hoàn thành (success hay error), executor lấy Deferred từ ctx.toolcalls và gọi Deferred.succeed:

session/processor.ts — resolve Deferred

TS
{`
// Trong tool executor (sau khi tool chạy xong):
const toolcall = ctx.toolcalls[toolCallId]
if (toolcall) {
  yield* Deferred.succeed(toolcall.done, undefined)
  // Bất kể tool thành công hay thất bại,
  // Deferred PHẢI được resolve để không block cleanup
}
`}

Bước 3: Cleanup phase await tất cả Deferred

Sau khi stream drain xong, cleanup function iterate qua tất cả entries trong ctx.toolcalls và await mỗi Deferred với timeout:

session/processor.ts:273-278 — cleanup await all

TS
{`
// Trong cleanup():
yield* Effect.forEach(
  Object.values(ctx.toolcalls),
  (call) => Deferred.await(call.done).pipe(
    Effect.timeout("250 millis"),  // per-tool timeout
    Effect.ignore,                  // ignore timeout error (không throw)
  ),
  { concurrency: "unbounded" }      // await all in parallel
)
`}

concurrency: "unbounded" nghĩa là tất cả Deferred được await đồng thời — không phải sequential. Điều này quan trọng khi có nhiều tool calls trong một step. Nếu sequential, tổng thời gian chờ sẽ là N * 250ms thay vì 250ms.

ToolCall type definition

session/processor.ts:134-195 — ToolCall type

TS
{`
type ToolCall = {
  partID: MessageV2.ToolPart["id"]
  messageID: MessageV2.ToolPart["messageID"]
  sessionID: MessageV2.ToolPart["sessionID"]
  done: Deferred.Deferred<void>  // ← sync point
}

// ctx là per-stream state:
const ctx = {
  toolcalls: {} as Record<string, ToolCall>,
  // ...
}
`}

Tương tác với các kỹ thuật khác Interaction

Timeline: một iteration với 2 tool calls song song T2 stream consumer: ──[tool-input-start id=A]──[tool-input-start id=B]──[tool-call A]──[tool-call B]──[finish]──► │ │ │ │ ▼ ▼ ▼ ▼ Deferred A Deferred B execute A execute B (unresolved) (unresolved) (async) (async) │ │ T4 cleanup (Effect.ensuring — luôn chạy): │ │ stream drain done │ │ │ │ │ ▼ │ │ await [Deferred A, Deferred B] ◄───────── resolve A ─┘ ◄── resolve B ──┘ (concurrency: unbounded, timeout 250ms each) │ ▼ T1: loop tiếp — check exit condition (lastAssistant.finish, hasToolCalls, v.v.) T6 doom loop check xảy ra TRONG T2 khi nhận tool-call event

T3 là connector giữa T2 (stream consumer) và T4 (cleanup). T2 tạo Deferred entries, T4 cleanup awaits chúng. T1 chỉ proceed đến iteration mới sau khi T4 cleanup hoàn thành. Nếu T4 bị interrupt (user cancel), cleanup vẫn chạy nhờ Effect.ensuring — nhưng Deferred sẽ bị abandoned với status "aborted" thay vì "success".

Failure modes Failures

Failure 1: Stream end trước khi tool done

Nếu không có Deferred coordination, stream kết thúc và loop proceed ngay — tool result bị lost:

Không có coordination — tool result bị mất

TS
{`
// WRONG:
for await (const event of stream) {
  if (event.type === "tool-call") {
    executeToolAsync(event)  // fire-and-forget!
  }
}
// Stream done → loop tiếp → model không thấy tool result
// Tool vẫn đang chạy nhưng không có nơi để lưu kết quả
`}

Failure 2: Sequential await thay vì parallel

Nếu await sequential thay vì concurrency: "unbounded", performance giảm đáng kể khi có nhiều tool calls:

Sequential await — chậm khi nhiều tools

TS
{`
// WRONG: sequential
for (const call of Object.values(ctx.toolcalls)) {
  await waitForDeferred(call.done, 250)  // chờ từng cái
}
// Nếu có 5 tools: chờ 5 * 250ms = 1.25 giây worst case

// CORRECT: parallel
await Promise.all(
  Object.values(ctx.toolcalls).map(call =>
    Promise.race([call.promise, sleep(250)])
  )
)
// Chờ tối đa 250ms dù có bao nhiêu tools
`}

Failure 3: Không resolve Deferred khi tool error

Nếu tool executor throw và không resolve Deferred, cleanup phase sẽ block đến timeout:

Tool error không resolve Deferred

TS
{`
// WRONG:
async function executeTool(tc: ToolCall) {
  const result = await runTool(tc)  // có thể throw!
  deferred.resolve()                 // không chạy nếu throw trên
}

// CORRECT: finally block
async function executeTool(tc: ToolCall) {
  try {
    const result = await runTool(tc)
    // lưu result
  } finally {
    deferred.resolve()  // luôn resolve, dù success hay error
  }
}
`}

So sánh với các harness khác Compare

HarnessTool sync mechanismParallel toolsPer-tool timeout
opencodeEffect DeferredUnbounded concurrency250ms per tool
Claude CodePromise.all() trực tiếpKhông có per-tool timeout
Pydantic AIExplicit deferred tool call semanticsGlobal timeout
LangChainSequential tool executeKhôngKhông
AiderSynchronous (blocking)KhôngKhông

Pydantic AI có semantics gần giống nhất với opencode — explicit deferred tool call coordination được document rõ trong API. LangChain và Aider ở cuối phổ: sequential và không có timeout protection.

Implementation recipe Recipe

Native Promise-based deferred coordination, không cần Effect:

deferred-tool-coordination.ts

TS
{`
// Deferred factory using native Promise
function makeDeferred<T>() {
  let resolve!: (value: T) => void
  let reject!: (reason?: unknown) => void
  const promise = new Promise<T>((res, rej) => {
    resolve = res
    reject = rej
  })
  return { promise, resolve, reject }
}

// Track in-flight tool calls
const pendingTools = new Map<string, ReturnType<typeof makeDeferred<void>>>()

// Called when stream receives tool-input-start
function registerToolCall(id: string) {
  const deferred = makeDeferred<void>()
  pendingTools.set(id, deferred)
  return deferred
}

// Called when tool executor finishes (success OR error)
function resolveToolCall(id: string) {
  const deferred = pendingTools.get(id)
  if (deferred) {
    deferred.resolve()
    pendingTools.delete(id)
  }
}

// Called after stream drain — await all with timeout
async function waitForAllTools(timeoutMs = 250) {
  const entries = [...pendingTools.values()]
  if (entries.length === 0) return

  // Parallel await với per-entry timeout
  await Promise.all(
    entries.map((d) =>
      Promise.race([
        d.promise,
        new Promise<void>((res) => setTimeout(res, timeoutMs))
        // timeout resolves (not rejects) — không throw
      ])
    )
  )
  pendingTools.clear()
}

// Wiring trong stream consumer:
async function processStream(stream: AsyncIterable<StreamEvent>) {
  try {
    for await (const event of stream) {
      switch (event.type) {
        case "tool-input-start":
          registerToolCall(event.id)
          break
        case "tool-call":
          const deferred = pendingTools.get(event.toolCallId)
          // Fire async, resolve khi done
          runTool(event).finally(() => resolveToolCall(event.toolCallId))
          break
      }
    }
  } finally {
    // Cleanup — luôn chạy (try/finally)
    await waitForAllTools(250)
  }
}
`}

Tham khảo Refs

Nguồn chính