Agents
AgentSession is the agentic loop built on top of the unified chat interface. Given a client, a system prompt, and zero or more tools, it repeatedly calls the LLM, dispatches any tool calls the model requests, feeds the results back, and keeps going until the model produces a plain text answer (or a round limit is hit).
The loop
flowchart TD
Start([user sends message]) --> Call[call LLM with messages + tools]
Call --> Resp{response has<br/>tool calls?}
Resp -->|no| Done([return final text])
Resp -->|yes| Approve[approve_tool hook]
Approve --> Dispatch[run handlers]
Dispatch --> Append[append tool results<br/>to message history]
Append --> Call
Call -.->|rounds exhausted| Limit([yield limit-reached message])
Each iteration of the loop is called a round. Tool lists are re-read at the top of every round, so MCP servers that emit notifications/tools/list_changed (and refresh McpStreamable.tools / McpStdio.tools in place) get picked up without restarting the session.
Quick start
from padwan_llm import AgentSession, LLMClient, McpStdio
async with AgentSession(
client=LLMClient(model="gpt-4o"),
mcp_tools=[McpStdio(command="uvx", args=["weather-mcp"])],
system="You are a weather assistant.",
) as session:
text = await session.send("What's the weather in Paris?")
print(text)
Two entry points, depending on whether you want the response streamed:
await session.send(user_input)— returns the complete text as a string.async for chunk in session.stream(user_input)— yields text chunks as the model produces them. Tool calls are silent on the stream; observe them viaon_tool.
Tools: individual or whole transports
mcp_tools accepts a heterogeneous list of McpTool and McpTransport (i.e. McpStreamable / McpStdio) instances. This is the main ergonomic win over managing transports by hand:
from padwan_llm import AgentSession, LLMClient, McpStdio, McpStreamable, McpTool
weather_tool = McpTool(
name="get_weather",
description="Return current weather for a city.",
input_schema={"type": "object", "properties": {"city": {"type": "string"}}},
handler=lambda args: {"city": args["city"], "temp": 22, "sky": "sunny"},
)
async with AgentSession(
client=LLMClient(model="gpt-4o"),
mcp_tools=[
weather_tool, # local definition
McpStdio(command="uvx", args=["filesystem-mcp", "/home/me/docs"]), # subprocess
McpStreamable(url="https://tools.example.com/mcp", token="sk-..."), # remote
],
) as session:
text = await session.send("Summarize the README and check the forecast.")
On __aenter__ the session enters every transport in order (via an AsyncExitStack), pings each one to prove the connection is live, and then fires the optional on_mcp_connect callback with the transport instance. All transports are torn down in LIFO order on exit — even if one of them fails to initialize or ping.
Configuration
AgentSession(
client=..., # LLMClientBase (e.g. LLMClient(model=...))
system=None, # system prompt, stored in ConversationState
mcp_tools=[], # McpTool | McpTransport instances
max_tool_rounds=5, # round cap; None = unbounded (use with care)
max_tool_result_chars=8_000, # truncate tool results sent to the LLM; None = no limit
execution="sequential", # "sequential" or "parallel"
on_tool=None, # callback fired per tool call: (name, args) -> None
on_tool_error=None, # custom error formatter — see below
approve_tool=None, # pre-execution hook returning bool | Awaitable[bool]
on_mcp_connect=None, # fired per MCP transport after entering + pinging
session_id=..., # auto-generated; override to resume a saved session
store=None, # optional ConversationStore for persistence
)
Parallel tool execution
When a single LLM response contains multiple tool calls, execution="parallel" dispatches them via asyncio.gather. Results are still appended to the message history in original call order:
Use the default "sequential" if you care about ordering side effects or want to rate-limit the downstream servers.
Approval hooks
approve_tool runs before every tool dispatch. Return False to block the call — the agent will append a synthetic "denied by user" result instead of executing the handler. The hook may be sync or async:
def prompt_user(tool, args):
return input(f"Run {tool.name}({args})? [y/N] ").lower() == "y"
session = AgentSession(
client=LLMClient(model="gpt-4o"),
mcp_tools=[...],
approve_tool=prompt_user,
)
Error handling
By default, exceptions raised inside a tool handler are caught, formatted via a default string, and appended as the tool result so the model can recover. Override on_tool_error to customize:
def format_error(tool, args, exc):
return f"[{tool.name} failed: {type(exc).__name__}: {exc}]"
session = AgentSession(
client=...,
mcp_tools=[...],
on_tool_error=format_error,
)
Observation
on_tool receives a ToolCallContext(name, args) and must return a context manager wrapped around each tool dispatch. Use it for logging, UI updates, metrics, or span/timer scopes that need to bracket the call:
from contextlib import contextmanager
@contextmanager
def log_call(tc):
print(f"→ {tc.name}({tc.args})")
yield
print(f"← {tc.name} done")
session = AgentSession(..., on_tool=log_call)
For a fire-and-forget side-effect, yield immediately after the action.
Persistence
Conversation state can be saved to any backend via the ConversationStore protocol:
from padwan_llm import ConversationSnapshot, ConversationStore
class JsonStore:
def __init__(self, path): self.path = path
def save(self, session_id: str, snapshot: ConversationSnapshot) -> None:
(self.path / f"{session_id}.json").write_text(json.dumps(snapshot))
def load(self, session_id: str) -> ConversationSnapshot:
return json.loads((self.path / f"{session_id}.json").read_text())
Persist after a turn completes:
async with AgentSession(
client=LLMClient(model="gpt-4o"),
store=JsonStore(Path("./sessions")),
session_id="user-42",
system="...",
) as session:
await session.send("Hi!")
session.save()
Resume in a later process with the classmethod constructor:
async with AgentSession.load(
model="gpt-4o",
store=JsonStore(Path("./sessions")),
session_id="user-42",
) as session:
await session.send("What were we talking about?")
load() pulls the system prompt and full message history from the snapshot. Pass client= instead of model= when you need a pre-configured or fake client (e.g. in tests).
session_id is optional — omit it to start a fresh session that's still wired up to the store, so a later session.save() lands under an auto-generated id:
async with AgentSession.load(model="gpt-4o", store=store) as session:
await session.send("Hello!")
session.save() # persisted under session.session_id
How tool results are fed back
When the LLM response contains tool calls, the agent:
- Parses each call's arguments from JSON.
- Calls
on_tool(name, args)if set. - Asks
approve_tool(if set). Denied calls get a synthetic"Tool call denied."result. - Appends an
AssistantToolMessagewith all tool calls to the history. - Runs the handlers (sequentially or in parallel).
- Catches any exception via
on_tool_error. - Appends a
ToolResultMessageper call, in the original order. - Loops back to the LLM call.
Long tool results are truncated to max_tool_result_chars in the copy sent to the LLM only; the full content is preserved in session.messages for inspection or persistence.
Reading state
session.messages # full ChatMessage list including tool calls + results
session.last_usage # UsageToken from the most recent LLM call
session.total_usage # accumulated usage across all rounds in this session
Limitations
- No mid-stream approval.
ChatStream.tool_callsis only populated after iteration completes, soapprove_toolruns once all tool calls for a round are known. Streaming-with-interrupt is not yet supported. - Untyped tool results. Results are normalized to strings via a small
_extract_texthelper (handles MCP wire format, plain strings, and JSON fallback). Structured result objects would require a wider refactor. - No per-call cancellation. You can cancel the whole
session.send()/session.stream()task, but not an individual in-flight tool call.