1. Hooks System
Hooks are deterministic interception points that fire before or after tool execution in the agentic loop. Unlike prompt-based guidance, which is probabilistic, hooks run as code, so a rule expressed as a hook is applied on every matching tool call. They are the primary mechanism for compliance, audit logging, cost controls, and human-in-the-loop gates.
flowchart LR
A["Claude emits tool_use"] --> B["PreToolUse Hook"]
B -->|"allow"| C["Execute Tool"]
B -->|"block"| D["Return error to Claude"]
C --> E["PostToolUse Hook"]
E -->|"pass-through"| F["Return result to Claude"]
E -->|"modify"| G["Return modified result"]
E -->|"notify"| H["Side effect + pass result"]
1.1 PreToolUse Hooks
PreToolUse hooks fire before a tool is executed. They can block execution, modify inputs, or add validation. Use these for permission checks, input sanitization, and spend limits:
import json
import re
from typing import Optional

import anthropic

client = anthropic.Anthropic()

def execute_tool(name: str, input_data: dict) -> dict:
    """Stub tool executor."""
    return {"status": "ok", "tool": name, "result": "mock data"}

class PreToolUseHook:
    """PreToolUse hook: fires before tool execution. Can block or modify."""

    def __init__(self, spend_limit: float = 100.0):
        self.total_spend = 0.0
        self.spend_limit = spend_limit
        self.blocked_tools = set()

    def __call__(self, tool_name: str, tool_input: dict) -> Optional[dict]:
        """
        Returns None to allow execution.
        Returns dict with 'error' key to block execution.
        Returns dict with 'modified_input' to allow with changes.
        """
        # Block: tool is on deny list
        if tool_name in self.blocked_tools:
            return {"error": f"Tool '{tool_name}' is blocked by policy"}
        # Block: spend limit exceeded
        if tool_name == "process_payment":
            amount = tool_input.get("amount", 0)
            if self.total_spend + amount > self.spend_limit:
                return {"error": f"Spend limit exceeded (${self.total_spend:.2f}/${self.spend_limit:.2f})"}
            self.total_spend += amount
        # Modify: sanitize inputs (strip email addresses from search queries)
        if tool_name == "web_search":
            query = tool_input.get("query", "")
            sanitized = re.sub(r'\b[\w.+-]+@[\w-]+\.[\w.-]+\b', '[EMAIL]', query)
            if sanitized != query:
                return {"modified_input": {**tool_input, "query": sanitized}}
        # Allow: no issues
        return None

# Usage in the agentic loop
pre_hook = PreToolUseHook(spend_limit=50.0)

def loop_with_hooks(messages, tools):
    while True:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096, tools=tools, messages=messages
        )
        if response.stop_reason != "tool_use":
            # end_turn (or max_tokens, etc.): return the text rather than loop forever
            return "\n".join(b.text for b in response.content if b.type == "text")
        messages.append({"role": "assistant", "content": response.content})
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                # Fire PreToolUse hook
                hook_result = pre_hook(block.name, block.input)
                if hook_result and "error" in hook_result:
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(hook_result),
                        "is_error": True
                    })
                else:
                    # Use modified input if hook provided one
                    input_data = hook_result.get("modified_input", block.input) if hook_result else block.input
                    result = execute_tool(block.name, input_data)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(result)
                    })
        messages.append({"role": "user", "content": tool_results})

# Demo: test hook behavior
hook = PreToolUseHook(spend_limit=50.0)
hook.blocked_tools.add("delete_database")
print("Test 1 - Blocked tool:", hook("delete_database", {}))
print("Test 2 - Over limit:", hook("process_payment", {"amount": 999}))
print("Test 3 - Allowed:", hook("web_search", {"query": "python tutorial"}))
print("Test 4 - PII sanitized:", hook("web_search", {"query": "info about user@test.com"}))
1.2 PostToolUse Hooks
PostToolUse hooks fire after a tool has been executed. They can modify results before Claude sees them, trigger side effects (logging, notifications), or inject additional context. These are essential for audit trails and result enrichment:
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

class PostToolUseHook:
    """PostToolUse hook: fires after tool execution. Can modify results or trigger side effects."""

    def __init__(self):
        self.audit_log = []

    def __call__(self, tool_name: str, tool_input: dict, tool_result: dict) -> dict:
        """
        Returns the (possibly modified) result to pass back to Claude.
        Side effects (logging, notifications) happen here.
        """
        # Audit logging: every tool call gets recorded.
        # Note: the preview is captured before the redaction below; redact first
        # if the audit log must not hold raw PII.
        self.audit_log.append({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "tool": tool_name,
            "input": tool_input,
            "result_preview": str(tool_result)[:200]
        })
        # Redact sensitive data from results before Claude sees them
        if tool_name == "get_customer":
            if "ssn" in tool_result:
                tool_result["ssn"] = "***-**-" + tool_result["ssn"][-4:]
            if "credit_card" in tool_result:
                tool_result["credit_card"] = "****" + tool_result["credit_card"][-4:]
        # Enrich: add metadata Claude can use for decision-making
        if tool_name == "process_refund":
            tool_result["_meta"] = {
                "processed_at": datetime.now(timezone.utc).isoformat(),
                "compliance_note": "Refund logged for quarterly audit"
            }
        # Notification: alert on high-value operations
        if tool_name == "process_refund" and tool_result.get("amount", 0) > 100:
            logger.warning(f"High-value refund: ${tool_result['amount']} for {tool_input.get('customer_id')}")
        return tool_result
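The hook above edits the result dict in place, which is fine for a stub but can surprise other code that still holds a reference to the same dict. A minimal sketch of the alternative, redacting a copy and leaving the caller's data untouched, is shown below. The helper names (`mask`, `redact_customer_record`) and the keep-last-four policy are assumptions made for illustration.

```python
import copy

KEEP_LAST = 4  # hypothetical policy: expose only the last 4 characters

def mask(value: str, prefix: str) -> str:
    """Replace everything except the last KEEP_LAST characters with a fixed prefix."""
    return prefix + value[-KEEP_LAST:]

def redact_customer_record(record: dict) -> dict:
    """Return a redacted deep copy; the caller's dict is not modified."""
    redacted = copy.deepcopy(record)
    if "ssn" in redacted:
        redacted["ssn"] = mask(str(redacted["ssn"]), "***-**-")
    if "credit_card" in redacted:
        redacted["credit_card"] = mask(str(redacted["credit_card"]), "****")
    return redacted

raw = {"name": "Ada", "ssn": "123-45-6789", "credit_card": "4111111111111111"}
safe = redact_customer_record(raw)
print(safe)
```

Returning a copy makes it explicit which consumer sees which version: the model gets `safe`, while anything that legitimately needs `raw` still has it.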
1.3 Notification Hooks
Notification hooks are a specialized pattern where the hook triggers an external side effect (Slack message, PagerDuty alert, email) without blocking the agent loop. The correct SDK pattern uses {"async_": True} in the return value — this signals the SDK to let the agent proceed immediately while the background task finishes:
import asyncio
import json

async def send_slack_message(channel: str, text: str) -> None:
    """Placeholder notifier (not part of the SDK); replace with a real Slack client call."""
    print(f"[{channel}] {text}")

# Fire-and-forget notification hooks: the async_ return lets the agent continue immediately
async def notify_large_refund(input_data, tool_use_id, context):
    """PostToolUse hook: alert on refunds over $200, non-blocking."""
    raw = input_data.get("tool_result")
    result = json.loads(raw) if isinstance(raw, str) else {}
    amount = result.get("amount", 0)
    customer_id = input_data.get("tool_input", {}).get("customer_id", "unknown")
    if amount > 200:
        # Start a background task; the agent does NOT wait for it
        asyncio.create_task(send_slack_message(
            channel="#refunds",
            text=f"Large refund: ${amount} for customer {customer_id}"
        ))
        # async_=True tells the SDK to proceed without waiting for this hook
        return {"async_": True, "asyncTimeout": 30000}
    # No notification needed: allow normally
    return {}

async def notify_escalation(input_data, tool_use_id, context):
    """PostToolUse hook: always notify when escalation happens."""
    tool_input = input_data.get("tool_input", {})
    asyncio.create_task(send_slack_message(
        channel="#support-escalations",
        text=f"Escalation: {tool_input.get('reason', '')} - "
             f"Customer: {tool_input.get('customer_id', '')}"
    ))
    return {"async_": True, "asyncTimeout": 30000}

# Register with Agent SDK:
# options = ClaudeAgentOptions(
#     hooks={
#         "PostToolUse": [
#             HookMatcher(matcher="process_refund", hooks=[notify_large_refund]),
#             HookMatcher(matcher="escalate_to_human", hooks=[notify_escalation]),
#         ]
#     }
# )
#
# Key rule: use async_=True (Python) for side-effect-only hooks.
# Async hooks CANNOT block, modify inputs, or inject context: the agent
# has already moved on by the time they finish.
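The hooks above call `asyncio.create_task` without keeping the returned task. The Python documentation notes that the event loop holds only weak references to tasks, so an unreferenced task can be garbage-collected before it finishes. A minimal sketch of a safer fire-and-forget helper follows; `fire_and_forget` and `fake_send` are hypothetical names, and `fake_send` stands in for a real Slack or PagerDuty call.

```python
import asyncio

# Strong references to in-flight notification tasks, dropped when each one finishes.
_background_tasks: set = set()

sent: list = []  # stand-in for an external channel, for demonstration only

async def fake_send(channel: str, text: str) -> None:
    """Hypothetical notifier; a real one would perform network I/O here."""
    await asyncio.sleep(0)
    sent.append((channel, text))

def fire_and_forget(coro) -> asyncio.Task:
    """Schedule coro without awaiting it, holding a reference until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task

async def demo() -> None:
    fire_and_forget(fake_send("#refunds", "Large refund: $250"))
    # A real hook would return immediately; the demo waits so it can exit cleanly.
    await asyncio.gather(*_background_tasks)

asyncio.run(demo())
print(sent)
```

The done-callback removes each task from the set as it completes, so the set does not grow over a long-running agent session.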
Key points: PreToolUse hooks can block a call or modify its inputs; PostToolUse hooks can modify results or inject context; notification hooks fire asynchronously, using {"async_": True} to avoid blocking the loop.
1.4 Claude Agent SDK Hooks (Production API)
The Claude Agent SDK (claude_agent_sdk) provides a built-in hooks system using HookMatcher and callback functions. This is the production approach — hooks are registered in your ClaudeAgentOptions and the SDK fires them automatically during the agent loop. No manual loop wiring needed.
Key differences from the manual approach (Sections 1.1–1.3 above):
| Aspect | Manual (Client SDK) | Agent SDK Hooks |
|---|---|---|
| Registration | You call hooks in your loop | Registered via options.hooks |
| Firing | You decide when to call | SDK fires automatically on events |
| Matchers | You write if/else conditions | HookMatcher(matcher="Write\|Edit") |
| Return value | Custom dict interpretation | Structured: permissionDecision, decision |
| Hook types | Pre/Post only | PreToolUse, PostToolUse, Stop, SubagentStart/Stop, PreCompact, UserPromptSubmit |
# Agent SDK Hooks: Production Pattern
# Requires: pip install claude-agent-sdk
# Docs: https://code.claude.com/docs/en/agent-sdk/hooks
import asyncio
from pathlib import Path

from claude_agent_sdk import (
    query,
    ClaudeAgentOptions,
    HookMatcher,
    AssistantMessage,
    ResultMessage,
)

# --- Hook callback: protect .env files from edits ---
async def protect_env_files(input_data, tool_use_id, context):
    """PreToolUse hook: blocks Write/Edit to .env files."""
    file_path = input_data["tool_input"].get("file_path", "")
    file_name = Path(file_path).name
    if file_name == ".env":
        return {
            "hookSpecificOutput": {
                "hookEventName": input_data["hook_event_name"],
                # "deny" cancels the tool call; Claude receives the reason
                "permissionDecision": "deny",
                "permissionDecisionReason": "Cannot modify .env files (policy enforced)",
            }
        }
    # Return empty dict to allow the operation
    return {}

# --- Hook callback: audit log every tool call ---
async def audit_log_all_tools(input_data, tool_use_id, context):
    """PostToolUse hook: logs the tool name and tool-use ID for compliance."""
    print(f"[AUDIT] Tool: {input_data.get('tool_name')} | ID: {tool_use_id}")
    # In production: write to database, send to SIEM, etc.
    return {}

# --- Hook callback: validate before stop ---
async def validate_on_stop(input_data, tool_use_id, context):
    """Stop hook: runs when the agent finishes. Validate the output."""
    # Check if result meets criteria, trigger alerts, save state
    print("[STOP] Agent finished, validating result quality")
    return {}

async def main():
    options = ClaudeAgentOptions(
        allowed_tools=["Read", "Write", "Edit", "Bash", "Grep", "Glob"],
        hooks={
            # PreToolUse: matcher filters to only Write and Edit tools
            "PreToolUse": [
                HookMatcher(matcher="Write|Edit", hooks=[protect_env_files])
            ],
            # PostToolUse: no matcher = fires for ALL tool calls
            "PostToolUse": [
                HookMatcher(hooks=[audit_log_all_tools])
            ],
            # Stop: fires when agent completes
            "Stop": [
                HookMatcher(hooks=[validate_on_stop])
            ],
        },
    )
    async for message in query(
        prompt="Update the database configuration in config.py",
        options=options,
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if hasattr(block, "text"):
                    print(block.text)
        if isinstance(message, ResultMessage):
            # total_cost_usd may be missing, so guard before formatting
            cost = f"${message.total_cost_usd:.4f}" if message.total_cost_usd is not None else "N/A"
            print(f"\n[Done: {message.subtype}, cost: {cost}]")

asyncio.run(main())
All supported hook events:
| Event | Matcher Input | When It Fires | Key Decisions |
|---|---|---|---|
| PreToolUse | Tool name | Before a tool executes | allow, deny, ask |
| PostToolUse | Tool name | After a tool returns | block (reject result) |
| UserPromptSubmit | n/a | When a prompt is sent | Inject additionalContext |
| Stop | n/a | When the agent finishes | Validate, save state |
| SubagentStart | n/a | When a subagent spawns | Track parallel tasks |
| SubagentStop | n/a | When a subagent completes | Aggregate results |
| PreCompact | n/a | Before context compaction | Archive full transcript |
A PreToolUse hook that denies a tool call is enforcement: it fires every time, deterministically. An instruction in CLAUDE.md saying “never edit .env files” is a request, and Claude may still attempt the edit. If a rule must hold every time, make it a hook.
2. Session Management
Sessions allow agents to persist conversation state across invocations. In Claude Code, sessions maintain the full message history so an agent can be resumed later without replaying the entire conversation.
2.1 Application-Level Session Labels
The SDK itself resumes by session ID or the most recent local session. If you want human-readable labels like review-pr-42 or customer-123, keep that mapping in your own application state and resolve the label to a captured session ID before calling resume:
import json
from pathlib import Path
from typing import Optional

class SessionStore:
    """Simple file-based session persistence for agent conversations."""

    def __init__(self, storage_dir: str = ".sessions"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def save(self, session_id: str, messages: list, metadata: Optional[dict] = None):
        """Persist a session with its full message history."""
        session_data = {
            "session_id": session_id,
            "messages": messages,
            "metadata": metadata or {},
            "message_count": len(messages)
        }
        path = self.storage_dir / f"{session_id}.json"
        path.write_text(json.dumps(session_data, indent=2, default=str))

    def load(self, session_id: str) -> Optional[dict]:
        """Load a previously saved session, or return None if it does not exist."""
        path = self.storage_dir / f"{session_id}.json"
        if not path.exists():
            return None
        return json.loads(path.read_text())

    def list_sessions(self) -> list:
        """List all saved session IDs."""
        return [p.stem for p in self.storage_dir.glob("*.json")]

# Usage in a host application that maps human labels to Claude session IDs
store = SessionStore()

def resume_or_start_session(session_id: str, new_message: Optional[str] = None) -> list:
    """Resume an existing session transcript or start a new one."""
    existing = store.load(session_id)
    messages = existing["messages"] if existing else []
    if new_message:
        messages.append({"role": "user", "content": new_message})
    return messages
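The label-to-ID mapping described at the start of this section can be as small as one JSON file. The following is a minimal sketch under that assumption: `SessionLabelIndex` is a hypothetical helper rather than part of any SDK, and the session ID shown is a placeholder for an ID captured from a real run.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional

class SessionLabelIndex:
    """Map human-readable labels to session IDs in a small JSON file."""

    def __init__(self, path: Path):
        self.path = path

    def _read(self) -> dict:
        if not self.path.exists():
            return {}
        return json.loads(self.path.read_text())

    def bind(self, label: str, session_id: str) -> None:
        """Record (or overwrite) the session ID behind a label."""
        index = self._read()
        index[label] = session_id
        self.path.write_text(json.dumps(index, indent=2))

    def resolve(self, label: str) -> Optional[str]:
        """Return the session ID for a label, or None if the label is unknown."""
        return self._read().get(label)

index = SessionLabelIndex(Path(tempfile.mkdtemp()) / "labels.json")
index.bind("review-pr-42", "example-session-id")  # placeholder ID
print(index.resolve("review-pr-42"))
print(index.resolve("customer-123"))
```

The host resolves the label first and passes the resulting ID to resume. An unknown label returns `None`, which is the signal to start a fresh session.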
2.2 Session Resumption Patterns
When resuming a session, the agent continues from where it left off. In a custom host-managed pattern, that means reloading the stored transcript and making the next API call with the complete context:
import json

import anthropic

# Reuses SessionStore and resume_or_start_session from Section 2.1
# and execute_tool from Section 1.1.
client = anthropic.Anthropic()
store = SessionStore()

def to_plain(content) -> list:
    """Convert SDK content blocks to plain dicts so the transcript survives a JSON round trip."""
    return [block.model_dump(exclude_none=True) for block in content]

def agent_with_sessions(session_id: str, user_message: str, tools: list, system: str) -> str:
    """Agent that persists state across invocations via stored session transcripts."""
    # Resume or start fresh
    messages = resume_or_start_session(session_id, user_message)
    # Run the agentic loop
    while True:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            system=system,
            tools=tools,
            messages=messages
        )
        if response.stop_reason != "tool_use":
            # end_turn (or max_tokens, etc.): append the final assistant message and persist
            final_text = "\n".join(b.text for b in response.content if b.type == "text")
            messages.append({"role": "assistant", "content": to_plain(response.content)})
            store.save(session_id, messages, metadata={"last_stop_reason": response.stop_reason})
            return final_text
        messages.append({"role": "assistant", "content": to_plain(response.content)})
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                result = execute_tool(block.name, block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result)
                })
        messages.append({"role": "user", "content": tool_results})
        # Persist after every tool round (crash recovery)
        store.save(session_id, messages, metadata={"last_stop_reason": "tool_use"})

# Multi-turn usage:
# Day 1: agent_with_sessions("review-pr-42", "Review this PR for security issues", ...)
# Day 2: agent_with_sessions("review-pr-42", "Now check the database migrations too", ...)
# The agent has full context from Day 1 when processing Day 2's request
2.3 fork_session for Parallel Exploration
The fork_session pattern creates a copy of the current session state, allowing the agent to explore an alternative approach without losing the original conversation path. This is used in Claude Code for speculative edits and “what if” explorations:
import copy

# Reuses SessionStore from Section 2.1.
store = SessionStore()

def fork_session(original_id: str, fork_id: str, fork_message: str = None) -> list:
    """Create a fork of an existing session for parallel exploration."""
    original = store.load(original_id)
    if not original:
        raise ValueError(f"Session '{original_id}' not found")
    # Deep copy to prevent shared references
    forked_messages = copy.deepcopy(original["messages"])
    if fork_message:
        forked_messages.append({"role": "user", "content": fork_message})
    # Save the fork as its own session
    store.save(fork_id, forked_messages, metadata={
        "forked_from": original_id,
        "fork_point": len(original["messages"])
    })
    return forked_messages

def explore_alternatives(session_id: str, approaches: list, tools: list, system: str) -> dict:
    """Fork a session into multiple branches to explore different approaches."""
    results = {}
    for i, approach in enumerate(approaches):
        fork_id = f"{session_id}--fork-{i}"
        messages = fork_session(session_id, fork_id, approach["message"])
        # Run each fork independently. run_agent_with_messages is not defined here;
        # it is assumed to be an agentic loop like the one in Section 2.2 that
        # accepts a prepared message list.
        result = run_agent_with_messages(messages, tools, system)
        results[approach["label"]] = {
            "fork_id": fork_id,
            "result": result
        }
    return results

# Example: exploring two refactoring approaches
# (code_tools is assumed to be your list of tool definitions)
alternatives = explore_alternatives(
    session_id="refactor-auth",
    approaches=[
        {"label": "approach_a", "message": "Refactor using the Strategy pattern."},
        {"label": "approach_b", "message": "Refactor using dependency injection."}
    ],
    tools=code_tools,
    system="You are a senior engineer. Implement the refactoring."
)
# Both forks preserve the original context but explore different paths
Compliance-Auditable Financial Agent
A fintech company uses hooks to create audit trails for every action their trading agent takes. Every tool call is logged with a timestamp, a PreToolUse hook validates each trade against compliance rules before execution, and sessions let traders pick up a conversation where they left off. The system passed SOC 2 audit requirements by demonstrating deterministic enforcement of trading limits via PreToolUse hooks.
2.4 Claude Agent SDK Sessions (ClaudeSDKClient)
The Claude Agent SDK provides built-in session management via ClaudeSDKClient (Python) and continue: true (TypeScript). The SDK automatically tracks sessions, writes them to disk, and supports continue, resume, and fork operations — no manual file I/O needed.
Session transcripts are stored at ~/.claude/projects/<encoded-cwd>/<session-id>.jsonl on macOS/Linux. On Windows, the equivalent location is typically %USERPROFILE%\.claude\projects\<encoded-cwd>\<session-id>.jsonl. The <encoded-cwd> portion is derived from the current working directory, which is why resume expects the same project path unless you use a SessionStore adapter.
# Agent SDK Session Management: ClaudeSDKClient
# Requires: pip install claude-agent-sdk
# Docs: https://code.claude.com/docs/en/agent-sdk/sessions
import asyncio
from claude_agent_sdk import (
    ClaudeSDKClient,
    ClaudeAgentOptions,
    AssistantMessage,
    ResultMessage,
    TextBlock,
)

def print_response(message):
    """Print only the human-readable parts of a message."""
    if isinstance(message, AssistantMessage):
        for block in message.content:
            if isinstance(block, TextBlock):
                print(block.text)
    elif isinstance(message, ResultMessage):
        cost = f"${message.total_cost_usd:.4f}" if message.total_cost_usd is not None else "N/A"
        print(f"[done: {message.subtype}, cost: {cost}]")

async def multi_turn_session():
    """ClaudeSDKClient automatically manages sessions across queries."""
    options = ClaudeAgentOptions(
        allowed_tools=["Read", "Edit", "Glob", "Grep"],
    )
    async with ClaudeSDKClient(options=options) as client:
        # First query: client captures session ID internally
        await client.query("Analyze the auth module for security issues")
        async for message in client.receive_response():
            print_response(message)
        # Second query: AUTOMATICALLY continues the same session
        # No session ID passing needed; the client tracks it
        await client.query("Now refactor it to use JWT tokens")
        async for message in client.receive_response():
            print_response(message)
        # The agent has full context from the first query
    # Session transcript path on disk:
    # ~/.claude/projects/<encoded-cwd>/<session-id>.jsonl
    # Windows equivalent:
    # %USERPROFILE%\.claude\projects\<encoded-cwd>\<session-id>.jsonl

asyncio.run(multi_turn_session())
Session operations (options on query()):
# Continue, Resume, Fork: Three Ways to Pick Up a Session
# Docs: https://code.claude.com/docs/en/agent-sdk/sessions
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

async def session_operations():
    session_id = None  # set once the first ResultMessage arrives

    # --- CONTINUE: picks up the most recent session in current directory ---
    # Best for: single-conversation apps, no ID tracking needed
    async for msg in query(
        prompt="What files did you edit earlier?",
        options=ClaudeAgentOptions(
            continue_conversation=True,  # Finds most recent session on disk
            allowed_tools=["Read", "Glob"],
        ),
    ):
        if isinstance(msg, ResultMessage):
            session_id = msg.session_id  # Capture for later
            print(f"Session: {session_id}")

    # --- RESUME: picks up a SPECIFIC session by ID ---
    # Best for: multi-user apps, returning to a specific stored session
    async for msg in query(
        prompt="Continue the analysis from yesterday",
        options=ClaudeAgentOptions(
            resume=session_id,  # Explicit session ID
            allowed_tools=["Read", "Grep"],
        ),
    ):
        if isinstance(msg, ResultMessage):
            print(msg.result)

    # --- FORK: creates a NEW session copying history from the original ---
    # Original stays unchanged. Use for exploring alternatives.
    # fork_session=True combined with resume= branches from that session.
    async for msg in query(
        prompt="Try a completely different approach using microservices",
        options=ClaudeAgentOptions(
            resume=session_id,  # Start from this session's history
            fork_session=True,  # Create a new independent branch
            allowed_tools=["Read", "Edit", "Bash"],
        ),
    ):
        if isinstance(msg, ResultMessage):
            forked_id = msg.session_id  # New fork ID, distinct from session_id
            print(f"Forked session: {forked_id} (original unchanged)")

asyncio.run(session_operations())
Resume restores conversation state, not filesystem state. If the agent edited config.py in a previous session, then resume gives Claude memory that the edit happened, why it happened, and what tool results were returned, but it does not restore the old file contents, re-apply the edit, or roll the workspace back to that moment in time. Claude resumes against the current filesystem state on disk, which means resumed sessions can contain stale assumptions if a human, CI job, or another agent changed files between runs. The same rule applies to fork_session: it branches the conversation history, not the files themselves. If you need true file rollback or point-in-time restore semantics, use file checkpointing and treat session resume/fork as conversation-state features rather than filesystem-state features.
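Because a resumed session sees whatever is on disk now, a host application can check for drift before resuming. The sketch below is one way to do that, assuming the host records content hashes of the files the agent touched when a session ends. `snapshot` and `changed_since` are hypothetical helpers, not SDK functions.

```python
import hashlib
import tempfile
from pathlib import Path

def snapshot(paths: list) -> dict:
    """Map each existing file path to the SHA-256 of its current contents."""
    return {
        str(p): hashlib.sha256(Path(p).read_bytes()).hexdigest()
        for p in paths
        if Path(p).is_file()
    }

def changed_since(saved: dict) -> list:
    """Return paths whose contents differ from the saved snapshot or that no longer exist."""
    current = snapshot(list(saved))
    return sorted(p for p, digest in saved.items() if current.get(p) != digest)

workdir = Path(tempfile.mkdtemp())
config = workdir / "config.py"
config.write_text("DB_HOST = 'localhost'\n")
saved = snapshot([config])                      # taken when the session ends
config.write_text("DB_HOST = 'db.internal'\n")  # someone edits the file between runs
stale = changed_since(saved)                    # checked just before resuming
print(stale)
```

If `stale` is non-empty, the host can mention the changed files in the resume prompt so the agent re-reads them instead of relying on what it remembers.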
3. State Management
3.1 Conversation State Tracking
Beyond raw message persistence, agents often need to track derived state — verified prerequisites, accumulated findings, cost counters, and workflow positions. This state lives outside the message array and guides hook behavior:
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AgentConversationState:
    """Track derived state across the agentic loop."""
    # Prerequisite tracking
    verified_customer_id: Optional[str] = None
    confirmed_order_ids: list = field(default_factory=list)
    # Accumulated context
    findings: list = field(default_factory=list)
    tool_call_count: int = 0
    # Cost tracking
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_tool_calls: int = 0
    # Workflow position
    current_phase: str = "initial"
    completed_phases: list = field(default_factory=list)

    def update_from_response(self, response):
        """Update state after each API response."""
        self.total_input_tokens += response.usage.input_tokens
        self.total_output_tokens += response.usage.output_tokens
        tool_blocks = [b for b in response.content if b.type == "tool_use"]
        self.total_tool_calls += len(tool_blocks)
        self.tool_call_count += len(tool_blocks)

    def update_from_tool_result(self, tool_name: str, result: dict):
        """Update state based on tool results."""
        if tool_name == "get_customer" and result.get("verified"):
            self.verified_customer_id = result["customer_id"]
        elif tool_name == "lookup_order":
            self.confirmed_order_ids.append(result.get("order_id"))

    def to_dict(self) -> dict:
        """Serialize a summary for session persistence."""
        return {
            "verified_customer_id": self.verified_customer_id,
            "confirmed_order_ids": self.confirmed_order_ids,
            "findings": self.findings,
            "tool_call_count": self.tool_call_count,
            "total_tokens": self.total_input_tokens + self.total_output_tokens,
            "current_phase": self.current_phase
        }
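Note that to_dict above emits a derived total_tokens key that is not a constructor field, so passing its output straight back to the dataclass constructor raises a TypeError. A minimal sketch of a round-trippable variant follows. It uses a trimmed-down stand-in class (`ConversationState`) and a hypothetical `from_dict` classmethod that ignores unknown keys.

```python
from dataclasses import dataclass, field, fields, asdict
from typing import Optional

@dataclass
class ConversationState:
    """Trimmed-down stand-in for the state class above."""
    verified_customer_id: Optional[str] = None
    confirmed_order_ids: list = field(default_factory=list)
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    current_phase: str = "initial"

    def to_dict(self) -> dict:
        """Serialize every field, plus a derived total for reporting."""
        data = asdict(self)
        data["total_tokens"] = self.total_input_tokens + self.total_output_tokens
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "ConversationState":
        """Rebuild state, ignoring keys that are not dataclass fields."""
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})

state = ConversationState(verified_customer_id="C-1", total_input_tokens=120, total_output_tokens=30)
restored = ConversationState.from_dict(state.to_dict())
print(restored == state)
```

Filtering on the declared fields also keeps old session files loadable after a field is removed from the class.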
3.2 Persistence Patterns
For production systems, combine message-level persistence (for session resumption) with state-level persistence (for hooks and enforcement). Save after every tool execution round to enable crash recovery:
import json
from dataclasses import fields

import anthropic

# Reuses store (Section 2.1), AgentConversationState (Section 3.1),
# PreToolUseHook / PostToolUseHook / execute_tool (Section 1).
client = anthropic.Anthropic()

def production_agent_loop(session_id: str, user_message: str, tools: list, system: str) -> str:
    """Production agent with full persistence and hooks."""
    # Initialize or resume state
    session = store.load(session_id)
    if session:
        messages = session["messages"]
        # save() nests state under "metadata"; drop derived keys such as
        # "total_tokens" that are not constructor fields
        saved = session.get("metadata", {}).get("state", {})
        known = {f.name for f in fields(AgentConversationState)}
        state = AgentConversationState(**{k: v for k, v in saved.items() if k in known})
        messages.append({"role": "user", "content": user_message})
    else:
        messages = [{"role": "user", "content": user_message}]
        state = AgentConversationState()
    pre_hook = PreToolUseHook(spend_limit=100.0)
    post_hook = PostToolUseHook()
    while True:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            system=system,
            tools=tools,
            messages=messages
        )
        state.update_from_response(response)
        # Store plain dicts so the transcript survives a JSON round trip
        assistant_content = [b.model_dump(exclude_none=True) for b in response.content]
        messages.append({"role": "assistant", "content": assistant_content})
        if response.stop_reason != "tool_use":
            # end_turn (or max_tokens, etc.): persist and return the text
            store.save(session_id, messages, metadata={"state": state.to_dict()})
            return "\n".join(b.text for b in response.content if b.type == "text")
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                # PreToolUse hook
                pre_result = pre_hook(block.name, block.input)
                if pre_result and "error" in pre_result:
                    tool_results.append({
                        "type": "tool_result", "tool_use_id": block.id,
                        "content": json.dumps(pre_result), "is_error": True
                    })
                    continue
                # Execute
                input_data = pre_result.get("modified_input", block.input) if pre_result else block.input
                result = execute_tool(block.name, input_data)
                # PostToolUse hook
                result = post_hook(block.name, input_data, result)
                # Update state
                state.update_from_tool_result(block.name, result)
                tool_results.append({
                    "type": "tool_result", "tool_use_id": block.id,
                    "content": json.dumps(result)
                })
        messages.append({"role": "user", "content": tool_results})
        # Persist after every tool round (crash recovery)
        store.save(session_id, messages, metadata={"state": state.to_dict()})
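Saving after every tool round only helps crash recovery if a crash during the save itself cannot leave a truncated file behind. A minimal sketch of an atomic write for a file-based store follows; `atomic_write_json` is a hypothetical helper that writes to a temporary file in the same directory and then swaps it into place with `os.replace`.

```python
import json
import os
import tempfile
from pathlib import Path

def atomic_write_json(path: Path, payload: dict) -> None:
    """Write payload to path so readers never observe a half-written file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file must live on the same filesystem for os.replace to be atomic.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(payload, tmp, indent=2, default=str)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(tmp_name, path)
    except BaseException:
        # Leave no stray temp file behind if anything failed before the swap.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise

target = Path(tempfile.mkdtemp()) / "sessions" / "review-pr-42.json"
atomic_write_json(target, {"session_id": "review-pr-42", "messages": []})
atomic_write_json(target, {"session_id": "review-pr-42",
                           "messages": [{"role": "user", "content": "hi"}]})
print(json.loads(target.read_text())["messages"])
```

With this in place, a reader sees either the previous complete transcript or the new one, never a partial write.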
4. Session Utilities & Cross-Host Persistence
The Agent SDK writes session transcripts to disk at ~/.claude/projects/<encoded-cwd>/<session-id>.jsonl. Beyond the resume, fork_session, and ClaudeSDKClient patterns from Section 2, the SDK exposes utility functions for enumerating, reading, tagging, and renaming sessions — plus a SessionStore adapter for persisting transcripts to external storage so sessions can be resumed across machines.
4.1 Session Disk Utilities
# Agent SDK session utility functions
# Requires: pip install claude-agent-sdk
# Docs: https://code.claude.com/docs/en/agent-sdk/sessions
import asyncio
from claude_agent_sdk import (
list_sessions, # Enumerate session files on disk
get_session_messages, # Read a session's full transcript
get_session_info, # Get metadata about a session
rename_session, # Give a session a human-readable title
tag_session, # Add searchable tags to a session
ResultMessage,
)
# --- LIST all sessions in the current directory ---
sessions = list_sessions()
for s in sessions:
    print(f"{s.session_id}: {s.title or 'untitled'} ({s.message_count} messages)")
# --- READ a session's transcript for replay or analysis ---
messages = get_session_messages(session_id="sess_01XYZ...")
for msg in messages:
    print(f"[{msg.role}] {str(msg.content)[:100]}")
# --- GET metadata without loading the full transcript ---
info = get_session_info(session_id="sess_01XYZ...")
print(f"Title: {info.title}, Tags: {info.tags}, Messages: {info.message_count}")
# --- RENAME for human-readable session pickers ---
rename_session(session_id="sess_01XYZ...", title="Auth module JWT refactor — May 29")
# --- TAG to organize by project, feature, or status ---
tag_session(session_id="sess_01XYZ...", tags=["auth", "jwt", "in-review"])
# Use case: build a session picker UI
def show_session_menu():
    sessions = list_sessions()
    print("Recent sessions:")
    for i, s in enumerate(sessions[:10], 1):
        title = s.title or "untitled"
        print(f"  {i}. {title} (id: {s.session_id[:12]}...)")
    return sessions
4.2 Cross-Host Session Persistence (SessionStore)
By default, sessions are local to the machine that created them. To resume sessions across CI workers, serverless functions, or Docker containers — where the filesystem is not shared — use a SessionStore adapter that mirrors transcripts to external storage (S3, Redis, database):
# SessionStore adapter: mirror local transcripts to shared storage
# Docs: https://code.claude.com/docs/en/agent-sdk/session-storage
# Requires: pip install claude-agent-sdk
import asyncio
import json
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage

# --- Implement a custom SessionStore adapter ---
class RedisSessionStore:
    """Mirror session transcripts to Redis for cross-host resume."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def save(self, session_id: str, messages: list):
        """Called by SDK after every turn: mirror to Redis."""
        await self.redis.set(
            f"session:{session_id}",
            json.dumps(messages, default=str),
            ex=86400  # 1-day TTL
        )

    async def load(self, session_id: str) -> list | None:
        """Called by SDK when resuming: load from Redis."""
        data = await self.redis.get(f"session:{session_id}")
        return json.loads(data) if data else None

# Usage: pass the adapter to ClaudeAgentOptions (shown commented out below)
# redis = await aioredis.create_redis_pool("redis://localhost")
# store = RedisSessionStore(redis)

# First run (CI worker 1): create and save session ID
async def first_run():
    session_id = None
    async for msg in query(
        prompt="Analyze the auth module for security issues",
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Grep", "Glob"],
            # session_store=store,  # Mirrors to Redis after every turn
        ),
    ):
        if isinstance(msg, ResultMessage):
            session_id = msg.session_id
    return session_id

# Second run (CI worker 2, a different machine): resume from Redis
async def second_run(session_id: str):
    async for msg in query(
        prompt="Now implement the fixes you identified",
        options=ClaudeAgentOptions(
            resume=session_id,
            allowed_tools=["Read", "Edit", "Write"],
            # session_store=store,  # Loads from Redis if local file missing
        ),
    ):
        if isinstance(msg, ResultMessage) and msg.subtype == "success":
            print(msg.result)

# Fallback: if no SessionStore, copy the JSONL file manually between hosts.
# Location: ~/.claude/projects/<encoded-cwd>/<session-id>.jsonl
# The cwd must match on both machines for resume to find the file.
print("SessionStore: mirror transcripts to S3/Redis/DB for cross-host resume")
print("Fallback: copy the .jsonl file to the same path on the new host")
Key points: (1) list_sessions() / get_session_messages() enumerate transcripts on disk. (2) rename_session() / tag_session() organize sessions. (3) Cross-host resume requires either a SessionStore adapter or copying the .jsonl file with a matching cwd. (4) Session files are stored at ~/.claude/projects/<encoded-cwd>/, so the cwd must match for resume to find the right file.
4.3 PostToolUse: Redact & Enrich Results
The correct SDK pattern to modify a tool result before Claude sees it is updatedToolOutput in the hookSpecificOutput. To append information without replacing the result, use additionalContext:
# PostToolUse hook — modify or annotate tool results
# Requires: pip install claude-agent-sdk
import asyncio
import re
from claude_agent_sdk import query, ClaudeAgentOptions, HookMatcher, ResultMessage
async def redact_sensitive_data(input_data, tool_use_id, context):
    """PostToolUse: redact SSN and credit cards from tool output before Claude sees it."""
    # PostToolUse input carries the tool's output under "tool_response"
    output = input_data.get("tool_response", "")
    if not isinstance(output, str):
        return {}  # Structured (non-string) results pass through unchanged here
    # Redact common PII patterns
    redacted = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN REDACTED]', output)
    redacted = re.sub(r'\b\d{4}[- ]\d{4}[- ]\d{4}[- ]\d{4}\b', '[CARD REDACTED]', redacted)
    if redacted == output:
        return {}  # Nothing to redact: allow through unchanged
    return {
        "hookSpecificOutput": {
            "hookEventName": input_data["hook_event_name"],
            # updatedToolOutput replaces the result Claude receives
            "updatedToolOutput": redacted,
        }
    }

async def enrich_tool_result(input_data, tool_use_id, context):
    """PostToolUse: append compliance metadata without replacing the result."""
    return {
        "hookSpecificOutput": {
            "hookEventName": input_data["hook_event_name"],
            # additionalContext appends to the result, so Claude sees both
            "additionalContext": "[Compliance] This tool call was logged per policy.",
        }
    }

async def main():
    options = ClaudeAgentOptions(
        allowed_tools=["Read", "Bash"],
        hooks={
            "PostToolUse": [
                HookMatcher(matcher="Bash", hooks=[redact_sensitive_data]),
                HookMatcher(matcher="Read", hooks=[enrich_tool_result]),
            ]
        },
    )
    async for msg in query(prompt="Show me the customer database", options=options):
        if isinstance(msg, ResultMessage) and msg.subtype == "success":
            print(msg.result)


asyncio.run(main())
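Because the redaction logic is plain regex substitution, it can be exercised without the SDK or an API call. The sketch below pulls the two patterns from `redact_sensitive_data` into a hypothetical helper, `redact_pii`, and runs it on a made-up sample string; the helper name and the sample data are illustrative only.

```python
import re

# Same patterns as the hook above, precompiled for reuse
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
CARD_PATTERN = re.compile(r"\b\d{4}[- ]\d{4}[- ]\d{4}[- ]\d{4}\b")


def redact_pii(text: str) -> str:
    """Replace SSN-like and separated 16-digit card-like strings with placeholders."""
    text = SSN_PATTERN.sub("[SSN REDACTED]", text)
    return CARD_PATTERN.sub("[CARD REDACTED]", text)


sample = "SSN 123-45-6789, card 4111 1111 1111 1111"
print(redact_pii(sample))
# prints: SSN [SSN REDACTED], card [CARD REDACTED]
```

Note that the card pattern only matches digit groups separated by spaces or hyphens. An unseparated 16-digit number passes through unchanged, so the pattern would need extending if your data contains that form.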
5. Credential Management for Agents
Agents that call external APIs (databases, SaaS tools, internal services) need access to secrets. The secure pattern keeps credentials out of the context window entirely — they are passed via environment variables, injected by MCP server configs, or redacted by hooks before Claude reads them.
# Pattern 1: Environment variables — the primary approach
# Set in shell, CI secrets, or container env — never in CLAUDE.md or prompts
export GITHUB_TOKEN="ghp_xxxxx"
export DB_PASSWORD="super-secret"
# .claude/settings.json: inject env vars into every agent session
# {
#   "env": {
#     "GITHUB_TOKEN": "${GITHUB_TOKEN}",
#     "DB_PASSWORD": "${DB_PASSWORD}"
#   }
# }
# Tools can then read process.env / os.environ — never embedded in context.
# Pattern 2: MCP server carries the credential
# MCP servers run as their own process with their own env vars.
# The agent calls a tool ("get_issue") — the MCP server uses its token internally.
# Claude never sees the token; it only sees the tool result.
# .mcp.json (project root); reference the env var rather than hardcoding the token:
# {
#   "mcpServers": {
#     "github": {
#       "command": "npx",
#       "args": ["-y", "@modelcontextprotocol/server-github"],
#       "env": { "GITHUB_PERSONAL_ACCESS_TOKEN": "${GITHUB_TOKEN}" }
#     }
#   }
# }
# Pattern 3: PostToolUse hook — redact tokens that accidentally appear in output
import re
from claude_agent_sdk import HookMatcher
async def redact_tokens_in_output(input_data, tool_use_id, context):
    """PostToolUse: strip secrets that appear in tool results before Claude sees them."""
    # PostToolUse input carries the tool's output under "tool_response"
    output = str(input_data.get("tool_response", ""))
    # Redact common token patterns before Claude reads the result
    cleaned = re.sub(r'ghp_[A-Za-z0-9]{36,}', '[GITHUB_TOKEN]', output)
    # Allow "-" and "_" so keys with prefixes such as sk-ant-... also match
    cleaned = re.sub(r'sk-[A-Za-z0-9_-]{40,}', '[API_KEY]', cleaned)
    if cleaned == output:
        return {}
    return {
        "hookSpecificOutput": {
            "hookEventName": input_data["hook_event_name"],
            "updatedToolOutput": cleaned,
        }
    }

# Hook registration:
# hooks={"PostToolUse": [HookMatcher(hooks=[redact_tokens_in_output])]}
# Pattern 4: CLAUDE.md — document which env vars the agent should expect
# Add to your project CLAUDE.md:
#
# ## Required Environment Variables
# - GITHUB_TOKEN: for GitHub API calls (read-only scope)
# - DATABASE_URL: PostgreSQL connection string (read-only replica)
# - SLACK_BOT_TOKEN: for posting summaries to #engineering channel
#
# Never put actual credential values in CLAUDE.md.
Keep the PostToolUse redaction hook registered as a safety net for secrets that slip into tool output.
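To illustrate Pattern 1, a tool implementation can read its secret from the environment at call time and return only non-sensitive data. This is a minimal sketch: `require_env`, `fetch_issue_title`, and the returned fields are hypothetical, and the network call is stubbed out.

```python
import os


def require_env(name: str) -> str:
    """Return the named environment variable, or fail without echoing any value."""
    value = os.environ.get(name)
    if not value:
        # The error names the variable but never includes secret material
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def fetch_issue_title(issue_number: int) -> dict:
    """Hypothetical tool body: uses the token internally and never returns it."""
    token = require_env("GITHUB_TOKEN")
    # A real implementation would send `token` in an Authorization header here.
    # This stub only reports that a credential was available.
    return {"issue": issue_number, "authenticated": bool(token)}
```

The design point is that the token exists only inside the tool process. The value returned to the agent, and any error raised, mention the variable name at most.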
6. Session State & Forking (CCA 9.3)
Sometimes you need to explore two approaches simultaneously, or resume a long session from a checkpoint. Session forking creates a copy of the current session state (messages, tool results, context) at a point in time, letting you branch off without affecting the original.
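As a mental model only, forking behaves like taking a deep copy of the message history and appending to the copy. The sketch below uses a hypothetical `ToySession` class to show that a branch diverges without touching the original; it does not reflect how the SDK stores sessions internally.

```python
import copy
import uuid
from dataclasses import dataclass, field


@dataclass
class ToySession:
    """Hypothetical in-memory stand-in for a session transcript."""
    session_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    messages: list = field(default_factory=list)

    def fork(self) -> "ToySession":
        # New ID plus an independent deep copy of the history so far
        return ToySession(messages=copy.deepcopy(self.messages))


original = ToySession(messages=[{"role": "user", "content": "Design auth with JWT"}])
branch = original.fork()
branch.messages.append({"role": "user", "content": "Explore OAuth2 instead"})

print(len(original.messages), len(branch.messages))  # 1 2
print(original.session_id != branch.session_id)      # True
```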
6.1 Resume Patterns
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage
async def resume_patterns(session_id: str):
    """Resume the latest session or a specific captured session ID."""
    # Resume the MOST RECENT session in the current working directory
    async for message in query(
        prompt="Continue the work from my last run and summarize the open items.",
        options=ClaudeAgentOptions(
            continue_conversation=True,
            allowed_tools=["Read", "Glob", "Grep"],
        ),
    ):
        if isinstance(message, ResultMessage) and message.subtype == "success":
            print(message.result)

    # Resume a SPECIFIC earlier session when you captured its ID previously
    async for message in query(
        prompt="Now implement the refactoring we discussed earlier.",
        options=ClaudeAgentOptions(
            resume=session_id,
            allowed_tools=["Read", "Edit", "Write", "Glob", "Grep"],
        ),
    ):
        if isinstance(message, ResultMessage):
            print(f"Resumed exact session: {message.session_id}")

# CLI equivalents in non-interactive mode:
# claude -p "Continue the last run" --continue
# claude -p "Continue that exact run" --resume <session-id>
print("Use continue_conversation/--continue for the latest local session")
print("Use resume/--resume when multiple sessions exist")
6.2 Fork a Session
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage
async def fork_and_compare(session_id: str):
    """Branch from an existing session without losing the original path."""
    forked_id = None
    # Create a new branch from the prior session history
    async for message in query(
        prompt="Instead of JWT, explore OAuth2 for this module.",
        options=ClaudeAgentOptions(
            resume=session_id,
            fork_session=True,
            allowed_tools=["Read", "Edit", "Write", "Glob", "Grep"],
        ),
    ):
        if isinstance(message, ResultMessage):
            forked_id = message.session_id
            if message.subtype == "success":
                print(message.result)
    print(f"Forked session: {forked_id}")

    # The original session remains untouched and can still be resumed directly
    async for message in query(
        prompt="Continue with the JWT approach.",
        options=ClaudeAgentOptions(
            resume=session_id,
            allowed_tools=["Read", "Edit", "Write", "Glob", "Grep"],
        ),
    ):
        if isinstance(message, ResultMessage) and message.subtype == "success":
            print(message.result)

# Important: forking branches the conversation history, NOT the filesystem.
# If you also need to revert file changes, use file checkpointing.
print("fork_session=True + resume=session_id creates a new branch")
print("Resumed sessions may contain stale tool results — re-check critical facts")
(1) resume restores a specific prior session by ID. (2) continue_conversation=True resumes the latest local session in the current working directory. (3) fork_session=True branches conversation history but not filesystem state. (4) Resumed sessions may contain stale tool results, so critical facts should be re-verified.
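The re-verification advice can be made concrete by recording file fingerprints when a session ends and comparing them before acting on a resumed session. The helper names `snapshot_files` and `changed_since` are hypothetical, and SHA-256 content hashing is one possible choice of fingerprint, not something the SDK provides.

```python
import hashlib
from pathlib import Path


def snapshot_files(paths: list[str]) -> dict[str, str]:
    """Map each existing file path to the SHA-256 hex digest of its contents."""
    return {
        p: hashlib.sha256(Path(p).read_bytes()).hexdigest()
        for p in paths
        if Path(p).is_file()
    }


def changed_since(snapshot: dict[str, str]) -> list[str]:
    """Return paths whose contents differ from the snapshot or that no longer exist."""
    current = snapshot_files(list(snapshot))
    return sorted(p for p, digest in snapshot.items() if current.get(p) != digest)
```

If `changed_since` returns any paths, the resume prompt can ask Claude to re-read those files before relying on tool results from the earlier run.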
Next in the SDK Track
In Part 6: Tool Interface Design, we shift to CCA Domain 2: designing tool schemas that drive correct usage, crafting descriptions that enable model-driven reasoning, handling tool errors, and building composable tool sets. It covers CCA Tasks 2.1 and 2.2.