
CrewAI SDK Track Part 12: Advanced Patterns & Hooks

May 24, 2026 Wasil Zafar 40 min read

Customize agent prompts for fine-grained control, implement fingerprinting for reproducibility, use event listeners for real-time monitoring, enable checkpointing for fault tolerance, and leverage execution hooks (LLM call hooks, tool call hooks) with annotations.

Table of Contents

  1. Customizing Prompts
  2. Fingerprinting
  3. Event Listeners
  4. Checkpointing
  5. Execution Hooks

What You’ll Learn: Advanced patterns give you fine-grained control over a crew’s behavior. Custom prompts override the defaults, fingerprinting supports reproducible runs, event listeners provide real-time monitoring, checkpointing adds fault tolerance, and hooks let you run custom logic around every LLM and tool call. Together, these are the production engineering tools that make crews reliable at scale.

1. Customizing Prompts

CrewAI agents use default system prompts that define their behavior. For advanced use cases, you can override these prompts at both the agent and task level to fine-tune how agents reason, respond, and interact with tools.

1.1 Agent-Level Prompt Customization

Override the default system prompt with system_template to control the agent’s core behavior. Use the available template variables to inject context dynamically:

from crewai import Agent, Task, Crew, Process

# Agent with fully custom system prompt
custom_agent = Agent(
    role="Technical Writer",
    goal="Produce clear, concise technical documentation",
    backstory="You are a senior technical writer with 15 years of experience.",
    llm="gpt-4o",
    system_template="""You are {role}.
Your goal: {goal}
Background: {backstory}

CUSTOM INSTRUCTIONS:
- Always use active voice
- Keep sentences under 25 words
- Use bullet points for lists of 3+ items
- Include code examples for every concept
- Never use jargon without defining it first

Available tools: {tools}
""",
    verbose=True
)

doc_task = Task(
    description="Write API documentation for a user authentication endpoint.",
    expected_output="Complete API docs with examples, error codes, and usage patterns.",
    agent=custom_agent
)

crew = Crew(
    agents=[custom_agent],
    tasks=[doc_task],
    process=Process.sequential
)

result = crew.kickoff()
print(result.raw)

1.2 Task-Level Prompt Injection

Override how task instructions are formatted using prompt_template on the task:

from crewai import Agent, Task, Crew, Process

analyst = Agent(
    role="Data Analyst",
    goal="Provide accurate data analysis",
    backstory="Expert statistician with domain knowledge.",
    llm="gpt-4o"
)

# Task with custom prompt template
analysis_task = Task(
    description="Analyze customer churn patterns for Q1 2026.",
    expected_output="Statistical analysis with confidence intervals.",
    agent=analyst,
    prompt_template="""ANALYSIS REQUEST:
{task_description}

METHODOLOGY REQUIREMENTS:
- State hypotheses before testing
- Report confidence intervals for all metrics
- Flag any data quality issues found
- Provide both summary and detailed findings

EXPECTED DELIVERABLE:
{expected_output}

CONTEXT FROM PREVIOUS TASKS:
{context}
"""
)

crew = Crew(
    agents=[analyst],
    tasks=[analysis_task],
    process=Process.sequential
)

result = crew.kickoff()
print(result.raw)
Template Variables: Available variables include {role}, {goal}, {backstory}, {tools}, {task_description}, {expected_output}, and {context}. CrewAI replaces these at runtime with actual values.
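
To make the runtime substitution step concrete, here is a minimal sketch of placeholder replacement in plain Python. The `render_template` helper is hypothetical and is not a CrewAI API; the framework's own substitution logic may differ. The sketch leaves unknown placeholders untouched instead of raising, which is one reasonable choice when a template mixes agent-level and task-level variables.

```python
import re


def render_template(template: str, values: dict[str, str]) -> str:
    """Replace {name} placeholders with values; leave unknown names as-is.

    Hypothetical helper for illustration only, not part of CrewAI.
    """
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        # Fall back to the original "{name}" text when no value is supplied.
        return values.get(name, match.group(0))

    return re.sub(r"\{(\w+)\}", substitute, template)


template = "You are {role}.\nYour goal: {goal}\nAvailable tools: {tools}"
rendered = render_template(
    template,
    {"role": "Technical Writer", "goal": "Produce clear documentation"},
)
print(rendered)
```

Because `{tools}` has no value here, it survives in the output, which makes a missing variable easy to spot when reviewing the final prompt.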

2. Fingerprinting

2.1 Reproducibility and Caching

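Fingerprinting generates a unique hash for each crew execution from its inputs, agent configurations, and task definitions. Identical fingerprints identify duplicate runs and tell you when cached work can be reused. In the example below, cache=True is CrewAI's tool-result cache, so a repeated run reuses results for identical tool calls: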

from crewai import Agent, Task, Crew, Process

# Agents with deterministic configuration
researcher = Agent(
    role="Market Researcher",
    goal="Gather market intelligence",
    backstory="Expert at finding market data.",
    llm="gpt-4o"
)

research_task = Task(
    description="Research cloud computing market size for {year}.",
    expected_output="Market size report with sources.",
    agent=researcher
)

# Crew with caching enabled
crew = Crew(
    agents=[researcher],
    tasks=[research_task],
    process=Process.sequential,
    cache=True,  # Cache tool results so identical tool calls are not repeated
    verbose=True
)

# First run: executes fully
result1 = crew.kickoff(inputs={"year": "2026"})
print(f"First run: {result1.raw[:100]}...")

# Second run with the same inputs: identical tool calls can be served from the cache
result2 = crew.kickoff(inputs={"year": "2026"})
print(f"Second run (same inputs): {result2.raw[:100]}...")

# Different inputs: new fingerprint, so nothing is reused
result3 = crew.kickoff(inputs={"year": "2027"})
print(f"Third run (new inputs): {result3.raw[:100]}...")

Fingerprinting also helps with debugging — you can trace which specific configuration produced a given output:

import hashlib
import json

# Manual fingerprint generation for audit trails
def generate_crew_fingerprint(crew_config: dict) -> str:
    """Generate a deterministic fingerprint for crew configuration."""
    config_str = json.dumps(crew_config, sort_keys=True)
    return hashlib.sha256(config_str.encode()).hexdigest()[:16]

crew_config = {
    "agents": ["Market Researcher"],
    "tasks": ["Research cloud computing market"],
    "process": "sequential",
    "llm": "gpt-4o",
    "inputs": {"year": "2026"}
}

fingerprint = generate_crew_fingerprint(crew_config)
print(f"Crew fingerprint: {fingerprint}")
print("Use this to track execution lineage in logs")
Cache Invalidation: The fingerprint changes when any of these change: agent roles/goals/backstories, task descriptions, LLM model, tools list, or input values. Changing verbose or max_rpm does not affect the fingerprint.
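
The invalidation rule can be sketched with the same manual hashing approach used above. This is an illustration under stated assumptions, not CrewAI's internal algorithm: `EXCLUDED_KEYS` and `config_fingerprint` are hypothetical names, and the excluded settings simply mirror the ones listed in the note.

```python
import hashlib
import json

# Settings assumed not to influence the output, so they are left out of the hash.
EXCLUDED_KEYS = {"verbose", "max_rpm"}


def config_fingerprint(config: dict) -> str:
    """Hash only the settings that are expected to change the result."""
    relevant = {k: v for k, v in config.items() if k not in EXCLUDED_KEYS}
    payload = json.dumps(relevant, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


base = {
    "role": "Market Researcher",
    "llm": "gpt-4o",
    "inputs": {"year": "2026"},
    "verbose": False,
}
same_but_verbose = {**base, "verbose": True}
different_input = {**base, "inputs": {"year": "2027"}}

print(config_fingerprint(base) == config_fingerprint(same_but_verbose))  # True
print(config_fingerprint(base) == config_fingerprint(different_input))   # False
```

Keeping the exclusion list explicit makes it easy to audit which settings are treated as cosmetic.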

3. Event Listeners

3.1 Listening to Agent, Task, and Crew Events

CrewAI emits events at key execution points. Register listeners to monitor progress, log metrics, or trigger external systems in real-time:

from crewai import Agent, Task, Crew, Process
# Depending on your CrewAI version, these names may live under crewai.events instead
from crewai.utilities.events import (
    crewai_event_bus,
    AgentExecutionStartedEvent,
    CrewKickoffStartedEvent,
    CrewKickoffCompletedEvent,
    TaskCompletedEvent,
    ToolUsageStartedEvent,
)
import time

# Custom event handler class
class ExecutionMonitor:
    def __init__(self):
        self.start_time = None
        self.events_log = []

    def on_crew_start(self, source, event: CrewKickoffStartedEvent):
        self.start_time = time.time()
        print("[CREW START] Crew execution initiated")
        self.events_log.append(("crew_start", self.start_time))

    def on_crew_complete(self, source, event: CrewKickoffCompletedEvent):
        now = time.time()
        # Guard against a missing start event so the handler never raises
        duration = now - self.start_time if self.start_time is not None else 0.0
        print(f"[CREW COMPLETE] Total duration: {duration:.2f}s")
        self.events_log.append(("crew_complete", now))

    def on_agent_start(self, source, event: AgentExecutionStartedEvent):
        print(f"[AGENT START] Agent: {event.agent.role}")
        self.events_log.append(("agent_start", event.agent.role))

    def on_task_complete(self, source, event: TaskCompletedEvent):
        print(f"[TASK COMPLETE] Output length: {len(event.output.raw)} chars")
        self.events_log.append(("task_complete", len(event.output.raw)))

    def on_tool_usage(self, source, event: ToolUsageStartedEvent):
        print(f"[TOOL CALL] Tool: {event.tool_name}")
        self.events_log.append(("tool_call", event.tool_name))

# Register event listeners.
# crewai_event_bus.on(EventType) returns a decorator, so call its result with the handler.
monitor = ExecutionMonitor()
crewai_event_bus.on(CrewKickoffStartedEvent)(monitor.on_crew_start)
crewai_event_bus.on(CrewKickoffCompletedEvent)(monitor.on_crew_complete)
crewai_event_bus.on(AgentExecutionStartedEvent)(monitor.on_agent_start)
crewai_event_bus.on(TaskCompletedEvent)(monitor.on_task_complete)
crewai_event_bus.on(ToolUsageStartedEvent)(monitor.on_tool_usage)

# Run crew with monitoring
agent = Agent(role="Writer", goal="Write content", backstory="Expert writer.", llm="gpt-4o")
task = Task(description="Write a haiku about AI.", expected_output="A haiku poem.", agent=agent)
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential)

result = crew.kickoff()
print(f"\nResult: {result.raw}")
print(f"\nEvents logged: {len(monitor.events_log)}")
Real-World Application

Fault-Tolerant Document Processing

A legal firm processes 500-page contracts with CrewAI. Checkpointing saves state every 10 pages, so if the crew fails at page 300, it resumes from page 290 instead of starting over. Event listeners alert the team when processing takes longer than expected. Hooks inject page numbers into tool calls for precise audit trails. Result: a transient failure costs at most 10 pages of rework instead of the whole document.
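
The "alert when processing takes longer than expected" idea can be sketched without any framework code. `SlowTaskAlert` is a hypothetical helper; in a real crew its two methods would be called from task-started and task-completed event handlers. The clock is injectable so the example runs instantly.

```python
import time


class SlowTaskAlert:
    """Flag tasks whose duration exceeds a threshold (hypothetical helper)."""

    def __init__(self, threshold_seconds: float, clock=time.monotonic):
        self.threshold = threshold_seconds
        self.clock = clock    # injectable so tests can simulate elapsed time
        self.started = {}     # task name -> start time
        self.alerts = []      # (task name, duration) for each slow task

    def on_start(self, task_name: str) -> None:
        self.started[task_name] = self.clock()

    def on_complete(self, task_name: str) -> None:
        start = self.started.pop(task_name, None)
        if start is None:
            return  # completion without a recorded start; nothing to measure
        duration = self.clock() - start
        if duration > self.threshold:
            self.alerts.append((task_name, duration))
            print(f"[ALERT] {task_name} took {duration:.1f}s (limit {self.threshold:.1f}s)")


# Simulated clock: a one-element list that the lambda reads on every call.
fake_now = [0.0]
alert = SlowTaskAlert(threshold_seconds=30.0, clock=lambda: fake_now[0])

alert.on_start("pages 1-10")
fake_now[0] = 12.0
alert.on_complete("pages 1-10")     # 12s, within the limit

alert.on_start("pages 11-20")
fake_now[0] = 57.0
alert.on_complete("pages 11-20")    # 45s, exceeds the limit
```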


4. Checkpointing

4.1 Saving Execution State for Fault Tolerance

Checkpointing saves crew execution state at task boundaries, enabling recovery from failures without re-running completed tasks:

from crewai import Agent, Task, Crew, Process
import json
import os

# Configure checkpoint storage
CHECKPOINT_DIR = "./checkpoints"
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

research_agent = Agent(
    role="Researcher",
    goal="Research topics thoroughly",
    backstory="Meticulous researcher.",
    llm="gpt-4o"
)

writer_agent = Agent(
    role="Writer",
    goal="Write polished articles",
    backstory="Published author.",
    llm="gpt-4o"
)

# Long-running multi-task crew
task1 = Task(
    description="Research the history of quantum computing.",
    expected_output="Comprehensive research notes with timeline.",
    agent=research_agent
)

task2 = Task(
    description="Write an article based on the research.",
    expected_output="Polished 1000-word article.",
    agent=writer_agent,
    context=[task1]
)

task3 = Task(
    description="Create an executive summary of the article.",
    expected_output="3-paragraph executive summary.",
    agent=writer_agent,
    context=[task2]
)

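# Crew definition (the checkpoint directory above is not wired in automatically; persistence is shown in the CheckpointManager that follows)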
crew = Crew(
    agents=[research_agent, writer_agent],
    tasks=[task1, task2, task3],
    process=Process.sequential,
    verbose=True
)

# Run with fault tolerance
try:
    result = crew.kickoff()
    print(f"Completed successfully: {result.raw[:200]}...")
except Exception as e:
    print(f"Failed at checkpoint. Error: {e}")
    print("Resume from last successful task on retry.")

Implement custom checkpoint persistence for production systems:

import json
import os
from datetime import datetime

class CheckpointManager:
    """Custom checkpoint manager for crew execution state."""

    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(checkpoint_dir, exist_ok=True)

    def save_checkpoint(self, crew_id: str, task_index: int, state: dict):
        """Save execution state after task completion."""
        checkpoint = {
            "crew_id": crew_id,
            "task_index": task_index,
            "timestamp": datetime.now().isoformat(),
            "state": state
        }
        filepath = os.path.join(self.checkpoint_dir, f"{crew_id}_checkpoint.json")
        with open(filepath, "w") as f:
            json.dump(checkpoint, f, indent=2)
        print(f"Checkpoint saved: task {task_index} at {checkpoint['timestamp']}")

    def load_checkpoint(self, crew_id: str) -> dict | None:
        """Load the last checkpoint for a crew, or return None if none exists."""
        filepath = os.path.join(self.checkpoint_dir, f"{crew_id}_checkpoint.json")
        if os.path.exists(filepath):
            with open(filepath, "r") as f:
                return json.load(f)
        return None

    def get_resume_point(self, crew_id: str) -> int:
        """Get task index to resume from."""
        checkpoint = self.load_checkpoint(crew_id)
        if checkpoint:
            print(f"Resuming from task {checkpoint['task_index'] + 1}")
            return checkpoint["task_index"] + 1
        return 0

# Usage
manager = CheckpointManager()
manager.save_checkpoint("research-crew-001", 0, {"output": "Research complete"})
resume_from = manager.get_resume_point("research-crew-001")
print(f"Resume from task index: {resume_from}")
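
A resume loop built on this idea might look like the following sketch. It is independent of CrewAI: `save_atomic` and `run_with_resume` are hypothetical helpers, and the "tasks" are plain callables standing in for crew tasks. The file is written to a temporary name and then renamed, so a crash during the write does not leave a half-written checkpoint behind.

```python
import json
import os
import tempfile


def save_atomic(path: str, data: dict) -> None:
    """Write JSON to a temporary file, then rename it over the target."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f)
    os.replace(tmp_path, path)  # swaps the file in one step


def run_with_resume(steps, path: str) -> list:
    """Run callables in order, skipping those already recorded in the checkpoint."""
    outputs = []
    if os.path.exists(path):
        with open(path, encoding="utf-8") as f:
            outputs = json.load(f)["outputs"]
    for index in range(len(outputs), len(steps)):
        outputs.append(steps[index]())  # may raise; earlier outputs stay saved
        save_atomic(path, {"outputs": outputs})
    return outputs


calls = []


def research():
    calls.append("research")
    return "notes"


def write():
    calls.append("write")
    if calls.count("write") == 1:
        raise RuntimeError("transient failure")  # fails on the first attempt only
    return "article"


checkpoint_path = os.path.join(tempfile.mkdtemp(), "demo_checkpoint.json")
try:
    run_with_resume([research, write], checkpoint_path)
except RuntimeError:
    pass  # the first run fails after research has been checkpointed

result = run_with_resume([research, write], checkpoint_path)
print(result)  # ['notes', 'article']
print(calls)   # ['research', 'write', 'write']
```

The second run skips `research` because its output was already persisted, which is the behavior the resume point above is meant to enable.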

5. Execution Hooks

Execution hooks intercept LLM calls and tool usage at the framework level. They provide fine-grained control over what happens before and after each model call or tool invocation.

5.1 LLM Call Hooks

Intercept every LLM call to add logging, modify prompts, enforce rate limits, or implement cost tracking:

from crewai import Agent, Task, Crew, Process
from crewai.utilities.events import (
    crewai_event_bus,
    LLMCallStartedEvent,
    LLMCallCompletedEvent
)
import time

class LLMCallTracker:
    """Track and log all LLM calls for cost analysis."""

    def __init__(self):
        self.calls = []
        self.total_tokens = 0
        self.total_cost = 0.0

    def before_llm_call(self, source, event: LLMCallStartedEvent):
        """Hook: runs before every LLM API call."""
        # The agent attribute may be absent or None, so read it defensively
        agent = getattr(event, "agent", None)
        call_info = {
            "timestamp": time.time(),
            "model": getattr(event, "model", "unknown"),
            "agent": getattr(agent, "role", "unknown")
        }
        self.calls.append(call_info)
        print(f"  [LLM] Calling model for agent: {call_info['agent']}")

    def after_llm_call(self, source, event: LLMCallCompletedEvent):
        """Hook: runs after every LLM API call."""
        # Skip cost tracking when the event carries no usage data
        tokens = getattr(event, "token_usage", None)
        if tokens:
            self.total_tokens += tokens.get("total_tokens", 0)
            # Approximate cost (GPT-4o pricing)
            input_cost = tokens.get("prompt_tokens", 0) * 0.0000025
            output_cost = tokens.get("completion_tokens", 0) * 0.00001
            self.total_cost += input_cost + output_cost

        print(f"  [LLM] Complete. Running total: {self.total_tokens} tokens, ${self.total_cost:.4f}")

    def get_summary(self):
        return {
            "total_calls": len(self.calls),
            "total_tokens": self.total_tokens,
            "total_cost": f"${self.total_cost:.4f}"
        }

# Register LLM hooks
tracker = LLMCallTracker()
# crewai_event_bus.on(EventType) returns a decorator, so call its result with the handler
crewai_event_bus.on(LLMCallStartedEvent)(tracker.before_llm_call)
crewai_event_bus.on(LLMCallCompletedEvent)(tracker.after_llm_call)

# Run crew with LLM tracking
agent = Agent(role="Analyst", goal="Analyze data", backstory="Expert.", llm="gpt-4o")
task = Task(description="Summarize AI trends in 2026.", expected_output="Brief summary.", agent=agent)
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential)

result = crew.kickoff()
print(f"\nLLM Usage Summary: {tracker.get_summary()}")
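
The cost arithmetic in the tracker is easier to check when the rates are written per million tokens. The sketch below uses the same rates as the tracker's constants ($2.50 input and $10.00 output per million tokens); treat them as assumptions to update when provider pricing changes. `estimate_cost` is a hypothetical helper.

```python
# USD per one million tokens; assumed rates matching the tracker's constants.
INPUT_RATE_PER_MILLION = 2.50
OUTPUT_RATE_PER_MILLION = 10.00


def estimate_cost(prompt_tokens: int, completion_tokens: int) -> float:
    """Estimate the cost of one call from its token counts."""
    input_cost = prompt_tokens / 1_000_000 * INPUT_RATE_PER_MILLION
    output_cost = completion_tokens / 1_000_000 * OUTPUT_RATE_PER_MILLION
    return input_cost + output_cost


# 1,200 prompt tokens -> $0.003; 400 completion tokens -> $0.004
cost = estimate_cost(prompt_tokens=1_200, completion_tokens=400)
print(f"${cost:.4f}")  # $0.0070
```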

5.2 Tool Call Hooks

Intercept tool invocations to validate inputs, sanitize outputs, enforce permissions, or implement circuit breakers:

from crewai import Agent, Task, Crew, Process
# Depending on your CrewAI version, these names may live under crewai.events instead
from crewai.utilities.events import (
    crewai_event_bus,
    ToolUsageStartedEvent,
    ToolUsageFinishedEvent,
    ToolUsageErrorEvent
)
import time

class ToolGuard:
    """Security and monitoring layer for tool invocations."""

    def __init__(self, blocked_tools=None, max_calls_per_tool=10):
        self.blocked_tools = blocked_tools or []
        self.max_calls = max_calls_per_tool
        self.call_counts = {}
        self.errors = []

    def before_tool_call(self, source, event: ToolUsageStartedEvent):
        """Check a tool call against the guard's policy as it starts."""
        tool_name = event.tool_name

        # Check blocked list.
        # An event listener only observes the call: this reports the violation
        # but does not by itself stop the tool from running.
        if tool_name in self.blocked_tools:
            print(f"  [BLOCKED] Tool '{tool_name}' is not permitted")
            return

        # Rate limiting (reported here, not enforced, for the same reason)
        self.call_counts[tool_name] = self.call_counts.get(tool_name, 0) + 1
        if self.call_counts[tool_name] > self.max_calls:
            print(f"  [RATE LIMIT] Tool '{tool_name}' exceeded {self.max_calls} calls")
            return

        print(f"  [TOOL] Executing: {tool_name} (call #{self.call_counts[tool_name]})")

    def after_tool_call(self, source, event: ToolUsageFinishedEvent):
        """Log tool completion."""
        print(f"  [TOOL] Completed: {event.tool_name}")

    def on_tool_error(self, source, event: ToolUsageErrorEvent):
        """Handle tool errors."""
        self.errors.append({"tool": event.tool_name, "error": str(event.error)})
        print(f"  [TOOL ERROR] {event.tool_name}: {event.error}")

# Register tool hooks.
# crewai_event_bus.on(EventType) returns a decorator, so call its result with the handler.
guard = ToolGuard(blocked_tools=["dangerous_tool"], max_calls_per_tool=5)
crewai_event_bus.on(ToolUsageStartedEvent)(guard.before_tool_call)
crewai_event_bus.on(ToolUsageFinishedEvent)(guard.after_tool_call)
crewai_event_bus.on(ToolUsageErrorEvent)(guard.on_tool_error)

print("Tool guard registered. Blocked tools:", guard.blocked_tools)
print("Max calls per tool:", guard.max_calls)
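
Of the patterns listed in this section, the circuit breaker is the one that needs to actually stop a call, so it is usually implemented as a wrapper around the tool function. The sketch below is a simplified, framework-independent version: `CircuitBreaker` and `CircuitOpenError` are hypothetical names, and the timed recovery ("half-open" state) that production breakers normally include is left out.

```python
class CircuitOpenError(RuntimeError):
    """Raised when the breaker refuses to call the wrapped function."""


class CircuitBreaker:
    """Stop calling a function after too many consecutive failures (illustrative)."""

    def __init__(self, func, max_failures: int = 3):
        self.func = func
        self.max_failures = max_failures
        self.consecutive_failures = 0

    def __call__(self, *args, **kwargs):
        if self.consecutive_failures >= self.max_failures:
            raise CircuitOpenError(f"circuit open after {self.max_failures} failures")
        try:
            result = self.func(*args, **kwargs)
        except Exception:
            self.consecutive_failures += 1
            raise
        self.consecutive_failures = 0  # any success resets the counter
        return result


def flaky_search(query: str) -> str:
    """Stand-in for a tool whose backend is down."""
    raise TimeoutError("search backend unavailable")


guarded_search = CircuitBreaker(flaky_search, max_failures=2)
outcomes = []
for _ in range(3):
    try:
        guarded_search("crewai hooks")
    except CircuitOpenError:
        outcomes.append("skipped")  # breaker refused to call the tool
    except TimeoutError:
        outcomes.append("failed")   # tool was called and raised

print(outcomes)  # ['failed', 'failed', 'skipped']
```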

5.3 Using Annotations in crew.py

CrewAI projects created with crewai create crew use decorator annotations in crew.py for hook registration. This provides a clean, declarative approach:

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.project import before_kickoff, after_kickoff

@CrewBase
class ResearchCrew:
    """Research crew with lifecycle hooks via annotations."""

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @before_kickoff
    def prepare_inputs(self, inputs):
        """Hook: runs before crew kickoff. Modify inputs here."""
        inputs["timestamp"] = "2026-05-24"
        inputs["version"] = "1.0"
        print(f"Preparing crew with inputs: {inputs}")
        return inputs

    @after_kickoff
    def process_output(self, output):
        """Hook: runs after crew completes. Post-process results."""
        print(f"Crew completed. Output length: {len(output.raw)} chars")
        # Could send to webhook, save to DB, trigger notifications
        return output

    @agent
    def researcher(self) -> Agent:
        return Agent(
            role="Senior Researcher",
            goal="Find comprehensive information on the topic",
            backstory="Experienced research analyst.",
            llm="gpt-4o",
            verbose=True
        )

    @task
    def research_task(self) -> Task:
        return Task(
            description="Research {topic} thoroughly.",
            expected_output="Detailed research report.",
            agent=self.researcher()
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True
        )
Hook Ordering: @before_kickoff runs before any agent executes and can modify inputs. @after_kickoff runs after all tasks complete and receives the final output. Both are synchronous — long-running operations in hooks will delay execution.
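
The ordering described in this note can be illustrated with plain functions. This is a sketch of the call sequence only, not CrewAI's implementation; `run_with_hooks` and the three stand-in functions are hypothetical.

```python
def run_with_hooks(inputs, before_hook, execute, after_hook):
    """Call hooks around an execution step, in order (illustrative only)."""
    prepared = before_hook(dict(inputs))  # copy so the caller's dict is untouched
    output = execute(prepared)            # starts only after before_hook returns
    return after_hook(output)             # receives the final output


order = []


def add_version(inputs):
    order.append("before")
    inputs["version"] = "1.0"
    return inputs


def fake_crew(inputs):
    order.append("execute")
    return f"report on {inputs['topic']} (v{inputs['version']})"


def shout(output):
    order.append("after")
    return output.upper()


original = {"topic": "hooks"}
final = run_with_hooks(original, add_version, fake_crew, shout)
print(order)  # ['before', 'execute', 'after']
print(final)  # REPORT ON HOOKS (V1.0)
```

Because each step waits for the previous one to return, a slow hook delays everything after it, which is the synchronous behavior the note warns about.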
Try It Yourself: Implement a comprehensive monitoring setup: (1) add an event listener that logs all agent actions to a JSON file, (2) add a tool hook that measures execution time for each tool call, (3) add a checkpoint configuration that saves state every 5 tool calls (so the crew can resume after failure), (4) customize the system prompt for one agent using prompt overrides. Run a complex crew and analyze the logs.
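
As a possible starting point for step (1), the sketch below appends one JSON object per line to a log file, a layout often called JSON Lines. `JsonlEventLog` is a hypothetical helper with no CrewAI dependency; in a real setup, its `record` method would be called from your event handlers.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


class JsonlEventLog:
    """Append events to a file, one JSON object per line (hypothetical helper)."""

    def __init__(self, path: str):
        self.path = path

    def record(self, event_type: str, **details) -> None:
        entry = {
            "time": datetime.now(timezone.utc).isoformat(),
            "event": event_type,
            **details,
        }
        # Append mode keeps earlier entries intact across runs.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")

    def read_all(self) -> list:
        with open(self.path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]


log = JsonlEventLog(os.path.join(tempfile.mkdtemp(), "events.jsonl"))
log.record("agent_start", agent="Writer")
log.record("tool_call", tool="search", duration_s=0.42)

events = log.read_all()
print([e["event"] for e in events])  # ['agent_start', 'tool_call']
```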

Next in the CrewAI SDK Track

In Part 13: Observability & Telemetry, we’ll integrate crews with 15+ monitoring platforms including Langfuse, Datadog, Arize Phoenix, MLflow, and Weave for production-grade observability.