1. Customizing Prompts
CrewAI agents use default system prompts that define their behavior. For advanced use cases, you can override these prompts at both the agent and task level to fine-tune how agents reason, respond, and interact with tools.
1.1 Agent-Level Prompt Customization
Override the default system prompt with system_template to control the agent’s core behavior. Use the available template variables to inject context dynamically:
from crewai import Agent, Task, Crew, Process

# Agent with fully custom system prompt
custom_agent = Agent(
    role="Technical Writer",
    goal="Produce clear, concise technical documentation",
    backstory="You are a senior technical writer with 15 years of experience.",
    llm="gpt-4o",
    system_template="""You are {role}.
Your goal: {goal}
Background: {backstory}
CUSTOM INSTRUCTIONS:
- Always use active voice
- Keep sentences under 25 words
- Use bullet points for lists of 3+ items
- Include code examples for every concept
- Never use jargon without defining it first
Available tools: {tools}
""",
    verbose=True
)

doc_task = Task(
    description="Write API documentation for a user authentication endpoint.",
    expected_output="Complete API docs with examples, error codes, and usage patterns.",
    agent=custom_agent
)

crew = Crew(
    agents=[custom_agent],
    tasks=[doc_task],
    process=Process.sequential
)

result = crew.kickoff()
print(result.raw)
1.2 Task-Level Prompt Injection
Override how task instructions are formatted using prompt_template on the task:
from crewai import Agent, Task, Crew, Process

analyst = Agent(
    role="Data Analyst",
    goal="Provide accurate data analysis",
    backstory="Expert statistician with domain knowledge.",
    llm="gpt-4o"
)

# Task with custom prompt template
analysis_task = Task(
    description="Analyze customer churn patterns for Q1 2026.",
    expected_output="Statistical analysis with confidence intervals.",
    agent=analyst,
    prompt_template="""ANALYSIS REQUEST:
{task_description}
METHODOLOGY REQUIREMENTS:
- State hypotheses before testing
- Report confidence intervals for all metrics
- Flag any data quality issues found
- Provide both summary and detailed findings
EXPECTED DELIVERABLE:
{expected_output}
CONTEXT FROM PREVIOUS TASKS:
{context}
"""
)

crew = Crew(
    agents=[analyst],
    tasks=[analysis_task],
    process=Process.sequential
)

result = crew.kickoff()
print(result.raw)
The available template variables are {role}, {goal}, {backstory}, {tools}, {task_description}, {expected_output}, and {context}. CrewAI replaces these at runtime with actual values.
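To make the runtime replacement concrete, here is a minimal sketch of placeholder substitution in plain Python. It is an illustration only, not CrewAI's implementation: render_template is a hypothetical helper, and leaving unknown placeholders untouched is an assumption made for the example.

```python
def render_template(template: str, values: dict) -> str:
    """Replace each {name} placeholder whose name appears in values."""
    rendered = template
    for name, value in values.items():
        rendered = rendered.replace("{" + name + "}", str(value))
    return rendered


template = "You are {role}.\nYour goal: {goal}\nNotes: {unknown}"
rendered = render_template(
    template,
    {"role": "Technical Writer", "goal": "Produce clear docs"},
)
print(rendered)
```

Plain string replacement is used here rather than str.format so that a placeholder with no supplied value stays visible in the output instead of raising KeyError.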
2. Fingerprinting
2.1 Reproducibility and Caching
Fingerprinting generates a unique hash for each crew execution based on inputs, agent configurations, and task definitions. This enables cache hits for identical executions and helps identify duplicate runs:
from crewai import Agent, Task, Crew, Process

# Agents with deterministic configuration
researcher = Agent(
    role="Market Researcher",
    goal="Gather market intelligence",
    backstory="Expert at finding market data.",
    llm="gpt-4o"
)

research_task = Task(
    description="Research cloud computing market size for {year}.",
    expected_output="Market size report with sources.",
    agent=researcher
)

# Crew with fingerprinting enabled for caching
crew = Crew(
    agents=[researcher],
    tasks=[research_task],
    process=Process.sequential,
    cache=True,  # Enable result caching
    verbose=True
)

# First run: executes fully
result1 = crew.kickoff(inputs={"year": "2026"})
print(f"First run: {result1.raw[:100]}...")

# Second run with same inputs: cache hit (fingerprint match)
result2 = crew.kickoff(inputs={"year": "2026"})
print(f"Second run (cached): {result2.raw[:100]}...")

# Different inputs: new execution (fingerprint mismatch)
result3 = crew.kickoff(inputs={"year": "2027"})
print(f"Third run (new): {result3.raw[:100]}...")
Fingerprinting also helps with debugging — you can trace which specific configuration produced a given output:
import hashlib
import json

# Manual fingerprint generation for audit trails
def generate_crew_fingerprint(crew_config: dict) -> str:
    """Generate a deterministic fingerprint for crew configuration."""
    # sort_keys=True makes the JSON text independent of dict insertion order
    config_str = json.dumps(crew_config, sort_keys=True)
    return hashlib.sha256(config_str.encode()).hexdigest()[:16]

crew_config = {
    "agents": ["Market Researcher"],
    "tasks": ["Research cloud computing market"],
    "process": "sequential",
    "llm": "gpt-4o",
    "inputs": {"year": "2026"}
}

fingerprint = generate_crew_fingerprint(crew_config)
print(f"Crew fingerprint: {fingerprint}")
print("Use this to track execution lineage in logs")
Changing verbose or max_rpm does not affect the fingerprint.
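The point that verbose and max_rpm leave the fingerprint unchanged can be illustrated with the manual hashing approach. The sketch below is an assumption-laden illustration, not CrewAI's internal logic: functional_fingerprint and NON_FUNCTIONAL_KEYS are hypothetical names, and the set of excluded keys is taken only from the two settings named in the text.

```python
import hashlib
import json

# Assumption: these are the settings treated as non-functional
NON_FUNCTIONAL_KEYS = {"verbose", "max_rpm"}


def functional_fingerprint(config: dict) -> str:
    """Hash only the keys that change what the crew produces."""
    functional = {k: v for k, v in config.items() if k not in NON_FUNCTIONAL_KEYS}
    payload = json.dumps(functional, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


base = {"role": "Market Researcher", "llm": "gpt-4o",
        "inputs": {"year": "2026"}, "verbose": True}
quiet = {**base, "verbose": False, "max_rpm": 10}   # only logging and rate settings differ
changed = {**base, "inputs": {"year": "2027"}}      # a functional input differs

print(functional_fingerprint(base) == functional_fingerprint(quiet))
print(functional_fingerprint(base) == functional_fingerprint(changed))
```

The first comparison holds because the excluded keys never reach the hash; the second fails because the inputs differ.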
3. Event Listeners
3.1 Listening to Agent, Task, and Crew Events
CrewAI emits events at key execution points. Register listeners to monitor progress, log metrics, or trigger external systems in real-time:
from crewai import Agent, Task, Crew, Process
# Event class names and the import path can differ between CrewAI releases;
# confirm them against the installed version.
from crewai.utilities.events import (
    crewai_event_bus,
    AgentExecutionStartedEvent,
    AgentExecutionCompletedEvent,
    TaskStartedEvent,
    TaskCompletedEvent,
    CrewKickoffStartedEvent,
    CrewKickoffCompletedEvent,
    ToolUsageStartedEvent,
    ToolUsageFinishedEvent
)
import time

# Custom event handler class
class ExecutionMonitor:
    def __init__(self):
        self.start_time = None
        self.events_log = []

    def on_crew_start(self, source, event: CrewKickoffStartedEvent):
        self.start_time = time.time()
        print("[CREW START] Crew execution initiated")
        self.events_log.append(("crew_start", time.time()))

    def on_crew_complete(self, source, event: CrewKickoffCompletedEvent):
        duration = time.time() - self.start_time
        print(f"[CREW COMPLETE] Total duration: {duration:.2f}s")
        self.events_log.append(("crew_complete", time.time()))

    def on_agent_start(self, source, event: AgentExecutionStartedEvent):
        print(f"[AGENT START] Agent: {event.agent.role}")
        self.events_log.append(("agent_start", event.agent.role))

    def on_task_complete(self, source, event: TaskCompletedEvent):
        print(f"[TASK COMPLETE] Output length: {len(event.output.raw)} chars")
        self.events_log.append(("task_complete", len(event.output.raw)))

    def on_tool_usage(self, source, event: ToolUsageStartedEvent):
        print(f"[TOOL CALL] Tool: {event.tool_name}")
        self.events_log.append(("tool_call", event.tool_name))

# Register event listeners.
# on() returns a decorator, so its result is called with the handler.
monitor = ExecutionMonitor()
crewai_event_bus.on(CrewKickoffStartedEvent)(monitor.on_crew_start)
crewai_event_bus.on(CrewKickoffCompletedEvent)(monitor.on_crew_complete)
crewai_event_bus.on(AgentExecutionStartedEvent)(monitor.on_agent_start)
crewai_event_bus.on(TaskCompletedEvent)(monitor.on_task_complete)
crewai_event_bus.on(ToolUsageStartedEvent)(monitor.on_tool_usage)

# Run crew with monitoring
agent = Agent(role="Writer", goal="Write content", backstory="Expert writer.", llm="gpt-4o")
task = Task(description="Write a haiku about AI.", expected_output="A haiku poem.", agent=agent)
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential)

result = crew.kickoff()
print(f"\nResult: {result.raw}")
print(f"\nEvents logged: {len(monitor.events_log)}")
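How a listener gets from registration to invocation can be sketched with a toy publish/subscribe bus. This is a minimal illustration of the pattern, not CrewAI's implementation: ToyEventBus, TaskDone, and record are hypothetical names introduced for the example.

```python
from collections import defaultdict
from dataclasses import dataclass


class ToyEventBus:
    """Minimal publish/subscribe bus: maps event types to handler lists."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_type):
        """Return a decorator that registers a handler for event_type."""
        def decorator(handler):
            self._handlers[event_type].append(handler)
            return handler
        return decorator

    def emit(self, source, event):
        """Call every handler registered for the event's exact type."""
        for handler in self._handlers[type(event)]:
            handler(source, event)


@dataclass
class TaskDone:  # hypothetical event type
    output: str


bus = ToyEventBus()
seen = []


@bus.on(TaskDone)
def record(source, event):
    # Handlers receive the emitting object and the event payload
    seen.append((source, len(event.output)))


bus.emit("demo-crew", TaskDone(output="A haiku poem."))
print(seen)
```

Handlers run inside emit, in registration order, so a slow handler delays whatever code emitted the event in this toy version.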
Fault-Tolerant Document Processing
A legal firm processes 500-page contracts with CrewAI. Checkpointing saves state every 10 pages, so if the crew fails at page 300, it resumes from page 290 instead of starting over. Event listeners alert the team when processing takes longer than expected. Hooks inject page numbers into tool calls for precise audit trails. Result: zero lost work from transient failures.
4. Checkpointing
4.1 Saving Execution State for Fault Tolerance
Checkpointing saves crew execution state at task boundaries, enabling recovery from failures without re-running completed tasks:
from crewai import Agent, Task, Crew, Process
import json
import os

# Configure checkpoint storage
CHECKPOINT_DIR = "./checkpoints"
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

research_agent = Agent(
    role="Researcher",
    goal="Research topics thoroughly",
    backstory="Meticulous researcher.",
    llm="gpt-4o"
)

writer_agent = Agent(
    role="Writer",
    goal="Write polished articles",
    backstory="Published author.",
    llm="gpt-4o"
)

# Long-running multi-task crew
task1 = Task(
    description="Research the history of quantum computing.",
    expected_output="Comprehensive research notes with timeline.",
    agent=research_agent
)

task2 = Task(
    description="Write an article based on the research.",
    expected_output="Polished 1000-word article.",
    agent=writer_agent,
    context=[task1]
)

task3 = Task(
    description="Create an executive summary of the article.",
    expected_output="3-paragraph executive summary.",
    agent=writer_agent,
    context=[task2]
)

# Crew with checkpointing
crew = Crew(
    agents=[research_agent, writer_agent],
    tasks=[task1, task2, task3],
    process=Process.sequential,
    verbose=True
)

# Run with fault tolerance
try:
    result = crew.kickoff()
    print(f"Completed successfully: {result.raw[:200]}...")
except Exception as e:
    print(f"Failed at checkpoint. Error: {e}")
    print("Resume from last successful task on retry.")
Implement custom checkpoint persistence for production systems:
import json
import os
from datetime import datetime
from typing import Optional

class CheckpointManager:
    """Custom checkpoint manager for crew execution state."""

    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(checkpoint_dir, exist_ok=True)

    def save_checkpoint(self, crew_id: str, task_index: int, state: dict):
        """Save execution state after task completion."""
        checkpoint = {
            "crew_id": crew_id,
            "task_index": task_index,
            "timestamp": datetime.now().isoformat(),
            "state": state
        }
        filepath = os.path.join(self.checkpoint_dir, f"{crew_id}_checkpoint.json")
        with open(filepath, "w") as f:
            json.dump(checkpoint, f, indent=2)
        print(f"Checkpoint saved: task {task_index} at {checkpoint['timestamp']}")

    def load_checkpoint(self, crew_id: str) -> Optional[dict]:
        """Load last checkpoint for a crew, or None if none exists."""
        filepath = os.path.join(self.checkpoint_dir, f"{crew_id}_checkpoint.json")
        if os.path.exists(filepath):
            with open(filepath, "r") as f:
                return json.load(f)
        return None

    def get_resume_point(self, crew_id: str) -> int:
        """Get task index to resume from (0 when no checkpoint exists)."""
        checkpoint = self.load_checkpoint(crew_id)
        if checkpoint:
            print(f"Resuming from task {checkpoint['task_index'] + 1}")
            return checkpoint["task_index"] + 1
        return 0

# Usage
manager = CheckpointManager()
manager.save_checkpoint("research-crew-001", 0, {"output": "Research complete"})
resume_from = manager.get_resume_point("research-crew-001")
print(f"Resume from task index: {resume_from}")
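A resume point is only useful if the runner actually skips completed work. The following sketch shows one way that loop could look in plain Python. It is not a CrewAI feature: run_with_resume, the step functions, and the checkpoint file layout (next_index plus outputs) are all assumptions made for illustration.

```python
import json
import os
import tempfile


def run_with_resume(steps, checkpoint_path):
    """Run steps in order, skipping those a previous run already finished."""
    start, outputs = 0, []
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            saved = json.load(f)
        start, outputs = saved["next_index"], saved["outputs"]

    for index in range(start, len(steps)):
        # A step may raise; the checkpoint from the previous step stays intact
        outputs.append(steps[index](outputs))
        with open(checkpoint_path, "w") as f:
            json.dump({"next_index": index + 1, "outputs": outputs}, f)
    return outputs


calls = []
fail_once = {"armed": True}


def research(previous_outputs):
    calls.append("research")
    return "notes"


def write(previous_outputs):
    calls.append("write")
    if fail_once["armed"]:
        fail_once["armed"] = False
        raise RuntimeError("transient failure")
    return f"article from {previous_outputs[-1]}"


path = os.path.join(tempfile.mkdtemp(), "demo_checkpoint.json")
try:
    run_with_resume([research, write], path)
except RuntimeError:
    pass  # the first attempt fails during the second step

final = run_with_resume([research, write], path)
print(final)
print(calls)
```

On the retry, research is not called again: its output is read from the checkpoint, and only the failed step re-runs.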
5. Execution Hooks
Execution hooks intercept LLM calls and tool usage at the framework level. They provide fine-grained control over what happens before and after each model call or tool invocation.
5.1 LLM Call Hooks
Intercept every LLM call to add logging, modify prompts, enforce rate limits, or implement cost tracking:
from crewai import Agent, Task, Crew, Process
from crewai.utilities.events import (
    crewai_event_bus,
    LLMCallStartedEvent,
    LLMCallCompletedEvent
)
import time

class LLMCallTracker:
    """Track and log all LLM calls for cost analysis."""

    def __init__(self):
        self.calls = []
        self.total_tokens = 0
        self.total_cost = 0.0

    def before_llm_call(self, source, event: LLMCallStartedEvent):
        """Hook: runs before every LLM API call."""
        call_info = {
            "timestamp": time.time(),
            "model": event.model if hasattr(event, "model") else "unknown",
            "agent": event.agent.role if hasattr(event, "agent") else "unknown"
        }
        self.calls.append(call_info)
        print(f"  [LLM] Calling model for agent: {call_info['agent']}")

    def after_llm_call(self, source, event: LLMCallCompletedEvent):
        """Hook: runs after every LLM API call."""
        # token_usage may be absent on some events, so it is checked first
        if hasattr(event, "token_usage"):
            tokens = event.token_usage
            self.total_tokens += tokens.get("total_tokens", 0)
            # Approximate cost (GPT-4o pricing)
            input_cost = tokens.get("prompt_tokens", 0) * 0.0000025
            output_cost = tokens.get("completion_tokens", 0) * 0.00001
            self.total_cost += input_cost + output_cost
        print(f"  [LLM] Complete. Running total: {self.total_tokens} tokens, ${self.total_cost:.4f}")

    def get_summary(self):
        return {
            "total_calls": len(self.calls),
            "total_tokens": self.total_tokens,
            "total_cost": f"${self.total_cost:.4f}"
        }

# Register LLM hooks.
# on() returns a decorator, so its result is called with the handler.
tracker = LLMCallTracker()
crewai_event_bus.on(LLMCallStartedEvent)(tracker.before_llm_call)
crewai_event_bus.on(LLMCallCompletedEvent)(tracker.after_llm_call)

# Run crew with LLM tracking
agent = Agent(role="Analyst", goal="Analyze data", backstory="Expert.", llm="gpt-4o")
task = Task(description="Summarize AI trends in 2026.", expected_output="Brief summary.", agent=agent)
crew = Crew(agents=[agent], tasks=[task], process=Process.sequential)

result = crew.kickoff()
print(f"\nLLM Usage Summary: {tracker.get_summary()}")
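The cost arithmetic in the tracker can be checked by hand with a small worked example. The per-token prices are the ones hard-coded in the tracker above; the token counts and the estimate_cost helper are made up for illustration.

```python
# Per-token prices in USD, as hard-coded in the tracker above
PROMPT_PRICE = 0.0000025      # equivalent to $2.50 per million prompt tokens
COMPLETION_PRICE = 0.00001    # equivalent to $10.00 per million completion tokens


def estimate_cost(prompt_tokens: int, completion_tokens: int) -> float:
    """Return the approximate USD cost of one LLM call."""
    return prompt_tokens * PROMPT_PRICE + completion_tokens * COMPLETION_PRICE


# Hypothetical (prompt_tokens, completion_tokens) pairs for two calls
calls = [(1200, 300), (800, 450)]
total = sum(estimate_cost(p, c) for p, c in calls)

# Call 1: 1200 * 0.0000025 + 300 * 0.00001 = 0.0030 + 0.0030 = 0.0060
# Call 2:  800 * 0.0000025 + 450 * 0.00001 = 0.0020 + 0.0045 = 0.0065
print(f"${total:.4f}")
```

Completion tokens cost four times as much as prompt tokens at these rates, so output length tends to dominate the bill for short prompts.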
5.2 Tool Call Hooks
Intercept tool invocations to validate inputs, sanitize outputs, enforce permissions, or implement circuit breakers:
from crewai import Agent, Task, Crew, Process
# Event class names can differ between CrewAI releases;
# confirm them against the installed version.
from crewai.utilities.events import (
    crewai_event_bus,
    ToolUsageStartedEvent,
    ToolUsageFinishedEvent,
    ToolUsageErrorEvent
)
import time

class ToolGuard:
    """Security and monitoring layer for tool invocations."""

    def __init__(self, blocked_tools=None, max_calls_per_tool=10):
        self.blocked_tools = blocked_tools or []
        self.max_calls = max_calls_per_tool
        self.call_counts = {}
        self.errors = []

    def before_tool_call(self, source, event: ToolUsageStartedEvent):
        """Validate tool call before execution."""
        tool_name = event.tool_name
        # Check blocked list. Returning from a listener only ends this handler;
        # it reports the violation but does not by itself cancel the tool call.
        if tool_name in self.blocked_tools:
            print(f"  [BLOCKED] Tool '{tool_name}' is not permitted")
            return
        # Rate limiting
        self.call_counts[tool_name] = self.call_counts.get(tool_name, 0) + 1
        if self.call_counts[tool_name] > self.max_calls:
            print(f"  [RATE LIMIT] Tool '{tool_name}' exceeded {self.max_calls} calls")
            return
        print(f"  [TOOL] Executing: {tool_name} (call #{self.call_counts[tool_name]})")

    def after_tool_call(self, source, event: ToolUsageFinishedEvent):
        """Log tool completion."""
        print(f"  [TOOL] Completed: {event.tool_name}")

    def on_tool_error(self, source, event: ToolUsageErrorEvent):
        """Handle tool errors."""
        self.errors.append({"tool": event.tool_name, "error": str(event.error)})
        print(f"  [TOOL ERROR] {event.tool_name}: {event.error}")

# Register tool hooks.
# on() returns a decorator, so its result is called with the handler.
guard = ToolGuard(blocked_tools=["dangerous_tool"], max_calls_per_tool=5)
crewai_event_bus.on(ToolUsageStartedEvent)(guard.before_tool_call)
crewai_event_bus.on(ToolUsageFinishedEvent)(guard.after_tool_call)
crewai_event_bus.on(ToolUsageErrorEvent)(guard.on_tool_error)

print("Tool guard registered. Blocked tools:", guard.blocked_tools)
print("Max calls per tool:", guard.max_calls)
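A listener that returns early typically only stops its own logging, since an event bus generally ignores handler return values. To actually refuse a call, the check has to sit in the call path. The sketch below wraps a plain function so that blocked or over-limit calls raise; guarded and ToolBlockedError are hypothetical names, and how such a wrapper would be attached to a real CrewAI tool is left open.

```python
class ToolBlockedError(RuntimeError):
    """Raised when a guarded tool call is refused."""


def guarded(tool_name, func, blocked_tools, max_calls, call_counts):
    """Wrap func so blocked or over-limit calls raise instead of running."""
    def wrapper(*args, **kwargs):
        if tool_name in blocked_tools:
            raise ToolBlockedError(f"Tool '{tool_name}' is not permitted")
        if call_counts.get(tool_name, 0) >= max_calls:
            raise ToolBlockedError(f"Tool '{tool_name}' exceeded {max_calls} calls")
        # Count the call only once it has passed both checks
        call_counts[tool_name] = call_counts.get(tool_name, 0) + 1
        return func(*args, **kwargs)
    return wrapper


counts = {}
search = guarded("search", lambda q: f"results for {q}", ["dangerous_tool"], 2, counts)

first = search("crewai")
second = search("events")
try:
    search("third call")
    refusal = None
except ToolBlockedError as exc:
    refusal = str(exc)

print(first, second, refusal, sep="\n")
```

Raising an exception turns the limit into a hard stop, which the caller can catch and report, for example through an error listener like on_tool_error.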
5.3 Using Annotations in crew.py
CrewAI projects created with crewai create crew use decorator annotations in crew.py for hook registration. This provides a clean, declarative approach:
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.project import before_kickoff, after_kickoff

@CrewBase
class ResearchCrew:
    """Research crew with lifecycle hooks via annotations."""

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @before_kickoff
    def prepare_inputs(self, inputs):
        """Hook: runs before crew kickoff. Modify inputs here."""
        inputs["timestamp"] = "2026-05-24"
        inputs["version"] = "1.0"
        print(f"Preparing crew with inputs: {inputs}")
        return inputs

    @after_kickoff
    def process_output(self, output):
        """Hook: runs after crew completes. Post-process results."""
        print(f"Crew completed. Output length: {len(output.raw)} chars")
        # Could send to webhook, save to DB, trigger notifications
        return output

    @agent
    def researcher(self) -> Agent:
        return Agent(
            role="Senior Researcher",
            goal="Find comprehensive information on the topic",
            backstory="Experienced research analyst.",
            llm="gpt-4o",
            verbose=True
        )

    @task
    def research_task(self) -> Task:
        return Task(
            description="Research {topic} thoroughly.",
            expected_output="Detailed research report.",
            agent=self.researcher()
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True
        )
@before_kickoff runs before any agent executes and can modify inputs. @after_kickoff runs after all tasks complete and receives the final output. Both are synchronous — long-running operations in hooks will delay execution.
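The ordering described here (input hook, then the crew, then the output hook, all in sequence) can be mimicked in a few lines of plain Python. This is a sketch of the control flow only, not how CrewAI wires its decorators: run_with_hooks, add_version, and shout are hypothetical names.

```python
def run_with_hooks(kickoff, inputs, before=None, after=None):
    """Call before(inputs), then kickoff, then after(output), in sequence."""
    if before is not None:
        inputs = before(dict(inputs))  # copy so the caller's dict is untouched
    output = kickoff(inputs)
    if after is not None:
        output = after(output)
    return output


def add_version(inputs):
    """Input hook: enrich the inputs before the run starts."""
    inputs["version"] = "1.0"
    return inputs


def shout(output):
    """Output hook: post-process the final result."""
    return output.upper()


result = run_with_hooks(
    kickoff=lambda inputs: f"report on {inputs['topic']} v{inputs['version']}",
    inputs={"topic": "quantum computing"},
    before=add_version,
    after=shout,
)
print(result)
```

Because each stage blocks the next, any slow work inside add_version or shout adds directly to the total run time, which is the delay the paragraph above warns about.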
Next in the CrewAI SDK Track
In Part 13: Observability & Telemetry, we’ll integrate crews with 15+ monitoring platforms including Langfuse, Datadog, Arize Phoenix, MLflow, and Weave for production-grade observability.