The Agent SDK’s query() function returns an async generator, which makes it a natural fit for real-time streaming UIs. You’ll also learn to enforce structured JSON output (no more regex parsing), checkpoint files before risky edits, track costs in real time, add OpenTelemetry tracing, and deploy production agents in Docker containers and serverless functions.
1. Real-Time Streaming
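The Agent SDK’s query() yields messages as an async generator. You receive each message as soon as it is produced, so you don't have to wait for the whole run to finish. This covers system events, assistant text and tool calls, tool results, and the final result. For user-facing applications, the UI can therefore show the agent’s progress while it works. Each AssistantMessage arrives complete. For token-by-token text, recent SDK versions also offer an include_partial_messages option that yields raw stream events.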
1.1 Stream Events
# Real-Time Streaming: Show Agent Progress to Users
# Requires: pip install claude-agent-sdk
import asyncio
import json
from claude_agent_sdk import (
    query, ClaudeAgentOptions,
    SystemMessage, AssistantMessage, UserMessage, ResultMessage
)

async def stream_to_user():
    """Stream all events as they happen, for real-time UIs."""
    events = []  # Collect for downstream processing
    async for message in query(
        prompt="Analyze the project structure and suggest improvements.",
        options=ClaudeAgentOptions(
            allowed_tools=["Glob", "Read", "Grep"],
        ),
    ):
        # SYSTEM: session lifecycle events (e.g. subtype "init")
        if isinstance(message, SystemMessage):
            print(f"[system:{message.subtype}]")
            events.append({"type": "system", "subtype": message.subtype})
        # ASSISTANT: Claude's response (text + tool calls)
        elif isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "text") and block.text:
                    # Show each text block as soon as its message arrives
                    print(f"💬 {block.text[:120]}")
                    events.append({"type": "text", "content": block.text})
                elif hasattr(block, "name"):
                    # Show tool calls as they happen
                    print(f"🔧 {block.name}({json.dumps(block.input)[:60]})")
                    events.append({"type": "tool_call", "tool": block.name})
        # USER: tool results (usually skipped in the UI)
        elif isinstance(message, UserMessage):
            events.append({"type": "tool_result"})
        # RESULT: final outcome (total_cost_usd may be None)
        elif isinstance(message, ResultMessage):
            cost = message.total_cost_usd or 0.0
            print(f"\n✅ Done ({message.subtype}) - ${cost:.4f}")
            events.append({
                "type": "result",
                "subtype": message.subtype,
                "cost": message.total_cost_usd,
                "turns": message.num_turns,
            })
    return events

events = asyncio.run(stream_to_user())
print(f"\nTotal events streamed: {len(events)}")
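Because query() is an ordinary async generator, you can decouple the agent loop from whatever renders its output. The sketch below assumes a UI task, such as a WebSocket writer, that should consume at its own pace. pump forwards every item into a bounded asyncio.Queue and finishes with a sentinel. The helper names and fake_agent_stream are hypothetical. The fake stream stands in for query() so the example runs without the SDK or an API key.

```python
import asyncio
from typing import Any, AsyncIterator

_DONE = object()  # Sentinel marking the end of the stream


async def pump(stream: AsyncIterator[Any], queue: asyncio.Queue) -> None:
    """Forward every item from an async iterator into a queue, then the sentinel."""
    try:
        async for item in stream:
            await queue.put(item)
    finally:
        await queue.put(_DONE)  # Always unblock the consumer, even if the stream fails


async def consume(queue: asyncio.Queue) -> list[Any]:
    """UI side: drain the queue until the sentinel arrives."""
    received = []
    while (item := await queue.get()) is not _DONE:
        received.append(item)  # A real UI would render or send the item here
    return received


async def fake_agent_stream() -> AsyncIterator[dict]:
    """Hypothetical stand-in for query(): yields message-like dicts."""
    for event in ({"type": "text"}, {"type": "tool_call"}, {"type": "result"}):
        await asyncio.sleep(0)  # Simulate waiting on the agent
        yield event


async def main() -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)  # Bounded: applies backpressure
    producer = asyncio.create_task(pump(fake_agent_stream(), queue))
    received = await consume(queue)
    await producer  # Re-raises any exception raised by the stream
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a bounded queue, a slow UI makes pump wait rather than buffer without limit. Awaiting the producer task after draining surfaces any error the agent stream raised.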
1.2 Building a Streaming UI (Server-Sent Events)
For web applications, bridge the async generator to Server-Sent Events (SSE) so the client receives updates in real time without polling:
# FastAPI + Agent SDK Streaming: SSE Endpoint
# Requires: pip install claude-agent-sdk fastapi uvicorn
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage

app = FastAPI()

async def agent_event_stream(prompt: str):
    """Yield SSE-formatted events from the Agent SDK."""
    async for message in query(
        prompt=prompt,
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Glob", "Grep", "WebSearch"],
            permission_mode="plan",  # Read-only for web users
        ),
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "text") and block.text:
                    event = json.dumps({"type": "text", "content": block.text})
                    yield f"data: {event}\n\n"
                elif hasattr(block, "name"):
                    event = json.dumps({"type": "tool", "name": block.name})
                    yield f"data: {event}\n\n"
        elif isinstance(message, ResultMessage):
            event = json.dumps({
                "type": "done",
                # result can be None, so guard before slicing
                "result": (message.result or "")[:500] if message.subtype == "success" else None,
                "cost": message.total_cost_usd,
            })
            yield f"data: {event}\n\n"

@app.get("/api/agent/stream")
async def stream_agent(prompt: str):
    """SSE endpoint: the client connects and receives real-time updates."""
    return StreamingResponse(
        agent_event_stream(prompt),
        media_type="text/event-stream",
        # no-cache: don't cache the stream; X-Accel-Buffering: stop nginx from buffering it
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

# Client-side (JavaScript). Close the EventSource on "done": otherwise the browser
# auto-reconnects when the server ends the stream, starting a new (billed) agent run.
# const events = new EventSource('/api/agent/stream?prompt=' + encodeURIComponent(prompt));
# events.onmessage = (e) => {
#   const data = JSON.parse(e.data);
#   if (data.type === 'done') events.close();
#   // ...render data...
# };
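The wire format behind data: {event}\n\n is simple. Each event is one or more data: lines, terminated by a blank line. The sketch below covers both sides in Python, which is handy for testing the endpoint without a browser. format_sse and parse_sse are hypothetical helper names. The parser handles only data: fields and ignores event:, id:, retry:, and comment lines. That is enough for the endpoint above, which sends only data: lines.

```python
import json
from typing import Iterable, Iterator


def format_sse(payload: dict) -> str:
    """Encode one event like the endpoint does: a single data line plus a blank line."""
    # json.dumps escapes newlines inside strings, so the payload stays on one line
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Decode text lines from an event stream into JSON payloads (data: fields only)."""
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":  # Blank line: dispatch the buffered event
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips one optional space after the colon
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # Other fields and ":" comment lines are ignored in this sketch


if __name__ == "__main__":
    wire = format_sse({"type": "text", "content": "line one\nline two"})
    wire += format_sse({"type": "done"})
    for event in parse_sse(wire.splitlines(keepends=True)):
        print(event)
```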
2. Structured Output
2.1 output_format — Guaranteed JSON
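Instead of parsing free-text output with regex, use output_format with a JSON Schema so the agent’s final output matches the structure you need. The SDK validates the final output against the schema and returns the parsed data in ResultMessage.structured_output. If the agent cannot produce conforming output, the run ends with an error subtype instead of "success":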
# Structured Output: Guaranteed JSON via output_format
# Requires: pip install claude-agent-sdk
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

async def structured_analysis():
    """Force the agent to output structured JSON matching a schema."""
    schema = {
        "type": "object",
        "properties": {
            "summary": {"type": "string", "description": "One-paragraph project summary"},
            "languages": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Programming languages found",
            },
            "file_count": {"type": "integer"},
            "recommendations": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "priority": {"type": "string", "enum": ["high", "medium", "low"]},
                        "suggestion": {"type": "string"},
                    },
                    "required": ["priority", "suggestion"],
                },
            },
        },
        "required": ["summary", "languages", "file_count", "recommendations"],
    }
    async for message in query(
        prompt="Analyze this project and return structured findings.",
        options=ClaudeAgentOptions(
            allowed_tools=["Glob", "Read", "Grep"],
            output_format={
                "type": "json_schema",
                "schema": schema,
            },  # Enforce this JSON Schema on output
        ),
    ):
        if (
            isinstance(message, ResultMessage)
            and message.subtype == "success"
            and message.structured_output
        ):
            # structured_output is validated against the schema
            data = message.structured_output
            print(f"Languages: {data['languages']}")
            print(f"Files: {data['file_count']}")
            for rec in data["recommendations"]:
                print(f"  [{rec['priority']}] {rec['suggestion']}")

asyncio.run(structured_analysis())
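The SDK validates the output for you. A local check is still useful in unit tests, or when structured_output is stored and re-read later. The sketch below validates only the subset of JSON Schema used above: type, properties, required, items, and enum. It is not a full JSON Schema implementation. A dedicated library such as jsonschema covers the rest.

```python
from typing import Any

# JSON Schema type names mapped to Python types (subset used in this article)
_TYPES = {
    "object": dict,
    "array": list,
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
}


def validate(value: Any, schema: dict, path: str = "$") -> list[str]:
    """Return error messages; an empty list means the value conforms."""
    errors: list[str] = []
    expected = schema.get("type")
    if expected:
        # bool is a subclass of int in Python, so reject it for integer/number
        if not isinstance(value, _TYPES[expected]) or (
            isinstance(value, bool) and expected in ("integer", "number")
        ):
            return [f"{path}: expected {expected}, got {type(value).__name__}"]
    if "enum" in schema and value not in schema["enum"]:
        errors.append(f"{path}: {value!r} not in {schema['enum']}")
    if isinstance(value, dict):
        for key in schema.get("required", []):
            if key not in value:
                errors.append(f"{path}: missing required key {key!r}")
        for key, sub in schema.get("properties", {}).items():
            if key in value:
                errors.extend(validate(value[key], sub, f"{path}.{key}"))
    if isinstance(value, list) and "items" in schema:
        for i, item in enumerate(value):
            errors.extend(validate(item, schema["items"], f"{path}[{i}]"))
    return errors
```

Returning a list of path-qualified messages, rather than raising on the first problem, reports every violation at once. That makes failing tests easier to diagnose.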
2.2 JSON Mode vs Schema
| Feature | output_format + schema | JSON mode (raw API) |
|---|---|---|
| Schema enforcement | Yes: the SDK validates the final output | No: JSON is only requested |
| Guarantee | 100% valid or error | Usually valid, may fail |
| Nested objects | Fully supported | Supported |
| Arrays with items | Validated recursively | Not validated |
| Enums | Enforced | Not enforced |
| Works with tools | Yes (agent can call tools; final output is structured) | Weaker fit: no end-of-run validation or retries |
output_format works seamlessly with agentic tool loops. The agent calls tools as needed, then returns schema-validated data in structured_output. In the raw API, plain JSON mode is a weaker fit for tool-driven workflows because you do not get the Agent SDK’s end-of-run schema validation and retry behavior.
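To see what end-of-run validation and retry save you, here is a sketch of the loop you would otherwise write around a plain JSON-mode call. ask_model is a hypothetical callable that takes a prompt and returns raw text. check is any validator that returns a list of error messages. Each failed attempt feeds its errors back into the next prompt. This illustrates the general pattern, not how the SDK implements its retries.

```python
import json
from typing import Callable


class StructuredOutputError(Exception):
    """Raised when no attempt produced JSON that passes the check."""


def ask_for_json(
    ask_model: Callable[[str], str],     # Hypothetical: prompt -> raw model text
    prompt: str,
    check: Callable[[dict], list[str]],  # Returns error messages; [] means valid
    max_attempts: int = 3,
) -> dict:
    current = prompt
    for _ in range(max_attempts):
        raw = ask_model(current)
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as exc:
            problems = [f"invalid JSON: {exc.msg}"]
        else:
            problems = check(data) if isinstance(data, dict) else ["top level must be an object"]
            if not problems:
                return data
        # Feed the problems back so the next attempt can correct them
        current = (
            f"{prompt}\n\nYour previous reply was rejected: {'; '.join(problems)}. "
            "Reply with JSON only."
        )
    raise StructuredOutputError(f"no valid output after {max_attempts} attempts")
```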
3. File Checkpointing
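Checkpointing creates a snapshot of modified files before risky operations, enabling safe rollback if something goes wrong. The SDK can automatically checkpoint before Edit/Write operations. The examples below build the same safety net by hand with a PreToolUse hook and git, which keeps every step visible.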
3.1 Checkpoint API
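# File Checkpointing: Safe Rollback for Risky Operations
# Requires: pip install claude-agent-sdk (run inside a git repository)
import asyncio
import subprocess
from claude_agent_sdk import query, ClaudeAgentOptions, HookMatcher, ResultMessage

# Stash commits recorded during this run, oldest first ("" = tree matched HEAD)
checkpoints: list[str] = []

# PreToolUse hook that creates checkpoints before file modifications
async def checkpoint_before_edit(input_data, tool_use_id, context):
    """Record a git stash entry before any file modification."""
    file_path = input_data.get("tool_input", {}).get("file_path", "")
    if file_path:
        # `git stash create` snapshots uncommitted changes WITHOUT touching the
        # working tree; `git stash push` would revert the agent's earlier edits.
        sha = subprocess.run(
            ["git", "stash", "create"], capture_output=True, text=True
        ).stdout.strip()
        if sha:  # Empty output means there were no local changes to save
            subprocess.run(["git", "stash", "store", "-m", f"checkpoint-{tool_use_id}", sha])
        checkpoints.append(sha)
        print(f"[CHECKPOINT] Saved state before editing {file_path}")
    return {}  # Empty hook output: allow the edit to proceed

async def safe_refactoring():
    """Agent edits with automatic checkpointing."""
    async for message in query(
        prompt="Refactor the database module to use connection pooling.",
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Edit", "Bash", "Glob", "Grep"],
            permission_mode="acceptEdits",
            hooks={
                # Only Edit/Write trigger the hook; file changes made via Bash are not seen
                "PreToolUse": [
                    HookMatcher(matcher="Edit|Write", hooks=[checkpoint_before_edit])
                ],
            },
        ),
    ):
        if isinstance(message, ResultMessage):
            if message.subtype == "success":
                print("Refactoring complete. Checkpoints available in `git stash list`.")
            else:
                print(f"Failed: {message.subtype}. Rolling back...")
                if checkpoints:
                    # Restore the state from before the FIRST edit of this run:
                    # discard tracked-file changes, then re-apply that snapshot (if any).
                    # Files newly created by the agent remain as untracked files.
                    subprocess.run(["git", "reset", "--hard", "--quiet"], check=True)
                    if checkpoints[0]:
                        subprocess.run(["git", "stash", "apply", "--quiet", checkpoints[0]], check=True)

asyncio.run(safe_refactoring())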
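If the working directory is not a git repository, you can snapshot individual files instead. The sketch below records each file's original bytes the first time it is touched. On restore, it writes those bytes back and deletes files that did not exist before. FileCheckpoint is a hypothetical helper, not part of the SDK. Calling save(file_path) from a PreToolUse hook like the one above would wire it in. Like the git version, it cannot see files changed through Bash.

```python
from pathlib import Path


class FileCheckpoint:
    """Remember each file's original content before its first modification."""

    def __init__(self) -> None:
        # Resolved path -> original bytes, or None if the file did not exist yet
        self._originals: dict[Path, bytes | None] = {}

    def save(self, file_path: str) -> None:
        path = Path(file_path).resolve()
        if path not in self._originals:  # Keep only the earliest snapshot
            self._originals[path] = path.read_bytes() if path.exists() else None

    def restore(self) -> list[Path]:
        """Put every touched file back as it was; return the restored paths."""
        for path, original in self._originals.items():
            if original is None:
                path.unlink(missing_ok=True)  # Created during the run: remove it
            else:
                path.write_bytes(original)
        restored = list(self._originals)
        self._originals.clear()
        return restored

    def discard(self) -> None:
        """Keep the current files and forget the snapshots."""
        self._originals.clear()
```

Keeping only the first snapshot per file means a rollback returns to the pre-run state, however many times the agent edited the file.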
3.2 Restore & Rollback
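# Rollback Pattern: Restore Files After Failed Agent Run
# Requires: pip install claude-agent-sdk (run inside a git repository)
import asyncio
import subprocess
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

async def run_with_rollback(prompt: str) -> bool:
    """Run agent with automatic rollback on failure."""
    # Step 1: Create checkpoint. `git stash create` records uncommitted changes as a
    # commit without modifying the working tree (it prints nothing if the tree is clean).
    # Avoid `git stash push` here: it would hide your uncommitted work from the agent,
    # and dropping that stash on success would delete it.
    checkpoint = subprocess.run(
        ["git", "stash", "create"], capture_output=True, text=True, check=True
    ).stdout.strip()
    print("[CHECKPOINT] State saved")
    session_id = None
    success = False
    try:
        async for message in query(
            prompt=prompt,
            options=ClaudeAgentOptions(
                allowed_tools=["Read", "Edit", "Bash", "Glob", "Grep"],
                permission_mode="acceptEdits",
                max_budget_usd=0.50,
            ),
        ):
            if isinstance(message, ResultMessage):
                session_id = message.session_id
                success = message.subtype == "success"
    finally:
        if success:
            # Keep the changes; the unreferenced snapshot commit is garbage-collected later
            print(f"[SUCCESS] Changes kept (session {session_id})")
        else:
            # Also runs if query() raised: discard tracked-file changes, then
            # re-apply the pre-run uncommitted work. New untracked files remain.
            subprocess.run(["git", "reset", "--hard", "--quiet"], check=True)
            if checkpoint:
                subprocess.run(["git", "stash", "apply", "--quiet", checkpoint], check=True)
            print("[ROLLBACK] Agent changes to tracked files reverted")
    return success

asyncio.run(run_with_rollback("Add comprehensive error handling to src/api/"))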
4. Cost Tracking & Budgets
4.1 ResultMessage Cost Fields
# Cost Tracking: Every Run Reports Its Cost
# Requires: pip install claude-agent-sdk
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

async def track_costs():
    """ResultMessage carries cost data: use it for billing and monitoring."""
    async for message in query(
        prompt="Write comprehensive tests for the utils module.",
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Edit", "Bash", "Glob"],
            permission_mode="acceptEdits",
        ),
    ):
        if isinstance(message, ResultMessage):
            # Available on ALL result subtypes (success, error_max_turns, etc.),
            # but the field is typed as optional, so treat a missing value as 0
            cost = message.total_cost_usd or 0.0
            print(f"Status: {message.subtype}")
            print(f"Total cost: ${cost:.4f}")
            print(f"Turns used: {message.num_turns}")
            print(f"Session ID: {message.session_id}")
            # Use for billing: charge user, update quotas, alert on high spend
            if cost > 0.50:
                print("⚠️ High cost alert: review agent efficiency")

asyncio.run(track_costs())
4.2 Budget Enforcement
# Budget Enforcement: Hard Limits on Spend
# Combine max_budget_usd + max_turns for safety
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

async def budget_enforcement():
    """Demonstrate budget limits and handling budget-exceeded state."""
    async for message in query(
        prompt="Research and write a comprehensive report on cloud providers.",
        options=ClaudeAgentOptions(
            allowed_tools=["WebSearch", "WebFetch", "Read", "Write"],
            max_budget_usd=0.25,  # Hard cost ceiling
            max_turns=20,         # Hard turn ceiling
            effort="medium",      # Lower effort = fewer tokens per turn
        ),
    ):
        if isinstance(message, ResultMessage):
            cost = message.total_cost_usd or 0.0  # Field is optional
            if message.subtype == "success":
                print(f"Completed within budget: ${cost:.4f}")
            elif message.subtype == "error_max_budget_usd":
                # Agent hit the cost ceiling: partial work may exist
                print(f"Budget exceeded at ${cost:.4f}")
                print(f"Session: {message.session_id}")
                # Option: resume with a higher budget
                # async for msg in query(prompt="Continue", options=ClaudeAgentOptions(
                #         resume=message.session_id, max_budget_usd=0.50)): ...
            elif message.subtype == "error_max_turns":
                print(f"Turn limit hit. Used ${cost:.4f}")
            else:
                print(f"Run ended with {message.subtype}. Used ${cost:.4f}")

asyncio.run(budget_enforcement())
5. Observability
5.1 OpenTelemetry Integration
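For production monitoring, export traces to your observability stack. The example below builds the spans from the message stream. It opens a parent span per run and adds a child span for each assistant turn and each requested tool call. It then records the final status, cost, and turn count as attributes on the parent span: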
# OpenTelemetry: Tracing Agent Turns and Tool Calls
# Requires: pip install claude-agent-sdk opentelemetry-sdk opentelemetry-exporter-otlp
import asyncio
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from claude_agent_sdk import (
    query, ClaudeAgentOptions,
    AssistantMessage, ResultMessage
)

# Set up OpenTelemetry (SimpleSpanProcessor exports each span as it ends;
# BatchSpanProcessor is the usual choice under production load)
provider = TracerProvider()
processor = SimpleSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent-sdk")

async def traced_agent(prompt: str):
    """Run agent with OpenTelemetry tracing."""
    with tracer.start_as_current_span("agent_query") as span:
        span.set_attribute("agent.prompt", prompt[:100])
        turn_count = 0
        async for message in query(
            prompt=prompt,
            options=ClaudeAgentOptions(
                allowed_tools=["Read", "Glob", "Grep", "Edit"],
                permission_mode="acceptEdits",
            ),
        ):
            if isinstance(message, AssistantMessage):
                turn_count += 1
                # Marker spans: they record that a turn or tool call happened,
                # not how long it took (the SDK executes tools internally)
                with tracer.start_as_current_span(f"turn_{turn_count}"):
                    for block in message.content:
                        if hasattr(block, "name"):
                            with tracer.start_as_current_span(f"tool_{block.name}"):
                                pass
            if isinstance(message, ResultMessage):
                span.set_attribute("agent.status", message.subtype)
                span.set_attribute("agent.cost_usd", message.total_cost_usd or 0)
                span.set_attribute("agent.turns", message.num_turns)
                span.set_attribute("agent.session_id", message.session_id)

asyncio.run(traced_agent("Fix the failing tests in src/"))
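The SDK executes tools internally, so the turn and tool spans above carry no meaningful duration. One way to recover timing is to measure the gap between consecutive messages in the stream. That gap roughly approximates how long each step took. The sketch below uses only the standard library. timed is a hypothetical wrapper, and fake_stream stands in for query(). The elapsed values could then be attached as span attributes.

```python
import asyncio
import time
from typing import Any, AsyncIterator


async def timed(stream: AsyncIterator[Any]) -> AsyncIterator[tuple[Any, float]]:
    """Yield (message, seconds since the previous message or since iteration began)."""
    last = time.monotonic()
    async for message in stream:
        now = time.monotonic()
        yield message, now - last
        last = time.monotonic()  # Exclude time the consumer spends handling the message


async def fake_stream() -> AsyncIterator[str]:
    """Hypothetical stand-in for query(): each step takes a little while."""
    for name in ("assistant", "tool_result", "result"):
        await asyncio.sleep(0.01)
        yield name


async def main() -> list[tuple[str, float]]:
    return [(msg, elapsed) async for msg, elapsed in timed(fake_stream())]


if __name__ == "__main__":
    for msg, elapsed in asyncio.run(main()):
        print(f"{msg}: {elapsed * 1000:.1f} ms")
```

Resetting the clock after the consumer resumes keeps UI or logging work from being attributed to the agent.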
5.2 Structured Logging
# Structured Logging: JSON Logs for Each Agent Turn
import asyncio
import json
import logging
from datetime import datetime, timezone
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("agent")

async def logged_agent(prompt: str):
    """Emit structured JSON logs per turn: feed to Datadog, Splunk, etc."""
    turn = 0
    async for message in query(
        prompt=prompt,
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Glob", "Grep"],
        ),
    ):
        if isinstance(message, AssistantMessage):
            turn += 1
            tools = [b.name for b in message.content if hasattr(b, "name")]
            log_entry = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "event": "agent_turn",
                "turn": turn,
                "tools_called": tools,
                "has_text": any(hasattr(b, "text") for b in message.content),
            }
            logger.info(json.dumps(log_entry))
        if isinstance(message, ResultMessage):
            log_entry = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "event": "agent_complete",
                "status": message.subtype,
                "cost_usd": message.total_cost_usd,
                "turns": message.num_turns,
                "session_id": message.session_id,
            }
            logger.info(json.dumps(log_entry))

asyncio.run(logged_agent("Summarize the codebase architecture."))
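Calling json.dumps at every log site works. A logging.Formatter that serializes whole records keeps call sites clean instead, and it guarantees every line is a valid JSON object. The sketch below is minimal. JsonFormatter, make_logger, and the agent_fields convention for passing structured data through extra= are assumptions of this example.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Structured fields passed as logger.info(..., extra={"agent_fields": {...}})
        entry.update(getattr(record, "agent_fields", {}))
        return json.dumps(entry, default=str)  # default=str handles non-JSON values


def make_logger(name: str = "agent") -> logging.Logger:
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers[:] = [handler]  # Replace handlers so repeated calls don't duplicate output
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    log = make_logger()
    log.info("agent_turn", extra={"agent_fields": {"turn": 1, "tools_called": ["Read"]}})
```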
6. Deployment Patterns
6.1 Docker Deployment
The Agent SDK runs in any Python 3.10+ environment. For production, containerize with Docker for isolation and reproducibility:
# Dockerfile for Agent SDK Application
FROM python:3.12-slim
WORKDIR /app
# Install system deps for MCP servers (optional)
RUN apt-get update && apt-get install -y git nodejs npm && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Don't bake the API key into the image; forward it at runtime, e.g.
#   docker run -e ANTHROPIC_API_KEY -p 8000:8000 IMAGE
ENV PYTHONUNBUFFERED=1
# Run the agent API
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
# requirements.txt
claude-agent-sdk>=0.1.0
fastapi>=0.100.0
uvicorn>=0.23.0
httpx>=0.24.0
# app.py: Production Agent API in Docker
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

app = FastAPI(title="Agent API")

class AgentRequest(BaseModel):
    prompt: str
    # Cap client-supplied limits so callers cannot raise their own spend ceiling
    # (the upper bounds here are illustrative)
    max_budget_usd: float = Field(default=0.50, gt=0, le=1.00)
    max_turns: int = Field(default=30, ge=1, le=50)

class AgentResponse(BaseModel):
    result: str | None
    status: str
    cost_usd: float
    turns: int
    session_id: str

@app.post("/api/agent/run", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
    """Run an agent query and return structured result."""
    final = None
    # Consume the whole stream rather than returning mid-iteration,
    # so the generator finishes and cleans up normally
    async for message in query(
        prompt=request.prompt,
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Glob", "Grep", "WebSearch"],
            permission_mode="plan",  # Read-only for API users
            max_budget_usd=request.max_budget_usd,
            max_turns=request.max_turns,
        ),
    ):
        if isinstance(message, ResultMessage):
            final = message
    if final is None:
        raise HTTPException(status_code=500, detail="No result received")
    return AgentResponse(
        result=final.result if final.subtype == "success" else None,
        status=final.subtype,
        cost_usd=final.total_cost_usd or 0,
        turns=final.num_turns,
        session_id=final.session_id,
    )
6.2 Serverless (AWS Lambda / Cloud Functions)
# AWS Lambda Handler for Agent SDK
# Package: zip with claude-agent-sdk and dependencies
# Set ANTHROPIC_API_KEY in Lambda environment variables
# Timeout: 300s+ (agents can take minutes for complex tasks; Lambda allows up to 900s)
# Only /tmp is writable in Lambda; if the SDK's CLI needs a writable home
# directory, set HOME=/tmp in the function's environment variables
import asyncio
import json
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

def lambda_handler(event, context):
    """AWS Lambda entry point for Agent SDK (direct invocation: {"prompt": "..."})."""
    prompt = event.get("prompt", "")
    if not prompt:
        return {"statusCode": 400, "body": json.dumps({"error": "prompt required"})}
    # asyncio.run() creates a fresh event loop for this invocation
    result = asyncio.run(run_agent(prompt))
    status = 500 if "error" in result else 200
    return {"statusCode": status, "body": json.dumps(result)}

async def run_agent(prompt: str) -> dict:
    """Run agent and collect result."""
    final = None
    async for message in query(
        prompt=prompt,
        options=ClaudeAgentOptions(
            allowed_tools=["WebSearch", "WebFetch"],  # No file tools in Lambda
            max_budget_usd=0.25,
            max_turns=15,
            effort="medium",
        ),
    ):
        if isinstance(message, ResultMessage):
            final = message  # Keep consuming so the stream ends cleanly
    if final is None:
        return {"error": "no result"}
    return {
        "result": final.result if final.subtype == "success" else None,
        "status": final.subtype,
        "cost": final.total_cost_usd,
        "session_id": final.session_id,
    }
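The handler above expects a direct-invocation payload such as {"prompt": "..."}. Behind API Gateway with proxy integration, or behind a Lambda function URL, the prompt arrives differently. It comes as a JSON string in event["body"], possibly base64-encoded as flagged by isBase64Encoded. The sketch below shows a helper that accepts both shapes. extract_prompt is a hypothetical name, and lambda_handler could call it in place of event.get("prompt", "").

```python
import base64
import json


def extract_prompt(event: dict) -> str:
    """Return the prompt from a direct invocation or an HTTP proxy event ("" if absent)."""
    if "prompt" in event:  # Direct invocation: {"prompt": "..."}
        return str(event["prompt"] or "")
    body = event.get("body")
    if not body:
        return ""
    if event.get("isBase64Encoded"):
        body = base64.b64decode(body).decode("utf-8")
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return ""  # Malformed body: let the caller answer 400
    return str(payload.get("prompt") or "") if isinstance(payload, dict) else ""
```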
6.3 API Backend with Session Persistence
# Production API Backend: Multi-User Session Management
# Requires: pip install claude-agent-sdk fastapi redis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

app = FastAPI()

class ChatRequest(BaseModel):
    user_id: str  # In production, derive this from authentication, not the request body
    message: str
    session_id: str | None = None  # Resume existing session

class ChatResponse(BaseModel):
    result: str | None
    session_id: str
    cost_usd: float

# In-memory session store (use Redis in production)
user_sessions: dict[str, str] = {}

@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Multi-user chat with session persistence."""
    options = ClaudeAgentOptions(
        allowed_tools=["WebSearch", "Read", "Glob"],
        permission_mode="plan",
        max_budget_usd=0.10,
        max_turns=10,
    )
    # Resume only a session this user owns: client-supplied session IDs are untrusted
    known = user_sessions.get(request.user_id)
    if request.session_id and request.session_id != known:
        raise HTTPException(status_code=403, detail="Unknown session for this user")
    if known:
        # `resume` targets one specific session; don't also set continue_conversation,
        # which picks the most recent conversation (possibly another user's)
        options.resume = known
    final = None
    async for message in query(prompt=request.message, options=options):
        if isinstance(message, ResultMessage):
            final = message
    if final is None:
        raise HTTPException(status_code=500, detail="No result received")
    # Save session for this user
    user_sessions[request.user_id] = final.session_id
    return ChatResponse(
        result=final.result if final.subtype == "success" else None,
        session_id=final.session_id,
        cost_usd=final.total_cost_usd or 0,
    )
Production checklist: (1) Set max_budget_usd and max_turns on every run. (2) Use permission_mode="plan" for untrusted user input. (3) Store session IDs in a persistent store (Redis, database) for multi-user apps. (4) Set the Lambda/Cloud Function timeout to 5+ minutes. (5) Export traces to your observability stack for debugging failed runs.
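The first two checklist items can be enforced mechanically, for example in a test or at startup. The sketch below inspects any options object by attribute name. The field names mirror those used in this article's ClaudeAgentOptions examples. preflight and its default spend threshold are hypothetical. SimpleNamespace stands in for the real options class so the example runs without the SDK.

```python
from types import SimpleNamespace
from typing import Any

READ_ONLY_MODES = {"plan"}  # Mode this article recommends for untrusted input


def preflight(options: Any, untrusted_input: bool, max_cost_usd: float = 1.0) -> list[str]:
    """Return warnings for options that violate the production checklist."""
    warnings = []
    budget = getattr(options, "max_budget_usd", None)
    if budget is None:
        warnings.append("max_budget_usd is not set")
    elif budget > max_cost_usd:
        warnings.append(f"max_budget_usd {budget} exceeds {max_cost_usd}")
    if getattr(options, "max_turns", None) is None:
        warnings.append("max_turns is not set")
    if untrusted_input and getattr(options, "permission_mode", None) not in READ_ONLY_MODES:
        warnings.append("untrusted input should use permission_mode='plan'")
    return warnings


if __name__ == "__main__":
    opts = SimpleNamespace(max_budget_usd=0.10, max_turns=10, permission_mode="plan")
    print(preflight(opts, untrusted_input=True))
```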
Continue the Series
Return to the AI App Dev Series Index for all Anthropic SDK Track articles — from fundamentals through multi-agent orchestration to production deployment patterns.