PydanticAI SDK Track Part 9: Multi-Agent Patterns & Testing

May 24, 2026 Wasil Zafar 40 min read

Orchestrate multiple PydanticAI agents with delegation patterns, handoffs, and shared context. Build web chat UIs with streaming, generate embeddings for semantic search, and implement comprehensive testing strategies with TestModel and FunctionModel.

Table of Contents

  1. Multi-Agent Orchestration
  2. Handoff & Routing Patterns
  3. Web Chat UI Integration
  4. Embeddings
  5. Testing Strategies
What You’ll Learn: MCP (Model Context Protocol) integration lets your PydanticAI agents connect to any MCP-compatible server — databases, file systems, APIs, and more — through a standardized protocol. Instead of building custom tools for every service, you plug in MCP servers and their tools become available automatically. Think of it like installing plugins: plug in the server, get the capabilities.

1. Multi-Agent Orchestration

PydanticAI supports multi-agent architectures where one agent delegates subtasks to other agents via tool calls. This enables separation of concerns — each agent specializes in a specific domain while a supervisor coordinates the overall workflow.

1.1 Supervisor Pattern: One Agent Routes to Specialists

The supervisor pattern creates a top-level agent that decides which specialist agent to invoke based on user input. Each specialist is exposed as a tool:

from pydantic_ai import Agent, RunContext
from pydantic import BaseModel
from dataclasses import dataclass

# Define specialist agents
coding_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are an expert Python developer. Write clean, tested code."
)

research_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a research analyst. Provide factual, cited information."
)

# Shared dependencies for all agents
@dataclass
class TeamDeps:
    project_name: str
    max_tokens: int = 2000

# Supervisor agent that delegates to specialists
supervisor = Agent(
    "openai:gpt-4o",
    deps_type=TeamDeps,
    system_prompt="""You are a project supervisor. Analyze the user request and 
    delegate to the appropriate specialist:
    - For coding tasks, use the coding_tool
    - For research tasks, use the research_tool
    Synthesize their responses into a coherent answer."""
)

@supervisor.tool
async def coding_tool(ctx: RunContext[TeamDeps], task: str) -> str:
    """Delegate a coding task to the specialist coding agent."""
    # Passing ctx.usage counts the specialist's tokens toward the supervisor's run
    result = await coding_agent.run(task, usage=ctx.usage)
    return result.data

@supervisor.tool
async def research_tool(ctx: RunContext[TeamDeps], query: str) -> str:
    """Delegate a research query to the specialist research agent."""
    # Same usage accounting as coding_tool
    result = await research_agent.run(query, usage=ctx.usage)
    return result.data

# Run the supervisor
import asyncio

async def main():
    deps = TeamDeps(project_name="AI Assistant")
    result = await supervisor.run(
        "Write a Python function to calculate Fibonacci numbers and explain the math behind it.",
        deps=deps
    )
    print(result.data)

asyncio.run(main())
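
When the orchestration lives in your own code rather than in the supervisor's tool calls, delegations that do not depend on each other can run concurrently. The following is a minimal sketch of that idea using only the standard library; `fake_coding` and `fake_research` are hypothetical stand-ins for the `coding_agent.run(...)` and `research_agent.run(...)` calls above, not part of PydanticAI.

```python
import asyncio

# Hypothetical stand-ins for `coding_agent.run(...)` and `research_agent.run(...)`.
async def fake_coding(task: str) -> str:
    await asyncio.sleep(0.01)  # simulate model latency
    return f"[code] {task}"

async def fake_research(query: str) -> str:
    await asyncio.sleep(0.01)  # simulate model latency
    return f"[research] {query}"

async def delegate_concurrently(request: str) -> dict[str, str]:
    """Run both specialists at the same time and collect their answers by role."""
    code, research = await asyncio.gather(
        fake_coding(request),
        fake_research(request),
    )
    return {"coding": code, "research": research}

answers = asyncio.run(delegate_concurrently("Fibonacci numbers"))
print(answers["coding"])
print(answers["research"])
```

The trade-off is that the calling code, not the model, decides which specialists run, so this fits requests that are known in advance to need both.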

1.2 Shared Dependencies Across Agents

When multiple agents need access to the same resources (databases, API clients, configuration), pass shared dependencies through the orchestration layer:

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass

@dataclass
class SharedContext:
    db_connection: str
    user_id: str
    session_history: list[str]

# Agent A: processes queries
query_agent = Agent(
    "openai:gpt-4o",
    deps_type=SharedContext,
    system_prompt="You analyze user queries and extract intent."
)

# Agent B: generates responses using shared context
response_agent = Agent(
    "openai:gpt-4o",
    deps_type=SharedContext,
    system_prompt="You generate helpful responses based on analyzed intent."
)

@query_agent.tool
def get_user_history(ctx: RunContext[SharedContext]) -> str:
    """Retrieve conversation history for context."""
    return "\n".join(ctx.deps.session_history[-5:])

@response_agent.tool
def get_user_profile(ctx: RunContext[SharedContext]) -> str:
    """Retrieve user profile from database."""
    return f"User {ctx.deps.user_id} from DB: {ctx.deps.db_connection}"

# Orchestrate: pipe output of Agent A into Agent B
import asyncio

async def orchestrate(user_message: str):
    shared = SharedContext(
        db_connection="postgres://localhost/app",
        user_id="user_123",
        session_history=["Hello", "How can I help?"]
    )
    
    # Step 1: Analyze intent
    intent_result = await query_agent.run(user_message, deps=shared)
    
    # Step 2: Generate response using analyzed intent
    response_result = await response_agent.run(
        f"Based on this intent analysis: {intent_result.data}\nGenerate a response.",
        deps=shared
    )
    
    print(f"Final response: {response_result.data}")

asyncio.run(orchestrate("What were we discussing yesterday?"))
Key Pattern: Each agent maintains its own system prompt and tools, but shares the same dependency object. This means agents can access common resources (databases, caches, API clients) without tight coupling between them.

2. Handoff & Routing Patterns

Handoff patterns enable explicit transfer of control from one agent to another with full context preservation. This is essential for customer support systems, multi-stage processing, and escalation workflows.

2.1 Conditional Routing Based on Classification

Route user messages to specialized agents based on input classification:

from pydantic_ai import Agent, RunContext
from pydantic import BaseModel
from enum import Enum

class Department(str, Enum):
    BILLING = "billing"
    TECHNICAL = "technical"
    SALES = "sales"

class RoutingDecision(BaseModel):
    department: Department
    confidence: float
    reason: str

# Classifier agent determines routing
classifier = Agent(
    "openai:gpt-4o",
    result_type=RoutingDecision,
    system_prompt="""Classify the customer message into a department:
    - billing: payment issues, invoices, refunds, subscription changes
    - technical: bugs, errors, how-to questions, feature requests
    - sales: pricing inquiries, demos, enterprise plans, upgrades"""
)

# Department-specific agents
billing_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a billing specialist. Help with payments, invoices, and subscriptions."
)

technical_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a technical support engineer. Debug issues and provide solutions."
)

sales_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a sales representative. Help with pricing, demos, and upgrades."
)

import asyncio

async def handle_customer_message(message: str) -> str:
    # Step 1: Classify the message
    routing = await classifier.run(message)
    decision = routing.data
    print(f"Routed to: {decision.department.value} (confidence: {decision.confidence:.2f})")
    
    # Step 2: Route to appropriate agent
    agents = {
        Department.BILLING: billing_agent,
        Department.TECHNICAL: technical_agent,
        Department.SALES: sales_agent,
    }
    
    target_agent = agents[decision.department]
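    # Use .value so the prompt contains "billing" rather than "Department.BILLING" on Python 3.11+
    context = f"[Customer message classified as {decision.department.value}]\n{message}"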
    result = await target_agent.run(context)
    
    return result.data

response = asyncio.run(handle_customer_message("My invoice shows a double charge from last month"))
print(response)
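
The routing function above prints `confidence` but never acts on it. One possible refinement is to send low-confidence classifications to a fallback queue instead of a department agent. The sketch below shows that decision in isolation; the `Decision` dataclass is a plain stand-in for `RoutingDecision`, and both the `human_review` queue name and the 0.7 threshold are assumptions chosen for illustration.

```python
from dataclasses import dataclass
from enum import Enum

class Department(str, Enum):
    BILLING = "billing"
    TECHNICAL = "technical"
    SALES = "sales"

@dataclass
class Decision:
    """Plain stand-in for the RoutingDecision model."""
    department: Department
    confidence: float

# Hypothetical queue name for messages the classifier is unsure about.
FALLBACK = "human_review"

def pick_route(decision: Decision, threshold: float = 0.7) -> str:
    """Return the department value, or the fallback queue when confidence is low."""
    if decision.confidence < threshold:
        return FALLBACK
    return decision.department.value

print(pick_route(Decision(Department.BILLING, 0.93)))  # billing
print(pick_route(Decision(Department.SALES, 0.41)))    # human_review
```

Model-reported confidence is not calibrated, so a threshold like this is best tuned against real traffic rather than trusted as a probability.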

2.2 Pipeline Patterns: Sequential Agent Processing

Chain agents sequentially where each agent's output feeds the next agent's input:

from pydantic_ai import Agent
from pydantic import BaseModel
import asyncio

class ResearchFindings(BaseModel):
    topic: str
    key_points: list[str]
    sources: list[str]

class DraftArticle(BaseModel):
    title: str
    sections: list[str]
    word_count: int

# Pipeline: Research → Draft → Edit
researcher = Agent(
    "openai:gpt-4o",
    result_type=ResearchFindings,
    system_prompt="Research the given topic. Return structured findings with key points."
)

writer = Agent(
    "openai:gpt-4o",
    result_type=DraftArticle,
    system_prompt="Write a draft article based on the research findings provided."
)

editor = Agent(
    "openai:gpt-4o",
    system_prompt="Edit and polish the draft article. Fix grammar, improve flow, add transitions."
)

async def content_pipeline(topic: str) -> str:
    # Stage 1: Research
    research_result = await researcher.run(f"Research this topic: {topic}")
    findings = research_result.data
    print(f"Research complete: {len(findings.key_points)} key points found")
    
    # Stage 2: Write draft from findings
    draft_result = await writer.run(
        f"Write an article about '{findings.topic}' using these points:\n"
        + "\n".join(f"- {p}" for p in findings.key_points)
    )
    draft = draft_result.data
    print(f"Draft complete: {draft.word_count} words")
    
    # Stage 3: Edit the draft
    edit_result = await editor.run(
        f"Edit this article:\nTitle: {draft.title}\n\n" 
        + "\n\n".join(draft.sections)
    )
    
    return edit_result.data

final_article = asyncio.run(content_pipeline("The impact of quantum computing on cryptography"))
print(final_article)
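Cost Warning: Multi-agent pipelines multiply API calls. A three-agent pipeline makes at least three model requests, and each stage's output is sent again as input tokens to the next stage. Use structured outputs (result_type) to keep the data passed between stages compact, and consider caching intermediate results so that a retry does not repeat stages that already succeeded.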
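
Caching intermediate results can be as simple as keying each stage's output by the stage name and a hash of its input. The sketch below is one way to do that in memory, under the assumption that a stage is a coroutine taking a prompt string and returning a string; `run_stage` and `fake_research` are hypothetical helpers, with `fake_research` standing in for `researcher.run(...)`.

```python
import asyncio
import hashlib
from typing import Awaitable, Callable

# In-memory cache: (stage name, hash of input) -> stage output.
_cache: dict[tuple[str, str], str] = {}

async def run_stage(
    name: str, prompt: str, stage: Callable[[str], Awaitable[str]]
) -> str:
    """Run `stage(prompt)` once per distinct input; reuse the stored output on retries."""
    key = (name, hashlib.sha256(prompt.encode("utf-8")).hexdigest())
    if key not in _cache:
        _cache[key] = await stage(prompt)
    return _cache[key]

calls = 0

async def fake_research(prompt: str) -> str:
    """Hypothetical stand-in for `researcher.run(...)`; counts invocations."""
    global calls
    calls += 1
    return f"findings for: {prompt}"

async def demo() -> tuple[str, str]:
    first = await run_stage("research", "quantum cryptography", fake_research)
    # Retrying the pipeline hits the cache instead of calling the stage again.
    second = await run_stage("research", "quantum cryptography", fake_research)
    return first, second

first, second = asyncio.run(demo())
print(first == second, calls)
```

An in-memory dict only survives for the life of the process; a persistent store would be needed for retries across restarts.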

3. Web Chat UI Integration

PydanticAI agents integrate naturally with web frameworks for building chat interfaces. The streaming API enables real-time token delivery to connected clients.

3.1 Streaming Responses to Web Clients

Use PydanticAI's streaming with FastAPI for real-time chat experiences:

from pydantic_ai import Agent
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio

app = FastAPI()

chat_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a helpful assistant. Respond conversationally."
)

class ChatRequest(BaseModel):
    message: str
    conversation_id: str | None = None

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    async def generate():
        async with chat_agent.run_stream(request.message) as stream:
            # delta=True yields only newly generated text; the default yields the full text so far
            async for chunk in stream.stream_text(delta=True):
                yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(generate(), media_type="text/event-stream")

@app.post("/chat")
async def chat(request: ChatRequest):
    result = await chat_agent.run(request.message)
    return {"response": result.data}
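
One detail to watch with Server-Sent Events: an event ends at the first blank line, so a chunk that contains a newline cannot be written into a single `data:` field as-is. The sketch below shows one way to frame such chunks, giving each payload line its own `data:` field; `sse_event` is a hypothetical helper name, not part of FastAPI or PydanticAI.

```python
def sse_event(data: str) -> str:
    """Frame a text chunk as one Server-Sent Event.

    Each line of the payload gets its own `data:` field; a blank line ends the event.
    """
    lines = data.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"

print(repr(sse_event("Hello")))
print(repr(sse_event("line one\nline two")))
```

On the client side, an `EventSource` joins the `data:` fields of one event with newlines, so the original chunk is recovered intact.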

3.2 FastAPI WebSocket Integration

Real-World Application

Enterprise Knowledge Integration

A consulting firm connects their PydanticAI agents to MCP servers for Confluence (documentation), Jira (project tracking), and PostgreSQL (client data). Consultants ask questions like “What are Client X’s open issues and what did we document about their architecture?” and get answers that synthesize across all three systems.

Enterprise, MCP Integration

For bidirectional communication, use WebSocket connections:

from pydantic_ai import Agent
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic_ai.messages import ModelMessage
import json

app = FastAPI()

chat_agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a helpful chat assistant. Keep responses concise."
)

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    message_history: list[ModelMessage] = []
    
    try:
        while True:
            # Receive user message
            data = await websocket.receive_text()
            user_input = json.loads(data)["message"]
            
            # Run agent with history for multi-turn context
            result = await chat_agent.run(user_input, message_history=message_history)
            
            # Update history for next turn
            message_history = result.all_messages()
            
            # Send response back
            await websocket.send_text(json.dumps({
                "response": result.data,
                "tokens_used": result.usage().total_tokens
            }))
    except WebSocketDisconnect:
        print("Client disconnected")
Message History Pattern: Use result.all_messages() to capture the full conversation state, then pass it back via message_history on the next call. This gives you multi-turn context without managing raw message arrays manually.
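
The HTTP endpoints in section 3.1 accept a `conversation_id` but do not use it, so each request starts a fresh conversation. One way to give them the same multi-turn behaviour is a small store keyed by that ID. The sketch below is an in-memory version under that assumption; `HistoryStore` is a hypothetical class, and plain strings stand in for the `ModelMessage` objects returned by `result.all_messages()`.

```python
from collections import defaultdict
from typing import Any

class HistoryStore:
    """In-memory message history per conversation (lost on restart)."""

    def __init__(self) -> None:
        self._histories: dict[str, list[Any]] = defaultdict(list)

    def get(self, conversation_id: str) -> list[Any]:
        """Return a copy so callers cannot mutate stored state by accident."""
        return list(self._histories[conversation_id])

    def save(self, conversation_id: str, messages: list[Any]) -> None:
        """Replace the stored history with the full list from the latest run."""
        self._histories[conversation_id] = list(messages)

store = HistoryStore()
store.save("conv-1", ["user: hi", "assistant: hello"])
store.save("conv-2", ["user: pricing?"])
print(len(store.get("conv-1")), len(store.get("conv-2")), len(store.get("conv-3")))
```

In a handler, the flow would be to call `store.get(conversation_id)` before running the agent, pass the result as `message_history`, and call `store.save(conversation_id, result.all_messages())` afterwards.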

4. Embeddings

PydanticAI provides a unified interface for generating embeddings across providers. Embeddings convert text into dense vector representations for similarity search, clustering, and retrieval-augmented generation (RAG). The example below calls OpenAI's embeddings endpoint through the openai SDK and ranks a small document set by cosine similarity against a query.

from openai import AsyncOpenAI
import numpy as np

# Use OpenAI's embedding model
client = AsyncOpenAI()

async def get_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings for a list of texts."""
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Calculate cosine similarity between two vectors."""
    a_arr = np.array(a)
    b_arr = np.array(b)
    return float(np.dot(a_arr, b_arr) / (np.linalg.norm(a_arr) * np.linalg.norm(b_arr)))

import asyncio

async def semantic_search_demo():
    # Document corpus
    documents = [
        "PydanticAI uses type-safe structured outputs for agent responses.",
        "FastAPI is a modern Python web framework for building APIs.",
        "Machine learning models require training data and evaluation metrics.",
        "Docker containers package applications with their dependencies.",
        "Agent testing with TestModel eliminates API calls during development."
    ]
    
    # Generate embeddings for all documents
    doc_embeddings = await get_embeddings(documents)
    
    # Query
    query = "How do I test AI agents without making API calls?"
    query_embedding = (await get_embeddings([query]))[0]
    
    # Find most similar documents
    similarities = [
        (doc, cosine_similarity(query_embedding, doc_emb))
        for doc, doc_emb in zip(documents, doc_embeddings)
    ]
    similarities.sort(key=lambda x: x[1], reverse=True)
    
    print(f"Query: {query}\n")
    print("Results (ranked by similarity):")
    for doc, score in similarities[:3]:
        print(f"  [{score:.4f}] {doc}")

asyncio.run(semantic_search_demo())
RAG Integration: Combine embeddings with PydanticAI tools to build retrieval-augmented agents. Store document embeddings in a vector database (Pinecone, Qdrant, pgvector), then create a tool that queries the database and injects relevant context into the agent’s prompt.
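
The retrieval step of such a tool can be sketched without a vector database: rank stored chunks by cosine similarity, keep the top k, and format them as context. The example below is a minimal in-memory version using toy 3-dimensional vectors; `retrieve` and `build_context` are hypothetical helper names, and a real system would use embeddings from a model and a proper vector store.

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def retrieve(
    query_vec: list[float], index: list[tuple[str, list[float]]], k: int = 2
) -> list[str]:
    """Return the k chunk texts most similar to the query vector."""
    ranked = sorted(index, key=lambda item: cosine(query_vec, item[1]), reverse=True)
    return [text for text, _ in ranked[:k]]

def build_context(chunks: list[str]) -> str:
    """Format retrieved chunks as a bulleted block to place in a prompt."""
    return "\n".join(f"- {chunk}" for chunk in chunks)

# Toy vectors for illustration; real embeddings have hundreds or thousands of dimensions.
index = [
    ("Testing agents with TestModel", [0.9, 0.1, 0.0]),
    ("Deploying with Docker", [0.0, 0.2, 0.9]),
    ("Mocking model responses", [0.8, 0.3, 0.1]),
]
top = retrieve([1.0, 0.2, 0.0], index, k=2)
print(build_context(top))
```

Inside an agent, a tool function would embed the user's question, call something like `retrieve`, and return the formatted context as its result.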

5. Testing Strategies

PydanticAI provides first-class testing support through TestModel (deterministic responses without API calls) and FunctionModel (custom response logic). This enables fast, reliable unit tests for agent behavior.

5.1 TestModel for Deterministic Testing

TestModel needs no network access. By default it calls every registered tool, then generates placeholder output that satisfies the agent's result_type schema, so unit tests run instantly:

from pydantic_ai import Agent
from pydantic_ai.messages import ToolCallPart
from pydantic_ai.models.test import TestModel
from pydantic import BaseModel
import asyncio

class WeatherResponse(BaseModel):
    city: str
    temperature: float
    conditions: str

weather_agent = Agent(
    "openai:gpt-4o",  # Real model for production
    result_type=WeatherResponse,
    system_prompt="You are a weather assistant. Return structured weather data."
)

# tool_plain registers a tool that does not take a RunContext argument
@weather_agent.tool_plain
def get_current_weather(city: str) -> str:
    """Fetch current weather for a city."""
    # In production, this calls a weather API
    return f"Weather in {city}: 22°C, sunny"

# Test with TestModel: no API calls are made
async def test_weather_agent():
    # Override the model with TestModel for testing
    with weather_agent.override(model=TestModel()):
        result = await weather_agent.run("What's the weather in London?")
        
        # TestModel generates placeholder values that satisfy result_type
        assert isinstance(result.data, WeatherResponse)
        print(f"Test passed! Got: {result.data}")
        
        # Collect the names of the tools the model called
        tool_calls = [
            part.tool_name
            for msg in result.all_messages()
            for part in getattr(msg, "parts", [])
            if isinstance(part, ToolCallPart)
        ]
        assert "get_current_weather" in tool_calls
        print(f"Tool calls made: {tool_calls}")

asyncio.run(test_weather_agent())

5.2 FunctionModel for Custom Test Behavior

FunctionModel lets you define exactly how the model responds, enabling scenario-specific testing:

from pydantic_ai import Agent
from pydantic_ai.models.function import FunctionModel, AgentInfo
from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart, UserPromptPart
import asyncio

def mock_response(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse:
    """Custom function that generates test responses."""
    # Extract the latest user prompt; the request also carries the system prompt,
    # so converting the whole message to a string would mix the two
    last_message = ""
    for part in messages[-1].parts:
        if isinstance(part, UserPromptPart) and isinstance(part.content, str):
            last_message = part.content
    
    # Return different responses based on input
    if "error" in last_message.lower():
        response_text = "I encountered an error. Let me help you debug that."
    elif "hello" in last_message.lower():
        response_text = "Hello! How can I assist you today?"
    else:
        response_text = f"Processing your request about: {last_message[:50]}"
    
    return ModelResponse(parts=[TextPart(content=response_text)])

# Create agent with FunctionModel for testing
test_agent = Agent(FunctionModel(mock_response), system_prompt="Test assistant")

async def test_function_model():
    # Test greeting path
    result = await test_agent.run("Hello there!")
    assert "Hello" in result.data
    print(f"Greeting test: {result.data}")
    
    # Test error path
    result = await test_agent.run("I got an error in my code")
    assert "error" in result.data.lower()
    print(f"Error test: {result.data}")
    
    # Test default path
    result = await test_agent.run("Calculate revenue for Q4")
    assert "Processing" in result.data
    print(f"Default test: {result.data}")

asyncio.run(test_function_model())
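
The branching inside `mock_response` is plain Python, so it can also be factored into a function that takes the prompt text and is checked against a table of cases with no agent involved. The sketch below illustrates that split; `pick_reply` and `CASES` are hypothetical names introduced here.

```python
def pick_reply(user_text: str) -> str:
    """Choose a canned reply from keywords in the user's text."""
    text = user_text.lower()
    if "error" in text:
        return "I encountered an error. Let me help you debug that."
    if "hello" in text:
        return "Hello! How can I assist you today?"
    return f"Processing your request about: {user_text[:50]}"

# (prompt, substring expected in the reply)
CASES = [
    ("Hello there!", "Hello!"),
    ("I got an error in my code", "debug"),
    ("Calculate revenue for Q4", "Processing"),
]

for prompt, expected in CASES:
    assert expected in pick_reply(prompt), prompt

print(f"{len(CASES)} cases passed")
```

Keeping the mock's decision logic separate makes it easy to add scenarios, and the FunctionModel callback is reduced to extracting the prompt and wrapping the reply.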

5.3 Integration Testing Patterns

Combine TestModel with pytest for comprehensive agent test suites:

from pydantic_ai import Agent, RunContext
from pydantic_ai.models.test import TestModel
from pydantic import BaseModel
from dataclasses import dataclass
import pytest
import asyncio

# Production agent definition
@dataclass
class AppDeps:
    api_key: str
    user_id: str

class TaskResult(BaseModel):
    status: str
    message: str

task_agent = Agent(
    "openai:gpt-4o",
    deps_type=AppDeps,
    result_type=TaskResult,
    system_prompt="You manage tasks. Create, update, or delete tasks as requested."
)

@task_agent.tool
def create_task(ctx: RunContext[AppDeps], title: str, priority: str) -> str:
    """Create a new task."""
    return f"Task '{title}' created with {priority} priority for user {ctx.deps.user_id}"

# Test suite
@pytest.fixture
def test_deps():
    return AppDeps(api_key="test-key-123", user_id="test-user")

@pytest.mark.asyncio
async def test_task_creation(test_deps):
    """Test that the agent returns a well-formed TaskResult."""
    with task_agent.override(model=TestModel()):
        result = await task_agent.run("Create a high priority task: Fix login bug", deps=test_deps)
        assert isinstance(result.data, TaskResult)
        # TestModel fills fields with placeholder values, so check types rather than specific strings
        assert isinstance(result.data.status, str)

@pytest.mark.asyncio
async def test_tool_invocation(test_deps):
    """Test that create_task tool is invoked correctly."""
    with task_agent.override(model=TestModel()):
        result = await task_agent.run("Create task: Deploy v2.0", deps=test_deps)
        # Verify the agent attempted to use tools
        messages = result.all_messages()
        assert len(messages) >= 2  # At least request + response

@pytest.mark.asyncio
async def test_deps_accessibility(test_deps):
    """Test that dependencies are accessible in tools."""
    with task_agent.override(model=TestModel()):
        result = await task_agent.run("Create a task for me", deps=test_deps)
        # Tool should have access to user_id from deps
        assert result.data is not None

# Quick manual run; normally these run under pytest (the asyncio marker needs the pytest-asyncio plugin)
if __name__ == "__main__":
    asyncio.run(test_task_creation(AppDeps(api_key="test", user_id="user1")))
    print("test_task_creation passed")
Testing Best Practice: Use TestModel for unit tests (fast, deterministic), FunctionModel for scenario testing (custom logic), and real models with low-cost providers for integration tests. The agent.override() context manager makes swapping models trivial without changing production code.
Try It Yourself: Connect a PydanticAI agent to 2 MCP servers simultaneously: (1) the filesystem server (for reading/writing files), (2) a custom HTTP server you build that exposes a ‘get_weather’ tool. Have the agent: read a list of cities from a file, get weather for each, and write a summary report. Verify tools from both servers work together.

Next in the PydanticAI SDK Track

In Part 10: MCP Integration, we’ll connect PydanticAI agents to Model Context Protocol servers for standardized tool interoperability, build MCP servers that expose agent capabilities, and achieve cross-framework tool sharing.