1. Multi-Agent Orchestration
PydanticAI supports multi-agent architectures where one agent delegates subtasks to other agents via tool calls. This enables separation of concerns — each agent specializes in a specific domain while a supervisor coordinates the overall workflow.
1.1 Supervisor Pattern: One Agent Routes to Specialists
The supervisor pattern creates a top-level agent that decides which specialist agent to invoke based on user input. Each specialist is exposed as a tool:
import asyncio
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext

# Define specialist agents
coding_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are an expert Python developer. Write clean, tested code."
)
research_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a research analyst. Provide factual, cited information."
)

# Shared dependencies for all agents
@dataclass
class TeamDeps:
    project_name: str
    max_tokens: int = 2000

# Supervisor agent that delegates to specialists
supervisor = Agent(
    "openai:gpt-4o",
    deps_type=TeamDeps,
    system_prompt="""You are a project supervisor. Analyze the user request and
delegate to the appropriate specialist:
- For coding tasks, use the coding_tool
- For research tasks, use the research_tool
Synthesize their responses into a coherent answer."""
)

@supervisor.tool
async def coding_tool(ctx: RunContext[TeamDeps], task: str) -> str:
    """Delegate a coding task to the specialist coding agent."""
    # usage=ctx.usage counts the specialist's tokens toward the supervisor's run
    result = await coding_agent.run(task, usage=ctx.usage)
    return result.data

@supervisor.tool
async def research_tool(ctx: RunContext[TeamDeps], query: str) -> str:
    """Delegate a research query to the specialist research agent."""
    result = await research_agent.run(query, usage=ctx.usage)
    return result.data

# Run the supervisor
async def main():
    deps = TeamDeps(project_name="AI Assistant")
    result = await supervisor.run(
        "Write a Python function to calculate Fibonacci numbers and explain the math behind it.",
        deps=deps
    )
    print(result.data)

if __name__ == "__main__":
    asyncio.run(main())
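Without the LLM calls, the supervisor pattern is delegation by name. The model picks a tool, the framework looks it up and awaits the specialist, and the results return for synthesis. The framework-free sketch below makes that control flow explicit. The stub specialists, the `TOOLS` registry, and the `plan` list are hypothetical stand-ins for the agents and for the tool calls the supervisor model would choose.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical stand-ins for coding_agent.run() / research_agent.run();
# real specialists would call an LLM and return its output.
async def coding_specialist(task: str) -> str:
    return f"[code] {task}"


async def research_specialist(query: str) -> str:
    return f"[research] {query}"


# The supervisor's tool registry: tool name -> specialist coroutine function.
TOOLS: dict[str, Callable[[str], Awaitable[str]]] = {
    "coding_tool": coding_specialist,
    "research_tool": research_specialist,
}


async def supervise(plan: list[tuple[str, str]]) -> str:
    """Run the (tool_name, argument) calls the supervisor chose, then combine results."""
    unknown = [name for name, _ in plan if name not in TOOLS]
    if unknown:
        raise KeyError(f"supervisor requested unknown tools: {unknown}")
    # Independent delegations can run concurrently; gather preserves plan order.
    results = await asyncio.gather(*(TOOLS[name](arg) for name, arg in plan))
    return "\n".join(results)


if __name__ == "__main__":
    plan = [
        ("coding_tool", "Write a Fibonacci function"),
        ("research_tool", "Explain the math behind Fibonacci numbers"),
    ]
    print(asyncio.run(supervise(plan)))
```

In PydanticAI, the framework performs the lookup and builds each tool's schema from its signature and docstring. The docstrings on `coding_tool` and `research_tool` therefore matter: they are what the supervisor model reads when it chooses a specialist.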
1.2 Shared Dependencies Across Agents
When multiple agents need access to the same resources (databases, API clients, configuration), pass shared dependencies through the orchestration layer:
import asyncio
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext

@dataclass
class SharedContext:
    db_connection: str
    user_id: str
    session_history: list[str]

# Agent A: processes queries
query_agent = Agent(
    "openai:gpt-4o",
    deps_type=SharedContext,
    system_prompt="You analyze user queries and extract intent."
)

# Agent B: generates responses using shared context
response_agent = Agent(
    "openai:gpt-4o",
    deps_type=SharedContext,
    system_prompt="You generate helpful responses based on analyzed intent."
)

@query_agent.tool
def get_user_history(ctx: RunContext[SharedContext]) -> str:
    """Retrieve conversation history for context."""
    return "\n".join(ctx.deps.session_history[-5:])

@response_agent.tool
def get_user_profile(ctx: RunContext[SharedContext]) -> str:
    """Retrieve user profile from database."""
    return f"User {ctx.deps.user_id} from DB: {ctx.deps.db_connection}"

# Orchestrate: pipe output of Agent A into Agent B
async def orchestrate(user_message: str):
    shared = SharedContext(
        db_connection="postgres://localhost/app",
        user_id="user_123",
        session_history=["Hello", "How can I help?"]
    )
    # Step 1: Analyze intent
    intent_result = await query_agent.run(user_message, deps=shared)
    # Step 2: Generate response using analyzed intent
    response_result = await response_agent.run(
        f"Based on this intent analysis: {intent_result.data}\nGenerate a response.",
        deps=shared
    )
    print(f"Final response: {response_result.data}")

if __name__ == "__main__":
    asyncio.run(orchestrate("What were we discussing yesterday?"))
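Both runs receive the same `SharedContext` instance, so dependencies are shared by reference. A change made by one stage, such as appending to `session_history`, is visible to the next stage. The sketch below demonstrates this with plain functions standing in for the two agents. `analyze` and `respond` are hypothetical, and the dataclass is trimmed to two fields.

```python
from dataclasses import dataclass, field


@dataclass
class SharedContext:
    user_id: str
    session_history: list[str] = field(default_factory=list)


def analyze(message: str, deps: SharedContext) -> str:
    """Hypothetical stage 1: records the turn on the shared object and returns an 'intent'."""
    deps.session_history.append(message)
    return f"intent:{message.split()[0].lower()}"


def respond(intent: str, deps: SharedContext) -> str:
    """Hypothetical stage 2: sees the mutation stage 1 made, because it is the same object."""
    return f"{intent} | history={len(deps.session_history)} | user={deps.user_id}"


shared = SharedContext(user_id="user_123")
reply = respond(analyze("What were we discussing?", shared), shared)
print(reply)
```

Sequential stages make this sharing convenient. If stages ever run concurrently, treat mutable shared state with care, or give each run its own copy.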
2. Handoff & Routing Patterns
Handoff patterns enable explicit transfer of control from one agent to another with full context preservation. This is essential for customer support systems, multi-stage processing, and escalation workflows.
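One way to make "full context preservation" concrete is an explicit handoff record that travels with the transfer. In the sketch below, `Handoff` and `build_handoff_prompt` are hypothetical helpers, not PydanticAI APIs. The record bundles who handed off, to whom, why, and the transcript so far. The builder then renders all of it into the receiving agent's prompt.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Handoff:
    """Hypothetical record passed when one agent transfers control to another."""
    from_agent: str
    to_agent: str
    reason: str
    transcript: tuple[str, ...] = ()


def build_handoff_prompt(handoff: Handoff, new_message: str) -> str:
    """Render the handoff context ahead of the customer's latest message."""
    header = f"[Handoff from {handoff.from_agent} to {handoff.to_agent}: {handoff.reason}]"
    return "\n".join([header, *handoff.transcript, f"Customer: {new_message}"])


if __name__ == "__main__":
    h = Handoff(
        from_agent="triage",
        to_agent="billing",
        reason="customer reports a double charge",
        transcript=("Customer: My invoice looks wrong", "Triage: Routing you to billing"),
    )
    print(build_handoff_prompt(h, "It was charged twice last month"))
```

A rendered summary like this keeps the receiving prompt compact. Passing the earlier run's messages via `message_history` is the alternative when the receiving agent should see the raw conversation.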
2.1 Conditional Routing Based on Classification
Route user messages to specialized agents based on input classification:
import asyncio
from enum import Enum

from pydantic import BaseModel
from pydantic_ai import Agent

class Department(str, Enum):
    BILLING = "billing"
    TECHNICAL = "technical"
    SALES = "sales"

class RoutingDecision(BaseModel):
    department: Department
    confidence: float
    reason: str

# Classifier agent determines routing
classifier = Agent(
    "openai:gpt-4o",
    result_type=RoutingDecision,
    system_prompt="""Classify the customer message into a department:
- billing: payment issues, invoices, refunds, subscription changes
- technical: bugs, errors, how-to questions, feature requests
- sales: pricing inquiries, demos, enterprise plans, upgrades"""
)

# Department-specific agents
billing_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a billing specialist. Help with payments, invoices, and subscriptions."
)
technical_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a technical support engineer. Debug issues and provide solutions."
)
sales_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a sales representative. Help with pricing, demos, and upgrades."
)

async def handle_customer_message(message: str) -> str:
    # Step 1: Classify the message
    routing = await classifier.run(message)
    decision = routing.data
    # .value prints "billing"; formatting a str-mixin Enum member directly
    # can print "Department.BILLING" depending on the Python version
    print(f"Routed to: {decision.department.value} (confidence: {decision.confidence:.2f})")
    # Step 2: Route to appropriate agent
    agents = {
        Department.BILLING: billing_agent,
        Department.TECHNICAL: technical_agent,
        Department.SALES: sales_agent,
    }
    target_agent = agents[decision.department]
    context = f"[Customer message classified as {decision.department.value}]\n{message}"
    result = await target_agent.run(context)
    return result.data

if __name__ == "__main__":
    response = asyncio.run(handle_customer_message("My invoice shows a double charge from last month"))
    print(response)
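The example above uses the classifier's `confidence` field only for logging. A common refinement is to send low-confidence messages to a human or a general-purpose agent instead of guessing. The sketch below assumes a hypothetical 0.7 threshold and a hypothetical `"human_escalation"` route. A model's self-reported confidence is not a calibrated probability, so treat the threshold as a heuristic to tune against real traffic.

```python
from enum import Enum


class Department(str, Enum):
    BILLING = "billing"
    TECHNICAL = "technical"
    SALES = "sales"


# Hypothetical cut-off; tune it against labeled traffic rather than trusting it as-is.
CONFIDENCE_THRESHOLD = 0.7
ESCALATION_ROUTE = "human_escalation"  # hypothetical fallback route name


def choose_route(department: Department, confidence: float) -> str:
    """Return the department's value, or the escalation route when the classifier is unsure."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError(f"confidence must be in [0, 1], got {confidence}")
    if confidence < CONFIDENCE_THRESHOLD:
        return ESCALATION_ROUTE
    return department.value
```

In `handle_customer_message`, you would call `choose_route(decision.department, decision.confidence)` before the agent lookup. Escalations then go to a separate queue instead of `agents[...]`.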
2.2 Pipeline Patterns: Sequential Agent Processing
Chain agents sequentially where each agent's output feeds the next agent's input:
import asyncio

from pydantic import BaseModel
from pydantic_ai import Agent

class ResearchFindings(BaseModel):
    topic: str
    key_points: list[str]
    sources: list[str]

class DraftArticle(BaseModel):
    title: str
    sections: list[str]
    word_count: int

# Pipeline: Research → Draft → Edit
researcher = Agent(
    "openai:gpt-4o",
    result_type=ResearchFindings,
    system_prompt="Research the given topic. Return structured findings with key points."
)
writer = Agent(
    "openai:gpt-4o",
    result_type=DraftArticle,
    system_prompt="Write a draft article based on the research findings provided."
)
editor = Agent(
    "openai:gpt-4o",
    system_prompt="Edit and polish the draft article. Fix grammar, improve flow, add transitions."
)

async def content_pipeline(topic: str) -> str:
    # Stage 1: Research
    research_result = await researcher.run(f"Research this topic: {topic}")
    findings = research_result.data
    print(f"Research complete: {len(findings.key_points)} key points found")
    # Stage 2: Write draft from findings
    draft_result = await writer.run(
        f"Write an article about '{findings.topic}' using these points:\n"
        + "\n".join(f"- {p}" for p in findings.key_points)
    )
    draft = draft_result.data
    print(f"Draft complete: {draft.word_count} words")
    # Stage 3: Edit the draft
    edit_result = await editor.run(
        f"Edit this article:\nTitle: {draft.title}\n\n"
        + "\n\n".join(draft.sections)
    )
    return edit_result.data

if __name__ == "__main__":
    final_article = asyncio.run(content_pipeline("The impact of quantum computing on cryptography"))
    print(final_article)
Tip: Use structured outputs (result_type) for intermediate stages to minimize token waste between stages, and consider caching intermediate results for retry scenarios.
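Caching pays off when a late stage fails: a retry can reuse the research and draft it already paid for instead of regenerating them. The sketch below makes some assumptions. Stage inputs and outputs are strings, and hypothetical stub stages stand in for the three agents. `StageCache` and `run_with_retry` are illustrative names. With structured results, you would derive the cache key from a serialized form such as a Pydantic model's `model_dump_json()`.

```python
import asyncio
from collections import Counter
from collections.abc import Awaitable, Callable

Stage = Callable[[str], Awaitable[str]]


class StageCache:
    """Hypothetical in-memory cache of stage outputs keyed by (stage name, input)."""

    def __init__(self) -> None:
        self._store: dict[tuple[str, str], str] = {}

    async def run(self, stage: str, fn: Stage, arg: str) -> str:
        key = (stage, arg)
        if key not in self._store:
            # Only successful results are stored; a raised exception caches nothing.
            self._store[key] = await fn(arg)
        return self._store[key]


def make_stub_stages(calls: Counter) -> tuple[Stage, Stage, Stage]:
    """Stub research/write/edit stages; edit fails on its first call to simulate a transient error."""

    async def research(topic: str) -> str:
        calls["research"] += 1
        return f"findings on {topic}"

    async def write(findings: str) -> str:
        calls["write"] += 1
        return f"draft from {findings}"

    async def edit(draft: str) -> str:
        calls["edit"] += 1
        if calls["edit"] == 1:
            raise RuntimeError("transient failure")
        return f"edited {draft}"

    return research, write, edit


async def run_with_retry(topic: str, stages: tuple[Stage, Stage, Stage], attempts: int = 3) -> str:
    """Re-run the pipeline on failure, reusing cached outputs of stages that already succeeded."""
    research, write, edit = stages
    cache = StageCache()
    for attempt in range(1, attempts + 1):
        try:
            findings = await cache.run("research", research, topic)
            draft = await cache.run("write", write, findings)
            return await cache.run("edit", edit, draft)
        except RuntimeError:
            if attempt == attempts:
                raise
    raise ValueError("attempts must be at least 1")
```

In this scenario, the first attempt fails at the edit stage. The retry then calls only `edit` again, while research and writing each run exactly once.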
3. Web Chat UI Integration
PydanticAI agents integrate naturally with web frameworks for building chat interfaces. The streaming API enables real-time token delivery to connected clients.
3.1 Streaming Responses to Web Clients
Use PydanticAI's streaming with FastAPI for real-time chat experiences:
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from pydantic_ai import Agent

app = FastAPI()
chat_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a helpful assistant. Respond conversationally."
)

class ChatRequest(BaseModel):
    message: str
    conversation_id: str | None = None

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    async def generate():
        async with chat_agent.run_stream(request.message) as stream:
            # delta=True yields only new text; the default yields the cumulative text so far
            async for chunk in stream.stream_text(delta=True):
                # JSON-encode so newlines inside a chunk cannot break SSE framing
                yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")

@app.post("/chat")
async def chat(request: ChatRequest):
    result = await chat_agent.run(request.message)
    return {"response": result.data}
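Two details matter when you stream text as Server-Sent Events. First, PydanticAI's `stream_text()` yields the cumulative text so far unless you call it with `delta=True`, so forwarding its output unchanged repeats earlier text on the client. Second, raw newlines inside a chunk would break the `data:` line framing, so each chunk needs encoding first. The stdlib sketch below covers both. `to_deltas` converts cumulative snapshots to increments, `sse_event` JSON-encodes each chunk, and `parse_sse` is a minimal, hypothetical client-side decoder for that format.

```python
import json
from collections.abc import Iterable, Iterator


def to_deltas(snapshots: Iterable[str]) -> Iterator[str]:
    """Turn cumulative text snapshots into increments (each snapshot must extend the previous one)."""
    previous = ""
    for snapshot in snapshots:
        if not snapshot.startswith(previous):
            raise ValueError("snapshot does not extend the previous text")
        yield snapshot[len(previous):]
        previous = snapshot


def sse_event(chunk: str) -> str:
    """Frame one text chunk as an SSE event; JSON encoding escapes embedded newlines."""
    return f"data: {json.dumps(chunk)}\n\n"


def parse_sse(stream: str) -> list[str]:
    """Minimal client-side decoder for events produced by sse_event, ignoring [DONE]."""
    chunks: list[str] = []
    for event in stream.split("\n\n"):
        if event.startswith("data: "):
            payload = event[len("data: "):]
            if payload != "[DONE]":
                chunks.append(json.loads(payload))
    return chunks
```

A browser client would use `EventSource` or a fetch-based reader instead of `parse_sse`. The decoding step stays the same: `JSON.parse` each `data:` payload and append it to the visible message.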
3.2 FastAPI WebSocket Integration
For bidirectional communication, use WebSocket connections:
import json

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage

app = FastAPI()
chat_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a helpful chat assistant. Keep responses concise."
)

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    message_history: list[ModelMessage] = []
    try:
        while True:
            # Receive user message
            data = await websocket.receive_text()
            user_input = json.loads(data)["message"]
            # Run agent with history for multi-turn context
            result = await chat_agent.run(user_input, message_history=message_history)
            # Update history for next turn
            message_history = result.all_messages()
            # Send response back
            await websocket.send_text(json.dumps({
                "response": result.data,
                "tokens_used": result.usage().total_tokens
            }))
    except WebSocketDisconnect:
        print("Client disconnected")
Tip: Use result.all_messages() to capture the full conversation state, then pass it back via message_history on the next call. This gives you multi-turn context without managing raw message arrays manually.
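The WebSocket handler keeps history in a local variable, so the history lives only as long as the connection. In the HTTP example, `ChatRequest.conversation_id` is declared but never used. A minimal store keyed by conversation ID bridges the two. `ConversationStore` below is a hypothetical helper, and a production service would back it with a database or cache shared across workers. The store replaces the saved list rather than appending to it, because the list returned by `all_messages()` already contains the earlier turns.

```python
from typing import Any


class ConversationStore:
    """Hypothetical in-memory history store keyed by conversation ID."""

    def __init__(self) -> None:
        self._histories: dict[str, list[Any]] = {}

    def get(self, conversation_id: str) -> list[Any]:
        # Return a copy so callers cannot mutate stored history by accident.
        return list(self._histories.get(conversation_id, []))

    def save(self, conversation_id: str, messages: list[Any]) -> None:
        # Replace, not append: all_messages() already includes the earlier turns.
        self._histories[conversation_id] = list(messages)
```

Inside a handler, the usage would be `history = store.get(cid)`, then `result = await chat_agent.run(message, message_history=history)`, then `store.save(cid, result.all_messages())`. To persist messages outside the process, PydanticAI's messages module provides `ModelMessagesTypeAdapter` for JSON round-tripping.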
4. Embeddings
Embeddings convert text into dense vector representations for similarity search, clustering, and retrieval-augmented generation (RAG). The example below generates them with the OpenAI Python SDK directly and ranks documents by cosine similarity; the resulting vectors can back retrieval tools for PydanticAI agents.
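The similarity measure used below is cosine similarity: the dot product of two vectors divided by the product of their lengths.

$$\cos(\mathbf{a}, \mathbf{b}) = \frac{\mathbf{a} \cdot \mathbf{b}}{\lVert \mathbf{a} \rVert \, \lVert \mathbf{b} \rVert} = \frac{\sum_{i=1}^{n} a_i b_i}{\sqrt{\sum_{i=1}^{n} a_i^2}\,\sqrt{\sum_{i=1}^{n} b_i^2}}$$

For example, $\mathbf{a} = (1, 0)$ and $\mathbf{b} = (1, 1)$ give $1 / (1 \cdot \sqrt{2}) \approx 0.7071$. When vectors are already unit length, the denominator is 1 and the score reduces to the plain dot product. OpenAI documents its embeddings as normalized to length 1.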
4.1 Generating Embeddings & Semantic Search
import asyncio

import numpy as np
from openai import AsyncOpenAI

# Use OpenAI's embedding model (the client reads OPENAI_API_KEY from the environment)
client = AsyncOpenAI()

async def get_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings for a list of texts."""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Calculate cosine similarity between two vectors."""
    a_arr = np.array(a)
    b_arr = np.array(b)
    return float(np.dot(a_arr, b_arr) / (np.linalg.norm(a_arr) * np.linalg.norm(b_arr)))

async def semantic_search_demo():
    # Document corpus
    documents = [
        "PydanticAI uses type-safe structured outputs for agent responses.",
        "FastAPI is a modern Python web framework for building APIs.",
        "Machine learning models require training data and evaluation metrics.",
        "Docker containers package applications with their dependencies.",
        "Agent testing with TestModel eliminates API calls during development."
    ]
    # Generate embeddings for all documents
    doc_embeddings = await get_embeddings(documents)
    # Query
    query = "How do I test AI agents without making API calls?"
    query_embedding = (await get_embeddings([query]))[0]
    # Find most similar documents
    similarities = [
        (doc, cosine_similarity(query_embedding, doc_emb))
        for doc, doc_emb in zip(documents, doc_embeddings)
    ]
    similarities.sort(key=lambda x: x[1], reverse=True)
    print(f"Query: {query}\n")
    print("Results (ranked by similarity):")
    for doc, score in similarities[:3]:
        print(f"  [{score:.4f}] {doc}")

if __name__ == "__main__":
    asyncio.run(semantic_search_demo())
5. Testing Strategies
PydanticAI provides first-class testing support through TestModel (deterministic responses without API calls) and FunctionModel (custom response logic). This enables fast, reliable unit tests for agent behavior.
5.1 TestModel for Deterministic Testing
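TestModel generates responses procedurally instead of calling an LLM: by default it calls every registered tool, then returns data that satisfies the agent's result schema. Unit tests therefore run instantly without network calls: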
import asyncio

from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.messages import ToolCallPart
from pydantic_ai.models.test import TestModel

class WeatherResponse(BaseModel):
    city: str
    temperature: float
    conditions: str

weather_agent = Agent(
    "openai:gpt-4o",  # Real model for production
    result_type=WeatherResponse,
    system_prompt="You are a weather assistant. Return structured weather data."
)

# tool_plain: this tool takes no RunContext (@agent.tool requires it as the first parameter)
@weather_agent.tool_plain
def get_current_weather(city: str) -> str:
    """Fetch current weather for a city."""
    # In production, this calls a weather API
    return f"Weather in {city}: 22°C, sunny"

# Test with TestModel: no API calls are made
async def test_weather_agent():
    # Override the model with TestModel for testing
    with weather_agent.override(model=TestModel()):
        result = await weather_agent.run("What's the weather in London?")
    # TestModel produces structured output matching result_type
    assert isinstance(result.data, WeatherResponse)
    print(f"Test passed! Got: {result.data}")
    # Verify the tool was called by inspecting tool-call parts in the history
    tool_calls = [
        part
        for msg in result.all_messages()
        for part in msg.parts
        if isinstance(part, ToolCallPart) and part.tool_name == "get_current_weather"
    ]
    assert len(tool_calls) >= 1
    print(f"Tool calls made: {len(tool_calls)}")

if __name__ == "__main__":
    asyncio.run(test_weather_agent())
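Counting tool calls is worth factoring into a reusable helper. The sketch below duck-types on the `part_kind` discriminator that PydanticAI's message parts carry: `"tool-call"` for tool calls, `"tool-return"` for their results. `FakePart` and `FakeMessage` are hypothetical stand-ins that let the helper be checked without the library. With a structured `result_type`, the final answer is typically delivered as a tool call too (named `final_result` by default), so count by name when you mean your own tools.

```python
from collections import Counter
from dataclasses import dataclass, field


def count_tool_calls(messages) -> Counter:
    """Count tool-call parts by tool name across a run's message history."""
    counts: Counter = Counter()
    for message in messages:
        for part in getattr(message, "parts", []):
            if getattr(part, "part_kind", None) == "tool-call":
                counts[part.tool_name] += 1
    return counts


# Hypothetical stand-ins with the same shape as PydanticAI's message parts,
# so the helper can be exercised without the library or a model.
@dataclass
class FakePart:
    part_kind: str
    tool_name: str = ""


@dataclass
class FakeMessage:
    parts: list[FakePart] = field(default_factory=list)


if __name__ == "__main__":
    history = [
        FakeMessage([FakePart("user-prompt")]),
        FakeMessage([FakePart("tool-call", "get_current_weather")]),
        FakeMessage([FakePart("tool-return", "get_current_weather")]),
        FakeMessage([FakePart("tool-call", "final_result")]),
    ]
    print(count_tool_calls(history))
```

In a real test, the call would be `count_tool_calls(result.all_messages())["get_current_weather"]`.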
5.2 FunctionModel for Custom Test Behavior
FunctionModel lets you define exactly how the model responds, enabling scenario-specific testing:
import asyncio

from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse, TextPart, UserPromptPart
from pydantic_ai.models.function import AgentInfo, FunctionModel

def last_user_prompt(messages: list[ModelMessage]) -> str:
    """Return the text of the most recent user prompt in the message history."""
    for message in reversed(messages):
        if isinstance(message, ModelRequest):
            for part in reversed(message.parts):
                if isinstance(part, UserPromptPart) and isinstance(part.content, str):
                    return part.content
    return ""

def mock_response(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
    """Custom function that generates test responses."""
    # Match on the user's text, not on str(message), which also includes the system prompt
    last_message = last_user_prompt(messages)
    # Return different responses based on input
    if "error" in last_message.lower():
        response_text = "I encountered an error. Let me help you debug that."
    elif "hello" in last_message.lower():
        response_text = "Hello! How can I assist you today?"
    else:
        response_text = f"Processing your request about: {last_message[:50]}"
    return ModelResponse(parts=[TextPart(content=response_text)])

# Create agent with FunctionModel for testing
test_agent = Agent(FunctionModel(mock_response), system_prompt="Test assistant")

async def test_function_model():
    # Test greeting path
    result = await test_agent.run("Hello there!")
    assert "Hello" in result.data
    print(f"Greeting test: {result.data}")
    # Test error path
    result = await test_agent.run("I got an error in my code")
    assert "error" in result.data.lower()
    print(f"Error test: {result.data}")
    # Test default path
    result = await test_agent.run("Calculate revenue for Q4")
    assert "Processing" in result.data
    print(f"Default test: {result.data}")

if __name__ == "__main__":
    asyncio.run(test_function_model())
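Matching on message content gets brittle in multi-turn scenarios. Scripting replies in order is an alternative. `ScriptedResponder` below is a hypothetical, dependency-free helper that returns canned replies one per call. It records every prompt it receives and fails loudly on unexpected extra calls. Inside a FunctionModel function, you would call it with the extracted prompt text and wrap the result as `ModelResponse(parts=[TextPart(content=...)])`, as `mock_response` does. Each model request consumes one reply, including requests that follow tool calls.

```python
from collections import deque
from collections.abc import Iterable


class ScriptedResponder:
    """Hypothetical helper: returns canned replies in order, one per model call."""

    def __init__(self, replies: Iterable[str]) -> None:
        self._replies = deque(replies)
        self.prompts: list[str] = []  # every prompt received, for later assertions

    def __call__(self, prompt: str) -> str:
        self.prompts.append(prompt)
        if not self._replies:
            raise AssertionError(f"unexpected extra model call with prompt {prompt!r}")
        return self._replies.popleft()

    def assert_exhausted(self) -> None:
        """Fail if the scenario ended before every scripted reply was used."""
        if self._replies:
            raise AssertionError(f"{len(self._replies)} scripted replies were never used")
```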
5.3 Integration Testing Patterns
Combine TestModel with pytest for comprehensive agent test suites:
from dataclasses import dataclass

import pytest
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import ToolCallPart, ToolReturnPart
from pydantic_ai.models.test import TestModel

# Production agent definition
@dataclass
class AppDeps:
    api_key: str
    user_id: str

class TaskResult(BaseModel):
    status: str
    message: str

task_agent = Agent(
    "openai:gpt-4o",
    deps_type=AppDeps,
    result_type=TaskResult,
    system_prompt="You manage tasks. Create, update, or delete tasks as requested."
)

@task_agent.tool
def create_task(ctx: RunContext[AppDeps], title: str, priority: str) -> str:
    """Create a new task."""
    return f"Task '{title}' created with {priority} priority for user {ctx.deps.user_id}"

# Test suite (the asyncio marker requires the pytest-asyncio plugin)
@pytest.fixture
def test_deps():
    return AppDeps(api_key="test-key-123", user_id="test-user")

@pytest.mark.asyncio
async def test_task_creation(test_deps):
    """Test that the agent returns a valid TaskResult."""
    with task_agent.override(model=TestModel()):
        result = await task_agent.run("Create a high priority task: Fix login bug", deps=test_deps)
    assert isinstance(result.data, TaskResult)
    # TestModel fills fields with schema-valid placeholder values, so check types, not specific words
    assert isinstance(result.data.status, str)

@pytest.mark.asyncio
async def test_tool_invocation(test_deps):
    """Test that the create_task tool is invoked."""
    with task_agent.override(model=TestModel()):
        result = await task_agent.run("Create task: Deploy v2.0", deps=test_deps)
    tool_names = [
        part.tool_name
        for msg in result.all_messages()
        for part in msg.parts
        if isinstance(part, ToolCallPart)
    ]
    assert "create_task" in tool_names

@pytest.mark.asyncio
async def test_deps_accessibility(test_deps):
    """Test that dependencies are accessible in tools."""
    with task_agent.override(model=TestModel()):
        result = await task_agent.run("Create a task for me", deps=test_deps)
    # The tool's return value should include the user_id taken from deps
    tool_returns = [
        part.content
        for msg in result.all_messages()
        for part in msg.parts
        if isinstance(part, ToolReturnPart) and part.tool_name == "create_task"
    ]
    assert any("test-user" in str(content) for content in tool_returns)

# Run the whole suite through pytest (calling one test coroutine by hand would skip the others)
if __name__ == "__main__":
    raise SystemExit(pytest.main([__file__, "-v"]))
Testing strategy: Use TestModel for unit tests (fast, deterministic), FunctionModel for scenario testing (custom logic), and real models from low-cost providers for integration tests. The agent.override() context manager swaps the model for the duration of a block without changing production code.
Next in the PydanticAI SDK Track
In Part 10: MCP Integration, we’ll connect PydanticAI agents to Model Context Protocol servers for standardized tool interoperability, build MCP servers that expose agent capabilities, and achieve cross-framework tool sharing.