flowchart TD
U[User Request] --> O[Orchestrator Agent]
O --> R{Route by Intent}
R -->|Research| RA[Research Agent]
R -->|Code| CA[Code Agent]
R -->|Analysis| DA[Data Agent]
R -->|Writing| WA[Writing Agent]
RA --> KB[(Knowledge Base)]
RA --> WS[Web Search]
CA --> SB[Sandbox Executor]
CA --> GH[GitHub API]
DA --> DB[(Database)]
DA --> VIZ[Visualization]
WA --> TM[Templates]
RA --> SC[Shared Context]
CA --> SC
DA --> SC
WA --> SC
SC --> V{Verifier Agent}
V -->|Pass| RESP[Final Response]
V -->|Fail| O
1. Multi-Agent Systems
Multi-agent systems decompose complex tasks across specialized agents that collaborate through structured delegation. Instead of one monolithic prompt handling everything, each agent has a focused role, its own system prompt, and access to specific tools. The orchestrator routes tasks, manages shared context, and synthesizes results. This architecture mirrors how human teams work: a project manager delegates to specialists, and each contributes their own expertise.
Delegation Patterns
| Pattern | Description | Use Case | Complexity |
|---|---|---|---|
| Sequential Pipeline | Agent A → Agent B → Agent C (linear chain) | Content generation: research → draft → edit | Low |
| Parallel Fan-Out | Orchestrator dispatches to N agents simultaneously | Multi-source research, parallel analysis | Medium |
| Hierarchical Delegation | Manager agents delegate to worker agents | Complex projects with sub-tasks | High |
| Debate/Consensus | Multiple agents argue positions, judge decides | Risk assessment, decision-making | High |
| Specialist Routing | Router selects single best agent per query | Customer support, domain-specific Q&A | Low |
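The Parallel Fan-Out row can be illustrated with a minimal sketch built on `asyncio.gather`. The `run_agent` coroutine, the agent names, and the latencies are hypothetical stand-ins for real agent calls; they are not part of the orchestrator shown in this section.

```python
import asyncio


async def run_agent(name: str, task: str, delay: float) -> dict:
    """Hypothetical stand-in for a real agent call; sleeps to mimic latency."""
    await asyncio.sleep(delay)
    return {"agent": name, "output": f"{name} finished: {task}"}


async def fan_out(task: str, agents: dict[str, float]) -> list[dict]:
    """Dispatch the same task to every agent concurrently and collect results.

    asyncio.gather returns results in the order the awaitables were passed,
    regardless of which agent finishes first.
    """
    calls = [run_agent(name, task, delay) for name, delay in agents.items()]
    return await asyncio.gather(*calls)


if __name__ == "__main__":
    # Assumed agent names with simulated latencies in seconds.
    agents = {"research": 0.03, "analyst": 0.01, "writer": 0.02}
    for result in asyncio.run(fan_out("summarize vector database trends", agents)):
        print(result["agent"], "->", result["output"])
```

Total wall-clock time is roughly that of the slowest agent rather than the sum of all agents, which is the main reason to prefer fan-out when sub-tasks do not depend on each other.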
Multi-Agent Orchestrator Implementation
import os
import json
from dataclasses import dataclass, field
from typing import Any
from openai import OpenAI
@dataclass
class AgentConfig:
"""Configuration for a specialized agent in the multi-agent system."""
name: str
role: str
system_prompt: str
model: str = "gpt-4o"
tools: list = field(default_factory=list)
max_tokens: int = 2000
@dataclass
class TaskResult:
"""Result from an agent's work on a delegated task."""
agent: str
task: str
output: str
confidence: float
metadata: dict = field(default_factory=dict)
class MultiAgentOrchestrator:
"""Orchestrates multiple specialized agents collaborating on complex tasks.
Architecture:
- Router: Classifies intent and selects appropriate agent(s)
- Specialists: Domain-focused agents with specific tools
- Shared Context: Accumulated knowledge across agent interactions
- Verifier: Validates final output quality before returning
"""
def __init__(self):
self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
self.shared_context: list[dict] = []
self.agents = self._configure_agents()
def _configure_agents(self) -> dict[str, AgentConfig]:
return {
"research": AgentConfig(
name="Research Agent",
role="researcher",
system_prompt=(
"You are a research specialist. Gather facts, cite sources, "
"and provide comprehensive background information. Always "
"distinguish between verified facts and inferences."
),
tools=["web_search", "knowledge_base_query"],
),
"analyst": AgentConfig(
name="Data Analyst Agent",
role="analyst",
system_prompt=(
"You are a data analysis specialist. Interpret data, identify "
"patterns, compute statistics, and produce clear summaries. "
"Always show your methodology."
),
tools=["sql_query", "calculate", "visualize"],
),
"writer": AgentConfig(
name="Writing Agent",
role="writer",
system_prompt=(
"You are a professional writer. Synthesize information into "
"clear, well-structured prose. Match the requested tone and "
"format. Cite sources provided in shared context."
),
tools=["format_document", "check_grammar"],
),
"coder": AgentConfig(
name="Code Agent",
role="developer",
system_prompt=(
"You are a software engineering specialist. Write clean, "
"tested, production-quality code. Explain design decisions "
"and include error handling."
),
model="gpt-4o",
tools=["execute_code", "lint", "test_runner"],
),
}
def route_task(self, user_request: str) -> list[str]:
"""Determine which agent(s) should handle this request."""
# Simulated routing logic (in production, use an LLM call for classification)
request_lower = user_request.lower()
agents_needed = []
if any(kw in request_lower for kw in ["research", "find", "look up", "what is"]):
agents_needed.append("research")
if any(kw in request_lower for kw in ["analyze", "data", "statistics", "trend"]):
agents_needed.append("analyst")
if any(kw in request_lower for kw in ["write", "draft", "summarize", "report"]):
agents_needed.append("writer")
if any(kw in request_lower for kw in ["code", "implement", "build", "function"]):
agents_needed.append("coder")
return agents_needed or ["research"] # Default to research
def delegate_to_agent(self, agent_key: str, task: str) -> TaskResult:
"""Delegate a task to a specific agent and get structured results."""
agent = self.agents[agent_key]
context_summary = "\n".join(
f"[{c['agent']}]: {c['output'][:200]}" for c in self.shared_context[-5:]
)
# Simulated agent response (in production, call the OpenAI API with agent.system_prompt and context_summary)
simulated_outputs = {
"research": f"Research findings for '{task}': Found 3 relevant sources. "
f"Key insight: The topic has significant implications for scalability.",
"analyst": f"Analysis of '{task}': Data shows 23% improvement in the target metric. "
f"Statistical significance: p < 0.05. Sample size: n=1,200.",
"writer": f"Draft for '{task}': [Well-structured document synthesizing research "
f"and analysis findings into actionable recommendations.]",
"coder": f"Implementation for '{task}': Created a Python module with 3 classes, "
f"15 methods, full type hints, and 92% test coverage.",
}
output = simulated_outputs.get(agent_key, f"Completed: {task}")
result = TaskResult(
agent=agent.name,
task=task,
output=output,
confidence=0.87,
metadata={"model": agent.model, "tools_available": agent.tools},
)
# Add to shared context for other agents
self.shared_context.append({"agent": agent.name, "output": output})
return result
def execute(self, user_request: str) -> dict:
"""Orchestration entry point: route, delegate, and synthesize. The Verifier stage from the diagram is not implemented in this simulation."""
# Step 1: Route to appropriate agents
agent_keys = self.route_task(user_request)
# Step 2: Delegate tasks (run sequentially here; independent tasks could be dispatched in parallel)
results = []
for key in agent_keys:
result = self.delegate_to_agent(key, user_request)
results.append(result)
# Step 3: Synthesize results
synthesis = {
"request": user_request,
"agents_used": [r.agent for r in results],
"results": [{"agent": r.agent, "output": r.output, "confidence": r.confidence} for r in results],
"shared_context_size": len(self.shared_context),
}
return synthesis
# Demonstration
orchestrator = MultiAgentOrchestrator()
print("=== Multi-Agent Orchestrator ===\n")
# Complex requests that each need more than one agent
requests = [
"Research the latest trends in vector databases and write a summary report",
"Analyze our API latency data and implement a caching solution",
"Find best practices for rate limiting and code a middleware",
]
for req in requests:
print(f"Request: {req}")
result = orchestrator.execute(req)
print(f" Agents: {', '.join(result['agents_used'])}")
for r in result["results"]:
print(f" [{r['agent']}] confidence={r['confidence']:.2f}")
print(f" {r['output'][:80]}...")
print(f" Shared context entries: {result['shared_context_size']}\n")
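The diagram at the top of this section sends results through a Verifier Agent and loops back to the orchestrator on failure, but the simulated `execute` method returns without that step. A minimal sketch of such a loop follows, assuming the producer and verifier are plain callables; `run_with_verification`, `produce`, and `verify` are hypothetical names used only for illustration.

```python
from typing import Callable


def run_with_verification(
    produce: Callable[[str, int], str],
    verify: Callable[[str], bool],
    request: str,
    max_attempts: int = 3,
) -> dict:
    """Call `produce` until `verify` accepts its output or attempts run out."""
    output = ""
    for attempt in range(1, max_attempts + 1):
        output = produce(request, attempt)
        if verify(output):
            return {"output": output, "attempts": attempt, "verified": True}
    # Every attempt failed verification: return the last output, flagged.
    return {"output": output, "attempts": max_attempts, "verified": False}


if __name__ == "__main__":
    # Hypothetical producer that only cites a source from the second attempt on.
    def produce(request: str, attempt: int) -> str:
        suffix = " [source: internal wiki]" if attempt >= 2 else ""
        return f"Answer to '{request}'{suffix}"

    # Hypothetical verifier: require a source marker in the output.
    def verify(output: str) -> bool:
        return "[source:" in output

    print(run_with_verification(produce, verify, "What is rate limiting?"))
```

Bounding the loop with `max_attempts` keeps a persistently failing verifier from cycling between orchestrator and agents indefinitely.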
2. Agentic RAG
Traditional RAG (Retrieval-Augmented Generation) follows a fixed retrieve-then-generate pipeline. Agentic RAG turns the retrieval step itself into a series of decisions: the agent chooses what to search for and which knowledge sources to query, judges whether the retrieved context is sufficient, and reformulates the query or searches additional sources when it is not. This transforms retrieval from a static lookup into an adaptive, multi-step reasoning process.
Agentic RAG vs Traditional RAG
| Capability | Traditional RAG | Agentic RAG |
|---|---|---|
| Query Processing | Single embedding lookup | Query decomposition, reformulation, expansion |
| Source Selection | Single vector store | Dynamic routing across multiple knowledge bases |
| Retrieval Strategy | Top-K similarity | Adaptive: semantic, keyword, graph, SQL as needed |
| Sufficiency Check | None — uses whatever is retrieved | Evaluates context quality, triggers re-retrieval |
| Multi-Hop Reasoning | Not supported | Chains retrievals based on intermediate findings |
| Self-Correction | None | Detects contradictions, seeks authoritative sources |
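The Multi-Hop Reasoning row is easier to picture with a toy example. The sketch below chains retrievals over an in-memory dictionary, using each retrieved passage as the query for the next hop. The corpus contents and the keyword matching are assumptions made for illustration; a real system would typically use an LLM or entity linker to decide what to look up next.

```python
from typing import Optional

# Hypothetical toy corpus: each key is an entity, each value a passage about it.
CORPUS = {
    "latency spike": "The latency spike was traced to the checkout-service deploy.",
    "checkout-service": "checkout-service is owned by the payments team.",
    "payments team": "The payments team on-call rotation is listed in the runbook.",
}


def find_entity(text: str, corpus: dict[str, str], visited: list[str]) -> Optional[str]:
    """Return the first not-yet-visited corpus key mentioned in `text`."""
    lowered = text.lower()
    for key in corpus:
        if key in lowered and key not in visited:
            return key
    return None


def multi_hop(query: str, corpus: dict[str, str], max_hops: int = 3) -> list[str]:
    """Chain retrievals: each retrieved passage becomes the next hop's query."""
    visited: list[str] = []
    passages: list[str] = []
    current = query
    for _ in range(max_hops):
        key = find_entity(current, corpus, visited)
        if key is None:
            break  # Nothing new to follow, so stop early
        visited.append(key)
        passages.append(corpus[key])
        current = corpus[key]
    return passages


if __name__ == "__main__":
    for hop, passage in enumerate(multi_hop("What caused the latency spike?", CORPUS), 1):
        print(f"hop {hop}: {passage}")
```

The `visited` list prevents the chain from looping back to an entity it has already expanded, and `max_hops` caps the cost of a single question.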
Agentic RAG with Query Routing
import os
import json
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI
@dataclass
class RetrievalResult:
"""A single retrieval result from a knowledge source."""
source: str
content: str
relevance_score: float
metadata: dict = field(default_factory=dict)
@dataclass
class RetrievalPlan:
"""Agent's plan for how to retrieve information."""
original_query: str
decomposed_queries: list[str]
sources_to_query: list[str]
strategy: str # "semantic", "keyword", "hybrid", "multi-hop"
reasoning: str
class AgenticRAG:
"""RAG system with agentic decision-making over retrieval strategy.
The agent:
1. Analyzes the query to determine retrieval strategy
2. Decomposes complex queries into sub-queries
3. Routes each sub-query to the appropriate knowledge source
4. Evaluates retrieval quality and re-retrieves if insufficient
5. Synthesizes results with source attribution
"""
def __init__(self):
self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
self.knowledge_sources = {
"technical_docs": {"type": "vector_store", "description": "API docs, architecture guides"},
"company_wiki": {"type": "vector_store", "description": "Internal policies, processes"},
"code_repo": {"type": "code_search", "description": "Source code, READMEs"},
"metrics_db": {"type": "sql", "description": "Performance metrics, usage stats"},
"support_tickets": {"type": "vector_store", "description": "Past issues and resolutions"},
}
self.retrieval_history: list[dict] = []
def plan_retrieval(self, query: str) -> RetrievalPlan:
"""Agent decides HOW to retrieve information for this query."""
query_lower = query.lower()
# Determine if query needs decomposition
decomposed = [query]
if " and " in query_lower or "compare" in query_lower:
# Multi-part query — decompose
parts = query.split(" and ") if " and " in query_lower else [query]
decomposed = [p.strip() for p in parts]
# Route to appropriate sources based on query intent
sources = []
if any(kw in query_lower for kw in ["api", "endpoint", "sdk", "function"]):
sources.append("technical_docs")
if any(kw in query_lower for kw in ["policy", "process", "procedure", "team"]):
sources.append("company_wiki")
if any(kw in query_lower for kw in ["code", "implementation", "class", "module"]):
sources.append("code_repo")
if any(kw in query_lower for kw in ["metric", "performance", "latency", "usage"]):
sources.append("metrics_db")
if any(kw in query_lower for kw in ["error", "bug", "issue", "problem"]):
sources.append("support_tickets")
sources = sources or ["technical_docs", "company_wiki"]
# Choose retrieval strategy
if len(sources) > 2:
strategy = "multi-hop"
elif "metrics_db" in sources:
strategy = "hybrid" # SQL + semantic
else:
strategy = "semantic"
return RetrievalPlan(
original_query=query,
decomposed_queries=decomposed,
sources_to_query=sources,
strategy=strategy,
reasoning=f"Query targets {len(sources)} sources using {strategy} strategy",
)
def retrieve(self, plan: RetrievalPlan) -> list[RetrievalResult]:
"""Execute the retrieval plan across selected sources."""
results = []
for sub_query in plan.decomposed_queries:
for source in plan.sources_to_query:
# Simulated retrieval (in production, call actual vector stores/DBs)
result = RetrievalResult(
source=source,
content=f"[Retrieved from {source}] Relevant information about: {sub_query[:50]}. "
f"This content addresses the core aspects of the query with specific details.",
relevance_score=0.82 + (sum(map(ord, sub_query + source)) % 15) / 100, # Deterministic stand-in score; built-in hash() of a str is randomized per process
metadata={"source_type": self.knowledge_sources[source]["type"], "query": sub_query},
)
results.append(result)
return results
def evaluate_sufficiency(self, query: str, results: list[RetrievalResult]) -> dict:
"""Agent evaluates whether retrieved context is sufficient to answer."""
avg_relevance = sum(r.relevance_score for r in results) / max(len(results), 1)
sources_covered = len(set(r.source for r in results))
is_sufficient = avg_relevance > 0.75 and sources_covered >= 2
return {
"sufficient": is_sufficient,
"avg_relevance": round(avg_relevance, 3),
"sources_covered": sources_covered,
"total_results": len(results),
"recommendation": "proceed" if is_sufficient else "reformulate_and_retry",
}
def generate_answer(self, query: str, results: list[RetrievalResult]) -> dict:
"""Generate final answer with source attribution."""
context_parts = [f"[{r.source}] {r.content}" for r in results[:5]]
sources_used = list(set(r.source for r in results[:5]))
# Simulated generation (in production, call OpenAI with context)
answer = (
f"Based on {len(results)} retrieved passages from {len(sources_used)} sources: "
f"The answer to '{query[:60]}' involves multiple aspects covered in our "
f"knowledge base. Key findings span {', '.join(sources_used)}."
)
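return {
"query": query,
"answer": answer,
"sources": sources_used,
"context_passages_used": len(context_parts),
# Average over the passages actually used; dividing by a fixed 5 understates confidence when fewer are returned
"confidence": round(sum(r.relevance_score for r in results[:5]) / max(len(context_parts), 1), 3),
}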
def query(self, user_query: str) -> dict:
"""Full agentic RAG pipeline: plan, retrieve, evaluate, generate."""
# Step 1: Plan retrieval strategy
plan = self.plan_retrieval(user_query)
# Step 2: Execute retrieval
results = self.retrieve(plan)
# Step 3: Evaluate sufficiency
evaluation = self.evaluate_sufficiency(user_query, results)
# Step 4: Re-retrieve if insufficient (one retry)
if not evaluation["sufficient"]:
expanded_plan = RetrievalPlan(
original_query=user_query,
decomposed_queries=[user_query, f"background context for: {user_query}"],
sources_to_query=list(self.knowledge_sources.keys())[:3],
strategy="multi-hop",
reasoning="Initial retrieval insufficient — expanding search scope",
)
results = self.retrieve(expanded_plan)
evaluation = self.evaluate_sufficiency(user_query, results)
# Step 5: Generate answer with attribution
answer = self.generate_answer(user_query, results)
self.retrieval_history.append({"query": user_query, "plan": plan.strategy, "results": len(results)})
return {
"plan": {"strategy": plan.strategy, "sources": plan.sources_to_query, "sub_queries": len(plan.decomposed_queries)},
"retrieval": {"total_results": len(results), "evaluation": evaluation},
"answer": answer,
}
# Demonstration
rag = AgenticRAG()
print("=== Agentic RAG with Query Routing ===\n")
queries = [
"How does our authentication API handle token refresh and what's the current error rate?",
"Compare our deployment process with industry best practices",
"What code changes caused the latency spike last Tuesday?",
]
for q in queries:
print(f"Query: {q}")
result = rag.query(q)
plan = result["plan"]
print(f" Strategy: {plan['strategy']} | Sources: {plan['sources']} | Sub-queries: {plan['sub_queries']}")
print(f" Retrieved: {result['retrieval']['total_results']} passages | Sufficient: {result['retrieval']['evaluation']['sufficient']}")
print(f" Answer confidence: {result['answer']['confidence']}")
print(f" Sources cited: {result['answer']['sources']}\n")
3. Plan-and-Execute
The plan-and-execute pattern separates thinking from doing. A planning agent decomposes a complex goal into discrete, verifiable steps. An executor agent carries out each step. A verifier checks results before proceeding. This separation allows you to use a more capable (expensive) model for planning and a faster (cheaper) model for execution, while maintaining quality through verification loops.
Plan-and-Execute with Verification
import os
import json
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI
class StepStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class PlanStep:
"""A single step in an execution plan."""
step_id: int
description: str
agent: str # Which agent executes this step
dependencies: list[int] = field(default_factory=list)
status: StepStatus = StepStatus.PENDING
result: Optional[str] = None
verification: Optional[str] = None
retries: int = 0
max_retries: int = 2
@dataclass
class ExecutionPlan:
"""A structured plan decomposing a goal into verifiable steps."""
goal: str
steps: list[PlanStep]
created_by: str = "planner"
status: str = "active"
class PlanAndExecuteAgent:
"""Separates planning from execution with verification loops.
Architecture:
- Planner: Decomposes goals into ordered steps with dependencies
- Executor: Carries out each step using appropriate tools
- Verifier: Validates step results before marking complete
- Controller: Manages execution flow, retries, and rollback
"""
def __init__(self):
self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
self.execution_log: list[dict] = []
def create_plan(self, goal: str) -> ExecutionPlan:
"""Planner agent decomposes the goal into executable steps."""
# In production, this would be an LLM call to decompose the goal
# Here we simulate planning for a common enterprise task
if "migrate" in goal.lower() or "deploy" in goal.lower():
steps = [
PlanStep(1, "Audit current state and identify dependencies", "analyst"),
PlanStep(2, "Create backup of existing data/config", "executor", dependencies=[1]),
PlanStep(3, "Validate backup integrity", "verifier", dependencies=[2]),
PlanStep(4, "Execute migration/deployment in staging", "executor", dependencies=[3]),
PlanStep(5, "Run integration tests against staging", "tester", dependencies=[4]),
PlanStep(6, "Execute migration/deployment in production", "executor", dependencies=[5]),
PlanStep(7, "Run smoke tests and verify production health", "verifier", dependencies=[6]),
PlanStep(8, "Update documentation and notify stakeholders", "writer", dependencies=[7]),
]
else:
steps = [
PlanStep(1, "Analyze requirements and gather context", "analyst"),
PlanStep(2, "Design solution approach", "architect", dependencies=[1]),
PlanStep(3, "Implement core logic", "coder", dependencies=[2]),
PlanStep(4, "Write tests and validate", "tester", dependencies=[3]),
PlanStep(5, "Review and finalize", "verifier", dependencies=[4]),
]
return ExecutionPlan(goal=goal, steps=steps)
def execute_step(self, step: PlanStep, context: dict) -> str:
"""Executor agent carries out a single step."""
step.status = StepStatus.RUNNING
# Simulated execution (in production, call appropriate tools/APIs)
simulated_results = {
"analyst": f"Analysis complete: Identified 5 components affected. Risk level: medium.",
"executor": f"Execution complete: All operations succeeded. Duration: 4.2s.",
"verifier": f"Verification passed: All checksums match, no data loss detected.",
"tester": f"Tests passed: 47/47 assertions green. Coverage: 89%.",
"architect": f"Design finalized: Chose microservice pattern with event sourcing.",
"coder": f"Implementation complete: 3 modules, 12 functions, type-safe.",
"writer": f"Documentation updated: README, CHANGELOG, and runbook refreshed.",
}
result = simulated_results.get(step.agent, f"Step completed by {step.agent}")
step.result = result
step.status = StepStatus.COMPLETED
return result
def verify_step(self, step: PlanStep) -> bool:
"""Verifier checks if the step result meets acceptance criteria."""
# Simulated verification (in production, use an LLM to evaluate)
if step.status != StepStatus.COMPLETED or not step.result:
return False
# Check for failure indicators
failure_keywords = ["error", "failed", "timeout", "mismatch"]
is_valid = not any(kw in step.result.lower() for kw in failure_keywords)
step.verification = "PASSED" if is_valid else "FAILED"
return is_valid
def execute_plan(self, plan: ExecutionPlan) -> dict:
"""Execute the full plan with dependency resolution and verification."""
completed_steps = set()
context = {"goal": plan.goal, "results": {}}
for step in plan.steps:
# Check dependencies
deps_met = all(d in completed_steps for d in step.dependencies)
if not deps_met:
step.status = StepStatus.SKIPPED
self.execution_log.append({"step": step.step_id, "status": "skipped", "reason": "dependency not met"})
continue
# Execute with retry loop
success = False
            while not success:
                result = self.execute_step(step, context)
                success = self.verify_step(step)
                if success or step.retries >= step.max_retries:
                    break  # Verified, or retries exhausted: exit so a persistent failure cannot loop forever
                step.retries += 1
                step.status = StepStatus.PENDING
                self.execution_log.append({"step": step.step_id, "status": "retry", "attempt": step.retries})
if success:
completed_steps.add(step.step_id)
context["results"][step.step_id] = result
self.execution_log.append({"step": step.step_id, "status": "completed"})
else:
step.status = StepStatus.FAILED
self.execution_log.append({"step": step.step_id, "status": "failed"})
break # Stop execution on unrecoverable failure
# Summary
return {
"goal": plan.goal,
"total_steps": len(plan.steps),
"completed": len(completed_steps),
"failed": sum(1 for s in plan.steps if s.status == StepStatus.FAILED),
"skipped": sum(1 for s in plan.steps if s.status == StepStatus.SKIPPED),
"success": all(s.status == StepStatus.COMPLETED for s in plan.steps),
"steps": [
{"id": s.step_id, "desc": s.description, "status": s.status.value, "verification": s.verification}
for s in plan.steps
],
}
# Demonstration
agent = PlanAndExecuteAgent()
print("=== Plan-and-Execute Agent ===\n")
goal = "Migrate the user authentication service from PostgreSQL to DynamoDB"
print(f"Goal: {goal}\n")
# Step 1: Create plan
plan = agent.create_plan(goal)
print("--- Execution Plan ---")
for step in plan.steps:
deps = f" (depends on: {step.dependencies})" if step.dependencies else ""
print(f" Step {step.step_id}: [{step.agent}] {step.description}{deps}")
# Step 2: Execute plan
print("\n--- Executing Plan ---")
result = agent.execute_plan(plan)
for step in result["steps"]:
icon = "✓" if step["status"] == "completed" else "✗" if step["status"] == "failed" else "○"
print(f" {icon} Step {step['id']}: {step['desc'][:50]} [{step['status']}] verify={step['verification']}")
print(f"\n--- Result ---")
print(f" Success: {result['success']} | Completed: {result['completed']}/{result['total_steps']}")
print(f" Failed: {result['failed']} | Skipped: {result['skipped']}")
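The `execute_plan` method above walks `plan.steps` in list order and skips any step whose dependencies have not completed, so it relies on the planner emitting steps already sorted by dependency. If that cannot be guaranteed, the order can be derived from the dependency lists. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+); `execution_order` is a hypothetical helper, not part of the class above.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict[int, list[int]]) -> list[int]:
    """Return step IDs in an order where each step follows its dependencies.

    `dependencies` maps a step ID to the IDs it depends on.
    Raises ValueError if the dependencies contain a cycle.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # The detected cycle is the second element of the exception's args.
        raise ValueError(f"Plan has circular dependencies: {exc.args[1]}") from exc


if __name__ == "__main__":
    # Steps declared out of order: 3 depends on 2, and 2 depends on 1.
    print(execution_order({3: [2], 1: [], 2: [1]}))  # [1, 2, 3]
```

With `PlanStep` objects, the input mapping could be built as `{s.step_id: s.dependencies for s in plan.steps}`. Detecting cycles up front also avoids a plan in which every remaining step is silently skipped.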
4. Human-in-the-Loop
Not every AI decision should be autonomous. Human-in-the-loop (HITL) patterns keep humans in control of high-stakes decisions while automating routine tasks. The key design challenge is deciding when to escalate: escalate too often and you lose the benefit of automation; escalate too rarely and you risk costly errors. Confidence thresholds, cost gates, and semantic risk classification make selective automation possible, so the share of automated decisions can grow as trust is established.
Escalation Decision Framework
| Signal | Threshold | Action | Example |
|---|---|---|---|
| Low Confidence | Score < 0.7 | Escalate for human review | Ambiguous customer intent |
| High Cost | Action cost > $1,000 | Require approval before execution | Refund processing, contract changes |
| Sensitive Domain | PII/PHI/financial data | Always require human sign-off | Medical advice, legal opinions |
| Novel Scenario | No similar past case | Route to specialist | First-time edge case |
| Contradictory Signals | Agent disagreement | Present options to human | Conflicting data sources |
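The table above can be read as an ordered rule list. The following sketch encodes its five signals with the thresholds exactly as listed (0.7 confidence, $1,000 cost). The `Signals` dataclass, its field names, and the order in which the rules are checked are assumptions made for illustration; the table itself does not prescribe a precedence.

```python
from dataclasses import dataclass


@dataclass
class Signals:
    """Hypothetical bundle of the escalation signals from the table."""
    confidence: float
    action_cost: float = 0.0
    sensitive_domain: bool = False
    has_similar_past_case: bool = True
    agents_disagree: bool = False


def escalation_action(s: Signals) -> str:
    """Map signals to the table's action, checking the strictest rules first.

    The precedence below is an assumption, not something the table specifies.
    """
    if s.sensitive_domain:
        return "always require human sign-off"
    if s.action_cost > 1000:
        return "require approval before execution"
    if s.agents_disagree:
        return "present options to human"
    if not s.has_similar_past_case:
        return "route to specialist"
    if s.confidence < 0.7:
        return "escalate for human review"
    return "proceed automatically"


if __name__ == "__main__":
    print(escalation_action(Signals(confidence=0.92)))
    print(escalation_action(Signals(confidence=0.92, action_cost=2500.0)))
    print(escalation_action(Signals(confidence=0.55)))
```

Keeping the rules in one function makes the precedence explicit and easy to audit, which matters when several signals fire for the same decision.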
Approval Workflow with Confidence Thresholds
import os
import json
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Callable
from openai import OpenAI
class ApprovalStatus(Enum):
AUTO_APPROVED = "auto_approved"
PENDING_REVIEW = "pending_review"
HUMAN_APPROVED = "human_approved"
HUMAN_REJECTED = "human_rejected"
ESCALATED = "escalated"
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class Decision:
"""An AI-generated decision requiring potential human approval."""
decision_id: str
action: str
reasoning: str
confidence: float
risk_level: RiskLevel
estimated_cost: float = 0.0
involves_pii: bool = False
status: ApprovalStatus = ApprovalStatus.PENDING_REVIEW
reviewer: Optional[str] = None
review_notes: Optional[str] = None
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
class HumanInTheLoopController:
"""Manages selective automation with human oversight.
Routing logic (as implemented in process_decision):
- Confidence >= 0.85, LOW or MEDIUM risk, cost within the gate, no PII → Auto-approve
- CRITICAL risk → Escalate to a senior reviewer immediately
- Confidence < 0.5 → Escalate
- Everything else → Queue for human review
"""
def __init__(self):
self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
self.approval_queue: list[Decision] = []
self.decision_log: list[Decision] = []
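# Illustrative values; note they differ from the example thresholds in the escalation table (0.7 confidence, $1,000 cost)
self.thresholds = {
"auto_approve_confidence": 0.85,
"escalation_confidence": 0.5,
"cost_gate": 500.0, # Require approval above this dollar amount
"pii_always_review": True,
}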
def classify_risk(self, action: str, context: dict) -> RiskLevel:
"""Classify the risk level of a proposed action."""
action_lower = action.lower()
if any(kw in action_lower for kw in ["delete", "terminate", "revoke", "legal"]):
return RiskLevel.CRITICAL
if any(kw in action_lower for kw in ["refund", "modify account", "change plan", "escalate"]):
return RiskLevel.HIGH
if any(kw in action_lower for kw in ["update", "send email", "create ticket"]):
return RiskLevel.MEDIUM
return RiskLevel.LOW
def should_auto_approve(self, decision: Decision) -> bool:
"""Determine if a decision can be auto-approved without human review."""
if decision.involves_pii and self.thresholds["pii_always_review"]:
return False
if decision.estimated_cost > self.thresholds["cost_gate"]:
return False
if decision.risk_level in (RiskLevel.HIGH, RiskLevel.CRITICAL):
return False
if decision.confidence < self.thresholds["auto_approve_confidence"]:
return False
return True
def process_decision(self, action: str, confidence: float, context: dict) -> Decision:
"""Process an AI decision through the approval workflow."""
risk = self.classify_risk(action, context)
cost = context.get("estimated_cost", 0.0)
has_pii = context.get("involves_pii", False)
decision = Decision(
decision_id=f"dec-{len(self.decision_log)+1:04d}",
action=action,
reasoning=f"AI determined this action based on context: {json.dumps(context)[:100]}",
confidence=confidence,
risk_level=risk,
estimated_cost=cost,
involves_pii=has_pii,
)
# Routing logic
if self.should_auto_approve(decision):
decision.status = ApprovalStatus.AUTO_APPROVED
elif decision.risk_level == RiskLevel.CRITICAL:
decision.status = ApprovalStatus.ESCALATED
elif decision.confidence < self.thresholds["escalation_confidence"]:
decision.status = ApprovalStatus.ESCALATED
else:
decision.status = ApprovalStatus.PENDING_REVIEW
self.approval_queue.append(decision)
self.decision_log.append(decision)
return decision
def human_review(self, decision_id: str, approved: bool, reviewer: str, notes: str = "") -> Decision:
"""Simulate a human reviewing and approving/rejecting a decision."""
for decision in self.decision_log:
if decision.decision_id == decision_id:
decision.status = ApprovalStatus.HUMAN_APPROVED if approved else ApprovalStatus.HUMAN_REJECTED
decision.reviewer = reviewer
decision.review_notes = notes
# Remove from queue
self.approval_queue = [d for d in self.approval_queue if d.decision_id != decision_id]
return decision
raise ValueError(f"Decision {decision_id} not found")
def get_stats(self) -> dict:
"""Get approval workflow statistics."""
total = len(self.decision_log)
if total == 0:
return {"total_decisions": 0} # Same key as the populated result below
statuses = {}
for d in self.decision_log:
statuses[d.status.value] = statuses.get(d.status.value, 0) + 1
return {
"total_decisions": total,
"auto_approved": statuses.get("auto_approved", 0),
"pending_review": statuses.get("pending_review", 0),
"escalated": statuses.get("escalated", 0),
"human_approved": statuses.get("human_approved", 0),
"human_rejected": statuses.get("human_rejected", 0),
"automation_rate": round(statuses.get("auto_approved", 0) / total * 100, 1),
"queue_depth": len(self.approval_queue),
}
# Demonstration
controller = HumanInTheLoopController()
print("=== Human-in-the-Loop Approval Workflow ===\n")
# Simulate various decisions
scenarios = [
("Send order confirmation email", 0.95, {"estimated_cost": 0.01, "involves_pii": False}),
("Process $50 refund for delayed shipment", 0.88, {"estimated_cost": 50.0, "involves_pii": False}),
("Update customer address in database", 0.82, {"estimated_cost": 0.0, "involves_pii": True}),
("Process $2,500 refund for service outage", 0.75, {"estimated_cost": 2500.0, "involves_pii": False}),
("Delete user account and all associated data", 0.90, {"estimated_cost": 0.0, "involves_pii": True}),
("Respond to general FAQ question", 0.92, {"estimated_cost": 0.0, "involves_pii": False}),
("Recommend treatment plan to patient", 0.45, {"estimated_cost": 0.0, "involves_pii": True}),
("Create support ticket for follow-up", 0.89, {"estimated_cost": 0.0, "involves_pii": False}),
]
for action, confidence, context in scenarios:
decision = controller.process_decision(action, confidence, context)
risk_icon = {"low": "🟢", "medium": "🟡", "high": "🟠", "critical": "🔴"}[decision.risk_level.value]
print(f" {risk_icon} [{decision.status.value:<16}] conf={confidence:.2f} cost=${context['estimated_cost']:>7.2f} | {action[:55]}")
# Simulate human reviews
print("\n--- Human Reviews ---")
pending = [d for d in controller.decision_log if d.status == ApprovalStatus.PENDING_REVIEW]
for d in pending[:2]:
reviewed = controller.human_review(d.decision_id, approved=True, reviewer="alice@company.com", notes="Approved per policy")
print(f" ✓ {reviewed.decision_id}: '{reviewed.action[:40]}' approved by {reviewed.reviewer}")
# Statistics
print("\n--- Workflow Statistics ---")
stats = controller.get_stats()
for key, value in stats.items():
print(f" {key}: {value}")
5. Autonomous Agents
Autonomous agents operate with minimal human supervision toward a specified goal. They maintain working memory across interactions, self-correct when encountering errors, acquire new tools or information as needed, and persist toward objectives through multiple reasoning cycles. The key architectural challenge is balancing autonomy with safety: agents need enough freedom to be useful but enough constraints to prevent runaway behavior.
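One concrete form of those constraints is a set of hard stop conditions checked before every cycle. The sketch below assumes three of them (goal achieved, cycle limit, spending budget); `stop_reason` and `run` are hypothetical helpers, and the per-cycle cost is a simulated figure rather than a real API price.

```python
from typing import Optional


def stop_reason(goal_met: bool, cycle: int, max_cycles: int,
                spent: float, budget: float) -> Optional[str]:
    """Return why the agent must stop, or None if it may run another cycle."""
    if goal_met:
        return "goal_achieved"
    if cycle >= max_cycles:
        return "max_cycles_reached"
    if spent >= budget:
        return "budget_exhausted"
    return None


def run(max_cycles: int, budget: float, cost_per_cycle: float,
        cycles_needed: int) -> tuple[str, int]:
    """Simulate an agent loop that needs `cycles_needed` cycles to reach its goal."""
    spent, cycle = 0.0, 0
    while True:
        reason = stop_reason(cycle >= cycles_needed, cycle, max_cycles, spent, budget)
        if reason is not None:
            return reason, cycle
        spent += cost_per_cycle  # Simulated cost of one think/act/observe cycle
        cycle += 1


if __name__ == "__main__":
    print(run(max_cycles=10, budget=1.0, cost_per_cycle=0.02, cycles_needed=4))
    print(run(max_cycles=3, budget=1.0, cost_per_cycle=0.02, cycles_needed=100))
    print(run(max_cycles=10, budget=0.05, cost_per_cycle=0.02, cycles_needed=100))
```

Because the budget is checked before each cycle rather than after, the final cycle can push spending slightly past the limit; a stricter variant would compare `spent + cost_per_cycle` against the budget.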
Goal-Driven Agent with Self-Correction
import os
import json
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from openai import OpenAI
class AgentState(Enum):
THINKING = "thinking"
ACTING = "acting"
OBSERVING = "observing"
CORRECTING = "correcting"
COMPLETE = "complete"
FAILED = "failed"
@dataclass
class MemoryEntry:
"""A single entry in the agent's working memory."""
cycle: int
state: AgentState
thought: str
action: Optional[str] = None
observation: Optional[str] = None
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
@dataclass
class AgentGoal:
"""A goal the autonomous agent is pursuing."""
description: str
success_criteria: list[str]
max_cycles: int = 10
budget_remaining: float = 1.0 # Dollar budget for API calls
class AutonomousAgent:
    """Goal-driven agent with ReAct loop and self-correction.

    Cycle: Think → Act → Observe → (Correct if needed) → Repeat
    Stops when: goal achieved, max cycles reached, or budget exhausted.
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.memory: list[MemoryEntry] = []
        self.tools_available = ["search", "calculate", "write_file", "read_file", "ask_user"]
        self.tools_acquired: list[str] = []
        self.cycle_count = 0
        self.total_cost = 0.0

    def think(self, goal: AgentGoal) -> str:
        """Reasoning step: decide what to do next based on memory and goal."""
        # Review past actions and observations
        recent_memory = self.memory[-5:] if self.memory else []
        past_actions = [m.action for m in recent_memory if m.action]
        past_observations = [m.observation for m in recent_memory if m.observation]
        # Determine next action (simulated reasoning)
        if not past_actions:
            thought = f"Starting fresh on goal: '{goal.description}'. First, I need to gather context."
        elif past_observations and "error" in (past_observations[-1] or "").lower():
            thought = "Previous action resulted in an error. I need to correct my approach."
        elif len(past_actions) > 3 and len(set(past_actions)) < 2:
            thought = "I'm repeating the same action. Let me try a different approach."
        else:
            # Simplified progress estimate: one criterion per recent observation
            criteria_met = min(len(past_observations), len(goal.success_criteria))
            thought = f"Progress: {criteria_met}/{len(goal.success_criteria)} criteria addressed. Continuing."
        return thought

    def act(self, thought: str, goal: AgentGoal) -> str:
        """Action step: execute a tool or generate output."""
        self.total_cost += 0.02  # Simulated API cost per action
        # Choose action based on thought
        if "gather context" in thought.lower() or "starting" in thought.lower():
            return "search: gathering background information on the topic"
        elif "error" in thought.lower() or "correct" in thought.lower():
            return "search: looking for alternative approach to resolve the issue"
        elif "different approach" in thought.lower():
            return "calculate: attempting analytical approach instead of search"
        else:
            return "write_file: drafting solution based on gathered information"

    def observe(self, action: str) -> str:
        """Observation step: process the result of an action."""
        # Simulated observations
        if "search" in action:
            return "Found 3 relevant results. Key information extracted successfully."
        elif "calculate" in action:
            return "Calculation complete. Result: 94.7% confidence in proposed solution."
        elif "write_file" in action:
            return "File written successfully. Solution documented with 5 sections."
        elif "error" in action:
            return "Error: Resource not found. Need to adjust approach."
        return "Action completed with partial results."

    def should_self_correct(self, observation: str) -> bool:
        """Determine if self-correction is needed."""
        correction_triggers = ["error", "failed", "not found", "timeout", "partial"]
        return any(trigger in observation.lower() for trigger in correction_triggers)

    def self_correct(self, observation: str) -> str:
        """Generate a correction strategy based on the failed observation."""
        if "not found" in observation.lower():
            return "Resource missing — will try alternative source or create from scratch."
        if "partial" in observation.lower():
            return "Partial results — will supplement with additional queries."
        return "Unexpected result — will retry with modified parameters."

    def check_goal_completion(self, goal: AgentGoal) -> bool:
        """Check if the goal's success criteria have been met."""
        observations = [m.observation for m in self.memory if m.observation]
        # Simplified: goal complete after sufficient successful observations
        successful_obs = [o for o in observations if o and "error" not in o.lower()]
        return len(successful_obs) >= len(goal.success_criteria)
    def run(self, goal: AgentGoal) -> dict:
        """Execute the autonomous agent loop until goal is achieved or limits hit."""
        print(f" Goal: {goal.description}")
        print(f" Success criteria: {goal.success_criteria}")
        print(f" Limits: max_cycles={goal.max_cycles}, budget=${goal.budget_remaining:.2f}\n")
        while self.cycle_count < goal.max_cycles and self.total_cost < goal.budget_remaining:
            self.cycle_count += 1
            # Think
            thought = self.think(goal)
            entry = MemoryEntry(cycle=self.cycle_count, state=AgentState.THINKING, thought=thought)
            # Act
            action = self.act(thought, goal)
            entry.action = action
            entry.state = AgentState.ACTING
            # Observe
            observation = self.observe(action)
            entry.observation = observation
            entry.state = AgentState.OBSERVING
            # Self-correct if needed
            if self.should_self_correct(observation):
                correction = self.self_correct(observation)
                entry.state = AgentState.CORRECTING
                print(f" Cycle {self.cycle_count}: [CORRECT] {correction[:60]}")
            else:
                print(f" Cycle {self.cycle_count}: [{entry.state.value.upper():>9}] {thought[:40]}... → {observation[:40]}")
            self.memory.append(entry)
            # Check completion
            if self.check_goal_completion(goal):
                print(f"\n ✓ Goal achieved in {self.cycle_count} cycles (cost: ${self.total_cost:.3f})")
                return {"status": "complete", "cycles": self.cycle_count, "cost": self.total_cost, "memory_size": len(self.memory)}
        # Limit reached
        reason = "max_cycles" if self.cycle_count >= goal.max_cycles else "budget_exhausted"
        print(f"\n ✗ Stopped: {reason} after {self.cycle_count} cycles (cost: ${self.total_cost:.3f})")
        # Include memory_size here too, so callers can read it whichever way the run ends
        return {"status": "stopped", "reason": reason, "cycles": self.cycle_count, "cost": self.total_cost, "memory_size": len(self.memory)}
# Demonstration
print("=== Autonomous Agent with Self-Correction ===\n")
agent = AutonomousAgent()
goal = AgentGoal(
    description="Research and summarize the top 3 approaches to LLM evaluation",
    success_criteria=[
        "Identify evaluation approaches",
        "Gather metrics for each approach",
        "Write structured summary",
    ],
    max_cycles=8,
    budget_remaining=0.50,
)
result = agent.run(goal)
print(f"\n Final: status={result['status']}, memory_entries={result['memory_size']}")
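The simulated `act` step above returns action strings of the form `tool: detail`, but nothing checks the chosen tool against `tools_available`. One way to add that constraint is sketched below. The names `authorize_action` and `ActionDenied` are hypothetical and introduced only for illustration, and the sketch assumes the `tool: detail` string format used by the simulation.

```python
class ActionDenied(Exception):
    """Raised when an action names a tool outside the allow-list (hypothetical)."""


def authorize_action(action: str, allowed_tools: list[str]) -> tuple[str, str]:
    """Split a 'tool: detail' action string and verify the tool is allowed.

    Assumes the 'tool: detail' format used by the simulated act() step.
    """
    tool, sep, detail = action.partition(":")
    tool = tool.strip()
    if not sep or tool not in allowed_tools:
        raise ActionDenied(f"tool not permitted: {tool!r}")
    return tool, detail.strip()


allowed = ["search", "calculate", "write_file", "read_file", "ask_user"]
print(authorize_action("search: gathering background information", allowed))
try:
    authorize_action("delete_repo: remove everything", allowed)
except ActionDenied as exc:
    print(f"blocked: {exc}")
```

Calling a check like this between the act and observe steps keeps the allow-list decision in deterministic code, so the reasoning step cannot talk its way around it.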
6. Event-Driven Architectures
Event-driven AI architectures decouple agent invocation from request-response cycles. Agents are triggered by events (webhooks, queue messages, scheduled triggers, file uploads) and can run asynchronously for minutes or hours without blocking callers. This pattern is essential for long-running workflows like document processing pipelines, multi-step approval chains, and background analysis tasks that would time out in synchronous HTTP handlers.
Async Agent Orchestration Patterns
| Pattern | Trigger | Duration | Use Case |
|---|---|---|---|
| Webhook-Triggered | HTTP POST from external service | Seconds to minutes | GitHub PR review, Slack commands |
| Queue-Driven | Message on SQS/RabbitMQ/Kafka | Minutes to hours | Document processing, batch analysis |
| Scheduled (Cron) | Timer/schedule trigger | Minutes | Daily reports, periodic audits |
| File Upload | Object storage event (S3/Blob) | Minutes | Invoice extraction, image analysis |
| State Machine | Workflow step completion | Days (multi-step) | Approval chains, onboarding flows |
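As a rough illustration of the queue-driven row, the sketch below uses the standard library's `asyncio.Queue` as a stand-in for SQS, RabbitMQ, or Kafka. The function names, the `doc_id` payload field, and the `asyncio.sleep` call that stands in for an agent run are all assumptions made for the example.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, results: list) -> None:
    """Pull jobs off the queue until cancelled; each job is a simulated agent run."""
    while True:
        job = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for a long-running agent call
            results.append((name, job["doc_id"]))
        finally:
            queue.task_done()  # mark the job finished even if processing raised


async def process_documents(doc_ids: list[str], num_workers: int = 2) -> list:
    """Enqueue every document, then let a small worker pool drain the queue."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    for doc_id in doc_ids:
        queue.put_nowait({"doc_id": doc_id})  # enqueueing does not wait for processing
    workers = [
        asyncio.create_task(worker(f"w{i}", queue, results))
        for i in range(num_workers)
    ]
    await queue.join()  # wait until every queued job has been processed
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


results = asyncio.run(process_documents(["inv-1", "inv-2", "inv-3"]))
print(sorted(doc for _, doc in results))
```

With a real broker the producer and the workers would run in separate processes, which is what lets the caller return immediately while the agents keep working.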
Event-Driven Workflow Engine
import os
import json
import asyncio
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Optional
from openai import OpenAI


class EventType(Enum):
    WEBHOOK = "webhook"
    QUEUE_MESSAGE = "queue_message"
    SCHEDULED = "scheduled"
    FILE_UPLOAD = "file_upload"
    WORKFLOW_STEP = "workflow_step"
    AGENT_COMPLETE = "agent_complete"


class WorkflowStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    WAITING = "waiting_for_event"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Event:
    """An event that triggers agent processing."""
    event_id: str
    event_type: EventType
    payload: dict
    source: str
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    correlation_id: Optional[str] = None


@dataclass
class WorkflowStep:
    """A step in a long-running workflow."""
    step_id: str
    name: str
    agent: str
    status: WorkflowStatus = WorkflowStatus.PENDING
    input_data: dict = field(default_factory=dict)
    output_data: dict = field(default_factory=dict)
    wait_for_event: Optional[EventType] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None


@dataclass
class Workflow:
    """A long-running workflow composed of event-triggered steps."""
    workflow_id: str
    name: str
    steps: list[WorkflowStep]
    status: WorkflowStatus = WorkflowStatus.PENDING
    current_step_index: int = 0
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
class EventDrivenOrchestrator:
    """Orchestrator for event-driven AI workflows (synchronous, in-memory simulation).

    Features:
    - Event routing: maps events to workflow steps
    - Long-running support: workflows can pause waiting for events
    - Correlation: links related events across a workflow
    - Dead letter: handles failed/unroutable events
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.workflows: dict[str, Workflow] = {}
        self.event_log: list[Event] = []
        self.dead_letter: list[Event] = []
        self.handlers: dict[EventType, list[Callable]] = {et: [] for et in EventType}

    def create_workflow(self, name: str, steps: list[dict]) -> Workflow:
        """Create a new long-running workflow definition."""
        workflow_steps = [
            WorkflowStep(
                step_id=f"step-{i+1}",
                name=s["name"],
                agent=s["agent"],
                wait_for_event=EventType(s["wait_for"]) if "wait_for" in s else None,
            )
            for i, s in enumerate(steps)
        ]
        workflow = Workflow(
            workflow_id=f"wf-{len(self.workflows)+1:04d}",
            name=name,
            steps=workflow_steps,
        )
        self.workflows[workflow.workflow_id] = workflow
        return workflow

    def emit_event(self, event: Event) -> dict:
        """Process an incoming event through the orchestrator."""
        self.event_log.append(event)
        # Route to the first workflow that is pending or waiting for this event type
        matched_id: Optional[str] = None
        for wf_id, workflow in self.workflows.items():
            if workflow.status in (WorkflowStatus.PENDING, WorkflowStatus.WAITING):
                current_step = workflow.steps[workflow.current_step_index]
                if current_step.wait_for_event == event.event_type or workflow.status == WorkflowStatus.PENDING:
                    self._execute_step(workflow, current_step, event)
                    matched_id = wf_id
                    break
        if matched_id is None:
            # No workflow could accept the event: park it for inspection
            self.dead_letter.append(event)
            return {"status": "unrouted", "event_id": event.event_id, "dead_lettered": True}
        return {"status": "processed", "event_id": event.event_id, "workflow_id": matched_id}
    def _execute_step(self, workflow: Workflow, step: WorkflowStep, event: Event) -> dict:
        """Execute a single workflow step triggered by an event."""
        step.status = WorkflowStatus.RUNNING
        step.started_at = datetime.now(timezone.utc).isoformat()
        step.input_data = event.payload
        workflow.status = WorkflowStatus.RUNNING
        # Simulated agent processing
        step.output_data = {
            "agent": step.agent,
            "result": f"Processed by {step.agent}: {step.name}",
            "event_source": event.source,
        }
        step.status = WorkflowStatus.COMPLETED
        step.completed_at = datetime.now(timezone.utc).isoformat()
        # Advance workflow
        workflow.current_step_index += 1
        if workflow.current_step_index >= len(workflow.steps):
            workflow.status = WorkflowStatus.COMPLETED
        else:
            next_step = workflow.steps[workflow.current_step_index]
            if next_step.wait_for_event:
                # Pause here; mark the step too so status reports show where the workflow is blocked
                workflow.status = WorkflowStatus.WAITING
                next_step.status = WorkflowStatus.WAITING
            else:
                # Auto-advance to next step
                next_event = Event(
                    event_id=f"auto-{workflow.workflow_id}-{workflow.current_step_index}",
                    event_type=EventType.WORKFLOW_STEP,
                    payload=step.output_data,
                    source="orchestrator",
                    correlation_id=workflow.workflow_id,
                )
                self._execute_step(workflow, next_step, next_event)
        return {"step": step.name, "status": step.status.value}

    def get_workflow_status(self, workflow_id: str) -> dict:
        """Get the current status of a workflow."""
        wf = self.workflows.get(workflow_id)
        if not wf:
            return {"error": "Workflow not found"}
        return {
            "workflow_id": wf.workflow_id,
            "name": wf.name,
            "status": wf.status.value,
            "progress": f"{wf.current_step_index}/{len(wf.steps)}",
            "steps": [
                {"name": s.name, "status": s.status.value, "agent": s.agent}
                for s in wf.steps
            ],
        }
# Demonstration
orchestrator = EventDrivenOrchestrator()
print("=== Event-Driven AI Workflow ===\n")
# Define a document processing workflow
workflow = orchestrator.create_workflow(
    name="Invoice Processing Pipeline",
    steps=[
        {"name": "Extract document data", "agent": "extraction_agent"},
        {"name": "Validate extracted fields", "agent": "validation_agent"},
        {"name": "Match to purchase orders", "agent": "matching_agent"},
        {"name": "Route for approval", "agent": "routing_agent", "wait_for": "webhook"},
        {"name": "Post to accounting system", "agent": "integration_agent"},
    ],
)
print(f"Created workflow: {workflow.workflow_id} ({workflow.name})")
print(f" Steps: {len(workflow.steps)}\n")
# Simulate events triggering the workflow
events = [
    Event("evt-001", EventType.FILE_UPLOAD, {"file": "invoice-2026-0142.pdf", "size_kb": 340}, "s3-bucket"),
    Event("evt-002", EventType.WEBHOOK, {"approved": True, "approver": "finance-mgr@company.com"}, "slack-app"),
]
for event in events:
    print(f"Event: [{event.event_type.value}] from {event.source}")
    result = orchestrator.emit_event(event)
    print(f" Result: {result['status']}")
    status = orchestrator.get_workflow_status(workflow.workflow_id)
    print(f" Workflow: {status['status']} ({status['progress']} steps complete)")
    for step in status["steps"]:
        icon = "✓" if step["status"] == "completed" else "⏳" if step["status"] == "waiting_for_event" else "○"
        print(f" {icon} {step['name']} [{step['agent']}] — {step['status']}")
    print()
# Stats
print(f"Events processed: {len(orchestrator.event_log)}")
print(f"Dead letters: {len(orchestrator.dead_letter)}")
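The orchestrator's docstring lists correlation as a feature, yet `emit_event` hands each event to the first workflow that can accept it. With several workflows paused on the same event type, an approval could land on the wrong invoice. The sketch below shows one way to match on a correlation ID instead. `PendingWorkflow` and `match_workflow` are hypothetical names, and the sketch assumes external events carry the workflow ID in `correlation_id`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PendingWorkflow:
    """Hypothetical, trimmed-down stand-in for the Workflow class above."""
    workflow_id: str
    waiting_for: str  # event type the workflow is paused on


def match_workflow(
    waiting: dict[str, PendingWorkflow],
    event_type: str,
    correlation_id: Optional[str],
) -> Optional[PendingWorkflow]:
    """Return the workflow an event belongs to, or None to dead-letter it."""
    if correlation_id is None:
        return None  # no correlation ID: refuse to guess which workflow is meant
    candidate = waiting.get(correlation_id)
    if candidate is not None and candidate.waiting_for == event_type:
        return candidate
    return None


waiting = {
    "wf-0001": PendingWorkflow("wf-0001", "webhook"),
    "wf-0002": PendingWorkflow("wf-0002", "webhook"),
}
print(match_workflow(waiting, "webhook", "wf-0002").workflow_id)  # wf-0002
print(match_workflow(waiting, "webhook", None))  # None
```

Returning `None` for unmatched events keeps the dead-letter path as the default, which is usually safer than delivering an approval to whichever workflow happens to be first in the dictionary.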
7. Hybrid Architectures
The most effective production AI systems are hybrid — they combine deterministic code paths with LLM reasoning, knowing when to use each. Not every decision needs AI: validation rules, data transformations, arithmetic, and well-defined business logic should remain in deterministic code. LLMs excel at ambiguous classification, natural language understanding, creative generation, and complex reasoning over unstructured data. The art is knowing where to draw the line.
When NOT to Use AI
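- Input validation: Email format, phone numbers, required fields — regex and schema validators are faster, cheaper, and deterministic (the same input always gets the same verdict)
- Arithmetic: Tax calculations, currency conversion, inventory counts — ordinary arithmetic is exact and repeatable, provided monetary values use decimal or integer types rather than binary floats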
- Routing by explicit rules: If the user selected “billing” from a menu, route to billing — no classification needed
- Data transformations: JSON → CSV, date formatting, unit conversion — pure functions with no ambiguity
- Access control: Permission checks based on role/scope — security decisions must be deterministic
- Idempotency checks: Deduplication, retry detection — hash comparisons, not LLM calls
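Two of the items above, arithmetic and idempotency checks, can be sketched with the standard library alone. The function names, the half-up rounding rule, and the in-memory `seen` set are assumptions made for the example. A real system would follow its own tax rules and keep keys in a shared store.

```python
import hashlib
import json
from decimal import Decimal, ROUND_HALF_UP


def sales_tax(amount: str, rate: str) -> Decimal:
    """Compute tax with decimal arithmetic, rounded to cents (half-up is an assumption)."""
    return (Decimal(amount) * Decimal(rate)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def idempotency_key(payload: dict) -> str:
    """Hash a canonical JSON encoding so identical payloads map to the same key."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


seen: set[str] = set()  # in-memory only; stands in for a shared key store


def is_duplicate(payload: dict) -> bool:
    """Return True if an identical payload has already been recorded."""
    key = idempotency_key(payload)
    if key in seen:
        return True
    seen.add(key)
    return False


print(sales_tax("99.99", "0.0825"))                          # 8.25
print(is_duplicate({"order": 17, "amount": "99.99"}))        # False
print(is_duplicate({"amount": "99.99", "order": 17}))        # True (key order ignored)
```

Neither check involves a model call, so both cost nothing per request and give the same answer on every run.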
Hybrid Decision Router
import os
import json
import re
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from openai import OpenAI


@dataclass
class RoutingDecision:
    """Result of the hybrid router's decision."""
    path: str  # "deterministic" or "llm"
    handler: str
    confidence: float
    reasoning: str
    cost_estimate: float  # Estimated cost in dollars


class HybridRouter:
    """Routes requests to deterministic code or LLM based on complexity.

    Design principles:
    - Default to deterministic: Use AI only when rules can't handle it
    - Fail to deterministic: If LLM is unavailable, fall back to rules
    - Cost-aware: Track when LLM usage is justified vs wasteful
    - Progressive: Start with rules, graduate to LLM as patterns emerge
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.routing_log: list[dict] = []
        # Deterministic handlers — fast, free, reliable
        self.deterministic_handlers = {
            "validate_email": self._validate_email,
            "calculate_tax": self._calculate_tax,
            "format_currency": self._format_currency,
            "check_business_hours": self._check_business_hours,
            "route_by_keyword": self._route_by_keyword,
        }
        # LLM handlers — for ambiguous/complex tasks
        self.llm_handlers = {
            "classify_intent": self._classify_intent_llm,
            "generate_response": self._generate_response_llm,
            "summarize_text": self._summarize_text_llm,
            "extract_entities": self._extract_entities_llm,
        }
    # --- Deterministic handlers ---
    def _validate_email(self, data: dict) -> dict:
        email = data.get("email", "")
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        is_valid = bool(re.match(pattern, email))
        return {"valid": is_valid, "email": email, "method": "regex"}

    def _calculate_tax(self, data: dict) -> dict:
        amount = data.get("amount", 0)
        rate = data.get("tax_rate", 0.08)
        tax = round(amount * rate, 2)
        return {"subtotal": amount, "tax": tax, "total": round(amount + tax, 2), "method": "arithmetic"}

    def _format_currency(self, data: dict) -> dict:
        amount = data.get("amount", 0)
        currency = data.get("currency", "USD")
        symbols = {"USD": "$", "EUR": "€", "GBP": "£", "JPY": "¥"}
        formatted = f"{symbols.get(currency, currency)}{amount:,.2f}"
        return {"formatted": formatted, "method": "string_format"}

    def _check_business_hours(self, data: dict) -> dict:
        hour = data.get("hour", 12)
        is_open = 9 <= hour < 17
        return {"is_business_hours": is_open, "hour": hour, "method": "range_check"}

    def _route_by_keyword(self, data: dict) -> dict:
        text = data.get("text", "").lower()
        routes = {"billing": "billing_team", "technical": "engineering", "cancel": "retention"}
        for keyword, route in routes.items():
            if keyword in text:
                return {"route": route, "matched_keyword": keyword, "method": "keyword_match"}
        return {"route": "general", "matched_keyword": None, "method": "keyword_match_default"}

    # --- LLM handlers (simulated) ---
    def _classify_intent_llm(self, data: dict) -> dict:
        text = data.get("text", "")
        # In production: call OpenAI to classify ambiguous intent
        return {"intent": "inquiry", "confidence": 0.84, "text": text[:50], "method": "llm_classification"}

    def _generate_response_llm(self, data: dict) -> dict:
        query = data.get("query", "")
        return {"response": f"Generated response for: {query[:40]}...", "tokens": 150, "method": "llm_generation"}

    def _summarize_text_llm(self, data: dict) -> dict:
        text = data.get("text", "")
        return {"summary": f"Summary of {len(text)} chars of text", "compression": 0.3, "method": "llm_summarization"}

    def _extract_entities_llm(self, data: dict) -> dict:
        text = data.get("text", "")
        return {"entities": [{"text": "example", "type": "ORG"}], "count": 1, "method": "llm_ner"}
    def route(self, request_type: str, data: dict) -> RoutingDecision:
        """Decide whether to use deterministic code or LLM."""
        # Check deterministic handlers first (fast path)
        if request_type in self.deterministic_handlers:
            return RoutingDecision(
                path="deterministic",
                handler=request_type,
                confidence=1.0,
                reasoning=f"Exact match: '{request_type}' has a deterministic handler",
                cost_estimate=0.0,
            )
        # Check LLM handlers
        if request_type in self.llm_handlers:
            return RoutingDecision(
                path="llm",
                handler=request_type,
                confidence=0.9,
                reasoning=f"Requires LLM: '{request_type}' involves ambiguous/unstructured processing",
                cost_estimate=0.003,
            )
        # Unknown request — default to LLM for flexibility
        return RoutingDecision(
            path="llm",
            handler="generate_response",
            confidence=0.6,
            reasoning=f"Unknown request type '{request_type}' — falling back to LLM",
            cost_estimate=0.005,
        )

    def execute(self, request_type: str, data: dict) -> dict:
        """Route and execute a request through the hybrid system."""
        decision = self.route(request_type, data)
        # Execute through appropriate handler
        if decision.path == "deterministic":
            handler_fn = self.deterministic_handlers[decision.handler]
        else:
            handler_fn = self.llm_handlers.get(decision.handler, self._generate_response_llm)
        result = handler_fn(data)
        result["_routing"] = {"path": decision.path, "cost": decision.cost_estimate}
        self.routing_log.append({"type": request_type, "path": decision.path, "cost": decision.cost_estimate})
        return result

    def get_stats(self) -> dict:
        """Get routing statistics showing deterministic vs LLM usage."""
        total = len(self.routing_log)
        if total == 0:
            return {"total": 0}
        deterministic = sum(1 for r in self.routing_log if r["path"] == "deterministic")
        llm_calls = total - deterministic
        total_cost = sum(r["cost"] for r in self.routing_log)
        return {
            "total_requests": total,
            "deterministic": deterministic,
            "llm_calls": llm_calls,
            "deterministic_pct": round(deterministic / total * 100, 1),
            "total_cost": round(total_cost, 4),
            "avg_cost_per_request": round(total_cost / total, 5),
        }
# Demonstration
router = HybridRouter()
print("=== Hybrid Architecture: Deterministic + LLM ===\n")
requests = [
    ("validate_email", {"email": "user@example.com"}),
    ("calculate_tax", {"amount": 99.99, "tax_rate": 0.0825}),
    ("classify_intent", {"text": "I'm frustrated with the service and want to talk to someone"}),
    ("format_currency", {"amount": 1234567.89, "currency": "USD"}),
    ("generate_response", {"query": "Explain your refund policy for enterprise customers"}),
    ("check_business_hours", {"hour": 14}),
    ("extract_entities", {"text": "Meeting with Google and Microsoft next Tuesday in Seattle"}),
    ("route_by_keyword", {"text": "I need help with my billing statement"}),
    ("summarize_text", {"text": "A" * 5000}),
    ("validate_email", {"email": "invalid-email"}),
]
for req_type, data in requests:
    result = router.execute(req_type, data)
    routing = result.pop("_routing")
    path_icon = "⚡" if routing["path"] == "deterministic" else "🧠"
    cost_str = f"${routing['cost']:.4f}" if routing["cost"] > 0 else "FREE"
    print(f" {path_icon} [{routing['path']:>13}] {req_type:<22} cost={cost_str:>7} → {json.dumps(result)[:60]}")
print("\n--- Routing Statistics ---")
stats = router.get_stats()
for key, value in stats.items():
    print(f" {key}: {value}")
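The router's docstring names a "fail to deterministic" principle that the simulated handlers never exercise, since they cannot fail. A minimal sketch of that principle follows, assuming the LLM classifier is any callable that may raise. `classify_with_fallback` and `unavailable_llm` are hypothetical names, and the keyword table is copied from `_route_by_keyword` above.

```python
from typing import Callable

KEYWORD_ROUTES = {"billing": "billing_team", "technical": "engineering", "cancel": "retention"}


def route_by_keyword(text: str) -> dict:
    """Deterministic fallback mirroring the keyword table used in the router above."""
    lowered = text.lower()
    for keyword, route in KEYWORD_ROUTES.items():
        if keyword in lowered:
            return {"route": route, "method": "keyword_match"}
    return {"route": "general", "method": "keyword_match_default"}


def classify_with_fallback(text: str, llm_classifier: Callable[[str], dict]) -> dict:
    """Try the LLM classifier; on any failure, fall back to keyword rules."""
    try:
        result = llm_classifier(text)
        result["degraded"] = False
        return result
    except Exception as exc:  # broad on purpose: any LLM failure should degrade, not crash
        fallback = route_by_keyword(text)
        fallback["degraded"] = True
        fallback["error"] = type(exc).__name__
        return fallback


def unavailable_llm(text: str) -> dict:
    """Hypothetical classifier that simulates an outage."""
    raise TimeoutError("simulated LLM timeout")


print(classify_with_fallback("Question about my billing statement", unavailable_llm))
```

The `degraded` flag lets downstream code and dashboards tell a rule-based answer from a model answer, which matters when measuring how often the fallback is actually used.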
Fallback Chain Pattern
Production systems need graceful degradation. When the primary LLM is unavailable, slow, or returns low-confidence results, fallback chains provide progressively simpler alternatives. The chain moves from most capable (and most expensive/fragile) to most reliable (and cheapest/simplest), ensuring the system always returns something useful.
import os
import json
import zlib
from dataclasses import dataclass, field
from typing import Optional, Callable
from openai import OpenAI


def stable_hash(text: str) -> int:
    """Process-independent hash so the simulated failures are reproducible.

    Python's built-in hash() for strings is salted per process, so it would
    pick different "failing" queries on every run.
    """
    return zlib.crc32(text.encode("utf-8"))


@dataclass
class FallbackResult:
    """Result from a fallback chain execution."""
    response: str
    provider: str
    tier: int  # 1 = primary, 2 = secondary, etc.
    latency_ms: float
    confidence: float
    fallback_reason: Optional[str] = None


class FallbackChain:
    """Multi-tier fallback chain for resilient AI responses.

    Tier 1: Primary LLM (GPT-4o) — best quality, highest cost
    Tier 2: Secondary LLM (GPT-4o-mini) — good quality, lower cost
    Tier 3: Cached responses — instant, free, limited coverage
    Tier 4: Rule-based fallback — always available, basic quality
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.cache: dict[str, str] = {
            "what are your business hours": "Our business hours are Monday-Friday, 9 AM - 5 PM EST.",
            "how do i reset my password": "Visit account settings and click 'Reset Password'.",
            "what is your refund policy": "We offer full refunds within 30 days of purchase.",
        }
        self.execution_log: list[FallbackResult] = []

    def _try_primary(self, query: str) -> Optional[FallbackResult]:
        """Tier 1: Primary model (GPT-4o) — simulated with occasional failures."""
        # Simulate a roughly 20% failure rate for demonstration
        if stable_hash(query) % 5 == 0:
            return None  # Simulated timeout/error
        latency = 0.8 + (stable_hash(query) % 40) / 100  # 0.8-1.2s simulated
        return FallbackResult(
            response=f"[GPT-4o] Comprehensive answer to: {query[:50]}",
            provider="gpt-4o",
            tier=1,
            latency_ms=latency * 1000,
            confidence=0.92,
        )

    def _try_secondary(self, query: str) -> Optional[FallbackResult]:
        """Tier 2: Secondary model (GPT-4o-mini) — faster, cheaper, slightly less capable."""
        # Simulate a roughly 10% failure rate
        if stable_hash(query + "secondary") % 10 == 0:
            return None
        latency = 0.3 + (stable_hash(query) % 20) / 100  # 0.3-0.5s simulated
        return FallbackResult(
            response=f"[GPT-4o-mini] Good answer to: {query[:50]}",
            provider="gpt-4o-mini",
            tier=2,
            latency_ms=latency * 1000,
            confidence=0.78,
        )
    def _try_cache(self, query: str) -> Optional[FallbackResult]:
        """Tier 3: Cached responses — instant but limited coverage."""
        query_normalized = query.lower().strip().rstrip("?.")
        if query_normalized in self.cache:
            return FallbackResult(
                response=self.cache[query_normalized],
                provider="cache",
                tier=3,
                latency_ms=1.0,
                confidence=0.95,  # High confidence because it's a known answer
            )
        return None

    def _try_rules(self, query: str) -> FallbackResult:
        """Tier 4: Rule-based fallback — always succeeds with a generic response."""
        # Simple keyword-based routing to canned responses
        query_lower = query.lower()
        if "price" in query_lower or "cost" in query_lower:
            response = "For pricing information, please visit our pricing page or contact sales."
        elif "help" in query_lower or "support" in query_lower:
            response = "I'd be happy to help. Could you provide more details about your issue?"
        else:
            response = "Thank you for your question. Let me connect you with a specialist who can help."
        return FallbackResult(
            response=response,
            provider="rules",
            tier=4,
            latency_ms=0.5,
            confidence=0.4,
        )

    def execute(self, query: str) -> FallbackResult:
        """Execute the fallback chain, returning the first successful result."""
        tiers = [
            ("primary", self._try_primary),
            ("secondary", self._try_secondary),
            ("cache", self._try_cache),
            ("rules", self._try_rules),
        ]
        for tier_name, handler in tiers:
            result = handler(query)
            if result is not None:
                if result.tier > 1:
                    result.fallback_reason = f"Tier {result.tier - 1} unavailable"
                self.execution_log.append(result)
                return result
        # Should never reach here (rules always succeeds)
        return self._try_rules(query)

    def get_stats(self) -> dict:
        """Get fallback chain execution statistics."""
        total = len(self.execution_log)
        if total == 0:
            return {"total": 0}
        tier_counts = {}
        for r in self.execution_log:
            tier_counts[r.provider] = tier_counts.get(r.provider, 0) + 1
        avg_latency = sum(r.latency_ms for r in self.execution_log) / total
        avg_confidence = sum(r.confidence for r in self.execution_log) / total
        primary_rate = tier_counts.get("gpt-4o", 0) / total * 100
        return {
            "total_requests": total,
            "tier_distribution": tier_counts,
            "primary_success_rate": f"{primary_rate:.1f}%",
            "avg_latency_ms": round(avg_latency, 1),
            "avg_confidence": round(avg_confidence, 3),
        }
# Demonstration
chain = FallbackChain()
print("=== Fallback Chain: Graceful Degradation ===\n")
queries = [
    "What are your business hours",
    "Explain the differences between your enterprise plans",
    "How do I reset my password",
    "Can you help me with a complex integration issue?",
    "What is the price of your premium tier?",
    "Tell me about your security certifications and compliance",
    "How do I configure SSO for my organization?",
    "What is your refund policy",
]
for q in queries:
    result = chain.execute(q)
    tier_icon = {1: "🟢", 2: "🟡", 3: "💾", 4: "📋"}[result.tier]
    fallback = f" (fallback: {result.fallback_reason})" if result.fallback_reason else ""
    print(f" {tier_icon} Tier {result.tier} [{result.provider:<11}] {result.latency_ms:>6.1f}ms conf={result.confidence:.2f} | {q[:45]}{fallback}")
print("\n--- Chain Statistics ---")
stats = chain.get_stats()
for key, value in stats.items():
    print(f" {key}: {value}")
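The chain above moves to the next tier only when a tier returns `None`, so it covers the "unavailable" case but not the "low-confidence" case mentioned in the introduction. One possible extension is sketched below, assuming each tier is a callable that returns a `(response, confidence)` pair or `None`. The function name `first_confident`, the `0.7` threshold, and the three stand-in tiers are illustrative assumptions.

```python
from typing import Callable, Optional

# A tier returns (response, confidence), or None when it is unavailable
Tier = Callable[[str], Optional[tuple[str, float]]]


def first_confident(query: str, tiers: list[Tier], min_confidence: float = 0.7) -> tuple[str, float, int]:
    """Return (response, confidence, tier_number) from the first tier that clears the threshold.

    If no tier clears it, return the most confident answer seen.
    """
    best: Optional[tuple[str, float, int]] = None
    for number, tier in enumerate(tiers, start=1):
        outcome = tier(query)
        if outcome is None:
            continue  # tier unavailable: move down the chain
        response, confidence = outcome
        if confidence >= min_confidence:
            return response, confidence, number
        if best is None or confidence > best[1]:
            best = (response, confidence, number)
    if best is None:
        raise RuntimeError("every tier failed; the last tier should always answer")
    return best


def unsure_model(query: str) -> Optional[tuple[str, float]]:
    return ("Possibly the enterprise plan?", 0.35)


def down_model(query: str) -> Optional[tuple[str, float]]:
    return None  # simulated outage


def canned_rules(query: str) -> Optional[tuple[str, float]]:
    return ("Let me connect you with a specialist who can help.", 0.4)


print(first_confident("Which plan fits us?", [unsure_model, down_model, canned_rules]))
```

Here no tier reaches the threshold, so the function returns the rule-based answer from tier 3 because its confidence of 0.4 beats the first model's 0.35. Whether to return a weak answer or escalate to a human at that point is a product decision that the sketch leaves open.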
Architecture Selection Guide
| Architecture | Best For | Avoid When | Complexity | Latency |
|---|---|---|---|---|
| Multi-Agent | Complex tasks needing diverse expertise | Simple single-domain queries | High | High (sequential) |
| Agentic RAG | Knowledge-intensive Q&A with multiple sources | Well-structured FAQ with known answers | Medium | Medium |
| Plan-and-Execute | Multi-step tasks with verification needs | Simple one-shot generations | High | High |
| Human-in-the-Loop | High-stakes decisions, regulated domains | High-volume low-risk automation | Medium | Variable (human wait) |
| Autonomous Agent | Open-ended research, exploration tasks | Time-critical or budget-constrained work | Very High | Very High |
| Event-Driven | Long-running workflows, async processing | Real-time interactive responses | High | Low (async) |
| Hybrid | Production systems optimizing cost/quality | Prototypes where simplicity matters more | Medium | Low (deterministic fast path) |
Next in the Series
In Part 19: Migration & Legacy Integration, we’ll cover migrating from older API versions to the Responses API, wrapping legacy systems with AI layers, incremental adoption strategies, and maintaining backward compatibility during the transition.