T3 — Deferred tool-call coordination
Deferred — sync point giữa stream consumer và async tool executor. Loop chỉ proceed khi TẤT CẢ deferred resolved, với timeout 250ms mỗi cái.Tổng quan Overview
Vấn đề cốt lõi của agent streaming: stream events đến tuần tự và nhanh (text delta mỗi vài ms), nhưng tool execution là async và chậm (có thể mất nhiều giây). Stream consumer không thể block — nó phải đọc tiếp event kế tiếp. Nhưng loop không thể proceed đến iteration mới khi tool vẫn đang chạy.
opencode giải quyết bằng Deferred — một promise-like object được
tạo khi stream nhận tool-input-start event, và được resolved khi tool
executor hoàn thành. Cleanup phase sau stream drain sẽ await tất cả
Deferred trước khi loop tiếp.
Deferred<void> về mặt
khái niệm giống Promise<void> với resolve external.
Điểm khác: Deferred integrate với Effect fiber scheduler — có thể interrupt, timeout,
và compose với operators khác.
Phân tích code chi tiết Anatomy
Bước 1: Tạo Deferred khi stream nhận tool-input-start
Trong handleEvent (T2), khi nhận tool-input-start, một entry
mới được thêm vào ctx.toolcalls với Deferred chưa resolved:
session/processor.ts — tạo Deferred
{`
// Trong handleEvent switch:
case "tool-input-start": {
const part = yield* session.updatePart({
status: "pending",
input: {}
})
ctx.toolcalls[value.id] = {
done: yield* Deferred.make<void>(), // ← sync point, chưa resolved
partID: part.id,
messageID: ...,
sessionID: ...
}
return
}
`}Bước 2: Tool executor resolve Deferred khi done
Khi tool execution hoàn thành (success hay error), executor lấy Deferred từ
ctx.toolcalls và gọi Deferred.succeed:
session/processor.ts — resolve Deferred
{`
// Trong tool executor (sau khi tool chạy xong):
const toolcall = ctx.toolcalls[toolCallId]
if (toolcall) {
yield* Deferred.succeed(toolcall.done, undefined)
// Bất kể tool thành công hay thất bại,
// Deferred PHẢI được resolve để không block cleanup
}
`}Bước 3: Cleanup phase await tất cả Deferred
Sau khi stream drain xong, cleanup function iterate qua tất cả entries trong
ctx.toolcalls và await mỗi Deferred với timeout:
session/processor.ts:273-278 — cleanup await all
{`
// Trong cleanup():
yield* Effect.forEach(
Object.values(ctx.toolcalls),
(call) => Deferred.await(call.done).pipe(
Effect.timeout("250 millis"), // per-tool timeout
Effect.ignore, // ignore timeout error (không throw)
),
{ concurrency: "unbounded" } // await all in parallel
)
`} concurrency: "unbounded" nghĩa là tất cả Deferred được await đồng thời —
không phải sequential. Điều này quan trọng khi có nhiều tool calls trong một step.
Nếu sequential, tổng thời gian chờ sẽ là N * 250ms thay vì 250ms.
ToolCall type definition
session/processor.ts:134-195 — ToolCall type
{`
type ToolCall = {
partID: MessageV2.ToolPart["id"]
messageID: MessageV2.ToolPart["messageID"]
sessionID: MessageV2.ToolPart["sessionID"]
done: Deferred.Deferred<void> // ← sync point
}
// ctx là per-stream state:
const ctx = {
toolcalls: {} as Record<string, ToolCall>,
// ...
}
`}Tương tác với các kỹ thuật khác Interaction
T3 là connector giữa T2 (stream consumer) và T4 (cleanup). T2 tạo Deferred entries,
T4 cleanup awaits chúng. T1 chỉ proceed đến iteration mới sau khi T4 cleanup hoàn thành.
Nếu T4 bị interrupt (user cancel), cleanup vẫn chạy nhờ Effect.ensuring
— nhưng Deferred sẽ bị abandoned với status "aborted" thay vì "success".
Failure modes Failures
Failure 1: Stream end trước khi tool done
Nếu không có Deferred coordination, stream kết thúc và loop proceed ngay — tool result bị lost:
Không có coordination — tool result bị mất
{`
// WRONG:
for await (const event of stream) {
if (event.type === "tool-call") {
executeToolAsync(event) // fire-and-forget!
}
}
// Stream done → loop tiếp → model không thấy tool result
// Tool vẫn đang chạy nhưng không có nơi để lưu kết quả
`}Failure 2: Sequential await thay vì parallel
Nếu await sequential thay vì concurrency: "unbounded", performance giảm
đáng kể khi có nhiều tool calls:
Sequential await — chậm khi nhiều tools
{`
// WRONG: sequential
for (const call of Object.values(ctx.toolcalls)) {
await waitForDeferred(call.done, 250) // chờ từng cái
}
// Nếu có 5 tools: chờ 5 * 250ms = 1.25 giây worst case
// CORRECT: parallel
await Promise.all(
Object.values(ctx.toolcalls).map(call =>
Promise.race([call.promise, sleep(250)])
)
)
// Chờ tối đa 250ms dù có bao nhiêu tools
`}Failure 3: Không resolve Deferred khi tool error
Nếu tool executor throw và không resolve Deferred, cleanup phase sẽ block đến timeout:
Tool error không resolve Deferred
{`
// WRONG:
async function executeTool(tc: ToolCall) {
const result = await runTool(tc) // có thể throw!
deferred.resolve() // không chạy nếu throw trên
}
// CORRECT: finally block
async function executeTool(tc: ToolCall) {
try {
const result = await runTool(tc)
// lưu result
} finally {
deferred.resolve() // luôn resolve, dù success hay error
}
}
`}So sánh với các harness khác Compare
| Harness | Tool sync mechanism | Parallel tools | Per-tool timeout |
|---|---|---|---|
| opencode | |||
| Claude Code | |||
| Pydantic AI | |||
| LangChain | |||
| Aider |
Pydantic AI có semantics gần giống nhất với opencode — explicit deferred tool call coordination được document rõ trong API. LangChain và Aider ở cuối phổ: sequential và không có timeout protection.
Implementation recipe Recipe
Native Promise-based deferred coordination, không cần Effect:
deferred-tool-coordination.ts
{`
// Deferred factory using native Promise
function makeDeferred<T>() {
let resolve!: (value: T) => void
let reject!: (reason?: unknown) => void
const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})
return { promise, resolve, reject }
}
// Track in-flight tool calls
const pendingTools = new Map<string, ReturnType<typeof makeDeferred<void>>>()
// Called when stream receives tool-input-start
function registerToolCall(id: string) {
const deferred = makeDeferred<void>()
pendingTools.set(id, deferred)
return deferred
}
// Called when tool executor finishes (success OR error)
function resolveToolCall(id: string) {
const deferred = pendingTools.get(id)
if (deferred) {
deferred.resolve()
pendingTools.delete(id)
}
}
// Called after stream drain — await all with timeout
async function waitForAllTools(timeoutMs = 250) {
const entries = [...pendingTools.values()]
if (entries.length === 0) return
// Parallel await với per-entry timeout
await Promise.all(
entries.map((d) =>
Promise.race([
d.promise,
new Promise<void>((res) => setTimeout(res, timeoutMs))
// timeout resolves (not rejects) — không throw
])
)
)
pendingTools.clear()
}
// Wiring trong stream consumer:
async function processStream(stream: AsyncIterable<StreamEvent>) {
try {
for await (const event of stream) {
switch (event.type) {
case "tool-input-start":
registerToolCall(event.id)
break
case "tool-call":
const deferred = pendingTools.get(event.toolCallId)
// Fire async, resolve khi done
runTool(event).finally(() => resolveToolCall(event.toolCallId))
break
}
}
} finally {
// Cleanup — luôn chạy (try/finally)
await waitForAllTools(250)
}
}
`}Tham khảo Refs
- Effect — Deferred documentation · Deferred semantics và usage patterns
- Pydantic AI — Agent & deferred tool calls · Tương tự opencode pattern
- MDN — Promise.all() · Native parallel await
- MDN — Promise.race() · Timeout pattern
- Effect — Retrying & error management · Effect error handling
- anomalyco/opencode — session/processor.ts · Source tham chiếu: lines 134–195, 273–278