
OpenAI SDK Track Part 18: Advanced Architectures

May 25, 2026 | Wasil Zafar | 40 min read

Go beyond single-model calls with multi-agent collaboration, agentic RAG with dynamic query routing, plan-and-execute decomposition, human-in-the-loop approval workflows, autonomous goal-driven agents, event-driven orchestration, and hybrid architectures that combine deterministic code with LLM reasoning — all with executable Python code using the OpenAI SDK.

Table of Contents

  1. Multi-Agent Systems
  2. Agentic RAG
  3. Plan-and-Execute
  4. Human-in-the-Loop
  5. Autonomous Agents
  6. Event-Driven Architectures
  7. Hybrid Architectures
What You’ll Learn: Production AI systems rarely consist of a single model call. Real-world applications compose multiple specialized agents, route queries dynamically, decompose complex tasks into verified steps, keep humans in the loop for critical decisions, and combine deterministic business logic with LLM reasoning. This article covers seven architectural patterns that represent the state of the art in applied AI engineering: multi-agent delegation, agentic RAG with adaptive retrieval, plan-and-execute with verification loops, human-in-the-loop approval workflows, autonomous goal-driven agents with self-correction, event-driven async orchestration, and hybrid architectures that know when NOT to use AI.
Multi-Agent Collaboration Architecture
                flowchart TD
                    U[User Request] --> O[Orchestrator Agent]

                    O --> R{Route by Intent}
                    R -->|Research| RA[Research Agent]
                    R -->|Code| CA[Code Agent]
                    R -->|Analysis| DA[Data Agent]
                    R -->|Writing| WA[Writing Agent]

                    RA --> KB[(Knowledge Base)]
                    RA --> WS[Web Search]
                    CA --> SB[Sandbox Executor]
                    CA --> GH[GitHub API]
                    DA --> DB[(Database)]
                    DA --> VIZ[Visualization]
                    WA --> TM[Templates]

                    RA --> SC[Shared Context]
                    CA --> SC
                    DA --> SC
                    WA --> SC

                    SC --> V{Verifier Agent}
                    V -->|Pass| RESP[Final Response]
                    V -->|Fail| O
            

1. Multi-Agent Systems

Multi-agent systems decompose complex tasks across specialized agents that collaborate through structured delegation. Rather than one monolithic prompt handling everything, each agent has a focused role, its own system prompt, and access to specific tools. The orchestrator routes tasks, manages shared context, and synthesizes results. This architecture mirrors how human teams work — a project manager delegates to specialists who each contribute their expertise.
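
The Fail edge that runs from the verifier back to the orchestrator in the diagram above can be sketched as a bounded feedback loop. This is a minimal illustration under stated assumptions, not the article's implementation: `run_with_verification`, `stub_produce`, and `stub_verify` are hypothetical names, and the stubs stand in for real agent and verifier calls.

```python
from typing import Callable, Optional


def run_with_verification(
    request: str,
    produce: Callable[[str, Optional[str]], str],
    verify: Callable[[str], tuple[bool, str]],
    max_rounds: int = 3,
) -> dict:
    """Re-run the producer with verifier feedback until it passes or rounds run out."""
    feedback: Optional[str] = None
    draft = ""
    for round_no in range(1, max_rounds + 1):
        draft = produce(request, feedback)
        passed, feedback = verify(draft)
        if passed:
            return {"output": draft, "rounds": round_no, "verified": True}
    # Budget exhausted: return the last draft, clearly marked as unverified
    return {"output": draft, "rounds": max_rounds, "verified": False}


# Stub agents (assumptions for illustration): the producer only cites a source after feedback.
def stub_produce(request: str, feedback: Optional[str]) -> str:
    answer = f"Answer to: {request}"
    return answer + " [source: kb-42]" if feedback else answer


def stub_verify(draft: str) -> tuple[bool, str]:
    if "[source:" in draft:
        return True, ""
    return False, "Add a source citation."


outcome = run_with_verification("What is a vector database?", stub_produce, stub_verify)
print(outcome)
```

Capping the rounds matters in practice: without `max_rounds`, a verifier that can never be satisfied would keep the orchestrator looping and spending tokens.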

Delegation Patterns

| Pattern | Description | Use Case | Complexity |
|---|---|---|---|
| Sequential Pipeline | Agent A → Agent B → Agent C (linear chain) | Content generation: research → draft → edit | Low |
| Parallel Fan-Out | Orchestrator dispatches to N agents simultaneously | Multi-source research, parallel analysis | Medium |
| Hierarchical Delegation | Manager agents delegate to worker agents | Complex projects with sub-tasks | High |
| Debate/Consensus | Multiple agents argue positions, judge decides | Risk assessment, decision-making | High |
| Specialist Routing | Router selects single best agent per query | Customer support, domain-specific Q&A | Low |
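
As one illustration of the Parallel Fan-Out row, the sketch below dispatches the same task to several agents concurrently using the standard library's thread pool. The agent functions are hypothetical stubs (the `time.sleep` calls stand in for network-bound model requests), and `fan_out` is a name chosen for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def research_agent(task: str) -> str:
    time.sleep(0.05)  # Stand-in for a network-bound model call
    return f"research: notes on {task}"


def analyst_agent(task: str) -> str:
    time.sleep(0.05)
    return f"analyst: metrics for {task}"


def fan_out(task: str, agents: dict[str, Callable[[str], str]]) -> dict[str, str]:
    """Dispatch the same task to every agent concurrently and collect results by name."""
    with ThreadPoolExecutor(max_workers=max(len(agents), 1)) as pool:
        futures = {name: pool.submit(fn, task) for name, fn in agents.items()}
        # result() blocks until that agent finishes and re-raises any exception it hit
        return {name: future.result() for name, future in futures.items()}


results = fan_out("vector databases", {"research": research_agent, "analyst": analyst_agent})
print(results)
```

Threads suit this case because model calls spend most of their time waiting on I/O; an `asyncio` version would be a reasonable alternative when the rest of the application is already asynchronous.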

Multi-Agent Orchestrator Implementation

import os
import json
from dataclasses import dataclass, field
from typing import Any
from openai import OpenAI


@dataclass
class AgentConfig:
    """Configuration for a specialized agent in the multi-agent system."""
    name: str
    role: str
    system_prompt: str
    model: str = "gpt-4o"
    tools: list = field(default_factory=list)
    max_tokens: int = 2000


@dataclass
class TaskResult:
    """Result from an agent's work on a delegated task."""
    agent: str
    task: str
    output: str
    confidence: float
    metadata: dict = field(default_factory=dict)


class MultiAgentOrchestrator:
    """Orchestrates multiple specialized agents collaborating on complex tasks.

    Architecture:
    - Router: Classifies intent and selects appropriate agent(s)
    - Specialists: Domain-focused agents with specific tools
    - Shared Context: Accumulated knowledge across agent interactions
    - Verifier: Validates final output quality before returning
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.shared_context: list[dict] = []
        self.agents = self._configure_agents()

    def _configure_agents(self) -> dict[str, AgentConfig]:
        return {
            "research": AgentConfig(
                name="Research Agent",
                role="researcher",
                system_prompt=(
                    "You are a research specialist. Gather facts, cite sources, "
                    "and provide comprehensive background information. Always "
                    "distinguish between verified facts and inferences."
                ),
                tools=["web_search", "knowledge_base_query"],
            ),
            "analyst": AgentConfig(
                name="Data Analyst Agent",
                role="analyst",
                system_prompt=(
                    "You are a data analysis specialist. Interpret data, identify "
                    "patterns, compute statistics, and produce clear summaries. "
                    "Always show your methodology."
                ),
                tools=["sql_query", "calculate", "visualize"],
            ),
            "writer": AgentConfig(
                name="Writing Agent",
                role="writer",
                system_prompt=(
                    "You are a professional writer. Synthesize information into "
                    "clear, well-structured prose. Match the requested tone and "
                    "format. Cite sources provided in shared context."
                ),
                tools=["format_document", "check_grammar"],
            ),
            "coder": AgentConfig(
                name="Code Agent",
                role="developer",
                system_prompt=(
                    "You are a software engineering specialist. Write clean, "
                    "tested, production-quality code. Explain design decisions "
                    "and include error handling."
                ),
                model="gpt-4o",
                tools=["execute_code", "lint", "test_runner"],
            ),
        }

    def route_task(self, user_request: str) -> list[str]:
        """Determine which agent(s) should handle this request."""
        # Simulated routing logic (in production, use an LLM call for classification)
        request_lower = user_request.lower()
        agents_needed = []

        if any(kw in request_lower for kw in ["research", "find", "look up", "what is"]):
            agents_needed.append("research")
        if any(kw in request_lower for kw in ["analyze", "data", "statistics", "trend"]):
            agents_needed.append("analyst")
        if any(kw in request_lower for kw in ["write", "draft", "summarize", "report"]):
            agents_needed.append("writer")
        if any(kw in request_lower for kw in ["code", "implement", "build", "function"]):
            agents_needed.append("coder")

        return agents_needed or ["research"]  # Default to research

    def delegate_to_agent(self, agent_key: str, task: str) -> TaskResult:
        """Delegate a task to a specific agent and get structured results."""
        agent = self.agents[agent_key]
        context_summary = "\n".join(
            f"[{c['agent']}]: {c['output'][:200]}" for c in self.shared_context[-5:]
        )

        # Simulated agent response. A production version would send agent.system_prompt,
        # the task, and context_summary to the OpenAI API; this stub ignores them.
        simulated_outputs = {
            "research": f"Research findings for '{task}': Found 3 relevant sources. "
                       f"Key insight: The topic has significant implications for scalability.",
            "analyst": f"Analysis of '{task}': Data shows 23% improvement in the target metric. "
                      f"Statistical significance: p < 0.05. Sample size: n=1,200.",
            "writer": f"Draft for '{task}': [Well-structured document synthesizing research "
                     f"and analysis findings into actionable recommendations.]",
            "coder": f"Implementation for '{task}': Created a Python module with 3 classes, "
                    f"15 methods, full type hints, and 92% test coverage.",
        }

        output = simulated_outputs.get(agent_key, f"Completed: {task}")
        result = TaskResult(
            agent=agent.name,
            task=task,
            output=output,
            confidence=0.87,
            metadata={"model": agent.model, "tools_available": agent.tools},
        )

        # Add to shared context for other agents
        self.shared_context.append({"agent": agent.name, "output": output})
        return result

    def execute(self, user_request: str) -> dict:
        """Full orchestration: route, delegate, and synthesize (this demo has no verifier step)."""
        # Step 1: Route to appropriate agents
        agent_keys = self.route_task(user_request)

        # Step 2: Delegate tasks one at a time; independent tasks could instead run in parallel
        results = []
        for key in agent_keys:
            result = self.delegate_to_agent(key, user_request)
            results.append(result)

        # Step 3: Synthesize results
        synthesis = {
            "request": user_request,
            "agents_used": [r.agent for r in results],
            "results": [{"agent": r.agent, "output": r.output, "confidence": r.confidence} for r in results],
            "shared_context_size": len(self.shared_context),
        }
        return synthesis


# Demonstration
orchestrator = MultiAgentOrchestrator()

print("=== Multi-Agent Orchestrator ===\n")

# Complex request requiring multiple agents
requests = [
    "Research the latest trends in vector databases and write a summary report",
    "Analyze our API latency data and implement a caching solution",
    "Find best practices for rate limiting and code a middleware",
]

for req in requests:
    print(f"Request: {req}")
    result = orchestrator.execute(req)
    print(f"  Agents: {', '.join(result['agents_used'])}")
    for r in result["results"]:
        print(f"    [{r['agent']}] confidence={r['confidence']:.2f}")
        print(f"      {r['output'][:80]}...")
    print(f"  Shared context entries: {result['shared_context_size']}\n")
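
The keyword router above is a stand-in for an LLM classifier. A minimal sketch of what that classifier might look like follows, assuming the Chat Completions endpoint with JSON mode. `ROUTER_PROMPT`, `parse_route`, and `route_with_llm` are hypothetical names, and the client is injected so the example runs offline against a stub; in real use you would pass `OpenAI()` from the `openai` package.

```python
import json
from types import SimpleNamespace

KNOWN_AGENTS = {"research", "analyst", "writer", "coder"}

ROUTER_PROMPT = (
    "Classify the user request. Reply with JSON of the form "
    '{"agents": ["research", "analyst", "writer", "coder"]} '
    "listing only the agents needed."
)


def parse_route(raw) -> list[str]:
    """Validate the model's JSON reply; fall back to research on any problem."""
    try:
        agents = json.loads(raw).get("agents", [])
    except (json.JSONDecodeError, AttributeError, TypeError):
        return ["research"]
    if not isinstance(agents, list):
        return ["research"]
    # Drop anything the model invented that is not a configured agent
    valid = [a for a in agents if isinstance(a, str) and a in KNOWN_AGENTS]
    return valid or ["research"]


def route_with_llm(client, user_request: str, model: str = "gpt-4o") -> list[str]:
    """Ask the model to pick agents. `client` is expected to behave like openai.OpenAI()."""
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": ROUTER_PROMPT},
            {"role": "user", "content": user_request},
        ],
        response_format={"type": "json_object"},
        temperature=0,
    )
    return parse_route(response.choices[0].message.content)


# Offline stub that mimics the shape of a chat completion response (illustration only)
def _fake_create(**kwargs):
    reply = SimpleNamespace(content='{"agents": ["analyst", "coder", "unknown"]}')
    return SimpleNamespace(choices=[SimpleNamespace(message=reply)])


fake_client = SimpleNamespace(
    chat=SimpleNamespace(completions=SimpleNamespace(create=_fake_create))
)
print(route_with_llm(fake_client, "Analyze latency data and implement caching"))
```

Validating the reply against `KNOWN_AGENTS` keeps a hallucinated agent name from reaching `delegate_to_agent`, where it would raise a `KeyError`.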

2. Agentic RAG

Traditional RAG (Retrieval-Augmented Generation) follows a simple retrieve-then-generate pipeline. Agentic RAG adds intelligence to the retrieval step itself — the agent decides what to search for, which knowledge sources to query, whether the retrieved context is sufficient, and when to reformulate queries or search additional sources. This transforms retrieval from a static lookup into an adaptive, multi-step reasoning process.

Agentic RAG vs Traditional RAG

| Capability | Traditional RAG | Agentic RAG |
|---|---|---|
| Query Processing | Single embedding lookup | Query decomposition, reformulation, expansion |
| Source Selection | Single vector store | Dynamic routing across multiple knowledge bases |
| Retrieval Strategy | Top-K similarity | Adaptive: semantic, keyword, graph, SQL as needed |
| Sufficiency Check | None; uses whatever is retrieved | Evaluates context quality, triggers re-retrieval |
| Multi-Hop Reasoning | Not supported | Chains retrievals based on intermediate findings |
| Self-Correction | None | Detects contradictions, seeks authoritative sources |
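
The Multi-Hop Reasoning row is the least obvious of these, so here is a toy sketch of chaining retrievals. Everything in it is an assumption for illustration: the `KNOWLEDGE` dictionary is made-up data, and the `next` field hard-codes the follow-up entity that a real agent would extract from each retrieved passage with a model call.

```python
# Toy knowledge base (hypothetical data): each fact may point at a follow-up entity.
KNOWLEDGE = {
    "checkout-service": {
        "fact": "checkout-service is owned by the payments team",
        "next": "payments team",
    },
    "payments team": {
        "fact": "the payments team on-call rotation is listed in runbook RB-7",
        "next": "RB-7",
    },
    "RB-7": {
        "fact": "RB-7 says to page the secondary on-call after 15 minutes",
        "next": None,
    },
}


def multi_hop(start: str, max_hops: int = 3) -> list[str]:
    """Follow references from one retrieved fact to the next, up to max_hops."""
    facts: list[str] = []
    entity = start
    seen: set[str] = set()  # Guards against cycles between entries
    while entity is not None and entity not in seen and len(facts) < max_hops:
        seen.add(entity)
        entry = KNOWLEDGE.get(entity)
        if entry is None:  # Nothing retrieved for this hop; stop rather than guess
            break
        facts.append(entry["fact"])
        entity = entry["next"]
    return facts


chain = multi_hop("checkout-service")
for hop, fact in enumerate(chain, start=1):
    print(f"hop {hop}: {fact}")
```

A single top-K lookup for "who do I page for checkout-service?" would likely miss the runbook, because the runbook never mentions the service by name; the intermediate hop through the owning team is what connects them.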

Agentic RAG with Query Routing

import os
import json
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI


@dataclass
class RetrievalResult:
    """A single retrieval result from a knowledge source."""
    source: str
    content: str
    relevance_score: float
    metadata: dict = field(default_factory=dict)


@dataclass
class RetrievalPlan:
    """Agent's plan for how to retrieve information."""
    original_query: str
    decomposed_queries: list[str]
    sources_to_query: list[str]
    strategy: str  # "semantic", "keyword", "hybrid", "multi-hop"
    reasoning: str


class AgenticRAG:
    """RAG system with agentic decision-making over retrieval strategy.

    The agent:
    1. Analyzes the query to determine retrieval strategy
    2. Decomposes complex queries into sub-queries
    3. Routes each sub-query to the appropriate knowledge source
    4. Evaluates retrieval quality and re-retrieves if insufficient
    5. Synthesizes results with source attribution
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.knowledge_sources = {
            "technical_docs": {"type": "vector_store", "description": "API docs, architecture guides"},
            "company_wiki": {"type": "vector_store", "description": "Internal policies, processes"},
            "code_repo": {"type": "code_search", "description": "Source code, READMEs"},
            "metrics_db": {"type": "sql", "description": "Performance metrics, usage stats"},
            "support_tickets": {"type": "vector_store", "description": "Past issues and resolutions"},
        }
        self.retrieval_history: list[dict] = []

    def plan_retrieval(self, query: str) -> RetrievalPlan:
        """Agent decides HOW to retrieve information for this query."""
        query_lower = query.lower()

        # Determine if query needs decomposition
        decomposed = [query]
        if " and " in query_lower or "compare" in query_lower:
            # Multi-part query — decompose
            parts = query.split(" and ") if " and " in query_lower else [query]
            decomposed = [p.strip() for p in parts]

        # Route to appropriate sources based on query intent
        sources = []
        if any(kw in query_lower for kw in ["api", "endpoint", "sdk", "function"]):
            sources.append("technical_docs")
        if any(kw in query_lower for kw in ["policy", "process", "procedure", "team"]):
            sources.append("company_wiki")
        if any(kw in query_lower for kw in ["code", "implementation", "class", "module"]):
            sources.append("code_repo")
        if any(kw in query_lower for kw in ["metric", "performance", "latency", "usage"]):
            sources.append("metrics_db")
        if any(kw in query_lower for kw in ["error", "bug", "issue", "problem"]):
            sources.append("support_tickets")

        sources = sources or ["technical_docs", "company_wiki"]

        # Choose retrieval strategy
        if len(sources) > 2:
            strategy = "multi-hop"
        elif "metrics_db" in sources:
            strategy = "hybrid"  # SQL + semantic
        else:
            strategy = "semantic"

        return RetrievalPlan(
            original_query=query,
            decomposed_queries=decomposed,
            sources_to_query=sources,
            strategy=strategy,
            reasoning=f"Query targets {len(sources)} sources using {strategy} strategy",
        )

    def retrieve(self, plan: RetrievalPlan) -> list[RetrievalResult]:
        """Execute the retrieval plan across selected sources."""
        results = []

        for sub_query in plan.decomposed_queries:
            for source in plan.sources_to_query:
                # Simulated retrieval (in production, call actual vector stores/DBs)
                result = RetrievalResult(
                    source=source,
                    content=f"[Retrieved from {source}] Relevant information about: {sub_query[:50]}. "
                           f"This content addresses the core aspects of the query with specific details.",
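                    # Deterministic pseudo-score in [0.82, 0.96]; built-in hash() on strings varies between runs
                    relevance_score=0.82 + (sum(map(ord, sub_query + source)) % 15) / 100,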
                    metadata={"source_type": self.knowledge_sources[source]["type"], "query": sub_query},
                )
                results.append(result)

        return results

    def evaluate_sufficiency(self, query: str, results: list[RetrievalResult]) -> dict:
        """Agent evaluates whether retrieved context is sufficient to answer."""
        avg_relevance = sum(r.relevance_score for r in results) / max(len(results), 1)
        sources_covered = len(set(r.source for r in results))

        is_sufficient = avg_relevance > 0.75 and sources_covered >= 2
        return {
            "sufficient": is_sufficient,
            "avg_relevance": round(avg_relevance, 3),
            "sources_covered": sources_covered,
            "total_results": len(results),
            "recommendation": "proceed" if is_sufficient else "reformulate_and_retry",
        }

    def generate_answer(self, query: str, results: list[RetrievalResult]) -> dict:
        """Generate final answer with source attribution."""
        context_parts = [f"[{r.source}] {r.content}" for r in results[:5]]
        sources_used = list(set(r.source for r in results[:5]))

        # Simulated generation (in production, call OpenAI with context)
        answer = (
            f"Based on {len(results)} retrieved passages from {len(sources_used)} sources: "
            f"The answer to '{query[:60]}' involves multiple aspects covered in our "
            f"knowledge base. Key findings span {', '.join(sources_used)}."
        )

        return {
            "query": query,
            "answer": answer,
            "sources": sources_used,
            "context_passages_used": len(context_parts),
            # Average over the passages actually used, which may be fewer than five
            "confidence": round(sum(r.relevance_score for r in results[:5]) / max(len(results[:5]), 1), 3),
        }

    def query(self, user_query: str) -> dict:
        """Full agentic RAG pipeline: plan, retrieve, evaluate, generate."""
        # Step 1: Plan retrieval strategy
        plan = self.plan_retrieval(user_query)

        # Step 2: Execute retrieval
        results = self.retrieve(plan)

        # Step 3: Evaluate sufficiency
        evaluation = self.evaluate_sufficiency(user_query, results)

        # Step 4: Re-retrieve if insufficient (one retry)
        if not evaluation["sufficient"]:
            expanded_plan = RetrievalPlan(
                original_query=user_query,
                decomposed_queries=[user_query, f"background context for: {user_query}"],
                sources_to_query=list(self.knowledge_sources.keys())[:3],
                strategy="multi-hop",
                reasoning="Initial retrieval insufficient — expanding search scope",
            )
            plan = expanded_plan  # Report the plan that actually produced the results
            results = self.retrieve(plan)
            evaluation = self.evaluate_sufficiency(user_query, results)

        # Step 5: Generate answer with attribution
        answer = self.generate_answer(user_query, results)

        self.retrieval_history.append({"query": user_query, "plan": plan.strategy, "results": len(results)})

        return {
            "plan": {"strategy": plan.strategy, "sources": plan.sources_to_query, "sub_queries": len(plan.decomposed_queries)},
            "retrieval": {"total_results": len(results), "evaluation": evaluation},
            "answer": answer,
        }


# Demonstration
rag = AgenticRAG()

print("=== Agentic RAG with Query Routing ===\n")

queries = [
    "How does our authentication API handle token refresh and what's the current error rate?",
    "Compare our deployment process with industry best practices",
    "What code changes caused the latency spike last Tuesday?",
]

for q in queries:
    print(f"Query: {q}")
    result = rag.query(q)
    plan = result["plan"]
    print(f"  Strategy: {plan['strategy']} | Sources: {plan['sources']} | Sub-queries: {plan['sub_queries']}")
    print(f"  Retrieved: {result['retrieval']['total_results']} passages | Sufficient: {result['retrieval']['evaluation']['sufficient']}")
    print(f"  Answer confidence: {result['answer']['confidence']}")
    print(f"  Sources cited: {result['answer']['sources']}\n")
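
The simulated `generate_answer` above skips the actual model call. One way the grounded generation step might look is sketched below, assuming the Chat Completions endpoint. `build_rag_messages` and `generate_grounded_answer` are hypothetical names, the passages are made-up sample data, and the client is injected so the example runs offline against a stub.

```python
from types import SimpleNamespace


def build_rag_messages(query: str, passages: list[dict], max_passages: int = 5) -> list[dict]:
    """Number each passage so the model can cite it as [1], [2], ..."""
    context = "\n".join(
        f"[{i}] ({p['source']}) {p['content']}"
        for i, p in enumerate(passages[:max_passages], start=1)
    )
    system = (
        "Answer using only the numbered context passages. "
        "Cite passages like [1]. If the context is insufficient, say so."
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
    ]


def generate_grounded_answer(client, query: str, passages: list[dict], model: str = "gpt-4o") -> str:
    """`client` is expected to behave like openai.OpenAI(); injected so this runs offline."""
    response = client.chat.completions.create(
        model=model,
        messages=build_rag_messages(query, passages),
        temperature=0,
    )
    return response.choices[0].message.content


# Made-up sample passages and an offline stub client (illustration only)
passages = [
    {"source": "technical_docs", "content": "Refresh tokens rotate on every use."},
    {"source": "metrics_db", "content": "Auth error rate was 0.4% over the last 7 days."},
]
stub_message = SimpleNamespace(content="Tokens rotate on each use [1].")
stub_client = SimpleNamespace(
    chat=SimpleNamespace(
        completions=SimpleNamespace(
            create=lambda **kwargs: SimpleNamespace(choices=[SimpleNamespace(message=stub_message)])
        )
    )
)

messages = build_rag_messages("How does token refresh work?", passages)
print(messages[1]["content"])
print(generate_grounded_answer(stub_client, "How does token refresh work?", passages))
```

Numbering the passages gives the model a compact way to attribute claims, and it lets downstream code map each `[n]` citation back to a source.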

3. Plan-and-Execute

The plan-and-execute pattern separates thinking from doing. A planning agent decomposes a complex goal into discrete, verifiable steps. An executor agent carries out each step. A verifier checks results before proceeding. This separation allows you to use a more capable (expensive) model for planning and a faster (cheaper) model for execution, while maintaining quality through verification loops.

When to Use Plan-and-Execute: This pattern excels when tasks have multiple dependent steps, when partial failure requires rollback or retry, when you need an audit trail of decisions, or when the cost of getting a step wrong is high (e.g., data migrations, multi-service deployments, complex analyses). It adds latency and cost for simple tasks — don’t over-engineer single-shot queries.
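
Dependent steps imply an execution order. As a small sketch of how a controller might derive that order, the example below uses `graphlib.TopologicalSorter` from the standard library (Python 3.9+). The four-step plan is hypothetical, with steps 2 and 3 both depending only on step 1.

```python
from graphlib import TopologicalSorter  # Standard library, Python 3.9+

# step_id -> ids it depends on (a hypothetical plan with two independent branches)
dependencies = {
    1: set(),
    2: {1},
    3: {1},
    4: {2, 3},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()  # Raises graphlib.CycleError if the plan has circular dependencies

batches = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # Steps whose dependencies are all done
    batches.append(ready)
    sorter.done(*ready)  # Mark finished so dependents become ready

print(batches)  # → [[1], [2, 3], [4]]
```

Each inner list is a batch of steps that could run concurrently, and the cycle check catches a malformed plan before anything executes.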

Plan-and-Execute with Verification

import os
import json
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI


class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class PlanStep:
    """A single step in an execution plan."""
    step_id: int
    description: str
    agent: str  # Which agent executes this step
    dependencies: list[int] = field(default_factory=list)
    status: StepStatus = StepStatus.PENDING
    result: Optional[str] = None
    verification: Optional[str] = None
    retries: int = 0
    max_retries: int = 2


@dataclass
class ExecutionPlan:
    """A structured plan decomposing a goal into verifiable steps."""
    goal: str
    steps: list[PlanStep]
    created_by: str = "planner"
    status: str = "active"


class PlanAndExecuteAgent:
    """Separates planning from execution with verification loops.

    Architecture:
    - Planner: Decomposes goals into ordered steps with dependencies
    - Executor: Carries out each step using appropriate tools
    - Verifier: Validates step results before marking complete
    - Controller: Manages execution flow, retries, and rollback
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.execution_log: list[dict] = []

    def create_plan(self, goal: str) -> ExecutionPlan:
        """Planner agent decomposes the goal into executable steps."""
        # In production, this would be an LLM call to decompose the goal
        # Here we simulate planning for a common enterprise task
        if "migrate" in goal.lower() or "deploy" in goal.lower():
            steps = [
                PlanStep(1, "Audit current state and identify dependencies", "analyst"),
                PlanStep(2, "Create backup of existing data/config", "executor", dependencies=[1]),
                PlanStep(3, "Validate backup integrity", "verifier", dependencies=[2]),
                PlanStep(4, "Execute migration/deployment in staging", "executor", dependencies=[3]),
                PlanStep(5, "Run integration tests against staging", "tester", dependencies=[4]),
                PlanStep(6, "Execute migration/deployment in production", "executor", dependencies=[5]),
                PlanStep(7, "Run smoke tests and verify production health", "verifier", dependencies=[6]),
                PlanStep(8, "Update documentation and notify stakeholders", "writer", dependencies=[7]),
            ]
        else:
            steps = [
                PlanStep(1, "Analyze requirements and gather context", "analyst"),
                PlanStep(2, "Design solution approach", "architect", dependencies=[1]),
                PlanStep(3, "Implement core logic", "coder", dependencies=[2]),
                PlanStep(4, "Write tests and validate", "tester", dependencies=[3]),
                PlanStep(5, "Review and finalize", "verifier", dependencies=[4]),
            ]

        return ExecutionPlan(goal=goal, steps=steps)

    def execute_step(self, step: PlanStep, context: dict) -> str:
        """Executor agent carries out a single step."""
        step.status = StepStatus.RUNNING

        # Simulated execution (in production, call appropriate tools/APIs)
        simulated_results = {
            "analyst": "Analysis complete: Identified 5 components affected. Risk level: medium.",
            "executor": "Execution complete: All operations succeeded. Duration: 4.2s.",
            "verifier": "Verification passed: All checksums match, no data loss detected.",
            "tester": "Tests passed: 47/47 assertions green. Coverage: 89%.",
            "architect": "Design finalized: Chose microservice pattern with event sourcing.",
            "coder": "Implementation complete: 3 modules, 12 functions, type-safe.",
            "writer": "Documentation updated: README, CHANGELOG, and runbook refreshed.",
        }

        result = simulated_results.get(step.agent, f"Step completed by {step.agent}")
        step.result = result
        step.status = StepStatus.COMPLETED
        return result

    def verify_step(self, step: PlanStep) -> bool:
        """Verifier checks if the step result meets acceptance criteria."""
        # Simulated verification (in production, use an LLM to evaluate)
        if step.status != StepStatus.COMPLETED or not step.result:
            return False

        # Check for failure indicators
        failure_keywords = ["error", "failed", "timeout", "mismatch"]
        is_valid = not any(kw in step.result.lower() for kw in failure_keywords)

        step.verification = "PASSED" if is_valid else "FAILED"
        return is_valid

    def execute_plan(self, plan: ExecutionPlan) -> dict:
        """Execute the full plan with dependency resolution and verification."""
        completed_steps = set()
        context = {"goal": plan.goal, "results": {}}

        for step in plan.steps:
            # Check dependencies
            deps_met = all(d in completed_steps for d in step.dependencies)
            if not deps_met:
                step.status = StepStatus.SKIPPED
                self.execution_log.append({"step": step.step_id, "status": "skipped", "reason": "dependency not met"})
                continue

            # Execute with a bounded retry loop: one initial attempt plus up to max_retries retries
            success = False
            while True:
                result = self.execute_step(step, context)
                success = self.verify_step(step)

                # Stop on success or once the retry budget is spent, so a
                # persistently failing step cannot loop forever
                if success or step.retries >= step.max_retries:
                    break
                step.retries += 1
                step.status = StepStatus.PENDING
                self.execution_log.append({"step": step.step_id, "status": "retry", "attempt": step.retries})

            if success:
                completed_steps.add(step.step_id)
                context["results"][step.step_id] = result
                self.execution_log.append({"step": step.step_id, "status": "completed"})
            else:
                step.status = StepStatus.FAILED
                self.execution_log.append({"step": step.step_id, "status": "failed"})
                break  # Stop execution on unrecoverable failure

        # Summary
        return {
            "goal": plan.goal,
            "total_steps": len(plan.steps),
            "completed": len(completed_steps),
            "failed": sum(1 for s in plan.steps if s.status == StepStatus.FAILED),
            "skipped": sum(1 for s in plan.steps if s.status == StepStatus.SKIPPED),
            "success": all(s.status == StepStatus.COMPLETED for s in plan.steps),
            "steps": [
                {"id": s.step_id, "desc": s.description, "status": s.status.value, "verification": s.verification}
                for s in plan.steps
            ],
        }


# Demonstration
agent = PlanAndExecuteAgent()

print("=== Plan-and-Execute Agent ===\n")

goal = "Migrate the user authentication service from PostgreSQL to DynamoDB"
print(f"Goal: {goal}\n")

# Step 1: Create plan
plan = agent.create_plan(goal)
print("--- Execution Plan ---")
for step in plan.steps:
    deps = f" (depends on: {step.dependencies})" if step.dependencies else ""
    print(f"  Step {step.step_id}: [{step.agent}] {step.description}{deps}")

# Step 2: Execute plan
print("\n--- Executing Plan ---")
result = agent.execute_plan(plan)

for step in result["steps"]:
    icon = "✓" if step["status"] == "completed" else "✗" if step["status"] == "failed" else "○"
    print(f"  {icon} Step {step['id']}: {step['desc'][:50]} [{step['status']}] verify={step['verification']}")

print(f"\n--- Result ---")
print(f"  Success: {result['success']} | Completed: {result['completed']}/{result['total_steps']}")
print(f"  Failed: {result['failed']} | Skipped: {result['skipped']}")
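
When `create_plan` is backed by a real model, the reply has to be validated before anything runs. The sketch below shows one possible check, assuming the planner was asked to return JSON with a `steps` array whose items carry `step_id`, `description`, and `dependencies`. That schema and the name `parse_plan` are assumptions for this example, not part of the OpenAI SDK.

```python
import json


def parse_plan(raw: str) -> list[dict]:
    """Parse a JSON plan and reject steps whose dependencies are missing or out of order."""
    steps = json.loads(raw)["steps"]
    seen: set[int] = set()
    for step in steps:
        step_id = step["step_id"]
        if step_id in seen:
            raise ValueError(f"duplicate step_id {step_id}")
        # A dependency must refer to a step that appears earlier in the plan
        missing = [d for d in step.get("dependencies", []) if d not in seen]
        if missing:
            raise ValueError(f"step {step_id} depends on unknown or later steps: {missing}")
        seen.add(step_id)
    return steps


# Hypothetical planner output, serialized the way a model reply would arrive
raw_plan = json.dumps({
    "steps": [
        {"step_id": 1, "description": "Audit current state", "dependencies": []},
        {"step_id": 2, "description": "Create backup", "dependencies": [1]},
        {"step_id": 3, "description": "Validate backup", "dependencies": [2]},
    ]
})

steps = parse_plan(raw_plan)
print([s["step_id"] for s in steps])
```

Rejecting forward references matters for `execute_plan` above, which walks the steps in list order and would otherwise mark such a step as skipped.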

4. Human-in-the-Loop

Not every AI decision should be autonomous. Human-in-the-loop (HITL) patterns keep humans in control of high-stakes decisions while automating routine tasks. The key design challenge is determining when to escalate — too often and you lose the automation benefit, too rarely and you risk costly errors. Confidence thresholds, cost gates, and semantic risk classification enable selective automation that scales trust over time.

Escalation Decision Framework

| Signal | Threshold | Action | Example |
|---|---|---|---|
| Low Confidence | Score < 0.7 | Escalate for human review | Ambiguous customer intent |
| High Cost | Action cost > $1,000 | Require approval before execution | Refund processing, contract changes |
| Sensitive Domain | PII/PHI/financial data | Always require human sign-off | Medical advice, legal opinions |
| Novel Scenario | No similar past case | Route to specialist | First-time edge case |
| Contradictory Signals | Agent disagreement | Present options to human | Conflicting data sources |
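
The table can be read as a small decision function. The sketch below encodes its thresholds directly (confidence below 0.7, cost above $1,000); the order in which signals are checked is an assumption made here, since the table does not say which signal wins when several fire at once. The function and return-value names are hypothetical.

```python
def escalation_action(
    confidence: float,
    action_cost: float = 0.0,
    sensitive_data: bool = False,
    novel_scenario: bool = False,
    agents_disagree: bool = False,
) -> str:
    """Return the first matching action; the check order is an assumed precedence."""
    if sensitive_data:  # PII/PHI/financial data always needs sign-off
        return "require_human_signoff"
    if action_cost > 1000:
        return "require_approval"
    if agents_disagree:
        return "present_options_to_human"
    if novel_scenario:
        return "route_to_specialist"
    if confidence < 0.7:
        return "escalate_for_review"
    return "proceed_automatically"


print(escalation_action(confidence=0.95))
print(escalation_action(confidence=0.95, action_cost=2500))
print(escalation_action(confidence=0.60))
print(escalation_action(confidence=0.99, sensitive_data=True))
```

Putting the sensitive-data check first means a high confidence score can never bypass sign-off, which matches the "always" in that row.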

Approval Workflow with Confidence Thresholds

import os
import json
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Callable
from openai import OpenAI


class ApprovalStatus(Enum):
    AUTO_APPROVED = "auto_approved"
    PENDING_REVIEW = "pending_review"
    HUMAN_APPROVED = "human_approved"
    HUMAN_REJECTED = "human_rejected"
    ESCALATED = "escalated"


class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class Decision:
    """An AI-generated decision requiring potential human approval."""
    decision_id: str
    action: str
    reasoning: str
    confidence: float
    risk_level: RiskLevel
    estimated_cost: float = 0.0
    involves_pii: bool = False
    status: ApprovalStatus = ApprovalStatus.PENDING_REVIEW
    reviewer: Optional[str] = None
    review_notes: Optional[str] = None
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


class HumanInTheLoopController:
    """Manages selective automation with human oversight.

    Routing logic (as implemented in process_decision):
    - Confidence >= 0.85, LOW or MEDIUM risk, cost within the gate, no PII → Auto-approve
    - CRITICAL risk, or confidence < 0.5 → Escalate immediately
    - Everything else (including HIGH risk) → Queue for human review
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.approval_queue: list[Decision] = []
        self.decision_log: list[Decision] = []
        self.thresholds = {
            "auto_approve_confidence": 0.85,
            "escalation_confidence": 0.5,
            "cost_gate": 500.0,  # Require approval above this dollar amount
            "pii_always_review": True,
        }

    def classify_risk(self, action: str, context: dict) -> RiskLevel:
        """Classify the risk level of a proposed action."""
        action_lower = action.lower()

        if any(kw in action_lower for kw in ["delete", "terminate", "revoke", "legal"]):
            return RiskLevel.CRITICAL
        if any(kw in action_lower for kw in ["refund", "modify account", "change plan", "escalate"]):
            return RiskLevel.HIGH
        if any(kw in action_lower for kw in ["update", "send email", "create ticket"]):
            return RiskLevel.MEDIUM
        return RiskLevel.LOW

    def should_auto_approve(self, decision: Decision) -> bool:
        """Determine if a decision can be auto-approved without human review."""
        if decision.involves_pii and self.thresholds["pii_always_review"]:
            return False
        if decision.estimated_cost > self.thresholds["cost_gate"]:
            return False
        if decision.risk_level in (RiskLevel.HIGH, RiskLevel.CRITICAL):
            return False
        if decision.confidence < self.thresholds["auto_approve_confidence"]:
            return False
        return True

    def process_decision(self, action: str, confidence: float, context: dict) -> Decision:
        """Process an AI decision through the approval workflow."""
        risk = self.classify_risk(action, context)
        cost = context.get("estimated_cost", 0.0)
        has_pii = context.get("involves_pii", False)

        decision = Decision(
            decision_id=f"dec-{len(self.decision_log)+1:04d}",
            action=action,
            reasoning=f"AI determined this action based on context: {json.dumps(context)[:100]}",
            confidence=confidence,
            risk_level=risk,
            estimated_cost=cost,
            involves_pii=has_pii,
        )

        # Routing logic
        if self.should_auto_approve(decision):
            decision.status = ApprovalStatus.AUTO_APPROVED
        elif decision.risk_level == RiskLevel.CRITICAL:
            decision.status = ApprovalStatus.ESCALATED
        elif decision.confidence < self.thresholds["escalation_confidence"]:
            decision.status = ApprovalStatus.ESCALATED
        else:
            decision.status = ApprovalStatus.PENDING_REVIEW
            self.approval_queue.append(decision)

        self.decision_log.append(decision)
        return decision

    def human_review(self, decision_id: str, approved: bool, reviewer: str, notes: str = "") -> Decision:
        """Simulate a human reviewing and approving/rejecting a decision."""
        for decision in self.decision_log:
            if decision.decision_id == decision_id:
                decision.status = ApprovalStatus.HUMAN_APPROVED if approved else ApprovalStatus.HUMAN_REJECTED
                decision.reviewer = reviewer
                decision.review_notes = notes
                # Remove from queue
                self.approval_queue = [d for d in self.approval_queue if d.decision_id != decision_id]
                return decision
        raise ValueError(f"Decision {decision_id} not found")

    def get_stats(self) -> dict:
        """Get approval workflow statistics."""
        total = len(self.decision_log)
        if total == 0:
            return {"total_decisions": 0}

        statuses = {}
        for d in self.decision_log:
            statuses[d.status.value] = statuses.get(d.status.value, 0) + 1

        return {
            "total_decisions": total,
            "auto_approved": statuses.get("auto_approved", 0),
            "pending_review": statuses.get("pending_review", 0),
            "escalated": statuses.get("escalated", 0),
            "human_approved": statuses.get("human_approved", 0),
            "human_rejected": statuses.get("human_rejected", 0),
            "automation_rate": round(statuses.get("auto_approved", 0) / total * 100, 1),
            "queue_depth": len(self.approval_queue),
        }


# Demonstration
controller = HumanInTheLoopController()

print("=== Human-in-the-Loop Approval Workflow ===\n")

# Simulate various decisions
scenarios = [
    ("Send order confirmation email", 0.95, {"estimated_cost": 0.01, "involves_pii": False}),
    ("Process $50 refund for delayed shipment", 0.88, {"estimated_cost": 50.0, "involves_pii": False}),
    ("Update customer address in database", 0.82, {"estimated_cost": 0.0, "involves_pii": True}),
    ("Process $2,500 refund for service outage", 0.75, {"estimated_cost": 2500.0, "involves_pii": False}),
    ("Delete user account and all associated data", 0.90, {"estimated_cost": 0.0, "involves_pii": True}),
    ("Respond to general FAQ question", 0.92, {"estimated_cost": 0.0, "involves_pii": False}),
    ("Recommend treatment plan to patient", 0.45, {"estimated_cost": 0.0, "involves_pii": True}),
    ("Create support ticket for follow-up", 0.89, {"estimated_cost": 0.0, "involves_pii": False}),
]

for action, confidence, context in scenarios:
    decision = controller.process_decision(action, confidence, context)
    risk_icon = {"low": "🟢", "medium": "🟡", "high": "🟠", "critical": "🔴"}[decision.risk_level.value]
    print(f"  {risk_icon} [{decision.status.value:<16}] conf={confidence:.2f} cost=${context['estimated_cost']:>7.2f} | {action[:55]}")

# Simulate human reviews
print("\n--- Human Reviews ---")
pending = [d for d in controller.decision_log if d.status == ApprovalStatus.PENDING_REVIEW]
for d in pending[:2]:
    reviewed = controller.human_review(d.decision_id, approved=True, reviewer="alice@company.com", notes="Approved per policy")
    print(f"  ✓ {reviewed.decision_id}: '{reviewed.action[:40]}' approved by {reviewed.reviewer}")

# Statistics
print("\n--- Workflow Statistics ---")
stats = controller.get_stats()
for key, value in stats.items():
    print(f"  {key}: {value}")

5. Autonomous Agents

Autonomous agents operate with minimal human supervision toward a specified goal. They maintain working memory across interactions, self-correct when encountering errors, acquire new tools or information as needed, and persist toward objectives through multiple reasoning cycles. The key architectural challenge is balancing autonomy with safety — agents need enough freedom to be useful but enough constraints to prevent runaway behavior.

Safety Boundaries: Autonomous agents MUST have: (1) Maximum iteration limits to prevent infinite loops; (2) Budget caps on API calls and tool usage; (3) Scope boundaries defining what actions are allowed; (4) Kill switches for immediate shutdown; (5) Audit trails for every decision and action. Without these guardrails, an autonomous agent can exhaust resources, take unintended actions, or get stuck in unproductive loops.
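The five guardrails listed above can be collected into a single pre-action check. The following is a minimal sketch, not a definitive implementation: the `Guardrails` class, the `GuardrailViolation` exception, the `authorize` method, and the default limits are all hypothetical names and values chosen for illustration.

```python
import time
from dataclasses import dataclass, field


class GuardrailViolation(Exception):
    """Raised when an action would exceed one of the agent's safety limits."""


@dataclass
class Guardrails:
    """Hypothetical guardrail bundle: every action must pass authorize() first."""
    max_iterations: int = 10                      # (1) iteration limit
    budget_usd: float = 1.00                      # (2) budget cap
    allowed_actions: frozenset = frozenset({"search", "calculate", "read_file"})  # (3) scope
    kill_switch: bool = False                     # (4) set True to halt immediately
    iterations: int = 0
    spent_usd: float = 0.0
    audit_log: list = field(default_factory=list)  # (5) audit trail

    def authorize(self, action: str, cost_usd: float) -> None:
        """Check every limit before an action runs and record the outcome either way."""
        reason = None
        if self.kill_switch:
            reason = "kill switch engaged"
        elif self.iterations >= self.max_iterations:
            reason = "iteration limit reached"
        elif self.spent_usd + cost_usd > self.budget_usd:
            reason = "budget cap exceeded"
        elif action not in self.allowed_actions:
            reason = f"action '{action}' is out of scope"

        self.audit_log.append({
            "ts": time.time(),
            "action": action,
            "cost_usd": cost_usd,
            "allowed": reason is None,
            "reason": reason,
        })
        if reason is not None:
            raise GuardrailViolation(reason)

        # Only count usage for actions that were actually allowed
        self.iterations += 1
        self.spent_usd += cost_usd


guard = Guardrails(max_iterations=3, budget_usd=0.05)
guard.authorize("search", 0.02)
guard.authorize("calculate", 0.02)
try:
    guard.authorize("search", 0.02)  # would bring spend to 0.06, over the 0.05 cap
except GuardrailViolation as exc:
    print(f"Blocked: {exc}")
```

Rejected actions are logged as well as allowed ones, so the audit trail shows what the agent attempted, not only what it did.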

Goal-Driven Agent with Self-Correction

import os
import json
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from openai import OpenAI


class AgentState(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    OBSERVING = "observing"
    CORRECTING = "correcting"
    COMPLETE = "complete"
    FAILED = "failed"


@dataclass
class MemoryEntry:
    """A single entry in the agent's working memory."""
    cycle: int
    state: AgentState
    thought: str
    action: Optional[str] = None
    observation: Optional[str] = None
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


@dataclass
class AgentGoal:
    """A goal the autonomous agent is pursuing."""
    description: str
    success_criteria: list[str]
    max_cycles: int = 10
    budget_remaining: float = 1.0  # Dollar budget for API calls


class AutonomousAgent:
    """Goal-driven agent with ReAct loop and self-correction.

    Cycle: Think → Act → Observe → (Correct if needed) → Repeat
    Stops when: goal achieved, max cycles reached, or budget exhausted.
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.memory: list[MemoryEntry] = []
        self.tools_available = ["search", "calculate", "write_file", "read_file", "ask_user"]
        self.tools_acquired: list[str] = []
        self.cycle_count = 0
        self.total_cost = 0.0

    def think(self, goal: AgentGoal) -> str:
        """Reasoning step: decide what to do next based on memory and goal."""
        # Review past actions and observations
        recent_memory = self.memory[-5:] if self.memory else []
        past_actions = [m.action for m in recent_memory if m.action]
        past_observations = [m.observation for m in recent_memory if m.observation]

        # Determine next action (simulated reasoning)
        if not past_actions:
            thought = f"Starting fresh on goal: '{goal.description}'. First, I need to gather context."
        elif past_observations and "error" in (past_observations[-1] or "").lower():
            thought = "Previous action resulted in an error. I need to correct my approach."
        elif len(past_actions) > 3 and len(set(past_actions)) < 2:
            thought = "I'm repeating the same action. Let me try a different approach."
        else:
            criteria_met = min(len(goal.success_criteria), len(past_observations))
            thought = f"Progress: {criteria_met}/{len(goal.success_criteria)} criteria addressed. Continuing."

        return thought

    def act(self, thought: str, goal: AgentGoal) -> str:
        """Action step: execute a tool or generate output."""
        self.total_cost += 0.02  # Simulated API cost per action

        # Choose action based on thought
        if "gather context" in thought.lower() or "starting" in thought.lower():
            return "search: gathering background information on the topic"
        elif "error" in thought.lower() or "correct" in thought.lower():
            return "search: looking for alternative approach to resolve the issue"
        elif "different approach" in thought.lower():
            return "calculate: attempting analytical approach instead of search"
        else:
            return "write_file: drafting solution based on gathered information"

    def observe(self, action: str) -> str:
        """Observation step: process the result of an action."""
        # Simulated observations
        if "search" in action:
            return "Found 3 relevant results. Key information extracted successfully."
        elif "calculate" in action:
            return "Calculation complete. Result: 94.7% confidence in proposed solution."
        elif "write_file" in action:
            return "File written successfully. Solution documented with 5 sections."
        elif "error" in action:
            return "Error: Resource not found. Need to adjust approach."
        return "Action completed with partial results."

    def should_self_correct(self, observation: str) -> bool:
        """Determine if self-correction is needed."""
        correction_triggers = ["error", "failed", "not found", "timeout", "partial"]
        return any(trigger in observation.lower() for trigger in correction_triggers)

    def self_correct(self, observation: str) -> str:
        """Generate a correction strategy based on the failed observation."""
        if "not found" in observation.lower():
            return "Resource missing — will try alternative source or create from scratch."
        if "partial" in observation.lower():
            return "Partial results — will supplement with additional queries."
        return "Unexpected result — will retry with modified parameters."

    def check_goal_completion(self, goal: AgentGoal) -> bool:
        """Check if the goal's success criteria have been met."""
        observations = [m.observation for m in self.memory if m.observation]
        # Simplified: goal complete after sufficient successful observations,
        # where "successful" means the observation would not trigger self-correction
        successful_obs = [o for o in observations if o and not self.should_self_correct(o)]
        return len(successful_obs) >= len(goal.success_criteria)

    def run(self, goal: AgentGoal) -> dict:
        """Execute the autonomous agent loop until goal is achieved or limits hit."""
        print(f"  Goal: {goal.description}")
        print(f"  Success criteria: {goal.success_criteria}")
        print(f"  Limits: max_cycles={goal.max_cycles}, budget=${goal.budget_remaining:.2f}\n")

        while self.cycle_count < goal.max_cycles and self.total_cost < goal.budget_remaining:
            self.cycle_count += 1

            # Think
            thought = self.think(goal)
            entry = MemoryEntry(cycle=self.cycle_count, state=AgentState.THINKING, thought=thought)

            # Act
            action = self.act(thought, goal)
            entry.action = action
            entry.state = AgentState.ACTING

            # Observe
            observation = self.observe(action)
            entry.observation = observation
            entry.state = AgentState.OBSERVING

            # Self-correct if needed
            if self.should_self_correct(observation):
                correction = self.self_correct(observation)
                entry.state = AgentState.CORRECTING
                print(f"  Cycle {self.cycle_count}: [CORRECT] {correction[:60]}")
            else:
                print(f"  Cycle {self.cycle_count}: [{entry.state.value.upper():>9}] {thought[:40]}... → {observation[:40]}")

            self.memory.append(entry)

            # Check completion
            if self.check_goal_completion(goal):
                print(f"\n  ✓ Goal achieved in {self.cycle_count} cycles (cost: ${self.total_cost:.3f})")
                return {"status": "complete", "cycles": self.cycle_count, "cost": self.total_cost, "memory_size": len(self.memory)}

        # Limit reached
        reason = "max_cycles" if self.cycle_count >= goal.max_cycles else "budget_exhausted"
        print(f"\n  ✗ Stopped: {reason} after {self.cycle_count} cycles (cost: ${self.total_cost:.3f})")
        return {"status": "stopped", "reason": reason, "cycles": self.cycle_count, "cost": self.total_cost, "memory_size": len(self.memory)}


# Demonstration
print("=== Autonomous Agent with Self-Correction ===\n")

agent = AutonomousAgent()

goal = AgentGoal(
    description="Research and summarize the top 3 approaches to LLM evaluation",
    success_criteria=[
        "Identify evaluation approaches",
        "Gather metrics for each approach",
        "Write structured summary",
    ],
    max_cycles=8,
    budget_remaining=0.50,
)

result = agent.run(goal)
print(f"\n  Final: status={result['status']}, memory_entries={result['memory_size']}")
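The `think` step above is simulated with string checks. One way it might be backed by a real model call is sketched below, assuming the Chat Completions endpoint in JSON mode. The helper names `build_think_messages` and `think_with_llm`, the prompt wording, and the choice of `gpt-4o-mini` are assumptions for illustration; a stub client stands in for `OpenAI()` so the sketch runs offline.

```python
import json
from types import SimpleNamespace

TOOLS = ("search", "calculate", "write_file", "read_file", "ask_user")


def build_think_messages(goal: str, criteria: list, history: list) -> list:
    """Turn the goal and recent memory into chat messages for the model."""
    system = (
        "You are the planning step of an agent. Reply with a JSON object "
        f'containing the keys "thought" and "action". Allowed actions: {", ".join(TOOLS)}.'
    )
    user = json.dumps({
        "goal": goal,
        "success_criteria": criteria,
        "recent_history": history[-5:],  # same five-entry window as the simulated think()
    })
    return [{"role": "system", "content": system}, {"role": "user", "content": user}]


def think_with_llm(client, goal: str, criteria: list, history: list, model: str = "gpt-4o-mini") -> dict:
    """Ask the model for the next step; fall back to asking the user on bad output."""
    response = client.chat.completions.create(
        model=model,
        messages=build_think_messages(goal, criteria, history),
        response_format={"type": "json_object"},
    )
    raw = response.choices[0].message.content or "{}"
    try:
        step = json.loads(raw)
    except json.JSONDecodeError:
        step = {}
    if not isinstance(step, dict):
        step = {}

    action = step.get("action")
    if action not in TOOLS:
        action = "ask_user"  # never execute a tool the agent was not given
    return {"thought": step.get("thought", "No plan returned."), "action": action}


# Offline stub with the same call shape as OpenAI().chat.completions.create
def _stub_create(**kwargs):
    reply = json.dumps({"thought": "Need background first.", "action": "search"})
    return SimpleNamespace(choices=[SimpleNamespace(message=SimpleNamespace(content=reply))])


stub_client = SimpleNamespace(chat=SimpleNamespace(completions=SimpleNamespace(create=_stub_create)))

step = think_with_llm(
    stub_client,
    goal="Research and summarize the top 3 approaches to LLM evaluation",
    criteria=["Identify evaluation approaches"],
    history=[],
)
print(step)
```

Passing the client in as a parameter keeps the function testable without network access; in a real run the argument would be an `OpenAI()` instance.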

6. Event-Driven Architectures

Event-driven AI architectures decouple agent invocation from request-response cycles. Agents are triggered by events (webhooks, queue messages, scheduled triggers, file uploads) and can run asynchronously for minutes or hours without blocking callers. This pattern is essential for long-running workflows like document processing pipelines, multi-step approval chains, and background analysis tasks that would time out in synchronous HTTP handlers.

Async Agent Orchestration Patterns

| Pattern | Trigger | Duration | Use Case |
|---|---|---|---|
| Webhook-Triggered | HTTP POST from external service | Seconds to minutes | GitHub PR review, Slack commands |
| Queue-Driven | Message on SQS/RabbitMQ/Kafka | Minutes to hours | Document processing, batch analysis |
| Scheduled (Cron) | Timer/schedule trigger | Minutes | Daily reports, periodic audits |
| File Upload | Object storage event (S3/Blob) | Minutes | Invoice extraction, image analysis |
| State Machine | Workflow step completion | Days (multi-step) | Approval chains, onboarding flows |
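The queue-driven pattern can be sketched in-process with `asyncio.Queue`, which has the same producer/worker shape as an external broker. This is an illustration under stated assumptions: `handle_event` is a hypothetical stand-in for an agent call, and a real deployment would consume from SQS, RabbitMQ, or Kafka rather than an in-memory queue.

```python
import asyncio


async def handle_event(event: dict) -> dict:
    """Hypothetical stand-in for a long-running agent call."""
    await asyncio.sleep(0.01)  # simulate I/O-bound work such as an API request
    return {"event_id": event["event_id"], "status": "processed"}


async def worker(queue: asyncio.Queue, results: list) -> None:
    """Pull events until cancelled; producers never wait on processing."""
    while True:
        event = await queue.get()
        try:
            results.append(await handle_event(event))
        finally:
            queue.task_done()  # lets queue.join() know this item is finished


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]

    # Producer side: enqueue and return immediately
    for i in range(5):
        queue.put_nowait({"event_id": f"evt-{i}"})

    await queue.join()  # block until every queued event has been handled
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


results = asyncio.run(main())
print(f"Processed {len(results)} events")
```

Because two workers run concurrently, completion order is not guaranteed to match enqueue order; anything that depends on ordering should key on `event_id`.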

Event-Driven Workflow Engine

import os
import json
import asyncio
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Optional
from openai import OpenAI


class EventType(Enum):
    WEBHOOK = "webhook"
    QUEUE_MESSAGE = "queue_message"
    SCHEDULED = "scheduled"
    FILE_UPLOAD = "file_upload"
    WORKFLOW_STEP = "workflow_step"
    AGENT_COMPLETE = "agent_complete"


class WorkflowStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    WAITING = "waiting_for_event"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Event:
    """An event that triggers agent processing."""
    event_id: str
    event_type: EventType
    payload: dict
    source: str
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    correlation_id: Optional[str] = None


@dataclass
class WorkflowStep:
    """A step in a long-running workflow."""
    step_id: str
    name: str
    agent: str
    status: WorkflowStatus = WorkflowStatus.PENDING
    input_data: dict = field(default_factory=dict)
    output_data: dict = field(default_factory=dict)
    wait_for_event: Optional[EventType] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None


@dataclass
class Workflow:
    """A long-running workflow composed of event-triggered steps."""
    workflow_id: str
    name: str
    steps: list[WorkflowStep]
    status: WorkflowStatus = WorkflowStatus.PENDING
    current_step_index: int = 0
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


class EventDrivenOrchestrator:
    """Orchestrator for event-driven AI workflows (synchronous, in-memory simulation).

    Features:
    - Event routing: maps events to workflow steps
    - Long-running support: workflows can pause waiting for events
    - Correlation: links related events across a workflow
    - Dead letter: handles failed/unroutable events
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.workflows: dict[str, Workflow] = {}
        self.event_log: list[Event] = []
        self.dead_letter: list[Event] = []
        self.handlers: dict[EventType, list[Callable]] = {et: [] for et in EventType}

    def create_workflow(self, name: str, steps: list[dict]) -> Workflow:
        """Create a new long-running workflow definition."""
        workflow_steps = [
            WorkflowStep(
                step_id=f"step-{i+1}",
                name=s["name"],
                agent=s["agent"],
                wait_for_event=EventType(s["wait_for"]) if "wait_for" in s else None,
            )
            for i, s in enumerate(steps)
        ]

        workflow = Workflow(
            workflow_id=f"wf-{len(self.workflows)+1:04d}",
            name=name,
            steps=workflow_steps,
        )
        self.workflows[workflow.workflow_id] = workflow
        return workflow

    def emit_event(self, event: Event) -> dict:
        """Process an incoming event through the orchestrator."""
        self.event_log.append(event)

        # Find the first workflow that can accept this event
        for wf_id, workflow in self.workflows.items():
            if workflow.status not in (WorkflowStatus.PENDING, WorkflowStatus.WAITING):
                continue
            current_step = workflow.steps[workflow.current_step_index]

            # A pending workflow starts on any event; a waiting one needs the matching type
            if workflow.status == WorkflowStatus.PENDING or current_step.wait_for_event == event.event_type:
                self._execute_step(workflow, current_step, event)
                return {"status": "processed", "event_id": event.event_id, "workflow_id": wf_id}

        # No workflow accepted the event
        self.dead_letter.append(event)
        return {"status": "unrouted", "event_id": event.event_id, "dead_lettered": True}

    def _execute_step(self, workflow: Workflow, step: WorkflowStep, event: Event) -> dict:
        """Execute a single workflow step triggered by an event."""
        step.status = WorkflowStatus.RUNNING
        step.started_at = datetime.now(timezone.utc).isoformat()
        step.input_data = event.payload
        workflow.status = WorkflowStatus.RUNNING

        # Simulated agent processing
        step.output_data = {
            "agent": step.agent,
            "result": f"Processed by {step.agent}: {step.name}",
            "event_source": event.source,
        }
        step.status = WorkflowStatus.COMPLETED
        step.completed_at = datetime.now(timezone.utc).isoformat()

        # Advance workflow
        workflow.current_step_index += 1
        if workflow.current_step_index >= len(workflow.steps):
            workflow.status = WorkflowStatus.COMPLETED
        else:
            next_step = workflow.steps[workflow.current_step_index]
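            if next_step.wait_for_event:
                # Mark both the paused step and the workflow as waiting so status output shows it
                next_step.status = WorkflowStatus.WAITING
                workflow.status = WorkflowStatus.WAITING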
            else:
                # Auto-advance to next step
                next_event = Event(
                    event_id=f"auto-{workflow.workflow_id}-{workflow.current_step_index}",
                    event_type=EventType.WORKFLOW_STEP,
                    payload=step.output_data,
                    source="orchestrator",
                    correlation_id=workflow.workflow_id,
                )
                self._execute_step(workflow, next_step, next_event)

        return {"step": step.name, "status": step.status.value}

    def get_workflow_status(self, workflow_id: str) -> dict:
        """Get the current status of a workflow."""
        wf = self.workflows.get(workflow_id)
        if not wf:
            return {"error": "Workflow not found"}

        return {
            "workflow_id": wf.workflow_id,
            "name": wf.name,
            "status": wf.status.value,
            "progress": f"{wf.current_step_index}/{len(wf.steps)}",
            "steps": [
                {"name": s.name, "status": s.status.value, "agent": s.agent}
                for s in wf.steps
            ],
        }


# Demonstration
orchestrator = EventDrivenOrchestrator()

print("=== Event-Driven AI Workflow ===\n")

# Define a document processing workflow
workflow = orchestrator.create_workflow(
    name="Invoice Processing Pipeline",
    steps=[
        {"name": "Extract document data", "agent": "extraction_agent"},
        {"name": "Validate extracted fields", "agent": "validation_agent"},
        {"name": "Match to purchase orders", "agent": "matching_agent"},
        {"name": "Route for approval", "agent": "routing_agent", "wait_for": "webhook"},
        {"name": "Post to accounting system", "agent": "integration_agent"},
    ],
)

print(f"Created workflow: {workflow.workflow_id} ({workflow.name})")
print(f"  Steps: {len(workflow.steps)}\n")

# Simulate events triggering the workflow
events = [
    Event("evt-001", EventType.FILE_UPLOAD, {"file": "invoice-2026-0142.pdf", "size_kb": 340}, "s3-bucket"),
    Event("evt-002", EventType.WEBHOOK, {"approved": True, "approver": "finance-mgr@company.com"}, "slack-app"),
]

for event in events:
    print(f"Event: [{event.event_type.value}] from {event.source}")
    result = orchestrator.emit_event(event)
    print(f"  Result: {result['status']}")

    status = orchestrator.get_workflow_status(workflow.workflow_id)
    print(f"  Workflow: {status['status']} ({status['progress']} steps complete)")
    for step in status["steps"]:
        icon = "✓" if step["status"] == "completed" else "⏳" if step["status"] == "waiting_for_event" else "○"
        print(f"    {icon} {step['name']} [{step['agent']}] — {step['status']}")
    print()

# Stats
print(f"Events processed: {len(orchestrator.event_log)}")
print(f"Dead letters: {len(orchestrator.dead_letter)}")
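The orchestrator above hands each event to the first workflow that can accept it, which becomes ambiguous once several workflows are paused on the same event type. A correlation ID resolves that. The sketch below is one possible routing rule, not the orchestrator's actual behavior; `PendingWorkflow` and `route_event` are hypothetical names, and treating the correlation ID as the workflow ID is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PendingWorkflow:
    """Minimal view of a paused workflow (hypothetical, for illustration)."""
    workflow_id: str
    waiting_for: str  # event type the workflow is paused on


def route_event(event_type: str, correlation_id: Optional[str], workflows: dict) -> Optional[str]:
    """Return the workflow ID that should receive the event, or None to dead-letter it."""
    if correlation_id is not None:
        # Correlated event: deliver only to the named workflow, and only if it is
        # actually waiting for this event type
        wf = workflows.get(correlation_id)
        if wf is not None and wf.waiting_for == event_type:
            return wf.workflow_id
        return None

    # Uncorrelated event: route only when exactly one workflow could accept it
    candidates = [wf.workflow_id for wf in workflows.values() if wf.waiting_for == event_type]
    return candidates[0] if len(candidates) == 1 else None


waiting = {
    "wf-0001": PendingWorkflow("wf-0001", "webhook"),
    "wf-0002": PendingWorkflow("wf-0002", "webhook"),
}

print(route_event("webhook", "wf-0002", waiting))  # targeted delivery
print(route_event("webhook", None, waiting))       # ambiguous, so dead-lettered
```

Dead-lettering the ambiguous case is a deliberate choice here: an approval delivered to the wrong invoice is worse than one that waits for manual routing.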

7. Hybrid Architectures

The most effective production AI systems are hybrid — they combine deterministic code paths with LLM reasoning, knowing when to use each. Not every decision needs AI: validation rules, data transformations, arithmetic, and well-defined business logic should remain in deterministic code. LLMs excel at ambiguous classification, natural language understanding, creative generation, and complex reasoning over unstructured data. The art is knowing where to draw the line.

When NOT to Use AI

Keep These Deterministic:
  • Input validation: Email format, phone numbers, required fields — regex and schema validators are faster, cheaper, and 100% reliable
  • Arithmetic: Tax calculations, currency conversion, inventory counts. Code computes these exactly and repeatably; prefer a decimal type over binary floats for money
  • Routing by explicit rules: If the user selected “billing” from a menu, route to billing — no classification needed
  • Data transformations: JSON → CSV, date formatting, unit conversion — pure functions with no ambiguity
  • Access control: Permission checks based on role/scope — security decisions must be deterministic
  • Idempotency checks: Deduplication, retry detection — hash comparisons, not LLM calls
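As a concrete instance of the last item, duplicate detection needs nothing more than a stable hash of the request. This is a minimal sketch: `request_fingerprint` and `IdempotencyGuard` are hypothetical names, and the in-memory set stands in for whatever shared store (a database or cache) a real service would use.

```python
import hashlib
import json


def request_fingerprint(payload: dict) -> str:
    """SHA-256 over a canonical JSON form, so key order does not change the hash."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class IdempotencyGuard:
    """Remembers fingerprints of requests that were already processed."""

    def __init__(self) -> None:
        self._seen = set()

    def is_duplicate(self, payload: dict) -> bool:
        fingerprint = request_fingerprint(payload)
        if fingerprint in self._seen:
            return True
        self._seen.add(fingerprint)
        return False


guard = IdempotencyGuard()
first = guard.is_duplicate({"order_id": 42, "action": "refund"})
retry = guard.is_duplicate({"action": "refund", "order_id": 42})  # same request, keys reordered
other = guard.is_duplicate({"order_id": 43, "action": "refund"})
print(first, retry, other)
```

The check is a set lookup with no model call, no token cost, and the same answer every time for the same input.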

Hybrid Decision Router

import os
import json
import re
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from openai import OpenAI


@dataclass
class RoutingDecision:
    """Result of the hybrid router's decision."""
    path: str  # "deterministic" or "llm"
    handler: str
    confidence: float
    reasoning: str
    cost_estimate: float  # Estimated cost in dollars


class HybridRouter:
    """Routes requests to deterministic code or LLM based on complexity.

    Design principles:
    - Default to deterministic: Use AI only when rules can't handle it
    - Fail to deterministic: If LLM is unavailable, fall back to rules
    - Cost-aware: Track when LLM usage is justified vs wasteful
    - Progressive: Start with rules, graduate to LLM as patterns emerge
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.routing_log: list[dict] = []

        # Deterministic handlers — fast, free, reliable
        self.deterministic_handlers = {
            "validate_email": self._validate_email,
            "calculate_tax": self._calculate_tax,
            "format_currency": self._format_currency,
            "check_business_hours": self._check_business_hours,
            "route_by_keyword": self._route_by_keyword,
        }

        # LLM handlers — for ambiguous/complex tasks
        self.llm_handlers = {
            "classify_intent": self._classify_intent_llm,
            "generate_response": self._generate_response_llm,
            "summarize_text": self._summarize_text_llm,
            "extract_entities": self._extract_entities_llm,
        }

    # --- Deterministic handlers ---
    def _validate_email(self, data: dict) -> dict:
        email = data.get("email", "")
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        is_valid = bool(re.match(pattern, email))
        return {"valid": is_valid, "email": email, "method": "regex"}

    def _calculate_tax(self, data: dict) -> dict:
        amount = data.get("amount", 0)
        rate = data.get("tax_rate", 0.08)
        tax = round(amount * rate, 2)
        return {"subtotal": amount, "tax": tax, "total": round(amount + tax, 2), "method": "arithmetic"}

    def _format_currency(self, data: dict) -> dict:
        amount = data.get("amount", 0)
        currency = data.get("currency", "USD")
        symbols = {"USD": "$", "EUR": "€", "GBP": "£", "JPY": "¥"}
        formatted = f"{symbols.get(currency, currency)}{amount:,.2f}"
        return {"formatted": formatted, "method": "string_format"}

    def _check_business_hours(self, data: dict) -> dict:
        hour = data.get("hour", 12)
        is_open = 9 <= hour < 17
        return {"is_business_hours": is_open, "hour": hour, "method": "range_check"}

    def _route_by_keyword(self, data: dict) -> dict:
        text = data.get("text", "").lower()
        routes = {"billing": "billing_team", "technical": "engineering", "cancel": "retention"}
        for keyword, route in routes.items():
            if keyword in text:
                return {"route": route, "matched_keyword": keyword, "method": "keyword_match"}
        return {"route": "general", "matched_keyword": None, "method": "keyword_match_default"}

    # --- LLM handlers (simulated) ---
    def _classify_intent_llm(self, data: dict) -> dict:
        text = data.get("text", "")
        # In production: call OpenAI to classify ambiguous intent
        return {"intent": "inquiry", "confidence": 0.84, "text": text[:50], "method": "llm_classification"}

    def _generate_response_llm(self, data: dict) -> dict:
        query = data.get("query", "")
        return {"response": f"Generated response for: {query[:40]}...", "tokens": 150, "method": "llm_generation"}

    def _summarize_text_llm(self, data: dict) -> dict:
        text = data.get("text", "")
        return {"summary": f"Summary of {len(text)} chars of text", "compression": 0.3, "method": "llm_summarization"}

    def _extract_entities_llm(self, data: dict) -> dict:
        text = data.get("text", "")
        return {"entities": [{"text": "example", "type": "ORG"}], "count": 1, "method": "llm_ner"}

    def route(self, request_type: str, data: dict) -> RoutingDecision:
        """Decide whether to use deterministic code or LLM."""
        # Check deterministic handlers first (fast path)
        if request_type in self.deterministic_handlers:
            return RoutingDecision(
                path="deterministic",
                handler=request_type,
                confidence=1.0,
                reasoning=f"Exact match: '{request_type}' has a deterministic handler",
                cost_estimate=0.0,
            )

        # Check LLM handlers
        if request_type in self.llm_handlers:
            return RoutingDecision(
                path="llm",
                handler=request_type,
                confidence=0.9,
                reasoning=f"Requires LLM: '{request_type}' involves ambiguous/unstructured processing",
                cost_estimate=0.003,
            )

        # Unknown request — default to LLM for flexibility
        return RoutingDecision(
            path="llm",
            handler="generate_response",
            confidence=0.6,
            reasoning=f"Unknown request type '{request_type}' — falling back to LLM",
            cost_estimate=0.005,
        )

    def execute(self, request_type: str, data: dict) -> dict:
        """Route and execute a request through the hybrid system."""
        decision = self.route(request_type, data)

        # Execute through appropriate handler
        if decision.path == "deterministic":
            handler_fn = self.deterministic_handlers[decision.handler]
        else:
            handler_fn = self.llm_handlers.get(decision.handler, self._generate_response_llm)

        result = handler_fn(data)
        result["_routing"] = {"path": decision.path, "cost": decision.cost_estimate}

        self.routing_log.append({"type": request_type, "path": decision.path, "cost": decision.cost_estimate})
        return result

    def get_stats(self) -> dict:
        """Get routing statistics showing deterministic vs LLM usage."""
        total = len(self.routing_log)
        if total == 0:
            return {"total_requests": 0}

        deterministic = sum(1 for r in self.routing_log if r["path"] == "deterministic")
        llm_calls = total - deterministic
        total_cost = sum(r["cost"] for r in self.routing_log)

        return {
            "total_requests": total,
            "deterministic": deterministic,
            "llm_calls": llm_calls,
            "deterministic_pct": round(deterministic / total * 100, 1),
            "total_cost": round(total_cost, 4),
            "avg_cost_per_request": round(total_cost / total, 5),
        }


# Demonstration
router = HybridRouter()

print("=== Hybrid Architecture: Deterministic + LLM ===\n")

requests = [
    ("validate_email", {"email": "user@example.com"}),
    ("calculate_tax", {"amount": 99.99, "tax_rate": 0.0825}),
    ("classify_intent", {"text": "I'm frustrated with the service and want to talk to someone"}),
    ("format_currency", {"amount": 1234567.89, "currency": "USD"}),
    ("generate_response", {"query": "Explain your refund policy for enterprise customers"}),
    ("check_business_hours", {"hour": 14}),
    ("extract_entities", {"text": "Meeting with Google and Microsoft next Tuesday in Seattle"}),
    ("route_by_keyword", {"text": "I need help with my billing statement"}),
    ("summarize_text", {"text": "A" * 5000}),
    ("validate_email", {"email": "invalid-email"}),
]

for req_type, data in requests:
    result = router.execute(req_type, data)
    routing = result.pop("_routing")
    path_icon = "⚡" if routing["path"] == "deterministic" else "🧠"
    cost_str = f"${routing['cost']:.4f}" if routing["cost"] > 0 else "FREE"
    print(f"  {path_icon} [{routing['path']:>13}] {req_type:<22} cost={cost_str:>7} → {json.dumps(result)[:60]}")

print("\n--- Routing Statistics ---")
stats = router.get_stats()
for key, value in stats.items():
    print(f"  {key}: {value}")

Fallback Chain Pattern

Production systems need graceful degradation. When the primary LLM is unavailable, slow, or returns low-confidence results, fallback chains provide progressively simpler alternatives. The chain moves from most capable (and most expensive/fragile) to most reliable (and cheapest/simplest), ensuring the system always returns something useful.

import os
import time
from dataclasses import dataclass
from typing import Optional
from openai import OpenAI


@dataclass
class FallbackResult:
    """Result from a fallback chain execution."""
    response: str
    provider: str
    tier: int  # 1 = primary, 2 = secondary, etc.
    latency_ms: float
    confidence: float
    fallback_reason: Optional[str] = None


class FallbackChain:
    """Multi-tier fallback chain for resilient AI responses.

    Tier 1: Primary LLM (GPT-4o) — best quality, highest cost
    Tier 2: Secondary LLM (GPT-4o-mini) — good quality, lower cost
    Tier 3: Cached responses — instant, free, limited coverage
    Tier 4: Rule-based fallback — always available, basic quality
    """

    def __init__(self):
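        # The client is created for parity with a real deployment but is never called:
        # tiers 1 and 2 below are simulated, so the placeholder key is never sent.
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))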
        self.cache: dict[str, str] = {
            "what are your business hours": "Our business hours are Monday-Friday, 9 AM - 5 PM EST.",
            "how do i reset my password": "Visit account settings and click 'Reset Password'.",
            "what is your refund policy": "We offer full refunds within 30 days of purchase.",
        }
        self.execution_log: list[FallbackResult] = []

    def _try_primary(self, query: str) -> Optional[FallbackResult]:
        """Tier 1: Primary model (GPT-4o) — simulated with occasional failures."""
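        # Simulate a roughly 20% failure rate for demonstration. str hashes are
        # salted per process, so which queries "fail" changes between runs
        # unless PYTHONHASHSEED is fixed.
        if hash(query) % 5 == 0:
            return None  # Simulated timeout/error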

        latency = 0.8 + (hash(query) % 40) / 100  # 0.8-1.2s simulated
        return FallbackResult(
            response=f"[GPT-4o] Comprehensive answer to: {query[:50]}",
            provider="gpt-4o",
            tier=1,
            latency_ms=latency * 1000,
            confidence=0.92,
        )

    def _try_secondary(self, query: str) -> Optional[FallbackResult]:
        """Tier 2: Secondary model (GPT-4o-mini) — faster, cheaper, slightly less capable."""
        # Simulate a roughly 10% failure rate (varies per run, since str hashes
        # are salted per process unless PYTHONHASHSEED is fixed)
        if hash(query + "secondary") % 10 == 0:
            return None

        latency = 0.3 + (hash(query) % 20) / 100  # 0.3-0.5s simulated
        return FallbackResult(
            response=f"[GPT-4o-mini] Good answer to: {query[:50]}",
            provider="gpt-4o-mini",
            tier=2,
            latency_ms=latency * 1000,
            confidence=0.78,
        )

    def _try_cache(self, query: str) -> Optional[FallbackResult]:
        """Tier 3: Cached responses — instant but limited coverage."""
        query_normalized = query.lower().strip().rstrip("?.")
        if query_normalized in self.cache:
            return FallbackResult(
                response=self.cache[query_normalized],
                provider="cache",
                tier=3,
                latency_ms=1.0,
                confidence=0.95,  # High confidence because it's a known answer
            )
        return None

    def _try_rules(self, query: str) -> FallbackResult:
        """Tier 4: Rule-based fallback — always succeeds with a generic response."""
        # Simple keyword-based routing to canned responses
        query_lower = query.lower()
        if "price" in query_lower or "cost" in query_lower:
            response = "For pricing information, please visit our pricing page or contact sales."
        elif "help" in query_lower or "support" in query_lower:
            response = "I'd be happy to help. Could you provide more details about your issue?"
        else:
            response = "Thank you for your question. Let me connect you with a specialist who can help."

        return FallbackResult(
            response=response,
            provider="rules",
            tier=4,
            latency_ms=0.5,
            confidence=0.4,
        )

    def execute(self, query: str) -> FallbackResult:
        """Execute the fallback chain, returning the first successful result."""
        tiers = [
            ("primary", self._try_primary),
            ("secondary", self._try_secondary),
            ("cache", self._try_cache),
            ("rules", self._try_rules),
        ]

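        failed: list[str] = []  # names of every tier that returned no result
        for tier_name, handler in tiers:
            result = handler(query)
            if result is None:
                failed.append(tier_name)
                continue
            if failed:
                # Report all tiers that were skipped, not just the previous one
                result.fallback_reason = f"{', '.join(failed)} unavailable"
            self.execution_log.append(result)
            return result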

        # Should never reach here (rules always succeeds)
        return self._try_rules(query)

    def get_stats(self) -> dict:
        """Get fallback chain execution statistics."""
        total = len(self.execution_log)
        if total == 0:
            return {"total_requests": 0}  # same key as the non-empty case

        tier_counts = {}
        for r in self.execution_log:
            tier_counts[r.provider] = tier_counts.get(r.provider, 0) + 1

        avg_latency = sum(r.latency_ms for r in self.execution_log) / total
        avg_confidence = sum(r.confidence for r in self.execution_log) / total
        primary_rate = tier_counts.get("gpt-4o", 0) / total * 100

        return {
            "total_requests": total,
            "tier_distribution": tier_counts,
            "primary_success_rate": f"{primary_rate:.1f}%",
            "avg_latency_ms": round(avg_latency, 1),
            "avg_confidence": round(avg_confidence, 3),
        }


# Demonstration
chain = FallbackChain()

print("=== Fallback Chain: Graceful Degradation ===\n")

queries = [
    "What are your business hours",
    "Explain the differences between your enterprise plans",
    "How do I reset my password",
    "Can you help me with a complex integration issue?",
    "What is the price of your premium tier?",
    "Tell me about your security certifications and compliance",
    "How do I configure SSO for my organization?",
    "What is your refund policy",
]

for q in queries:
    result = chain.execute(q)
    tier_icon = {1: "🟢", 2: "🟡", 3: "💾", 4: "📋"}[result.tier]
    fallback = f" (fallback: {result.fallback_reason})" if result.fallback_reason else ""
    print(f"  {tier_icon} Tier {result.tier} [{result.provider:<11}] {result.latency_ms:>6.1f}ms conf={result.confidence:.2f} | {q[:45]}{fallback}")

print("\n--- Chain Statistics ---")
stats = chain.get_stats()
for key, value in stats.items():
    print(f"  {key}: {value}")
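The chain above moves to the next tier only when a handler returns None, while the introduction to this pattern also mentions low-confidence results. One way to cover that case is a confidence gate, sketched below. The names `TierResult` and `run_with_confidence_gate` and the 0.6 threshold are illustrative assumptions, not part of the original class, and the stub tiers make no API calls.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class TierResult:
    """Hypothetical stand-in for FallbackResult, trimmed to what the gate needs."""
    response: str
    provider: str
    confidence: float


# A tier handler returns a result, or None when the tier is unavailable.
Handler = Callable[[str], Optional[TierResult]]


def run_with_confidence_gate(
    query: str,
    handlers: list[Handler],
    min_confidence: float = 0.6,  # assumed threshold; tune per application
) -> Optional[TierResult]:
    """Return the first result at or above min_confidence.

    If every tier is unavailable or below the threshold, return the most
    confident result seen, or None when no tier produced anything.
    """
    best: Optional[TierResult] = None
    for handler in handlers:
        result = handler(query)
        if result is None:
            continue  # tier unavailable: try the next one
        if result.confidence >= min_confidence:
            return result
        if best is None or result.confidence > best.confidence:
            best = result  # remember the best low-confidence answer
    return best


# Usage with stub tiers (no API calls are made)
def unsure_llm(query: str) -> Optional[TierResult]:
    return TierResult("I think so?", "stub-llm", confidence=0.35)


def cached(query: str) -> Optional[TierResult]:
    return TierResult("Known answer.", "cache", confidence=0.95)


picked = run_with_confidence_gate("What is your refund policy", [unsure_llm, cached])
print(picked.provider, picked.confidence)  # cache 0.95
```

Keeping the best low-confidence answer as a last resort preserves the chain's guarantee that something is returned whenever at least one tier responds.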

Architecture Selection Guide

| Architecture | Best For | Avoid When | Complexity | Latency |
| --- | --- | --- | --- | --- |
| Multi-Agent | Complex tasks needing diverse expertise | Simple single-domain queries | High | High (sequential) |
| Agentic RAG | Knowledge-intensive Q&A with multiple sources | Well-structured FAQ with known answers | Medium | Medium |
| Plan-and-Execute | Multi-step tasks with verification needs | Simple one-shot generations | High | High |
| Human-in-the-Loop | High-stakes decisions, regulated domains | High-volume low-risk automation | Medium | Variable (human wait) |
| Autonomous Agent | Open-ended research, exploration tasks | Time-critical or budget-constrained work | Very High | Very High |
| Event-Driven | Long-running workflows, async processing | Real-time interactive responses | High | Low (async) |
| Hybrid | Production systems optimizing cost/quality | Prototypes where simplicity matters more | Medium | Low (deterministic fast path) |

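As a rough aid for reading the table above, the Complexity column can be encoded as data and filtered by a complexity budget. This is only a sketch: the ranking of Medium, High, and Very High and the helper name `shortlist` are assumptions introduced for illustration, and the other columns still need human judgment.

```python
# Complexity column of the selection guide, ranked for comparison.
# The numeric ranks are an assumption made for this sketch.
COMPLEXITY_RANK = {"Medium": 1, "High": 2, "Very High": 3}

# (architecture, complexity) pairs copied from the table above.
GUIDE = [
    ("Multi-Agent", "High"),
    ("Agentic RAG", "Medium"),
    ("Plan-and-Execute", "High"),
    ("Human-in-the-Loop", "Medium"),
    ("Autonomous Agent", "Very High"),
    ("Event-Driven", "High"),
    ("Hybrid", "Medium"),
]


def shortlist(max_complexity: str) -> list[str]:
    """Return architectures whose complexity does not exceed max_complexity."""
    limit = COMPLEXITY_RANK[max_complexity]
    return [name for name, level in GUIDE if COMPLEXITY_RANK[level] <= limit]


print(shortlist("Medium"))  # ['Agentic RAG', 'Human-in-the-Loop', 'Hybrid']
```

Starting from the lowest budget and widening it only when no candidate fits the task mirrors the advice to begin with the simplest architecture that solves the problem.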
Key Takeaways: Advanced architectures are compositions of simpler patterns. Start with the simplest architecture that solves your problem, then evolve:

(1) Multi-agent systems shine when tasks require diverse expertise but add coordination overhead.
(2) Agentic RAG transforms retrieval from a static lookup into an intelligent, adaptive process.
(3) Plan-and-execute separates thinking from doing, enabling cheaper execution with verified quality.
(4) Human-in-the-loop keeps humans in control of high-stakes decisions while automating the routine.
(5) Autonomous agents pursue goals independently but MUST have safety boundaries.
(6) Event-driven architectures decouple triggers from processing for long-running workflows.
(7) Hybrid is the most practical: default to deterministic code and use LLMs only where they add unique value.

The best production systems combine multiple patterns: a hybrid router that delegates to a multi-agent system with human-in-the-loop gates and event-driven execution for long-running tasks.

Next in the Series

In Part 19: Migration & Legacy Integration, we’ll cover migrating from older API versions to the Responses API, wrapping legacy systems with AI layers, incremental adoption strategies, and maintaining backward compatibility during the transition.