1. Flow Fundamentals
CrewAI Flows are event-driven workflow orchestrators that sit above Crews. They manage execution order through decorators, maintain state across steps, and provide persistence for crash recovery. Flows are the recommended way to build production multi-agent systems.
1.1 Core Decorators
from crewai.flow.flow import Flow, start, listen, router
class BasicFlow(Flow):
    """Three-step linear flow: ``begin`` -> ``process`` -> ``finalize``."""

    @start()
    def begin(self):
        """Entry step (``@start``): print a banner and return a payload."""
        payload = "initial data"
        print("Flow started!")
        return payload

    @listen(begin)
    def process(self):
        """Step registered with ``@listen`` on ``begin``."""
        print("Processing after begin...")
        outcome = "processed data"
        return outcome

    @listen(process)
    def finalize(self):
        """Last step, registered with ``@listen`` on ``process``."""
        print("Finalizing...")
        status = "done"
        return status
# Execute the flow: build an instance, run it with kickoff(), and print the
# value that kickoff() hands back.
flow = BasicFlow()
result = flow.kickoff()
print(f"Flow result: {result}")
Flows support multiple @start methods (parallel entry points). To make one method wait for several upstream methods, pass and_(...) to a single @listen decorator; the method then runs only after all of the listed methods have completed:
from crewai.flow.flow import Flow, start, listen, and_
class ParallelFlow(Flow):
    """Two ``@start`` steps feeding a single ``and_`` listener."""

    @start()
    def fetch_data_a(self):
        """Entry step returning the record for source "A"."""
        return dict(source="A", value=42)

    @start()
    def fetch_data_b(self):
        """Entry step returning the record for source "B"."""
        return dict(source="B", value=99)

    @listen(and_(fetch_data_a, fetch_data_b))
    def combine_results(self):
        """Join step: listens on ``and_`` of both fetch steps."""
        print("Both data sources ready — combining results")
        outcome = "combined"
        return outcome
# Instantiate the flow and run it; the return value of kickoff() is not used.
flow = ParallelFlow()
flow.kickoff()
2. Structured State Management
Flows maintain state across steps. You can use a Pydantic model for type-safe structured state, or a simple dictionary for flexible unstructured state.
from crewai.flow.flow import Flow, start, listen
from pydantic import BaseModel
from typing import List, Optional
# Structured state with Pydantic
class ResearchState(BaseModel):
    """Typed state model used as the type parameter of ``ResearchFlow``."""

    topic: str = ""  # assigned in gather_sources
    sources: List[str] = []  # source URLs, assigned in gather_sources
    findings: List[str] = []  # assigned in analyze_sources
    draft: str = ""  # markdown text built in write_draft
    review_score: float = 0.0  # not written by any step in this example
    is_approved: bool = False  # not written by any step in this example
    iteration: int = 0  # incremented in analyze_sources
class ResearchFlow(Flow[ResearchState]):
    """Linear research flow whose steps share a ``ResearchState`` instance."""

    @start()
    def gather_sources(self):
        """Store the topic and source URLs on the state; return the URLs."""
        self.state.topic = "AI Safety in 2026"
        urls = [
            "https://arxiv.org/ai-safety-2026",
            "https://deepmind.com/safety-research",
            "https://anthropic.com/research",
        ]
        self.state.sources = urls
        print(f"Gathering sources for: {self.state.topic}")
        print(f"Found {len(self.state.sources)} sources")
        return self.state.sources

    @listen(gather_sources)
    def analyze_sources(self):
        """Store the findings, bump the iteration counter, return findings."""
        notes = [
            "Interpretability research has accelerated",
            "RLHF remains the dominant alignment technique",
            "Constitutional AI showing promising results",
        ]
        self.state.findings = notes
        self.state.iteration += 1
        print(f"Iteration {self.state.iteration}: Found {len(self.state.findings)} findings")
        return self.state.findings

    @listen(analyze_sources)
    def write_draft(self):
        """Render the topic heading plus one bullet per finding as the draft."""
        heading = f"# {self.state.topic}\n\n"
        bullets = [f"- {finding}\n" for finding in self.state.findings]
        self.state.draft = heading + "".join(bullets)
        print(f"Draft written: {len(self.state.draft)} chars")
        return self.state.draft
# Run the flow, then read the accumulated values back from flow.state.
flow = ResearchFlow()
flow.kickoff()
print(f"\nFinal state:")
print(f" Topic: {flow.state.topic}")
print(f" Sources: {len(flow.state.sources)}")
print(f" Findings: {len(flow.state.findings)}")
print(f" Draft length: {len(flow.state.draft)} chars")
2.1 Unstructured State
from crewai.flow.flow import Flow, start, listen
class FlexibleFlow(Flow):
    """Two-step flow that keeps its state under string keys (no model)."""

    @start()
    def initialize(self):
        """Seed the ``counter``, ``items`` and ``metadata`` state entries."""
        state = self.state
        state["counter"] = 0
        state["items"] = []
        state["metadata"] = {"created": "2026-05-24"}
        return "initialized"

    @listen(initialize)
    def process(self):
        """Increment the counter, record one item, and return the state."""
        self.state["counter"] = self.state["counter"] + 1
        processed_items = self.state["items"]
        processed_items.append("processed_item_1")
        print(f"Counter: {self.state['counter']}")
        return self.state
# Run the flow and print whatever the steps left in flow.state.
flow = FlexibleFlow()
flow.kickoff()
print(f"Final state: {flow.state}")
3. Routing & Conditional Logic
The @router decorator enables branching — routing execution to different paths based on conditions. This is far more powerful than conditional tasks because you can loop, branch to multiple paths, or implement complex state machines.
from crewai.flow.flow import Flow, start, listen, router
class QualityFlow(Flow):
    """Quality-control flow that loops through revisions via ``@router``.

    State is kept under the string keys ``content``, ``quality_score`` and
    ``attempts``. Both routers delegate to one shared scoring routine and
    return one of the labels ``"approve"``, ``"revise"`` or ``"escalate"``;
    each label has a matching ``@listen`` step below.
    """

    # Parameters of the simulated quality check (in production, use an LLM
    # judge). The base is low enough that the first attempt scores below the
    # pass mark (50 + 15 = 65), so the revise loop runs once before the
    # second attempt passes (50 + 30 = 80).
    _BASE_SCORE = 50
    _SCORE_STEP = 15
    _PASS_SCORE = 80
    # Safety cap for the loop; the simulated scores pass before reaching it.
    _MAX_ATTEMPTS = 3

    def _score_and_route(self, log_prefix):
        """Count an attempt, re-score the content, and pick the next route.

        Args:
            log_prefix: Text printed in front of the attempt number.

        Returns:
            ``"approve"`` when the score reaches the pass mark,
            ``"escalate"`` when the attempt cap is hit, else ``"revise"``.
        """
        self.state["attempts"] += 1
        attempts = self.state["attempts"]
        score = self._BASE_SCORE + attempts * self._SCORE_STEP
        self.state["quality_score"] = score
        print(f"{log_prefix} {attempts}: Score = {score}")

        if score >= self._PASS_SCORE:
            return "approve"
        if attempts >= self._MAX_ATTEMPTS:
            # Bound the loop so a draft that never passes cannot revise forever.
            return "escalate"
        return "revise"

    @start()
    def generate_content(self):
        """Create the initial draft and reset the score and attempt counter."""
        self.state["content"] = "Draft article about AI..."
        self.state["quality_score"] = 0
        self.state["attempts"] = 0
        print("Content generated")
        return self.state["content"]

    @router(generate_content)
    def check_quality(self):
        """Route the freshly generated content based on its quality score."""
        return self._score_and_route("Attempt")

    @listen("approve")
    def publish(self):
        """Runs when a router returns ``"approve"``."""
        print(f"✅ Published after {self.state['attempts']} attempts")
        return "published"

    @listen("revise")
    def revise_content(self):
        """Runs when a router returns ``"revise"``; feeds ``recheck_quality``."""
        print(f"📝 Revising content (attempt {self.state['attempts']})...")
        self.state["content"] += " [revised]"
        return self.state["content"]

    @router(revise_content)
    def recheck_quality(self):
        """Re-score after a revision; returning ``"revise"`` closes the loop."""
        return self._score_and_route("Re-check attempt")

    @listen("escalate")
    def escalate_to_human(self):
        """Runs when a router returns ``"escalate"`` (attempt cap reached)."""
        print(f"⚠️ Escalated to human review after {self.state['attempts']} attempts")
        return "escalated"
# Instantiate the flow and run it; progress is reported by the steps' prints.
flow = QualityFlow()
flow.kickoff()
3.1 Loops & Cycles
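Because a @router can return a label that re-triggers an earlier step, Flows can express loops and cycles. Always give a loop an exit condition, and use self.state to track the iteration count.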
End-to-End Sales Pipeline
A B2B company built their entire sales workflow as a CrewAI Flow: lead qualification → research → personalized outreach → follow-up scheduling → deal tracking. State persists between runs (so follow-ups know what was discussed previously), and @router sends enterprise leads to a different path than SMB leads. Result: 3x increase in qualified meetings.
4. State Persistence & Restoration
CrewAI Flows support automatic state persistence via LanceDB. This means if your flow crashes mid-execution, you can restore from the last saved state and resume without re-running completed steps.
from crewai.flow.flow import Flow, start, listen, persist_state
from pydantic import BaseModel
from typing import List
class PipelineState(BaseModel):
    """Typed state model used as the type parameter of ``PersistentPipeline``."""

    documents: List[str] = []  # file names, assigned in ingest_documents
    processed: List[str] = []  # appended to in process_documents
    embeddings_created: bool = False  # set to True in create_embeddings
    index_built: bool = False  # set to True in build_index
class PersistentPipeline(Flow[PipelineState]):
    """Four-step document pipeline; every step carries ``@persist_state``.

    NOTE(review): confirm that ``persist_state`` is exported by
    ``crewai.flow.flow`` in the installed crewai release, and what it saves
    and when — TODO verify against the library documentation.
    """

    @start()
    @persist_state
    def ingest_documents(self):
        """Record the five input file names on the state and return them."""
        file_names = [f"doc{number}.pdf" for number in range(1, 6)]
        self.state.documents = file_names
        print(f"Ingested {len(self.state.documents)} documents")
        return self.state.documents

    @listen(ingest_documents)
    @persist_state
    def process_documents(self):
        """Append one ``processed_<name>`` entry per ingested document."""
        self.state.processed.extend(
            f"processed_{doc}" for doc in self.state.documents
        )
        print(f"Processed {len(self.state.processed)} documents")
        return self.state.processed

    @listen(process_documents)
    @persist_state
    def create_embeddings(self):
        """Flag embeddings as created and return ``True``."""
        print_message = "Embeddings created for all documents"
        self.state.embeddings_created = True
        print(print_message)
        return True

    @listen(create_embeddings)
    @persist_state
    def build_index(self):
        """Flag the index as built; this is the last step of the pipeline."""
        self.state.index_built = True
        print("Search index built successfully")
        outcome = "pipeline complete"
        return outcome
# First run: execute every step, then print the identifier read from the flow.
# NOTE(review): confirm that Flow exposes a `state_id` attribute in the
# installed crewai release — TODO verify against the library documentation.
pipeline = PersistentPipeline()
pipeline.kickoff()
print(f"State ID: {pipeline.state_id}")
4.1 State Restoration After Crashes
from crewai.flow.flow import Flow, start, listen, persist_state
from pydantic import BaseModel
class JobState(BaseModel):
    """Typed state model used as the type parameter of ``ResumableFlow``."""

    job_id: str = ""  # not written by any step in this example
    step: str = "init"  # name of the last completed step, set by each step
    progress: float = 0.0  # fraction complete (0.33, 0.66, 1.0), set by each step
class ResumableFlow(Flow[JobState]):
    """Three sequential steps, each carrying ``@persist_state``.

    Every step records its name and a progress fraction on the state.

    NOTE(review): confirm that ``persist_state`` is exported by
    ``crewai.flow.flow`` in the installed crewai release — TODO verify
    against the library documentation.
    """

    @start()
    @persist_state
    def step_one(self):
        """Mark step one complete at one third progress."""
        self.state.progress = 0.33
        self.state.step = "step_one_complete"
        print(f"Step 1 complete. Progress: {self.state.progress:.0%}")
        return "step_one_done"

    @listen(step_one)
    @persist_state
    def step_two(self):
        """Mark step two complete at two thirds progress."""
        self.state.progress = 0.66
        self.state.step = "step_two_complete"
        print(f"Step 2 complete. Progress: {self.state.progress:.0%}")
        return "step_two_done"

    @listen(step_two)
    @persist_state
    def step_three(self):
        """Mark step three complete at full progress."""
        self.state.progress = 1.0
        self.state.step = "step_three_complete"
        print(f"Step 3 complete. Progress: {self.state.progress:.0%}")
        return "all_done"
# Normal execution: run all steps and keep the identifier for a later resume.
# NOTE(review): confirm that Flow exposes `state_id` in the installed crewai
# release — TODO verify against the library documentation.
flow = ResumableFlow()
flow.kickoff()
saved_state_id = flow.state_id
print(f"Saved state ID: {saved_state_id}")
# Resume after crash (pass the state_id to restore)
# NOTE(review): confirm that kickoff() accepts `restore_from_state_id` in the
# installed crewai release — TODO verify against the library documentation.
resumed_flow = ResumableFlow()
resumed_flow.kickoff(restore_from_state_id=saved_state_id)
print(f"Resumed from: {resumed_flow.state.step}")
Note: earlier code passed inputs={"id": "some-id"} to kickoff() for state restoration. The current API uses restore_from_state_id as a dedicated parameter. Update any code that still uses the inputs.id pattern.
5. Integrating Crews in Flows
The most powerful pattern is calling Crews from within Flow steps. This combines Flow’s orchestration capabilities (state, routing, persistence) with Crew’s agent collaboration.
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, start, listen
from pydantic import BaseModel
from typing import List
class ArticleState(BaseModel):
    """Typed state model used as the type parameter of ``ArticlePipeline``."""

    topic: str = ""  # read by both phases; never assigned inside the flow
    research: str = ""  # raw output of the research crew, set in research_phase
    outline: str = ""  # not written by any step in this example
    draft: str = ""  # not written by any step in this example
    final: str = ""  # raw output of the writing crew, set in writing_phase
class ArticlePipeline(Flow[ArticleState]):
    """Two-phase article flow: a research crew followed by a writing crew."""

    @start()
    def research_phase(self):
        """Run the researcher + fact-checker crew; store its raw output."""
        lead_researcher = Agent(
            role="Senior Researcher",
            goal=f"Research '{self.state.topic}' comprehensively",
            backstory="You are thorough and always cite sources.",
        )
        checker = Agent(
            role="Fact Checker",
            goal="Verify all claims and statistics",
            backstory="You never let unverified claims through.",
        )

        gather = Task(
            description=f"Research the topic: {self.state.topic}",
            expected_output="Research brief with verified facts and sources.",
            agent=lead_researcher,
        )
        # The verification task receives the research task as its context.
        verify = Task(
            description="Verify all claims in the research brief.",
            expected_output="Verified research with confidence scores.",
            agent=checker,
            context=[gather],
        )

        research_crew = Crew(
            agents=[lead_researcher, checker],
            tasks=[gather, verify],
            process=Process.sequential,
        )
        outcome = research_crew.kickoff()

        self.state.research = outcome.raw
        print(f"Research complete: {len(self.state.research)} chars")
        return self.state.research

    @listen(research_phase)
    def writing_phase(self):
        """Run the writer + editor crew; store its raw output as ``final``."""
        author = Agent(
            role="Technical Writer",
            goal="Write a clear, engaging article",
            backstory="You make complex topics accessible.",
        )
        reviewer = Agent(
            role="Senior Editor",
            goal="Polish articles to publication quality",
            backstory="You have edited for major tech publications.",
        )

        # Only the first 1000 characters of the research are put in the prompt.
        compose = Task(
            description=f"Write a 2000-word article on '{self.state.topic}' using this research:\n{self.state.research[:1000]}",
            expected_output="A well-structured 2000-word article in markdown.",
            agent=author,
        )
        polish = Task(
            description="Edit the article for clarity, flow, and accuracy.",
            expected_output="Publication-ready article with tracked changes.",
            agent=reviewer,
            context=[compose],
        )

        writing_crew = Crew(
            agents=[author, reviewer],
            tasks=[compose, polish],
            process=Process.sequential,
        )
        outcome = writing_crew.kickoff()

        self.state.final = outcome.raw
        print(f"Article complete: {len(self.state.final)} chars")
        return self.state.final
# Execute the multi-crew pipeline: the topic is supplied through kickoff()'s
# inputs, and the first 300 characters of the finished article are printed.
pipeline = ArticlePipeline()
pipeline.kickoff(inputs={"topic": "The State of Quantum Computing in 2026"})
print(f"\nFinal article preview:\n{pipeline.state.final[:300]}...")
5.1 Complex Multi-Crew Orchestration
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, start, listen, router
from pydantic import BaseModel
from typing import List, Dict
class PipelineState(BaseModel):
    """Typed state model used as the type parameter of ``AnalyticsPipeline``."""

    input_data: Dict = {}  # interpolated into the analysis prompt in analyze
    analysis_result: str = ""  # raw crew output, set in analyze
    recommendations: List[str] = []  # not written by any step in this example
    risk_level: str = "unknown"  # "high" / "medium" / "low" once analyze has run
    final_report: str = ""  # set by whichever risk-specific step runs
class AnalyticsPipeline(Flow[PipelineState]):
    """Analysis flow that branches on the risk level found in crew output.

    ``analyze`` runs a one-agent crew and classifies its raw text;
    ``route_by_risk`` returns that classification as the route label, and
    one of the three ``@listen`` steps writes the final report.
    """

    def _classify_risk(self, text):
        """Return ``"high"``, ``"medium"`` or ``"low"`` for a crew output.

        Matching is on whole words, so "highly" or "highlight" do not count
        as a "high" classification. "high" takes precedence over "medium";
        anything else (including empty text) is treated as "low".

        Args:
            text: Raw text produced by the analysis crew.

        Returns:
            The risk label as a lowercase string.
        """
        import re

        words = set(re.findall(r"[a-z]+", (text or "").lower()))
        for level in ("high", "medium"):
            if level in words:
                return level
        return "low"

    @start()
    def analyze(self):
        """Run the analysis crew, then store its output and the risk level."""
        analyst = Agent(
            role="Data Analyst",
            goal="Analyze input data and determine risk level",
            backstory="You are precise and risk-aware.",
        )
        task = Task(
            description=f"Analyze this data: {self.state.input_data}. Classify risk as low/medium/high.",
            expected_output="Analysis with risk classification (low/medium/high).",
            agent=analyst,
        )
        crew = Crew(agents=[analyst], tasks=[task], process=Process.sequential)
        result = crew.kickoff()

        self.state.analysis_result = result.raw
        # Extract the risk level from the free-text result.
        self.state.risk_level = self._classify_risk(result.raw)
        return self.state.risk_level

    @router(analyze)
    def route_by_risk(self):
        """Return the stored risk level as the route label."""
        print(f"Routing based on risk: {self.state.risk_level}")
        return self.state.risk_level

    @listen("high")
    def high_risk_crew(self):
        """Runs on the ``"high"`` route; writes the high-risk report."""
        print("⚠️ Running high-risk mitigation crew...")
        self.state.final_report = f"HIGH RISK: {self.state.analysis_result}"
        return self.state.final_report

    @listen("medium")
    def medium_risk_crew(self):
        """Runs on the ``"medium"`` route; writes the medium-risk report."""
        print("📊 Running standard analysis crew...")
        self.state.final_report = f"MEDIUM RISK: {self.state.analysis_result}"
        return self.state.final_report

    @listen("low")
    def low_risk_crew(self):
        """Runs on the ``"low"`` route; writes the low-risk report."""
        print("✅ Low risk — fast-track processing...")
        self.state.final_report = f"LOW RISK: {self.state.analysis_result}"
        return self.state.final_report
# Execute with routing: input_data is supplied through kickoff()'s inputs, and
# the first 200 characters of the resulting report are printed.
pipeline = AnalyticsPipeline()
pipeline.kickoff(inputs={"input_data": {"metric": "cpu_usage", "value": 95}})
print(f"\nFinal report: {pipeline.state.final_report[:200]}...")
Next in the CrewAI SDK Track
In Part 6: Tools & Custom Tools, we’ll explore the CrewAI tools ecosystem — built-in tools (search, scrape, file I/O), building custom tools with the @tool decorator, tool caching strategies, and connecting external APIs as agent capabilities.