1. Debugging & Monitoring with Logfire
Pydantic Logfire is the observability platform built by the Pydantic team, with first-class support for AI applications. Once PydanticAI instrumentation is enabled, it captures every LLM call, tool invocation, retry, and validation error as structured spans in an OpenTelemetry-compatible trace.
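To make the idea of nested spans concrete, here is a minimal standard-library sketch of a trace recorded as a tree of timed spans. It is not how Logfire or OpenTelemetry is implemented; `SpanRecorder` and its fields are hypothetical names used only for illustration.

```python
import time
from contextlib import contextmanager


class SpanRecorder:
    """Toy recorder: each span stores its name, parent index, attributes, and duration."""

    def __init__(self):
        self.spans = []   # all spans, in the order they were started
        self._stack = []  # indices of spans that are currently open

    @contextmanager
    def span(self, name, **attributes):
        parent = self._stack[-1] if self._stack else None
        record = {"name": name, "parent": parent,
                  "attributes": attributes, "duration_s": None}
        self.spans.append(record)
        self._stack.append(len(self.spans) - 1)
        start = time.perf_counter()
        try:
            yield record
        finally:
            # Close the span even if the wrapped code raised
            record["duration_s"] = time.perf_counter() - start
            self._stack.pop()


recorder = SpanRecorder()
with recorder.span("agent_run", model="example-model"):
    with recorder.span("llm_request", attempt=1):
        pass
    with recorder.span("tool_call", tool="search_database"):
        pass

for s in recorder.spans:
    parent = recorder.spans[s["parent"]]["name"] if s["parent"] is not None else None
    print(s["name"], "parent:", parent)
```

The two inner spans point back to `agent_run` as their parent, which is the same shape a trace viewer renders as an indented timeline.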
1.1 Auto-Instrumentation Setup
import logfire
from pydantic_ai import Agent

# Configure Logfire, then enable PydanticAI instrumentation explicitly;
# configure() on its own does not instrument agents
logfire.configure()
logfire.instrument_pydantic_ai()

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a helpful assistant.",
)

# Every agent run is now traced automatically
result = agent.run_sync("What is the meaning of life?")
print(result.output)

# Logfire captures:
# - Agent run span (total duration)
# - LLM request/response spans (model, tokens, latency)
# - Tool call spans (if any tools are registered)
# - Validation spans (Pydantic model parsing)
print("Check your Logfire dashboard for the trace!")
1.2 Custom Spans & Attributes
import logfire
from pydantic_ai import Agent, RunContext

logfire.configure()
logfire.instrument_pydantic_ai()  # trace agent runs so custom spans nest under them

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a research assistant.",
)

@agent.tool
async def search_database(ctx: RunContext[None], query: str) -> str:
    """Search the internal knowledge base."""
    # Add a custom span for business logic tracking
    with logfire.span("database_search", query=query):
        # Simulate search
        results = f"Found 3 results for: {query}"
        logfire.info("Search completed", result_count=3, query=query)
        return results

result = agent.run_sync("Find information about quantum computing")
print(result.output)
print("Custom spans visible in Logfire with query attributes")
2. Durable Execution
AI agents in production face crashes, network timeouts, and deployment restarts. Durable execution ensures your agent workflows survive failures by persisting state at each step. PydanticAI integrates with multiple durable execution frameworks.
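The core idea, persisting each step's result so that a restart resumes instead of starting over, can be sketched with the standard library alone. This is a toy illustration under stated assumptions, not how Temporal or DBOS work internally; `run_durably`, the JSON state file, and the step functions are all hypothetical.

```python
import json
import os
import tempfile


def run_durably(steps, state_path):
    """Run named steps in order, checkpointing each result to a JSON file."""
    state = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)
    previous = None
    for name, func in steps:
        if name in state:
            previous = state[name]  # finished in an earlier run: reuse the result
            continue
        previous = func(previous)
        state[name] = previous
        with open(state_path, "w") as f:  # checkpoint after every completed step
            json.dump(state, f)
    return previous


calls = []

def research(_):
    calls.append("research")
    return "raw notes"

def summarize(notes):
    calls.append("summarize")
    if calls.count("summarize") == 1:
        raise RuntimeError("simulated crash")  # fail on the first attempt only
    return f"summary of {notes}"

steps = [("research", research), ("summarize", summarize)]
state_file = os.path.join(tempfile.mkdtemp(), "workflow_state.json")

try:
    run_durably(steps, state_file)  # crashes during the second step
except RuntimeError:
    pass

result = run_durably(steps, state_file)  # the "restart": research is not repeated
print(result)
print(calls)
```

After the simulated crash, the second run reads the checkpoint, skips `research`, and only retries `summarize`. Production frameworks add what this sketch omits: atomic writes, retries with backoff, timeouts, and concurrency control.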
2.1 Temporal: Long-Running Workflow Orchestration
from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow

# Third-party imports used alongside workflow code should bypass the workflow sandbox
with workflow.unsafe.imports_passed_through():
    from pydantic_ai import Agent

@dataclass
class ResearchInput:
    topic: str
    depth: str = "comprehensive"

# Define the agent as a Temporal activity
@activity.defn
async def run_research_agent(input: ResearchInput) -> str:
    """Run PydanticAI agent as a durable activity."""
    agent = Agent(
        "openai:gpt-4o-mini",
        system_prompt=f"Research the topic thoroughly. Depth: {input.depth}",
    )
    result = await agent.run(input.topic)
    return result.output

@activity.defn
async def run_summary_agent(research: str) -> str:
    """Summarize research results durably."""
    agent = Agent(
        "openai:gpt-4o-mini",
        system_prompt="Summarize the following research into 3 bullet points.",
    )
    result = await agent.run(research)
    return result.output

# Temporal workflow orchestrating multiple agent steps
@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, input: ResearchInput) -> str:
        # Step 1: Research (survives crashes)
        research = await workflow.execute_activity(
            run_research_agent,
            input,
            start_to_close_timeout=timedelta(seconds=60),  # must be a timedelta, not an int
        )
        # Step 2: Summarize (runs even if step 1 was retried)
        summary = await workflow.execute_activity(
            run_summary_agent,
            research,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return summary

print("Temporal workflow defined: ResearchWorkflow")
print("Activities: run_research_agent, run_summary_agent")
print("Each step persists, so the workflow survives crashes between steps")
2.2 DBOS: Database-Backed Durable Execution
from dbos import DBOS
from pydantic_ai import Agent

# Initialize DBOS for database-backed durability.
# DBOS stores workflow state in PostgreSQL; depending on the DBOS version, connection
# settings come from dbos-config.yaml or from a config object passed to DBOS(...)
DBOS()

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are an email drafting assistant.",
)

@DBOS.workflow()
def email_workflow(recipient: str, context: str) -> dict:
    """Durable email workflow that survives process restarts."""
    # Step 1: Draft the email (state persisted after completion)
    draft = draft_email(recipient, context)
    # Step 2: Review for tone (picks up here if crashed after step 1)
    review = review_draft(draft)
    return {"draft": draft, "review": review}

@DBOS.step()
def draft_email(recipient: str, context: str) -> str:
    """Each step is individually durable."""
    result = agent.run_sync(
        f"Draft a professional email to {recipient} about: {context}"
    )
    return result.output

@DBOS.step()
def review_draft(draft: str) -> str:
    """A completed step is not re-executed during recovery; an interrupted one may be retried."""
    result = agent.run_sync(
        f"Review this email draft for tone and professionalism:\n\n{draft}"
    )
    return result.output

# Call DBOS.launch() once at startup, before invoking email_workflow(...)
print("DBOS workflow defined: email_workflow")
print("Completed @DBOS.step() results are reused, not re-executed, after a restart")
3. UI Event Streams
Modern AI applications stream agent events to frontend UIs in real time, showing thinking progress, tool calls, and partial responses as they happen. PydanticAI supports multiple streaming protocols for different frontend frameworks.
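Most of these protocols ride on Server-Sent Events (SSE), where each event is a `data:` line followed by a blank line. The sketch below shows only that framing; the event type names (`run_started`, `text_delta`, `run_finished`) and the helper functions are hypothetical and do not match any specific protocol's schema.

```python
import json


def sse_event(event_type, payload):
    """Encode one event as a Server-Sent Events frame."""
    data = json.dumps({"type": event_type, **payload})
    return f"data: {data}\n\n"  # a blank line terminates each SSE event


def stream_reply(chunks):
    """Yield SSE frames for a run: a start marker, one frame per text chunk, an end marker."""
    yield sse_event("run_started", {})
    for chunk in chunks:
        yield sse_event("text_delta", {"delta": chunk})
    yield sse_event("run_finished", {})


frames = list(stream_reply(["Hel", "lo"]))
for frame in frames:
    print(frame, end="")
```

A web framework would return such a generator as a streaming response with the `text/event-stream` media type, and the browser would append each delta to the message being rendered.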
3.1 AG-UI Protocol: Standardized Agent-to-UI Communication
from fastapi import FastAPI
from pydantic_ai import Agent

# Create agent
agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a helpful coding assistant.",
)

app = FastAPI()

# Agent.to_ag_ui() returns an ASGI app that speaks the AG-UI protocol;
# mount it so any AG-UI frontend can connect
app.mount("/agent", agent.to_ag_ui())

# AG-UI events streamed to the frontend include:
# - RUN_STARTED: agent begins processing
# - TEXT_MESSAGE_CONTENT: partial text output
# - TOOL_CALL_START: tool invocation begins
# - TOOL_CALL_END: tool invocation finished
# - RUN_FINISHED: final output ready
print("AG-UI server configured on /agent")
print("Frontend connects via SSE to receive real-time events")
print("Run with: uvicorn app:app --port 8000")
3.2 Vercel AI SDK Integration
Cost-Optimized Agent Fleet
A SaaS company runs 20 different PydanticAI agents serving 50K requests/day. Their optimization: query routing sends 70% of requests to cheaper models (saving $3K/month), response caching eliminates 30% of redundant calls, and adaptive rate limiting prevents any single customer from monopolizing capacity.
from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response

from pydantic_ai import Agent
# The Vercel AI adapter ships with recent PydanticAI releases; check your installed version
from pydantic_ai.ui.vercel_ai import VercelAIAdapter

app = FastAPI()
agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a helpful assistant for a Next.js application.",
)

@app.post("/api/chat")
async def chat(request: Request) -> Response:
    """Endpoint compatible with the Vercel AI SDK useChat() hook."""
    # The adapter parses the AI SDK request body, runs the agent, and streams
    # the response back in the AI SDK's event stream format
    return await VercelAIAdapter.dispatch_request(request, agent=agent)

print("Vercel AI SDK endpoint: POST /api/chat")
print("Compatible with: useChat() from '@ai-sdk/react' ('ai/react' in older SDK releases)")
print("Streams text deltas as SSE events")
4. Agent2Agent (A2A) Protocol
The A2A protocol enables standardized communication between AI agents, regardless of their underlying framework. A PydanticAI agent can discover, invoke, and delegate tasks to remote agents running on different platforms.
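On the wire, A2A requests are JSON-RPC 2.0 messages sent over HTTP. The sketch below builds one such request with the standard library. The method name `message/send` and the message field names reflect one reading of the A2A specification and may differ between protocol versions; `build_message_send_request` is a hypothetical helper, not part of any library.

```python
import json
import uuid


def build_message_send_request(text, request_id=None):
    """Build a JSON-RPC 2.0 envelope asking a remote agent to handle a text message."""
    return {
        "jsonrpc": "2.0",
        "id": request_id or str(uuid.uuid4()),
        "method": "message/send",  # assumed method name; check the spec version you target
        "params": {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
                "messageId": str(uuid.uuid4()),
            }
        },
    }


request = build_message_send_request("Research quantum computing", request_id="req-1")
print(json.dumps(request, indent=2))
```

Because the envelope is plain JSON-RPC, any HTTP client in any language can produce it, which is what lets agents built on different frameworks call each other.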
4.1 A2A Server & Client
from pydantic_ai import Agent

# Define a specialist agent
research_agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a research specialist. Provide detailed factual answers.",
)

# Agent.to_a2a() wraps the agent in an ASGI app (built on FastA2A) that serves a
# discoverable agent card and accepts A2A requests
app = research_agent.to_a2a(
    name="Research Agent",
    description="Specialist in factual research and information synthesis",
    url="http://localhost:8001",
)
# Capabilities such as research, fact-checking, and summarization can be
# advertised on the agent card as skills

print("A2A server app created for: Research Agent")
print("Run with: uvicorn a2a_server:app --port 8001")
import uuid

# Client from the FastA2A package; verify names and signatures against your installed version
from fasta2a.client import A2AClient
from fasta2a.schema import Message, TextPart

# Client side: delegate a task to the remote agent
async def delegate_research(topic: str):
    """Delegate a research task to a remote A2A agent."""
    # The remote agent publishes its agent card under /.well-known/ on this base URL
    client = A2AClient(base_url="http://localhost:8001")
    message = Message(
        role="user",
        parts=[TextPart(kind="text", text=f"Research the following topic: {topic}")],
        kind="message",
        message_id=str(uuid.uuid4()),
    )
    # send_message() returns a JSON-RPC response describing the created task;
    # poll client.get_task(task_id) until the task completes to read its output
    return await client.send_message(message=message)

# Usage in an orchestrator agent
print("A2A Client configured for remote agent delegation")
print("Supports: send_message, get_task")
5. Command Line Interface (CLI)
PydanticAI agents can power interactive CLI tools with streaming output, rich terminal formatting, and pipe-friendly modes for integration into shell scripts and automation pipelines.
5.1 Building a CLI Agent with Rich Output
import asyncio
import sys

from pydantic_ai import Agent

# CLI agent with streaming terminal output
agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a helpful CLI assistant. Be concise and direct.",
)

async def cli_chat(prompt: str) -> None:
    """Interactive CLI with streaming response."""
    print(f"\n\033[1;36mYou:\033[0m {prompt}")
    print("\033[1;32mAgent:\033[0m ", end="", flush=True)
    async with agent.run_stream(prompt) as result:
        # delta=True yields only new text; the default yields the full text so far
        async for chunk in result.stream_text(delta=True):
            print(chunk, end="", flush=True)
    print("\n")  # Newline after streaming completes

# Pipe-friendly mode: detect whether input is being piped in
def is_piped() -> bool:
    return not sys.stdin.isatty()

async def main():
    """Entry point supporting interactive and piped modes."""
    if is_piped():
        # Piped mode: read from stdin, output plain text
        prompt = sys.stdin.read().strip()
        result = await agent.run(prompt)
        print(result.output)
    else:
        # Interactive mode: rich terminal with colors
        prompt = " ".join(sys.argv[1:]) or "Hello!"
        await cli_chat(prompt)

if __name__ == "__main__":
    asyncio.run(main())

# Usage:
#   Interactive: python agent.py 'your question'
#   Piped: echo 'question' | python agent.py
import json

from pydantic import BaseModel
from pydantic_ai import Agent

# Structured CLI output for scripting
class CLIResponse(BaseModel):
    answer: str
    confidence: float
    sources: list[str]

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="Answer questions with structured data. Include confidence score 0-1.",
    output_type=CLIResponse,
)

result = agent.run_sync("What is the population of Tokyo?")
response = result.output

# JSON output for piping to jq or other tools
print(json.dumps(response.model_dump(), indent=2))
# Example output (actual values vary by run):
# {
#   "answer": "Approximately 14 million in the city proper",
#   "confidence": 0.85,
#   "sources": ["UN World Urbanization Prospects", "Tokyo Metropolitan Government"]
# }
(1) Use isatty() to switch between rich and plain output. (2) Use structured output types for machine-readable responses. (3) Support --json and --quiet flags. (4) Stream to the terminal but buffer for pipes. (5) Exit with non-zero codes on agent errors for shell script compatibility.
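Points (3) and (5) can be sketched with `argparse` from the standard library. This is one possible layout rather than a prescribed one; `answer_question` is a hypothetical stand-in for the agent call so the example runs without a model or API key.

```python
import argparse
import json
import sys


def answer_question(prompt):
    """Stand-in for an agent call; raises ValueError on empty input."""
    if not prompt.strip():
        raise ValueError("empty prompt")
    return {"answer": f"You asked: {prompt}", "confidence": 0.5}


def main(argv=None, out=None, err=None):
    out = out or sys.stdout
    err = err or sys.stderr
    parser = argparse.ArgumentParser(description="Ask a question from the shell")
    parser.add_argument("prompt", nargs="*", help="question text")
    parser.add_argument("--json", action="store_true", help="emit JSON for scripts")
    parser.add_argument("--quiet", action="store_true", help="print only the answer")
    args = parser.parse_args(argv)
    try:
        response = answer_question(" ".join(args.prompt))
    except ValueError as exc:
        print(f"error: {exc}", file=err)
        return 1  # non-zero exit code lets shell scripts detect the failure
    if args.json:
        print(json.dumps(response), file=out)
    elif args.quiet:
        print(response["answer"], file=out)
    else:
        print(f"Answer: {response['answer']} (confidence {response['confidence']:.2f})", file=out)
    return 0


ok_code = main(["--json", "What", "is", "2+2?"])
bad_code = main([])  # empty prompt: error goes to stderr, exit code is 1
print(ok_code, bad_code)
```

In a real script the last lines would be replaced by `sys.exit(main())`, so the return value becomes the process exit status that `&&` and `set -e` react to.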
Next in the PydanticAI SDK Track
In Part 14: Harness, Gateway & Production, we’ll deploy the PydanticAI Harness for coding agents, configure the Gateway for unified model access, build production Web Chat UIs, generate embeddings at scale, and implement coding agent skills for autonomous development.