Back to AI App Dev Series

PydanticAI SDK Track Part 11: Pydantic Graph

May 24, 2026 Wasil Zafar 40 min read

Build complex agent workflows with Pydantic Graph — define typed graph steps, compose joins and reducers for data aggregation, implement decision nodes for conditional branching, and execute graph paths in parallel for performance.

Table of Contents

  1. Pydantic Graph Overview
  2. Graph Builder API
  3. Steps: The Building Blocks
  4. Joins & Reducers
  5. Decisions & Parallel Execution
What You’ll Learn: Pydantic Evals is the framework’s built-in evaluation system for measuring agent quality systematically. Instead of manually checking outputs, you define test cases with expected results, run your agent against them, and get quantitative scores. This is how you move from ‘it seems to work’ to ‘it works 94.5% of the time, up from 91.2% last week.’

1. Pydantic Graph Overview

Pydantic Graph is a workflow orchestration library built on top of PydanticAI. It lets you define complex multi-step agent workflows as typed, composable graphs — where each step has explicit input/output types, edges define data flow, and the runtime handles execution order and parallelism.

When to Use Pydantic Graph: Use it when your workflow has branching logic, parallel steps, or complex data dependencies that go beyond simple sequential agent chains. If a linear pipeline suffices, stick with basic multi-agent patterns from Part 9.

1.1 Comparison to LangGraph

Unlike LangGraph (which uses string-keyed state dictionaries), Pydantic Graph leverages Python’s type system for compile-time safety:

FeaturePydantic GraphLangGraph
Type safetyFull — Pydantic models at every edgePartial — TypedDict state
Step definitionDecorated functions with typed I/ONode functions with state dict
BranchingDecision nodes with typed conditionsConditional edges with string routing
ParallelismBuilt-in async parallel executionRequires explicit fan-out nodes
Agent integrationNative PydanticAI agents in stepsLangChain agent wrappers
ValidationPydantic validation at every boundaryManual validation logic

1.2 Minimal Graph Example

from pydantic_graph import Graph, GraphRunContext, End
from pydantic import BaseModel
from dataclasses import dataclass

# Define state that flows through the graph
@dataclass
class WorkflowState:
    query: str
    results: list[str]

# Define step nodes
class SearchStep(BaseModel):
    """Search for information based on query."""
    search_query: str

    async def run(self, ctx: GraphRunContext[WorkflowState]) -> "SummarizeStep | End[str]":
        # Simulate search
        ctx.state.results = [
            f"Result 1 for: {self.search_query}",
            f"Result 2 for: {self.search_query}",
        ]
        # Route to summarize step
        return SummarizeStep(items=ctx.state.results)

class SummarizeStep(BaseModel):
    """Summarize the search results."""
    items: list[str]

    async def run(self, ctx: GraphRunContext[WorkflowState]) -> "End[str]":
        summary = f"Found {len(self.items)} results for '{ctx.state.query}'"
        return End(summary)

# Build and run the graph
import asyncio

async def main():
    graph = Graph(nodes=[SearchStep, SummarizeStep])
    state = WorkflowState(query="PydanticAI tutorials", results=[])
    
    # Start execution from SearchStep
    result = await graph.run(
        SearchStep(search_query="PydanticAI tutorials"),
        state=state
    )
    print(f"Graph result: {result.output}")

asyncio.run(main())

2. Graph Builder API

The Graph class provides the primary API for constructing and running graphs. Define nodes as Pydantic models with run methods that return the next node or End.

2.1 Building & Running a Graph

from pydantic_graph import Graph, GraphRunContext, End
from pydantic import BaseModel
from dataclasses import dataclass

@dataclass
class PipelineState:
    input_text: str
    cleaned_text: str = ""
    analysis: str = ""
    final_output: str = ""

class CleanTextStep(BaseModel):
    """Clean and normalize input text."""
    text: str

    async def run(self, ctx: GraphRunContext[PipelineState]) -> "AnalyzeStep":
        # Clean the text
        cleaned = self.text.strip().lower()
        ctx.state.cleaned_text = cleaned
        return AnalyzeStep(text=cleaned)

class AnalyzeStep(BaseModel):
    """Analyze the cleaned text."""
    text: str

    async def run(self, ctx: GraphRunContext[PipelineState]) -> "FormatOutputStep":
        # Perform analysis
        word_count = len(self.text.split())
        ctx.state.analysis = f"Words: {word_count}, Chars: {len(self.text)}"
        return FormatOutputStep(analysis=ctx.state.analysis)

class FormatOutputStep(BaseModel):
    """Format the final output."""
    analysis: str

    async def run(self, ctx: GraphRunContext[PipelineState]) -> "End[str]":
        output = (
            f"Input: {ctx.state.input_text}\n"
            f"Cleaned: {ctx.state.cleaned_text}\n"
            f"Analysis: {self.analysis}"
        )
        ctx.state.final_output = output
        return End(output)

import asyncio

async def main():
    # Build the graph with all node types
    graph = Graph(nodes=[CleanTextStep, AnalyzeStep, FormatOutputStep])
    
    # Initialize state
    state = PipelineState(input_text="  Hello World! This is a TEST.  ")
    
    # Run from the first step
    result = await graph.run(
        CleanTextStep(text=state.input_text),
        state=state
    )
    
    print(result.output)
    print(f"\nFinal state: {state}")

asyncio.run(main())
Type Safety: Each step’s run method declares its return type explicitly (e.g., -> "AnalyzeStep"). This means the graph structure is validated at definition time — you cannot accidentally route to a node that doesn’t exist or pass incompatible data between steps.

3. Steps: The Building Blocks

Steps are the atomic units of a graph. Each step is a Pydantic model with fields (input data) and a run method (execution logic). Steps can contain PydanticAI agents for AI-powered processing.

3.1 Steps That Use PydanticAI Agents

Embed PydanticAI agents inside graph steps for AI-powered workflow nodes:

from pydantic_graph import Graph, GraphRunContext, End
from pydantic_ai import Agent
from pydantic import BaseModel
from dataclasses import dataclass

# Define AI agents for different steps
classifier_agent = Agent(
    "openai:gpt-4o",
    system_prompt="Classify text as: positive, negative, or neutral. Return only the label."
)

summarizer_agent = Agent(
    "openai:gpt-4o",
    system_prompt="Summarize text in one sentence."
)

@dataclass
class ReviewState:
    review_text: str
    sentiment: str = ""
    summary: str = ""

class ClassifyStep(BaseModel):
    """Classify the sentiment of a review."""
    text: str

    async def run(self, ctx: GraphRunContext[ReviewState]) -> "SummarizeReviewStep":
        result = await classifier_agent.run(self.text)
        ctx.state.sentiment = result.data
        return SummarizeReviewStep(text=self.text, sentiment=result.data)

class SummarizeReviewStep(BaseModel):
    """Summarize the review content."""
    text: str
    sentiment: str

    async def run(self, ctx: GraphRunContext[ReviewState]) -> "End[dict]":
        result = await summarizer_agent.run(self.text)
        ctx.state.summary = result.data
        return End({
            "sentiment": self.sentiment,
            "summary": result.data,
            "original_length": len(self.text.split())
        })

import asyncio

async def main():
    graph = Graph(nodes=[ClassifyStep, SummarizeReviewStep])
    state = ReviewState(
        review_text="This product exceeded all my expectations. The build quality is "
        "outstanding and customer service was incredibly responsive."
    )
    
    result = await graph.run(
        ClassifyStep(text=state.review_text),
        state=state
    )
    
    print(f"Result: {result.output}")
    print(f"State: sentiment={state.sentiment}, summary={state.summary}")

asyncio.run(main())

3.2 Error Handling in Steps

Handle errors gracefully by routing to fallback steps or returning early with End:

from pydantic_graph import Graph, GraphRunContext, End
from pydantic import BaseModel
from dataclasses import dataclass

@dataclass
class ProcessingState:
    input_data: str
    error: str | None = None

class ValidateStep(BaseModel):
    """Validate input data before processing."""
    data: str

    async def run(self, ctx: GraphRunContext[ProcessingState]) -> "ProcessStep | ErrorStep":
        if not self.data or len(self.data) < 3:
            return ErrorStep(error_message="Input too short (minimum 3 characters)")
        if any(char in self.data for char in '<>{}'):
            return ErrorStep(error_message="Input contains invalid characters")
        return ProcessStep(validated_data=self.data)

class ProcessStep(BaseModel):
    """Process validated data."""
    validated_data: str

    async def run(self, ctx: GraphRunContext[ProcessingState]) -> "End[str]":
        result = f"Processed: {self.validated_data.upper()}"
        return End(result)

class ErrorStep(BaseModel):
    """Handle processing errors."""
    error_message: str

    async def run(self, ctx: GraphRunContext[ProcessingState]) -> "End[str]":
        ctx.state.error = self.error_message
        return End(f"Error: {self.error_message}")

import asyncio

async def main():
    graph = Graph(nodes=[ValidateStep, ProcessStep, ErrorStep])
    
    # Test valid input
    state1 = ProcessingState(input_data="hello world")
    result1 = await graph.run(ValidateStep(data="hello world"), state=state1)
    print(f"Valid: {result1.output}")
    
    # Test invalid input
    state2 = ProcessingState(input_data="ab")
    result2 = await graph.run(ValidateStep(data="ab"), state=state2)
    print(f"Invalid: {result2.output}")
    print(f"Error state: {state2.error}")

asyncio.run(main())

4. Joins & Reducers

Real-World Application

Measurable Agent Improvement

A legal-tech company tracks their contract analysis agent’s accuracy weekly using Pydantic Evals. They maintain a benchmark of 200 contracts with known correct extractions. Over 3 months: accuracy went from 87% → 91% → 94% through systematic prompt iteration guided by eval scores. Each improvement is documented and reversible.

Legal TechEvaluation

When multiple steps execute in parallel, you need to join their outputs into a single stream for the next step. Reducers aggregate results from fan-out patterns.

4.1 Fan-Out / Fan-In Patterns

Execute multiple research paths in parallel and aggregate results:

from pydantic_graph import Graph, GraphRunContext, End
from pydantic_ai import Agent
from pydantic import BaseModel
from dataclasses import dataclass, field
import asyncio

# Specialist agents for parallel research
tech_researcher = Agent(
    "openai:gpt-4o",
    system_prompt="You research technology trends. Be concise — 2 sentences max."
)

market_researcher = Agent(
    "openai:gpt-4o",
    system_prompt="You research market trends. Be concise — 2 sentences max."
)

@dataclass
class ResearchState:
    topic: str
    tech_findings: str = ""
    market_findings: str = ""
    combined_report: str = ""

class TechResearchStep(BaseModel):
    """Research technology aspects of the topic."""
    topic: str

    async def run(self, ctx: GraphRunContext[ResearchState]) -> "End[str]":
        result = await tech_researcher.run(f"Technology trends for: {self.topic}")
        ctx.state.tech_findings = result.data
        return End(result.data)

class MarketResearchStep(BaseModel):
    """Research market aspects of the topic."""
    topic: str

    async def run(self, ctx: GraphRunContext[ResearchState]) -> "End[str]":
        result = await market_researcher.run(f"Market trends for: {self.topic}")
        ctx.state.market_findings = result.data
        return End(result.data)

class CombineResultsStep(BaseModel):
    """Combine research from multiple sources."""
    tech_report: str
    market_report: str

    async def run(self, ctx: GraphRunContext[ResearchState]) -> "End[str]":
        combined = (
            f"# Research Report: {ctx.state.topic}\n\n"
            f"## Technology Findings\n{self.tech_report}\n\n"
            f"## Market Findings\n{self.market_report}"
        )
        ctx.state.combined_report = combined
        return End(combined)

async def parallel_research(topic: str):
    """Run research steps in parallel, then combine."""
    state = ResearchState(topic=topic)
    
    # Execute both research steps concurrently
    tech_graph = Graph(nodes=[TechResearchStep])
    market_graph = Graph(nodes=[MarketResearchStep])
    
    tech_task = tech_graph.run(TechResearchStep(topic=topic), state=state)
    market_task = market_graph.run(MarketResearchStep(topic=topic), state=state)
    
    tech_result, market_result = await asyncio.gather(tech_task, market_task)
    
    # Combine results
    combine_graph = Graph(nodes=[CombineResultsStep])
    final = await combine_graph.run(
        CombineResultsStep(
            tech_report=tech_result.output,
            market_report=market_result.output
        ),
        state=state
    )
    
    print(final.output)

asyncio.run(parallel_research("autonomous vehicles in 2026"))
Performance Tip: Use asyncio.gather() to run independent graph branches concurrently. This is the key performance optimization — if two steps don’t depend on each other, run them in parallel to cut total latency.

5. Decisions & Parallel Execution

Decision nodes evaluate conditions and route execution to different branches based on step output. Combined with parallel execution, this enables complex DAG (Directed Acyclic Graph) workflows.

5.1 Decision Nodes with Conditional Branching

from pydantic_graph import Graph, GraphRunContext, End
from pydantic import BaseModel
from dataclasses import dataclass

@dataclass
class OrderState:
    order_total: float
    customer_tier: str
    requires_review: bool = False
    discount_applied: float = 0.0
    final_total: float = 0.0

class EvaluateOrderStep(BaseModel):
    """Evaluate order and decide processing path."""
    total: float
    tier: str

    async def run(
        self, ctx: GraphRunContext[OrderState]
    ) -> "ApplyDiscountStep | ManualReviewStep | StandardProcessStep":
        # Decision logic: route based on conditions
        if self.total > 10000:
            # High-value orders need manual review
            return ManualReviewStep(order_total=self.total, reason="High value order")
        elif self.tier == "platinum":
            # Platinum customers get automatic discount
            return ApplyDiscountStep(total=self.total, discount_percent=15.0)
        elif self.tier == "gold":
            return ApplyDiscountStep(total=self.total, discount_percent=10.0)
        else:
            return StandardProcessStep(total=self.total)

class ApplyDiscountStep(BaseModel):
    """Apply tier-based discount."""
    total: float
    discount_percent: float

    async def run(self, ctx: GraphRunContext[OrderState]) -> "End[dict]":
        discount = self.total * (self.discount_percent / 100)
        final = self.total - discount
        ctx.state.discount_applied = discount
        ctx.state.final_total = final
        return End({
            "status": "approved",
            "original": self.total,
            "discount": discount,
            "final": final,
            "path": "discount"
        })

class ManualReviewStep(BaseModel):
    """Flag order for manual review."""
    order_total: float
    reason: str

    async def run(self, ctx: GraphRunContext[OrderState]) -> "End[dict]":
        ctx.state.requires_review = True
        ctx.state.final_total = self.order_total
        return End({
            "status": "pending_review",
            "total": self.order_total,
            "reason": self.reason,
            "path": "manual_review"
        })

class StandardProcessStep(BaseModel):
    """Process order without modifications."""
    total: float

    async def run(self, ctx: GraphRunContext[OrderState]) -> "End[dict]":
        ctx.state.final_total = self.total
        return End({
            "status": "approved",
            "total": self.total,
            "discount": 0,
            "path": "standard"
        })

import asyncio

async def main():
    graph = Graph(nodes=[
        EvaluateOrderStep, ApplyDiscountStep, 
        ManualReviewStep, StandardProcessStep
    ])
    
    # Test different paths
    test_cases = [
        ("Platinum customer", 500.0, "platinum"),
        ("High-value order", 15000.0, "gold"),
        ("Standard order", 200.0, "silver"),
    ]
    
    for label, total, tier in test_cases:
        state = OrderState(order_total=total, customer_tier=tier)
        result = await graph.run(
            EvaluateOrderStep(total=total, tier=tier),
            state=state
        )
        print(f"{label}: {result.output}")

asyncio.run(main())
Graph Complexity Warning: While Pydantic Graph supports arbitrarily complex DAGs, keep workflows as simple as possible. A 3-5 node graph with clear decision logic is maintainable. A 20-node graph with nested branches becomes hard to debug. If your graph grows beyond 10 nodes, consider decomposing it into sub-graphs.
Try It Yourself: Build an evaluation suite for a ‘sentiment analysis’ agent: (1) create 30 test cases with ground-truth labels (positive/negative/neutral), (2) define scoring functions (exact match, fuzzy match for edge cases), (3) run the eval and generate a report, (4) iterate on the system prompt to improve accuracy, (5) track improvement across 3 prompt versions.

Next in the PydanticAI SDK Track

In Part 12: Pydantic Evals, we’ll implement systematic evaluation of agent outputs using Pydantic Evals — define evaluation datasets, run benchmarks against agent responses, track quality metrics over time, and integrate evaluations into CI/CD pipelines.