← opencode report

T4 — Interruption-safe scope cleanup

Toàn bộ stream processing trong opencode được bọc trong Effect.ensuring(cleanup()) — cleanup LUÔN chạy dù stream kết thúc bình thường, bị cancel bởi user, hay gặp error không mong đợi.
Nhóm: A — Agent Loop & StreamingFile: session/processor.tsLines: 544–582ID: A.4

Tổng quan Overview

Khi user nhấn Ctrl+C giữa lúc tool đang chạy, điều gì xảy ra? Tool có được đánh dấu "aborted" không? File handle có được đóng không? UI có thoát khỏi trạng thái "running" không? Database connection có được cleanup không?

Trong harness không có explicit cleanup, câu trả lời thường là: không. Tài nguyên bị leak, UI stuck ở trạng thái cũ, và state không nhất quán. opencode giải quyết bằng Effect.ensuring — semantic tương đương finally nhưng composable và interrupt-aware.

Tại sao quan trọng: Resource leak trong long-running agent là nguy hiểm: file handles không đóng, subprocess còn sống, DB connections cạn kiệt, tool UI stuck "running" mãi. Effect.ensuring là "transaction semantic" — cleanup luôn xảy ra, giống ACID trong database. Tính chất này composable: nesting ensuring vẫn đảm bảo cleanup thứ tự đúng.
Effect.ensuring vs try/finally: Cả hai đều "always run cleanup". Khác biệt: Effect.ensuring integrate với fiber interrupt — khi fiber bị interrupt, ensuring vẫn chạy trong fiber context đó. try/finally với async/await có edge cases khi combine với Promise cancellation (không tồn tại trong native JS).
Retry trong ensuring: opencode đặt retry BÊN TRONG ensuring — không phải ngoài. Điều này đảm bảo cleanup chỉ chạy một lần sau khi retry exhausted, không phải sau mỗi lần retry.

Phân tích code chi tiết Anatomy

Cấu trúc lồng nhau của pipeline

Pipeline cleanup có nhiều layer, từ ngoài vào trong:

session/processor.ts:544-582 — full pipeline

TS
{`
return yield* Effect.gen(function* () {
  yield* Effect.gen(function* () {
    // Reset state cho iteration mới
    ctx.currentText = undefined
    ctx.reasoningMap = {}

    // T2: Consume stream với demux
    const stream = llm.stream(streamInput)
    yield* stream.pipe(
      Stream.tap((event) => handleEvent(event)),
      Stream.takeUntil(() => ctx.needsCompaction),
      Stream.runDrain,
    )

  }).pipe(
    // Hook: khi bị interrupt → mark aborted
    Effect.onInterrupt(() => Effect.gen(function* () {
      aborted = true
      if (!ctx.assistantMessage.error) {
        yield* halt(new DOMException("Aborted", "AbortError"))
      }
    })),

    // Chỉ propagate non-interrupt errors
    Effect.catchCauseIf(
      (cause) => !Cause.hasInterruptsOnly(cause),
      (cause) => Effect.fail(Cause.squash(cause)),
    ),

    // Retry logic (bên trong ensuring)
    Effect.retry(retrySchedule),

    // Catch remaining errors → halt
    Effect.catch(halt),

    // ← CLEANUP LUÔN CHẠY — dù retry, interrupt, error, hay normal
    Effect.ensuring(cleanup()),
  )
})
`}

cleanup() function — finalize state

Cleanup finalize tất cả pending tools với status "aborted", lưu partial text, và resolve tất cả Deferred (T3):

cleanup() — finalize everything

TS
{`
function* cleanup() {
  // 1. Finalize partial text nếu có
  if (ctx.currentText) {
    yield* session.finalizeText(ctx.currentText)
  }

  // 2. Mark tất cả pending tools là aborted
  for (const [id, call] of Object.entries(ctx.toolcalls)) {
    yield* session.updatePart(call.partID, { status: "aborted" })
    // 3. Resolve Deferred (T3) để không block
    yield* Deferred.succeed(call.done, undefined)
  }

  // 4. Update session status
  yield* status.set(sessionID, { type: "idle" })
}
`}

onInterrupt hook — detect user cancel

Effect.onInterrupt chạy khi fiber bị interrupt (user Ctrl+C hay programmatic cancel). Khác với cleanup — onInterrupt chỉ chạy khi có interrupt, không chạy khi normal exit hay error.

Interrupt handling

TS
{`
Effect.onInterrupt(() => Effect.gen(function* () {
  aborted = true  // flag cho code downstream biết
  // Chỉ halt nếu chưa có error khác
  if (!ctx.assistantMessage.error) {
    yield* halt(new DOMException("Aborted", "AbortError"))
  }
}))
// Sau onInterrupt → ensuring(cleanup()) vẫn chạy
`}

Tương tác với các kỹ thuật khác Interaction

T4 là lớp ngoài cùng bọc T2 và T3: ┌─────────────────────────────────────────────────────┐ │ T4 — Effect.ensuring(cleanup()) │ │ │ │ ┌───────────────────────────────────────────────┐ │ │ │ Effect.retry(retrySchedule) │ │ │ │ │ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ │ │ Effect.onInterrupt(markAborted) │ │ │ │ │ │ │ │ │ │ │ │ ┌───────────────────────────────────┐ │ │ │ │ │ │ │ T2 — Stream demux pipeline │ │ │ │ │ │ │ │ (handleEvent, takeUntil, drain) │ │ │ │ │ │ │ └───────────────────────────────────┘ │ │ │ │ │ └─────────────────────────────────────────┘ │ │ │ └───────────────────────────────────────────────┘ │ │ │ │ cleanup() — luôn chạy: │ │ → finalize partial text │ │ → mark pending tools "aborted" │ │ → T3: resolve all Deferreds │ │ → update session status → idle │ └─────────────────────────────────────────────────────┘ T1 nhận control SAU KHI T4 cleanup hoàn thành

T4 bọc T2 và T3 — nó là "transaction boundary" của một iteration. Khi T2 stream bị interrupt, T4 đảm bảo T3 cleanup (resolve Deferreds) vẫn xảy ra. T1 chỉ nhận control sau khi T4 ensuring hoàn thành, đảm bảo mọi state được finalize trước khi loop check exit condition.

Failure modes Failures

Failure 1: Tool UI stuck 'running' sau cancel

Nếu không có cleanup, tool bị interrupt sẽ không được đánh dấu "aborted" — UI hiển thị tool đang chạy mãi mãi:

Không có cleanup — UI stuck

TS
{`
// WRONG: không cleanup khi cancel
async function processStream(signal: AbortSignal) {
  for await (const event of stream) {
    if (signal.aborted) return  // exit ngay, không cleanup
    handleEvent(event)
  }
}
// Tool partID vẫn có status: "running" trong DB
// User reload page → vẫn thấy tool "running" từ session cũ
`}

Failure 2: File handle leak sau error

Subprocess và file handles không được đóng khi stream bị interrupt giữa chừng:

Resource leak

TS
{`
// Tool mở file để đọc
const file = await fs.open(path, "r")
try {
  const content = await file.readFile()
  return content
} catch (err) {
  // Nếu stream bị interrupt ở đây:
  // file.close() không được gọi → fd leak
  throw err
}
// CORRECT: dùng finally
} finally {
  await file.close()  // luôn đóng
}
`}

Failure 3: Retry mất cleanup

Đặt retry NGOÀI ensuring thay vì bên trong gây ra cleanup bị gọi sau mỗi lần retry, không phải sau khi retry exhausted:

Retry ngoài ensuring — cleanup sai timing

TS
{`
// WRONG: cleanup chạy sau mỗi retry
pipeline.pipe(
  Effect.ensuring(cleanup()),  // ← chạy sau mỗi attempt
  Effect.retry(schedule),
)

// CORRECT: cleanup chạy sau retry exhausted
pipeline.pipe(
  Effect.retry(schedule),      // retry trong ensuring
  Effect.ensuring(cleanup()),  // ← chỉ chạy 1 lần sau cùng
)
`}

So sánh với các harness khác Compare

HarnessCleanup mechanismInterrupt-safeComposable
opencodeEffect.ensuring — transactionalCó — onInterrupt hookCó — nesting preserves order
Vanilla JStry/finally + AbortControllerPartial — async cancel không đảm bảoVerbose khi nesting
Python asynciotry/except CancelledErrorCó nhưng phải catch manuallycontextmanager giúp một phần
AiderKhông có — dựa vào GCKhôngKhông
LangChainCallback hooksPartialCallback-based, không typed

Vanilla JS try/finally là equivalent gần nhất nhưng không composable — nesting nhiều cleanup blocks dễ gây lỗi thứ tự. Python asyncio CancelledError pattern tương tự nhưng phải remember catch nó. Effect.ensuring là solution elegant nhất.

Implementation recipe Recipe

Vanilla TypeScript pattern với try/finally và AbortController — không cần Effect:

interruption-safe-stream.ts

TS
{`
interface StreamContext {
  pendingTools: Map<string, { resolve: () => void }>
  currentText: string
  aborted: boolean
}

async function streamWithCleanup(
  streamFn: (signal: AbortSignal) => AsyncIterable<StreamEvent>,
  onEvent: (event: StreamEvent, ctx: StreamContext) => Promise<void>,
  options = { retries: 2, toolTimeoutMs: 250 }
) {
  const ctx: StreamContext = {
    pendingTools: new Map(),
    currentText: "",
    aborted: false,
  }

  const controller = new AbortController()

  // Expose cancel để caller có thể trigger
  const cancel = () => {
    ctx.aborted = true
    controller.abort()
  }

  let attempt = 0

  while (attempt <= options.retries) {
    try {
      const stream = streamFn(controller.signal)

      for await (const event of stream) {
        if (controller.signal.aborted) break
        await onEvent(event, ctx)
      }

      // Success — exit retry loop
      break

    } catch (err) {
      if (controller.signal.aborted) {
        // User cancel — không retry
        break
      }
      attempt++
      if (attempt > options.retries) throw err
      // Wait before retry
      await new Promise((r) => setTimeout(r, Math.pow(2, attempt) * 1000))

    } finally {
      // CLEANUP — luôn chạy (cancel, error, success)
      await cleanup(ctx, options.toolTimeoutMs)
    }
  }

  return { aborted: ctx.aborted, text: ctx.currentText }
}

async function cleanup(ctx: StreamContext, timeoutMs: number) {
  // 1. Await all pending tools with timeout
  await Promise.all(
    [...ctx.pendingTools.values()].map((d) =>
      Promise.race([
        new Promise<void>((r) => {
          // Replace resolve to track when done
          const orig = d.resolve
          d.resolve = () => { orig(); r() }
        }),
        new Promise<void>((r) => setTimeout(r, timeoutMs)),
      ])
    )
  )

  // 2. Mark remaining tools aborted
  ctx.pendingTools.clear()

  console.log("[cleanup] done — text length:", ctx.currentText.length)
}
`}

Tham khảo Refs

Nguồn chính