
AI Application Development Mastery Part 16: Production AI Systems

April 1, 2026 · Wasil Zafar · 44 min read

Master the infrastructure behind production AI systems. Build FastAPI services for LLM APIs, implement async streaming with SSE and WebSockets, design queuing systems with Celery and Redis, deploy semantic caching, scale inference with vLLM and TGI, optimize costs through model routing, and monitor latency at P50/P95/P99.

Table of Contents

  1. FastAPI for LLM APIs
  2. Async & Streaming
  3. Queuing & Background Tasks
  4. Caching Strategies
  5. Scaling LLM Inference
  6. Cost Optimization
  7. Infrastructure
  8. Production Monitoring
  9. Exercises & Self-Assessment
  10. Production Architecture Generator
  11. Conclusion & Next Steps

Introduction: From Prototype to Production

Series Overview: This is Part 16 of our 20-part AI Application Development Mastery series. We now shift from building AI logic to building the infrastructure that makes AI applications reliable, fast, and cost-effective in production. This is where AI engineering meets systems engineering.

AI Application Development Mastery

Your 20-step learning path • Currently on Step 16

  1. Foundations & Evolution of AI Apps: Pre-LLM era, transformers, LLM revolution
  2. LLM Fundamentals for Developers: Tokens, context windows, sampling, API patterns
  3. Prompt Engineering Mastery: Zero/few-shot, CoT, ReAct, structured outputs
  4. LangChain Core Concepts: Chains, prompts, LLMs, tools, LCEL
  5. Retrieval-Augmented Generation (RAG): Embeddings, vector DBs, retrievers, RAG pipelines
  6. Memory & Context Engineering: Buffer/summary/vector memory, chunking, re-ranking
  7. Agents — Core of Modern AI Apps: ReAct, tool-calling, planner-executor agents
  8. LangGraph — Stateful Agent Workflows: Nodes, edges, state, graph execution, cycles
  9. Deep Agents & Autonomous Systems: Multi-step reasoning, self-reflection, planning
  10. Multi-Agent Systems: Supervisor, swarm, debate, role-based collaboration
  11. AI Application Design Patterns: RAG, chat+memory, workflow automation, agent loops
  12. Ecosystem & Frameworks: LlamaIndex, Haystack, HuggingFace, vLLM
  13. MCP Foundations & Architecture: Protocol design, Host/Client/Server, primitives, security
  14. MCP in Production: Building servers, integrations, scaling, agent systems
  15. Evaluation & LLMOps: Prompt eval, tracing, LangSmith, experiment tracking
  16. Production AI Systems: APIs, queues, caching, streaming, scaling (You Are Here)
  17. Safety, Guardrails & Reliability: Input filtering, hallucination mitigation, prompt injection
  18. Advanced Topics: Fine-tuning, tool learning, hybrid LLM+symbolic
  19. Building Real AI Applications: Chatbot, document QA, coding assistant, full-stack
  20. Future of AI Applications: Autonomous agents, self-improving, multi-modal, AI OS

There is a canyon between a working Jupyter notebook and a production service handling 10,000 requests per hour. The notebook uses synchronous calls, has no error handling, ignores latency, and burns money on uncached repeated queries. A production system needs async processing, streaming responses, intelligent caching, queue-based workloads, auto-scaling infrastructure, and real-time monitoring.

This part covers every layer of the production AI stack — from the API surface (FastAPI) to the compute layer (vLLM, TGI), the caching layer (semantic cache), the orchestration layer (Celery, Redis), the infrastructure layer (Docker, Kubernetes, cloud providers), and the monitoring layer (latency percentiles, cost dashboards, alerting).

Key Insight: The single biggest difference between prototype and production AI is latency management. Users expect responses to start within 1-2 seconds. A production system must stream tokens as they are generated, cache frequently asked queries, route simple queries to fast models, and gracefully degrade under load. Every optimization technique in this part is ultimately about delivering faster perceived response times.

1. FastAPI for LLM APIs

FastAPI is the framework of choice for building production LLM APIs due to its native async support, automatic OpenAPI documentation, Pydantic-based validation, and streaming response capabilities. This section covers the essential patterns for building LLM-serving APIs: request/response schema design, error handling, authentication, and rate limiting — all tailored to the unique requirements of LLM applications where requests are long-running and token-intensive.

1.1 API Design Patterns

A well-designed LLM API starts with clear Pydantic request/response models, supports both synchronous and streaming responses, and includes health check endpoints for load balancer probes. The implementation below demonstrates a production-ready FastAPI service with typed schemas, CORS middleware, and a streaming chat endpoint that yields tokens as they’re generated.

# Production FastAPI service for LLM applications
# pip install fastapi uvicorn openai pydantic

import os
import time
import uuid
from typing import Optional, AsyncGenerator

from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
import uvicorn

# Ensure OPENAI_API_KEY is set in environment
# export OPENAI_API_KEY="your-key-here"

app = FastAPI(
    title="AI Chat API",
    description="Production LLM API with streaming, caching, and monitoring",
    version="2.0.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],
    allow_methods=["POST", "GET"],
    allow_headers=["*"],
)

# --- Request/Response Models ---
class ChatRequest(BaseModel):
    """Structured chat request with validation."""
    message: str = Field(..., min_length=1, max_length=10000,
                         description="User message")
    conversation_id: Optional[str] = Field(None, description="Session ID")
    model: str = Field("gpt-4o-mini", description="Model to use")
    temperature: float = Field(0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(1000, ge=1, le=4096)
    stream: bool = Field(False, description="Enable streaming response")

class ChatResponse(BaseModel):
    """Structured chat response."""
    request_id: str
    message: str
    model: str
    usage: dict
    latency_ms: float

class HealthResponse(BaseModel):
    status: str
    version: str
    models_available: list[str]

# --- Endpoints ---
@app.get("/health", response_model=HealthResponse)
async def health_check():
    return HealthResponse(
        status="healthy",
        version="2.0.0",
        models_available=["gpt-4o", "gpt-4o-mini", "claude-3-5-sonnet"]
    )

@app.post("/v1/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Non-streaming chat endpoint."""
    request_id = str(uuid.uuid4())
    start_time = time.time()

    try:
        from openai import AsyncOpenAI
        client = AsyncOpenAI()

        response = await client.chat.completions.create(
            model=request.model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": request.message}
            ],
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )

        latency = (time.time() - start_time) * 1000

        return ChatResponse(
            request_id=request_id,
            message=response.choices[0].message.content,
            model=request.model,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            latency_ms=round(latency, 2)
        )

    except Exception as e:
        # Don't echo raw provider exceptions to clients (see 1.2 for structured handling)
        raise HTTPException(status_code=502, detail=f"LLM request failed: {type(e).__name__}")

@app.post("/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    """Streaming chat endpoint using SSE."""
    return StreamingResponse(
        generate_stream(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Request-ID": str(uuid.uuid4())
        }
    )

async def generate_stream(request: ChatRequest) -> AsyncGenerator[str, None]:
    """Generate SSE stream from LLM."""
    import json

    from openai import AsyncOpenAI
    client = AsyncOpenAI()

    stream = await client.chat.completions.create(
        model=request.model,
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": request.message}
        ],
        temperature=request.temperature,
        max_tokens=request.max_tokens,
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            # JSON-encode tokens so embedded newlines can't break SSE framing
            yield f"data: {json.dumps(token)}\n\n"

    yield "data: [DONE]\n\n"

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)
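On the client side, consuming this stream amounts to reading lines, skipping blanks and comments, and stripping the field prefix from each data line. A minimal framework-free parser sketch (parse_sse_line and collect_tokens are illustrative helper names, not part of any library):

```python
def parse_sse_line(line: str):
    """Parse one SSE line into (field, value); None for blank lines and comments."""
    line = line.rstrip("\n")
    if not line or line.startswith(":"):
        return None
    field, _, value = line.partition(":")
    return field, value.lstrip(" ")

def collect_tokens(lines) -> str:
    """Accumulate data events from an SSE line stream until the [DONE] sentinel."""
    tokens = []
    for raw in lines:
        parsed = parse_sse_line(raw)
        if parsed is None:
            continue
        field, value = parsed
        if field == "data":
            if value == "[DONE]":
                break
            tokens.append(value)
    return "".join(tokens)
```

In browsers, EventSource performs this parsing natively; the sketch mirrors what it does under the hood and is handy for Python clients reading an httpx or requests line stream. If the server JSON-encodes each token payload, apply json.loads to the value before appending.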

1.2 Request Validation & Error Handling

LLM API errors require special handling because failures can occur at multiple levels — input validation, model loading, inference timeouts, and rate limit exhaustion. A production error handling layer needs custom exception classes, structured error responses with request IDs for debugging, and graceful degradation that returns useful error messages rather than raw stack traces.

# Production error handling and request validation
# Extends the FastAPI app defined above

from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import logging

logger = logging.getLogger("ai-api")

# Custom exception handler for validation errors
@app.exception_handler(RequestValidationError)
async def validation_handler(request: Request, exc: RequestValidationError):
    errors = []
    for error in exc.errors():
        errors.append({
            "field": ".".join(str(loc) for loc in error["loc"]),
            "message": error["msg"],
            "type": error["type"]
        })
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content={"error": "validation_error", "details": errors}
    )

# Custom exception for LLM-specific errors
class LLMError(Exception):
    def __init__(self, message: str, error_type: str, status_code: int = 500):
        self.message = message
        self.error_type = error_type
        self.status_code = status_code

@app.exception_handler(LLMError)
async def llm_error_handler(request: Request, exc: LLMError):
    logger.error(f"LLM Error: {exc.error_type} - {exc.message}")
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.error_type,
            "message": exc.message,
            "request_id": request.headers.get("X-Request-ID", "unknown")
        }
    )

1.3 Middleware, Auth & Rate Limiting

Production LLM APIs need three layers of middleware: authentication (API key or JWT validation), rate limiting (preventing individual users from monopolizing expensive GPU resources), and observability (logging request latency and token usage for every call). The middleware stack below implements all three using FastAPI’s dependency injection and middleware hooks.

# API key authentication and rate limiting middleware
# Extends the FastAPI app defined above

from fastapi import Security, HTTPException
from fastapi.security import APIKeyHeader
from collections import defaultdict
import asyncio
import time

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

# Simple in-memory rate limiter (use Redis in production)
class RateLimiter:
    def __init__(self, max_requests: int = 60, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests = defaultdict(list)

    def is_allowed(self, key: str) -> bool:
        now = time.time()
        # Clean old entries
        self.requests[key] = [t for t in self.requests[key] if now - t < self.window]
        if len(self.requests[key]) >= self.max_requests:
            return False
        self.requests[key].append(now)
        return True

rate_limiter = RateLimiter(max_requests=100, window_seconds=60)

async def verify_api_key(api_key: str = Security(api_key_header)):
    """Verify API key and apply rate limiting."""
    if not api_key:
        raise HTTPException(status_code=401, detail="Missing API key")

    # Validate key (in production, check against database/cache)
    valid_keys = {"sk-prod-abc123", "sk-prod-def456"}
    if api_key not in valid_keys:
        raise HTTPException(status_code=403, detail="Invalid API key")

    if not rate_limiter.is_allowed(api_key):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Max 100 requests per minute."
        )

    return api_key

# Latency tracking middleware
@app.middleware("http")
async def track_latency(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    latency = (time.time() - start) * 1000
    response.headers["X-Response-Time-Ms"] = str(round(latency, 2))
    logger.info(f"{request.method} {request.url.path} - {latency:.0f}ms")
    return response

2. Async & Streaming

LLM inference is inherently slow (seconds per response), making asynchronous programming and streaming essential for production UX. Streaming delivers tokens to the user as they’re generated (reducing perceived latency from seconds to milliseconds), while async patterns let your server handle thousands of concurrent LLM requests without blocking. This section covers the three primary streaming protocols and async patterns for LLM applications.

2.1 Server-Sent Events (SSE)

SSE is the standard protocol for streaming LLM responses to web clients. It uses a single HTTP connection where the server pushes data events. This is the same protocol used by ChatGPT, Claude, and every major LLM chat interface.

# SSE streaming with RAG pipeline
# Extends the FastAPI app defined above

from fastapi.responses import StreamingResponse
from typing import AsyncGenerator
import json

async def rag_stream(
    question: str, retriever, llm_client
) -> AsyncGenerator[str, None]:
    """Stream a RAG response with metadata events."""

    # Event 1: Signal retrieval start
    yield f"event: status\ndata: {json.dumps({'step': 'retrieving'})}\n\n"

    # Step 1: Retrieve documents
    docs = await retriever.aget_relevant_documents(question)
    sources = [{"title": d.metadata.get("title", ""), "page": d.metadata.get("page", 0)}
               for d in docs]

    # Event 2: Send retrieved sources
    yield f"event: sources\ndata: {json.dumps({'sources': sources})}\n\n"

    # Event 3: Signal generation start
    yield f"event: status\ndata: {json.dumps({'step': 'generating'})}\n\n"

    # Step 2: Stream LLM generation
    context = "\n\n".join(d.page_content for d in docs)
    stream = await llm_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"Answer using this context:\n{context}"},
            {"role": "user", "content": question}
        ],
        stream=True
    )

    total_tokens = 0
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            total_tokens += 1
            yield f"data: {json.dumps({'token': token})}\n\n"

    # Event 4: Completion with metadata
    yield f"event: done\ndata: {json.dumps({'total_tokens': total_tokens})}\n\n"

@app.post("/v1/rag/stream")
async def rag_stream_endpoint(request: ChatRequest):
    # Assumes module-level retriever and llm_client (e.g. AsyncOpenAI()) are initialized
    return StreamingResponse(
        rag_stream(request.message, retriever, llm_client),
        media_type="text/event-stream"
    )

2.2 WebSocket Streaming

While SSE is unidirectional (server to client), WebSockets provide full-duplex communication — enabling interactive chat interfaces where the user can send new messages while the model is still generating a response. The implementation below manages multiple concurrent WebSocket connections, broadcasts tokens in real-time, and handles connection lifecycle events (join, disconnect, errors) gracefully.

# WebSocket endpoint for bidirectional LLM communication
# Extends the FastAPI app defined above
# pip install websockets

from fastapi import WebSocket, WebSocketDisconnect
import json

class ConnectionManager:
    """Manage active WebSocket connections."""

    def __init__(self):
        self.active: dict[str, WebSocket] = {}

    async def connect(self, ws: WebSocket, client_id: str):
        await ws.accept()
        self.active[client_id] = ws

    def disconnect(self, client_id: str):
        self.active.pop(client_id, None)

    async def send_json(self, client_id: str, data: dict):
        if client_id in self.active:
            await self.active[client_id].send_json(data)

manager = ConnectionManager()

@app.websocket("/ws/chat/{client_id}")
async def websocket_chat(websocket: WebSocket, client_id: str):
    await manager.connect(websocket, client_id)

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()
            message = data.get("message", "")
            conversation_id = data.get("conversation_id", client_id)

            # Send acknowledgment
            await manager.send_json(client_id, {
                "type": "ack",
                "message_id": data.get("id")
            })

            # Stream LLM response
            from openai import AsyncOpenAI
            client = AsyncOpenAI()

            stream = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": message}],
                stream=True
            )

            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    await manager.send_json(client_id, {
                        "type": "token",
                        "content": chunk.choices[0].delta.content
                    })

            await manager.send_json(client_id, {"type": "done"})

    except WebSocketDisconnect:
        manager.disconnect(client_id)

2.3 Async Patterns for LLMs

When your application needs to make multiple LLM calls for a single request (e.g., generating summaries for several documents), sequential execution wastes time. Async patterns with asyncio.gather() let you run multiple LLM calls in parallel, while semaphores prevent overwhelming the API with too many concurrent requests. The combination of parallelism and rate limiting is essential for production throughput optimization.

# Parallel LLM calls with asyncio
# pip install openai

import os
import asyncio
from openai import AsyncOpenAI

# Uses OPENAI_API_KEY from environment
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

async def parallel_llm_calls(prompts: list[str], model: str = "gpt-4o-mini"):
    """Execute multiple LLM calls in parallel."""
    tasks = [
        client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": p}],
            max_tokens=500
        )
        for p in prompts
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    outputs = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            outputs.append({"prompt": prompts[i], "error": str(result)})
        else:
            outputs.append({
                "prompt": prompts[i],
                "response": result.choices[0].message.content,
                "tokens": result.usage.total_tokens
            })
    return outputs

# Semaphore for controlling concurrent LLM calls
async def rate_limited_calls(
    prompts: list[str], max_concurrent: int = 5
):
    """Rate-limited parallel execution to avoid API throttling."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_call(prompt):
        async with semaphore:
            return await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}]
            )

    tasks = [limited_call(p) for p in prompts]
    return await asyncio.gather(*tasks, return_exceptions=True)

3. Queuing & Background Tasks

Some LLM workloads — batch document processing, long-form content generation, multi-step agent workflows — take too long for synchronous HTTP requests. Task queuing with Celery and Redis lets you accept requests immediately, process them asynchronously in background workers, and notify clients when results are ready. This pattern also enables retry logic, priority queues, and horizontal scaling by adding more workers.

3.1 Celery & Redis

Not every LLM workload should be synchronous. Document processing, batch evaluation, report generation, and agent workflows can take minutes. Use Celery with Redis as the message broker to offload these to background workers.

# Celery worker for background LLM tasks
# pip install celery redis
# celery_app.py

from celery import Celery

celery_app = Celery(
    "ai_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_track_started=True,
    task_time_limit=600,      # 10 min hard limit
    task_soft_time_limit=540, # 9 min soft limit (raises exception)
    worker_prefetch_multiplier=1,  # One task at a time per worker
    worker_concurrency=4,
)

# --- Background Tasks ---
@celery_app.task(bind=True, max_retries=3)
def process_document(self, document_id: str, user_id: str):
    """Process a document through RAG ingestion pipeline."""
    try:
        # Step 1: Load document
        from document_processor import load_document, chunk_document
        doc = load_document(document_id)

        # Step 2: Chunk
        chunks = chunk_document(doc, chunk_size=512, overlap=50)

        # Step 3: Embed and store
        from vector_store import embed_and_store
        embed_and_store(chunks, collection=f"user_{user_id}")

        return {
            "status": "completed",
            "document_id": document_id,
            "chunks_created": len(chunks)
        }

    except Exception as e:
        # Retry with exponential backoff: 1s, 2s, 4s between attempts
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

@celery_app.task(bind=True)
def batch_evaluation(self, eval_config: dict):
    """Run batch evaluation of a RAG pipeline."""
    from eval_pipeline import RAGEvalPipeline, RAGEvalConfig

    config = RAGEvalConfig(**eval_config)
    pipeline = RAGEvalPipeline(config)

    # Update task state with progress
    self.update_state(state="EVALUATING", meta={"progress": 0})

    results = pipeline.run_evaluation()

    self.update_state(state="CHECKING", meta={"progress": 80})

    report = pipeline.generate_report()

    return report

# --- FastAPI integration (extends the app from section 1) ---

@app.post("/v1/documents/process")
async def process_doc(document_id: str, user_id: str):
    """Queue document for background processing."""
    task = process_document.delay(document_id, user_id)
    return {"task_id": task.id, "status": "queued"}

@app.get("/v1/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Check status of a background task."""
    result = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
        "meta": result.info if not result.ready() else None
    }

3.2 Task Patterns for LLM Workloads

Pattern | Use Case | Implementation | Timeout
Fire-and-Forget | Logging, analytics, feedback storage | Celery .delay() with no result tracking | 30s
Polling | Document processing, batch jobs | Return task_id; client polls /tasks/{id} | 10 min
Callback | Agent workflows, report generation | Task calls webhook on completion | 30 min
Chained | Multi-step pipelines (ingest -> embed -> index) | Celery chain/chord composition | Varies

4. Caching Strategies

LLM API calls are expensive (both in latency and cost), making caching one of the highest-impact optimizations for production systems. Unlike traditional caching where keys must match exactly, semantic caching uses embedding similarity to return cached responses for queries that are semantically similar (but not identical) to previous requests. This can reduce API costs by 30–70% while improving response times from seconds to milliseconds for cached queries.

4.1 Semantic Caching

Traditional caching uses exact key matching. But LLM queries are natural language — "What's our PTO policy?" and "How many vacation days do I get?" are semantically identical but textually different. Semantic caching uses embeddings to match semantically similar queries to cached responses.

# Semantic cache for LLM responses
# pip install openai numpy redis

import os
import json
import hashlib
import time  # used for latency timing in the endpoint below
from typing import Optional

import numpy as np
import redis
from openai import OpenAI

class SemanticCache:
    """Cache LLM responses using semantic similarity matching."""

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/2",
        similarity_threshold: float = 0.92,
        embedding_model: str = "text-embedding-3-small",
        ttl_seconds: int = 3600  # 1 hour default TTL
    ):
        self.redis = redis.from_url(redis_url)
        self.threshold = similarity_threshold
        self.embedding_model = embedding_model
        self.ttl = ttl_seconds
        self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    def _get_embedding(self, text: str) -> list[float]:
        """Get embedding for a text string."""
        response = self.openai.embeddings.create(
            input=text, model=self.embedding_model
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Calculate cosine similarity between two embeddings."""
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def get(self, query: str) -> Optional[dict]:
        """Look up a semantically similar cached response."""
        query_embedding = self._get_embedding(query)

        # Scan cache entries (in production, use a vector index such as RediSearch)
        for key in self.redis.scan_iter("cache:*"):
            raw = self.redis.get(key)
            if raw is None:
                continue  # entry expired between SCAN and GET
            cached = json.loads(raw)
            similarity = self._cosine_similarity(
                query_embedding, cached["embedding"]
            )

            if similarity >= self.threshold:
                return {
                    "response": cached["response"],
                    "original_query": cached["query"],
                    "similarity": round(similarity, 4),
                    "cached": True
                }

        return None

    def set(self, query: str, response: str):
        """Cache a query-response pair with its embedding."""
        embedding = self._get_embedding(query)
        key = f"cache:{hashlib.md5(query.encode()).hexdigest()}"

        self.redis.setex(
            key, self.ttl,
            json.dumps({
                "query": query,
                "response": response,
                "embedding": embedding
            })
        )

    def invalidate(self, pattern: str = "cache:*"):
        """Invalidate cache entries matching a pattern."""
        for key in self.redis.scan_iter(pattern):
            self.redis.delete(key)

# Integration with FastAPI endpoint
cache = SemanticCache(similarity_threshold=0.92)

@app.post("/v1/chat/cached")
async def chat_cached(request: ChatRequest):
    """Chat endpoint with semantic caching."""

    # Check cache first (the semantic lookup itself costs one embedding call)
    start = time.time()
    cached = cache.get(request.message)
    if cached:
        return {
            "message": cached["response"],
            "cached": True,
            "similarity": cached["similarity"],
            "latency_ms": round((time.time() - start) * 1000, 2)
        }

    # Cache miss — call LLM
    # (get_llm_response is a stand-in for your chat-completion helper)
    start = time.time()
    response = await get_llm_response(request)
    latency = (time.time() - start) * 1000

    # Store in cache
    cache.set(request.message, response)

    return {
        "message": response,
        "cached": False,
        "latency_ms": round(latency, 2)
    }

4.2 Multi-Layer Cache Architecture

Cache Layer | What It Caches | Latency | Hit Rate
L1: Exact Match (Redis) | Identical query strings | <1ms | 10-20%
L2: Semantic Cache | Semantically similar queries | 50-100ms (embedding lookup) | 30-50%
L3: Embedding Cache | Pre-computed embeddings for documents | <1ms | 90%+
L4: KV Cache (vLLM) | Prefix/system prompt KV states | Saves GPU compute | Depends on prefix overlap

Key Insight: Semantic caching is one of the highest-ROI optimizations for production LLM apps. If 40% of your queries are semantically similar to previously answered queries, a semantic cache with a 0.92 similarity threshold can reduce your LLM API costs by 35-40% while delivering sub-100ms response times for cache hits.
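These layers compose as a waterfall: consult the cheap exact-match layer first, fall through to the semantic layer only on a miss, and promote semantic hits into L1 so repeats become free. A minimal in-memory sketch (LayeredCache is illustrative; semantic_lookup stands in for the SemanticCache.get method from 4.1):

```python
import hashlib
from typing import Callable, Optional

class LayeredCache:
    """L1 exact-match dict in front of a slower semantic lookup (L2)."""

    def __init__(self, semantic_lookup: Callable[[str], Optional[str]]):
        self.l1: dict[str, str] = {}
        self.semantic_lookup = semantic_lookup

    def _key(self, query: str) -> str:
        # Normalize before hashing so trivial variations still hit L1
        return hashlib.md5(query.strip().lower().encode()).hexdigest()

    def get(self, query: str) -> Optional[str]:
        key = self._key(query)
        if key in self.l1:                       # L1: sub-millisecond
            return self.l1[key]
        response = self.semantic_lookup(query)   # L2: costs an embedding call
        if response is not None:
            self.l1[key] = response              # promote the hit into L1
        return response

    def set(self, query: str, response: str):
        self.l1[self._key(query)] = response
```

In production the L1 dict would be Redis with a TTL, but the lookup order and promotion logic stay the same.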

5. Scaling LLM Inference

Scaling LLM inference is fundamentally different from scaling traditional web services because GPU memory, not CPU, is the bottleneck. Specialized serving frameworks like vLLM and TGI implement GPU-optimized techniques — continuous batching, PagedAttention, tensor parallelism, and speculative decoding — that maximize throughput while minimizing latency. This section covers the two leading open-source inference servers and how to deploy them.

5.1 vLLM

vLLM is the leading open-source inference engine for serving LLMs. It uses PagedAttention to efficiently manage GPU memory, achieving 2-24x higher throughput than naive HuggingFace inference.

# Deploy a model with vLLM
pip install vllm

# Start vLLM server (OpenAI-compatible API)
python -m vllm.entrypoints.openai.api_server \
    --model meta-llama/Llama-3.1-70B-Instruct \
    --tensor-parallel-size 4 \
    --gpu-memory-utilization 0.90 \
    --max-model-len 8192 \
    --port 8001

# Using vLLM with OpenAI-compatible client
# pip install openai

from openai import OpenAI

# Point to your vLLM server
vllm_client = OpenAI(
    base_url="http://localhost:8001/v1",
    api_key="not-needed"
)

# Use exactly like the OpenAI API
response = vllm_client.chat.completions.create(
    model="meta-llama/Llama-3.1-70B-Instruct",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain PagedAttention in 3 sentences."}
    ],
    temperature=0.7,
    max_tokens=500
)

print(response.choices[0].message.content)

5.2 Text Generation Inference (TGI)

HuggingFace’s Text Generation Inference (TGI) is a production-ready serving solution optimized for HuggingFace models. It supports GPTQ/AWQ quantization for fitting large models on smaller GPUs, tensor parallelism for multi-GPU setups, and a streaming API compatible with the OpenAI format. The Docker-based deployment below shows how to serve a quantized model with sharding across GPUs.

# Deploy with HuggingFace TGI (Docker)
docker run --gpus all --shm-size 1g -p 8002:80 \
    ghcr.io/huggingface/text-generation-inference:latest \
    --model-id meta-llama/Llama-3.1-70B-Instruct \
    --num-shard 4 \
    --max-input-length 4096 \
    --max-total-tokens 8192 \
    --quantize awq
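To call the server above, TGI exposes a native generate endpoint (plus a streaming variant) whose body uses inputs and a parameters object. A sketch of the request shape, assuming the standard TGI schema; the payload builder runs offline, while the commented request assumes the container above is listening on port 8002:

```python
# Build a request body for TGI's /generate endpoint
def tgi_payload(prompt: str, max_new_tokens: int = 256, temperature: float = 0.7) -> dict:
    """Return the JSON body TGI's native generate API expects."""
    return {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": max_new_tokens,
            "temperature": temperature,
        },
    }

# With the container running:
# import requests
# body = tgi_payload("Explain AWQ quantization in one sentence.", max_new_tokens=64)
# print(requests.post("http://localhost:8002/generate", json=body).json()["generated_text"])
```
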

5.3 Scaling Strategies

Strategy | Description | Best For | Complexity
Vertical Scaling | Bigger GPU (A100 -> H100) | Single model, higher throughput | Low
Horizontal Scaling | Multiple replicas behind a load balancer | High request volume | Medium
Tensor Parallelism | Split the model across multiple GPUs | Models too large for a single GPU | Medium
Quantization (AWQ/GPTQ) | Reduce model precision (FP16 -> INT4) | Lower GPU memory, higher throughput | Low
Model Routing | Route queries to different models by complexity | Cost optimization while maintaining quality | High
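
The quantization row maps directly to GPU memory. A back-of-envelope sketch (weights only; the KV cache and activations need additional memory on top):

```python
# Rough GPU-memory footprint of model weights at different precisions.
# Weights only — KV cache and activations require additional memory.

def weight_memory_gb(params_billion: float, bits_per_weight: int) -> float:
    """Decimal GB required to hold the weights alone."""
    return params_billion * 1e9 * bits_per_weight / 8 / 1e9

for precision, bits in [("FP16", 16), ("INT8", 8), ("INT4", 4)]:
    print(f"Llama-3.1-70B @ {precision}: ~{weight_memory_gb(70, bits):.0f} GB")
# FP16 ~140 GB (multi-GPU required), INT8 ~70 GB, INT4 ~35 GB (fits one 40/80 GB card)
```

This is why the TGI command above combines --quantize awq with --num-shard 4: quantization shrinks the weights, and sharding splits what remains across GPUs.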

6. Cost Optimization

LLM API costs scale linearly with token usage, so cost optimization is critical for any production application. The two most impactful strategies are intelligent model routing (sending simple queries to cheap models and complex queries to expensive ones) and token efficiency (compressing prompts, trimming context, and dynamically adjusting max_tokens). Together, these techniques can reduce API costs by 50–80% without measurable quality degradation.
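
The savings range follows from simple arithmetic over a traffic mix. A quick sketch — the token counts and the 80/20 split are illustrative assumptions; prices are USD per 1M tokens:

```python
# Back-of-envelope check of routing savings under an assumed traffic mix.
# Token counts and the 80/20 split are illustrative; verify current pricing.

PRICING = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
}

def cost_per_request(model: str, input_tokens: int, output_tokens: int) -> float:
    p = PRICING[model]
    return (input_tokens / 1e6) * p["input"] + (output_tokens / 1e6) * p["output"]

def blended_cost(mix: dict[str, float], input_tokens: int = 800,
                 output_tokens: int = 300) -> float:
    """Average cost per request for a traffic mix of {model: fraction}."""
    return sum(frac * cost_per_request(m, input_tokens, output_tokens)
               for m, frac in mix.items())

baseline = blended_cost({"gpt-4o": 1.0})                    # everything on gpt-4o
routed = blended_cost({"gpt-4o-mini": 0.8, "gpt-4o": 0.2})  # 80% routed down
print(f"${baseline:.4f} -> ${routed:.4f} per request "
      f"({1 - routed / baseline:.0%} savings)")
# $0.0050 -> $0.0012 per request (75% savings)
```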

6.1 Intelligent Model Routing

Not every query needs GPT-4o. A simple greeting can be handled by a tiny model. A complex multi-step reasoning task needs the best model available. Intelligent model routing classifies query complexity and routes to the appropriate model — reducing costs by 50-70% with minimal quality loss.

# Intelligent model routing based on query complexity
# pip install openai pydantic

import os
from enum import Enum
from pydantic import BaseModel
from openai import AsyncOpenAI

class QueryComplexity(str, Enum):
    SIMPLE = "simple"      # Greetings, simple lookups
    MODERATE = "moderate"  # Standard Q&A, summaries
    COMPLEX = "complex"    # Multi-step reasoning, analysis

MODEL_MAP = {
    QueryComplexity.SIMPLE: "gpt-4o-mini",
    QueryComplexity.MODERATE: "gpt-4o-mini",
    QueryComplexity.COMPLEX: "gpt-4o",
}

COST_MAP = {  # USD per 1M tokens (list prices; verify current pricing)
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4o": {"input": 2.50, "output": 10.00},
}

class QueryClassifier:
    """Classify query complexity for model routing."""

    def __init__(self):
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    async def classify(self, query: str) -> QueryComplexity:
        """Use a fast model to classify query complexity."""
        response = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": "Classify the complexity of this query. "
                           "Reply with ONLY one word: simple, moderate, or complex.\n"
                           "simple: greetings, yes/no, single-fact lookup\n"
                           "moderate: explanation, comparison, standard Q&A\n"
                           "complex: multi-step analysis, creative writing, coding"
            }, {
                "role": "user",
                "content": query
            }],
            max_tokens=10,
            temperature=0
        )

        label = response.choices[0].message.content.strip().lower()
        try:
            return QueryComplexity(label)
        except ValueError:
            return QueryComplexity.MODERATE  # Default to moderate

class ModelRouter:
    """Route queries to the optimal model."""

    def __init__(self):
        self.classifier = QueryClassifier()
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    async def route_and_respond(self, query: str, system_prompt: str = "") -> dict:
        """Classify, route, and generate response."""
        complexity = await self.classifier.classify(query)
        model = MODEL_MAP[complexity]

        response = await self.client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": query}
            ]
        )

        return {
            "response": response.choices[0].message.content,
            "model_used": model,
            "complexity": complexity.value,
            "tokens": response.usage.total_tokens,
            "estimated_cost": self._estimate_cost(model, response.usage)
        }

    def _estimate_cost(self, model: str, usage) -> float:
        pricing = COST_MAP.get(model, {"input": 0, "output": 0})
        return round(
            (usage.prompt_tokens / 1e6) * pricing["input"] +
            (usage.completion_tokens / 1e6) * pricing["output"], 6
        )

6.2 Token Efficiency

Token efficiency optimizations reduce the number of tokens consumed per request without sacrificing output quality. Key techniques include prompt compression (removing redundant phrasing), context trimming (intelligently truncating retrieved documents), and adaptive max_tokens (setting response length limits based on query complexity). The TokenOptimizer below combines all three approaches into a reusable pipeline.

# Token optimization strategies
# No external dependencies required — standalone utility class

class TokenOptimizer:
    """Reduce token usage without sacrificing quality."""

    @staticmethod
    def compress_system_prompt(prompt: str, max_tokens: int = 200) -> str:
        """Compress verbose system prompts."""
        # Remove redundant instructions
        lines = [l.strip() for l in prompt.split('\n') if l.strip()]
        # Remove bullet point prefixes
        lines = [l.lstrip('- ').lstrip('* ') for l in lines]
        return '\n'.join(lines)

    @staticmethod
    def trim_context(docs: list[str], max_tokens: int = 2000) -> list[str]:
        """Trim retrieved documents to fit token budget."""
        trimmed = []
        token_count = 0
        for doc in docs:
            doc_tokens = len(doc.split()) * 1.3  # Rough token estimate
            if token_count + doc_tokens > max_tokens:
                remaining = max_tokens - token_count
                words = int(remaining / 1.3)
                trimmed.append(' '.join(doc.split()[:words]))
                break
            trimmed.append(doc)
            token_count += doc_tokens
        return trimmed

    @staticmethod
    def adaptive_max_tokens(query: str) -> int:
        """Set max_tokens based on expected response length."""
        query_lower = query.lower()
        if any(w in query_lower for w in ['yes or no', 'true or false', 'which one']):
            return 50
        if any(w in query_lower for w in ['list', 'summarize', 'brief']):
            return 300
        if any(w in query_lower for w in ['explain', 'compare', 'analyze', 'write']):
            return 1000
        return 500  # Default

Common Mistake: Setting max_tokens to 4096 for every request. Most responses are under 300 tokens. A high max_tokens does not by itself cost extra (you pay only for tokens actually generated), but it reserves context budget — some providers reject requests when prompt tokens plus max_tokens exceed the context window — and it removes a useful cap on runaway, verbose responses. Use adaptive max_tokens based on query type.

7. Infrastructure

Deploying LLM applications requires infrastructure that handles GPU scheduling, memory-intensive containers, and model artifact management. This section covers containerization with Docker, orchestration with Kubernetes (including GPU-aware scheduling and horizontal pod autoscaling), and multi-cloud provider abstractions that enable failover between OpenAI, AWS Bedrock, and Azure OpenAI.

7.1 Docker & Kubernetes

A production LLM deployment starts with a well-structured Dockerfile (slim base image, non-root user, health checks) and Kubernetes manifests for deployment, service, and autoscaling. The configurations below demonstrate horizontal pod autoscaling on CPU utilization plus a custom requests-per-second metric, and liveness/readiness probes tuned for LLM services with slow startup times. GPU-serving pods (e.g. running vLLM) would additionally request nvidia.com/gpu resources in the container spec.

# Dockerfile for production LLM API
FROM python:3.11-slim

WORKDIR /app

# curl is needed by the HEALTHCHECK below and is not in the slim base image
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies (uvicorn[standard] provides uvloop for the --loop flag)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ ./src/
COPY config/ ./config/

# Non-root user for security
RUN adduser --disabled-password --gecos '' appuser
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", \
     "--workers", "4", "--loop", "uvloop"]

# Kubernetes deployment for LLM API
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-chat-api
  labels:
    app: ai-chat-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ai-chat-api
  template:
    metadata:
      labels:
        app: ai-chat-api
    spec:
      containers:
      - name: api
        image: ai-chat-api:2.0.0
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: llm-secrets
              key: openai-api-key
        - name: REDIS_URL
          value: "redis://redis-service:6379/0"
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
          limits:
            cpu: "2000m"
            memory: "2Gi"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-chat-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-chat-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: http_requests_per_second
      target:
        type: AverageValue
        averageValue: "50"

7.2 AWS Bedrock & Azure OpenAI

Production applications should never depend on a single LLM provider. A multi-cloud abstraction layer lets you route requests to the cheapest available provider, fail over automatically when one provider has an outage, and comply with data residency requirements by using region-specific endpoints. The implementation below wraps OpenAI, AWS Bedrock, and Azure OpenAI behind a unified interface with automatic failover and retry logic.

# Multi-cloud LLM provider abstraction
# pip install openai boto3

import os
from abc import ABC, abstractmethod
from typing import AsyncGenerator

class LLMProvider(ABC):
    """Abstract base for multi-cloud LLM providers."""

    @abstractmethod
    async def generate(self, messages: list, **kwargs) -> str:
        pass

    @abstractmethod
    async def stream(self, messages: list, **kwargs) -> AsyncGenerator[str, None]:
        pass

class AWSBedrockProvider(LLMProvider):
    """AWS Bedrock integration."""

    def __init__(self, model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0"):
        import boto3
        self.client = boto3.client("bedrock-runtime", region_name="us-east-1")
        self.model_id = model_id

    async def generate(self, messages: list, **kwargs) -> str:
        # boto3 calls are blocking; in production wrap them with
        # asyncio.to_thread (or use aioboto3) so the event loop isn't stalled.
        import json
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": messages,
            "max_tokens": kwargs.get("max_tokens", 1000),
            "temperature": kwargs.get("temperature", 0.7)
        })

        response = self.client.invoke_model(
            modelId=self.model_id, body=body
        )
        result = json.loads(response["body"].read())
        return result["content"][0]["text"]

    async def stream(self, messages: list, **kwargs) -> AsyncGenerator[str, None]:
        import json
        # Streaming is selected by calling invoke_model_with_response_stream;
        # the Anthropic message body itself does not take a "stream" flag.
        body = json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": messages,
            "max_tokens": kwargs.get("max_tokens", 1000)
        })

        response = self.client.invoke_model_with_response_stream(
            modelId=self.model_id, body=body
        )
        for event in response["body"]:
            chunk = json.loads(event["chunk"]["bytes"])
            if chunk["type"] == "content_block_delta":
                yield chunk["delta"]["text"]

class AzureOpenAIProvider(LLMProvider):
    """Azure OpenAI integration."""

    def __init__(self):
        from openai import AsyncAzureOpenAI
        self.client = AsyncAzureOpenAI(
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT", "https://your-resource.openai.azure.com/"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version="2024-02-15-preview"
        )
        self.deployment = "gpt-4o"

    async def generate(self, messages: list, **kwargs) -> str:
        response = await self.client.chat.completions.create(
            model=self.deployment,
            messages=messages,
            **kwargs
        )
        return response.choices[0].message.content

    async def stream(self, messages: list, **kwargs) -> AsyncGenerator[str, None]:
        stream = await self.client.chat.completions.create(
            model=self.deployment,
            messages=messages,
            stream=True,
            **kwargs
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

class OpenAIProvider(LLMProvider):
    """OpenAI direct integration."""

    def __init__(self):
        from openai import AsyncOpenAI
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    async def generate(self, messages: list, **kwargs) -> str:
        response = await self.client.chat.completions.create(
            model="gpt-4o", messages=messages, **kwargs
        )
        return response.choices[0].message.content

    async def stream(self, messages: list, **kwargs) -> AsyncGenerator[str, None]:
        stream = await self.client.chat.completions.create(
            model="gpt-4o", messages=messages, stream=True, **kwargs
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

# Provider factory with failover
class LLMProviderFactory:
    """Multi-provider factory with automatic failover."""

    def __init__(self):
        self.providers = {
            "openai": lambda: OpenAIProvider(),
            "bedrock": lambda: AWSBedrockProvider(),
            "azure": lambda: AzureOpenAIProvider(),
        }
        self.primary = "openai"
        self.fallback_order = ["azure", "bedrock"]

    async def generate_with_failover(self, messages: list, **kwargs) -> dict:
        providers_to_try = [self.primary] + self.fallback_order

        for provider_name in providers_to_try:
            try:
                provider = self.providers[provider_name]()
                result = await provider.generate(messages, **kwargs)
                return {"response": result, "provider": provider_name}
            except Exception as e:
                print(f"Provider {provider_name} failed: {e}")
                continue

        raise RuntimeError("All LLM providers failed")
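
The failover loop can be exercised without live credentials by substituting stub providers — a minimal sketch of the same pattern (the stub classes are illustrative stand-ins for the real provider classes above):

```python
import asyncio

# Stub providers standing in for the real OpenAI/Azure/Bedrock classes.
class FailingProvider:
    async def generate(self, messages, **kwargs):
        raise ConnectionError("simulated outage")

class WorkingProvider:
    async def generate(self, messages, **kwargs):
        return "ok"

async def generate_with_failover(providers, messages):
    """Try each (name, provider) pair in order; return the first success."""
    for name, provider in providers:
        try:
            result = await provider.generate(messages)
            return {"response": result, "provider": name}
        except Exception as e:
            print(f"Provider {name} failed: {e}")
    raise RuntimeError("All providers failed")

result = asyncio.run(generate_with_failover(
    [("openai", FailingProvider()), ("azure", WorkingProvider())],
    [{"role": "user", "content": "hello"}],
))
print(result)  # {'response': 'ok', 'provider': 'azure'}
```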

8. Production Monitoring

LLM systems require monitoring beyond traditional web metrics. In addition to request latency and error rates, you need to track token usage per request, cache hit rates, model drift indicators, and cost per query. This section covers latency percentile tracking (P50/P95/P99) and Prometheus-based dashboards that give operations teams real-time visibility into LLM service health and costs.

8.1 Latency P50/P95/P99

Monitoring average latency is misleading. The tail latencies (P95, P99) reveal the experience of your worst-served users. A system with 200ms average but 5s P99 has 1% of users waiting 25x longer than typical.

# Production latency monitoring — standalone utility

import time
import statistics
from collections import deque
from dataclasses import dataclass

@dataclass
class LatencyStats:
    p50: float
    p95: float
    p99: float
    mean: float
    count: int

class LatencyMonitor:
    """Track and report latency percentiles for LLM endpoints."""

    def __init__(self, window_size: int = 1000):
        self.latencies: dict[str, deque] = {}
        self.window_size = window_size

    def record(self, endpoint: str, latency_ms: float):
        """Record a latency measurement."""
        if endpoint not in self.latencies:
            self.latencies[endpoint] = deque(maxlen=self.window_size)
        self.latencies[endpoint].append(latency_ms)

    def get_stats(self, endpoint: str) -> LatencyStats:
        """Get latency percentiles for an endpoint."""
        if endpoint not in self.latencies or not self.latencies[endpoint]:
            return LatencyStats(0, 0, 0, 0, 0)

        data = sorted(self.latencies[endpoint])
        n = len(data)

        return LatencyStats(
            p50=data[int(n * 0.50)],
            p95=data[int(n * 0.95)] if n >= 20 else data[-1],
            p99=data[int(n * 0.99)] if n >= 100 else data[-1],
            mean=round(statistics.mean(data), 2),
            count=n
        )

    def check_slos(self, endpoint: str, slos: dict) -> dict:
        """Check if latency meets Service Level Objectives."""
        stats = self.get_stats(endpoint)
        violations = {}

        if "p50_ms" in slos and stats.p50 > slos["p50_ms"]:
            violations["p50"] = {"target": slos["p50_ms"], "actual": stats.p50}
        if "p95_ms" in slos and stats.p95 > slos["p95_ms"]:
            violations["p95"] = {"target": slos["p95_ms"], "actual": stats.p95}
        if "p99_ms" in slos and stats.p99 > slos["p99_ms"]:
            violations["p99"] = {"target": slos["p99_ms"], "actual": stats.p99}

        return {
            "endpoint": endpoint,
            "stats": stats,
            "slos": slos,
            "violations": violations,
            "healthy": len(violations) == 0
        }

# Usage with SLOs
monitor = LatencyMonitor()

# SLOs for different endpoints
SLOS = {
    "/v1/chat": {"p50_ms": 500, "p95_ms": 2000, "p99_ms": 5000},
    "/v1/chat/stream": {"p50_ms": 200, "p95_ms": 800, "p99_ms": 2000},
    "/v1/rag": {"p50_ms": 1000, "p95_ms": 3000, "p99_ms": 8000},
}

# Record measurements, then check an endpoint against its SLO
monitor.record("/v1/chat", 420.0)
report = monitor.check_slos("/v1/chat", SLOS["/v1/chat"])
print(report["healthy"], report["violations"])

8.2 Monitoring Dashboards

A comprehensive monitoring dashboard for LLM applications exposes Prometheus metrics for the key operational signals: request latency histograms, token usage counters, cache hit/miss ratios, active request gauges, and estimated cost per request. The instrumentation below defines the core metrics and a /metrics endpoint for Prometheus to scrape; a small middleware can then record these metrics on every API call. They feed into Grafana dashboards for real-time visualization and alerting.

# Prometheus metrics for LLM monitoring
# pip install prometheus-client fastapi

from fastapi import FastAPI
from fastapi.responses import Response
from prometheus_client import (
    Counter, Histogram, Gauge, generate_latest,
    CollectorRegistry, CONTENT_TYPE_LATEST
)

app = FastAPI()  # or reuse your application's existing FastAPI instance

registry = CollectorRegistry()

# Request metrics
REQUEST_COUNT = Counter(
    "llm_requests_total",
    "Total LLM API requests",
    ["endpoint", "model", "status"],
    registry=registry
)

REQUEST_LATENCY = Histogram(
    "llm_request_duration_seconds",
    "LLM request latency in seconds",
    ["endpoint", "model"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
    registry=registry
)

TOKEN_USAGE = Counter(
    "llm_tokens_total",
    "Total tokens consumed",
    ["model", "type"],  # type: input/output
    registry=registry
)

ACTIVE_REQUESTS = Gauge(
    "llm_active_requests",
    "Currently active LLM requests",
    ["endpoint"],
    registry=registry
)

CACHE_HIT = Counter(
    "llm_cache_hits_total",
    "Semantic cache hits",
    ["cache_layer"],
    registry=registry
)

COST_TOTAL = Counter(
    "llm_cost_dollars_total",
    "Total LLM API cost in dollars",
    ["model"],
    registry=registry
)

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    return Response(
        content=generate_latest(registry),
        media_type=CONTENT_TYPE_LATEST
    )

Key Insight: The four golden signals for LLM applications are Latency (P50/P95/P99 per endpoint), Throughput (requests per second), Error rate (failed LLM calls, timeouts, rate limits), and Cost (dollars per request, per user, per day). Monitor all four continuously.

Exercises & Self-Assessment

Exercise 1

Build a Production Chat API

Create a complete FastAPI service with:

  1. Non-streaming and SSE streaming endpoints
  2. API key authentication and rate limiting
  3. Request validation with Pydantic models
  4. Latency tracking middleware
  5. Health check endpoint

Exercise 2

Implement Semantic Caching

Build a semantic cache and measure its impact:

  1. Implement the SemanticCache class with Redis
  2. Send 100 queries (with 40% semantic duplicates)
  3. Measure cache hit rate, latency reduction, and cost savings
  4. Experiment with different similarity thresholds (0.85, 0.90, 0.95)

Exercise 3

Model Routing Experiment

Implement intelligent model routing and evaluate the trade-offs:

  1. Build a query complexity classifier
  2. Route simple/moderate/complex queries to different models
  3. Compare quality (using LLM-as-judge) and cost vs. sending everything to GPT-4o
  4. What is the cost reduction? What is the quality impact?

Exercise 4

Dockerize and Deploy

Containerize your LLM API and deploy it:

  1. Create a Dockerfile for your FastAPI service
  2. Write a docker-compose.yml with Redis and your API
  3. Add health checks and resource limits
  4. Load test with 50 concurrent users — measure P50/P95/P99

Exercise 5

Reflective Questions

  1. Why is streaming more important for LLM APIs than for traditional APIs? How does perceived latency differ from actual latency?
  2. When would you choose self-hosted inference (vLLM) over API-based inference (OpenAI)? What are the total cost of ownership considerations?
  3. Design a semantic caching strategy that handles personalized responses. Can you cache if each user gets different answers?
  4. What are the failure modes of model routing? What happens if the classifier misroutes a complex query to a simple model?
  5. Compare AWS Bedrock vs. Azure OpenAI vs. direct OpenAI API for a production deployment. What factors would determine your choice?

Production Architecture Document Generator

Document your production AI system architecture. Download as Word, Excel, PDF, or PowerPoint.

All data stays in your browser. Nothing is sent to or stored on any server.

Conclusion & Next Steps

You now have the complete toolkit for building production-grade AI systems — from API design to infrastructure scaling and real-time monitoring. Here are the key takeaways from Part 16:

  • FastAPI is the standard framework for building LLM APIs — providing async support, Pydantic validation, automatic OpenAPI docs, and middleware for auth/rate limiting
  • Streaming (SSE/WebSocket) is essential for perceived performance — users expect to see tokens appear within 1-2 seconds, not wait 10 seconds for a complete response
  • Queuing (Celery/Redis) handles long-running workloads — document processing, batch evaluation, and agent workflows should be offloaded to background workers
  • Semantic caching reduces costs 35-40% by matching semantically similar queries to cached responses, with sub-100ms response times for cache hits
  • vLLM and TGI provide 2-24x inference speedups for self-hosted models through PagedAttention and continuous batching
  • Model routing sends simple queries to fast/cheap models and complex queries to powerful models — reducing costs 50-70% with minimal quality impact
  • Latency monitoring at P50/P95/P99 reveals the true user experience — average latency hides tail latency issues that affect your most frustrated users

Next in the Series

In Part 17: Safety, Guardrails & Reliability, we address the critical safety layer — input/output guardrails, hallucination mitigation through RAG grounding and verification loops, prompt injection defense, jailbreak prevention, data privacy with PII masking, and reliability patterns including retry, fallback, and circuit breakers.
