T4 — Interruption-safe scope cleanup
Effect.ensuring(cleanup()) — cleanup LUÔN chạy dù stream kết thúc bình thường, bị cancel bởi user, hay gặp error không mong đợi.Tổng quan Overview
Khi user nhấn Ctrl+C giữa lúc tool đang chạy, điều gì xảy ra? Tool có được đánh dấu "aborted" không? File handle có được đóng không? UI có thoát khỏi trạng thái "running" không? Database connection có được cleanup không?
Trong harness không có explicit cleanup, câu trả lời thường là: không. Tài nguyên
bị leak, UI stuck ở trạng thái cũ, và state không nhất quán. opencode giải quyết
bằng Effect.ensuring — semantic tương đương finally
nhưng composable và interrupt-aware.
Effect.ensuring là "transaction semantic" — cleanup luôn
xảy ra, giống ACID trong database. Tính chất này composable: nesting ensuring vẫn
đảm bảo cleanup thứ tự đúng.
Phân tích code chi tiết Anatomy
Cấu trúc lồng nhau của pipeline
Pipeline cleanup có nhiều layer, từ ngoài vào trong:
session/processor.ts:544-582 — full pipeline
{`
return yield* Effect.gen(function* () {
yield* Effect.gen(function* () {
// Reset state cho iteration mới
ctx.currentText = undefined
ctx.reasoningMap = {}
// T2: Consume stream với demux
const stream = llm.stream(streamInput)
yield* stream.pipe(
Stream.tap((event) => handleEvent(event)),
Stream.takeUntil(() => ctx.needsCompaction),
Stream.runDrain,
)
}).pipe(
// Hook: khi bị interrupt → mark aborted
Effect.onInterrupt(() => Effect.gen(function* () {
aborted = true
if (!ctx.assistantMessage.error) {
yield* halt(new DOMException("Aborted", "AbortError"))
}
})),
// Chỉ propagate non-interrupt errors
Effect.catchCauseIf(
(cause) => !Cause.hasInterruptsOnly(cause),
(cause) => Effect.fail(Cause.squash(cause)),
),
// Retry logic (bên trong ensuring)
Effect.retry(retrySchedule),
// Catch remaining errors → halt
Effect.catch(halt),
// ← CLEANUP LUÔN CHẠY — dù retry, interrupt, error, hay normal
Effect.ensuring(cleanup()),
)
})
`}cleanup() function — finalize state
Cleanup finalize tất cả pending tools với status "aborted", lưu partial text, và resolve tất cả Deferred (T3):
cleanup() — finalize everything
{`
function* cleanup() {
// 1. Finalize partial text nếu có
if (ctx.currentText) {
yield* session.finalizeText(ctx.currentText)
}
// 2. Mark tất cả pending tools là aborted
for (const [id, call] of Object.entries(ctx.toolcalls)) {
yield* session.updatePart(call.partID, { status: "aborted" })
// 3. Resolve Deferred (T3) để không block
yield* Deferred.succeed(call.done, undefined)
}
// 4. Update session status
yield* status.set(sessionID, { type: "idle" })
}
`}onInterrupt hook — detect user cancel
Effect.onInterrupt chạy khi fiber bị interrupt (user Ctrl+C hay
programmatic cancel). Khác với cleanup — onInterrupt chỉ chạy khi có interrupt,
không chạy khi normal exit hay error.
Interrupt handling
{`
Effect.onInterrupt(() => Effect.gen(function* () {
aborted = true // flag cho code downstream biết
// Chỉ halt nếu chưa có error khác
if (!ctx.assistantMessage.error) {
yield* halt(new DOMException("Aborted", "AbortError"))
}
}))
// Sau onInterrupt → ensuring(cleanup()) vẫn chạy
`}Tương tác với các kỹ thuật khác Interaction
T4 bọc T2 và T3 — nó là "transaction boundary" của một iteration. Khi T2 stream bị interrupt, T4 đảm bảo T3 cleanup (resolve Deferreds) vẫn xảy ra. T1 chỉ nhận control sau khi T4 ensuring hoàn thành, đảm bảo mọi state được finalize trước khi loop check exit condition.
Failure modes Failures
Failure 1: Tool UI stuck 'running' sau cancel
Nếu không có cleanup, tool bị interrupt sẽ không được đánh dấu "aborted" — UI hiển thị tool đang chạy mãi mãi:
Không có cleanup — UI stuck
{`
// WRONG: không cleanup khi cancel
async function processStream(signal: AbortSignal) {
for await (const event of stream) {
if (signal.aborted) return // exit ngay, không cleanup
handleEvent(event)
}
}
// Tool partID vẫn có status: "running" trong DB
// User reload page → vẫn thấy tool "running" từ session cũ
`}Failure 2: File handle leak sau error
Subprocess và file handles không được đóng khi stream bị interrupt giữa chừng:
Resource leak
{`
// Tool mở file để đọc
const file = await fs.open(path, "r")
try {
const content = await file.readFile()
return content
} catch (err) {
// Nếu stream bị interrupt ở đây:
// file.close() không được gọi → fd leak
throw err
}
// CORRECT: dùng finally
} finally {
await file.close() // luôn đóng
}
`}Failure 3: Retry mất cleanup
Đặt retry NGOÀI ensuring thay vì bên trong gây ra cleanup bị gọi sau mỗi lần retry, không phải sau khi retry exhausted:
Retry ngoài ensuring — cleanup sai timing
{`
// WRONG: cleanup chạy sau mỗi retry
pipeline.pipe(
Effect.ensuring(cleanup()), // ← chạy sau mỗi attempt
Effect.retry(schedule),
)
// CORRECT: cleanup chạy sau retry exhausted
pipeline.pipe(
Effect.retry(schedule), // retry trong ensuring
Effect.ensuring(cleanup()), // ← chỉ chạy 1 lần sau cùng
)
`}So sánh với các harness khác Compare
| Harness | Cleanup mechanism | Interrupt-safe | Composable |
|---|---|---|---|
| opencode | |||
| Vanilla JS | |||
| Python asyncio | |||
| Aider | |||
| LangChain |
Vanilla JS try/finally là equivalent gần nhất nhưng không composable — nesting nhiều cleanup blocks dễ gây lỗi thứ tự. Python asyncio CancelledError pattern tương tự nhưng phải remember catch nó. Effect.ensuring là solution elegant nhất.
Implementation recipe Recipe
Vanilla TypeScript pattern với try/finally và AbortController — không cần Effect:
interruption-safe-stream.ts
{`
interface StreamContext {
pendingTools: Map<string, { resolve: () => void }>
currentText: string
aborted: boolean
}
async function streamWithCleanup(
streamFn: (signal: AbortSignal) => AsyncIterable<StreamEvent>,
onEvent: (event: StreamEvent, ctx: StreamContext) => Promise<void>,
options = { retries: 2, toolTimeoutMs: 250 }
) {
const ctx: StreamContext = {
pendingTools: new Map(),
currentText: "",
aborted: false,
}
const controller = new AbortController()
// Expose cancel để caller có thể trigger
const cancel = () => {
ctx.aborted = true
controller.abort()
}
let attempt = 0
while (attempt <= options.retries) {
try {
const stream = streamFn(controller.signal)
for await (const event of stream) {
if (controller.signal.aborted) break
await onEvent(event, ctx)
}
// Success — exit retry loop
break
} catch (err) {
if (controller.signal.aborted) {
// User cancel — không retry
break
}
attempt++
if (attempt > options.retries) throw err
// Wait before retry
await new Promise((r) => setTimeout(r, Math.pow(2, attempt) * 1000))
} finally {
// CLEANUP — luôn chạy (cancel, error, success)
await cleanup(ctx, options.toolTimeoutMs)
}
}
return { aborted: ctx.aborted, text: ctx.currentText }
}
async function cleanup(ctx: StreamContext, timeoutMs: number) {
// 1. Await all pending tools with timeout
await Promise.all(
[...ctx.pendingTools.values()].map((d) =>
Promise.race([
new Promise<void>((r) => {
// Replace resolve to track when done
const orig = d.resolve
d.resolve = () => { orig(); r() }
}),
new Promise<void>((r) => setTimeout(r, timeoutMs)),
])
)
)
// 2. Mark remaining tools aborted
ctx.pendingTools.clear()
console.log("[cleanup] done — text length:", ctx.currentText.length)
}
`}Tham khảo Refs
- Effect — Retrying docs · Effect.ensuring, Effect.retry semantics
- Why Effect is Becoming the Go-To Choice for TypeScript APIs · Structured concurrency motivation
- EffectPatterns community knowledge base · Patterns tái sử dụng với Effect
- MDN — AbortController · Native cancel signal cho async operations
- Python asyncio — Task Cancellation · CancelledError pattern tương đương
- anomalyco/opencode — session/processor.ts:544-582 · Source tham chiếu chính