T17 — MCP stdio + HTTP transport với dynamic adapter
Tổng quan Overview
MCP (Model Context Protocol) là giao thức chuẩn để agent kết nối với external tool servers.
OpenHarness implement McpClientManager — một lớp aggregate quản lý kết nối
đồng thời tới nhiều MCP servers, expose toàn bộ tools vào một flat namespace duy nhất
mà agent có thể gọi.
build_dynamic_pydantic_model() tạo Pydantic model từ JSON schema nhận được runtime từ
MCP server — "runtime type validation without compile-time schema knowledge".
AsyncExitStack duy nhất. Khi stop() được gọi, stack tự cleanup
mọi session theo đúng thứ tự — không cần manually track từng session.
Phân tích code: mcp/client.py Anatomy
McpClientManager: connect + aggregate
mcp/client.py:29-95 — McpClientManager core
{`
class McpClientManager:
def __init__(self, configs: list[McpServerConfig]):
self._configs = configs
self._sessions: dict[str, ClientSession] = {}
self._exit_stack = AsyncExitStack()
async def start(self) -> None:
for config in self._configs:
session = await self._connect(config)
self._sessions[config.name] = session
async def _connect(self, config: McpServerConfig) -> ClientSession:
if config.transport == "stdio":
transport = await self._exit_stack.enter_async_context(
stdio_client(StdioServerParameters(
command=config.command,
args=config.args,
env=config.env,
))
)
else: # streamable_http
transport = await self._exit_stack.enter_async_context(
streamablehttp_client(config.url, headers=config.headers)
)
session = await self._exit_stack.enter_async_context(
ClientSession(*transport)
)
await session.initialize()
return session
async def list_all_tools(self) -> list[Tool]:
"""Aggregate tools from all servers into flat namespace."""
tools = []
for name, session in self._sessions.items():
result = await session.list_tools()
for tool in result.tools:
tools.append(ToolWithServer(
tool=tool,
server_name=name,
))
return tools
async def stop(self) -> None:
await self._exit_stack.aclose()
`}
Key design decisions: _sessions dùng dict keyed by server name — collision
khi 2 server cùng tên sẽ overwrite. list_all_tools() wrap mỗi tool với
ToolWithServer để routing biết gọi session nào khi tool được invoke.
Dynamic Pydantic model từ JSON schema
mcp/client.py — build_dynamic_pydantic_model()
{`
def build_dynamic_pydantic_model(tool_schema: dict) -> type[BaseModel]:
"""Create runtime Pydantic model from MCP tool JSON schema."""
fields = {}
properties = tool_schema.get("properties", {})
required = set(tool_schema.get("required", []))
for field_name, field_schema in properties.items():
python_type = _json_type_to_python(field_schema)
if field_name in required:
fields[field_name] = (python_type, ...)
else:
default = field_schema.get("default", None)
fields[field_name] = (python_type | None, default)
return create_model("DynamicToolInput", **fields)
`} create_model() là Pydantic v2 API để tạo model class động.
Ellipsis (...) đánh dấu required field — không có default.
Optional fields dùng python_type | None union type.
STDIO vs Streamable HTTP Pattern
Dynamic Pydantic model pattern Pattern
MCP servers expose tool schemas dưới dạng JSON Schema tại runtime. Agent không thể biết schema này tại compile time vì tools được define phía server. OpenHarness giải quyết bằng cách tạo Pydantic model động:
Pattern này implement đúng tinh thần "runtime type validation without compile-time schema knowledge" — agent vẫn có full type safety khi gọi tool, chỉ là schema được biết muộn hơn (tại connection time thay vì compile time).
Tương tác với các kỹ thuật khác Interaction
Failure modes Failures
Failure 1: STDIO process crash
Kịch bản: MCP server subprocess crash giữa session
{`
# Kịch bản: my_tool_server process crash (OOM, exception, etc.)
# AsyncExitStack cleanup fires cho server đó
# Các session khác trong stack KHÔNG bị ảnh hưởng
# "AsyncExitStack ensures each MCP server's lifecycle is managed
# independently — one server crash doesn't cascade"
# Vấn đề: _sessions["my_tool"] vẫn tồn tại trong dict
# → Agent cố gọi tool → session đã dead → RuntimeError
# Fix cần: health check hoặc reconnect logic trong _connect()
`}Failure 2: JSON schema có unsupported types
build_dynamic_pydantic_model() gặp type lạ
{`
# MCP server trả về schema với type "anyOf" hoặc nested $ref
tool_schema = {
"properties": {
"config": {
"anyOf": [{"type": "string"}, {"type": "object"}] # union type
}
}
}
# _json_type_to_python() không handle "anyOf"
# → KeyError hoặc fallback về Any
# → Tool vẫn xuất hiện nhưng với type Any (mất validation)
# → Hoặc Exception → tool invisible (không được add vào list_all_tools)
# Fix: handle "anyOf" → Union type, "$ref" → resolve schema
`}Failure 3: Streamable HTTP auth token expire
Khi agent session kéo dài và auth token của remote MCP server hết hạn,
ClientSession chết mid-conversation. Không có reconnect logic tự động
trong implementation hiện tại — agent sẽ nhận exception khi cố gọi tool.
Fix đúng: implement token refresh callback trong streamablehttp_client
hoặc catch exception và re-call _connect().
So sánh với các harness khác Compare
| Harness | STDIO | HTTP transport | Tool aggregation | Dynamic schema |
|---|---|---|---|---|
| OpenHarness | Có | Streamable HTTP | Flat namespace (tất cả servers) | Runtime Pydantic |
| Claude Code | Có | Streamable HTTP | Per-server namespace | Static |
| LangChain | Có | SSE (legacy) | Manual aggregation | Manual |
| LangGraph | Có | SSE/HTTP | Manual aggregation | TypedDict |
OpenHarness là harness duy nhất trong bảng này combine flat-namespace aggregation với dynamic Pydantic model. Flat namespace đơn giản hóa agent routing (không cần biết tool thuộc server nào) nhưng tạo risk collision khi 2 server expose cùng tên tool.
Implementation recipe Recipe
Minimal McpClientManager — connect, aggregate, cleanup:
minimal_mcp_manager.py — đủ dùng cho production
{`
import asyncio
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def minimal_mcp_manager(configs):
"""Minimal McpClientManager — connects, aggregates, cleans up."""
stack = AsyncExitStack()
sessions = {}
async with stack:
for cfg in configs:
params = StdioServerParameters(
command=cfg["command"],
args=cfg.get("args", []),
)
transport = await stack.enter_async_context(stdio_client(params))
session = await stack.enter_async_context(ClientSession(*transport))
await session.initialize()
sessions[cfg["name"]] = session
# Aggregate tools from all servers
all_tools = []
for name, session in sessions.items():
result = await session.list_tools()
for tool in result.tools:
all_tools.append({"server": name, "tool": tool})
return all_tools
# AsyncExitStack cleans up all sessions on exit
# Usage:
# configs = [
# {"name": "git", "command": "python", "args": ["-m", "git_mcp"]},
# {"name": "search", "command": "npx", "args": ["@search/mcp-server"]},
# ]
# tools = asyncio.run(minimal_mcp_manager(configs))
`}Tham khảo Refs
- MCP Specification — Transports (2025-03-26) · Định nghĩa chính thức STDIO và Streamable HTTP transport
- MCP Docs — Transports overview · Khi nào dùng STDIO vs HTTP, lifecycle model
- Pydantic v2 — create_model() API · Dynamic model creation từ field definitions
- Python docs — AsyncExitStack · Lifecycle management pattern cho async context managers
- Medium — Claude Code Extensions Explained · MCP integration trong agent harness context
- HumanLayer — Harness Engineering for Coding Agents · Tại sao MCP aggregation là thành phần cốt lõi của harness