1. Pydantic Graph Overview
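Pydantic Graph (`pydantic-graph`) is an async graph and state-machine library developed as part of PydanticAI. PydanticAI uses it internally, but the library has no dependency on `pydantic-ai` and can be used on its own. It lets you define complex multi-step agent workflows as typed, composable graphs. Each node has explicit input fields, the return type of its `run` method defines the edges to the nodes it can hand off to, and the runtime drives execution from node to node.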
1.1 Comparison to LangGraph
Unlike LangGraph, whose nodes typically read and write a shared state dictionary and route branches by string node names, Pydantic Graph relies on Python’s type system. A static type checker, and the `Graph` constructor itself, can therefore catch wiring mistakes before a run starts:
| Feature | Pydantic Graph | LangGraph |
|---|---|---|
| Type safety | Full: typed node fields, edges declared by `run()` return annotations | Partial: TypedDict state |
| Step definition | `BaseNode` dataclasses with a typed `run()` method | Node functions with state dict |
| Branching | `run()` returns one of several node types (union return annotation) | Conditional edges with string routing |
| Parallelism | Built-in async parallel execution | Requires explicit fan-out nodes |
| Agent integration | Native PydanticAI agents in steps | LangChain agent wrappers |
| Validation | Graph structure checked against type hints when the graph is built | Manual validation logic |
1.2 Minimal Graph Example
```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field

from pydantic_graph import BaseNode, End, Graph, GraphRunContext


# State shared by every node during a single graph run
@dataclass
class WorkflowState:
    query: str
    results: list[str] = field(default_factory=list)


# Nodes are dataclasses that subclass BaseNode[StateT, DepsT, RunEndT]
@dataclass
class SearchStep(BaseNode[WorkflowState]):
    """Search for information based on the query."""

    search_query: str

    async def run(self, ctx: GraphRunContext[WorkflowState]) -> SummarizeStep:
        # Simulate a search and record the hits in shared state
        ctx.state.results = [
            f"Result 1 for: {self.search_query}",
            f"Result 2 for: {self.search_query}",
        ]
        # The return annotation is the edge: route to SummarizeStep
        return SummarizeStep(items=ctx.state.results)


@dataclass
class SummarizeStep(BaseNode[WorkflowState, None, str]):
    """Summarize the search results and end the run."""

    items: list[str]

    async def run(self, ctx: GraphRunContext[WorkflowState]) -> End[str]:
        summary = f"Found {len(self.items)} results for '{ctx.state.query}'"
        return End(summary)


async def main() -> None:
    # Build the graph from every node type it may visit
    graph = Graph(nodes=[SearchStep, SummarizeStep])
    state = WorkflowState(query="PydanticAI tutorials")
    # Start execution from SearchStep
    result = await graph.run(SearchStep(search_query="PydanticAI tutorials"), state=state)
    print(f"Graph result: {result.output}")


asyncio.run(main())
```
2. Graph Builder API
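The `Graph` class provides the primary API for constructing and running graphs. Define each node as a dataclass that subclasses `BaseNode[StateT, DepsT, RunEndT]` and implements an async `run` method returning either the next node or `End`. Register every node class with `Graph(nodes=[...])`, then start a run with `await graph.run(start_node, state=...)`. The returned result exposes the final value as `.output`.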
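Conceptually, running such a graph is a loop. The runtime calls the current node's `run`, runs the returned node next, and stops when a node returns `End`. The stdlib-only sketch below illustrates that execution model with hypothetical `End`, `Context`, and `run_graph` stand-ins. It is a simplification for intuition, not pydantic_graph's actual implementation, which also supports dependencies and state persistence. Note that nodes may route back to earlier nodes, so loops are allowed.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

T = TypeVar("T")


@dataclass
class End(Generic[T]):
    """Hypothetical stand-in for pydantic_graph.End: wraps the final output."""
    data: T


@dataclass
class Context:
    """Hypothetical stand-in for GraphRunContext: carries the shared state."""
    state: Any


async def run_graph(start_node: Any, state: Any) -> Any:
    """Run nodes one at a time until a node returns End."""
    ctx = Context(state=state)
    node = start_node
    while not isinstance(node, End):
        node = await node.run(ctx)  # each run() returns the next node or End
    return node.data


@dataclass
class Double:
    value: int

    async def run(self, ctx: Context) -> "AddOne | End[int]":
        ctx.state.append(f"double {self.value}")
        doubled = self.value * 2
        # Branch: finish once the value exceeds 10, otherwise keep looping
        return End(doubled) if doubled > 10 else AddOne(doubled)


@dataclass
class AddOne:
    value: int

    async def run(self, ctx: Context) -> Double:
        ctx.state.append(f"add_one {self.value}")
        return Double(self.value + 1)  # edge back to an earlier node type


trace: list[str] = []
output = asyncio.run(run_graph(Double(2), trace))
print(output, trace)
```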
2.1 Building & Running a Graph
```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext


@dataclass
class PipelineState:
    input_text: str
    cleaned_text: str = ""
    analysis: str = ""
    final_output: str = ""


@dataclass
class CleanTextStep(BaseNode[PipelineState]):
    """Clean and normalize input text."""

    text: str

    async def run(self, ctx: GraphRunContext[PipelineState]) -> AnalyzeStep:
        # Strip surrounding whitespace and lowercase
        cleaned = self.text.strip().lower()
        ctx.state.cleaned_text = cleaned
        return AnalyzeStep(text=cleaned)


@dataclass
class AnalyzeStep(BaseNode[PipelineState]):
    """Analyze the cleaned text."""

    text: str

    async def run(self, ctx: GraphRunContext[PipelineState]) -> FormatOutputStep:
        # Simple analysis: word and character counts
        word_count = len(self.text.split())
        ctx.state.analysis = f"Words: {word_count}, Chars: {len(self.text)}"
        return FormatOutputStep(analysis=ctx.state.analysis)


@dataclass
class FormatOutputStep(BaseNode[PipelineState, None, str]):
    """Format the final output and end the run."""

    analysis: str

    async def run(self, ctx: GraphRunContext[PipelineState]) -> End[str]:
        output = (
            f"Input: {ctx.state.input_text}\n"
            f"Cleaned: {ctx.state.cleaned_text}\n"
            f"Analysis: {self.analysis}"
        )
        ctx.state.final_output = output
        return End(output)


async def main() -> None:
    # Build the graph with all node types
    graph = Graph(nodes=[CleanTextStep, AnalyzeStep, FormatOutputStep])
    # Initialize state
    state = PipelineState(input_text=" Hello World! This is a TEST. ")
    # Run from the first step; state is mutated in place
    result = await graph.run(CleanTextStep(text=state.input_text), state=state)
    print(result.output)
    print(f"\nFinal state: {state}")


asyncio.run(main())
```
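Each step's `run` method declares its return type explicitly (e.g., `-> AnalyzeStep`). `Graph` reads these annotations when it is constructed, so the graph structure is validated at definition time. If a step can route to a node that was not passed to `Graph(nodes=[...])`, construction fails with an error. Passing incompatible data between steps is caught by a static type checker such as mypy or pyright rather than at runtime.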
3. Steps: The Building Blocks
Steps are the atomic units of a graph. Each step is a `BaseNode` dataclass whose fields hold its input data and whose `run` method holds its execution logic. Steps can call PydanticAI agents for AI-powered processing.
3.1 Steps That Use PydanticAI Agents
Embed PydanticAI agents inside graph steps for AI-powered workflow nodes:
```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass

from pydantic_ai import Agent
from pydantic_graph import BaseNode, End, Graph, GraphRunContext

# Agents for the individual steps (requires OPENAI_API_KEY in the environment)
classifier_agent = Agent(
    "openai:gpt-4o",
    system_prompt="Classify text as: positive, negative, or neutral. Return only the label.",
)
summarizer_agent = Agent(
    "openai:gpt-4o",
    system_prompt="Summarize text in one sentence.",
)


@dataclass
class ReviewState:
    review_text: str
    sentiment: str = ""
    summary: str = ""


@dataclass
class ClassifyStep(BaseNode[ReviewState]):
    """Classify the sentiment of a review."""

    text: str

    async def run(self, ctx: GraphRunContext[ReviewState]) -> SummarizeReviewStep:
        result = await classifier_agent.run(self.text)
        # result.output is the agent's final answer (older releases used result.data)
        ctx.state.sentiment = result.output
        return SummarizeReviewStep(text=self.text, sentiment=result.output)


@dataclass
class SummarizeReviewStep(BaseNode[ReviewState, None, dict]):
    """Summarize the review content."""

    text: str
    sentiment: str

    async def run(self, ctx: GraphRunContext[ReviewState]) -> End[dict]:
        result = await summarizer_agent.run(self.text)
        ctx.state.summary = result.output
        return End({
            "sentiment": self.sentiment,
            "summary": result.output,
            "original_length": len(self.text.split()),
        })


async def main() -> None:
    graph = Graph(nodes=[ClassifyStep, SummarizeReviewStep])
    state = ReviewState(
        review_text="This product exceeded all my expectations. The build quality is "
        "outstanding and customer service was incredibly responsive."
    )
    result = await graph.run(ClassifyStep(text=state.review_text), state=state)
    print(f"Result: {result.output}")
    print(f"State: sentiment={state.sentiment}, summary={state.summary}")


asyncio.run(main())
```
3.2 Error Handling in Steps
Handle errors gracefully by routing to fallback steps or returning early with End:
```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext


@dataclass
class ProcessingState:
    input_data: str
    error: str | None = None


@dataclass
class ValidateStep(BaseNode[ProcessingState]):
    """Validate input data before processing."""

    data: str

    async def run(self, ctx: GraphRunContext[ProcessingState]) -> ProcessStep | ErrorStep:
        # Route bad input to a fallback step instead of raising
        if len(self.data) < 3:
            return ErrorStep(error_message="Input too short (minimum 3 characters)")
        if any(char in self.data for char in "<>{}"):
            return ErrorStep(error_message="Input contains invalid characters")
        return ProcessStep(validated_data=self.data)


@dataclass
class ProcessStep(BaseNode[ProcessingState, None, str]):
    """Process validated data."""

    validated_data: str

    async def run(self, ctx: GraphRunContext[ProcessingState]) -> End[str]:
        return End(f"Processed: {self.validated_data.upper()}")


@dataclass
class ErrorStep(BaseNode[ProcessingState, None, str]):
    """Record the error in state and end the run early."""

    error_message: str

    async def run(self, ctx: GraphRunContext[ProcessingState]) -> End[str]:
        ctx.state.error = self.error_message
        return End(f"Error: {self.error_message}")


async def main() -> None:
    graph = Graph(nodes=[ValidateStep, ProcessStep, ErrorStep])
    # Valid input takes the ProcessStep path
    state1 = ProcessingState(input_data="hello world")
    result1 = await graph.run(ValidateStep(data=state1.input_data), state=state1)
    print(f"Valid: {result1.output}")
    # Input that is too short is routed to ErrorStep
    state2 = ProcessingState(input_data="ab")
    result2 = await graph.run(ValidateStep(data=state2.input_data), state=state2)
    print(f"Invalid: {result2.output}")
    print(f"Error state: {state2.error}")


asyncio.run(main())
```
4. Joins & Reducers
Measurable Agent Improvement
A legal-tech company tracks their contract analysis agent’s accuracy weekly using Pydantic Evals. They maintain a benchmark of 200 contracts with known correct extractions. Over 3 months: accuracy went from 87% → 91% → 94% through systematic prompt iteration guided by eval scores. Each improvement is documented and reversible.
When multiple steps execute in parallel, you need to join their outputs into a single stream for the next step. Reducers aggregate results from fan-out patterns.
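In its simplest form, a join waits for every parallel branch, and a reducer folds the branch outputs into one value. The stdlib-only sketch below shows the shape of that pattern. It uses a hypothetical branch coroutine (`research_branch`) and a hypothetical reducer (`merge_findings`), and it does not use pydantic_graph's own join or reducer API.

```python
import asyncio
from functools import reduce


async def research_branch(area: str, topic: str) -> dict[str, str]:
    """Hypothetical branch: stands in for an agent call on one research area."""
    await asyncio.sleep(0.01)  # simulate I/O latency
    return {area: f"{area} notes on {topic}"}


def merge_findings(acc: dict[str, str], item: dict[str, str]) -> dict[str, str]:
    """Reducer: fold one branch result into the accumulated report."""
    return {**acc, **item}


async def fan_out_fan_in(topic: str, areas: list[str]) -> dict[str, str]:
    # Fan-out: start one branch per area; gather returns results in input order
    results = await asyncio.gather(*(research_branch(a, topic) for a in areas))
    # Fan-in (join): reduce all branch outputs into a single value
    return reduce(merge_findings, results, {})


report = asyncio.run(fan_out_fan_in("autonomous vehicles", ["tech", "market"]))
print(report)
```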
4.1 Fan-Out / Fan-In Patterns
Execute multiple research paths in parallel and aggregate results:
```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass

from pydantic_ai import Agent
from pydantic_graph import BaseNode, End, Graph, GraphRunContext

# Specialist agents for parallel research
tech_researcher = Agent(
    "openai:gpt-4o",
    system_prompt="You research technology trends. Be concise: 2 sentences max.",
)
market_researcher = Agent(
    "openai:gpt-4o",
    system_prompt="You research market trends. Be concise: 2 sentences max.",
)


@dataclass
class ResearchState:
    topic: str
    tech_findings: str = ""
    market_findings: str = ""
    combined_report: str = ""


@dataclass
class TechResearchStep(BaseNode[ResearchState, None, str]):
    """Research technology aspects of the topic."""

    topic: str

    async def run(self, ctx: GraphRunContext[ResearchState]) -> End[str]:
        result = await tech_researcher.run(f"Technology trends for: {self.topic}")
        ctx.state.tech_findings = result.output
        return End(result.output)


@dataclass
class MarketResearchStep(BaseNode[ResearchState, None, str]):
    """Research market aspects of the topic."""

    topic: str

    async def run(self, ctx: GraphRunContext[ResearchState]) -> End[str]:
        result = await market_researcher.run(f"Market trends for: {self.topic}")
        ctx.state.market_findings = result.output
        return End(result.output)


@dataclass
class CombineResultsStep(BaseNode[ResearchState, None, str]):
    """Combine research from multiple sources."""

    tech_report: str
    market_report: str

    async def run(self, ctx: GraphRunContext[ResearchState]) -> End[str]:
        combined = (
            f"# Research Report: {ctx.state.topic}\n\n"
            f"## Technology Findings\n{self.tech_report}\n\n"
            f"## Market Findings\n{self.market_report}"
        )
        ctx.state.combined_report = combined
        return End(combined)


async def parallel_research(topic: str) -> None:
    """Run the two research graphs concurrently, then combine their outputs."""
    state = ResearchState(topic=topic)
    tech_graph = Graph(nodes=[TechResearchStep])
    market_graph = Graph(nodes=[MarketResearchStep])
    # Fan-out: both runs share `state` but write to different fields
    tech_result, market_result = await asyncio.gather(
        tech_graph.run(TechResearchStep(topic=topic), state=state),
        market_graph.run(MarketResearchStep(topic=topic), state=state),
    )
    # Fan-in: a third graph combines the two outputs
    combine_graph = Graph(nodes=[CombineResultsStep])
    final = await combine_graph.run(
        CombineResultsStep(
            tech_report=tech_result.output,
            market_report=market_result.output,
        ),
        state=state,
    )
    print(final.output)


asyncio.run(parallel_research("autonomous vehicles in 2026"))
```
Use `asyncio.gather()` to run independent graph branches concurrently. This is the key performance optimization. If two steps don’t depend on each other, running them in parallel cuts total latency from the sum of their durations to roughly the duration of the slower one.
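A stdlib-only sketch can check this latency claim. It uses a hypothetical `branch` coroutine with `asyncio.sleep` standing in for an agent call, and the delays are arbitrary. Awaiting two independent branches one after another takes roughly the sum of their delays. Under `asyncio.gather()`, the same two branches take roughly the longer delay.

```python
import asyncio
import time
from collections.abc import Callable, Coroutine
from typing import Any


async def branch(name: str, delay: float) -> str:
    """Hypothetical independent branch; asyncio.sleep simulates an agent call."""
    await asyncio.sleep(delay)
    return name


async def sequential() -> list[str]:
    # The second branch starts only after the first finishes: ~0.2s + 0.3s
    return [await branch("tech", 0.2), await branch("market", 0.3)]


async def concurrent() -> list[str]:
    # Both branches wait at the same time: ~max(0.2s, 0.3s)
    return list(await asyncio.gather(branch("tech", 0.2), branch("market", 0.3)))


def timed(make_coro: Callable[[], Coroutine[Any, Any, list[str]]]) -> tuple[list[str], float]:
    """Run a coroutine function to completion and measure wall-clock time."""
    start = time.perf_counter()
    result = asyncio.run(make_coro())
    return result, time.perf_counter() - start


seq_result, seq_time = timed(sequential)
par_result, par_time = timed(concurrent)
print(f"sequential: {seq_time:.2f}s, gather: {par_time:.2f}s")
```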
5. Decisions & Parallel Execution
Decision nodes evaluate conditions and route execution to different branches based on step output. Combined with parallel execution, this enables complex DAG (Directed Acyclic Graph) workflows.
5.1 Decision Nodes with Conditional Branching
```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext


@dataclass
class OrderState:
    order_total: float
    customer_tier: str
    requires_review: bool = False
    discount_applied: float = 0.0
    final_total: float = 0.0


@dataclass
class EvaluateOrderStep(BaseNode[OrderState]):
    """Evaluate the order and decide which processing path to take."""

    total: float
    tier: str

    async def run(
        self, ctx: GraphRunContext[OrderState]
    ) -> ApplyDiscountStep | ManualReviewStep | StandardProcessStep:
        # Decision logic: the union return type lists every possible branch
        if self.total > 10000:
            # High-value orders need manual review (checked before tier)
            return ManualReviewStep(order_total=self.total, reason="High value order")
        elif self.tier == "platinum":
            # Platinum customers get an automatic discount
            return ApplyDiscountStep(total=self.total, discount_percent=15.0)
        elif self.tier == "gold":
            return ApplyDiscountStep(total=self.total, discount_percent=10.0)
        else:
            return StandardProcessStep(total=self.total)


@dataclass
class ApplyDiscountStep(BaseNode[OrderState, None, dict]):
    """Apply a tier-based discount."""

    total: float
    discount_percent: float

    async def run(self, ctx: GraphRunContext[OrderState]) -> End[dict]:
        discount = self.total * (self.discount_percent / 100)
        final = self.total - discount
        ctx.state.discount_applied = discount
        ctx.state.final_total = final
        return End({
            "status": "approved",
            "original": self.total,
            "discount": discount,
            "final": final,
            "path": "discount",
        })


@dataclass
class ManualReviewStep(BaseNode[OrderState, None, dict]):
    """Flag the order for manual review."""

    order_total: float
    reason: str

    async def run(self, ctx: GraphRunContext[OrderState]) -> End[dict]:
        ctx.state.requires_review = True
        ctx.state.final_total = self.order_total
        return End({
            "status": "pending_review",
            "total": self.order_total,
            "reason": self.reason,
            "path": "manual_review",
        })


@dataclass
class StandardProcessStep(BaseNode[OrderState, None, dict]):
    """Process the order without modifications."""

    total: float

    async def run(self, ctx: GraphRunContext[OrderState]) -> End[dict]:
        ctx.state.final_total = self.total
        return End({
            "status": "approved",
            "total": self.total,
            "discount": 0,
            "path": "standard",
        })


async def main() -> None:
    graph = Graph(nodes=[
        EvaluateOrderStep, ApplyDiscountStep,
        ManualReviewStep, StandardProcessStep,
    ])
    # Exercise each branch
    test_cases = [
        ("Platinum customer", 500.0, "platinum"),
        ("High-value order", 15000.0, "gold"),
        ("Standard order", 200.0, "silver"),
    ]
    for label, total, tier in test_cases:
        state = OrderState(order_total=total, customer_tier=tier)
        result = await graph.run(EvaluateOrderStep(total=total, tier=tier), state=state)
        print(f"{label}: {result.output}")


asyncio.run(main())
```
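The branching rule in `EvaluateOrderStep.run` is ordinary Python. One option is to factor it into a plain function that you can unit-test without running the graph. The minimal sketch below does this. `route_order`, `final_total`, and the path labels are hypothetical names, and the thresholds and percentages mirror the example above. The `> 10000` check runs first, so a high-value platinum order goes to manual review rather than receiving a discount.

```python
def route_order(total: float, tier: str) -> tuple[str, float]:
    """Mirror EvaluateOrderStep's rules: return (path, discount_percent)."""
    if total > 10000:
        return "manual_review", 0.0  # checked first, so it wins over tier
    if tier == "platinum":
        return "discount", 15.0
    if tier == "gold":
        return "discount", 10.0
    return "standard", 0.0


def final_total(total: float, discount_percent: float) -> float:
    """Same arithmetic as ApplyDiscountStep."""
    return total - total * (discount_percent / 100)


for label, total, tier in [
    ("Platinum customer", 500.0, "platinum"),
    ("High-value order", 15000.0, "gold"),
    ("Standard order", 200.0, "silver"),
]:
    path, pct = route_order(total, tier)
    print(f"{label}: {path}, final={final_total(total, pct):.2f}")
```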
Next in the PydanticAI SDK Track
In Part 12: Pydantic Evals, we’ll implement systematic evaluation of agent outputs using Pydantic Evals — define evaluation datasets, run benchmarks against agent responses, track quality metrics over time, and integrate evaluations into CI/CD pipelines.