1. Crew Configuration
The @CrewBase decorator is the production standard for defining crews. It provides automatic YAML loading, decorator-based agent/task registration, and clean separation between configuration and logic.
@CrewBase also collects the decorated agents and tasks into the self.agents and self.tasks collections, and integrates with crewai run CLI commands. Always use it for production crews.
1.1 Fully Configured Crew Class
from crewai import Agent, Task, Crew, Process
from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff
@CrewBase
class ContentPipeline:
    """Production content generation pipeline.

    Declares three agents (researcher, writer, editor) and three tasks
    (research, writing, editing) via the ``@agent`` / ``@task`` decorators.
    Each one is built from the entry of the same name in the YAML files
    referenced by ``agents_config`` and ``tasks_config``.
    """

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @before_kickoff
    def prepare_inputs(self, inputs):
        """Validate and enrich inputs before the crew starts.

        Args:
            inputs: Mapping of kickoff inputs; must contain ``"topic"``.

        Returns:
            The same mapping, with ``"timestamp"`` set to today's date
            (``YYYY-MM-DD``) and ``"word_count"`` defaulted to 1500 when
            the caller did not supply one.

        Raises:
            ValueError: If ``"topic"`` is missing from ``inputs``.
        """
        # Function-scope import keeps this snippet self-contained.
        from datetime import date

        if "topic" not in inputs:
            raise ValueError("'topic' is required in inputs")

        # Use the real run date so the value never goes stale; the format
        # (ISO 8601 calendar date) matches what downstream templates expect.
        inputs["timestamp"] = date.today().isoformat()
        # Keep a caller-provided word count, otherwise fall back to 1500.
        inputs.setdefault("word_count", 1500)
        return inputs

    @after_kickoff
    def process_results(self, result):
        """Print the crew's token usage and hand the result back unchanged."""
        print(f"Crew completed. Token usage: {result.token_usage}")
        return result

    @agent
    def researcher(self) -> Agent:
        """Build the agent described by the ``researcher`` config entry."""
        return Agent(config=self.agents_config["researcher"])

    @agent
    def writer(self) -> Agent:
        """Build the agent described by the ``writer`` config entry."""
        return Agent(config=self.agents_config["writer"])

    @agent
    def editor(self) -> Agent:
        """Build the agent described by the ``editor`` config entry."""
        return Agent(config=self.agents_config["editor"])

    @task
    def research_task(self) -> Task:
        """Build the task described by the ``research_task`` config entry."""
        return Task(config=self.tasks_config["research_task"])

    @task
    def writing_task(self) -> Task:
        """Build the task described by the ``writing_task`` config entry."""
        return Task(config=self.tasks_config["writing_task"])

    @task
    def editing_task(self) -> Task:
        """Build the task described by the ``editing_task`` config entry."""
        return Task(config=self.tasks_config["editing_task"])

    @crew
    def crew(self) -> Crew:
        """Assemble a sequential, memory-enabled crew from the registered agents and tasks."""
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
            memory=True,
        )
2. Async & Batch Execution
For production workloads, you often need non-blocking execution or batch processing across multiple inputs.
2.1 Async Kickoff
import asyncio
from crewai import Agent, Task, Crew, Process
async def run_crew_async():
    """Kick off a one-agent market-analysis crew without blocking the event loop.

    Returns:
        The result object produced by awaiting ``kickoff_async``.
    """
    market_analyst = Agent(
        role="Market Analyst",
        goal="Analyze market conditions for {company}",
        backstory="You are a financial analyst.",
    )
    market_report = Task(
        description="Analyze the market position of {company} in {sector}.",
        expected_output="Market analysis report with competitive positioning.",
        agent=market_analyst,
    )
    analysis_crew = Crew(
        agents=[market_analyst],
        tasks=[market_report],
        process=Process.sequential,
        verbose=True,
    )

    # Values for the {company} / {sector} placeholders used above.
    kickoff_inputs = {"company": "Tesla", "sector": "EV manufacturing"}

    # Awaiting here yields control to the event loop while the crew runs.
    outcome = await analysis_crew.kickoff_async(inputs=kickoff_inputs)

    preview = outcome.raw[:200]
    print(f"Async result: {preview}...")
    return outcome
# Run in async context
# asyncio.run() starts an event loop, drives the coroutine to completion,
# and returns whatever run_crew_async() returned.
result = asyncio.run(run_crew_async())
2.2 Kickoff-for-Each (Batch Processing)
from crewai import Agent, Task, Crew, Process
# One reviewer agent is reused for every pull request in the batch.
reviewer = Agent(
    role="Code Reviewer",
    goal="Review pull requests for quality and security issues",
    backstory="You are a senior engineer focused on code quality.",
)

# {pr_number} and {pr_title} are placeholders filled from each input dict.
review_task = Task(
    description="Review PR #{pr_number}: {pr_title}. Check for bugs, security issues, and style.",
    expected_output="Code review with severity-rated findings and suggestions.",
    agent=reviewer,
)

crew = Crew(
    agents=[reviewer],
    tasks=[review_task],
    process=Process.sequential,
    verbose=True,
)

# Batch process multiple PRs: one input dict per pull request.
pr_list = [
    {"pr_number": "142", "pr_title": "Add user authentication"},
    {"pr_number": "143", "pr_title": "Fix SQL injection in search"},
    {"pr_number": "144", "pr_title": "Refactor payment module"},
]

# Process all PRs — runs crew once per input
results = crew.kickoff_for_each(inputs=pr_list)

# Results are matched back to their inputs by position.
for i, result in enumerate(results):
    pr_number = pr_list[i]["pr_number"]
    preview = result.raw[:100]
    print(f"PR #{pr_number}: {preview}...")
kickoff_for_each runs sequentially by default. For parallel batch processing, combine with asyncio.gather and kickoff_async. Be mindful of API rate limits when parallelizing.
3. Production Architecture
The recommended production pattern is Flow wrapping Crew. Flows handle orchestration, state management, and persistence, while Crews handle the actual agent work.
3.1 The Flow-Wrapping-Crew Pattern
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, start, listen
from pydantic import BaseModel
from typing import Optional
# State model for the Flow
class PipelineState(BaseModel):
    """Structured state shared between the phases of the content Flow."""

    # Subject the pipeline researches and writes about.
    topic: str = ""
    # Raw text stored by the research phase.
    research_output: str = ""
    # Raw text stored by the writing phase.
    article_output: str = ""
    # Not touched by the phases shown here; presumably set by a review step.
    review_passed: bool = False
    # Not touched by the phases shown here; presumably counts revision passes.
    iteration_count: int = 0
class ContentFlow(Flow[PipelineState]):
    """Two-phase content pipeline: research a topic, then write an article.

    Each phase builds and runs its own single-agent crew and stores the raw
    output on the shared ``PipelineState``.
    """

    @start()
    def research_phase(self):
        """Phase 1: research ``state.topic`` and save the raw output to state.

        Returns:
            The raw research text, also stored in ``state.research_output``.
        """
        subject = self.state.topic

        analyst = Agent(
            role="Researcher",
            goal=f"Research {subject} thoroughly",
            backstory="You are a meticulous research analyst.",
        )
        brief_task = Task(
            description=f"Research: {subject}",
            expected_output="Comprehensive research brief.",
            agent=analyst,
        )
        research_crew = Crew(
            agents=[analyst],
            tasks=[brief_task],
            process=Process.sequential,
        )

        outcome = research_crew.kickoff()
        self.state.research_output = outcome.raw
        return outcome.raw

    @listen(research_phase)
    def writing_phase(self):
        """Phase 2: write an article from the stored research.

        Returns:
            The raw article text, also stored in ``state.article_output``.
        """
        author = Agent(
            role="Writer",
            goal="Write engaging content from research",
            backstory="You are an award-winning technical writer.",
        )

        # Only the first 500 characters of the research go into the prompt.
        research_excerpt = self.state.research_output[:500]
        article_task = Task(
            description=f"Write article using this research: {research_excerpt}",
            expected_output="A polished 1500-word article.",
            agent=author,
        )
        writing_crew = Crew(
            agents=[author],
            tasks=[article_task],
            process=Process.sequential,
        )

        outcome = writing_crew.kickoff()
        self.state.article_output = outcome.raw
        return outcome.raw
# Execute the flow
# NOTE(review): the "topic" input is expected to end up in flow.state.topic,
# which research_phase reads — confirm against the Flow kickoff documentation.
flow = ContentFlow()
flow.kickoff(inputs={"topic": "Quantum Computing in 2026"})
# Preview the first 200 characters of the article that writing_phase stored in state.
print(f"Final article: {flow.state.article_output[:200]}...")
Automated Report Generation
A consulting firm generates weekly client reports using CrewAI in production: data is fetched from their analytics platform, a Data Analyst agent interprets trends, a Writer agent creates narratives, and a Designer agent formats for presentation. The crew runs on a cron schedule, producing 40 reports/week with zero manual intervention.
4. Annotations & Decorators
4.1 Before & After Kickoff Hooks
from crewai import Agent, Task, Crew, Process
from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff
import json
from datetime import datetime
@CrewBase
class AuditedCrew:
    """Crew that prints an audit trail before and after every kickoff."""

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @before_kickoff
    def log_start(self, inputs):
        """Record the start time and print it together with the inputs.

        Returns:
            ``inputs`` unchanged.
        """
        # Stored on the instance so log_completion can compute the duration.
        self.start_time = datetime.now()
        print(f"[AUDIT] Crew started at {self.start_time}")

        pretty_inputs = json.dumps(inputs, indent=2)
        print(f"[AUDIT] Inputs: {pretty_inputs}")
        return inputs

    @after_kickoff
    def log_completion(self, result):
        """Print elapsed time, token usage and output size for the finished run.

        Returns:
            ``result`` unchanged.
        """
        elapsed = datetime.now() - self.start_time
        duration = elapsed.total_seconds()
        print(f"[AUDIT] Crew completed in {duration:.1f}s")

        usage = result.token_usage
        print(f"[AUDIT] Token usage: {usage}")

        output_chars = len(result.raw)
        print(f"[AUDIT] Output length: {output_chars} chars")
        return result

    @agent
    def worker(self) -> Agent:
        """Build the agent described by the ``worker`` config entry."""
        worker_config = self.agents_config["worker"]
        return Agent(config=worker_config)

    @task
    def work_task(self) -> Task:
        """Build the task described by the ``work_task`` config entry."""
        task_config = self.tasks_config["work_task"]
        return Task(config=task_config)

    @crew
    def crew(self) -> Crew:
        """Assemble a sequential crew from the registered agents and tasks."""
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )
5. Deployment Patterns
5.1 Crew as FastAPI Endpoint
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from crewai import Agent, Task, Crew, Process
import uuid
# FastAPI application that exposes the crew endpoints defined below.
app = FastAPI(title="CrewAI Service")
# In-memory job store (use Redis/DB in production)
# Maps job_id -> {"status", "result", "error"}; being a plain dict, its
# contents live only as long as this process.
jobs: dict = {}
class CrewRequest(BaseModel):
    """Request body for starting a content crew run."""

    # Subject the crew should research and write about (required).
    topic: str
    # Target article length in words.
    word_count: int = 1500
class CrewResponse(BaseModel):
    """Response returned when a crew run has been accepted."""

    # Identifier to poll the job-status endpoint with.
    job_id: str
    # Job state at the time of the response.
    status: str
def run_crew_background(job_id: str, topic: str, word_count: int):
    """Run the research-then-write crew for one job and record the outcome.

    The job record in ``jobs`` is updated in place: its status becomes
    "running", then either "completed" (raw output stored under "result")
    or "failed" (error text stored under "error"). Nothing is returned and
    crew failures are not re-raised.

    Args:
        job_id: Key of the job record in ``jobs``.
        topic: Subject to research and write about.
        word_count: Target article length in words.
    """
    try:
        jobs[job_id]["status"] = "running"

        research_agent = Agent(
            role="Researcher",
            goal=f"Research {topic}",
            backstory="You are a thorough researcher.",
        )
        writing_agent = Agent(
            role="Writer",
            goal=f"Write a {word_count}-word article",
            backstory="You are a skilled technical writer.",
        )

        gather_findings = Task(
            description=f"Research: {topic}",
            expected_output="Research brief with key findings.",
            agent=research_agent,
        )
        # The writing task receives the research task's output as context.
        draft_article = Task(
            description=f"Write {word_count}-word article on {topic}.",
            expected_output=f"A {word_count}-word article in markdown.",
            agent=writing_agent,
            context=[gather_findings],
        )

        content_crew = Crew(
            agents=[research_agent, writing_agent],
            tasks=[gather_findings, draft_article],
            process=Process.sequential,
        )
        outcome = content_crew.kickoff()

        jobs[job_id]["status"] = "completed"
        jobs[job_id]["result"] = outcome.raw
    except Exception as exc:
        # Background boundary: capture any failure on the job record so the
        # status endpoint can report it.
        jobs[job_id]["status"] = "failed"
        jobs[job_id]["error"] = str(exc)
@app.post("/crews/content", response_model=CrewResponse)
async def start_content_crew(request: CrewRequest, background_tasks: BackgroundTasks):
    """Register a new job, schedule the crew in the background, and return the job id."""
    job_id = str(uuid.uuid4())
    initial_status = "queued"

    # Create the record before scheduling so the worker always finds it.
    jobs[job_id] = {"status": initial_status, "result": None, "error": None}

    background_tasks.add_task(
        run_crew_background,
        job_id,
        request.topic,
        request.word_count,
    )
    return CrewResponse(job_id=job_id, status=initial_status)
@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Return the stored record for ``job_id``, or respond 404 if it is unknown."""
    if job_id in jobs:
        return jobs[job_id]
    raise HTTPException(status_code=404, detail="Job not found")
5.2 Error Handling & Retries
from crewai import Agent, Task, Crew, Process
import time
def run_crew_with_retries(crew, inputs, max_retries=3, backoff=2):
    """Run a crew, retrying failed kickoffs with exponential backoff.

    Args:
        crew: Object exposing ``kickoff(inputs=...)``.
        inputs: Passed unchanged to every kickoff attempt.
        max_retries: Total number of attempts; must be at least 1.
        backoff: Base of the delay. After failed attempt ``k`` the function
            sleeps ``backoff ** k`` seconds before trying again.

    Returns:
        Whatever the first successful ``crew.kickoff`` call returns.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        RuntimeError: If every attempt fails; the last error is chained as
            ``__cause__``.
    """
    # Without this guard the loop would never run and the function would
    # return None without ever kicking off the crew.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(1, max_retries + 1):
        try:
            return crew.kickoff(inputs=inputs)
        except Exception as e:
            # Retry boundary: any failure is retried until attempts run out.
            if attempt == max_retries:
                raise RuntimeError(
                    f"Crew failed after {max_retries} attempts: {e}"
                ) from e
            wait_time = backoff ** attempt
            print(f"Attempt {attempt} failed: {e}. Retrying in {wait_time}s...")
            time.sleep(wait_time)
# Example usage: a single-agent crew run through the retry helper.
analyst = Agent(
    role="Analyst",
    goal="Analyze {topic}",
    backstory="You are analytical and precise.",
)

# {topic} is filled in from the kickoff inputs passed below.
task = Task(
    description="Analyze {topic} and provide key insights.",
    expected_output="Analysis report with 5 key insights.",
    agent=analyst,
)

crew = Crew(
    agents=[analyst],
    tasks=[task],
    process=Process.sequential,
)

result = run_crew_with_retries(
    crew,
    inputs={"topic": "AI market trends"},
)
print(f"Result: {result.raw[:200]}...")
Next in the CrewAI SDK Track
In Part 5: Flows & State Management, we’ll build event-driven workflows with @start, @listen, and @router decorators, structured Pydantic state, LanceDB-backed persistence, state restoration after crashes, and complex multi-crew orchestration patterns.