PydanticAI SDK Track Part 13: Integrations & UI Streams

May 24, 2026 Wasil Zafar 40 min read

Connect PydanticAI agents to the broader ecosystem — monitor with Logfire observability, orchestrate durable workflows with Temporal and DBOS, stream real-time events to UIs via AG-UI and Vercel AI SDK, enable agent-to-agent communication with the A2A protocol, and build powerful CLI tools.

Table of Contents

  1. Debugging & Monitoring with Logfire
  2. Durable Execution
  3. UI Event Streams
  4. Agent2Agent (A2A) Protocol
  5. Command Line Interface (CLI)
What You’ll Learn: How to connect PydanticAI agents to production infrastructure: tracing agent runs with Logfire, making multi-step workflows durable with Temporal and DBOS, streaming agent events to frontends over AG-UI and the Vercel AI SDK, delegating work between agents with the A2A protocol, and wrapping agents in CLI tools. These are the integrations you need once agents have to be observable, reliable, and maintainable in production.

1. Debugging & Monitoring with Logfire

Pydantic Logfire is the observability platform built by the Pydantic team on top of OpenTelemetry, with first-class support for AI applications. Once instrumentation is enabled, it captures every LLM call, tool invocation, retry, and validation error from a PydanticAI agent as structured spans in an OpenTelemetry-compatible trace.

1.1 Auto-Instrumentation Setup

import logfire
from pydantic_ai import Agent

# Configure Logfire, then switch on instrumentation for all PydanticAI agents
logfire.configure()
logfire.instrument_pydantic_ai()

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a helpful assistant.",
)

# Every agent.run() call is now traced automatically
result = agent.run_sync("What is the meaning of life?")
print(result.output)

# Logfire captures:
# - Agent run span (total duration)
# - LLM request/response spans (model, tokens, latency)
# - Tool call spans (if any tools are registered)
# - Validation spans (Pydantic model parsing)
print("Check your Logfire dashboard for the trace!")

1.2 Custom Spans & Attributes

import logfire
from pydantic_ai import Agent, RunContext

logfire.configure()
logfire.instrument_pydantic_ai()  # without this call, agent runs are not traced

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a research assistant.",
)

@agent.tool
async def search_database(ctx: RunContext[None], query: str) -> str:
    """Search the internal knowledge base."""
    # Add custom span for business logic tracking
    with logfire.span("database_search", query=query):
        # Simulate search
        results = f"Found 3 results for: {query}"
        logfire.info("Search completed", result_count=3, query=query)
        return results

result = agent.run_sync("Find information about quantum computing")
print(result.output)
print("Custom spans visible in Logfire with query attributes")
Logfire Benefits: (1) One-line auto-instrumentation for all agent runs. (2) Structured traces with full LLM request/response payloads. (3) Token usage tracking and cost estimation. (4) Correlation between agent runs and tool calls. (5) Live tail for real-time debugging during development.

2. Durable Execution

AI agents in production face crashes, network timeouts, and deployment restarts. Durable execution ensures your agent workflows survive failures by persisting state at each step. PydanticAI integrates with multiple durable execution frameworks.
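
The core idea, persisting state after each step so a restarted process can skip work it already finished, can be sketched with nothing but the standard library. This is a toy illustration under stated assumptions, not how Temporal or DBOS are implemented: `run_durably`, the JSON checkpoint file, and the two step functions are all hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable

def run_durably(workflow_id: str, steps: list, store: Path) -> dict:
    """Run (name, function) steps in order, skipping steps already checkpointed."""
    path = store / f"{workflow_id}.json"
    state: dict = json.loads(path.read_text()) if path.exists() else {}
    for name, step in steps:
        if name in state:
            continue  # finished before the crash: reuse the saved result
        state[name] = step(state)
        path.write_text(json.dumps(state))  # checkpoint after every step
    return state

calls: list = []

def research(state: dict) -> str:
    calls.append("research")
    return "raw notes"

def summarize(state: dict) -> str:
    calls.append("summarize")
    if calls.count("summarize") == 1:
        raise RuntimeError("simulated crash")  # fail on the first attempt only
    return f"summary of {state['research']}"

steps = [("research", research), ("summarize", summarize)]
store = Path(tempfile.mkdtemp())

try:
    run_durably("wf-1", steps, store)
except RuntimeError:
    pass  # the "process" died after research was checkpointed

final_state = run_durably("wf-1", steps, store)  # restart with the same ID
print(final_state)
print(calls)
```

On the second run, `research` is not called again because its result was already saved, while `summarize` runs a second time because it never completed. Real frameworks add retries, timeouts, and a proper database on top of the same principle.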

2.1 Temporal: Long-Running Workflow Orchestration

from dataclasses import dataclass
from datetime import timedelta

from pydantic_ai import Agent
from temporalio import workflow, activity
from temporalio.client import Client

@dataclass
class ResearchInput:
    topic: str
    depth: str = "comprehensive"

# Define the agent as a Temporal activity
@activity.defn
async def run_research_agent(input: ResearchInput) -> str:
    """Run PydanticAI agent as a durable activity."""
    agent = Agent(
        "openai:gpt-4o-mini",
        system_prompt=f"Research the topic thoroughly. Depth: {input.depth}",
    )
    result = await agent.run(input.topic)
    return result.output

@activity.defn
async def run_summary_agent(research: str) -> str:
    """Summarize research results durably."""
    agent = Agent(
        "openai:gpt-4o-mini",
        system_prompt="Summarize the following research into 3 bullet points.",
    )
    result = await agent.run(research)
    return result.output

# Temporal workflow orchestrating multiple agent steps
@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, input: ResearchInput) -> str:
        # Step 1: Research (survives crashes)
        # Temporal expects timeouts as timedelta values, not bare numbers
        research = await workflow.execute_activity(
            run_research_agent,
            input,
            start_to_close_timeout=timedelta(seconds=60),
        )
        # Step 2: Summarize (runs even if step 1 was retried)
        summary = await workflow.execute_activity(
            run_summary_agent,
            research,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return summary

print("Temporal workflow defined: ResearchWorkflow")
print("Activities: run_research_agent, run_summary_agent")
print("Each step persists — workflow survives crashes between steps")

2.2 DBOS: Database-Backed Durable Execution

from pydantic_ai import Agent
from dbos import DBOS, SetWorkflowID

# Initialize DBOS for database-backed durability
# DBOS stores workflow state in PostgreSQL
DBOS()

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are an email drafting assistant.",
)

@DBOS.workflow()
def email_workflow(recipient: str, context: str) -> dict:
    """Durable email workflow — survives process restarts."""
    # Step 1: Draft the email (state persisted after completion)
    draft = draft_email(recipient, context)

    # Step 2: Review for tone (picks up here if crashed after step 1)
    review = review_draft(draft)

    return {"draft": draft, "review": review}

@DBOS.step()
def draft_email(recipient: str, context: str) -> str:
    """Each step is individually durable."""
    result = agent.run_sync(
        f"Draft a professional email to {recipient} about: {context}"
    )
    return result.output

@DBOS.step()
def review_draft(draft: str) -> str:
    """Review step: once it completes, its result is saved and not re-run on recovery."""
    result = agent.run_sync(
        f"Review this email draft for tone and professionalism:\n\n{draft}"
    )
    return result.output

print("DBOS workflow defined: email_workflow")
print("A completed @DBOS.step() is not re-executed when the workflow recovers after a restart")
Choosing a Framework: Use Temporal for complex multi-service orchestration with sophisticated retry policies. Use DBOS for simpler workflows where PostgreSQL is already available. Use Prefect for data pipeline integration. Use Restate for event-driven virtual object patterns.

3. UI Event Streams

Modern AI applications stream agent events to frontend UIs in real-time — showing thinking progress, tool calls, and partial responses as they happen. PydanticAI supports multiple streaming protocols for different frontend frameworks.

3.1 AG-UI Protocol: Standardized Agent-to-UI Communication

from pydantic_ai import Agent
from fastapi import FastAPI

# Create agent
agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a helpful coding assistant.",
)

# Agent.to_ag_ui() wraps the agent in an ASGI app that speaks the AG-UI protocol
# (needs the AG-UI extra, e.g. pip install "pydantic-ai-slim[ag-ui]";
# check the docs for the PydanticAI version you have installed)
ag_ui_app = agent.to_ag_ui()

# Mount the AG-UI app so that any AG-UI frontend can connect to it
app = FastAPI()
app.mount("/agent", ag_ui_app)

# AG-UI events streamed to the frontend include:
# - RUN_STARTED: agent begins processing
# - TEXT_MESSAGE_CONTENT: partial text output
# - TOOL_CALL_START: tool invocation begins
# - TOOL_CALL_END: tool call has been fully emitted
# - RUN_FINISHED: the run is complete

print("AG-UI server configured on /agent")
print("Frontend connects via SSE to receive real-time events")
print("Run with: uvicorn app:app --port 8000")

3.2 Vercel AI SDK Integration

Real-World Application

Cost-Optimized Agent Fleet

A SaaS company runs 20 different PydanticAI agents serving 50K requests/day. Their optimization: query routing sends 70% of requests to cheaper models (saving $3K/month), response caching eliminates 30% of redundant calls, and adaptive rate limiting prevents any single customer from monopolizing capacity.

Cost Optimization | Scale

from pydantic_ai import Agent
from pydantic_ai.ui.vercel_ai import VercelAIAdapter
from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response

app = FastAPI()

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a helpful assistant for a Next.js application.",
)

@app.post("/api/chat")
async def chat(request: Request) -> Response:
    """Endpoint compatible with the Vercel AI SDK useChat() hook."""
    # The adapter parses the chat request sent by the frontend, runs the agent,
    # and streams the response back in the format the Vercel AI SDK expects.
    # (Available in recent PydanticAI releases; check your installed version.)
    return await VercelAIAdapter.dispatch_request(request, agent=agent)

print("Vercel AI SDK endpoint: POST /api/chat")
print("Compatible with: useChat() from '@ai-sdk/react'")
print("Streams text deltas as SSE events")
SSE vs WebSocket: AG-UI uses Server-Sent Events (SSE) for uni-directional streaming — ideal for agent responses. Use WebSockets only when you need bi-directional communication (e.g., real-time collaboration where the UI sends interrupts mid-generation). SSE is simpler, works through CDNs, and auto-reconnects.
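
Part of what makes SSE simple is its wire format: each event is a block of `field: value` text lines terminated by a blank line. Below is a minimal sketch of encoding and decoding JSON events in that format. The helper names `format_sse` and `parse_sse` are hypothetical, and the event payloads are illustrative rather than a complete AG-UI stream.

```python
import json
from typing import Optional

def format_sse(data: dict, event: Optional[str] = None) -> str:
    """Encode one JSON payload as a Server-Sent Events frame."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"  # a blank line terminates the frame

def parse_sse(stream: str) -> list:
    """Decode the JSON payloads from a string of SSE frames (data fields only)."""
    payloads = []
    for frame in stream.split("\n\n"):
        data_lines = [
            line[len("data: "):]
            for line in frame.split("\n")
            if line.startswith("data: ")
        ]
        if data_lines:
            payloads.append(json.loads("\n".join(data_lines)))
    return payloads

events = [
    {"type": "RUN_STARTED"},
    {"type": "TEXT_MESSAGE_CONTENT", "delta": "Hel"},
    {"type": "TEXT_MESSAGE_CONTENT", "delta": "lo"},
    {"type": "RUN_FINISHED"},
]

wire = "".join(format_sse(e) for e in events)
print(wire)

decoded = parse_sse(wire)
text = "".join(e.get("delta", "") for e in decoded)
print(text)
```

Because every frame is plain text over a normal HTTP response, any proxy or CDN that can pass through a streaming response can carry it, with no protocol upgrade as WebSockets require.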

4. Agent2Agent (A2A) Protocol

The A2A protocol enables standardized communication between AI agents, regardless of their underlying framework. A PydanticAI agent can discover, invoke, and delegate tasks to remote agents running on different platforms.

4.1 A2A Server & Client

from pydantic_ai import Agent

# Define a specialist agent
research_agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a research specialist. Provide detailed factual answers.",
)

# Expose the agent as an A2A server. Agent.to_a2a() returns an ASGI app that
# also serves a discoverable agent card (needs the A2A extra, e.g.
# pip install "pydantic-ai-slim[a2a]"; check the docs for your installed version).
# Capabilities such as research, fact-checking, and summarization can be
# advertised as skills on the card.
app = research_agent.to_a2a(
    name="Research Agent",
    description="Specialist in factual research and information synthesis",
    url="http://localhost:8001",
)

print("A2A server configured: Research Agent")
print("Run with: uvicorn app:app --port 8001")
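import uuid

import httpx

# Client side: discover a remote agent and send it a task.
# A2A is JSON-RPC 2.0 over HTTP, so a plain HTTP client is enough for a sketch.
async def delegate_research(topic: str, base_url: str = "http://localhost:8001") -> dict:
    """Delegate a research task to a remote A2A agent."""
    async with httpx.AsyncClient() as http:
        # Discover the agent through its published agent card
        # (newer revisions of the A2A spec use /.well-known/agent-card.json)
        card = (await http.get(f"{base_url}/.well-known/agent.json")).json()
        print(f"Discovered agent: {card['name']}: {card.get('description')}")

        # Send the task as a message/send JSON-RPC request
        response = await http.post(
            card.get("url", base_url),
            json={
                "jsonrpc": "2.0",
                "id": str(uuid.uuid4()),
                "method": "message/send",
                "params": {
                    "message": {
                        "kind": "message",
                        "role": "user",
                        "messageId": str(uuid.uuid4()),
                        "parts": [{"kind": "text", "text": f"Research the following topic: {topic}"}],
                    }
                },
            },
        )

    # The result is a task (or a direct message); poll tasks/get until it finishes
    return response.json()["result"]

# Usage in an orchestrator agent
print("A2A client configured for remote agent delegation")
print("A2A methods: message/send, tasks/get, tasks/cancel")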
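
Remote tasks rarely finish immediately, so a client usually has to check the task's status until it reaches a terminal state. Here is a minimal sketch of that polling loop. It assumes the task shape (`{"id": ..., "status": {"state": ...}}`) and the state names from the A2A specification; `wait_for_task`, its `get_task` parameter, and the fake fetcher are hypothetical stand-ins for whatever client you use to call the remote agent.

```python
import asyncio
from typing import Awaitable, Callable

# Task states after which no further updates are expected
TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"}

async def wait_for_task(
    get_task: Callable[[str], Awaitable[dict]],
    task_id: str,
    *,
    poll_interval: float = 0.5,
    max_polls: int = 20,
) -> dict:
    """Poll a task until it reaches a terminal state or the poll budget runs out."""
    for _ in range(max_polls):
        task = await get_task(task_id)
        if task["status"]["state"] in TERMINAL_STATES:
            return task
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"task {task_id} did not finish after {max_polls} polls")

# Fake fetcher standing in for a real tasks/get call to a remote agent
_states = iter(["submitted", "working", "completed"])

async def fake_get_task(task_id: str) -> dict:
    return {"id": task_id, "status": {"state": next(_states)}}

final_task = asyncio.run(wait_for_task(fake_get_task, "task-123", poll_interval=0))
print(final_task["status"]["state"])
```

Passing the fetcher in as a parameter keeps the loop independent of any particular HTTP client and makes it easy to test without a running server.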

5. Command Line Interface (CLI)

PydanticAI agents can power interactive CLI tools with streaming output, rich terminal formatting, and pipe-friendly modes for integration into shell scripts and automation pipelines.

5.1 Building a CLI Agent with Rich Output

import asyncio
import sys

from pydantic_ai import Agent

# CLI agent with streaming terminal output
agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="You are a helpful CLI assistant. Be concise and direct.",
)

async def cli_chat(prompt: str) -> None:
    """Interactive CLI with streaming response."""
    print(f"\n\033[1;36mYou:\033[0m {prompt}")
    print("\033[1;32mAgent:\033[0m ", end="", flush=True)

    async with agent.run_stream(prompt) as result:
        # delta=True yields only the newly generated text each time,
        # rather than the full text accumulated so far
        async for chunk in result.stream_text(delta=True):
            print(chunk, end="", flush=True)

    print("\n")  # Newline after streaming completes

async def main() -> None:
    """Entry point supporting interactive and piped modes."""
    # Input: read the prompt from stdin when it is piped in, else from the arguments
    if not sys.stdin.isatty():
        prompt = sys.stdin.read().strip()
    else:
        prompt = " ".join(sys.argv[1:]) or "Hello!"

    # Output: colors and streaming on a terminal, plain text when piped onward
    if sys.stdout.isatty():
        await cli_chat(prompt)
    else:
        result = await agent.run(prompt)
        print(result.output)

if __name__ == "__main__":
    asyncio.run(main())

# Usage:
#   Interactive: python agent.py 'your question'
#   Piped: echo 'question' | python agent.py
from pydantic_ai import Agent
from pydantic import BaseModel

# Structured CLI output for scripting
class CLIResponse(BaseModel):
    answer: str
    confidence: float
    sources: list[str]

agent = Agent(
    "openai:gpt-4o-mini",
    system_prompt="Answer questions with structured data. Include confidence score 0-1.",
    output_type=CLIResponse,
)

result = agent.run_sync("What is the population of Tokyo?")
response = result.output

# JSON output for piping to jq or other tools
import json
print(json.dumps(response.model_dump(), indent=2))
# Output:
# {
#   "answer": "Approximately 14 million in the city proper",
#   "confidence": 0.85,
#   "sources": ["UN World Urbanization Prospects", "Tokyo Metropolitan Government"]
# }
CLI Best Practices: (1) Detect isatty() to switch between rich and plain output. (2) Use structured output types for machine-readable responses. (3) Support --json and --quiet flags. (4) Stream to terminal but buffer for pipes. (5) Exit with non-zero codes on agent errors for shell script compatibility.
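
Practices (3) and (5) can be sketched with `argparse` alone. This is an illustrative skeleton, not part of PydanticAI: `build_parser`, `render`, `run_cli`, and `fake_ask` are hypothetical names, and `fake_ask` stands in for a real agent call such as `agent.run_sync(prompt).output`.

```python
import argparse
import json
import sys
from typing import Callable

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="agent", description="Ask the agent a question.")
    parser.add_argument("prompt", nargs="*", help="question to send to the agent")
    parser.add_argument("--json", action="store_true", dest="as_json",
                        help="print machine-readable JSON")
    parser.add_argument("--quiet", action="store_true",
                        help="print only the answer, without decoration")
    return parser

def render(answer: str, *, as_json: bool, quiet: bool) -> str:
    """Format the answer according to the output flags."""
    if as_json:
        return json.dumps({"answer": answer})
    return answer if quiet else f"Agent: {answer}"

def run_cli(argv: list, ask: Callable[[str], str]) -> int:
    """Run one CLI invocation and return the process exit code."""
    args = build_parser().parse_args(argv)
    try:
        answer = ask(" ".join(args.prompt))
    except Exception as exc:
        print(f"error: {exc}", file=sys.stderr)  # errors go to stderr
        return 1                                 # non-zero so shell scripts can detect failure
    print(render(answer, as_json=args.as_json, quiet=args.quiet))
    return 0

def fake_ask(prompt: str) -> str:
    """Stand-in for a real agent call."""
    if not prompt:
        raise ValueError("empty prompt")
    return f"echo: {prompt}"

ok_code = run_cli(["--json", "hello", "world"], fake_ask)
error_code = run_cli([], fake_ask)
```

In a real script the last step would be `sys.exit(run_cli(sys.argv[1:], ask))`, so that the exit code reaches the calling shell.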
Try It Yourself: Build an ‘adaptive agent’ that: (1) uses a cheap model (GPT-4o mini) for simple queries and a more expensive model (GPT-4o) for complex ones, based on query classification, (2) caches responses for identical queries (with a TTL), (3) implements rate limiting (max 10 requests/minute per user), (4) falls back gracefully when the primary model is unavailable.
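
One possible starting point for parts (2) and (3) of the exercise is sketched below using only the standard library. `TTLCache` and `SlidingWindowLimiter` are hypothetical helper classes, not PydanticAI features. The clock is injectable so the behavior can be checked without waiting a real minute.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Optional

class TTLCache:
    """Cache responses by query string; entries expire after ttl_s seconds."""

    def __init__(self, ttl_s: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_s = ttl_s
        self.clock = clock
        self._entries: dict = {}  # query -> (stored_at, response)

    def get(self, query: str) -> Optional[str]:
        entry = self._entries.get(query)
        if entry is None:
            return None
        stored_at, response = entry
        if self.clock() - stored_at >= self.ttl_s:
            del self._entries[query]  # expired
            return None
        return response

    def put(self, query: str, response: str) -> None:
        self._entries[query] = (self.clock(), response)

class SlidingWindowLimiter:
    """Allow at most max_requests per user within any window_s-second window."""

    def __init__(self, max_requests: int, window_s: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.window_s = window_s
        self.clock = clock
        self._hits = defaultdict(deque)  # user -> timestamps of recent requests

    def allow(self, user: str) -> bool:
        now = self.clock()
        hits = self._hits[user]
        while hits and now - hits[0] >= self.window_s:
            hits.popleft()  # drop requests that have left the window
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True

# Demonstration with a fake clock
now = [0.0]
clock = lambda: now[0]

cache = TTLCache(ttl_s=30, clock=clock)
limiter = SlidingWindowLimiter(max_requests=10, window_s=60, clock=clock)

cache.put("capital of France?", "Paris")
cached_at_start = cache.get("capital of France?")
decisions = [limiter.allow("alice") for _ in range(11)]

now[0] = 60.0  # one minute later
cached_after_ttl = cache.get("capital of France?")
allowed_after_window = limiter.allow("alice")
print(cached_at_start, decisions, cached_after_ttl, allowed_after_window)
```

In the agent wrapper, check `limiter.allow(user)` first, then `cache.get(query)`, and only call the model when both say to proceed.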

Next in the PydanticAI SDK Track

In Part 14: Harness, Gateway & Production, we’ll deploy the PydanticAI Harness for coding agents, configure the Gateway for unified model access, build production Web Chat UIs, generate embeddings at scale, and implement coding agent skills for autonomous development.