
AI Application Development Mastery Part 8: LangGraph — Stateful Agent Workflows

April 1, 2026 · Wasil Zafar · 44 min read

Graduate from simple agent loops to graph-based architectures that handle real-world complexity. Master LangGraph's StateGraph, conditional routing, persistent state, subgraphs, and human-in-the-loop patterns — and understand exactly when to choose LangGraph over LangChain agents, n8n, or Zapier for workflow orchestration.

Table of Contents

  1. Why LangGraph
  2. StateGraph Fundamentals
  3. State Management
  4. Workflow Orchestration Comparison
  5. Persistence
  6. Subgraphs
  7. Human-in-the-Loop
  8. Exercises & Self-Assessment
  9. LangGraph Workflow Generator
  10. Conclusion & Next Steps

Introduction: Beyond the Agent Loop

Series Overview: This is Part 8 of our 20-part AI Application Development Mastery series. LangGraph represents a fundamental shift from simple agent loops to graph-based architectures that can handle complex, stateful, multi-step workflows with conditional branching, cycles, persistence, and human oversight.

AI Application Development Mastery

Your 20-step learning path • Currently on Step 8
  1. Foundations & Evolution of AI Apps: Pre-LLM era, transformers, LLM revolution
  2. LLM Fundamentals for Developers: Tokens, context windows, sampling, API patterns
  3. Prompt Engineering Mastery: Zero/few-shot, CoT, ReAct, structured outputs
  4. LangChain Core Concepts: Chains, prompts, LLMs, tools, LCEL
  5. Retrieval-Augmented Generation (RAG): Embeddings, vector DBs, retrievers, RAG pipelines
  6. Memory & Context Engineering: Buffer/summary/vector memory, chunking, re-ranking
  7. Agents — Core of Modern AI Apps: ReAct, tool-calling, planner-executor agents
  8. LangGraph — Stateful Agent Workflows: Nodes, edges, state, graph execution, cycles (You Are Here)
  9. Deep Agents & Autonomous Systems: Multi-step reasoning, self-reflection, planning
  10. Multi-Agent Systems: Supervisor, swarm, debate, role-based collaboration
  11. AI Application Design Patterns: RAG, chat+memory, workflow automation, agent loops
  12. Ecosystem & Frameworks: LlamaIndex, Haystack, HuggingFace, vLLM
  13. MCP Foundations & Architecture: Protocol design, Host/Client/Server, primitives, security
  14. MCP in Production: Building servers, integrations, scaling, agent systems
  15. Evaluation & LLMOps: Prompt eval, tracing, LangSmith, experiment tracking
  16. Production AI Systems: APIs, queues, caching, streaming, scaling
  17. Safety, Guardrails & Reliability: Input filtering, hallucination mitigation, prompt injection
  18. Advanced Topics: Fine-tuning, tool learning, hybrid LLM+symbolic
  19. Building Real AI Applications: Chatbot, document QA, coding assistant, full-stack
  20. Future of AI Applications: Autonomous agents, self-improving, multi-modal, AI OS

In Part 7, we built agents using AgentExecutor — a powerful but limited runtime. AgentExecutor gives you a single loop: the LLM thinks, calls a tool, observes, and repeats. But real-world AI workflows require much more: branching logic, parallel execution, persistent state across sessions, human approval gates, and nested sub-workflows.

LangGraph was created to solve these limitations. It models agent workflows as directed graphs where nodes are functions, edges define transitions, and state flows through the graph. This gives you explicit, debuggable control over every aspect of your agent's behavior — while still allowing the LLM to make dynamic decisions at each node.

Key Insight: Think of LangGraph as a state machine for AI agents. Each node is a state, each edge is a transition, and the state object is the memory that flows between nodes. This is the same pattern used in game engines, embedded systems, and workflow automation — but applied to LLM-powered applications.
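The state-machine analogy can be made concrete without any framework at all: a handful of node functions, an edge table, and a dict of flowing state are enough to sketch the core pattern that LangGraph formalizes. The sketch below is illustrative only, not LangGraph's API — the node names and the `run` helper are made up for this demonstration:

```python
# A framework-free sketch of the graph idea: nodes are functions that take
# state and return partial updates; edges map each node to the next one.
# LangGraph layers typing, reducers, persistence, and interrupts on top.

def classify(state):
    word = "billing" if "charge" in state["query"] else "technical"
    return {"category": word}

def handle(state):
    return {"resolution": f"{state['category']} team is on it"}

NODES = {"classify": classify, "handle": handle}

# Edge table: node name -> function that picks the next node (None = END)
EDGES = {
    "classify": lambda s: "handle",
    "handle": lambda s: None,
}

def run(state, entry="classify"):
    node = entry
    while node is not None:
        state = {**state, **NODES[node](state)}  # merge the partial update
        node = EDGES[node](state)                # transition via an edge
    return state

result = run({"query": "I was charged twice"})
print(result["category"])    # billing
print(result["resolution"])  # billing team is on it
```

Because each edge is a function of state, replacing `lambda s: "handle"` with a real router gives you conditional branching for free — the same move LangGraph makes with `add_conditional_edges`.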

1. Why LangGraph

LangChain's AgentExecutor was a great starting point for building autonomous agents, but production applications quickly expose its limitations. LangGraph was created specifically to address these gaps — providing a graph-based framework where you model workflows as nodes (functions), edges (transitions), and state (shared data). This section explores what breaks in AgentExecutor and how the graph paradigm solves it.

1.1 AgentExecutor Limitations

AgentExecutor implements a single think → act → observe loop that repeats until the LLM decides it has a final answer. This works well for simple tool-calling agents, but falls apart when you need branching logic, parallel execution, or durable state. The code below enumerates the six key limitations that motivate the transition to LangGraph.

# What AgentExecutor CANNOT do (motivating LangGraph):

# 1. Conditional branching — "If billing query, route to billing; else route to tech"
#    AgentExecutor has ONE loop. It cannot branch into different paths.

# 2. Parallel execution — "Search web AND query database simultaneously"
#    AgentExecutor processes tools sequentially within each iteration.

# 3. Persistent state — "Remember user preferences across sessions"
#    AgentExecutor state lives in memory; dies when the process ends.

# 4. Human-in-the-loop — "Pause before sending email, ask for approval"
#    AgentExecutor has no built-in interrupt/resume mechanism.

# 5. Complex cycles — "Research -> Write -> Review -> if bad, loop back to Research"
#    AgentExecutor has a single think-act-observe cycle, not arbitrary loops.

# 6. Multi-agent coordination — "Agent A hands off to Agent B when done"
#    AgentExecutor manages a single agent only.

# Bottom line: AgentExecutor is great for prototypes and simple agents.
# LangGraph is what you need for production workflows.

1.2 The Graph Paradigm

LangGraph replaces AgentExecutor's single loop with a directed graph where each node is a Python function and each edge defines when and where to transition next. State — a TypedDict — flows through the graph and is transformed by every node it passes through. Conditional edges let you branch dynamically (e.g., route billing vs. technical queries to different handlers), while cycles enable revision loops that AgentExecutor cannot express.

The following example builds a customer-support ticket router from scratch. A classify node inspects the query and sets a category, conditional_edges route to the appropriate handler, and a respond node formats the final resolution.

Ticket Routing Graph
flowchart TD
    S((START)) --> classify[classify]
    classify -->|billing| billing[handle_billing]
    classify -->|technical| technical[handle_technical]
    billing --> respond[respond]
    technical --> respond
    respond --> E((END))

    style S fill:#3B9797,stroke:#3B9797,color:#fff
    style E fill:#132440,stroke:#132440,color:#fff
    style classify fill:#e8f4f4,stroke:#3B9797,color:#132440
    style billing fill:#fff5f5,stroke:#BF092F,color:#132440
    style technical fill:#f0f4f8,stroke:#16476A,color:#132440
    style respond fill:#e8f4f4,stroke:#3B9797,color:#132440
                        
# pip install langgraph
# LangGraph models workflows as directed graphs
# Each box is a NODE (a Python function)
# Each arrow is an EDGE (a transition)
# State flows through the graph, modified by each node

from langgraph.graph import StateGraph, START, END
from typing import TypedDict

# Step 1: Define the state schema — what data flows through the graph
class TicketState(TypedDict):
    query: str        # The customer's original question
    category: str     # "billing" or "technical" (set by classify node)
    resolution: str   # The handler's resolution (set by handler node)
    response: str     # Final formatted response (set by respond node)

# Step 2: Define node functions (each takes state, returns partial update)
def classify(state: TicketState) -> dict:
    """Route the ticket to the right department."""
    query = state["query"].lower()
    if any(word in query for word in ["bill", "charge", "payment", "invoice"]):
        return {"category": "billing"}
    return {"category": "technical"}

def handle_billing(state: TicketState) -> dict:
    """Billing department resolves payment issues."""
    return {"resolution": f"Billing team reviewing: {state['query']}"}

def handle_technical(state: TicketState) -> dict:
    """Technical team resolves product issues."""
    return {"resolution": f"Tech team investigating: {state['query']}"}

def respond(state: TicketState) -> dict:
    """Format the final customer response."""
    return {"response": f"[{state['category'].upper()}] {state['resolution']}"}

# Step 3: Build the graph
graph = StateGraph(TicketState)
graph.add_node("classify", classify)
graph.add_node("billing", handle_billing)
graph.add_node("technical", handle_technical)
graph.add_node("respond", respond)

# Edges define transitions between nodes
graph.add_edge(START, "classify")
graph.add_conditional_edges(
    "classify",
    lambda state: state["category"],  # Router function
    {"billing": "billing", "technical": "technical"}
)
# Both handlers converge into the respond node
graph.add_edge("billing", "respond")
graph.add_edge("technical", "respond")
graph.add_edge("respond", END)

# Step 4: Compile and run
app = graph.compile()
result = app.invoke({"query": "I was charged twice on my credit card"})
print(f"Category: {result['category']}")
print(f"Resolution: {result['resolution']}")
print(f"Response: {result['response']}")

When to Migrate from AgentExecutor to LangGraph

Start with AgentExecutor for rapid prototyping, then migrate to LangGraph when you need any of these capabilities:

  • Conditional routing — Different paths based on classification, risk level, or data type
  • Revision cycles — Write → Review → if quality low, loop back to Write
  • Persistent state — Conversations that survive server restarts or span multiple sessions
  • Human gates — Approval required before sending emails, deploying code, or making purchases
  • Multi-agent handoff — Specialist agents that hand off to each other based on the task

The migration path is well-defined: your existing tools, prompts, and LLM configuration remain unchanged. You simply restructure the control flow from a single loop to a graph.
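For contrast, here is what AgentExecutor's single loop boils down to, sketched in plain Python with a stubbed model. The `fake_llm` function and `TOOLS` table are illustrative stand-ins, not real library code. Notice there is exactly one place control can go after each LLM call — there is nowhere to hang a branch, a parallel path, or an approval gate:

```python
# The think-act-observe loop that a tool-calling agent runtime performs,
# stubbed in plain Python. 'fake_llm' and 'TOOLS' are made-up stand-ins.

def fake_llm(messages):
    # Pretend model: requests the calculator once, then answers.
    if not any("tool_result" in m for m in messages):
        return {"tool": "calculator", "input": "25 * 47"}
    return {"final_answer": "25 * 47 = 1175"}

TOOLS = {"calculator": lambda expr: str(eval(expr, {"__builtins__": {}}, {}))}

def agent_loop(question, max_iterations=5):
    messages = [f"user: {question}"]
    for _ in range(max_iterations):
        decision = fake_llm(messages)                         # think
        if "final_answer" in decision:
            return decision["final_answer"]                   # done
        output = TOOLS[decision["tool"]](decision["input"])   # act
        messages.append(f"tool_result: {output}")             # observe
    return "Gave up after max iterations"

print(agent_loop("What is 25 * 47?"))  # 25 * 47 = 1175
```

The only way to extend this runtime is to complicate the one loop. LangGraph's migration story is that you keep the tools and prompts and instead express each extension as a new node or edge.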


2. StateGraph Fundamentals

LangGraph's StateGraph is built on three core concepts: state (the data flowing through the graph), nodes (functions that transform state), and edges (transitions between nodes). Understanding each deeply is essential for building reliable workflows.

2.1 State Definition

In LangGraph, state is the data structure that flows through every node in the graph. You define it as a Python TypedDict, where each key represents a channel — a named slot that nodes can read from and write to. By default, when a node returns a value for a channel, it overwrites the existing value. But LangGraph's killer feature is reducers: annotate a channel with a function like operator.add and new values are appended instead of replaced. This distinction — overwrite vs. reduce — is fundamental to building correct state machines.

# pip install langgraph langchain-core
from typing import TypedDict, Annotated, Sequence
from operator import add
from langchain_core.messages import BaseMessage

# Basic state — each key is a "channel"
class BasicState(TypedDict):
    input: str           # Overwritten on each update (no reducer)
    output: str          # Overwritten on each update
    step_count: int      # Overwritten on each update

# State with reducers — control how updates are merged
class ChatState(TypedDict):
    messages: Annotated[list[BaseMessage], add]  # APPEND new messages
    current_agent: str                            # OVERWRITE (no reducer)
    iteration: int                                # OVERWRITE (no reducer)

# The 'add' reducer means:
# If node returns {"messages": [new_msg]},
# state becomes {"messages": [old_msg1, old_msg2, new_msg]}
# Without a reducer, it would REPLACE the entire list.

# Advanced: Custom reducer function
def keep_last_n(existing: list, new: list, n: int = 20) -> list:
    """Custom reducer: append new items but keep only last N."""
    combined = existing + new
    return combined[-n:]

# State with custom reducer for bounded message history
class ManagedState(TypedDict):
    messages: Annotated[list, lambda a, b: keep_last_n(a, b, 20)]
    tool_history: Annotated[list, add]
    final_answer: str

2.2 Nodes & Edges

Nodes are Python functions that accept state and return a partial update dictionary. Edges define the execution order — which node runs after which. Together, they form a directed acyclic graph (DAG) or a cyclic graph if revision loops are needed. The example below builds a four-node research → write → review → finalize pipeline where each node calls an LLM and passes its output downstream through state.

Research-Write-Review Pipeline
flowchart LR
    S((START)) --> research[research]
    research --> write[write_draft]
    write --> review[review_draft]
    review --> finalize[finalize]
    finalize --> E((END))

    style S fill:#3B9797,stroke:#3B9797,color:#fff
    style E fill:#132440,stroke:#132440,color:#fff
    style research fill:#e8f4f4,stroke:#3B9797,color:#132440
    style write fill:#f0f4f8,stroke:#16476A,color:#132440
    style review fill:#fff5f5,stroke:#BF092F,color:#132440
    style finalize fill:#e8f4f4,stroke:#3B9797,color:#132440
                        
# pip install langgraph langchain-openai python-dotenv
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv

load_dotenv()  # Load OPENAI_API_KEY from .env file

# State schema for a research-write-review workflow
class ResearchState(TypedDict):
    topic: str
    search_results: Annotated[list, add]  # Append reducer
    draft: str
    review: str
    final_article: str

llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

def research(state: ResearchState) -> dict:
    """Gather information on the topic."""
    print(f"[Node: research] Researching '{state['topic']}'...")
    response = llm.invoke(f"Research key facts about: {state['topic']}. List 5 key points.")
    print(f"[Node: research] Done — collected {len(response.content)} chars of notes")
    return {"search_results": [response.content]}

def write_draft(state: ResearchState) -> dict:
    """Write an article draft from research."""
    print(f"[Node: write] Writing draft from {len(state['search_results'])} research notes...")
    research_text = "\n".join(state["search_results"])
    response = llm.invoke(
        f"Write a 300-word article about {state['topic']}.\n"
        f"Use these research notes:\n{research_text}"
    )
    print(f"[Node: write] Done — draft is {len(response.content)} chars")
    return {"draft": response.content}

def review_draft(state: ResearchState) -> dict:
    """Review and critique the draft."""
    print("[Node: review] Reviewing draft for improvements...")
    response = llm.invoke(
        f"Review this article draft. List specific improvements:\n\n{state['draft']}"
    )
    print(f"[Node: review] Done — {len(response.content)} chars of feedback")
    return {"review": response.content}

def finalize(state: ResearchState) -> dict:
    """Apply feedback and produce final article."""
    print("[Node: finalize] Applying review feedback to produce final article...")
    response = llm.invoke(
        f"Improve this draft based on feedback.\n\n"
        f"Draft:\n{state['draft']}\n\nFeedback:\n{state['review']}"
    )
    print(f"[Node: finalize] Done — final article is {len(response.content)} chars")
    return {"final_article": response.content}

# Build the graph — linear: research → write → review → finalize
graph = StateGraph(ResearchState)
graph.add_node("research", research)
graph.add_node("write", write_draft)
graph.add_node("review", review_draft)
graph.add_node("finalize", finalize)

graph.add_edge(START, "research")
graph.add_edge("research", "write")
graph.add_edge("write", "review")
graph.add_edge("review", "finalize")
graph.add_edge("finalize", END)

app = graph.compile()
result = app.invoke({"topic": "The impact of LLMs on software engineering"})

# Show what each node produced
print("=== Research Notes ===")
print(result["search_results"][0][:200] + "...\n")
print("=== Draft (first 200 chars) ===")
print(result["draft"][:200] + "...\n")
print("=== Review Feedback (first 200 chars) ===")
print(result["review"][:200] + "...\n")
print("=== Final Article (first 300 chars) ===")
print(result["final_article"][:300] + "...")

2.3 Conditional Edges

Conditional edges are what make LangGraph strictly more powerful than linear chains. Instead of hardcoding the next node, you provide a routing function that inspects the current state and returns the name of the next node to execute. This enables dynamic branching: a classifier node can route factual questions to a search path and conversational questions directly to an answer node, all within the same compiled graph.

add_conditional_edges(source, router_fn, path_map) takes the source node, a function that returns a string key, and a dictionary mapping those keys to destination nodes. The graph below routes questions through either a web search path or a direct-answer path based on the classifier's decision.

Q&A Routing Graph
flowchart TD
    S((START)) --> classify[classify_question]
    classify -->|needs_search=true| search[search_web]
    classify -->|needs_search=false| direct[direct_answer]
    search --> answer[answer_with_context]
    answer --> E((END))
    direct --> E

    style S fill:#3B9797,stroke:#3B9797,color:#fff
    style E fill:#132440,stroke:#132440,color:#fff
    style classify fill:#e8f4f4,stroke:#3B9797,color:#132440
    style search fill:#f0f4f8,stroke:#16476A,color:#132440
    style direct fill:#fff5f5,stroke:#BF092F,color:#132440
    style answer fill:#e8f4f4,stroke:#3B9797,color:#132440
                        
# Conditional edges enable dynamic routing — the graph equivalent of if/else
# Assumes 'llm' is defined above (e.g., ChatOpenAI)
import json
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class QAState(TypedDict):
    question: str
    category: str
    needs_search: bool
    search_results: str
    answer: str

def classify_question(state: QAState) -> dict:
    """Classify the question and determine if search is needed."""
    response = llm.invoke(
        f"Classify this question:\n'{state['question']}'\n\n"
        f"Return JSON: {{\"category\": \"factual|opinion|math\", \"needs_search\": true/false}}"
    )
    # Assumes the model returns bare JSON; production code should strip
    # markdown fences or use structured-output features instead.
    data = json.loads(response.content)
    return {"category": data["category"], "needs_search": data["needs_search"]}

def search_web(state: QAState) -> dict:
    return {"search_results": f"Search results for: {state['question']}"}

def direct_answer(state: QAState) -> dict:
    response = llm.invoke(f"Answer: {state['question']}")
    return {"answer": response.content}

def answer_with_context(state: QAState) -> dict:
    response = llm.invoke(
        f"Answer using context:\n{state['search_results']}\n\nQuestion: {state['question']}"
    )
    return {"answer": response.content}

# Router function — returns the name of the next node
def route_question(state: QAState) -> str:
    if state["needs_search"]:
        return "search"
    return "direct_answer"

graph = StateGraph(QAState)
graph.add_node("classify", classify_question)
graph.add_node("search", search_web)
graph.add_node("direct_answer", direct_answer)
graph.add_node("answer_with_context", answer_with_context)

graph.add_edge(START, "classify")
graph.add_conditional_edges("classify", route_question, {
    "search": "search", "direct_answer": "direct_answer"
})
graph.add_edge("search", "answer_with_context")
graph.add_edge("direct_answer", END)
graph.add_edge("answer_with_context", END)

app = graph.compile()
Key Insight: Conditional edges are the most powerful feature of LangGraph. They enable patterns that are impossible with AgentExecutor: routing to different specialist agents based on classification, looping back to retry failed steps, escalating to human review when confidence is low, and short-circuiting when a fast path is available. Every production LangGraph workflow uses conditional edges extensively.
ReAct Agent Loop
flowchart TD
    S((START)) --> agent[agent]
    agent -->|has tool_calls| tools[tools]
    tools --> agent
    agent -->|no tool_calls| E((END))

    style S fill:#3B9797,stroke:#3B9797,color:#fff
    style E fill:#132440,stroke:#132440,color:#fff
    style agent fill:#e8f4f4,stroke:#3B9797,color:#132440
    style tools fill:#f0f4f8,stroke:#16476A,color:#132440
                        
# pip install langgraph langchain-openai langchain-community tavily-python
# Full example: LangGraph-based ReAct agent with tool calling
# This replaces AgentExecutor with explicit graph control

from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage
from typing import TypedDict, Annotated
from operator import add

# Define tools for this example
@tool
def search_tool(query: str) -> str:
    """Search the web for information."""
    return f"Search results for: {query}"

@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression."""
    import math
    safe = {k: v for k, v in math.__dict__.items() if not k.startswith("__")}
    return str(eval(expression, {"__builtins__": {}}, safe))

# State: just a list of messages with an append reducer
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add]

# Create LLM with tools bound (requires OPENAI_API_KEY env var)
llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [search_tool, calculator]
llm_with_tools = llm.bind_tools(tools)

# Agent node: calls the LLM
def agent_node(state: AgentState) -> dict:
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

# Tool node: executes tool calls from the agent's response
tool_node = ToolNode(tools)

# Router: decide whether to call tools or finish
def should_use_tools(state: AgentState) -> str:
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return "end"

# Build the agent graph
graph = StateGraph(AgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)

graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", should_use_tools, {
    "tools": "tools",   # Agent wants to use tools -> execute them
    "end": END           # Agent has final answer -> finish
})
graph.add_edge("tools", "agent")  # After tools, back to agent for next decision

react_graph = graph.compile()

# This graph implements the exact same think-act-observe loop
# as AgentExecutor, but with full visibility and control.
result = react_graph.invoke({
    "messages": [HumanMessage(content="What is 25 * 47?")]
})
print(result["messages"][-1].content)

3. State Management

State management is the backbone of LangGraph. Every graph has a TypedDict schema that defines the data channels flowing through nodes. How those channels are updated — overwritten, appended, or transformed by a custom function — is controlled by reducers. Getting reducers right is essential: the wrong strategy leads to lost data (overwriting a message list) or unbounded growth (appending without limits). This section deep-dives into the three reducer patterns and shows how to compose complex state schemas for real-world agents.
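To see why the choice of strategy matters, here is a tiny, framework-free simulation of how a state channel is merged under each one. The `merge_channel` helper is illustrative, not LangGraph internals:

```python
# Simulating how a state channel merges a node's update under each
# reducer strategy. 'merge_channel' is a made-up helper for illustration.
from operator import add

def merge_channel(old, new, reducer=None):
    # No reducer: the new value replaces the old one.
    # With a reducer: reducer(old, new) decides the merged value.
    return new if reducer is None else reducer(old, new)

history = ["hello"]

# Overwrite strategy: previous messages are silently lost
lost = merge_channel(history, ["world"])
print(lost)  # ['world'] -- "hello" is gone

# Append strategy (operator.add): history is preserved
kept = merge_channel(history, ["world"], reducer=add)
print(kept)  # ['hello', 'world']

# Bounded custom reducer: preserves history without unbounded growth
def keep_last_n(old, new, n=3):
    return (old + new)[-n:]

bounded = merge_channel(["a", "b", "c"], ["d"], reducer=keep_last_n)
print(bounded)  # ['b', 'c', 'd']
```

The first call is the "lost data" failure mode, the second is correct for message histories, and the third shows how a custom reducer heads off the "unbounded growth" failure mode.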

3.1 Reducers & State Updates

A reducer is a function attached to a state channel via Annotated[type, reducer_fn]. It controls how a new value from a node is merged with the existing value. LangGraph supports three strategies:

  • Replace (no reducer) — The node's returned value completely overwrites the channel. Best for scalar fields like status or counter.
  • Append (operator.add) — New items are concatenated to the existing list. Essential for message histories where every node contributes messages.
  • Custom function — A user-defined function receives (old_value, new_value) and returns the merged result. Useful for running aggregations like max, mean, or bounded queues.

The code below demonstrates all three strategies as independent, runnable examples.

Three Reducer Strategies
flowchart LR
    subgraph replace["Replace — No Reducer"]
        direction LR
        R1["step_a\ncounter=5"] --> R2["step_b\ncounter=10"]
    end

    subgraph append["Append — add Reducer"]
        direction LR
        A1["greet\n+ hello"] --> A2["respond\n+ world"]
    end

    subgraph custom["Custom — running_max"]
        direction LR
        C1["round1\n72.5"] --> C2["round2\n91.0"] --> C3["round3\n85.3"]
    end

    style R1 fill:#f0f4f8,stroke:#16476A,color:#132440
    style R2 fill:#f0f4f8,stroke:#16476A,color:#132440
    style A1 fill:#e8f4f4,stroke:#3B9797,color:#132440
    style A2 fill:#e8f4f4,stroke:#3B9797,color:#132440
    style C1 fill:#fff5f5,stroke:#BF092F,color:#132440
    style C2 fill:#fff5f5,stroke:#BF092F,color:#132440
    style C3 fill:#fff5f5,stroke:#BF092F,color:#132440
                        
# Reducers control how state updates are merged in LangGraph
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END

# --- Demo 1: WITHOUT reducer — values are REPLACED ---
class ReplaceState(TypedDict):
    counter: int  # Each update replaces the value

def step_a(state: ReplaceState) -> dict:
    return {"counter": 5}

def step_b(state: ReplaceState) -> dict:
    return {"counter": 10}

graph = StateGraph(ReplaceState)
graph.add_node("step_a", step_a)
graph.add_node("step_b", step_b)
graph.add_edge(START, "step_a")
graph.add_edge("step_a", "step_b")
graph.add_edge("step_b", END)

app = graph.compile()
result = app.invoke({"counter": 0})
print("=== Replace (no reducer) ===")
print(f"counter = {result['counter']}")  # 10 — step_b replaced step_a's value
print()

# --- Demo 2: WITH 'add' reducer — lists are APPENDED ---
class AppendState(TypedDict):
    messages: Annotated[list, add]

def greet(state: AppendState) -> dict:
    return {"messages": ["hello"]}

def respond(state: AppendState) -> dict:
    return {"messages": ["world"]}

graph2 = StateGraph(AppendState)
graph2.add_node("greet", greet)
graph2.add_node("respond", respond)
graph2.add_edge(START, "greet")
graph2.add_edge("greet", "respond")
graph2.add_edge("respond", END)

app2 = graph2.compile()
result2 = app2.invoke({"messages": []})
print("=== Append (add reducer) ===")
print(f"messages = {result2['messages']}")  # ["hello", "world"] — appended, not replaced
print()

# --- Demo 3: Custom reducer — running maximum ---
def running_max(current: float, new: float) -> float:
    return max(current, new) if current else new

class ScoredState(TypedDict):
    scores: Annotated[list, add]
    best_score: Annotated[float, running_max]

def score_round_1(state: ScoredState) -> dict:
    return {"scores": [72.5], "best_score": 72.5}

def score_round_2(state: ScoredState) -> dict:
    return {"scores": [91.0], "best_score": 91.0}

def score_round_3(state: ScoredState) -> dict:
    return {"scores": [85.3], "best_score": 85.3}

graph3 = StateGraph(ScoredState)
graph3.add_node("round1", score_round_1)
graph3.add_node("round2", score_round_2)
graph3.add_node("round3", score_round_3)
graph3.add_edge(START, "round1")
graph3.add_edge("round1", "round2")
graph3.add_edge("round2", "round3")
graph3.add_edge("round3", END)

app3 = graph3.compile()
result3 = app3.invoke({"scores": [], "best_score": 0.0})
print("=== Custom reducer (running max) ===")
print(f"All scores: {result3['scores']}")       # [72.5, 91.0, 85.3]
print(f"Best score: {result3['best_score']}")    # 91.0 — max across all rounds

3.2 Complex State Schemas

Real-world agents need more than a flat list of messages. A production coding agent, for example, tracks the current code, test results, review feedback, a revision counter, and the overall status. LangGraph handles this naturally: each field in the TypedDict gets its own reducer strategy. Lists of feedback items append via operator.add; scalar fields like iteration and status overwrite; and conditionals on iteration enforce maximum revision limits.

The example below builds a full plan → code → test → review → revise loop with a simulated quality gate. The review node routes back to revision when the iteration count is below the threshold, creating the cyclic graph that only LangGraph can express.

Coding Agent — Revision Loop
flowchart TD
    S((START)) --> plan[plan]
    plan --> code[code]
    code --> test[test]
    test --> review[review]
    review -->|LGTM or max reached| finalize[finalize]
    review -->|needs work| revise[revise]
    revise --> code
    finalize --> E((END))

    style S fill:#3B9797,stroke:#3B9797,color:#fff
    style E fill:#132440,stroke:#132440,color:#fff
    style plan fill:#e8f4f4,stroke:#3B9797,color:#132440
    style code fill:#f0f4f8,stroke:#16476A,color:#132440
    style test fill:#f0f4f8,stroke:#16476A,color:#132440
    style review fill:#fff5f5,stroke:#BF092F,color:#132440
    style revise fill:#fff0e0,stroke:#e67e22,color:#132440
    style finalize fill:#e8f9e8,stroke:#28a745,color:#132440
                        
# Production-grade state schema with conditional routing
from typing import TypedDict, Annotated, Optional
from operator import add
from langgraph.graph import StateGraph, START, END

# Production-grade state schema for a coding agent
class CodingAgentState(TypedDict):
    task_description: str
    plan: list[str]
    current_step: int
    code: str
    language: str
    test_results: str
    review_feedback: str
    revision_count: int
    max_revisions: int
    status: str   # "planning", "coding", "testing", "reviewing", "done"

# Conditional routing based on complex state
def should_revise(state: CodingAgentState) -> str:
    """Decide whether to revise the code or finalize."""
    if state["revision_count"] >= state["max_revisions"]:
        return "finalize"  # Hit revision limit
    if "LGTM" in state.get("review_feedback", ""):
        return "finalize"  # Reviewer approved
    return "revise"        # Need more work

# Simulated nodes for the coding agent cycle
def plan_task(state: CodingAgentState) -> dict:
    plan = ["Parse input", "Implement logic", "Add error handling"]
    print(f"[Plan] Created {len(plan)}-step plan")
    return {"plan": plan, "status": "coding"}

def write_code(state: CodingAgentState) -> dict:
    code = f"def solve():\n    # Step: {state['plan'][state['current_step']]}\n    return 42"
    print(f"[Code] Writing {state['language']} — revision #{state['revision_count']}")
    return {"code": code, "status": "testing"}

def run_tests(state: CodingAgentState) -> dict:
    passed = state["revision_count"] >= 1  # Passes on 2nd attempt
    result = "All 3 tests passed" if passed else "FAILED: test_edge_case"
    print(f"[Test] {'✅ Passed' if passed else '❌ Failed'}: {result}")
    return {"test_results": result, "status": "reviewing"}

def review_code(state: CodingAgentState) -> dict:
    feedback = "LGTM — clean and well-tested" if "passed" in state["test_results"] else "Needs revision: handle edge cases"
    print(f"[Review] {feedback}")
    return {"review_feedback": feedback, "status": "reviewing"}

def revise_code(state: CodingAgentState) -> dict:
    print(f"[Revise] Applying feedback, attempt {state['revision_count'] + 1}")
    return {"revision_count": state["revision_count"] + 1, "status": "coding"}

def finalize_code(state: CodingAgentState) -> dict:
    print(f"[Done] Finalized after {state['revision_count']} revision(s)")
    return {"status": "done"}

# Build the graph: plan → code → test → review → revise/finalize
graph = StateGraph(CodingAgentState)
graph.add_node("plan", plan_task)
graph.add_node("code", write_code)
graph.add_node("test", run_tests)
graph.add_node("review", review_code)
graph.add_node("revise", revise_code)
graph.add_node("finalize", finalize_code)

graph.add_edge(START, "plan")
graph.add_edge("plan", "code")
graph.add_edge("code", "test")
graph.add_edge("test", "review")
graph.add_conditional_edges("review", should_revise, {
    "revise": "revise", "finalize": "finalize"
})
graph.add_edge("revise", "code")  # Loop back: revise → code → test → review
graph.add_edge("finalize", END)

app = graph.compile()
result = app.invoke({
    "task_description": "Write a function to find prime numbers",
    "plan": [], "current_step": 0, "code": "", "language": "python",
    "test_results": "", "review_feedback": "",
    "revision_count": 0, "max_revisions": 3, "status": "planning"
})

print(f"\nFinal status: {result['status']}")
print(f"Total revisions: {result['revision_count']}")
print(f"Last review: {result['review_feedback']}")
Key Insight: The state schema is your agent's contract with the world. A well-designed state schema makes nodes composable and testable — each node reads specific fields, writes specific fields, and can be tested independently. A poorly designed schema leads to tight coupling where nodes depend on implicit state, making the graph fragile and hard to debug. Invest time in state schema design upfront.
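
The testability point is easy to demonstrate: conditional routers like should_revise are plain functions of state, so routing logic can be unit-tested without compiling a graph at all. A minimal sketch (the router is restated here so the snippet runs standalone):

```python
# Conditional routers are plain functions of state, so routing logic can be
# unit-tested without compiling a graph. Restated from above for standalone use.
def should_revise(state: dict) -> str:
    """Decide whether to revise the code or finalize."""
    if state["revision_count"] >= state["max_revisions"]:
        return "finalize"  # Hit revision limit
    if "LGTM" in state.get("review_feedback", ""):
        return "finalize"  # Reviewer approved
    return "revise"        # Need more work

# Each test constructs only the fields the router actually reads
assert should_revise({"revision_count": 3, "max_revisions": 3}) == "finalize"
assert should_revise({"revision_count": 0, "max_revisions": 3,
                      "review_feedback": "LGTM, ship it"}) == "finalize"
assert should_revise({"revision_count": 1, "max_revisions": 3,
                      "review_feedback": "Needs revision"}) == "revise"
print("All routing tests passed")
```

The same approach applies to node functions: pass in a hand-built state dict, assert on the returned partial update.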

4. Workflow Orchestration Comparison

One of the most important architectural decisions in AI application development is choosing the right orchestration approach. This section provides a comprehensive, critical comparison of four major options.

4.1 LangGraph vs LangChain Agents

| Dimension | LangChain AgentExecutor | LangGraph |
| --- | --- | --- |
| Control Flow | Single think-act-observe loop | Arbitrary graph: branches, cycles, parallel paths |
| State | In-memory only, lost on restart | Persistent (MemorySaver, PostgresSaver) |
| Human-in-the-Loop | Not supported natively | First-class: interrupt_before, interrupt_after |
| Multi-Agent | Single agent only | Multiple agents as nodes in a graph |
| Debugging | Verbose logs, LangSmith traces | Graph visualization, step-through, state inspection |
| Learning Curve | Low — simple API | Medium — requires graph/state machine thinking |
| Best For | Simple tool-using agents, prototypes | Production agents, complex workflows, long-running tasks |

4.2 LangGraph vs n8n vs Zapier

| Dimension | LangGraph | n8n | Zapier |
| --- | --- | --- | --- |
| Paradigm | Code-first stateful graphs | Visual node-based workflow builder | No-code trigger-action automation |
| AI Capabilities | Full LLM agent control, tool-calling, reasoning loops, custom prompts | AI nodes for LLM calls, vector stores, chains; limited agent logic | Basic AI actions (summarize, classify); no agent loops |
| Flexibility | Unlimited — any Python logic, any LLM, custom state | High for integrations (400+ connectors); moderate for custom AI | Low — predefined actions only, 6000+ app connectors |
| State | Rich typed state with reducers, persistent across sessions | Workflow variables, limited state between runs | No persistent state between Zap runs |
| Cycles & Loops | Native — agents can loop, retry, self-correct | Loop nodes supported; less natural for AI reasoning | No loops — strictly linear trigger-action |
| Target Audience | Python developers building AI-native apps | Technical teams needing visual automation with AI | Non-technical users automating business processes |
| Deployment | Self-managed Python app, or LangGraph Cloud | Self-hosted or n8n Cloud | Fully managed SaaS |
| Pricing | Open source (MIT); Cloud for managed hosting | Free (self-hosted); Cloud from $20/month | Free tier; paid from $20/month; expensive at scale |
| Example | AI coding agent: plan, write, test, debug autonomously | Email → AI classify → route to team → auto-respond | New Slack msg → summarize with AI → post to Notion |

4.3 Decision Framework

Choosing between LangGraph, LangChain agents, n8n, and Zapier is not about which tool is "best" — it depends on your requirements. The decision tree below encodes the key criteria: Does the workflow need agent reasoning (think-act-observe loops)? Does it need cycles, persistence, or human approval? Can the team write code, or do they need a visual builder? Run it with your own requirements to get a concrete recommendation.

# Decision helper function — run with different requirement dicts to get a recommendation
def choose_orchestration(requirements: dict) -> str:
    """Decision guide for LangGraph vs LangChain agents vs n8n vs Zapier."""

    needs_agent_reasoning = requirements.get("needs_agent_reasoning", False)
    needs_cycles = requirements.get("needs_cycles", False)
    needs_persistence = requirements.get("needs_persistence", False)
    needs_human_approval = requirements.get("needs_human_approval", False)
    team_can_code = requirements.get("team_can_code", True)
    needs_app_integrations = requirements.get("needs_app_integrations", False)
    complexity = requirements.get("complexity", "simple")

    # Non-technical team
    if not team_can_code:
        if needs_app_integrations and complexity == "simple":
            return "Zapier — No-code, 6000+ connectors, trigger-action"
        return "n8n — Visual builder with AI nodes, self-hostable"

    # No agent reasoning needed
    if not needs_agent_reasoning:
        if needs_app_integrations:
            return "n8n — Best for connecting AI to business tools visually"
        return "LangChain LCEL chains — Simple linear AI pipelines"

    # Simple agent (no cycles, no persistence)
    if complexity == "simple" and not needs_cycles and not needs_persistence:
        return "LangChain AgentExecutor — Simple agent loop, fast to build"

    # Complex agent workflows
    return (
        "LangGraph — Full workflow control:\n"
        f"  Cycles: {'Yes' if needs_cycles else 'No'}\n"
        f"  Persistence: {'Yes' if needs_persistence else 'No'}\n"
        f"  Human approval: {'Yes' if needs_human_approval else 'No'}"
    )

# Example usage
print(choose_orchestration({
    "needs_agent_reasoning": True,
    "needs_cycles": True,
    "needs_persistence": True,
    "needs_human_approval": True,
    "team_can_code": True,
    "complexity": "complex"
}))
Key Insight: These tools are complementary, not competing. A production architecture might use LangGraph for the AI agent core, n8n for connecting to business tools (Slack, email, CRM), and Zapier for simple automations that non-technical team members maintain. The right answer is usually a combination.
Critical Nuance: Do not choose n8n or Zapier for AI agent loops that require reasoning, self-correction, or dynamic tool selection. Their AI nodes can call LLMs, but they cannot implement the think-act-observe cycle that defines an agent. Conversely, do not use LangGraph just to connect Slack to Google Sheets — that is what n8n and Zapier excel at.

5. Persistence

Persistence allows LangGraph workflows to survive process restarts, enable long-running tasks, and maintain state across user sessions.

5.1 MemorySaver (Development)

MemorySaver is LangGraph's built-in, in-memory checkpointer designed for development and testing. Every time the graph executes a step, MemorySaver snapshots the entire state and stores it in a Python dictionary keyed by thread_id. This means you can invoke the graph multiple times with the same thread and the conversation history accumulates automatically — no database required.

The tradeoff is clear: MemorySaver is volatile. All state is lost when the Python process exits. It is not suitable for production, but it is the fastest way to prototype multi-turn conversations and test state persistence logic before introducing a database.

Jupyter Compatibility: MemorySaver uses asyncio internally. In Jupyter notebooks, this conflicts with the notebook's own event loop unless you use the async API (ainvoke, aget_state_history) and await at the top level. The example below uses this async pattern to avoid RuntimeWarning messages.
# pip install langgraph
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add

# Simple chatbot state with message history
class ConversationState(TypedDict):
    messages: Annotated[list, add]  # Append reducer for messages
    user_id: str

def chatbot(state: ConversationState) -> dict:
    last_msg = state["messages"][-1]
    return {"messages": [f"Echo: {last_msg}"]}

graph = StateGraph(ConversationState)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)

# Compile with MemorySaver
memory = MemorySaver()
app = graph.compile(checkpointer=memory)

# Use async to avoid event-loop warnings in Jupyter notebooks
async def main():
    config = {"configurable": {"thread_id": "user-123-session-1"}}

    # First message
    result = await app.ainvoke(
        {"messages": ["Hello!"], "user_id": "user-123"}, config=config
    )
    print(f"After msg 1: {result['messages']}")

    # Second message — state is preserved!
    result = await app.ainvoke(
        {"messages": ["How are you?"]}, config=config
    )
    print(f"After msg 2: {result['messages']}")

    # Retrieve full state history
    async for snapshot in app.aget_state_history(config):
        print(f"Messages: {len(snapshot.values['messages'])}")

await main()

5.2 PostgresSaver (Production)

PostgresSaver is LangGraph's production-grade checkpointer that persists graph state to a PostgreSQL database. Unlike MemorySaver, state survives process restarts, server crashes, and deployments — your agent can resume a conversation exactly where it left off, even days later.

Internally, PostgresSaver uses the psycopg3 driver and manages state across four tables:

  • checkpoints — Snapshots of the graph's state at each super-step, enabling replay, debugging, and resumption at any point in time.
  • checkpoint_blobs — Stores large serialized state data (node outputs, channel values) separately from metadata for performance.
  • checkpoint_migrations — Tracks schema versions so PostgresSaver can evolve with newer LangGraph releases without breaking existing data.
  • checkpoint_writes — Records intermediate writes during graph execution for fault tolerance. If some nodes complete but others fail mid-step, the partial progress is preserved.
Critical Setup: PostgresSaver requires the separate langgraph-checkpoint-postgres pip package — it is not included in the base langgraph install. Always use PostgresSaver.from_conn_string() instead of raw psycopg.connect(), because it sets autocommit=True which is required for the CREATE INDEX CONCURRENTLY statements in setup().
# pip install langgraph-checkpoint-postgres psycopg[binary]
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add
import os

class ConversationState(TypedDict):
    messages: Annotated[list, add]
    user_id: str

def chatbot(state: ConversationState) -> dict:
    last_msg = state["messages"][-1]
    return {"messages": [f"Echo: {last_msg}"]}

def build_graph():
    graph = StateGraph(ConversationState)
    graph.add_node("chatbot", chatbot)
    graph.add_edge(START, "chatbot")
    graph.add_edge("chatbot", END)
    return graph

DB_URI = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost:5432/langgraph_db")
THREAD = {"configurable": {"thread_id": "production-thread-42"}}

# ── Session 1: Initial conversation ──
print("SESSION 1 — Initial conversation")
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # Creates checkpoint tables on first run
    checkpointer.delete_thread("production-thread-42")  # Clean slate for demo
    app = build_graph().compile(checkpointer=checkpointer)

    result = app.invoke(
        {"messages": ["Hello, I need help with billing"], "user_id": "user-42"},
        config=THREAD,
    )
    print(f"Messages: {result['messages']}")

    result = app.invoke({"messages": ["I was charged twice"]}, config=THREAD)
    print(f"Messages: {result['messages']}")

# ── Connection closed — simulates a crash / server restart ──
print("\n⚡ Server crashed! Process restarted...\n")

# ── Session 2: NEW connection, same thread_id — state survives! ──
print("SESSION 2 — After restart (new connection)")
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    app = build_graph().compile(checkpointer=checkpointer)

    # State is restored from PostgreSQL — all previous messages are here
    state = app.get_state(THREAD)
    print(f"Restored {len(state.values['messages'])} messages from database:")
    for i, msg in enumerate(state.values['messages']):
        print(f"  [{i}] {msg}")

    # Continue the conversation seamlessly
    result = app.invoke(
        {"messages": ["Can you process a refund?"]}, config=THREAD
    )
    print(f"\nAfter new message: {len(result['messages'])} total messages")
    print(f"Latest: {result['messages'][-1]}")

    # Cleanup demo data so re-runs start fresh
    checkpointer.delete_thread("production-thread-42")
    print("\n✓ Demo data cleaned up")
# Async version — use in Jupyter notebooks or async web servers
# pip install langgraph-checkpoint-postgres psycopg[binary]
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add
import os

class ConversationState(TypedDict):
    messages: Annotated[list, add]
    user_id: str

def chatbot(state: ConversationState) -> dict:
    last_msg = state["messages"][-1]
    return {"messages": [f"Echo: {last_msg}"]}

def build_graph():
    graph = StateGraph(ConversationState)
    graph.add_node("chatbot", chatbot)
    graph.add_edge(START, "chatbot")
    graph.add_edge("chatbot", END)
    return graph

DB_URI = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost:5432/langgraph_db")
THREAD = {"configurable": {"thread_id": "async-demo-42"}}

async def main():
    # ── Session 1 ──
    print("SESSION 1 — Async conversation")
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        await checkpointer.setup()
        await checkpointer.adelete_thread("async-demo-42")  # Clean slate for demo
        app = build_graph().compile(checkpointer=checkpointer)
        result = await app.ainvoke(
            {"messages": ["Hello from async!"], "user_id": "user-42"},
            config=THREAD,
        )
        print(f"Messages: {result['messages']}")

    print("\n⚡ Server crashed! Process restarted...\n")

    # ── Session 2 — state survives ──
    print("SESSION 2 — Async after restart")
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        app = build_graph().compile(checkpointer=checkpointer)
        state = await app.aget_state(THREAD)
        print(f"Restored {len(state.values['messages'])} messages")
        for i, msg in enumerate(state.values['messages']):
            print(f"  [{i}] {msg}")

        # Cleanup demo data so re-runs start fresh
        await checkpointer.adelete_thread("async-demo-42")
        print("\n✓ Demo data cleaned up")

await main()
Production Pattern

Choosing Your Persistence Backend

LangGraph supports multiple persistence backends. Choose based on your deployment environment:

  • MemorySaver — Development and testing only. State lost on restart. Zero setup. Use for rapid prototyping and unit tests.
  • SqliteSaver — Single-server deployments. File-based, no external database needed. Good for small apps and local development with persistence.
  • PostgresSaver — Production deployments. Supports horizontal scaling, concurrent access from multiple workers, and full SQL queryability. The recommended choice for any multi-user production system.

Migration path: Start with MemorySaver during development. Switch to PostgresSaver for staging and production by changing a single line (the checkpointer). Your graph code does not change at all — only the checkpointer configuration.


6. Subgraphs

As workflows grow beyond 10–15 nodes, a flat graph becomes hard to maintain and reason about. LangGraph's subgraph pattern solves this by letting you compose smaller, self-contained graphs into a larger orchestration graph. Each subgraph has its own state schema, its own nodes and edges, and compiles into a standalone CompiledGraph. The parent graph then invokes it as a single node, mapping data in and out through a shared interface. This is the same principle as function composition in programming: build small, tested units and compose them.

6.1 Composing Graphs

A subgraph is just a regular StateGraph that is compiled separately. You build it with its own state schema, add nodes and edges, call .compile(), and then invoke the compiled subgraph from within a parent node. The parent node maps its state into the subgraph's expected inputs, runs the subgraph, and maps the outputs back. This keeps each subgraph independent and reusable — the research subgraph below can be used in any parent workflow that needs a search-and-summarize step.

# pip install langgraph
# Subgraphs: compose complex workflows from smaller, reusable pieces
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END

# --- Subgraph 1: Research ---
class ResearchSubState(TypedDict):
    topic: str
    findings: Annotated[list, add]

def search_papers(state: ResearchSubState) -> dict:
    return {"findings": [f"Paper results for: {state['topic']}"]}

def summarize(state: ResearchSubState) -> dict:
    return {"findings": [f"Summary of {len(state['findings'])} findings"]}

research_graph = StateGraph(ResearchSubState)
research_graph.add_node("search", search_papers)
research_graph.add_node("summarize", summarize)
research_graph.add_edge(START, "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_sub = research_graph.compile()

# --- Subgraph 2: Writing ---
class WritingSubState(TypedDict):
    content: str
    draft: str
    polished: str

def write(state: WritingSubState) -> dict:
    return {"draft": f"Draft based on: {state['content'][:100]}..."}

def polish(state: WritingSubState) -> dict:
    return {"polished": f"Polished: {state['draft'][:100]}..."}

writing_graph = StateGraph(WritingSubState)
writing_graph.add_node("write", write)
writing_graph.add_node("polish", polish)
writing_graph.add_edge(START, "write")
writing_graph.add_edge("write", "polish")
writing_graph.add_edge("polish", END)
writing_sub = writing_graph.compile()

6.2 Nested Workflows

Nested workflows take composition one step further: the parent graph orchestrates multiple subgraphs and adds its own control flow on top — conditional routing, quality gates, and revision loops that span across subgraph boundaries. In the example below, a master graph calls a research subgraph, then a writing subgraph, then a quality gate that either accepts the article or loops back to the writing subgraph for revision. This "orchestrator" pattern is foundational for multi-agent systems covered in Part 10.

Master Graph with Subgraphs
flowchart TD
    S((START)) --> research

    subgraph research_sub["Research Subgraph"]
        research[research_node]
    end

    research --> write

    subgraph writing_sub["Writing Subgraph"]
        write[writing_node]
    end

    write --> quality{quality_gate}
    quality -->|approved| E((END))
    quality -->|needs_revision| write

    style S fill:#3B9797,stroke:#3B9797,color:#fff
    style E fill:#132440,stroke:#132440,color:#fff
    style research fill:#e8f4f4,stroke:#3B9797,color:#132440
    style write fill:#f0f4f8,stroke:#16476A,color:#132440
    style quality fill:#fff5f5,stroke:#BF092F,color:#132440
                        
# Parent graph orchestrating subgraphs (uses research_sub and writing_sub from above)
class MasterState(TypedDict):
    topic: str
    research_output: str
    article: str
    status: str

def run_research(state: MasterState) -> dict:
    result = research_sub.invoke({"topic": state["topic"]})
    return {"research_output": "\n".join(result["findings"]), "status": "researched"}

def run_writing(state: MasterState) -> dict:
    result = writing_sub.invoke({"content": state["research_output"]})
    return {"article": result["polished"], "status": "written"}

def quality_check(state: MasterState) -> dict:
    if len(state["article"].split()) < 10:
        return {"status": "needs_revision"}
    return {"status": "approved"}

master = StateGraph(MasterState)
master.add_node("research", run_research)
master.add_node("write", run_writing)
master.add_node("quality", quality_check)

master.add_edge(START, "research")
master.add_edge("research", "write")
master.add_edge("write", "quality")
master.add_conditional_edges("quality", lambda s: s["status"], {
    "approved": END, "needs_revision": "write"
})

app = master.compile()
result = app.invoke({"topic": "Quantum Computing in 2026"})
print(f"Status: {result['status']}")
print(f"Article: {result['article'][:200]}...")

7. Human-in-the-Loop

Human-in-the-loop is one of LangGraph's most powerful production features — pause a workflow, present state to a human for review, and resume based on their input.

7.1 Interrupt Patterns

LangGraph implements human-in-the-loop via interrupts — explicit pause points in the graph where execution stops and waits for human input. The key API is interrupt_before=["node_name"] passed to graph.compile(). When the graph reaches the specified node, it serializes its entire state to the checkpointer and halts. A human can then inspect the state via app.get_state(config), modify it with app.update_state(config, updates), and resume execution by calling app.invoke(None, config).

This pattern requires a checkpointer (MemorySaver or PostgresSaver) because the state must be persisted between the pause and resume calls. The example below implements an email approval workflow where the agent drafts an email but waits for human approval before sending it.

Email Approval — interrupt_before
flowchart LR
    S((START)) --> draft[draft_email]
    draft -.->|⏸ interrupt_before: human review| send[send_email]
    send --> E((END))

    style S fill:#3B9797,stroke:#3B9797,color:#fff
    style E fill:#132440,stroke:#132440,color:#fff
    style draft fill:#e8f4f4,stroke:#3B9797,color:#132440
    style send fill:#fff5f5,stroke:#BF092F,color:#132440

# pip install langgraph
# Human-in-the-loop: pause workflow, get human approval, then resume
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict

class EmailState(TypedDict):
    recipient: str
    subject: str
    body: str
    approved: bool
    sent: bool

def draft_email(state: EmailState) -> dict:
    body = (
        f"Dear {state['recipient']},\n\n"
        f"Re: {state.get('subject', 'Proposal')}\n\n"
        f"{state.get('body', 'Auto-generated content.')}\n\n"
        "Best regards"
    )
    return {"body": body, "approved": False}

def send_email(state: EmailState) -> dict:
    if state["approved"]:
        print(f"Sending to {state['recipient']}: {state['subject']}")
        return {"sent": True}
    return {"sent": False}

graph = StateGraph(EmailState)
graph.add_node("draft", draft_email)
graph.add_node("send", send_email)
graph.add_edge(START, "draft")
graph.add_edge("draft", "send")
graph.add_edge("send", END)

# Compile with interrupt_before — pauses BEFORE "send"
memory = MemorySaver()
app = graph.compile(checkpointer=memory, interrupt_before=["send"])

config = {"configurable": {"thread_id": "email-1"}}

# Step 1: Run until interrupt
result = app.invoke(
    {"recipient": "client@example.com", "subject": "Proposal", "body": "Our proposal..."},
    config=config
)
# Workflow PAUSED before "send" node

# Step 2: Human reviews
state = app.get_state(config)
print("Draft:", state.values["body"])

# Step 3: Human approves → update state and resume
app.update_state(config, {"approved": True})
result = app.invoke(None, config=config)  # Resume
print(f"Sent: {result['sent']}")

7.2 Approval Workflows

Production systems rarely have a single approval step. A financial transaction might need no approval for small amounts, a manager's sign-off for medium amounts, and VP-level review for large amounts. LangGraph handles risk-based approval routing by combining conditional edges with interrupts: a risk assessor node classifies the transaction, conditional edges route to the appropriate approval tier, and interrupt_before pauses at whichever review node requires human input. Low-risk transactions skip human review entirely and proceed to execution automatically.

Risk-Based Approval Routing
flowchart TD
    S((START)) --> assess[assess_risk]
    assess -->|low| auto[auto_approve]
    assess -->|medium| mgr[manager_review ⏸]
    assess -->|high| vp[vp_review ⏸]
    vp --> mgr
    auto --> exec[execute]
    mgr --> exec
    exec --> E((END))

    style S fill:#3B9797,stroke:#3B9797,color:#fff
    style E fill:#132440,stroke:#132440,color:#fff
    style assess fill:#e8f4f4,stroke:#3B9797,color:#132440
    style auto fill:#e8f9e8,stroke:#28a745,color:#132440
    style mgr fill:#fff5f5,stroke:#BF092F,color:#132440
    style vp fill:#fff0e0,stroke:#e67e22,color:#132440
    style exec fill:#f0f4f8,stroke:#16476A,color:#132440
                        
# Multi-step approval with risk-based escalation
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict

class ApprovalState(TypedDict):
    request: str
    amount: float
    risk_level: str
    manager_approved: bool
    vp_approved: bool
    status: str

def assess_risk(state: ApprovalState) -> dict:
    amount = state["amount"]
    if amount > 100000: return {"risk_level": "high"}
    elif amount > 10000: return {"risk_level": "medium"}
    return {"risk_level": "low"}

def auto_approve(state: ApprovalState) -> dict:
    return {"status": "auto_approved", "manager_approved": True, "vp_approved": True}

def manager_review(state: ApprovalState) -> dict:
    return {"status": "awaiting_manager"}

def vp_review(state: ApprovalState) -> dict:
    return {"status": "awaiting_vp"}

def execute_request(state: ApprovalState) -> dict:
    return {"status": "executed"}

def route_by_risk(state: ApprovalState) -> str:
    risk = state["risk_level"]
    if risk == "low": return "auto_approve"
    elif risk == "medium": return "manager_review"
    return "vp_review"

graph = StateGraph(ApprovalState)
graph.add_node("assess", assess_risk)
graph.add_node("auto_approve", auto_approve)
graph.add_node("manager_review", manager_review)
graph.add_node("vp_review", vp_review)
graph.add_node("execute", execute_request)

graph.add_edge(START, "assess")
graph.add_conditional_edges("assess", route_by_risk, {
    "auto_approve": "auto_approve",
    "manager_review": "manager_review",
    "vp_review": "vp_review"
})
graph.add_edge("auto_approve", "execute")
graph.add_edge("manager_review", "execute")
graph.add_edge("vp_review", "manager_review")
graph.add_edge("execute", END)

app = graph.compile(
    checkpointer=MemorySaver(),
    interrupt_after=["manager_review", "vp_review"]
)
Common Mistake: Forgetting to use a checkpointer with human-in-the-loop. Without a checkpointer, interrupt_before and interrupt_after will silently do nothing — the graph cannot pause and resume without persistent state. Always compile with a checkpointer when using interrupts.

Exercises & Self-Assessment

Exercise 1

Research-Write-Review Cycle

Build a LangGraph workflow: Research node (5 key points) → Write node (300-word article) → Review node (score 1-10) → Conditional: if score < 7, loop to Write with feedback; if >= 7, end. Add max_revisions counter to prevent infinite loops.

Exercise 2

Multi-Path Customer Support

Build a support graph: classify → route to billing/technical/general → each has specialized prompts → escalation path with interrupt for human agent. Use MemorySaver for persistence across the human review step.

Exercise 3

Orchestration Tool Comparison

Design the same workflow ("Email → Classify → If complaint, draft response → Human reviews → Send") in LangGraph, n8n, and Zapier. For each, identify: what is easy, what is hard, and the cost. Write a recommendation.

Exercise 4

Persistent Conversational Agent

Build a LangGraph chatbot with PostgresSaver (or MemorySaver), conversation history via thread_id, tools for search and calculation, and demonstrate resume after simulated server restart.

Exercise 5

Reflective Questions

  1. Why does LangGraph use a graph paradigm rather than sequential pipelines? What workflows require branches and cycles?
  2. Compare LangGraph reducers to Redux reducers. Similarities and differences?
  3. When would you choose n8n over LangGraph? Give three specific scenarios.
  4. Why is persistence essential for human-in-the-loop? What happens without it?
  5. How would you handle a workflow that waits for an external webhook for hours or days?

LangGraph Workflow Document Generator

Design and document a LangGraph workflow. Download as Word, Excel, PDF, or PowerPoint.


All data stays in your browser. Nothing is sent to or stored on any server.

Conclusion & Next Steps

You now have a comprehensive understanding of LangGraph and how it enables production-grade agent workflows. Key takeaways:

  • Why LangGraph — AgentExecutor cannot handle branching, persistence, human-in-the-loop, or multi-agent coordination; LangGraph solves all of these
  • StateGraph — Define state with TypedDict, build graphs with nodes and edges, use conditional edges for dynamic routing
  • State Management — Reducers control how updates merge: append, replace, or custom aggregation
  • Orchestration Comparison — LangGraph for complex AI workflows, LangChain agents for simple tool-calling, n8n for visual automation, Zapier for no-code — best used in combination
  • Persistence — MemorySaver for dev, PostgresSaver for production; enables state to survive restarts
  • Subgraphs — Compose complex workflows from reusable sub-workflows
  • Human-in-the-Loop — interrupt_before/after pauses workflows for human review and approval

Next in the Series

In Part 9: Deep Agents & Autonomous Systems, we explore advanced agent architectures — Planner-Executor-Critic, Plan-and-Execute, Reflexion, LATS, self-reflection, autonomy levels L1-L4, and cutting-edge research from OpenAGI, Voyager, and WebArena.
