
CrewAI SDK Track Part 5: Flows & State Management

May 24, 2026 Wasil Zafar 45 min read

Build event-driven workflows with CrewAI Flows: @start, @listen, and @router decorators for control flow, structured Pydantic state models, automatic state persistence for crash recovery, state restoration patterns, and complex multi-crew orchestration.

Table of Contents

  1. Flow Fundamentals
  2. Structured State Management
  3. Routing & Conditional Logic
  4. State Persistence & Restoration
  5. Integrating Crews in Flows
What You’ll Learn: Flows are CrewAI’s orchestration layer for complex, multi-step workflows that go beyond what a single crew can handle. While crews handle collaborative work, Flows coordinate entire pipelines: conditional branching, parallel execution, state management across steps, and human-in-the-loop checkpoints. Think of Flows like an orchestration engine (Airflow, Prefect) but designed specifically for AI agent workflows.

1. Flow Fundamentals

CrewAI Flows are event-driven workflow orchestrators that sit above Crews. They manage execution order through decorators, maintain state across steps, and provide persistence for crash recovery. Flows are the recommended way to build production multi-agent systems.

Flows vs Crews: A Crew is a group of agents working on tasks together. A Flow orchestrates one or more Crews (and plain functions) into a multi-phase pipeline with state, routing, and persistence. Use Flows when you need control beyond what a single Crew provides.

1.1 Core Decorators

from crewai.flow.flow import Flow, start, listen

class BasicFlow(Flow):
    """Minimal Flow demonstrating core decorators."""

    @start()
    def begin(self):
        """@start marks the entry point. Runs first when flow.kickoff() is called."""
        print("Flow started!")
        return "initial data"

    @listen(begin)
    def process(self, data):
        """@listen triggers when begin completes; begin's return value arrives as `data`."""
        print(f"Processing {data!r} after begin...")
        return "processed data"

    @listen(process)
    def finalize(self):
        """Chain @listen decorators for sequential execution."""
        print("Finalizing...")
        return "done"

# Execute the flow
flow = BasicFlow()
result = flow.kickoff()
print(f"Flow result: {result}")
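To make "event-driven" concrete, the toy dispatcher below mimics BasicFlow in plain Python. Entry points run first, and finishing a step is the event that fires its listeners. Each listener receives the finished step's return value, as CrewAI does when a listener declares a parameter. `ToyFlow`, `start_step`, and `listen_to` are hypothetical names. The sketch is far simpler than CrewAI's real scheduler: it has no routers, no joins, and no async.

```python
from typing import Any, Callable, Dict, List


class ToyFlow:
    """Hypothetical mini dispatcher: entry points run first, listeners fire when their trigger finishes."""

    def __init__(self) -> None:
        self.starts: List[Callable[[], Any]] = []
        self.listeners: Dict[str, List[Callable[[Any], Any]]] = {}
        self.trace: List[str] = []  # order in which steps finished

    def start_step(self, fn: Callable[[], Any]) -> Callable[[], Any]:
        """Register an entry point (the role @start plays)."""
        self.starts.append(fn)
        return fn

    def listen_to(self, trigger: Callable[..., Any]) -> Callable[[Callable[[Any], Any]], Callable[[Any], Any]]:
        """Register a step that runs after `trigger` finishes (the role @listen plays)."""
        def register(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
            self.listeners.setdefault(trigger.__name__, []).append(fn)
            return fn
        return register

    def _run(self, fn: Callable[..., Any], *args: Any) -> Any:
        output = fn(*args)
        self.trace.append(fn.__name__)
        last = output
        # Completion is the event: every listener on fn receives fn's return value.
        for listener in self.listeners.get(fn.__name__, []):
            last = self._run(listener, output)
        return last

    def kickoff(self) -> Any:
        """Run every entry point and return the output of the last step to finish."""
        result = None
        for fn in self.starts:
            result = self._run(fn)
        return result


flow = ToyFlow()


@flow.start_step
def begin():
    return "initial data"


@flow.listen_to(begin)
def process(data):
    return f"processed({data})"


@flow.listen_to(process)
def finalize(data):
    return f"done({data})"


print(flow.kickoff())  # done(processed(initial data))
print(flow.trace)      # ['begin', 'process', 'finalize']
```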

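A Flow can have several @start methods, and kickoff() triggers all of them. To join branches, pass a condition to @listen: and_(a, b) waits until every listed method has completed, while or_(a, b) fires as soon as any one of them completes. Both helpers are importable from crewai.flow.flow: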

from crewai.flow.flow import Flow, start, listen, and_

class ParallelFlow(Flow):
    """Flow with parallel starts and join."""

    @start()
    def fetch_data_a(self):
        """First parallel entry point."""
        return {"source": "A", "value": 42}

    @start()
    def fetch_data_b(self):
        """Second entry point; kickoff() triggers it alongside fetch_data_a."""
        return {"source": "B", "value": 99}

    @listen(and_(fetch_data_a, fetch_data_b))
    def combine_results(self):
        """Runs only after BOTH fetch methods complete (join pattern)."""
        print("Both data sources ready — combining results")
        return "combined"

flow = ParallelFlow()
flow.kickoff()
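The difference between and_() and or_() (both importable from crewai.flow.flow) is when a listener becomes ready. The hypothetical helper `listener_ready` below models only that decision in plain Python, given the set of methods that have completed so far. It illustrates the semantics and is not CrewAI's scheduler.

```python
from typing import Iterable, Set


def listener_ready(condition: str, triggers: Iterable[str], completed: Set[str]) -> bool:
    """Hypothetical model of join semantics: "AND" waits for all triggers, "OR" for any one."""
    trigger_set = set(triggers)
    if condition == "AND":
        return trigger_set <= completed          # every trigger has finished
    if condition == "OR":
        return bool(trigger_set & completed)     # at least one trigger has finished
    raise ValueError(f"unknown condition: {condition!r}")


sources = ["fetch_data_a", "fetch_data_b"]
print(listener_ready("AND", sources, {"fetch_data_a"}))                  # False
print(listener_ready("OR", sources, {"fetch_data_a"}))                   # True
print(listener_ready("AND", sources, {"fetch_data_a", "fetch_data_b"}))  # True
```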

2. Structured State Management

Flows maintain state across steps. You can use a Pydantic model for type-safe structured state, or a simple dictionary for flexible unstructured state.

from crewai.flow.flow import Flow, start, listen
from pydantic import BaseModel
from typing import List, Optional

# Structured state with Pydantic
class ResearchState(BaseModel):
    topic: str = ""
    sources: List[str] = []
    findings: List[str] = []
    draft: str = ""
    review_score: float = 0.0
    is_approved: bool = False
    iteration: int = 0

class ResearchFlow(Flow[ResearchState]):
    """Flow with typed state management."""

    @start()
    def gather_sources(self):
        """Access and update state via self.state."""
        self.state.topic = "AI Safety in 2026"
        self.state.sources = [
            "https://arxiv.org/ai-safety-2026",
            "https://deepmind.com/safety-research",
            "https://anthropic.com/research"
        ]
        print(f"Gathering sources for: {self.state.topic}")
        print(f"Found {len(self.state.sources)} sources")
        return self.state.sources

    @listen(gather_sources)
    def analyze_sources(self):
        """Previous state is available in subsequent steps."""
        self.state.findings = [
            "Interpretability research has accelerated",
            "RLHF remains the dominant alignment technique",
            "Constitutional AI showing promising results"
        ]
        self.state.iteration += 1
        print(f"Iteration {self.state.iteration}: Found {len(self.state.findings)} findings")
        return self.state.findings

    @listen(analyze_sources)
    def write_draft(self):
        """Build on accumulated state."""
        self.state.draft = f"# {self.state.topic}\n\n"
        for finding in self.state.findings:
            self.state.draft += f"- {finding}\n"
        print(f"Draft written: {len(self.state.draft)} chars")
        return self.state.draft

flow = ResearchFlow()
flow.kickoff()
print("\nFinal state:")
print(f"  Topic: {flow.state.topic}")
print(f"  Sources: {len(flow.state.sources)}")
print(f"  Findings: {len(flow.state.findings)}")
print(f"  Draft length: {len(flow.state.draft)} chars")

2.1 Unstructured State

from crewai.flow.flow import Flow, start, listen

class FlexibleFlow(Flow):
    """Flow with unstructured dict state (no Pydantic model)."""

    @start()
    def initialize(self):
        """Without a type parameter, state is a plain dict."""
        self.state["counter"] = 0
        self.state["items"] = []
        self.state["metadata"] = {"created": "2026-05-24"}
        return "initialized"

    @listen(initialize)
    def process(self):
        """Access dict state freely."""
        self.state["counter"] += 1
        self.state["items"].append("processed_item_1")
        print(f"Counter: {self.state['counter']}")
        return self.state

flow = FlexibleFlow()
flow.kickoff()
print(f"Final state: {flow.state}")
Structured vs Unstructured: Use Pydantic models (structured) for production flows where type safety, validation, and IDE autocompletion matter. Use dict state (unstructured) for prototyping or flows where the schema evolves frequently.

3. Routing & Conditional Logic

The @router decorator enables branching. A router method runs after the method it watches, and the string it returns is the event that triggers any @listen("label") methods. A conditional task inside a crew can only decide whether a single task runs. A router, by contrast, can loop back, branch to several paths, or implement a full state machine.

from crewai.flow.flow import Flow, start, listen, router

MAX_ATTEMPTS = 3  # Hard cap so the revision loop always terminates

class QualityFlow(Flow):
    """Flow with routing for quality control loops."""

    @start()
    def generate_content(self):
        """Generate initial content."""
        self.state["content"] = "Draft article about AI..."
        self.state["quality_score"] = 0
        self.state["attempts"] = 0
        print("Content generated")
        return self.state["content"]

    def _score_and_route(self):
        """Shared quality check for both routers (a plain helper, not a flow step)."""
        self.state["attempts"] += 1
        # Simulated quality check (in production, use an LLM judge):
        # attempt 1 scores 70 (revise), attempt 2 scores 80 (approve)
        self.state["quality_score"] = 60 + (self.state["attempts"] * 10)
        print(f"Attempt {self.state['attempts']}: Score = {self.state['quality_score']}")

        if self.state["quality_score"] >= 80:
            return "approve"   # Routes to methods listening for "approve"
        if self.state["attempts"] >= MAX_ATTEMPTS:
            return "escalate"  # Routes to escalation path
        return "revise"        # Routes to revision loop

    @router(generate_content)
    def check_quality(self):
        """Route the first draft based on its quality score."""
        return self._score_and_route()

    @listen("approve")
    def publish(self):
        """Called when a router returns 'approve'."""
        print(f"✅ Published after {self.state['attempts']} attempts")
        return "published"

    @listen("revise")
    def revise_content(self):
        """Called when a router returns 'revise'."""
        print(f"📝 Revising content (attempt {self.state['attempts']})...")
        self.state["content"] += " [revised]"
        return self.state["content"]

    @router(revise_content)
    def recheck_quality(self):
        """Re-route after revision; returning 'revise' again re-triggers revise_content, forming the loop."""
        return self._score_and_route()

    @listen("escalate")
    def escalate_to_human(self):
        """Called when max attempts are reached."""
        print(f"⚠️ Escalated to human review after {self.state['attempts']} attempts")
        return "escalated"

flow = QualityFlow()
flow.kickoff()
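A router is just a method that returns a label. One option is therefore to keep the decision rule in a plain function and unit-test it without running a Flow or an LLM. This sketch mirrors the thresholds in QualityFlow: a score of at least 80 approves, and reaching three attempts escalates. `decide_route` is a hypothetical helper name.

```python
from typing import Literal

Route = Literal["approve", "revise", "escalate"]


def decide_route(score: float, attempts: int, threshold: float = 80, max_attempts: int = 3) -> Route:
    """Approve good content, escalate once attempts run out, otherwise ask for a revision."""
    if score >= threshold:
        return "approve"
    if attempts >= max_attempts:
        return "escalate"
    return "revise"


# Table-driven checks of the rule; no LLM or Flow needed
cases = [
    (70, 1, "revise"),
    (85, 2, "approve"),
    (70, 3, "escalate"),
    (80, 3, "approve"),  # quality is checked before the attempt cap
]
for score, attempts, expected in cases:
    assert decide_route(score, attempts) == expected, (score, attempts)
print("all routing cases pass")
```

A router such as check_quality would then compute the score, record it in self.state, and `return decide_route(score, attempts)`.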

3.1 Loops & Cycles

Loop Safety: Always include an exit condition in routers that can loop (max attempts, timeout, or quality threshold). Without guards, flows can loop indefinitely and burn through API credits. Use self.state to track iteration count.
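A small guard object is one way to combine two of the exit conditions named above: an attempt budget and a wall-clock timeout. `LoopGuard` is a hypothetical helper, not part of CrewAI. A router would return "revise" only while `guard.allow()` is true and "escalate" otherwise. In a real Flow, keep the attempt counter in self.state so it is persisted with the rest of the state.

```python
import time
from dataclasses import dataclass, field


@dataclass
class LoopGuard:
    """Hypothetical loop guard: stop after max_attempts or once timeout_s seconds have passed."""
    max_attempts: int = 3
    timeout_s: float = 300.0
    attempts: int = 0
    started: float = field(default_factory=time.monotonic)

    def allow(self) -> bool:
        """Record one more attempt and report whether that attempt may proceed."""
        self.attempts += 1
        within_budget = self.attempts <= self.max_attempts
        within_time = time.monotonic() - self.started < self.timeout_s
        return within_budget and within_time


guard = LoopGuard(max_attempts=3, timeout_s=60)
iterations = 0
while guard.allow():  # a quality check that never passes still stops after 3 rounds
    iterations += 1
print(iterations)  # 3
```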
Real-World Application

End-to-End Sales Pipeline

A B2B company built their entire sales workflow as a CrewAI Flow: lead qualification → research → personalized outreach → follow-up scheduling → deal tracking. State persists between runs (so follow-ups know what was discussed previously), and @router sends enterprise leads to a different path than SMB leads. Result: 3x increase in qualified meetings.


4. State Persistence & Restoration

CrewAI Flows support automatic state persistence through the @persist decorator, which by default saves snapshots to a local SQLite database (SQLiteFlowPersistence). Applied to a Flow class, it saves the state after every flow method. Applied to a single method, it saves only after that step. If your flow crashes mid-execution, you can reload the last saved snapshot by its state ID instead of starting over with empty state.

from crewai.flow.flow import Flow, start, listen
from crewai.flow.persistence import persist
from pydantic import BaseModel
from typing import List

class PipelineState(BaseModel):
    documents: List[str] = []
    processed: List[str] = []
    embeddings_created: bool = False
    index_built: bool = False

@persist  # Class-level: a state snapshot is saved after every flow method completes
class PersistentPipeline(Flow[PipelineState]):
    """Flow with state persistence for crash recovery."""

    @start()
    def ingest_documents(self):
        """State is saved automatically after this step completes."""
        self.state.documents = [
            "doc1.pdf", "doc2.pdf", "doc3.pdf",
            "doc4.pdf", "doc5.pdf"
        ]
        print(f"Ingested {len(self.state.documents)} documents")
        return self.state.documents

    @listen(ingest_documents)
    def process_documents(self):
        """If the flow crashes here, the snapshot saved after ingest_documents is still available."""
        for doc in self.state.documents:
            self.state.processed.append(f"processed_{doc}")
        print(f"Processed {len(self.state.processed)} documents")
        return self.state.processed

    @listen(process_documents)
    def create_embeddings(self):
        """Each completed step writes a new snapshot."""
        self.state.embeddings_created = True
        print("Embeddings created for all documents")
        return True

    @listen(create_embeddings)
    def build_index(self):
        """Final step; the complete state is saved."""
        self.state.index_built = True
        print("Search index built successfully")
        return "pipeline complete"

# First run
pipeline = PersistentPipeline()
pipeline.kickoff()
print(f"State ID: {pipeline.state.id}")  # Keep this UUID to restore the run later

4.1 State Restoration After Crashes

from crewai.flow.flow import Flow, start, listen
from crewai.flow.persistence import persist
from pydantic import BaseModel

class JobState(BaseModel):
    job_id: str = ""
    step: str = "init"
    progress: float = 0.0

@persist  # Saves a state snapshot after each step completes
class ResumableFlow(Flow[JobState]):
    """Flow that can continue from a saved state after a crash."""

    @start()
    def step_one(self):
        # Guard each step so a restored run does not redo work the snapshot already records
        if self.state.progress >= 0.33:
            return "step_one_done"
        self.state.step = "step_one_complete"
        self.state.progress = 0.33
        print(f"Step 1 complete. Progress: {self.state.progress:.0%}")
        return "step_one_done"

    @listen(step_one)
    def step_two(self):
        if self.state.progress >= 0.66:
            return "step_two_done"
        self.state.step = "step_two_complete"
        self.state.progress = 0.66
        print(f"Step 2 complete. Progress: {self.state.progress:.0%}")
        return "step_two_done"

    @listen(step_two)
    def step_three(self):
        if self.state.progress >= 1.0:
            return "all_done"
        self.state.step = "step_three_complete"
        self.state.progress = 1.0
        print(f"Step 3 complete. Progress: {self.state.progress:.0%}")
        return "all_done"

# Normal execution
flow = ResumableFlow()
flow.kickoff()
saved_state_id = flow.state.id  # Every flow state carries an auto-generated UUID
print(f"Saved state ID: {saved_state_id}")

# After a crash: pass the saved ID as inputs["id"] to reload the persisted snapshot
resumed_flow = ResumableFlow()
resumed_flow.kickoff(inputs={"id": saved_state_id})
print(f"Resumed from: {resumed_flow.state.step}")
Restoration Note: With @persist enabled, calling kickoff(inputs={"id": ...}) with a saved ID loads the snapshot stored under that ID before the flow runs. Depending on the CrewAI version, a restored run may be triggered from its @start methods again. Write each step to check the restored state and skip work that is already recorded, as step_one through step_three do above.
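Conceptually, crash recovery combines two ideas. First, a snapshot of the state is written under the state's ID after each step. Second, on restart each step checks the restored state and skips work that is already recorded. The stdlib sketch below models both with sqlite3 and json. The table layout, the helper names (`save_state`, `load_state`, `run_steps`), and the explicit `done` list are hypothetical, not CrewAI's internal schema.

```python
import json
import sqlite3
import uuid
from typing import Any, Callable, Dict, List, Optional, Tuple

State = Dict[str, Any]
Step = Tuple[str, Callable[[State], None]]


def save_state(conn: sqlite3.Connection, state_id: str, state: State) -> None:
    """Overwrite the latest snapshot stored under this state ID."""
    conn.execute(
        "INSERT OR REPLACE INTO flow_states (id, data) VALUES (?, ?)",
        (state_id, json.dumps(state)),
    )
    conn.commit()


def load_state(conn: sqlite3.Connection, state_id: str) -> Optional[State]:
    """Return the saved snapshot for this ID, or None if nothing was saved."""
    row = conn.execute("SELECT data FROM flow_states WHERE id = ?", (state_id,)).fetchone()
    return json.loads(row[0]) if row else None


def run_steps(conn: sqlite3.Connection, state_id: str, steps: List[Step], state: State) -> State:
    """Run steps in order, skip those recorded in state['done'], and checkpoint after each."""
    for name, step in steps:
        if name in state["done"]:
            continue  # completed before the crash
        step(state)
        state["done"].append(name)
        save_state(conn, state_id, state)
    return state


calls: List[str] = []    # records which step bodies actually executed
crash = {"armed": True}  # makes process() fail exactly once


def ingest(state: State) -> None:
    calls.append("ingest")
    state["documents"] = ["doc1.pdf", "doc2.pdf"]


def process(state: State) -> None:
    if crash["armed"]:
        crash["armed"] = False
        raise RuntimeError("simulated crash")
    calls.append("process")
    state["processed"] = [f"processed_{d}" for d in state["documents"]]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flow_states (id TEXT PRIMARY KEY, data TEXT)")
steps: List[Step] = [("ingest", ingest), ("process", process)]
state_id = str(uuid.uuid4())

try:
    run_steps(conn, state_id, steps, {"done": []})
except RuntimeError:
    pass  # the snapshot written after "ingest" survives the crash

restored = load_state(conn, state_id)
assert restored is not None
final = run_steps(conn, state_id, steps, restored)
print(calls)          # ['ingest', 'process']  (ingest ran only once)
print(final["done"])  # ['ingest', 'process']
```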

5. Integrating Crews in Flows

Calling Crews from within Flow steps is the central pattern for production systems. The Flow supplies orchestration (state, routing, persistence), and each Crew supplies agent collaboration.

from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, start, listen
from pydantic import BaseModel
from typing import List

class ArticleState(BaseModel):
    topic: str = ""
    research: str = ""
    outline: str = ""
    draft: str = ""
    final: str = ""

class ArticlePipeline(Flow[ArticleState]):
    """Multi-crew Flow for end-to-end article production."""

    @start()
    def research_phase(self):
        """Phase 1: Research crew gathers information."""
        researcher = Agent(
            role="Senior Researcher",
            goal=f"Research '{self.state.topic}' comprehensively",
            backstory="You are thorough and always cite sources."
        )

        fact_checker = Agent(
            role="Fact Checker",
            goal="Verify all claims and statistics",
            backstory="You never let unverified claims through."
        )

        research_task = Task(
            description=f"Research the topic: {self.state.topic}",
            expected_output="Research brief with verified facts and sources.",
            agent=researcher
        )

        verify_task = Task(
            description="Verify all claims in the research brief.",
            expected_output="Verified research with confidence scores.",
            agent=fact_checker,
            context=[research_task]
        )

        crew = Crew(
            agents=[researcher, fact_checker],
            tasks=[research_task, verify_task],
            process=Process.sequential
        )

        result = crew.kickoff()
        self.state.research = result.raw
        print(f"Research complete: {len(self.state.research)} chars")
        return self.state.research

    @listen(research_phase)
    def writing_phase(self):
        """Phase 2: Writing crew produces the article."""
        writer = Agent(
            role="Technical Writer",
            goal="Write a clear, engaging article",
            backstory="You make complex topics accessible."
        )

        editor = Agent(
            role="Senior Editor",
            goal="Polish articles to publication quality",
            backstory="You have edited for major tech publications."
        )

        write_task = Task(
            description=f"Write a 2000-word article on '{self.state.topic}' using this research:\n{self.state.research[:1000]}",
            expected_output="A well-structured 2000-word article in markdown.",
            agent=writer
        )

        edit_task = Task(
            description="Edit the article for clarity, flow, and accuracy.",
            expected_output="The complete edited article in markdown, ready to publish.",
            agent=editor,
            context=[write_task]
        )

        crew = Crew(
            agents=[writer, editor],
            tasks=[write_task, edit_task],
            process=Process.sequential
        )

        result = crew.kickoff()
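        # tasks_output lists each task's output in order; index 0 is the writer's draft
        self.state.draft = result.tasks_output[0].raw
        self.state.final = result.raw  # Output of the last task (the edited article)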
        print(f"Article complete: {len(self.state.final)} chars")
        return self.state.final

# Execute the multi-crew pipeline
pipeline = ArticlePipeline()
pipeline.kickoff(inputs={"topic": "The State of Quantum Computing in 2026"})
print(f"\nFinal article preview:\n{pipeline.state.final[:300]}...")

5.1 Complex Multi-Crew Orchestration

import re
from typing import Dict, List

from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, start, listen, router
from pydantic import BaseModel

class PipelineState(BaseModel):
    input_data: Dict = {}
    analysis_result: str = ""
    recommendations: List[str] = []
    risk_level: str = "unknown"
    final_report: str = ""

class AnalyticsPipeline(Flow[PipelineState]):
    """Complex pipeline with routing based on crew outputs."""

    @start()
    def analyze(self):
        """Run analysis crew."""
        analyst = Agent(
            role="Data Analyst",
            goal="Analyze input data and determine risk level",
            backstory="You are precise and risk-aware."
        )
        task = Task(
            description=(
                f"Analyze this data: {self.state.input_data}. "
                "Finish with a final line of exactly 'RISK: low', 'RISK: medium' or 'RISK: high'."
            ),
            expected_output="Analysis ending with a line 'RISK: <low|medium|high>'.",
            agent=analyst
        )
        crew = Crew(agents=[analyst], tasks=[task], process=Process.sequential)
        result = crew.kickoff()
        self.state.analysis_result = result.raw

        # Parse the explicit RISK line (last match wins). Searching for "high" or "medium"
        # anywhere in the text misfires on phrases like "high CPU usage" or an echoed "low/medium/high".
        matches = re.findall(r"RISK:\s*(low|medium|high)\b", result.raw, re.IGNORECASE)
        # Design choice: treat an unparseable answer as high risk so it gets the most scrutiny
        self.state.risk_level = matches[-1].lower() if matches else "high"

        return self.state.risk_level

    @router(analyze)
    def route_by_risk(self):
        """Route to different crews based on risk level."""
        print(f"Routing based on risk: {self.state.risk_level}")
        return self.state.risk_level

    @listen("high")
    def high_risk_crew(self):
        """Specialized crew for high-risk scenarios."""
        print("⚠️ Running high-risk mitigation crew...")
        self.state.final_report = f"HIGH RISK: {self.state.analysis_result}"
        return self.state.final_report

    @listen("medium")
    def medium_risk_crew(self):
        """Standard processing for medium risk."""
        print("📊 Running standard analysis crew...")
        self.state.final_report = f"MEDIUM RISK: {self.state.analysis_result}"
        return self.state.final_report

    @listen("low")
    def low_risk_crew(self):
        """Lightweight processing for low risk."""
        print("✅ Low risk — fast-track processing...")
        self.state.final_report = f"LOW RISK: {self.state.analysis_result}"
        return self.state.final_report

# Execute with routing
pipeline = AnalyticsPipeline()
pipeline.kickoff(inputs={"input_data": {"metric": "cpu_usage", "value": 95}})
print(f"\nFinal report: {pipeline.state.final_report[:200]}...")
Architecture Summary: In production CrewAI systems, the hierarchy is: Flow (orchestration + state + persistence) → Crew (agent collaboration + task execution) → Agent (LLM + tools + persona). This three-layer architecture provides clean separation of concerns and maximum flexibility.
Try It Yourself: Build a ‘hiring pipeline’ Flow with 5 steps: (1) @start: parse job requirements, (2) @listen: generate candidate screening criteria, (3) @router: route to ‘technical_screen’ or ‘culture_screen’ based on role type, (4) @listen: score candidates, (5) @listen: generate interview questions. Use structured state to pass candidate data between steps.

Next in the CrewAI SDK Track

In Part 6: Tools & Custom Tools, we’ll explore the CrewAI tools ecosystem — built-in tools (search, scrape, file I/O), building custom tools with the @tool decorator, tool caching strategies, and connecting external APIs as agent capabilities.