Model Context Protocol (MCP)
Padwan LLM ships two MCP client transports, McpStreamable (HTTP) and McpStdio (subprocess), both implementing the MCP 2025-11-25 spec. They expose the same tools interface, but their internal concurrency models differ: McpStreamable pairs a foreground request channel with a background listener, while McpStdio multiplexes all traffic over a single pipe.
Quick start
import asyncio

from padwan_llm import McpStreamable, McpStdio


async def main() -> None:
    # Streamable HTTP: remote MCP server (with optional bearer token)
    async with McpStreamable(url="https://mcp.example.com/mcp", token="sk-...") as mcp:
        for tool in mcp.tools:
            print(tool.name, tool.description)
        result = await mcp.tools[0].handler({"query": "test"})

    # Stdio: local subprocess
    async with McpStdio(command="uvx", args=["my-mcp-server"]) as mcp:
        tool = next(t for t in mcp.tools if t.name == "search")
        result = await tool.handler({"query": "hello"})


asyncio.run(main())
Feature comparison
| Feature | McpStreamable | McpStdio |
|---|---|---|
| Tool discovery | ✅ | ✅ |
| Tool calls | ✅ | ✅ |
| Session management | ✅ | — |
| SSE streaming | ✅ | — |
| Reconnectable listener | ✅ (Last-Event-ID) | — |
| Bearer token auth | ✅ (token, on_auth refresh) | — |
| Progress notifications | ✅ (on_progress) | ✅ (on_progress) |
| Ping | ✅ | ✅ |
| Request cancellation | ✅ (cancel()) | ✅ (cancel()) |
McpStreamable — dual-channel architecture
The streamable HTTP transport runs two concurrent communication channels against the same endpoint:
flowchart LR
subgraph client["McpStreamable client"]
Caller["User code<br/>tool.handler"]
RPC["_rpc<br/>foreground POST"]
Listen["_listen<br/>background GET task"]
Pending["pending requests<br/>by JSON-RPC id"]
end
subgraph server["MCP server"]
Endpoint["/mcp"]
end
Caller -->|1 await| RPC
RPC -->|2 POST JSON-RPC| Endpoint
Endpoint -->|3 response JSON or SSE| RPC
RPC -->|4 result| Caller
Listen -.->|GET SSE persistent| Endpoint
Endpoint -.->|server-pushed notifications| Listen
Listen -.->|tools list_changed, refresh tools| Pending
Listen -.->|progress, on_progress callback| Caller
The foreground channel (_rpc) handles request/response: you await tool.handler(...), it POSTs a JSON-RPC request, gets back either a single JSON response or an SSE stream containing the response. This is what user code interacts with directly.
The background channel (_listen) is a persistent GET against the same /mcp endpoint, opened during __aenter__ and torn down in __aexit__. It receives unsolicited server pushes — things the server wants to tell you without being asked, like notifications/tools/list_changed (server's tool list changed, we should re-fetch) or out-of-band notifications/progress for long-running operations.
This separation means the listener can deliver notifications even while you have no in-flight RPC. Without it, a server that wanted to tell you "my tools changed" would have to wait until you happened to ask for something.
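The foreground channel has to cope with two reply shapes: a plain JSON body or an SSE stream that carries the response somewhere among other events. The following is a minimal sketch of that branching using only the standard library. The function name `extract_response` and its signature are hypothetical, chosen for illustration, and are not part of the Padwan LLM API.

```python
import json


def extract_response(content_type: str, body: str, request_id: int) -> dict:
    """Return the JSON-RPC response for request_id from a POST reply body."""
    if content_type.startswith("application/json"):
        # Simple case: the whole body is the response object.
        return json.loads(body)
    if content_type.startswith("text/event-stream"):
        # SSE events are separated by blank lines; payloads live in "data:" fields.
        for block in body.replace("\r\n", "\n").split("\n\n"):
            data_lines = []
            for line in block.split("\n"):
                if line.startswith("data:"):
                    data_lines.append(line[5:].removeprefix(" "))
            if not data_lines:
                continue
            message = json.loads("\n".join(data_lines))
            # Notifications may precede the response on the same stream; skip them.
            if message.get("id") == request_id and (
                "result" in message or "error" in message
            ):
                return message
        raise ValueError(f"no response for id {request_id} in SSE stream")
    raise ValueError(f"unexpected content type: {content_type}")
```

A real client would parse the stream incrementally rather than from a complete string; the sketch buffers the body only to keep the branching visible.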
McpStreamable — full lifecycle
sequenceDiagram
participant App as User code
participant Client as McpStreamable
participant Listener as _listen task
participant Server as MCP server
Note over App,Server: aenter
App->>Client: async with McpStreamable
Client->>Server: POST initialize
Server-->>Client: result plus MCP-Session-Id header
Client->>Server: POST notifications/initialized
Client->>Server: POST tools/list
Server-->>Client: tool list
Client->>Listener: spawn background task
Listener->>Server: GET /mcp (Accept text/event-stream)
Server-->>Listener: SSE stream open
Note over App,Server: Operation
App->>Client: await tool.handler(args)
Client->>Server: POST tools/call
Server-->>Client: result (JSON or SSE)
Client-->>App: return result
Server-->>Listener: notifications/progress
Listener->>App: on_progress(event)
Server-->>Listener: notifications/tools/list_changed
Listener->>Server: POST tools/list (refresh)
Server-->>Listener: updated tool list
Note over App,Server: aexit
App->>Client: exit context
Client->>Listener: cancel task
Client->>Server: close session
The key thing to notice: the background _listen task is spawned during __aenter__ and lives for the entire duration of the async with block. It runs in parallel with all your RPC calls and can deliver notifications at any moment.
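The spawn-on-enter, cancel-on-exit pattern described above can be sketched with plain asyncio. This is an illustrative stand-in, not the library's implementation: the class `ListenerLifecycle` is hypothetical, and its `_listen` loop fakes notifications with a timer instead of reading an SSE stream.

```python
import asyncio
import contextlib


class ListenerLifecycle:
    """Hypothetical stand-in for a client that owns one background task."""

    def __init__(self) -> None:
        self.events = []   # notifications "received" so far
        self._task = None  # set in __aenter__

    async def _listen(self) -> None:
        # Stand-in for the persistent GET: emit a fake notification periodically.
        n = 0
        while True:
            await asyncio.sleep(0.01)
            n += 1
            self.events.append(f"notification-{n}")

    async def __aenter__(self) -> "ListenerLifecycle":
        self._task = asyncio.create_task(self._listen())
        return self

    async def __aexit__(self, *exc_info) -> None:
        self._task.cancel()
        # Wait for the task to unwind; the cancellation itself is expected.
        with contextlib.suppress(asyncio.CancelledError):
            await self._task


async def demo():
    async with ListenerLifecycle() as client:
        await asyncio.sleep(0.05)  # "user code" runs while the listener works
        seen = len(client.events)
    return seen, client._task.done()
```

Awaiting the cancelled task in `__aexit__` ensures the listener has fully stopped before the context manager returns, so no notification can arrive after the block ends.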
Token refresh on 401
Set on_auth to recover from expired bearer tokens. When an RPC returns HTTP 401, the client invokes the callback (sync or async), stores the returned token, and retries the request exactly once:
async def refresh(transport: McpStreamable) -> str:
    return await oauth.fetch_token(...)

async with McpStreamable(url="...", token=initial_token, on_auth=refresh) as mcp:
    ...
Without on_auth, a 401 raises RuntimeError.
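The retry-once behavior can be sketched independently of any HTTP library. Everything named here is an assumption made for illustration: `call_with_refresh`, the `send(token)` callable returning `(status, body)`, and the `Unauthorized` exception are hypothetical. For brevity the callback takes no arguments, whereas the example above passes the transport to it.

```python
import asyncio
import inspect


class Unauthorized(RuntimeError):
    """Raised when the server answers 401 and no usable token can be obtained."""


async def call_with_refresh(send, token, on_auth=None):
    """Call send(token); on a 401, refresh the token and retry exactly once."""
    status, body = await send(token)
    if status != 401:
        return body, token
    if on_auth is None:
        raise Unauthorized("401 and no on_auth callback configured")
    new_token = on_auth()
    # Accept both sync and async callbacks: await only if we got an awaitable.
    if inspect.isawaitable(new_token):
        new_token = await new_token
    status, body = await send(new_token)
    if status == 401:
        raise Unauthorized("401 persisted after token refresh")
    return body, new_token
```

Returning the new token alongside the body lets the caller store it for later requests, which matches the "stores the returned token" step described above.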
Reconnection with Last-Event-ID
If the SSE stream drops mid-session, _listen automatically reconnects and tells the server to resume from the last event it successfully processed. This uses the standard SSE Last-Event-ID header.
sequenceDiagram
participant Listener as _listen task
participant Server as MCP server
Listener->>Server: GET /mcp (no Last-Event-ID)
Server-->>Listener: event id="evt-1"
Note over Listener: _last_event_id = "evt-1"
Server-->>Listener: event id="evt-2"
Note over Listener: _last_event_id = "evt-2"
Server--xListener: connection drops
Note over Listener: retries < max_retries<br/>sleep(_retry_ms)
Listener->>Server: GET /mcp<br/>Last-Event-ID: evt-2
Server-->>Listener: event id="evt-3"<br/>(resumes after evt-2)
The retry budget is bounded by max_retries (default 5) and the inter-attempt delay is controlled by the server via the SSE retry: field — if the server sends retry: 5000, the next reconnect waits 5 seconds. The client falls back to _DEFAULT_RETRY_MS = 3000 ms otherwise.
If the server returns 405 Method Not Allowed on the GET, the listener exits cleanly — that signals "this server doesn't support push notifications," which is allowed by the spec.
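The bookkeeping behind reconnection is small: remember the last `id:` seen, honor any `retry:` value, and decide after each attempt whether to retry, stop, or give up. The sketch below shows that state in isolation. `ReconnectState` and `next_action` are hypothetical names, and the real `_listen` task may organize this differently.

```python
from dataclasses import dataclass
from typing import Optional

DEFAULT_RETRY_MS = 3000  # mirrors the fallback value named in the text


@dataclass
class ReconnectState:
    last_event_id: Optional[str] = None
    retry_ms: int = DEFAULT_RETRY_MS

    def feed_line(self, line: str) -> None:
        """Update state from one SSE line; only id: and retry: matter here."""
        field, _, value = line.partition(":")
        value = value.removeprefix(" ")
        if field == "id":
            self.last_event_id = value
        elif field == "retry" and value.isdigit():
            self.retry_ms = int(value)

    def reconnect_headers(self) -> dict:
        """Headers for the next GET; Last-Event-ID only once an id was seen."""
        headers = {"Accept": "text/event-stream"}
        if self.last_event_id is not None:
            headers["Last-Event-ID"] = self.last_event_id
        return headers


def next_action(status: int, attempts: int, max_retries: int = 5) -> str:
    """Decide what the listener does after a GET attempt ends."""
    if status == 405:
        return "stop"     # server does not offer a push stream
    if attempts >= max_retries:
        return "give_up"  # retry budget exhausted
    return "retry"        # sleep retry_ms, then reconnect
```

Feeding the lines `id: evt-1`, `retry: 5000`, `id: evt-2` leaves the state ready to reconnect with `Last-Event-ID: evt-2` after a 5-second wait, as in the diagram.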
McpStdio — single-pipe architecture
The stdio transport launches the MCP server as a child process and communicates over its stdin/stdout. There's only one bidirectional pipe (NDJSON-framed), so the concurrency model is different from streamable HTTP:
flowchart LR
subgraph client["McpStdio parent process"]
Caller["User code<br/>tool.handler"]
RPC["_rpc<br/>writes to stdin"]
Reader["_reader task<br/>reads stdout"]
Pending["_pending<br/>id to Future map"]
OnProg["on_progress callback"]
end
subgraph child["MCP server subprocess"]
Stdin["stdin"]
Stdout["stdout"]
end
Caller -->|await| RPC
RPC -->|register Future by id| Pending
RPC -->|write JSON line| Stdin
Stdout -->|read line| Reader
Reader -->|match id, resolve Future| Pending
Pending -->|future result| Caller
Reader -->|notification without id| OnProg
Because both directions share a single pipe, there's a single background reader task parsing NDJSON lines as they arrive. Each outgoing request gets a unique JSON-RPC id, and the reader uses that id to look up the right asyncio.Future and resolve it with the response. Notifications (messages without an id) are dispatched directly to the appropriate handler — tools/list_changed triggers a refresh, progress invokes the user's on_progress callback.
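The id-to-Future routing can be sketched without a subprocess by feeding lines to a dispatcher by hand. This is an illustration under assumptions: `LineDispatcher` and its methods are hypothetical names, and the real reader task pulls lines from the child's stdout instead of receiving them through `feed`.

```python
import asyncio
import json


class LineDispatcher:
    """Routes NDJSON lines to waiting futures (by id) or a notification hook."""

    def __init__(self, on_notification=None) -> None:
        self._pending = {}  # JSON-RPC id -> asyncio.Future
        self._next_id = 0
        self._on_notification = on_notification

    def register(self):
        """Allocate a fresh request id and the future its response resolves."""
        self._next_id += 1
        future = asyncio.get_running_loop().create_future()
        self._pending[self._next_id] = future
        return self._next_id, future

    def feed(self, line: str) -> None:
        message = json.loads(line)
        if "id" in message and ("result" in message or "error" in message):
            future = self._pending.pop(message["id"], None)
            if future is not None and not future.done():
                if "error" in message:
                    text = message["error"].get("message", "error")
                    future.set_exception(RuntimeError(text))
                else:
                    future.set_result(message["result"])
        elif "method" in message and self._on_notification is not None:
            # No response payload: treat it as a server notification.
            self._on_notification(message["method"], message.get("params", {}))

    def fail_all(self, exc: Exception) -> None:
        """Called when the pipe closes: wake every waiter with an error."""
        for future in self._pending.values():
            if not future.done():
                future.set_exception(exc)
        self._pending.clear()


async def demo():
    seen = []
    d = LineDispatcher(on_notification=lambda method, params: seen.append(method))
    id_a, fut_a = d.register()
    id_b, fut_b = d.register()
    # Responses may arrive in any order, interleaved with notifications.
    d.feed(json.dumps({"jsonrpc": "2.0", "method": "notifications/progress",
                       "params": {"progress": 1}}))
    d.feed(json.dumps({"jsonrpc": "2.0", "id": id_b, "result": "second"}))
    d.feed(json.dumps({"jsonrpc": "2.0", "id": id_a, "result": "first"}))
    _, fut_c = d.register()
    d.fail_all(ConnectionError("subprocess exited"))
    try:
        await fut_c
        failed = False
    except ConnectionError:
        failed = True
    return await fut_a, await fut_b, seen, failed
```

The `fail_all` step matters as much as the happy path: without it, callers awaiting a response from a dead subprocess would hang forever.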
Compared to McpStreamable:
- No session management: the connection lasts exactly as long as the process
- No reconnection logic: if the subprocess dies, the connection is dead and all pending futures get an exception
- No HTTP semantics: no headers, no auth, no Last-Event-ID. The protocol is just newline-delimited JSON-RPC over pipes
- One reader, many waiters: the reader is a fan-out point that resolves multiple pending futures from a single byte stream
Cancellation
Both transports support cancel(request_id), which sends a notifications/cancelled message to the server. The server is expected to abort the in-flight operation and stop sending progress events for it. This is best-effort: the spec says the server should stop, but it may ignore the notification (for example, when the request has already completed), and because this is a notification, the server sends no acknowledgement.
The user-facing way to cancel is to cancel the asyncio.Task that's awaiting tool.handler(...). The transport catches CancelledError, sends the cancellation notification, and re-raises.
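The catch, notify, re-raise sequence can be sketched as a small wrapper. The names `call_with_cancel`, `do_request`, and `send_notification` are hypothetical; the notification shape (method `notifications/cancelled` with a `requestId` parameter) follows the MCP spec.

```python
import asyncio


async def call_with_cancel(request_id, do_request, send_notification):
    """Await a request; if our task is cancelled, notify the server and re-raise."""
    try:
        return await do_request()
    except asyncio.CancelledError:
        # Tell the server first, then propagate so the task still ends as cancelled.
        await send_notification({
            "jsonrpc": "2.0",
            "method": "notifications/cancelled",
            "params": {"requestId": request_id},
        })
        raise


async def demo():
    sent = []

    async def slow_request():
        await asyncio.sleep(10)  # stands in for a long-running tools/call

    async def record(message):
        sent.append(message)  # stands in for writing to the transport

    task = asyncio.create_task(call_with_cancel(42, slow_request, record))
    await asyncio.sleep(0)  # let the task start and block on the request
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return sent, task.cancelled()
```

Re-raising is the important part: swallowing `CancelledError` would leave the caller's task looking as though it completed normally.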