Introduction: From Protocol to Production
Series Overview: This is Part 14 of our 20-part AI Application Development Mastery series. Having covered MCP fundamentals in our ecosystem overview, we now go deep into production-grade MCP server development, scaling strategies, multi-agent orchestration, and the real-world deployment patterns that power enterprise AI systems.
| Part | Topic | Key Concepts |
|------|-------|--------------|
| 1 | Foundations & Evolution of AI Apps | Pre-LLM era, transformers, LLM revolution |
| 2 | LLM Fundamentals for Developers | Tokens, context windows, sampling, API patterns |
| 3 | Prompt Engineering Mastery | Zero/few-shot, CoT, ReAct, structured outputs |
| 4 | LangChain Core Concepts | Chains, prompts, LLMs, tools, LCEL |
| 5 | Retrieval-Augmented Generation (RAG) | Embeddings, vector DBs, retrievers, RAG pipelines |
| 6 | Memory & Context Engineering | Buffer/summary/vector memory, chunking, re-ranking |
| 7 | Agents — Core of Modern AI Apps | ReAct, tool-calling, planner-executor agents |
| 8 | LangGraph — Stateful Agent Workflows | Nodes, edges, state, graph execution, cycles |
| 9 | Deep Agents & Autonomous Systems | Multi-step reasoning, self-reflection, planning |
| 10 | Multi-Agent Systems | Supervisor, swarm, debate, role-based collaboration |
| 11 | AI Application Design Patterns | RAG, chat+memory, workflow automation, agent loops |
| 12 | Ecosystem & Frameworks | LlamaIndex, Haystack, HuggingFace, MCP, vLLM |
| 13 | Evaluation & LLMOps | Prompt eval, tracing, LangSmith, experiment tracking |
| 14 | **MCP in Production** (You Are Here) | MCP servers, scaling, A2A, agent systems |
| 15 | Production AI Systems | APIs, queues, caching, streaming, scaling |
| 16 | Safety, Guardrails & Reliability | Input filtering, hallucination mitigation, prompt injection |
| 17 | Advanced Topics | Fine-tuning, tool learning, hybrid LLM+symbolic |
| 18 | Building Real AI Applications | Chatbot, document QA, coding assistant, full-stack |
| 19 | Future of AI Applications | Autonomous agents, self-improving, multi-modal, AI OS |
| 20 | Capstone: End-to-End AI System | Full production build, deploy, monitor, iterate |
The Model Context Protocol (MCP) has rapidly become the industry standard for connecting AI models to external tools, data sources, and services. In Part 12 of this series, we introduced MCP's core concepts — the client-server architecture, tool definitions, and resource access patterns. Now we take the critical next step: building MCP servers that survive production traffic, integrating them with the broader DevOps ecosystem, and orchestrating multi-agent systems that leverage MCP as their communication backbone.
Production MCP is fundamentally different from development MCP. In development, you have a single user, one model, and a handful of tools running locally. In production, you face concurrent users, distributed systems, security boundaries, latency budgets, cost constraints, and the requirement that nothing breaks at 3 AM. This part teaches you how to bridge that gap.
Key Insight: MCP is not just a protocol for connecting tools to models — it is an architectural pattern for building modular, composable, and scalable AI systems. When you design your MCP servers as well-defined microservices with clear capability boundaries, you unlock the ability to independently scale, version, and deploy each piece of your AI infrastructure.
1. Building MCP Servers
Building production MCP servers requires a structured approach that separates concerns into distinct layers: transport (how messages are exchanged), protocol (JSON-RPC handling), and application logic (the actual tools, resources, and prompts you expose). This section walks through server architecture patterns, implementation techniques, and testing strategies that produce servers ready for real-world deployment.
1.1 Server Architecture & Layers
A production MCP server is not a single script — it is a layered system with distinct responsibilities. Understanding these layers is essential for building servers that are maintainable, testable, and secure.
| Layer | Responsibility | Key Components |
|-------|----------------|----------------|
| Capability Registry | Declares available tools, resources, and prompts to clients | Tool schemas, resource URIs, prompt templates |
| Tool Handlers | Execute tool calls with input validation and error handling | Business logic, API calls, database queries |
| Resource Providers | Serve contextual data (files, DB records, API responses) | File readers, DB connectors, REST clients |
| Auth Middleware | Authenticate and authorize every request | API keys, OAuth tokens, RBAC, rate limiting |
| Logging Layer | Structured logging, metrics, and trace propagation | OpenTelemetry, structured JSON logs, error tracking |
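The Auth Middleware layer typically wraps every tool handler rather than being re-implemented inside each one. As a minimal sketch of the idea (pure Python; the key store, limits, and handler names are hypothetical — a real server would use a secrets manager and a distributed rate limiter such as Redis):

```python
import time
from functools import wraps

VALID_API_KEYS = {"key-abc123"}            # hypothetical key store
RATE_LIMIT = 5                             # max calls per window per key
WINDOW_SECONDS = 60.0
_call_log: dict[str, list[float]] = {}     # api_key -> recent call timestamps

def require_auth(handler):
    """Wrap a tool handler with an API-key check and a sliding-window rate limit."""
    @wraps(handler)
    def wrapper(api_key: str, *args, **kwargs):
        if api_key not in VALID_API_KEYS:
            return {"error": "unauthorized"}
        now = time.monotonic()
        # Keep only calls that happened inside the current window
        recent = [t for t in _call_log.get(api_key, []) if now - t < WINDOW_SECONDS]
        if len(recent) >= RATE_LIMIT:
            return {"error": "rate limit exceeded"}
        _call_log[api_key] = recent + [now]
        return handler(*args, **kwargs)
    return wrapper

@require_auth
def list_tables():
    return {"tables": ["employees", "departments"]}
```

Because the check lives in one decorator, every handler gets identical auth behavior, and the policy (keys, limits) can change without touching business logic.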
1.2 Implementation Patterns
How you structure your MCP servers determines how well they scale, how easy they are to maintain, and how your team collaborates on them. Three dominant patterns have emerged:
| Pattern | Description | When to Use | Trade-offs |
|---------|-------------|-------------|------------|
| Monolithic | All tools, resources, and prompts in a single MCP server | Small teams, early prototyping, < 10 tools | Simple to deploy, hard to scale independently |
| Micro MCP Servers | One server per tool or small group of related tools | Production systems, independent scaling, team autonomy | More infrastructure, better isolation and fault tolerance |
| Domain-Based | One server per business domain (HR, Finance, DevOps) | Enterprise deployments, clear ownership boundaries | Balanced complexity, maps to org structure |
Recommendation: For most production systems, Micro MCP Servers provide the best balance. Each server owns a small, well-defined capability surface. This allows independent scaling (your database query server may need 10x the resources of your weather API server), independent versioning, and fault isolation (a broken tool does not take down your entire system).
Language choices for MCP server development:
| Language | SDK / Library | Best For | Trade-offs |
|----------|---------------|----------|------------|
| Python | mcp (FastMCP) | Rapid prototyping, data science tools, LangChain integration | Fastest development, rich AI ecosystem, slower runtime |
| TypeScript/Node | @modelcontextprotocol/sdk | Web integrations, existing Node.js infrastructure | Large npm ecosystem, good async support |
| Go | mcp-go | High-performance servers, low-latency requirements | Best runtime performance, more boilerplate |
1.3 Complete FastMCP Server Implementation
Let us build a production-quality MCP server using Python's FastMCP framework. This server provides database query tools, a knowledge base resource, and prompt templates — the three core MCP capability types.
# Production MCP Server — Database, Knowledge Base & Prompt Tools
# pip install mcp httpx pydantic sqlalchemy aiosqlite

import os
import json
import logging
import hashlib
from datetime import datetime
from typing import Any, Optional
from contextlib import asynccontextmanager

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field

# --- Configuration ---
DB_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./app.db")
API_KEY = os.getenv("MCP_API_KEY", "dev-key-change-in-production")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

# --- Structured Logging ---
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL),
    format='{"time":"%(asctime)s","level":"%(levelname)s","module":"%(module)s","message":"%(message)s"}'
)
logger = logging.getLogger("mcp-server")

# --- Initialize MCP Server ---
mcp = FastMCP(
    name="enterprise-data-server",
    version="1.2.0",
    description="Production MCP server for enterprise data access, "
                "knowledge base queries, and reporting tools."
)

# --- Pydantic Models for Input Validation ---
class QueryRequest(BaseModel):
    """Validated SQL query request."""
    table: str = Field(..., description="Table name to query")
    filters: dict[str, Any] = Field(default_factory=dict, description="Column-value filter pairs")
    limit: int = Field(default=50, ge=1, le=500, description="Max rows to return")
    columns: list[str] = Field(default_factory=list, description="Columns to select (empty = all)")

class KBSearchRequest(BaseModel):
    """Knowledge base search parameters."""
    query: str = Field(..., min_length=3, max_length=500, description="Search query text")
    category: Optional[str] = Field(None, description="Filter by category")
    top_k: int = Field(default=5, ge=1, le=20, description="Number of results to return")

# --- Tool 1: Safe Database Query ---
@mcp.tool()
async def query_database(
    table: str,
    filters: dict[str, Any] = {},
    limit: int = 50,
    columns: list[str] = []
) -> str:
    """
    Query the enterprise database safely with parameterized queries.
    Supports filtering, column selection, and result limiting.
    Returns JSON-formatted results.
    """
    # Validate input with Pydantic
    req = QueryRequest(table=table, filters=filters, limit=limit, columns=columns)

    # Allowlist of queryable tables (security: prevent unauthorized access)
    ALLOWED_TABLES = {"employees", "departments", "projects", "timesheets"}
    if req.table not in ALLOWED_TABLES:
        logger.warning(f"Blocked query to unauthorized table: {req.table}")
        return json.dumps({
            "error": f"Table '{req.table}' is not queryable. Allowed: {sorted(ALLOWED_TABLES)}"
        })

    # Build parameterized query (never interpolate user input into SQL)
    col_clause = ", ".join(req.columns) if req.columns else "*"
    where_parts = []
    params = {}
    for i, (col, val) in enumerate(req.filters.items()):
        param_name = f"p{i}"
        where_parts.append(f"{col} = :{param_name}")
        params[param_name] = val
    where_clause = f" WHERE {' AND '.join(where_parts)}" if where_parts else ""
    query = f"SELECT {col_clause} FROM {req.table}{where_clause} LIMIT :limit"
    params["limit"] = req.limit

    logger.info(f"Executing query on table={req.table}, filters={req.filters}, limit={req.limit}")
    try:
        # Simulated DB execution (replace with real SQLAlchemy async session)
        # In production, use: async with async_session() as session: ...
        results = [
            {"id": 1, "name": "Alice Johnson", "department": "Engineering", "role": "Senior Engineer"},
            {"id": 2, "name": "Bob Smith", "department": "Engineering", "role": "Tech Lead"},
            {"id": 3, "name": "Carol White", "department": "Product", "role": "PM"},
        ]
        # Apply filters for demonstration
        if req.filters:
            results = [
                r for r in results
                if all(r.get(k) == v for k, v in req.filters.items())
            ]
        return json.dumps({
            "table": req.table,
            "row_count": len(results[:req.limit]),
            "results": results[:req.limit],
            "query_hash": hashlib.md5(query.encode()).hexdigest()[:8]
        }, indent=2)
    except Exception as e:
        logger.error(f"Database query failed: {e}")
        return json.dumps({"error": f"Query failed: {str(e)}"})
# --- Tool 2: Knowledge Base Search ---
@mcp.tool()
async def search_knowledge_base(
    query: str,
    category: Optional[str] = None,
    top_k: int = 5
) -> str:
    """
    Search the enterprise knowledge base using semantic similarity.
    Returns ranked results with relevance scores.
    Supports category filtering for targeted search.
    """
    req = KBSearchRequest(query=query, category=category, top_k=top_k)
    logger.info(f"KB search: query='{req.query}', category={req.category}, top_k={req.top_k}")
    try:
        # In production, replace with actual vector DB query
        # e.g., chromadb, pinecone, pgvector
        mock_results = [
            {
                "title": "Employee Onboarding Guide",
                "snippet": "New employees receive 20 days of annual PTO. Benefits enrollment opens within 30 days of start date.",
                "category": "HR",
                "relevance_score": 0.94,
                "doc_id": "kb-001",
                "last_updated": "2026-03-15"
            },
            {
                "title": "Remote Work Policy",
                "snippet": "Employees may work remotely up to 3 days per week with manager approval. VPN required for all remote access.",
                "category": "HR",
                "relevance_score": 0.87,
                "doc_id": "kb-002",
                "last_updated": "2026-02-28"
            },
            {
                "title": "Engineering Standards",
                "snippet": "All code must pass CI/CD pipeline checks. Code review required from at least one senior engineer.",
                "category": "Engineering",
                "relevance_score": 0.82,
                "doc_id": "kb-003",
                "last_updated": "2026-03-01"
            }
        ]
        # Apply category filter
        if req.category:
            mock_results = [r for r in mock_results if r["category"].lower() == req.category.lower()]
        return json.dumps({
            "query": req.query,
            "total_results": len(mock_results[:req.top_k]),
            "results": mock_results[:req.top_k]
        }, indent=2)
    except Exception as e:
        logger.error(f"KB search failed: {e}")
        return json.dumps({"error": f"Search failed: {str(e)}"})

# --- Tool 3: Generate Report ---
@mcp.tool()
async def generate_report(
    report_type: str,
    date_range: str = "last_30_days",
    department: Optional[str] = None
) -> str:
    """
    Generate business reports from enterprise data.
    Supported report types: headcount, project_status, budget_summary.
    Returns structured report data as JSON.
    """
    VALID_REPORTS = {"headcount", "project_status", "budget_summary"}
    if report_type not in VALID_REPORTS:
        return json.dumps({
            "error": f"Invalid report type. Valid options: {sorted(VALID_REPORTS)}"
        })
    logger.info(f"Generating report: type={report_type}, range={date_range}, dept={department}")
    try:
        # In production, this would query real analytics tables
        report_data = {
            "report_type": report_type,
            "generated_at": datetime.now().isoformat(),
            "date_range": date_range,
            "department": department or "All Departments",
            "data": {
                "headcount": {"total": 342, "engineering": 128, "product": 45, "sales": 89, "ops": 80},
                "open_positions": 23,
                "avg_tenure_months": 28.5,
                "attrition_rate": "4.2%"
            },
            "summary": f"Report generated for {department or 'all departments'} "
                       f"covering {date_range}. Total headcount: 342."
        }
        return json.dumps(report_data, indent=2)
    except Exception as e:
        logger.error(f"Report generation failed: {e}")
        return json.dumps({"error": f"Report generation failed: {str(e)}"})
# --- Resource: Company Configuration ---
@mcp.resource("config://company")
async def get_company_config() -> str:
    """Provides company configuration and metadata to the AI model."""
    return json.dumps({
        "company_name": "Acme Corporation",
        "fiscal_year_start": "January",
        "departments": ["Engineering", "Product", "Sales", "Marketing", "Operations", "HR"],
        "data_retention_days": 365,
        "timezone": "America/New_York",
        "supported_languages": ["en", "es", "fr"]
    }, indent=2)

# --- Resource: Database Schema ---
@mcp.resource("schema://database")
async def get_database_schema() -> str:
    """Provides the queryable database schema so the model knows available tables and columns."""
    return json.dumps({
        "tables": {
            "employees": {
                "columns": ["id", "name", "email", "department", "role", "hire_date", "salary_band"],
                "row_count": 342,
                "description": "Employee directory and basic info"
            },
            "departments": {
                "columns": ["id", "name", "head", "budget", "headcount"],
                "row_count": 12,
                "description": "Department details and budgets"
            },
            "projects": {
                "columns": ["id", "name", "department", "status", "start_date", "end_date", "budget"],
                "row_count": 67,
                "description": "Active and completed projects"
            },
            "timesheets": {
                "columns": ["id", "employee_id", "project_id", "date", "hours", "category"],
                "row_count": 15420,
                "description": "Employee time tracking records"
            }
        }
    }, indent=2)

# --- Prompt Template ---
@mcp.prompt()
async def data_analyst_prompt(question: str) -> str:
    """Generates a structured prompt for data analysis questions."""
    return f"""You are an enterprise data analyst assistant for Acme Corporation.
You have access to the company database and knowledge base.

RULES:
1. Always query the database schema first to understand available tables.
2. Use parameterized queries — never construct raw SQL from user input.
3. If a question requires data you cannot access, say so clearly.
4. Provide concise summaries with the key numbers highlighted.
5. If results seem unexpected, flag potential data quality issues.

USER QUESTION: {question}

Think step by step:
1. What data do I need to answer this question?
2. Which table(s) should I query?
3. What filters should I apply?
4. How should I present the results?"""

# --- Run the server ---
if __name__ == "__main__":
    logger.info("Starting Enterprise MCP Server v1.2.0")
    mcp.run(transport="stdio")  # Use "sse" for HTTP transport in production
1.4 Testing MCP Servers
Production MCP servers require the same testing rigor as any production service. The suite below covers unit tests for individual tools and contract tests that verify protocol-level guarantees: every tool returns valid JSON, and errors follow a consistent format clients can rely on.
# Testing MCP Servers — Unit & Contract Tests
# pip install pytest pytest-asyncio mcp

import pytest
import json
from unittest.mock import AsyncMock, patch

# Import the server tools (from our server module above)
# from mcp_server import query_database, search_knowledge_base, generate_report

# --- Unit Tests for Individual Tools ---
@pytest.mark.asyncio
async def test_query_database_valid_table():
    """Test that querying an allowed table returns results."""
    result = await query_database(table="employees", filters={}, limit=10)
    data = json.loads(result)
    assert "error" not in data
    assert data["table"] == "employees"
    assert data["row_count"] > 0
    assert isinstance(data["results"], list)

@pytest.mark.asyncio
async def test_query_database_blocked_table():
    """Test that querying a disallowed table is rejected."""
    result = await query_database(table="salaries", filters={}, limit=10)
    data = json.loads(result)
    assert "error" in data
    assert "not queryable" in data["error"]

@pytest.mark.asyncio
async def test_query_database_with_filters():
    """Test that filters are applied correctly."""
    result = await query_database(
        table="employees",
        filters={"department": "Engineering"},
        limit=50
    )
    data = json.loads(result)
    assert "error" not in data
    # All results should match the filter
    for row in data["results"]:
        assert row["department"] == "Engineering"

@pytest.mark.asyncio
async def test_search_knowledge_base_basic():
    """Test basic knowledge base search."""
    result = await search_knowledge_base(query="PTO policy", top_k=3)
    data = json.loads(result)
    assert "error" not in data
    assert data["total_results"] > 0
    assert len(data["results"]) <= 3

@pytest.mark.asyncio
async def test_search_knowledge_base_category_filter():
    """Test knowledge base search with category filter."""
    result = await search_knowledge_base(
        query="onboarding process",
        category="HR",
        top_k=5
    )
    data = json.loads(result)
    assert "error" not in data
    for r in data["results"]:
        assert r["category"] == "HR"

@pytest.mark.asyncio
async def test_generate_report_valid_type():
    """Test valid report generation."""
    result = await generate_report(report_type="headcount", date_range="last_30_days")
    data = json.loads(result)
    assert "error" not in data
    assert data["report_type"] == "headcount"
    assert "generated_at" in data
    assert "data" in data

@pytest.mark.asyncio
async def test_generate_report_invalid_type():
    """Test that invalid report types are rejected."""
    result = await generate_report(report_type="salary_details")
    data = json.loads(result)
    assert "error" in data
    assert "Invalid report type" in data["error"]

# --- Contract Tests: Verify MCP Protocol Compliance ---
@pytest.mark.asyncio
async def test_tool_returns_valid_json():
    """All tools must return valid JSON strings."""
    tools = [
        query_database(table="employees"),
        search_knowledge_base(query="test query"),
        generate_report(report_type="headcount"),
    ]
    for coro in tools:
        result = await coro
        # Must be valid JSON
        parsed = json.loads(result)
        assert isinstance(parsed, dict)

@pytest.mark.asyncio
async def test_tool_error_format():
    """Errors must follow a consistent format with an 'error' key."""
    result = await query_database(table="secret_table")
    data = json.loads(result)
    assert "error" in data
    assert isinstance(data["error"], str)
    assert len(data["error"]) > 0

# Run with: pytest test_mcp_server.py -v --asyncio-mode=auto
1.5 FastMCP Patterns for Production
The following patterns demonstrate how to use the FastMCP SDK's three core decorators — @mcp.tool(), @mcp.resource(), and @mcp.prompt() — in production-grade scenarios. Each pattern addresses a specific production concern: structured validation, safe file access, error resilience, and lifecycle management.
Pro Tip: The three decorators @mcp.tool(), @mcp.resource(), and @mcp.prompt() map directly to the three core MCP primitives. If you remember nothing else: tool = LLM calls it, resource = app reads it, prompt = user invokes it.
Pattern 1: Structured Input with Pydantic Models
For complex tools, use Pydantic models to define structured inputs with field-level descriptions and validation. FastMCP automatically converts these into rich JSON Schema that helps the LLM provide well-formed arguments:
from pydantic import BaseModel, Field

class QueryInput(BaseModel):
    """Structured input for database queries."""
    table: str = Field(description="Database table to query")
    conditions: dict = Field(default={}, description="WHERE conditions")
    limit: int = Field(default=10, description="Max rows to return")

@mcp.tool()
async def query_database(input: QueryInput) -> str:
    """Execute a structured database query with safety checks."""
    # Validate table name against allowlist
    allowed_tables = {"users", "orders", "products"}
    if input.table not in allowed_tables:
        return f"Error: Table '{input.table}' not allowed"
    # Build and execute query safely
    ...
The Pydantic model's Field(description=...) values become parameter descriptions in the MCP schema, giving the LLM detailed guidance on what each field expects. Default values become optional parameters automatically.
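For illustration, the schema derived from `QueryInput` looks roughly like this (hand-written and abridged here; the exact output depends on the Pydantic and FastMCP versions in use):

```
{
  "type": "object",
  "properties": {
    "table": {"type": "string", "description": "Database table to query"},
    "conditions": {"type": "object", "description": "WHERE conditions", "default": {}},
    "limit": {"type": "integer", "description": "Max rows to return", "default": 10}
  },
  "required": ["table"]
}
```

Note that only `table` is required: fields with defaults are advertised as optional, so the LLM can omit them.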
Pattern 2: Resource with Safe File Access
Resources are ideal for exposing read-only data like log files. This pattern demonstrates safe file access with bounds on the returned content:
from pathlib import Path  # needed for safe path handling below

@mcp.resource("logs://app/{date}")
async def get_logs(date: str) -> str:
    """Get application logs for a specific date (YYYY-MM-DD)."""
    log_path = Path(f"/var/log/app/{date}.log")
    if not log_path.exists():
        return f"No logs found for {date}"
    return log_path.read_text()[-5000:]  # Last 5000 chars
The URI template logs://app/{date} creates a dynamic resource — the application can request logs for any date by substituting the {date} parameter. Truncating output to the last 5000 characters prevents overwhelming the LLM's context window with massive log files.
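FastMCP performs this template matching for you. To make the mechanism concrete, here is a simplified stand-alone sketch (hypothetical helper names, not part of the SDK) that compiles a URI template into a regex and extracts the named parameters:

```python
import re

def compile_uri_template(template: str) -> re.Pattern:
    """Turn 'logs://app/{date}' into a compiled regex with one named group per {param}."""
    regex = ""
    for part in re.split(r"(\{\w+\})", template):
        if part.startswith("{") and part.endswith("}"):
            regex += f"(?P<{part[1:-1]}>[^/]+)"  # {date} -> named capture group
        else:
            regex += re.escape(part)             # literal text, escaped
    return re.compile(f"^{regex}$")

def match_resource(template: str, uri: str):
    """Return the extracted parameters as a dict, or None if the URI doesn't match."""
    m = compile_uri_template(template).match(uri)
    return m.groupdict() if m else None
```

For example, `match_resource("logs://app/{date}", "logs://app/2026-03-15")` yields `{"date": "2026-03-15"}`, which is what gets passed to the handler as the `date` argument.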
Pattern 3: Error Handling in Tools
Production tools must handle failures gracefully. Rather than raising exceptions (which crash the MCP connection), return human-readable error messages that the LLM can reason about and communicate to the user:
@mcp.tool()
async def fetch_api_data(endpoint: str) -> str:
    """Fetch data from an external API safely.

    Args:
        endpoint: API endpoint path (e.g., /users, /orders)
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"https://api.example.com{endpoint}",
                headers={"Authorization": f"Bearer {os.getenv('API_TOKEN')}"},
                timeout=10.0
            )
            response.raise_for_status()
            return json.dumps(response.json(), indent=2)
    except httpx.TimeoutException:
        return "Error: API request timed out after 10 seconds"
    except httpx.HTTPStatusError as e:
        return f"Error: API returned {e.response.status_code}"
Notice the pattern: use try/except to catch specific exceptions and return descriptive error strings. The LLM can then tell the user "The API timed out" rather than the conversation ending with an opaque failure. Also note that secrets come from environment variables (os.getenv('API_TOKEN')), never hardcoded.
Pattern 4: Lifespan for Shared Resources
Production servers often need shared resources like database connection pools or HTTP clients that are created once at startup and cleaned up at shutdown. FastMCP's lifespan parameter handles this with an async context manager:
import sys
import asyncpg
from contextlib import asynccontextmanager

@asynccontextmanager
async def app_lifespan(server: FastMCP):
    """Manage server lifecycle — setup and teardown."""
    # Startup: initialize shared resources
    db = await asyncpg.create_pool("postgresql://localhost/mydb")
    server.state["db"] = db
    print("Database pool initialized", file=sys.stderr)
    try:
        yield  # Server runs here
    finally:
        # Shutdown: clean up
        await db.close()
        print("Database pool closed", file=sys.stderr)

mcp = FastMCP("production-server", lifespan=app_lifespan)
The lifespan context manager runs before the server starts accepting connections (yield) and after it stops. Resources stored in server.state are accessible from any tool or resource handler. This pattern prevents the common mistake of creating a new database connection on every tool call — instead, all calls share a single efficient connection pool.
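The ordering guarantee comes straight from `asynccontextmanager`. A self-contained sketch (hypothetical names, no FastMCP dependency) showing that setup runs before the managed body and teardown runs after it, even if the body raises:

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records lifecycle ordering

@asynccontextmanager
async def lifespan():
    events.append("setup")          # runs before the server accepts connections
    try:
        yield {"db": "pool"}        # shared state handed to the running server
    finally:
        events.append("teardown")   # runs even if the body raises

async def main():
    async with lifespan() as state:
        events.append(f"run with {state['db']}")

asyncio.run(main())
# events is now ["setup", "run with pool", "teardown"]
```

Swap the body for a long-running server loop and `{"db": "pool"}` for a real connection pool, and this is exactly the shape FastMCP's `lifespan` parameter expects.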
2. MCP + Ecosystem Integrations
MCP’s greatest power emerges when it connects with the broader AI development and deployment ecosystem. This section demonstrates how to integrate MCP servers with LangChain and LangGraph for agent workflows, then how to package and operate them with Docker, Kubernetes, and edge runtimes such as Cloudflare Workers. The framework integration follows a consistent pattern: wrap MCP tools as framework-native tools, then use them exactly as you would any built-in tool, making MCP servers interchangeable building blocks across frameworks.
2.1 MCP + LangChain
LangChain provides first-class MCP integration, allowing you to wrap MCP tools as LangChain tools and use them in agents, chains, and memory-backed conversations. This is the most common integration pattern for teams already using LangChain.
# MCP + LangChain — Full Agent Integration
# pip install langchain-openai langchain-mcp-adapters langgraph mcp

import os
import asyncio
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

# API key from environment variable
# export OPENAI_API_KEY="sk-..."
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key-here")

async def run_mcp_langchain_agent():
    """Create a LangChain ReAct agent powered by MCP tools."""
    # Connect to multiple MCP servers simultaneously
    async with MultiServerMCPClient(
        {
            # Enterprise data server (our server from Section 1)
            "enterprise-data": {
                "command": "python",
                "args": ["mcp_enterprise_server.py"],
                "transport": "stdio",
            },
            # File system server for document access
            "filesystem": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-filesystem", "/data/docs"],
                "transport": "stdio",
            },
            # Web search server for real-time information
            "web-search": {
                "url": "http://localhost:8080/sse",
                "transport": "sse",
            },
        }
    ) as client:
        # Get all tools from all connected MCP servers
        tools = client.get_tools()
        print(f"Loaded {len(tools)} tools from MCP servers:")
        for tool in tools:
            print(f"  - {tool.name}: {tool.description[:60]}...")

        # Create the LLM with function calling support
        llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0,
            api_key=OPENAI_API_KEY
        )

        # Build a ReAct agent that uses MCP tools
        agent = create_react_agent(llm, tools)

        # Run a complex multi-tool query
        result = await agent.ainvoke({
            "messages": [
                {
                    "role": "user",
                    "content": "How many engineers do we have, and what does our "
                               "remote work policy say about VPN requirements? "
                               "Give me a combined summary."
                }
            ]
        })

        # The agent will automatically:
        # 1. Call query_database to get engineering headcount
        # 2. Call search_knowledge_base to find VPN policy
        # 3. Synthesize both results into a coherent answer
        final_message = result["messages"][-1].content
        print(f"\nAgent Response:\n{final_message}")
        return final_message
# --- LangChain Memory Integration with MCP ---
async def run_mcp_agent_with_memory():
    """MCP agent with conversation memory for multi-turn interactions."""
    from langgraph.checkpoint.memory import MemorySaver

    async with MultiServerMCPClient(
        {
            "enterprise-data": {
                "command": "python",
                "args": ["mcp_enterprise_server.py"],
                "transport": "stdio",
            }
        }
    ) as client:
        tools = client.get_tools()
        llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=OPENAI_API_KEY)

        # Add memory for multi-turn conversation state
        memory = MemorySaver()
        agent = create_react_agent(llm, tools, checkpointer=memory)

        # Thread ID for conversation tracking
        config = {"configurable": {"thread_id": "user-session-12345"}}

        # Turn 1: Ask about headcount
        r1 = await agent.ainvoke(
            {"messages": [{"role": "user", "content": "How many people are in Engineering?"}]},
            config=config
        )
        print(f"Turn 1: {r1['messages'][-1].content}")

        # Turn 2: Follow-up (agent remembers context from Turn 1)
        r2 = await agent.ainvoke(
            {"messages": [{"role": "user", "content": "What projects are they working on?"}]},
            config=config
        )
        print(f"Turn 2: {r2['messages'][-1].content}")

if __name__ == "__main__":
    asyncio.run(run_mcp_langchain_agent())
2.2 MCP + Docker
Docker isolates MCP servers into reproducible containers, ensuring consistent behavior across development, staging, and production environments. Each MCP server gets its own container with pinned dependencies.
# Dockerfile for a production MCP server
# File: Dockerfile.mcp-enterprise
FROM python:3.12-slim AS base
# Security: run as non-root user
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Install Python dependencies (cached layer)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY mcp_enterprise_server.py .
COPY config/ ./config/
# Switch to non-root user
USER mcpuser
# Health check endpoint (for K8s liveness/readiness probes)
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# Environment variables (override at runtime)
ENV LOG_LEVEL=INFO
ENV MCP_TRANSPORT=sse
ENV MCP_PORT=8080
# Expose the SSE transport port
EXPOSE 8080
# Run the MCP server with SSE transport for networked access
CMD ["python", "mcp_enterprise_server.py", "--transport", "sse", "--port", "8080"]
# docker-compose.yml — Multi-server MCP deployment
# Run with: docker compose up -d
version: '3.8'

services:
  # Enterprise data MCP server
  mcp-enterprise:
    build:
      context: .
      dockerfile: Dockerfile.mcp-enterprise
    ports:
      - "8081:8080"
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:pass@postgres:5432/enterprise
      - MCP_API_KEY=${MCP_API_KEY}
      - LOG_LEVEL=INFO
    depends_on:
      - postgres
    restart: unless-stopped
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: "0.5"

  # File system MCP server
  mcp-filesystem:
    image: node:20-slim
    command: npx -y @modelcontextprotocol/server-filesystem /data
    ports:
      - "8082:8080"
    volumes:
      - ./documents:/data:ro  # Read-only mount
    restart: unless-stopped

  # Web search MCP server
  mcp-websearch:
    build:
      context: ./mcp-websearch
      dockerfile: Dockerfile
    ports:
      - "8083:8080"
    environment:
      - SEARCH_API_KEY=${SEARCH_API_KEY}
    restart: unless-stopped

  # PostgreSQL for enterprise data
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: enterprise
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    volumes:
      - pgdata:/var/lib/postgresql/data

volumes:
  pgdata:
2.3 MCP + Kubernetes
For production-scale deployments, Kubernetes provides auto-scaling, service discovery, load balancing, and self-healing for your MCP server fleet. Each MCP server type becomes a Kubernetes Deployment with its own scaling policy.
# Kubernetes manifest for MCP server deployment
# File: k8s/mcp-enterprise-deployment.yaml
# Apply with: kubectl apply -f k8s/mcp-enterprise-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: mcp-enterprise-server
namespace: ai-platform
labels:
app: mcp-enterprise
component: mcp-server
version: v1.2.0
spec:
replicas: 3 # Start with 3 replicas, HPA will scale
selector:
matchLabels:
app: mcp-enterprise
template:
metadata:
labels:
app: mcp-enterprise
version: v1.2.0
spec:
containers:
- name: mcp-server
image: registry.company.com/mcp-enterprise:v1.2.0
ports:
- containerPort: 8080
name: http
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: mcp-secrets
key: database-url
- name: MCP_API_KEY
valueFrom:
secretKeyRef:
name: mcp-secrets
key: api-key
- name: LOG_LEVEL
value: "INFO"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "http://otel-collector:4317"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: mcp-enterprise-service
namespace: ai-platform
spec:
selector:
app: mcp-enterprise
ports:
- port: 80
targetPort: 8080
protocol: TCP
type: ClusterIP
---
# Horizontal Pod Autoscaler — scale based on CPU and memory utilization
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: mcp-enterprise-hpa
namespace: ai-platform
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-enterprise-server
minReplicas: 2
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
2.4 MCP + Cloudflare Workers (Edge Deployment)
For latency-sensitive applications, deploying MCP servers to the edge via Cloudflare Workers provides sub-50ms response times globally. This is ideal for lightweight tools like search, caching, and content retrieval.
// Cloudflare Worker — Edge MCP Server
// File: src/index.js
// Deploy with: wrangler deploy
// MCP-compatible tool handler running on Cloudflare's global edge network
// Provides low-latency access to cached knowledge base and real-time search
export default {
async fetch(request, env) {
const url = new URL(request.url);
// Health check endpoint
if (url.pathname === "/health") {
return new Response(JSON.stringify({ status: "ok", edge: true }), {
headers: { "Content-Type": "application/json" },
});
}
// MCP tool-call endpoint (JSON messages over HTTP POST)
if (url.pathname === "/sse" && request.method === "POST") {
const body = await request.json();
const { method, params } = body;
// Route to appropriate tool handler
if (method === "tools/call") {
const toolName = params.name;
const toolArgs = params.arguments;
let result;
switch (toolName) {
case "edge_search":
// Search using Cloudflare KV for cached results
result = await handleEdgeSearch(toolArgs, env);
break;
case "edge_cache_lookup":
// Direct KV cache lookup for fast data retrieval
result = await handleCacheLookup(toolArgs, env);
break;
case "edge_geo_info":
// Return geographic info based on request
result = handleGeoInfo(request);
break;
default:
result = { error: `Unknown tool: ${toolName}` };
}
return new Response(JSON.stringify({ result }), {
headers: { "Content-Type": "application/json" },
});
}
// Tool listing for MCP discovery
if (method === "tools/list") {
return new Response(JSON.stringify({
tools: [
{
name: "edge_search",
description: "Search the knowledge base from the nearest edge location",
inputSchema: {
type: "object",
properties: {
query: { type: "string", description: "Search query" },
limit: { type: "number", description: "Max results", default: 5 }
},
required: ["query"]
}
},
{
name: "edge_cache_lookup",
description: "Fast cache lookup for frequently accessed data",
inputSchema: {
type: "object",
properties: {
key: { type: "string", description: "Cache key to look up" }
},
required: ["key"]
}
},
{
name: "edge_geo_info",
description: "Get geographic location info for the current request",
inputSchema: { type: "object", properties: {} }
}
]
}), {
headers: { "Content-Type": "application/json" },
});
}
}
return new Response("MCP Edge Server", { status: 200 });
},
};
// --- Tool Handlers ---
async function handleEdgeSearch(args, env) {
const { query, limit = 5 } = args;
// Use Cloudflare KV for cached search index
const cacheKey = `search:${query.toLowerCase().trim()}`;
const cached = await env.MCP_KV.get(cacheKey, "json");
if (cached) {
return { ...cached, source: "edge-cache", latency_ms: 1 };
}
// Fallback: call origin search API
const originResponse = await fetch(`${env.ORIGIN_URL}/api/search?q=${encodeURIComponent(query)}&limit=${limit}`);
const results = await originResponse.json();
// Cache the results at the edge (TTL: 5 minutes)
await env.MCP_KV.put(cacheKey, JSON.stringify(results), { expirationTtl: 300 });
return { ...results, source: "origin", latency_ms: "varies" };
}
async function handleCacheLookup(args, env) {
const { key } = args;
const value = await env.MCP_KV.get(key, "json");
return value ? { found: true, data: value } : { found: false, key };
}
function handleGeoInfo(request) {
// Cloudflare provides geo info on every request
return {
country: request.cf?.country || "unknown",
city: request.cf?.city || "unknown",
region: request.cf?.region || "unknown",
timezone: request.cf?.timezone || "unknown",
colo: request.cf?.colo || "unknown" // Edge location code
};
}
2.5 MCP + OAuth / Auth0
Secure MCP servers must authenticate every request and authorize access to specific tools based on user identity. OAuth 2.0 with Auth0 provides enterprise-grade authentication with token exchange flows.
# MCP Server with OAuth / Auth0 Authentication
# pip install mcp httpx python-jose cryptography
import os
import httpx
from functools import wraps
from jose import jwt, JWTError
# Auth0 configuration from environment
# export AUTH0_DOMAIN="your-tenant.auth0.com"
# export AUTH0_API_AUDIENCE="https://mcp-api.company.com"
AUTH0_DOMAIN = os.getenv("AUTH0_DOMAIN", "your-tenant.auth0.com")
AUTH0_AUDIENCE = os.getenv("AUTH0_API_AUDIENCE", "https://mcp-api.company.com")
AUTH0_ISSUER = f"https://{AUTH0_DOMAIN}/"
# Cache the JWKS (JSON Web Key Set) for token verification
_jwks_cache = None
async def get_jwks():
"""Fetch Auth0's public keys for JWT verification."""
global _jwks_cache
if _jwks_cache is None:
async with httpx.AsyncClient() as client:
resp = await client.get(f"https://{AUTH0_DOMAIN}/.well-known/jwks.json")
_jwks_cache = resp.json()
return _jwks_cache
async def verify_token(token: str) -> dict:
"""Verify an Auth0 JWT token and return the decoded claims."""
jwks = await get_jwks()
try:
# Get the signing key from JWKS
unverified_header = jwt.get_unverified_header(token)
rsa_key = {}
for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"]
}
if not rsa_key:
raise JWTError("Unable to find appropriate signing key")
# Decode and verify the token
payload = jwt.decode(
token,
rsa_key,
algorithms=["RS256"],
audience=AUTH0_AUDIENCE,
issuer=AUTH0_ISSUER
)
return payload
except JWTError as e:
raise PermissionError(f"Token verification failed: {e}")
def require_scope(required_scope: str):
"""Decorator to enforce OAuth scopes on MCP tool handlers."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# In a real MCP server, extract token from the request context
token = kwargs.pop("_auth_token", None)
if not token:
return '{"error": "Authentication required"}'
try:
claims = await verify_token(token)
scopes = claims.get("scope", "").split()
if required_scope not in scopes:
return f'{{"error": "Insufficient permissions. Required: {required_scope}"}}'
# Add user info to kwargs for audit logging
kwargs["_user_id"] = claims.get("sub")
kwargs["_user_email"] = claims.get("email", "unknown")
return await func(*args, **kwargs)
except PermissionError as e:
return f'{{"error": "{str(e)}"}}'
return wrapper
return decorator
# --- Example: Protected MCP Tool ---
# Only users with "read:employees" scope can query employee data
@require_scope("read:employees")
async def query_employees(department: str | None = None, _user_id: str | None = None, _user_email: str | None = None):
"""Query employee directory (requires read:employees scope)."""
import json
import logging
logger = logging.getLogger("mcp-auth")
# Audit log: who accessed what
logger.info(f"Employee query by user={_user_id} email={_user_email} dept={department}")
return json.dumps({
"authorized": True,
"user": _user_email,
"department_filter": department,
"results": [
{"name": "Alice", "department": "Engineering", "role": "Senior Engineer"}
]
})
2.6 Deployment Options Compared
| Option | Latency | Scaling | Cost | Complexity | Best For |
|---|---|---|---|---|---|
| Local (stdio) | < 1ms | None (single user) | Free | Minimal | Development, single-user desktop apps |
| Docker Compose | 1-5ms | Manual | Low | Low | Small teams, staging environments |
| Kubernetes | 2-10ms | Auto (HPA) | Medium | High | Production, enterprise, multi-tenant |
| Serverless (Lambda) | 50-500ms (cold start) | Auto (per-request) | Pay-per-use | Medium | Bursty workloads, cost-sensitive |
| Edge (CF Workers) | < 50ms globally | Auto (global) | Pay-per-use | Medium | Latency-critical, global users, caching |
3. Deployment & Scaling
Deploying MCP servers in production requires choosing the right deployment model (local subprocess, HTTP, or containerized), implementing proper health checks and monitoring, and designing for horizontal scalability. This section covers deployment patterns from single-machine setups to multi-region architectures, with practical guidance on Docker containerization, load balancing, and service discovery.
3.1 Deployment Models
Choosing the right deployment model depends on your scale, latency requirements, and operational maturity. Most teams progress through these stages as their MCP usage grows:
Deployment Progression: Local dev (stdio transport, single process) → Docker Compose (SSE transport, multiple containers) → Kubernetes (auto-scaling, service mesh, observability) → Hybrid (K8s for core + edge for latency-critical tools).
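Once you move past local stdio, every containerized stage assumes the server actually serves the liveness and readiness endpoints that the Kubernetes probes in Section 2.3 hit (`/health` and `/ready`). A minimal stdlib sketch of those handlers — the `READY` event standing in for real dependency checks (DB pool initialized, MCP transport connected) is an illustrative assumption, not part of any MCP SDK:

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Set by your startup routine once dependencies are initialized (illustrative).
READY = threading.Event()

class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            # Liveness: the process is up and able to serve requests
            self._json(200, {"status": "ok"})
        elif self.path == "/ready":
            # Readiness: only accept traffic once dependencies are initialized
            if READY.is_set():
                self._json(200, {"status": "ready"})
            else:
                self._json(503, {"status": "starting"})
        else:
            self._json(404, {"error": "not found"})

    def _json(self, code: int, body: dict):
        payload = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, *args):  # keep probe traffic out of the logs
        pass

def start_probe_server(port: int = 0) -> HTTPServer:
    """Serve the probe endpoints on a background thread; port 0 picks a free port."""
    server = HTTPServer(("127.0.0.1", port), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Returning 503 from `/ready` during startup keeps the pod alive while Kubernetes withholds traffic — the distinction between the two probes that makes rolling deploys safe.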
3.2 Scaling Challenges
Scaling MCP servers introduces specific challenges that differ from traditional web services:
| Challenge | Root Cause | Solution |
|---|---|---|
| Tool latency variance | Some tools take 50ms (cache lookup), others 30s (report generation) | Timeout budgets per tool, async execution, background jobs for slow tools |
| LLM bottleneck | The model's reasoning loop is the slowest component | Streaming responses, parallel tool calls, model routing (fast model for simple tasks) |
| Concurrent sessions | Each user session maintains state across multiple tool calls | Stateless server design, external session store (Redis), connection pooling |
| Cost explosion | Each agent loop iteration costs tokens; complex tasks may loop 10+ times | Max iteration limits, smart sampling, caching repeated queries |
| Error cascading | A failed tool call can cause the agent to retry indefinitely | Circuit breakers, retry budgets, fallback responses, dead-letter queues |
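The last row is worth making concrete. A circuit breaker in front of each tool call stops an agent from hammering a failing dependency: after a run of consecutive failures the breaker "opens" and rejects calls outright until a cool-down elapses. A minimal asyncio sketch — the thresholds and the `flaky_tool` stand-in are illustrative, not part of any MCP SDK:

```python
import asyncio
import time

class CircuitBreaker:
    """Open after N consecutive failures; reject calls until a cool-down passes."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at: float | None = None

    def _is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if time.monotonic() - self.opened_at >= self.reset_timeout:
            # Half-open: allow a single trial call; one more failure re-opens
            self.opened_at = None
            self.failures = self.failure_threshold - 1
            return False
        return True

    async def call(self, tool, *args, timeout: float = 10.0, **kwargs):
        if self._is_open():
            raise RuntimeError("circuit open: tool temporarily disabled")
        try:
            result = await asyncio.wait_for(tool(*args, **kwargs), timeout=timeout)
            self.failures = 0  # Any success resets the failure streak
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise

async def flaky_tool():
    """Stand-in for an MCP tool call whose backend is currently down."""
    raise ValueError("upstream unavailable")

async def demo() -> str:
    breaker = CircuitBreaker(failure_threshold=2, reset_timeout=60.0)
    for _ in range(2):  # Two consecutive failures trip the breaker
        try:
            await breaker.call(flaky_tool)
        except ValueError:
            pass
    try:
        await breaker.call(flaky_tool)  # Rejected without touching the tool
    except RuntimeError as exc:
        return str(exc)
    return "breaker did not open"
```

The rejection is cheap and immediate, which is exactly what breaks the retry loop in the "error cascading" row: the agent gets a fast, deterministic failure it can route around instead of a hanging or repeatedly timing-out tool.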
3.3 Observability & OpenTelemetry
Production MCP servers require full observability: structured logs, metrics (latency, error rates, throughput), and distributed tracing across the entire request lifecycle from client to model to tool and back.
# MCP Server with Full Observability — OpenTelemetry Integration
# pip install mcp opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
# pip install opentelemetry-instrumentation-httpx prometheus-client
import os
import time
import json
import logging
from functools import wraps
from typing import Any, Callable
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
# --- OpenTelemetry Setup ---
# OTEL collector endpoint (set via environment variable in K8s)
OTEL_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
# Tracing setup
trace_provider = TracerProvider()
trace_provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=OTEL_ENDPOINT, insecure=True))
)
trace.set_tracer_provider(trace_provider)
tracer = trace.get_tracer("mcp-enterprise-server", "1.2.0")
# Metrics setup
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=OTEL_ENDPOINT, insecure=True),
export_interval_millis=10000 # Export every 10 seconds
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter("mcp-enterprise-server", "1.2.0")
# --- Define Metrics ---
tool_call_counter = meter.create_counter(
name="mcp.tool.calls.total",
description="Total number of MCP tool calls",
unit="1"
)
tool_call_duration = meter.create_histogram(
name="mcp.tool.duration",
description="Duration of MCP tool calls in milliseconds",
unit="ms"
)
tool_error_counter = meter.create_counter(
name="mcp.tool.errors.total",
description="Total number of MCP tool call errors",
unit="1"
)
active_sessions = meter.create_up_down_counter(
name="mcp.sessions.active",
description="Number of currently active MCP sessions",
unit="1"
)
# --- Structured Logger ---
logger = logging.getLogger("mcp-server")
def observe_tool(tool_name: str):
"""Decorator that adds observability to any MCP tool handler."""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
# Start a trace span for this tool call
with tracer.start_as_current_span(
f"mcp.tool.{tool_name}",
attributes={
"mcp.tool.name": tool_name,
"mcp.tool.args": json.dumps(kwargs, default=str)[:500],
}
) as span:
start_time = time.perf_counter()
try:
# Execute the tool
result = await func(*args, **kwargs)
# Record success metrics
duration_ms = (time.perf_counter() - start_time) * 1000
tool_call_counter.add(1, {"tool": tool_name, "status": "success"})
tool_call_duration.record(duration_ms, {"tool": tool_name})
span.set_attribute("mcp.tool.duration_ms", duration_ms)
span.set_attribute("mcp.tool.status", "success")
logger.info(
                        "Tool call completed",
extra={
"tool": tool_name,
"duration_ms": round(duration_ms, 2),
"status": "success"
}
)
return result
except Exception as e:
# Record error metrics
duration_ms = (time.perf_counter() - start_time) * 1000
tool_call_counter.add(1, {"tool": tool_name, "status": "error"})
tool_error_counter.add(1, {"tool": tool_name, "error_type": type(e).__name__})
tool_call_duration.record(duration_ms, {"tool": tool_name})
span.set_attribute("mcp.tool.status", "error")
span.set_attribute("mcp.tool.error", str(e))
span.record_exception(e)
logger.error(
                        "Tool call failed",
extra={
"tool": tool_name,
"duration_ms": round(duration_ms, 2),
"error": str(e)
}
)
return json.dumps({"error": f"Tool '{tool_name}' failed: {str(e)}"})
return wrapper
return decorator
# --- Usage: Apply observability to MCP tools ---
@observe_tool("query_database")
async def query_database(table: str, filters: dict | None = None, limit: int = 50) -> str:
"""Database query tool with full observability."""
# ... (tool implementation from Section 1)
return json.dumps({"table": table, "row_count": 3, "results": []})
@observe_tool("search_knowledge_base")
async def search_knowledge_base(query: str, top_k: int = 5) -> str:
"""Knowledge base search with full observability."""
# ... (tool implementation from Section 1)
return json.dumps({"query": query, "total_results": 2, "results": []})
@observe_tool("generate_report")
async def generate_report(report_type: str, date_range: str = "last_30_days") -> str:
"""Report generation with full observability."""
# ... (tool implementation from Section 1)
return json.dumps({"report_type": report_type, "status": "generated"})
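One gap worth closing: the `extra={...}` fields passed to the logger above only reach your log pipeline if the handler's formatter emits them — Python's default formatter silently drops them. A formatter along these lines pairs with the decorator to produce one JSON object per line (the skip-list of standard `LogRecord` attributes is illustrative, not exhaustive for every Python version):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log line, carrying along any `extra=` fields."""

    # Standard LogRecord attributes we don't want to echo verbatim
    _STANDARD = {
        "name", "msg", "args", "levelname", "levelno", "pathname", "filename",
        "module", "exc_info", "exc_text", "stack_info", "lineno", "funcName",
        "created", "msecs", "relativeCreated", "thread", "threadName",
        "processName", "process", "taskName",
    }

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Anything not in the standard set arrived via `extra={...}`
        for key, value in record.__dict__.items():
            if key not in self._STANDARD and key not in payload:
                payload[key] = value
        return json.dumps(payload, default=str)

def configure_json_logging(logger_name: str = "mcp-server") -> logging.Logger:
    """Attach a JSON-formatting stream handler to the given logger."""
    logger = logging.getLogger(logger_name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

With this in place, the `tool`, `duration_ms`, and `status` fields recorded by `observe_tool` show up as queryable JSON keys in whatever log backend ingests stdout.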
4. Advanced Agent Systems with MCP
MCP enables a new class of advanced agent architectures where multiple AI agents share tool access through standardized servers. This section explores multi-agent systems that coordinate via MCP, demonstrating patterns for agent-to-agent communication, shared resource access, and distributed task execution. These architectures leverage MCP’s protocol guarantees to build reliable, composable agent networks.
4.1 Multi-Agent Systems with MCP
The most powerful MCP deployments use specialized agents per domain, each connected to its own set of MCP servers. A coordinator agent delegates tasks to domain experts, and MCP provides the tool access layer that lets each agent interact with its specific data sources and APIs.
# Multi-Agent MCP Orchestration System
# pip install mcp openai pydantic asyncio
import os
import json
import asyncio
import logging
from typing import Any, Optional
from dataclasses import dataclass, field
from enum import Enum
from openai import AsyncOpenAI
# API keys from environment
# export OPENAI_API_KEY="sk-..."
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY", "sk-your-key-here"))
logger = logging.getLogger("multi-agent-mcp")
class AgentRole(Enum):
"""Defines the specialized roles agents can take."""
COORDINATOR = "coordinator"
DATA_ANALYST = "data_analyst"
HR_SPECIALIST = "hr_specialist"
ENGINEERING_LEAD = "engineering_lead"
REPORT_WRITER = "report_writer"
@dataclass
class AgentMessage:
"""Message passed between agents."""
sender: AgentRole
recipient: AgentRole
content: str
task_id: str
metadata: dict = field(default_factory=dict)
@dataclass
class TaskResult:
"""Result from a completed agent task."""
agent: AgentRole
task_id: str
success: bool
result: str
tokens_used: int = 0
duration_ms: float = 0.0
class MCPAgent:
"""A specialized agent that uses MCP tools for its domain."""
def __init__(
self,
role: AgentRole,
system_prompt: str,
mcp_tools: list[dict],
model: str = "gpt-4o"
):
self.role = role
self.system_prompt = system_prompt
self.mcp_tools = mcp_tools # MCP tool definitions for this agent
self.model = model
self.message_history: list[dict] = []
    async def process_task(self, task: str, context: dict | None = None) -> TaskResult:
        """Process a task using this agent's specialized tools and knowledge."""
        import time
        start = time.perf_counter()
        context = context or {}  # Avoid sharing a mutable default dict across calls
        # Build the message with task context
messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": f"Task: {task}\n\nContext: {json.dumps(context, indent=2)}"}
]
try:
# Call the LLM with MCP tool definitions
response = await client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.mcp_tools if self.mcp_tools else None,
temperature=0.1
)
result_content = response.choices[0].message.content or ""
tokens = response.usage.total_tokens if response.usage else 0
duration = (time.perf_counter() - start) * 1000
# Handle tool calls if the model wants to use MCP tools
if response.choices[0].message.tool_calls:
tool_results = []
for tool_call in response.choices[0].message.tool_calls:
tool_result = await self._execute_mcp_tool(
tool_call.function.name,
json.loads(tool_call.function.arguments)
)
tool_results.append(tool_result)
# Feed tool results back to the model for final synthesis
messages.append(response.choices[0].message)
for i, tc in enumerate(response.choices[0].message.tool_calls):
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": tool_results[i]
})
final_response = await client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.1
)
result_content = final_response.choices[0].message.content or ""
tokens += final_response.usage.total_tokens if final_response.usage else 0
duration = (time.perf_counter() - start) * 1000
logger.info(f"Agent {self.role.value} completed task in {duration:.0f}ms, {tokens} tokens")
return TaskResult(
agent=self.role,
task_id=f"{self.role.value}-{hash(task) % 10000}",
success=True,
result=result_content,
tokens_used=tokens,
duration_ms=duration
)
except Exception as e:
logger.error(f"Agent {self.role.value} failed: {e}")
return TaskResult(
agent=self.role,
task_id=f"{self.role.value}-error",
success=False,
result=f"Agent error: {str(e)}",
duration_ms=(time.perf_counter() - start) * 1000
)
async def _execute_mcp_tool(self, tool_name: str, arguments: dict) -> str:
"""Execute an MCP tool call (connects to MCP server)."""
# In production, this would call the actual MCP server via stdio/SSE
logger.info(f"Agent {self.role.value} calling tool: {tool_name}({arguments})")
# Placeholder — replace with actual MCP client call
return json.dumps({"tool": tool_name, "status": "executed", "mock": True})
class MultiAgentOrchestrator:
"""Coordinates multiple specialized agents to solve complex tasks."""
def __init__(self):
self.agents: dict[AgentRole, MCPAgent] = {}
self.task_history: list[TaskResult] = []
def register_agent(self, agent: MCPAgent):
"""Register a specialized agent."""
self.agents[agent.role] = agent
logger.info(f"Registered agent: {agent.role.value}")
async def solve(self, user_query: str) -> str:
"""Orchestrate multiple agents to solve a complex query."""
logger.info(f"Orchestrating solution for: {user_query}")
# Step 1: Coordinator analyzes the query and creates a task plan
coordinator = self.agents.get(AgentRole.COORDINATOR)
if not coordinator:
return "Error: No coordinator agent registered"
plan_result = await coordinator.process_task(
f"Analyze this query and create a task plan. "
f"Identify which specialist agents are needed.\n\nQuery: {user_query}",
context={"available_agents": [r.value for r in self.agents.keys()]}
)
# Step 2: Execute specialist tasks in parallel where possible
specialist_tasks = []
# Route to data analyst for data-heavy questions
if AgentRole.DATA_ANALYST in self.agents:
specialist_tasks.append(
self.agents[AgentRole.DATA_ANALYST].process_task(
user_query,
context={"coordinator_plan": plan_result.result}
)
)
# Route to HR specialist for people-related questions
if AgentRole.HR_SPECIALIST in self.agents:
specialist_tasks.append(
self.agents[AgentRole.HR_SPECIALIST].process_task(
user_query,
context={"coordinator_plan": plan_result.result}
)
)
# Execute all specialist tasks in parallel
specialist_results = await asyncio.gather(*specialist_tasks, return_exceptions=True)
# Step 3: Report writer synthesizes all results
report_writer = self.agents.get(AgentRole.REPORT_WRITER)
if report_writer:
synthesis_context = {
"original_query": user_query,
"coordinator_plan": plan_result.result,
"specialist_results": [
{"agent": r.agent.value, "result": r.result}
for r in specialist_results
if isinstance(r, TaskResult) and r.success
]
}
final_result = await report_writer.process_task(
"Synthesize all specialist findings into a clear, actionable report.",
context=synthesis_context
)
# Track all results
self.task_history.extend([plan_result] + [
r for r in specialist_results if isinstance(r, TaskResult)
] + [final_result])
total_tokens = sum(
r.tokens_used for r in self.task_history
if isinstance(r, TaskResult)
)
logger.info(f"Orchestration complete. Total tokens: {total_tokens}")
return final_result.result
return plan_result.result
# --- Build and Run the Multi-Agent System ---
async def main():
"""Set up and run the multi-agent MCP orchestration system."""
orchestrator = MultiAgentOrchestrator()
# Register the coordinator agent
orchestrator.register_agent(MCPAgent(
role=AgentRole.COORDINATOR,
system_prompt=(
"You are a task coordinator. Analyze queries and create execution plans. "
"Identify which specialist agents should handle which parts of the query."
),
mcp_tools=[] # Coordinator does not use tools directly
))
# Register the data analyst agent (with database MCP tools)
orchestrator.register_agent(MCPAgent(
role=AgentRole.DATA_ANALYST,
system_prompt=(
"You are a data analyst. Use database tools to find quantitative answers. "
"Always verify data accuracy and flag anomalies."
),
mcp_tools=[{
"type": "function",
"function": {
"name": "query_database",
"description": "Query the enterprise database with filters",
"parameters": {
"type": "object",
"properties": {
"table": {"type": "string"},
"filters": {"type": "object"},
"limit": {"type": "integer"}
},
"required": ["table"]
}
}
}]
))
# Register the HR specialist agent (with knowledge base MCP tools)
orchestrator.register_agent(MCPAgent(
role=AgentRole.HR_SPECIALIST,
system_prompt=(
"You are an HR specialist. Use the knowledge base to answer policy questions. "
"Always cite the specific policy document and last-updated date."
),
mcp_tools=[{
"type": "function",
"function": {
"name": "search_knowledge_base",
"description": "Search the HR knowledge base",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"category": {"type": "string"},
"top_k": {"type": "integer"}
},
"required": ["query"]
}
}
}]
))
# Register the report writer agent
orchestrator.register_agent(MCPAgent(
role=AgentRole.REPORT_WRITER,
system_prompt=(
"You are a report writer. Synthesize findings from multiple sources "
"into clear, well-structured reports with key takeaways and recommendations."
),
mcp_tools=[] # Report writer synthesizes, does not query
))
# Run a complex query that requires multiple agents
result = await orchestrator.solve(
"How many engineers do we have, what is our remote work policy, "
"and are there any staffing concerns I should know about?"
)
print(f"\n{'='*60}\nFinal Report:\n{'='*60}\n{result}")
if __name__ == "__main__":
asyncio.run(main())
4.2 A2A (Agent-to-Agent) Protocol
While MCP connects models to tools, the Agent-to-Agent (A2A) protocol connects agents to agents. A2A enables capability negotiation, task delegation, and distributed intelligence — agents can discover what other agents can do and route tasks accordingly.
MCP vs A2A: Think of MCP as the arms of an agent (how it interacts with the world) and A2A as the communication network between agents (how they collaborate). In production, you use both: MCP for tool access, A2A for inter-agent coordination.
# A2A Protocol — Agent-to-Agent Communication
# pip install httpx pydantic
import os
import json
import httpx
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional
from pydantic import BaseModel
# --- A2A Agent Card: Declares agent capabilities ---
class AgentCapability(BaseModel):
"""A single capability an agent can perform."""
name: str
description: str
input_schema: dict
output_schema: dict
class AgentCard(BaseModel):
"""
Agent Card — the A2A equivalent of an MCP tool listing.
Published at /.well-known/agent.json for discovery.
"""
name: str
description: str
version: str
capabilities: list[AgentCapability]
endpoint: str # URL where this agent accepts tasks
auth_required: bool = True
max_concurrent_tasks: int = 10
# --- A2A Task Protocol ---
class A2ATask(BaseModel):
"""A task delegated from one agent to another."""
task_id: str
capability: str # Which capability to invoke
input_data: dict
sender_agent: str
priority: str = "normal" # low, normal, high, critical
timeout_seconds: int = 60
callback_url: Optional[str] = None # For async task completion
class A2ATaskResult(BaseModel):
"""Result from a completed A2A task."""
task_id: str
status: str # "completed", "failed", "timeout"
result: Any
agent_name: str
duration_ms: float
tokens_used: int = 0
# --- A2A Client: Call other agents ---
class A2AClient:
"""Client for discovering and calling other agents via A2A protocol."""
def __init__(self):
self.known_agents: dict[str, AgentCard] = {} # name -> agent card
self.http_client = httpx.AsyncClient(timeout=60.0)
async def discover_agent(self, agent_url: str) -> AgentCard:
"""Discover an agent's capabilities by fetching its Agent Card."""
response = await self.http_client.get(f"{agent_url}/.well-known/agent.json")
card = AgentCard(**response.json())
self.known_agents[card.name] = card
print(f"Discovered agent: {card.name} with {len(card.capabilities)} capabilities")
return card
async def delegate_task(
self,
agent_name: str,
capability: str,
input_data: dict,
sender: str = "coordinator"
) -> A2ATaskResult:
"""Delegate a task to another agent."""
agent = self.known_agents.get(agent_name)
if not agent:
return A2ATaskResult(
task_id="error",
status="failed",
result=f"Unknown agent: {agent_name}",
agent_name=agent_name,
duration_ms=0
)
# Verify the agent supports this capability
supported = [c.name for c in agent.capabilities]
if capability not in supported:
return A2ATaskResult(
task_id="error",
status="failed",
result=f"Agent '{agent_name}' does not support capability '{capability}'",
agent_name=agent_name,
duration_ms=0
)
task = A2ATask(
task_id=f"task-{hash(json.dumps(input_data)) % 100000}",
capability=capability,
input_data=input_data,
sender_agent=sender
)
# Send task to the agent's endpoint
import time
start = time.perf_counter()
response = await self.http_client.post(
f"{agent.endpoint}/tasks",
json=task.model_dump()
)
duration = (time.perf_counter() - start) * 1000
if response.status_code == 200:
result_data = response.json()
return A2ATaskResult(
task_id=task.task_id,
status="completed",
result=result_data,
agent_name=agent_name,
duration_ms=duration
)
else:
return A2ATaskResult(
task_id=task.task_id,
status="failed",
result=f"HTTP {response.status_code}: {response.text}",
agent_name=agent_name,
duration_ms=duration
)
async def find_agent_for_capability(self, capability: str) -> Optional[str]:
"""Find which agent can handle a given capability."""
for name, card in self.known_agents.items():
if any(c.name == capability for c in card.capabilities):
return name
return None
# --- Example Usage ---
async def a2a_demo():
"""Demonstrate agent-to-agent communication."""
a2a = A2AClient()
# In production, discover agents from a registry or well-known URLs
# await a2a.discover_agent("https://data-agent.internal.company.com")
# await a2a.discover_agent("https://hr-agent.internal.company.com")
# Simulate agent discovery
a2a.known_agents["data-analyst"] = AgentCard(
name="data-analyst",
description="Analyzes enterprise data and generates insights",
version="1.0.0",
capabilities=[
AgentCapability(
name="analyze_headcount",
description="Analyze headcount by department",
input_schema={"type": "object", "properties": {"department": {"type": "string"}}},
output_schema={"type": "object"}
)
],
endpoint="http://data-agent:8080"
)
# Find and delegate
agent = await a2a.find_agent_for_capability("analyze_headcount")
if agent:
print(f"Found agent for analyze_headcount: {agent}")
if __name__ == "__main__":
asyncio.run(a2a_demo())
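The client above POSTs delegated tasks to `{endpoint}/tasks`, so the receiving agent needs a matching handler. A minimal stdlib sketch of that server side — the `analyze_headcount` handler and its hard-coded output are mock placeholders standing in for a real agent's logic, not real data:

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Hypothetical handlers for the capabilities this agent advertises in its Agent Card
CAPABILITY_HANDLERS = {
    "analyze_headcount": lambda data: {"department": data.get("department"), "headcount": 42},
}

class A2ATaskHandler(BaseHTTPRequestHandler):
    """Server-side counterpart to A2AClient: accepts delegated tasks at /tasks."""

    def do_POST(self):
        if self.path != "/tasks":
            self._json(404, {"error": "not found"})
            return
        length = int(self.headers.get("Content-Length", 0))
        task = json.loads(self.rfile.read(length))
        handler = CAPABILITY_HANDLERS.get(task.get("capability"))
        if handler is None:
            self._json(400, {"error": f"unsupported capability: {task.get('capability')}"})
            return
        # Echo the task_id so the caller can correlate the result
        self._json(200, {
            "task_id": task.get("task_id"),
            "result": handler(task.get("input_data", {})),
        })

    def _json(self, code: int, body: dict):
        payload = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, *args):  # silence per-request logging
        pass

def start_task_server(port: int = 0) -> HTTPServer:
    """Run the A2A task endpoint on a background thread; port 0 picks a free port."""
    server = HTTPServer(("127.0.0.1", port), A2ATaskHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

A production agent would also validate the incoming payload against `A2ATask`, check auth, and enforce `max_concurrent_tasks`; this sketch shows only the request/response contract the client depends on.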
4.3 Workflow Orchestration
Complex AI workflows require orchestration patterns beyond simple request-response. DAG-based execution allows you to define workflows as directed acyclic graphs, with parallel branches, conditional routing, and retry logic.
# DAG-Based Agent Workflow Orchestration
# pip install networkx pydantic
import asyncio
import json
import logging
from typing import Any, Callable, Optional
from dataclasses import dataclass, field
from enum import Enum
logger = logging.getLogger("workflow-orchestrator")
class StepStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class WorkflowStep:
"""A single step in a workflow DAG."""
name: str
handler: Callable # async function to execute
dependencies: list[str] = field(default_factory=list) # Steps that must complete first
retry_count: int = 2
timeout_seconds: int = 60
condition: Optional[Callable] = None # Optional: skip step if condition returns False
compensation: Optional[Callable] = None # Rollback function if downstream fails
status: StepStatus = StepStatus.PENDING
result: Any = None
error: Optional[str] = None
class WorkflowOrchestrator:
"""DAG-based workflow execution engine for agent systems."""
def __init__(self, name: str):
self.name = name
self.steps: dict[str, WorkflowStep] = {}
self.results: dict[str, Any] = {} # Shared result store
def add_step(self, step: WorkflowStep) -> "WorkflowOrchestrator":
"""Add a step to the workflow. Returns self for chaining."""
self.steps[step.name] = step
return self
def _get_ready_steps(self) -> list[str]:
"""Find steps whose dependencies are all completed."""
ready = []
for name, step in self.steps.items():
if step.status != StepStatus.PENDING:
continue
# Check all dependencies are completed
deps_met = all(
self.steps[dep].status == StepStatus.COMPLETED
for dep in step.dependencies
)
if deps_met:
ready.append(name)
return ready
async def _execute_step(self, step_name: str):
"""Execute a single workflow step with retry logic."""
step = self.steps[step_name]
# Check condition (skip if condition returns False)
if step.condition and not step.condition(self.results):
step.status = StepStatus.SKIPPED
logger.info(f"Step '{step_name}' skipped (condition not met)")
return
step.status = StepStatus.RUNNING
last_error = None
for attempt in range(step.retry_count + 1):
try:
# Execute with timeout
result = await asyncio.wait_for(
step.handler(self.results),
timeout=step.timeout_seconds
)
step.status = StepStatus.COMPLETED
step.result = result
self.results[step_name] = result
logger.info(f"Step '{step_name}' completed (attempt {attempt + 1})")
return
except asyncio.TimeoutError:
last_error = f"Timeout after {step.timeout_seconds}s"
logger.warning(f"Step '{step_name}' timed out (attempt {attempt + 1})")
except Exception as e:
last_error = str(e)
logger.warning(f"Step '{step_name}' failed (attempt {attempt + 1}): {e}")
if attempt < step.retry_count:
await asyncio.sleep(2 ** attempt) # Exponential backoff
# All retries exhausted
step.status = StepStatus.FAILED
step.error = last_error
logger.error(f"Step '{step_name}' permanently failed: {last_error}")
async def execute(self) -> dict[str, Any]:
"""Execute the workflow DAG, running parallel steps where possible."""
logger.info(f"Starting workflow: {self.name}")
while True:
ready = self._get_ready_steps()
if not ready:
# Check if all steps are done (completed, failed, or skipped)
all_done = all(
s.status in (StepStatus.COMPLETED, StepStatus.FAILED, StepStatus.SKIPPED)
for s in self.steps.values()
)
if all_done:
break
else:
# Some steps are blocked by failed dependencies
for name, step in self.steps.items():
if step.status == StepStatus.PENDING:
has_failed_dep = any(
self.steps[dep].status == StepStatus.FAILED
for dep in step.dependencies
)
if has_failed_dep:
step.status = StepStatus.SKIPPED
step.error = "Dependency failed"
break
# Execute all ready steps in parallel
await asyncio.gather(
*[self._execute_step(name) for name in ready]
)
# Run compensation for failed workflows
await self._compensate()
return self._get_summary()
async def _compensate(self):
"""Run compensation (rollback) handlers for completed steps if workflow failed."""
has_failure = any(s.status == StepStatus.FAILED for s in self.steps.values())
if not has_failure:
return
logger.info("Running compensation for failed workflow...")
for name, step in reversed(list(self.steps.items())):
if step.status == StepStatus.COMPLETED and step.compensation:
try:
await step.compensation(self.results)
logger.info(f"Compensation for '{name}' completed")
except Exception as e:
logger.error(f"Compensation for '{name}' failed: {e}")
def _get_summary(self) -> dict:
"""Generate workflow execution summary."""
return {
"workflow": self.name,
"steps": {
name: {
"status": step.status.value,
"result_preview": str(step.result)[:200] if step.result else None,
"error": step.error
}
for name, step in self.steps.items()
},
"overall_status": "completed" if all(
s.status in (StepStatus.COMPLETED, StepStatus.SKIPPED)
for s in self.steps.values()
) else "failed"
}
# --- Example Workflow: Quarterly Business Review ---
async def build_quarterly_review():
    """Build a workflow that generates a quarterly business review."""
    wf = WorkflowOrchestrator("quarterly-business-review")

    # Handlers are async functions that receive the shared results dict.
    # (asyncio.coroutine was removed in Python 3.11; plain async def works everywhere.)
    async def fetch_headcount(ctx: dict) -> dict:
        return {"total": 342, "engineering": 128}

    async def fetch_financials(ctx: dict) -> dict:
        return {"revenue": "12.4M", "burn": "2.1M"}

    async def analyze_trends(ctx: dict) -> dict:
        total = ctx.get("fetch_headcount", {}).get("total", 1)
        return {
            "headcount_growth": "12%",
            "revenue_per_head": f"${12_400_000 / total:.0f}"
        }

    async def generate_report(ctx: dict) -> str:
        return "Quarterly Review: Growth on track."

    # Steps 1 & 2 run in parallel (no dependencies)
    wf.add_step(WorkflowStep(name="fetch_headcount", handler=fetch_headcount))
    wf.add_step(WorkflowStep(name="fetch_financials", handler=fetch_financials))
    # Step 3 depends on both Step 1 and Step 2
    wf.add_step(WorkflowStep(
        name="analyze_trends",
        handler=analyze_trends,
        dependencies=["fetch_headcount", "fetch_financials"]
    ))
    # Step 4 depends on the analysis
    wf.add_step(WorkflowStep(
        name="generate_report",
        handler=generate_report,
        dependencies=["analyze_trends"]
    ))
    result = await wf.execute()
    print(json.dumps(result, indent=2))
    return result
4.4 Memory Systems & RAG Integration
Production agent systems need memory — both short-term context (current conversation) and long-term knowledge (vector storage with RAG). MCP provides the bridge between agents and their memory backends.
# Agent Memory System with MCP + RAG Integration
# pip install chromadb openai tiktoken
import os
import json
import hashlib
from datetime import datetime
from typing import Optional
# API key from environment
# export OPENAI_API_KEY="sk-..."
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key-here")
class AgentMemorySystem:
"""
Hierarchical memory system for MCP-powered agents.
Combines short-term conversation context with long-term vector storage.
"""
def __init__(self, agent_id: str, collection_name: str = "agent_memory"):
import chromadb
self.agent_id = agent_id
# Short-term memory: recent conversation turns (in-memory)
self.short_term: list[dict] = []
self.max_short_term = 20 # Keep last 20 messages
# Long-term memory: vector store for persistent knowledge
self.chroma_client = chromadb.Client()
self.collection = self.chroma_client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
# Working memory: current task context (ephemeral)
self.working_memory: dict = {}
    def add_short_term(self, role: str, content: str, metadata: Optional[dict] = None):
        """Add a message to short-term (conversation) memory."""
        entry = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata or {}  # None default avoids the mutable-default-argument pitfall
        }
self.short_term.append(entry)
# Evict oldest if over capacity
if len(self.short_term) > self.max_short_term:
# Move evicted messages to long-term storage
evicted = self.short_term.pop(0)
self.store_long_term(
text=f"{evicted['role']}: {evicted['content']}",
metadata={"source": "evicted_conversation", **evicted.get("metadata", {})}
)
    def store_long_term(self, text: str, metadata: Optional[dict] = None):
        """Store information in long-term vector memory."""
        metadata = metadata or {}  # None default avoids the mutable-default-argument pitfall
doc_id = hashlib.md5(text.encode()).hexdigest()[:12]
self.collection.upsert(
documents=[text],
metadatas=[{
"agent_id": self.agent_id,
"stored_at": datetime.now().isoformat(),
**metadata
}],
ids=[doc_id]
)
def recall_long_term(self, query: str, top_k: int = 5) -> list[dict]:
"""Retrieve relevant memories from long-term storage (RAG)."""
results = self.collection.query(
query_texts=[query],
n_results=top_k,
where={"agent_id": self.agent_id}
)
memories = []
if results and results["documents"]:
for i, doc in enumerate(results["documents"][0]):
memories.append({
"content": doc,
"metadata": results["metadatas"][0][i] if results["metadatas"] else {},
"distance": results["distances"][0][i] if results["distances"] else None
})
return memories
def set_working_memory(self, key: str, value):
"""Store ephemeral task-specific data in working memory."""
self.working_memory[key] = value
def get_working_memory(self, key: str, default=None):
"""Retrieve task-specific data from working memory."""
return self.working_memory.get(key, default)
def build_context(self, current_query: str) -> str:
"""
Build a complete context string combining all memory types.
This is what gets injected into the LLM prompt.
"""
context_parts = []
# 1. Relevant long-term memories
memories = self.recall_long_term(current_query, top_k=3)
if memories:
memory_text = "\n".join([f"- {m['content']}" for m in memories])
context_parts.append(f"## Relevant Past Knowledge\n{memory_text}")
# 2. Working memory (current task state)
if self.working_memory:
working_text = json.dumps(self.working_memory, indent=2, default=str)
context_parts.append(f"## Current Task State\n{working_text}")
# 3. Recent conversation (short-term)
if self.short_term:
recent = self.short_term[-5:] # Last 5 messages
convo_text = "\n".join([f"{m['role']}: {m['content']}" for m in recent])
context_parts.append(f"## Recent Conversation\n{convo_text}")
return "\n\n".join(context_parts)
def clear_working_memory(self):
"""Clear working memory (called when a task completes)."""
self.working_memory = {}
def get_stats(self) -> dict:
"""Get memory system statistics."""
return {
"agent_id": self.agent_id,
"short_term_count": len(self.short_term),
"long_term_count": self.collection.count(),
"working_memory_keys": list(self.working_memory.keys())
}
# --- Usage Example ---
def demo_memory_system():
"""Demonstrate the agent memory system."""
memory = AgentMemorySystem(agent_id="data-analyst-01")
# Store some long-term knowledge
memory.store_long_term(
"Q3 2025 engineering headcount was 115, up 12% from Q2.",
metadata={"source": "quarterly_report", "quarter": "Q3-2025"}
)
memory.store_long_term(
"Remote work policy updated in March 2026 to allow 4 days remote.",
metadata={"source": "hr_update", "effective_date": "2026-03-01"}
)
# Simulate conversation
memory.add_short_term("user", "How has engineering headcount changed?")
memory.add_short_term("assistant", "Let me check the latest data...")
# Set working memory for current task
memory.set_working_memory("current_query_department", "Engineering")
memory.set_working_memory("data_sources_checked", ["employees_table", "quarterly_reports"])
# Build context for the LLM
context = memory.build_context("engineering headcount trends")
print(f"Built context ({len(context)} chars):\n{context}")
print(f"\nMemory stats: {memory.get_stats()}")
if __name__ == "__main__":
demo_memory_system()
5. Performance Optimization
MCP server performance directly impacts agent response times and user experience. This section covers three optimization dimensions: latency (caching and parallel execution to minimize wait times), memory (connection pooling and resource management), and throughput (batch processing and async request handling). Each technique is demonstrated with production-ready code patterns that you can adapt to your specific workload.
5.1 Latency: Caching & Parallel Execution
The biggest performance wins in MCP systems come from caching repeated tool calls and executing independent tools in parallel. A well-optimized system can reduce end-to-end latency by 60-80%.
# MCP Performance Optimization — Caching + Parallel Execution
# pip install cachetools aiohttp
import os
import json
import time
import asyncio
import hashlib
import logging
from typing import Any, Callable
from functools import wraps
from cachetools import TTLCache
logger = logging.getLogger("mcp-performance")
class MCPResponseCache:
"""
Semantic cache for MCP tool responses.
Caches tool results by input hash with configurable TTL.
Dramatically reduces latency and cost for repeated queries.
"""
def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
self.cache = TTLCache(maxsize=max_size, ttl=ttl_seconds)
self.hit_count = 0
self.miss_count = 0
def _make_key(self, tool_name: str, args: dict) -> str:
"""Generate a deterministic cache key from tool name and arguments."""
normalized = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
return hashlib.sha256(normalized.encode()).hexdigest()[:16]
def get(self, tool_name: str, args: dict) -> tuple[bool, Any]:
"""Look up a cached result. Returns (hit: bool, result: Any)."""
key = self._make_key(tool_name, args)
if key in self.cache:
self.hit_count += 1
logger.debug(f"Cache HIT for {tool_name} (key={key})")
return True, self.cache[key]
self.miss_count += 1
logger.debug(f"Cache MISS for {tool_name} (key={key})")
return False, None
def put(self, tool_name: str, args: dict, result: Any):
"""Store a tool result in the cache."""
key = self._make_key(tool_name, args)
self.cache[key] = result
@property
def hit_rate(self) -> float:
"""Calculate the cache hit rate."""
total = self.hit_count + self.miss_count
return self.hit_count / total if total > 0 else 0.0
def get_stats(self) -> dict:
"""Return cache performance statistics."""
return {
"size": len(self.cache),
"hits": self.hit_count,
"misses": self.miss_count,
"hit_rate": f"{self.hit_rate:.1%}",
"max_size": self.cache.maxsize,
"ttl_seconds": int(self.cache.ttl)
}
# Global cache instance
response_cache = MCPResponseCache(max_size=2000, ttl_seconds=300)
def cached_tool(ttl_seconds: int = 300):
"""Decorator that adds caching to any MCP tool handler."""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(**kwargs) -> str:
tool_name = func.__name__
# Check cache first
hit, cached_result = response_cache.get(tool_name, kwargs)
if hit:
return cached_result
# Cache miss — execute the tool
result = await func(**kwargs)
# Store in cache
response_cache.put(tool_name, kwargs, result)
return result
return wrapper
return decorator
class ParallelToolExecutor:
"""
Execute multiple MCP tool calls in parallel.
Used when the LLM requests multiple independent tool calls in one turn.
"""
def __init__(self, max_concurrency: int = 10, timeout: float = 30.0):
self.semaphore = asyncio.Semaphore(max_concurrency)
self.timeout = timeout
async def execute_parallel(
self,
tool_calls: list[dict]
) -> list[dict]:
"""
Execute multiple tool calls in parallel with concurrency limits.
Args:
tool_calls: List of {"name": str, "handler": Callable, "args": dict}
Returns:
List of {"name": str, "result": Any, "duration_ms": float, "status": str}
"""
async def _execute_one(call: dict) -> dict:
async with self.semaphore:
start = time.perf_counter()
try:
result = await asyncio.wait_for(
call["handler"](**call["args"]),
timeout=self.timeout
)
duration = (time.perf_counter() - start) * 1000
return {
"name": call["name"],
"result": result,
"duration_ms": round(duration, 2),
"status": "success"
}
except asyncio.TimeoutError:
duration = (time.perf_counter() - start) * 1000
return {
"name": call["name"],
"result": f"Tool timed out after {self.timeout}s",
"duration_ms": round(duration, 2),
"status": "timeout"
}
except Exception as e:
duration = (time.perf_counter() - start) * 1000
return {
"name": call["name"],
"result": f"Error: {str(e)}",
"duration_ms": round(duration, 2),
"status": "error"
}
# Execute all tool calls concurrently
results = await asyncio.gather(
*[_execute_one(call) for call in tool_calls]
)
total_time = max(r["duration_ms"] for r in results)
sequential_time = sum(r["duration_ms"] for r in results)
speedup = sequential_time / total_time if total_time > 0 else 1
logger.info(
f"Parallel execution: {len(tool_calls)} tools, "
f"wall time={total_time:.0f}ms, "
f"sequential would be={sequential_time:.0f}ms, "
f"speedup={speedup:.1f}x"
)
return list(results)
# --- Example: Cached + Parallel Tool Execution ---
@cached_tool(ttl_seconds=300)
async def get_employee_count(department: str = "all") -> str:
"""Get employee count (cached for 5 minutes)."""
await asyncio.sleep(0.5) # Simulate DB query latency
return json.dumps({"department": department, "count": 128})
@cached_tool(ttl_seconds=600)
async def get_policy_summary(topic: str = "general") -> str:
"""Get policy summary (cached for 10 minutes)."""
await asyncio.sleep(0.8) # Simulate KB search latency
return json.dumps({"topic": topic, "summary": "Remote work allowed 3 days/week."})
@cached_tool(ttl_seconds=60)
async def get_project_status(project_id: str = "all") -> str:
"""Get project status (cached for 1 minute — more volatile data)."""
await asyncio.sleep(0.3) # Simulate API call
return json.dumps({"project": project_id, "status": "on_track", "progress": 78})
async def demo_performance():
"""Demonstrate caching and parallel execution."""
executor = ParallelToolExecutor(max_concurrency=5, timeout=10.0)
# First call — cache miss, executes all three tools in parallel
print("=== First call (cache cold) ===")
results = await executor.execute_parallel([
{"name": "employee_count", "handler": get_employee_count, "args": {"department": "Engineering"}},
{"name": "policy_summary", "handler": get_policy_summary, "args": {"topic": "remote_work"}},
{"name": "project_status", "handler": get_project_status, "args": {"project_id": "proj-42"}},
])
for r in results:
print(f" {r['name']}: {r['duration_ms']}ms ({r['status']})")
# Second call — cache hit, near-instant
print("\n=== Second call (cache warm) ===")
results2 = await executor.execute_parallel([
{"name": "employee_count", "handler": get_employee_count, "args": {"department": "Engineering"}},
{"name": "policy_summary", "handler": get_policy_summary, "args": {"topic": "remote_work"}},
{"name": "project_status", "handler": get_project_status, "args": {"project_id": "proj-42"}},
])
for r in results2:
print(f" {r['name']}: {r['duration_ms']}ms ({r['status']})")
print(f"\nCache stats: {response_cache.get_stats()}")
if __name__ == "__main__":
asyncio.run(demo_performance())
5.2 Cost Optimization
MCP agent systems can become expensive fast — each agent loop iteration costs tokens, and multi-agent systems multiply this. Smart cost optimization requires multiple strategies working together:
| Strategy | Description | Typical Savings |
| --- | --- | --- |
| Model Routing | Use cheap models (GPT-4o-mini) for simple tasks, expensive models (GPT-4o) only for complex reasoning | 40-60% |
| Response Caching | Cache identical tool calls (see above). Most enterprise queries are repeated. | 20-50% |
| Token Minimization | Compress context, summarize conversation history, remove redundant tool schemas | 15-30% |
| Smart Sampling | Use temperature=0 for deterministic tasks, lower max_tokens when short answers suffice | 10-20% |
| Iteration Limits | Cap agent reasoning loops (e.g., max 5 iterations). Infinite loops drain budgets. | Prevents 10x+ overruns |
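Of the strategies above, token minimization is the only one without a concrete example in this part, so here is a minimal sketch. The 4-characters-per-token heuristic is a rough approximation (swap in tiktoken for exact counts), and the function names are illustrative:

```python
# Token minimization: trim conversation history to fit a token budget.
# Rough heuristic: ~4 characters per token for English text.

def estimate_tokens(text: str) -> int:
    """Approximate token count; replace with tiktoken for exact numbers."""
    return max(1, len(text) // 4)

def trim_history(messages: list[dict], budget_tokens: int) -> list[dict]:
    """Keep the system message plus the newest messages that fit the budget."""
    system = messages[:1] if messages and messages[0].get("role") == "system" else []
    rest = messages[len(system):]
    used = sum(estimate_tokens(m["content"]) for m in system)
    kept: list[dict] = []
    for msg in reversed(rest):  # Walk newest-to-oldest, stop when budget is spent
        cost = estimate_tokens(msg["content"])
        if used + cost > budget_tokens:
            break
        kept.append(msg)
        used += cost
    return system + list(reversed(kept))
```

Dropping the oldest turns first preserves the system prompt and the recent context the model actually needs; a summarization step over the dropped turns recovers most of the remaining signal at a fraction of the tokens.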
# Cost-Aware Model Router for MCP Agents
# pip install openai tiktoken
import os
import tiktoken
# export OPENAI_API_KEY="sk-..."
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-key-here")
# Cost per 1M tokens (as of early 2026)
MODEL_COSTS = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
}
class CostAwareRouter:
"""
Routes requests to the cheapest model that can handle the task.
Uses task complexity heuristics to decide which model to use.
"""
def __init__(self, budget_limit_usd: float = 50.0):
self.budget_limit = budget_limit_usd
self.total_spent = 0.0
self.call_log: list[dict] = []
self.encoder = tiktoken.encoding_for_model("gpt-4o")
def estimate_complexity(self, prompt: str, tool_count: int) -> str:
"""
Estimate task complexity to choose the right model.
Returns: "simple", "moderate", or "complex"
"""
token_count = len(self.encoder.encode(prompt))
# Simple: short prompt, no tools, factual lookup
if token_count < 200 and tool_count <= 1:
return "simple"
# Complex: long context, multiple tools, requires reasoning
if token_count > 1000 or tool_count > 3:
return "complex"
return "moderate"
def select_model(self, prompt: str, tool_count: int = 0) -> str:
"""Select the optimal model based on task complexity and budget."""
complexity = self.estimate_complexity(prompt, tool_count)
# Budget check — downgrade if running low
remaining_budget = self.budget_limit - self.total_spent
if remaining_budget < 5.0:
return "gpt-4o-mini" # Force cheapest model when low budget
model_map = {
"simple": "gpt-4o-mini",
"moderate": "gpt-4o-mini", # 4o-mini handles most tasks well
"complex": "gpt-4o",
}
model = model_map.get(complexity, "gpt-4o-mini")
return model
def record_usage(self, model: str, input_tokens: int, output_tokens: int):
"""Record token usage and calculate cost."""
costs = MODEL_COSTS.get(model, MODEL_COSTS["gpt-4o-mini"])
cost = (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000
self.total_spent += cost
self.call_log.append({
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost_usd": round(cost, 6)
})
if self.total_spent > self.budget_limit * 0.8:
import logging
logging.warning(
f"Budget alert: spent ${self.total_spent:.4f} of ${self.budget_limit:.2f} "
f"({self.total_spent/self.budget_limit:.0%})"
)
def get_report(self) -> dict:
"""Generate a cost report."""
return {
"total_spent_usd": round(self.total_spent, 4),
"budget_limit_usd": self.budget_limit,
"budget_remaining_usd": round(self.budget_limit - self.total_spent, 4),
"total_calls": len(self.call_log),
"by_model": self._aggregate_by_model(),
}
def _aggregate_by_model(self) -> dict:
"""Aggregate costs by model."""
agg = {}
for entry in self.call_log:
model = entry["model"]
if model not in agg:
agg[model] = {"calls": 0, "total_cost": 0.0, "total_tokens": 0}
agg[model]["calls"] += 1
agg[model]["total_cost"] += entry["cost_usd"]
agg[model]["total_tokens"] += entry["input_tokens"] + entry["output_tokens"]
# Round totals
for model in agg:
agg[model]["total_cost"] = round(agg[model]["total_cost"], 4)
return agg
5.3 Throughput: Batch Requests & Async Processing
When MCP servers handle high request volumes, per-request processing becomes the bottleneck. Async queue systems let you process multiple requests concurrently, amortize overhead, and absorb bursts gracefully with backpressure. The implementation below demonstrates a queue-based architecture with concurrent workers and priority scheduling.
# High-Throughput MCP Request Processing with Queue System
# Standard library only; asyncio ships with Python, so nothing to pip install
import asyncio
import json
import time
import logging
from typing import Any, Callable
from dataclasses import dataclass, field

logger = logging.getLogger("mcp-throughput")
@dataclass
class QueuedRequest:
    """A request waiting in the processing queue."""
    request_id: str
    tool_name: str
    args: dict
    priority: int = 0  # Higher = more urgent
    created_at: float = field(default_factory=time.time)
    # The result future is created inside enqueue(), on the running event loop.
    # (Calling asyncio.get_event_loop() in a default_factory is deprecated and
    # fails when no loop is running.)
    future: Any = None

class MCPRequestQueue:
    """
    Async request queue for high-throughput MCP server processing.
    Prioritizes requests and processes them with controlled concurrency.
    """
    def __init__(
        self,
        tool_handlers: dict[str, Callable],
        max_workers: int = 20
    ):
        self.tool_handlers = tool_handlers
        self.max_workers = max_workers
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.semaphore = asyncio.Semaphore(max_workers)
        self._seq = 0  # Tie-breaker so the heap never compares QueuedRequest objects
        # Metrics
        self.processed_count = 0
        self.error_count = 0
        self.total_latency_ms = 0.0

    async def enqueue(self, tool_name: str, args: dict, priority: int = 0) -> Any:
        """Submit a request and wait for the result."""
        request = QueuedRequest(
            request_id=f"req-{time.time_ns() % 1_000_000}",
            tool_name=tool_name,
            args=args,
            priority=-priority  # Negate because PriorityQueue is a min-heap
        )
        request.future = asyncio.get_running_loop().create_future()
        # The (priority, seq) key gives a unique ordering, so equal-priority
        # entries never fall back to comparing requests (which are not orderable).
        self._seq += 1
        await self.queue.put(((request.priority, self._seq), request))
        logger.debug(f"Enqueued: {request.request_id} (tool={tool_name}, priority={priority})")
        # Wait for the result
        return await request.future
async def start_workers(self):
"""Start background workers that process the queue."""
workers = [
asyncio.create_task(self._worker(i))
for i in range(self.max_workers)
]
logger.info(f"Started {self.max_workers} queue workers")
return workers
async def _worker(self, worker_id: int):
"""Worker coroutine that processes requests from the queue."""
while True:
try:
priority, request = await self.queue.get()
async with self.semaphore:
start = time.perf_counter()
try:
handler = self.tool_handlers.get(request.tool_name)
if not handler:
request.future.set_result(
json.dumps({"error": f"Unknown tool: {request.tool_name}"})
)
continue
result = await handler(**request.args)
request.future.set_result(result)
duration = (time.perf_counter() - start) * 1000
self.processed_count += 1
self.total_latency_ms += duration
except Exception as e:
request.future.set_result(json.dumps({"error": str(e)}))
self.error_count += 1
finally:
self.queue.task_done()
except asyncio.CancelledError:
break
def get_metrics(self) -> dict:
"""Get queue processing metrics."""
avg_latency = (
self.total_latency_ms / self.processed_count
if self.processed_count > 0 else 0
)
return {
"queue_size": self.queue.qsize(),
"processed": self.processed_count,
"errors": self.error_count,
"error_rate": f"{self.error_count / max(self.processed_count, 1):.1%}",
"avg_latency_ms": round(avg_latency, 2),
"workers": self.max_workers
}
# --- Example: High-throughput MCP server ---
async def demo_throughput():
"""Demonstrate high-throughput request processing."""
# Define tool handlers
async def fast_lookup(key: str = "default") -> str:
await asyncio.sleep(0.01) # 10ms simulated latency
return json.dumps({"key": key, "value": "found"})
async def slow_query(table: str = "employees") -> str:
await asyncio.sleep(0.5) # 500ms simulated latency
return json.dumps({"table": table, "rows": 100})
queue = MCPRequestQueue(
tool_handlers={"fast_lookup": fast_lookup, "slow_query": slow_query},
max_workers=10
)
# Start background workers
workers = await queue.start_workers()
# Submit 50 requests concurrently
start = time.perf_counter()
tasks = []
for i in range(50):
tool = "fast_lookup" if i % 3 != 0 else "slow_query"
tasks.append(queue.enqueue(tool, {"key": f"item-{i}"} if tool == "fast_lookup" else {"table": "employees"}))
results = await asyncio.gather(*tasks)
elapsed = (time.perf_counter() - start) * 1000
print(f"Processed 50 requests in {elapsed:.0f}ms")
print(f"Metrics: {json.dumps(queue.get_metrics(), indent=2)}")
# Cleanup workers
for w in workers:
w.cancel()
if __name__ == "__main__":
asyncio.run(demo_throughput())
6. Real-World Use Cases & Production Checklist
MCP is already powering production AI systems across industries — from enterprise assistants that query internal databases to developer tools that interact with cloud infrastructure. This section showcases concrete use cases with architecture patterns, then provides a comprehensive production checklist covering security, monitoring, error handling, and operational readiness that teams should validate before deploying MCP servers to production.
6.1 Enterprise AI Assistants
Case Study
Global Bank: Secure Knowledge Access via MCP
A global bank deployed MCP servers to give their internal AI assistant access to HR policies, compliance documents, and customer data — all behind strict authorization boundaries. Each data domain (HR, Compliance, Customer Service) runs its own MCP server with distinct RBAC policies. The compliance MCP server requires compliance:read scope and logs every query for audit. The result: 80% reduction in time-to-answer for employee policy questions, with zero unauthorized data access incidents in 12 months of production.
Tags: Enterprise · RBAC · Domain Separation · Audit Logging
6.2 Developer Tools & CI/CD Automation
Case Study
Platform Team: CI/CD Automation MCP Server
A platform engineering team built a suite of MCP servers that let their AI coding assistant interact with the entire development lifecycle. The mcp-github server provides repository search, PR creation, and code review tools. The mcp-ci server exposes pipeline triggers, build status, and test result analysis. The mcp-deploy server handles staging deployments and rollback triggers. Developers use Claude with these MCP tools to go from "fix this bug" to "PR merged and deployed to staging" in a single conversation. Average PR cycle time dropped from 4 hours to 45 minutes.
Tags: Developer Experience · CI/CD · Micro MCP · Productivity
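The per-tool authorization pattern from the bank case study above can be sketched as a scope-checking decorator. Everything here is hypothetical (the function names, the scope string format, and the stubbed result are illustrative, not from any real SDK), but the shape is the important part: the tool returns structured error JSON instead of raising, so the agent degrades gracefully:

```python
from functools import wraps

def require_scope(required: str):
    """Reject a tool call unless the caller's token carries the required scope."""
    def decorator(func):
        @wraps(func)
        def wrapper(user_scopes: set[str], **kwargs):
            if required not in user_scopes:
                # Structured error JSON, not an exception: the agent can recover.
                return {"error": f"forbidden: missing scope '{required}'"}
            return func(**kwargs)
        return wrapper
    return decorator

@require_scope("compliance:read")
def search_compliance_docs(query: str) -> dict:
    return {"query": query, "results": ["KYC policy v3"]}  # Stubbed result

print(search_compliance_docs({"hr:read"}, query="KYC"))                     # rejected
print(search_compliance_docs({"compliance:read", "hr:read"}, query="KYC"))  # allowed
```

In a real deployment the scope set would come from a validated OAuth token, and every denial would also be written to the audit log.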
6.3 Automation Agents
MCP-powered automation agents handle repetitive business workflows end-to-end:
| Use Case | MCP Tools Involved | Workflow |
| --- | --- | --- |
| CRM Updates | mcp-salesforce (CRUD), mcp-email (read inbox) | Read emails → Extract deal updates → Update Salesforce → Notify sales rep |
| Email Workflows | mcp-gmail (read/send), mcp-calendar (schedule), mcp-kb (search) | Triage inbox → Draft responses → Schedule follow-ups → File in CRM |
| Data Pipelines | mcp-s3 (files), mcp-db (query), mcp-transform (ETL) | Detect new files → Validate schema → Transform → Load → Alert on errors |
| Research Agents | mcp-web-search, mcp-arxiv, mcp-kb (store), mcp-summarizer | Multi-source search → Aggregate → Cross-reference → Summarize → Store |
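The CRM Updates row maps onto a simple sequential pipeline. The sketch below uses hypothetical stubs standing in for the mcp-email and mcp-salesforce tool calls named in the table; in production, each stub would be an MCP tool call and the extraction step would be an LLM call:

```python
import asyncio

# Hypothetical stubs for the mcp-email / mcp-salesforce tools.
async def read_inbox() -> list[dict]:
    return [{"from": "buyer@acme.example", "body": "Ready to sign; move the deal to Closed Won."}]

async def extract_deal_update(email: dict) -> dict:
    # In production an LLM extracts these structured fields from the email body.
    return {"account": "Acme", "stage": "Closed Won"}

async def update_salesforce(update: dict) -> str:
    return f"{update['account']} -> {update['stage']}"

async def notify_rep(message: str) -> None:
    print(f"Notified sales rep: {message}")

async def crm_update_workflow() -> list[str]:
    """Read emails -> extract updates -> write to CRM -> notify the rep."""
    outcomes = []
    for email in await read_inbox():
        update = await extract_deal_update(email)
        outcome = await update_salesforce(update)
        await notify_rep(outcome)
        outcomes.append(outcome)
    return outcomes

print(asyncio.run(crm_update_workflow()))
```

For workflows with retries, timeouts, and compensation, the same steps slot directly into the WorkflowOrchestrator pattern from Section 4.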
6.4 Production Checklist
Before deploying any MCP system to production, verify every item on this checklist. Each item represents a real production failure we have seen or prevented.
| Category | Checklist Item | Why It Matters |
| --- | --- | --- |
| Security | All MCP endpoints require authentication (API key or OAuth) | Unauthenticated endpoints are attack vectors |
| Security | Tool inputs are validated and sanitized (no SQL injection, no path traversal) | MCP tools execute real operations — malicious inputs cause damage |
| Security | RBAC enforced per tool (not all users can access all tools) | A marketing user should not query the salary database |
| Security | Secrets stored in environment variables or a secret manager, never in code | Leaked API keys lead to unauthorized access and cost overruns |
| Observability | Structured logging on every tool call (input, output, duration, errors) | Cannot debug production issues without logs |
| Observability | Metrics exported (call count, latency, error rate, token usage) | Need dashboards and alerts for operational visibility |
| Observability | Distributed tracing enabled (OpenTelemetry) | Multi-hop requests (client → agent → MCP → DB) need end-to-end visibility |
| Scalability | Stateless server design (no in-memory session state) | Required for horizontal scaling and load balancing |
| Scalability | Auto-scaling configured (HPA in K8s or serverless scaling) | Traffic spikes should not cause outages |
| Scalability | Connection pooling for databases and external APIs | Connection exhaustion is the most common scaling bottleneck |
| Fault Tolerance | Circuit breakers on all external calls (DB, APIs, LLM providers) | Prevents cascading failures when a dependency goes down |
| Fault Tolerance | Retry with exponential backoff and jitter | Retries without backoff amplify outages (thundering herd) |
| Fault Tolerance | Graceful degradation (tool returns error JSON, not crash) | The agent can continue with partial data rather than failing entirely |
| Versioning | MCP server version in capability registry | Clients need to know which tools and schemas are available |
| Versioning | Backward-compatible API changes (additive only) | Breaking changes crash agents that depend on the old schema |
| Testing | Unit tests for every tool handler | Catch regressions before deployment |
| Testing | Contract tests for MCP protocol compliance | Ensure tools return valid JSON and follow error conventions |
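The circuit-breaker item deserves a concrete shape, since none of the code in this part implements one. A minimal sketch (threshold and reset values are illustrative): the breaker opens after N consecutive failures, rejects calls while open, and lets a single probe through once the reset window has elapsed.

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: opens after `threshold` consecutive failures,
    rejects calls while open, and permits a probe after `reset_seconds`."""

    def __init__(self, threshold: int = 5, reset_seconds: float = 30.0):
        self.threshold = threshold
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.opened_at: float = 0.0  # 0.0 means the circuit is closed

    def allow(self) -> bool:
        """Return True if a call may proceed."""
        if not self.opened_at:
            return True
        if time.monotonic() - self.opened_at >= self.reset_seconds:
            # Half-open: permit one probe; a failure re-opens immediately.
            self.opened_at = 0.0
            self.failures = self.threshold - 1
            return True
        return False

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = 0.0

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

# Usage: guard an external call
breaker = CircuitBreaker(threshold=2, reset_seconds=30.0)
breaker.record_failure()
breaker.record_failure()   # Second consecutive failure opens the circuit
print(breaker.allow())     # False: calls are rejected while the circuit is open
breaker.record_success()   # e.g. after a successful half-open probe
print(breaker.allow())     # True: circuit closed again
```

Wrap each external dependency (database, LLM provider, downstream MCP server) in its own breaker so one failing dependency cannot take down unrelated tools.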
Exercises & Self-Assessment
Exercise 1
Build a Production MCP Server
Build a complete MCP server using FastMCP that provides access to a real data source:
- Choose a domain (e.g., GitHub repos, weather data, a SQLite database)
- Implement at least 3 tools with proper input validation using Pydantic
- Add a resource endpoint that provides schema/metadata to the model
- Write unit tests for all tool handlers
- Add structured logging with JSON output format
Exercise 2
Dockerize and Deploy MCP Servers
Containerize your MCP server and deploy it:
- Write a Dockerfile with multi-stage build, non-root user, and health check
- Create a docker-compose.yml with at least 2 MCP servers and a database
- Configure SSE transport for networked access
- Test that a LangChain agent can connect to your Dockerized MCP servers
- Bonus: Write a Kubernetes manifest with HPA for auto-scaling
Exercise 3
Multi-Agent Orchestration
Build a multi-agent system using MCP:
- Create 3 specialized agents (e.g., data analyst, writer, reviewer)
- Each agent should connect to its own MCP server(s)
- Implement a coordinator that routes tasks to the right specialist
- Add parallel execution where possible (agents that can work independently)
- Track total tokens and cost across all agents
Exercise 4
Performance Optimization Challenge
Optimize an MCP-based system for speed and cost:
- Implement a semantic cache with TTL for tool responses
- Add parallel tool execution with concurrency limits
- Build a cost-aware model router that picks the cheapest adequate model
- Measure before/after: latency, cost per query, cache hit rate
- Target: 50% latency reduction and 40% cost reduction
Exercise 5
Reflective Questions
- When would you choose a monolithic MCP server over micro MCP servers? What are the specific trade-offs in terms of deployment complexity, fault isolation, and team ownership?
- How does the A2A protocol differ from simply having agents call each other's APIs? What does capability negotiation enable that a hardcoded API integration does not?
- Your MCP server cache has a 95% hit rate but your users are complaining about stale data. How do you balance cache freshness with performance? What invalidation strategies would you consider?
- Design a security model for an MCP server that handles both public company information and confidential salary data. How do you prevent a prompt injection attack from bypassing your RBAC?
- An agent using your MCP tools enters an infinite loop, repeatedly calling the same database query. How would you detect and prevent this in production? What monitoring and safeguards would you implement?
Conclusion & Next Steps
You now have the knowledge to build, deploy, and operate production MCP systems — the infrastructure layer that connects AI models to the real world. Here are the key takeaways from Part 14:
- MCP server architecture — Production servers are layered systems with capability registries, tool handlers, resource providers, auth middleware, and observability. For most teams, micro MCP servers offer the best balance of isolation, scalability, and maintainability.
- Ecosystem integrations — MCP integrates naturally with LangChain for agent orchestration, Docker for isolation, Kubernetes for scaling, Cloudflare Workers for edge deployment, and OAuth for enterprise security.
- Observability — OpenTelemetry-based tracing, metrics (latency, error rate, throughput), and structured logging are non-negotiable for production MCP systems. You cannot debug what you cannot see.
- Multi-agent systems — Specialized agents per domain, coordinated by an orchestrator, with MCP for tool access and A2A for inter-agent communication. This is the architecture powering the most capable AI systems today.
- Performance optimization — Response caching (50% latency reduction), parallel tool execution (2-5x speedup), cost-aware model routing (40-60% cost savings), and async queue processing for throughput.
- Production checklist — Security hardened, fully observable, horizontally scalable, fault tolerant with circuit breakers, version-controlled APIs, and comprehensive test coverage.
Next in the Series
In Part 15: Production AI Systems, we move up the stack to the infrastructure that serves these agents — building production-grade LLM APIs with FastAPI, async streaming, request queuing, caching layers, and auto-scaling.
Continue the Series
Part 15: Production AI Systems
Build production-grade LLM APIs with FastAPI, async streaming, queuing, caching, and auto-scaling infrastructure.
Read Article
Part 16: Safety, Guardrails & Reliability
Implement input/output guardrails, hallucination mitigation, prompt injection defense, and reliability patterns for AI systems.
Read Article
Part 17: Advanced Topics
Explore fine-tuning, tool learning, and hybrid LLM+symbolic techniques for advanced AI applications.
Read Article