
CrewAI SDK Track Part 4: Crews & Production Architecture

May 24, 2026 Wasil Zafar 40 min read

Build production-grade crews with the @CrewBase decorator pattern, async and batch kickoff execution, the recommended Flow-wrapping-Crew architecture for state and persistence, and deploy crews as API services with FastAPI.

Table of Contents

  1. Crew Configuration
  2. Async & Batch Execution
  3. Production Architecture
  4. Annotations & Decorators
  5. Deployment Patterns
What You’ll Learn: A Crew is the complete unit of work, combining agents, tasks, a process, and configuration. This article covers building production-ready crews: defining them with @CrewBase and YAML configuration for maintainability, running them asynchronously and in batches for throughput, wrapping them in Flows for state and orchestration, and exposing them as API services. Think of the Crew as a Docker container for AI collaboration: self-contained, configurable, and repeatable with new inputs.

1. Crew Configuration

The @CrewBase decorator is the standard way to define crews in a CrewAI project, and it is what the crewai create crew scaffold generates. It provides automatic YAML loading, decorator-based agent/task registration, and a clean separation between configuration (YAML) and logic (Python).

Why @CrewBase? It loads the YAML files named in agents_config and tasks_config, collects the methods decorated with @agent and @task into self.agents and self.tasks in the order they are defined, and works with the crewai run CLI command. Prefer it for any crew you plan to maintain or deploy.

1.1 Fully Configured Crew Class

from datetime import date
from crewai import Agent, Task, Crew, Process
from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff

@CrewBase
class ContentPipeline:
    """Production content generation pipeline."""

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @before_kickoff
    def prepare_inputs(self, inputs):
        """Validate and enrich inputs before crew starts."""
        inputs = inputs or {}  # guard against kickoff() being called without inputs
        if "topic" not in inputs:
            raise ValueError("'topic' is required in inputs")
        inputs["timestamp"] = date.today().isoformat()  # run date, e.g. "2026-05-24"
        inputs["word_count"] = inputs.get("word_count", 1500)
        return inputs

    @after_kickoff
    def process_results(self, result):
        """Post-process crew output."""
        print(f"Crew completed. Token usage: {result.token_usage}")
        return result

    @agent
    def researcher(self) -> Agent:
        return Agent(config=self.agents_config["researcher"])

    @agent
    def writer(self) -> Agent:
        return Agent(config=self.agents_config["writer"])

    @agent
    def editor(self) -> Agent:
        return Agent(config=self.agents_config["editor"])

    @task
    def research_task(self) -> Task:
        return Task(config=self.tasks_config["research_task"])

    @task
    def writing_task(self) -> Task:
        return Task(config=self.tasks_config["writing_task"])

    @task
    def editing_task(self) -> Task:
        return Task(config=self.tasks_config["editing_task"])

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
            memory=True,
        )
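The agents.yaml and tasks.yaml entries usually contain {placeholders} such as {topic} or {word_count}. In CrewAI these are filled from the inputs passed to kickoff, after any @before_kickoff hooks have run, which is why a default the hook adds (like word_count) can be referenced in the YAML. The stdlib sketch below is a toy model of that flow, not CrewAI's implementation: interpolate_config and the writing_task_cfg dict are hypothetical, and prepare_inputs mirrors the hook above minus the timestamp.

```python
from typing import Any, Optional


def interpolate_config(config: dict[str, Any], inputs: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of config with {placeholders} in string values filled from inputs.

    Toy model only: raises KeyError if a placeholder has no matching input.
    """
    filled: dict[str, Any] = {}
    for key, value in config.items():
        filled[key] = value.format(**inputs) if isinstance(value, str) else value
    return filled


def prepare_inputs(inputs: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Mirror of the @before_kickoff hook above: validate, then add defaults."""
    inputs = dict(inputs or {})
    if "topic" not in inputs:
        raise ValueError("'topic' is required in inputs")
    inputs.setdefault("word_count", 1500)
    return inputs


# Hypothetical entry, as it might look after loading config/tasks.yaml
writing_task_cfg = {
    "description": "Write a {word_count}-word article about {topic}.",
    "expected_output": "A polished article on {topic} in markdown.",
}

inputs = prepare_inputs({"topic": "Quantum Computing"})
print(interpolate_config(writing_task_cfg, inputs)["description"])
# Write a 1500-word article about Quantum Computing.
```

Running validation before interpolation means a missing topic fails fast with a clear message instead of surfacing later as a missing-placeholder error.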

2. Async & Batch Execution

For production workloads, you often need non-blocking execution or batch processing across multiple inputs.

2.1 Async Kickoff

import asyncio
from crewai import Agent, Task, Crew, Process

async def run_crew_async():
    """Run a crew without blocking the event loop."""

    analyst = Agent(
        role="Market Analyst",
        goal="Analyze market conditions for {company}",
        backstory="You are a financial analyst."
    )

    analysis_task = Task(
        description="Analyze the market position of {company} in {sector}.",
        expected_output="Market analysis report with competitive positioning.",
        agent=analyst
    )

    crew = Crew(
        agents=[analyst],
        tasks=[analysis_task],
        process=Process.sequential,
        verbose=True
    )

    # Non-blocking execution
    result = await crew.kickoff_async(
        inputs={"company": "Tesla", "sector": "EV manufacturing"}
    )

    print(f"Async result: {result.raw[:200]}...")
    return result

# Run in async context
result = asyncio.run(run_crew_async())
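What "non-blocking" buys you can be shown without an LLM. A common way to make a blocking call awaitable is asyncio.to_thread, which runs it in a worker thread while the event loop keeps serving other coroutines; awaiting kickoff_async gives the caller the same kind of behavior. In this sketch, slow_kickoff is a hypothetical stand-in for a blocking crew.kickoff call, and the heartbeat coroutine records ticks to prove the loop stayed responsive.

```python
import asyncio
import time


def slow_kickoff(inputs: dict) -> str:
    """Hypothetical stand-in for a blocking crew.kickoff(inputs=...) call."""
    time.sleep(0.3)  # simulate waiting on LLM responses
    return f"analysis of {inputs['company']}"


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Record a tick every 50 ms until told to stop; ticks only accrue if the loop is free."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main() -> tuple:
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # to_thread runs the blocking call in a worker thread and returns an awaitable
    result = await asyncio.to_thread(slow_kickoff, {"company": "Tesla"})
    stop.set()
    await beat
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

If main called slow_kickoff directly instead of through asyncio.to_thread, tick_count would stay at 0: the heartbeat task would not get a chance to run until the blocking call had already finished and the stop flag was set.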

2.2 Kickoff-for-Each (Batch Processing)

from crewai import Agent, Task, Crew, Process

reviewer = Agent(
    role="Code Reviewer",
    goal="Review pull requests for quality and security issues",
    backstory="You are a senior engineer focused on code quality."
)

review_task = Task(
    description="Review PR #{pr_number}: {pr_title}. Check for bugs, security issues, and style.",
    expected_output="Code review with severity-rated findings and suggestions.",
    agent=reviewer
)

crew = Crew(
    agents=[reviewer],
    tasks=[review_task],
    process=Process.sequential,
    verbose=True
)

# Batch process multiple PRs
pr_list = [
    {"pr_number": "142", "pr_title": "Add user authentication"},
    {"pr_number": "143", "pr_title": "Fix SQL injection in search"},
    {"pr_number": "144", "pr_title": "Refactor payment module"},
]

# Process all PRs: runs the crew once per input, in order
results = crew.kickoff_for_each(inputs=pr_list)

for pr, result in zip(pr_list, results):
    print(f"PR #{pr['pr_number']}: {result.raw[:100]}...")
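Performance Tip: kickoff_for_each runs the crew once per input, one run after another. For concurrent batches, use kickoff_for_each_async, or call kickoff_async per input and combine the coroutines with asyncio.gather. Every concurrent run makes its own LLM calls, so watch your provider's rate limits when parallelizing.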
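The asyncio.gather approach can be bounded with a semaphore so that only a few runs hit the LLM provider at once. In this sketch, FakeCrew is a hypothetical stand-in whose kickoff_async(inputs=...) coroutine mimics the call shape of Crew.kickoff_async, and max_concurrency=2 is an arbitrary example value. With a real crew, give each input its own crew instance (for example via crew.copy()), since kickoff_for_each also copies the crew per input rather than sharing one across runs.

```python
import asyncio
from typing import Any


class FakeCrew:
    """Hypothetical stand-in with the same call shape as Crew.kickoff_async."""

    def __init__(self) -> None:
        self.active = 0  # runs in flight right now
        self.peak = 0    # highest concurrency observed

    async def kickoff_async(self, inputs: dict[str, Any]) -> str:
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.05)  # simulate LLM latency
        self.active -= 1
        return f"review of PR #{inputs['pr_number']}"


async def kickoff_bounded(crew: Any, inputs_list: list[dict[str, Any]], max_concurrency: int = 2) -> list:
    """Run crew.kickoff_async once per input, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(inputs: dict[str, Any]) -> Any:
        async with semaphore:  # wait for a free slot before calling the provider
            return await crew.kickoff_async(inputs=inputs)

    # gather returns results in the same order as inputs_list
    return await asyncio.gather(*(run_one(i) for i in inputs_list))


pr_list = [{"pr_number": n} for n in ("142", "143", "144", "145")]
crew = FakeCrew()
results = asyncio.run(kickoff_bounded(crew, pr_list, max_concurrency=2))
print(results, crew.peak)
```

Four reviews run, but crew.peak never exceeds 2. Because gather preserves input order, results still line up with pr_list exactly as they do with kickoff_for_each.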

3. Production Architecture

The recommended production pattern is Flow wrapping Crew. Flows handle orchestration, state management, and persistence, while Crews handle the actual agent work.

3.1 The Flow-Wrapping-Crew Pattern

from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, start, listen
from pydantic import BaseModel

# State model for the Flow
class PipelineState(BaseModel):
    topic: str = ""
    research_output: str = ""
    article_output: str = ""
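    review_passed: bool = False   # reserved for a review loop (unused in this example)
    iteration_count: int = 0      # reserved for a review loop (unused in this example)

class ContentFlow(Flow[PipelineState]):
    """Two-phase content pipeline; each phase runs a single-agent crew and writes to shared state."""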

    @start()
    def research_phase(self):
        """Phase 1: Research the topic."""
        researcher = Agent(
            role="Researcher",
            goal=f"Research {self.state.topic} thoroughly",
            backstory="You are a meticulous research analyst."
        )
        task = Task(
            description=f"Research: {self.state.topic}",
            expected_output="Comprehensive research brief.",
            agent=researcher
        )
        crew = Crew(agents=[researcher], tasks=[task], process=Process.sequential)
        result = crew.kickoff()
        self.state.research_output = result.raw
        return result.raw

    @listen(research_phase)
    def writing_phase(self):
        """Phase 2: Write article from research."""
        writer = Agent(
            role="Writer",
            goal="Write engaging content from research",
            backstory="You are an award-winning technical writer."
        )
        task = Task(
            description=f"Write article using this research: {self.state.research_output[:500]}",
            expected_output="A polished 1500-word article.",
            agent=writer
        )
        crew = Crew(agents=[writer], tasks=[task], process=Process.sequential)
        result = crew.kickoff()
        self.state.article_output = result.raw
        return result.raw

# Execute the flow
flow = ContentFlow()
flow.kickoff(inputs={"topic": "Quantum Computing in 2026"})
print(f"Final article: {flow.state.article_output[:200]}...")
Why Not Just Crews? Crews alone lack persistent state, crash recovery, conditional routing, and multi-phase orchestration. Flows add these production essentials while keeping agent logic cleanly encapsulated in Crews. Think of Flows as the “conductor” and Crews as the “musicians.”
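To make "crash recovery" concrete: the core idea is to save state after every completed phase and, on restart, skip the phases already done. CrewAI Flows provide built-in persistence for this (covered in Part 5); the stdlib sketch below only illustrates the idea. The JSON checkpoint file, run_with_checkpoints, and the two phase functions are hypothetical, with plain functions standing in for the research and writing crews and a plain dataclass version of PipelineState.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Callable


@dataclass
class PipelineState:
    topic: str = ""
    research_output: str = ""
    article_output: str = ""


def run_with_checkpoints(state: PipelineState, phases: list[tuple[str, Callable[[PipelineState], None]]], checkpoint: Path) -> PipelineState:
    """Run phases in order, saving state after each; on restart, resume where it left off."""
    done: list[str] = []
    if checkpoint.exists():
        saved = json.loads(checkpoint.read_text())
        state, done = PipelineState(**saved["state"]), saved["done"]
    for name, phase in phases:
        if name in done:
            continue  # finished in an earlier run; don't pay for it twice
        phase(state)
        done.append(name)
        checkpoint.write_text(json.dumps({"state": asdict(state), "done": done}))
    return state


calls: list[str] = []
fail_once = {"writing": True}


def research(state: PipelineState) -> None:
    calls.append("research")
    state.research_output = f"notes on {state.topic}"


def writing(state: PipelineState) -> None:
    calls.append("writing")
    if fail_once.pop("writing", False):
        raise RuntimeError("simulated crash")  # only the first attempt fails
    state.article_output = f"article from {state.research_output}"


ckpt = Path(tempfile.mkdtemp()) / "pipeline.json"
phases = [("research", research), ("writing", writing)]
try:
    run_with_checkpoints(PipelineState(topic="Quantum Computing"), phases, ckpt)
except RuntimeError:
    pass  # first run dies in writing; research is already checkpointed
final = run_with_checkpoints(PipelineState(topic="Quantum Computing"), phases, ckpt)
print(calls)  # ['research', 'writing', 'writing']
```

The research phase ran only once: the second run loaded its output from the checkpoint and resumed at writing, which is exactly the cost saving you want when each phase is a paid LLM crew.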
Real-World Application

Automated Report Generation

A consulting firm generates weekly client reports using CrewAI in production: data is fetched from their analytics platform, a Data Analyst agent interprets trends, a Writer agent creates narratives, and a Designer agent formats for presentation. The crew runs on a cron schedule, producing 40 reports/week with zero manual intervention.


4. Annotations & Decorators

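4.1 Before & After Kickoff Hooks

A method decorated with @before_kickoff receives the inputs dict before any task runs and must return it (optionally modified); a method decorated with @after_kickoff receives the crew's output and must return it. That makes them a natural place for validation, enrichment, and audit logging, as in this crew: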

from crewai import Agent, Task, Crew, Process
from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff
import json
from datetime import datetime

@CrewBase
class AuditedCrew:
    """Crew with full audit trail via hooks."""

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @before_kickoff
    def log_start(self, inputs):
        """Log execution start with timestamp and inputs."""
        self.start_time = datetime.now()
        print(f"[AUDIT] Crew started at {self.start_time}")
        # default=str keeps non-JSON values (dates, objects) from crashing the log line
        print(f"[AUDIT] Inputs: {json.dumps(inputs, indent=2, default=str)}")
        return inputs

    @after_kickoff
    def log_completion(self, result):
        """Log execution completion with metrics."""
        duration = (datetime.now() - self.start_time).total_seconds()
        print(f"[AUDIT] Crew completed in {duration:.1f}s")
        print(f"[AUDIT] Token usage: {result.token_usage}")
        print(f"[AUDIT] Output length: {len(result.raw)} chars")
        return result

    @agent
    def worker(self) -> Agent:
        return Agent(config=self.agents_config["worker"])

    @task
    def work_task(self) -> Task:
        return Task(config=self.tasks_config["work_task"])

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )
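A toy model helps show how these hooks compose: each before hook receives the inputs and returns them for the next step, each after hook receives the result and returns it, and a hook that forgets to return breaks the chain. HookedRunner below is hypothetical and is not CrewAI's implementation; the lambda passed to it stands in for the crew run.

```python
from typing import Any, Callable


class HookedRunner:
    """Toy model of before/after kickoff hooks wrapped around a run function."""

    def __init__(self, kickoff_fn: Callable[[dict], Any]) -> None:
        self.kickoff_fn = kickoff_fn
        self.before: list[Callable[[dict], dict]] = []
        self.after: list[Callable[[Any], Any]] = []

    def before_kickoff(self, fn: Callable[[dict], dict]) -> Callable[[dict], dict]:
        self.before.append(fn)
        return fn  # return the function unchanged so it stays callable

    def after_kickoff(self, fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
        self.after.append(fn)
        return fn

    def kickoff(self, inputs: dict) -> Any:
        for hook in self.before:
            inputs = hook(inputs)  # each hook sees the previous hook's output
        result = self.kickoff_fn(inputs)
        for hook in self.after:
            result = hook(result)
        return result


runner = HookedRunner(lambda inputs: f"report on {inputs['topic']} ({inputs['word_count']} words)")


@runner.before_kickoff
def add_defaults(inputs: dict) -> dict:
    return {"word_count": 1500, **inputs}  # caller-supplied values win


@runner.after_kickoff
def shout(result: str) -> str:
    return result.upper()


print(runner.kickoff({"topic": "audit logs"}))
# REPORT ON AUDIT LOGS (1500 WORDS)
```

In this toy, if add_defaults returned nothing, the run function would receive None and fail with a TypeError, which is why every hook in the crews above ends with a return.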

5. Deployment Patterns

5.1 Crew as FastAPI Endpoint

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from crewai import Agent, Task, Crew, Process
import uuid

app = FastAPI(title="CrewAI Service")

# In-memory job store: lost on restart and not shared across worker processes (use Redis/DB in production)
jobs: dict = {}

class CrewRequest(BaseModel):
    topic: str
    word_count: int = 1500

class CrewResponse(BaseModel):
    job_id: str
    status: str

def run_crew_background(job_id: str, topic: str, word_count: int):
    """Execute crew in background."""
    try:
        jobs[job_id]["status"] = "running"

        researcher = Agent(
            role="Researcher",
            goal=f"Research {topic}",
            backstory="You are a thorough researcher."
        )
        writer = Agent(
            role="Writer",
            goal=f"Write a {word_count}-word article",
            backstory="You are a skilled technical writer."
        )

        research_task = Task(
            description=f"Research: {topic}",
            expected_output="Research brief with key findings.",
            agent=researcher
        )
        writing_task = Task(
            description=f"Write {word_count}-word article on {topic}.",
            expected_output=f"A {word_count}-word article in markdown.",
            agent=writer,
            context=[research_task]
        )

        crew = Crew(
            agents=[researcher, writer],
            tasks=[research_task, writing_task],
            process=Process.sequential
        )

        result = crew.kickoff()
        jobs[job_id]["status"] = "completed"
        jobs[job_id]["result"] = result.raw

    except Exception as e:
        jobs[job_id]["status"] = "failed"
        jobs[job_id]["error"] = str(e)

@app.post("/crews/content", response_model=CrewResponse)
async def start_content_crew(request: CrewRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "queued", "result": None, "error": None}
    background_tasks.add_task(run_crew_background, job_id, request.topic, request.word_count)
    return CrewResponse(job_id=job_id, status="queued")

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    return jobs[job_id]
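Before moving to Redis or a database, it helps to put the job store behind a small interface so the later swap touches one class. FastAPI runs plain def background tasks in a thread pool, so this sketch guards the dict with a threading.Lock and rejects illegal status changes. JobStore and VALID_TRANSITIONS are hypothetical names; the statuses match the ones used in the service above.

```python
import threading
import uuid
from typing import Any, Optional

# Allowed status changes; terminal states have no outgoing transitions
VALID_TRANSITIONS = {
    "queued": {"running", "failed"},
    "running": {"completed", "failed"},
    "completed": set(),
    "failed": set(),
}


class JobStore:
    """Hypothetical in-memory job store; swap the internals for Redis/DB later."""

    def __init__(self) -> None:
        self._jobs: dict[str, dict[str, Any]] = {}
        self._lock = threading.Lock()

    def create(self) -> str:
        job_id = str(uuid.uuid4())
        with self._lock:
            self._jobs[job_id] = {"status": "queued", "result": None, "error": None}
        return job_id

    def update(self, job_id: str, status: str, **fields: Any) -> None:
        with self._lock:
            job = self._jobs[job_id]
            if status not in VALID_TRANSITIONS[job["status"]]:
                raise ValueError(f"illegal transition {job['status']} -> {status}")
            job.update(fields, status=status)

    def get(self, job_id: str) -> Optional[dict[str, Any]]:
        with self._lock:
            job = self._jobs.get(job_id)
            return dict(job) if job else None  # copy so callers can't mutate shared state


store = JobStore()
jid = store.create()
store.update(jid, "running")
store.update(jid, "completed", result="# Article")
print(store.get(jid))  # {'status': 'completed', 'result': '# Article', 'error': None}
```

In the endpoints, the dict literal in start_content_crew becomes store.create(), run_crew_background calls store.update(...), and get_job_status raises the 404 when store.get(job_id) returns None.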

5.2 Error Handling & Retries

from crewai import Agent, Task, Crew, Process
import time

def run_crew_with_retries(crew, inputs, max_retries=3, backoff=2):
    """Run crew with exponential backoff retry logic."""
    for attempt in range(1, max_retries + 1):
        try:
            result = crew.kickoff(inputs=inputs)
            return result
        except Exception as e:
            if attempt == max_retries:
                raise RuntimeError(f"Crew failed after {max_retries} attempts: {e}") from e
            wait_time = backoff ** attempt
            print(f"Attempt {attempt} failed: {e}. Retrying in {wait_time}s...")
            time.sleep(wait_time)

# Example usage
analyst = Agent(
    role="Analyst",
    goal="Analyze {topic}",
    backstory="You are analytical and precise."
)

task = Task(
    description="Analyze {topic} and provide key insights.",
    expected_output="Analysis report with 5 key insights.",
    agent=analyst
)

crew = Crew(agents=[analyst], tasks=[task], process=Process.sequential)

result = run_crew_with_retries(crew, inputs={"topic": "AI market trends"})
print(f"Result: {result.raw[:200]}...")
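One refinement to run_crew_with_retries: add random jitter so that many workers that failed at the same moment (for example on a provider rate limit) don't all retry in lockstep. This sketch uses the "full jitter" variant, sleeping a random amount between 0 and the exponential cap. It also retries only the exception types you list, so a ValueError from input validation isn't retried pointlessly, and it takes an injectable sleep function so it can be tested without waiting. retry_with_jitter and its parameters are hypothetical, not part of CrewAI.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_jitter(
    fn: Callable[[], T],
    max_retries: int = 3,
    base: float = 1.0,
    cap: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on retry_on errors with full-jitter exponential backoff."""
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")
    for attempt in range(1, max_retries + 1):
        try:
            return fn()
        except retry_on as e:
            if attempt == max_retries:
                raise RuntimeError(f"failed after {max_retries} attempts: {e}") from e
            # full jitter: uniform in [0, min(cap, base * 2 ** (attempt - 1))]
            sleep(random.uniform(0, min(cap, base * 2 ** (attempt - 1))))


attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("rate limited")  # fail the first two calls
    return "ok"


delays: list = []
print(retry_with_jitter(flaky, max_retries=3, sleep=delays.append))  # ok
```

With a real crew, pass a closure such as lambda: crew.kickoff(inputs={"topic": "AI market trends"}) and narrow retry_on to the transient errors your LLM client raises.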
Try It Yourself: Build a production-ready crew using YAML configuration (not just Python): define agents in agents.yaml, tasks in tasks.yaml, and wire them together in crew.py with @CrewBase. Add verbose mode for debugging, async kickoff for non-blocking execution, and structured JSON output (for example via the task's output_json or output_pydantic parameter). Run the crew three times and verify that every output has the same structure.
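For the verification step, a small helper can compare the structure of the JSON outputs rather than their contents, which will differ from run to run. This sketch assumes you have collected each run's output as a JSON string (for example from result.raw when the task asks for JSON); shape, consistent_structure, and the keys in the sample runs are made up for illustration.

```python
import json
from typing import Any


def shape(value: Any) -> Any:
    """Reduce a JSON value to its structure: dict keys and value types, not contents."""
    if isinstance(value, dict):
        return {k: shape(v) for k, v in sorted(value.items())}
    if isinstance(value, list):
        # describe a list by the distinct element shapes it contains
        return ["list", sorted({json.dumps(shape(v), sort_keys=True) for v in value})]
    return type(value).__name__


def consistent_structure(raw_outputs: list[str]) -> bool:
    """True if every run parses as JSON and all runs share the same shape."""
    try:
        shapes = [shape(json.loads(raw)) for raw in raw_outputs]
    except json.JSONDecodeError:
        return False
    return len(shapes) > 0 and all(s == shapes[0] for s in shapes)


runs = [
    '{"title": "A", "insights": ["x", "y"], "score": 7}',
    '{"title": "B", "insights": ["z"], "score": 9}',
    '{"title": "C", "insights": ["w"], "score": "high"}',
]
print(consistent_structure(runs[:2]))  # True
print(consistent_structure(runs))      # False: score changed from int to str
```

One caveat of this design: an empty list has no element shapes, so a run that returns "insights": [] is reported as structurally different from one with items.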

Next in the CrewAI SDK Track

In Part 5: Flows & State Management, we’ll build event-driven workflows with @start, @listen, and @router decorators, structured Pydantic state, LanceDB-backed persistence, state restoration after crashes, and complex multi-crew orchestration patterns.