← openharness report

T17 — MCP stdio + HTTP transport với dynamic adapter

OpenHarness triển khai McpClientManager để aggregate tools từ N MCP servers — mỗi server có thể dùng STDIO (local process) hoặc Streamable HTTP (remote service), với AsyncExitStack đảm bảo lifecycle độc lập.
Nhóm: D — Extension EcosystemFile: mcp/client.py:29-95Transport: STDIO · Streamable HTTPID: D.4

Tổng quan Overview

MCP (Model Context Protocol) là giao thức chuẩn để agent kết nối với external tool servers. OpenHarness implement McpClientManager — một lớp aggregate quản lý kết nối đồng thời tới nhiều MCP servers, expose toàn bộ tools vào một flat namespace duy nhất mà agent có thể gọi.

Punchline kiến trúc: "Use STDIO for local tools where latency matters; use Streamable HTTP for remote services where stateful connections become a bottleneck." Việc hỗ trợ cả 2 transport trong cùng 1 manager cho phép mix local + remote MCP servers trong 1 agent session mà không cần code khác nhau.
Dynamic Pydantic models: OpenHarness không cần biết schema của tool tại compile time. build_dynamic_pydantic_model() tạo Pydantic model từ JSON schema nhận được runtime từ MCP server — "runtime type validation without compile-time schema knowledge".
AsyncExitStack per manager: Tất cả sessions được manage bởi 1 AsyncExitStack duy nhất. Khi stop() được gọi, stack tự cleanup mọi session theo đúng thứ tự — không cần manually track từng session.

Phân tích code: mcp/client.py Anatomy

McpClientManager: connect + aggregate

mcp/client.py:29-95 — McpClientManager core

PY
{`
class McpClientManager:
    def __init__(self, configs: list[McpServerConfig]):
        self._configs = configs
        self._sessions: dict[str, ClientSession] = {}
        self._exit_stack = AsyncExitStack()

    async def start(self) -> None:
        for config in self._configs:
            session = await self._connect(config)
            self._sessions[config.name] = session

    async def _connect(self, config: McpServerConfig) -> ClientSession:
        if config.transport == "stdio":
            transport = await self._exit_stack.enter_async_context(
                stdio_client(StdioServerParameters(
                    command=config.command,
                    args=config.args,
                    env=config.env,
                ))
            )
        else:  # streamable_http
            transport = await self._exit_stack.enter_async_context(
                streamablehttp_client(config.url, headers=config.headers)
            )
        session = await self._exit_stack.enter_async_context(
            ClientSession(*transport)
        )
        await session.initialize()
        return session

    async def list_all_tools(self) -> list[Tool]:
        """Aggregate tools from all servers into flat namespace."""
        tools = []
        for name, session in self._sessions.items():
            result = await session.list_tools()
            for tool in result.tools:
                tools.append(ToolWithServer(
                    tool=tool,
                    server_name=name,
                ))
        return tools

    async def stop(self) -> None:
        await self._exit_stack.aclose()
`}

Key design decisions: _sessions dùng dict keyed by server name — collision khi 2 server cùng tên sẽ overwrite. list_all_tools() wrap mỗi tool với ToolWithServer để routing biết gọi session nào khi tool được invoke.

Dynamic Pydantic model từ JSON schema

mcp/client.py — build_dynamic_pydantic_model()

PY
{`
def build_dynamic_pydantic_model(tool_schema: dict) -> type[BaseModel]:
    """Create runtime Pydantic model from MCP tool JSON schema."""
    fields = {}
    properties = tool_schema.get("properties", {})
    required = set(tool_schema.get("required", []))

    for field_name, field_schema in properties.items():
        python_type = _json_type_to_python(field_schema)
        if field_name in required:
            fields[field_name] = (python_type, ...)
        else:
            default = field_schema.get("default", None)
            fields[field_name] = (python_type | None, default)

    return create_model("DynamicToolInput", **fields)
`}

create_model() là Pydantic v2 API để tạo model class động. Ellipsis (...) đánh dấu required field — không có default. Optional fields dùng python_type | None union type.

STDIO vs Streamable HTTP Pattern

STDIO Transport (local processes): Agent process │ ├─ subprocess: python -m my_tool_server │ │ │ └─ stdin/stdout pipes (low latency, same machine) │ └─ stdio_client(StdioServerParameters( command="python", args=["-m", "my_tool_server"], env={"API_KEY": "..."} )) Key: Process lifecycle tied to agent process. Server crash → AsyncExitStack cleanup fires. No network overhead — ideal cho file ops, local git. ───────────────────────────────────────────────────────── Streamable HTTP Transport (remote services): Agent process │ └─ streamablehttp_client(url, headers={"Authorization": "Bearer ..."}) │ └─ HTTP/SSE connection tới remote MCP server │ └─ Server có thể ở cloud, có auth, có rate limiting Key: Connection stateful nhưng recoverable. Auth token expire → session dies, cần reconnect. Ideal cho cloud tools: web search, DB queries.
Khi nào dùng STDIO: file system tools, local git operations, database clients trên cùng máy, bất kỳ tool nào cần low latency hoặc access trực tiếp vào local environment variables/files.
Khi nào dùng Streamable HTTP: web APIs, cloud services, tools cần auth token riêng, tools chạy trên infrastructure khác. Lưu ý: token expiry mid-session là failure mode cần xử lý (xem #failure).

Dynamic Pydantic model pattern Pattern

MCP servers expose tool schemas dưới dạng JSON Schema tại runtime. Agent không thể biết schema này tại compile time vì tools được define phía server. OpenHarness giải quyết bằng cách tạo Pydantic model động:

MCP Server → list_tools() response: { "name": "search_docs", "inputSchema": { "type": "object", "properties": { "query": {"type": "string"}, "limit": {"type": "integer", "default": 10} }, "required": ["query"] } } │ ▼ build_dynamic_pydantic_model(inputSchema) │ ▼ DynamicToolInput(BaseModel): query: str # required (no default) limit: int | None # optional, default=10 │ ▼ Agent gọi tool → validate args → dispatch tới server

Pattern này implement đúng tinh thần "runtime type validation without compile-time schema knowledge" — agent vẫn có full type safety khi gọi tool, chỉ là schema được biết muộn hơn (tại connection time thay vì compile time).

Tương tác với các kỹ thuật khác Interaction

T17 (McpClientManager) tương tác với: T16 (Plugin manifest) └─ Plugin's mcp_file → danh sách McpServerConfig McpClientManager nhận configs này và thêm vào pool servers Plugin bundle = skills + hooks + mcp_file như 1 unit T14 (Skills) └─ Skills có thể invoke MCP tools thông qua agent SKILL.md body có thể reference tên MCP tool Agent routing tìm tool trong flat namespace của T17 T20 (6-layer permission evaluation) └─ MCP tool calls đi qua permission check trước khi dispatch T17 chỉ thực thi call sau khi T20 approve Tool name + args được inspect tại layer evaluation T21 (Async approval flow) └─ Remote MCP tool call có thể trigger interactive approval Ví dụ: "Gọi API xoá record?" → cần human confirm T17 await approval response trước khi forward tới server

Failure modes Failures

Failure 1: STDIO process crash

Kịch bản: MCP server subprocess crash giữa session

TS
{`
# Kịch bản: my_tool_server process crash (OOM, exception, etc.)
# AsyncExitStack cleanup fires cho server đó
# Các session khác trong stack KHÔNG bị ảnh hưởng

# "AsyncExitStack ensures each MCP server's lifecycle is managed
#  independently — one server crash doesn't cascade"

# Vấn đề: _sessions["my_tool"] vẫn tồn tại trong dict
# → Agent cố gọi tool → session đã dead → RuntimeError
# Fix cần: health check hoặc reconnect logic trong _connect()
`}

Failure 2: JSON schema có unsupported types

build_dynamic_pydantic_model() gặp type lạ

JSON
{`
# MCP server trả về schema với type "anyOf" hoặc nested $ref
tool_schema = {
    "properties": {
        "config": {
            "anyOf": [{"type": "string"}, {"type": "object"}]  # union type
        }
    }
}

# _json_type_to_python() không handle "anyOf"
# → KeyError hoặc fallback về Any
# → Tool vẫn xuất hiện nhưng với type Any (mất validation)
# → Hoặc Exception → tool invisible (không được add vào list_all_tools)

# Fix: handle "anyOf" → Union type, "$ref" → resolve schema
`}

Failure 3: Streamable HTTP auth token expire

Khi agent session kéo dài và auth token của remote MCP server hết hạn, ClientSession chết mid-conversation. Không có reconnect logic tự động trong implementation hiện tại — agent sẽ nhận exception khi cố gọi tool. Fix đúng: implement token refresh callback trong streamablehttp_client hoặc catch exception và re-call _connect().

So sánh với các harness khác Compare

HarnessSTDIOHTTP transportTool aggregationDynamic schema
OpenHarnessStreamable HTTPFlat namespace (tất cả servers)Runtime Pydantic
Claude CodeStreamable HTTPPer-server namespaceStatic
LangChainSSE (legacy)Manual aggregationManual
LangGraphSSE/HTTPManual aggregationTypedDict

OpenHarness là harness duy nhất trong bảng này combine flat-namespace aggregation với dynamic Pydantic model. Flat namespace đơn giản hóa agent routing (không cần biết tool thuộc server nào) nhưng tạo risk collision khi 2 server expose cùng tên tool.

Implementation recipe Recipe

Minimal McpClientManager — connect, aggregate, cleanup:

minimal_mcp_manager.py — đủ dùng cho production

PY
{`
import asyncio
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def minimal_mcp_manager(configs):
    """Minimal McpClientManager — connects, aggregates, cleans up."""
    stack = AsyncExitStack()
    sessions = {}

    async with stack:
        for cfg in configs:
            params = StdioServerParameters(
                command=cfg["command"],
                args=cfg.get("args", []),
            )
            transport = await stack.enter_async_context(stdio_client(params))
            session = await stack.enter_async_context(ClientSession(*transport))
            await session.initialize()
            sessions[cfg["name"]] = session

        # Aggregate tools from all servers
        all_tools = []
        for name, session in sessions.items():
            result = await session.list_tools()
            for tool in result.tools:
                all_tools.append({"server": name, "tool": tool})

        return all_tools
        # AsyncExitStack cleans up all sessions on exit

# Usage:
# configs = [
#     {"name": "git", "command": "python", "args": ["-m", "git_mcp"]},
#     {"name": "search", "command": "npx", "args": ["@search/mcp-server"]},
# ]
# tools = asyncio.run(minimal_mcp_manager(configs))
`}

Tham khảo Refs

Nguồn chính