Back to AI App Dev Series

OpenAI SDK Track Part 15: Production Engineering

May 25, 2026Wasil Zafar36 min read

Ship OpenAI-powered applications at scale with production-grade architecture patterns, rate limit engineering, the Batch API for 50% cost savings, intelligent caching, blue/green deployments, horizontal scaling strategies, and systematic cost optimization — all with executable Python code.

Table of Contents

  1. Architecture Patterns
  2. Rate Limit Engineering
  3. Batch API
  4. Caching Strategies
  5. Deployment Patterns
  6. Scaling & Load Management
  7. Cost Optimization
What You’ll Learn: Moving from prototype to production requires a fundamentally different engineering mindset. This article covers the complete production stack for OpenAI applications: gateway architectures with circuit breakers, adaptive rate limiting that respects API quotas, the Batch API for offline processing at 50% cost savings, semantic caching for repeated queries, blue/green deployment patterns for safe model upgrades, horizontal scaling with priority queues, and cost optimization strategies that can reduce your OpenAI spend by 60-80% without sacrificing quality.
Production Architecture with Gateway, Cache & Queue
                flowchart TD
                    A[Client Requests] --> B[API Gateway]
                    B --> C{Rate Limiter}
                    C -->|Allowed| D{Cache Layer}
                    C -->|Throttled| E[429 + Retry-After]
                    D -->|Hit| F[Return Cached Response]
                    D -->|Miss| G{Circuit Breaker}
                    G -->|Closed| H[Request Queue]
                    G -->|Open| I[Fallback Response]
                    H --> J{Priority Router}
                    J -->|High Priority| K[OpenAI API - GPT-4.1]
                    J -->|Standard| L[OpenAI API - GPT-4.1-mini]
                    J -->|Batch| M[Batch API Queue]
                    K --> N[Response Processing]
                    L --> N
                    M --> O[Async Results Store]
                    N --> P[Cache Write]
                    P --> Q[Return Response]
            

1. Architecture Patterns

Production AI applications require a gateway layer that sits between your clients and the OpenAI API. This layer handles cross-cutting concerns — rate limiting, caching, circuit breaking, request routing, and observability — so your application code remains focused on business logic. The gateway pattern is the single most impactful architectural decision for production reliability.

Gateway Pattern with Circuit Breaker

A circuit breaker prevents cascading failures when the OpenAI API is degraded or experiencing outages. It tracks failure rates and “trips” the circuit when failures exceed a threshold, returning fast fallback responses instead of queueing thousands of requests that will timeout. After a cooldown period, it allows a few test requests through to determine if the service has recovered.

PatternUse CaseProsCons
Monolith + GatewaySingle app, moderate trafficSimple deployment, shared stateScaling bottlenecks, single point of failure
Microservices + GatewayMultiple AI features, high trafficIndependent scaling, fault isolationNetwork complexity, distributed state
Serverless + QueueBursty traffic, cost-sensitiveZero idle cost, auto-scalingCold starts, timeout limits
Edge + CentralLow latency, global usersReduced latency, cache proximityCache coherence, deployment complexity
import time
import threading
from enum import Enum
from dataclasses import dataclass, field


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation — requests pass through
    OPEN = "open"            # Failures exceeded threshold — requests rejected
    HALF_OPEN = "half_open"  # Testing recovery — limited requests allowed


@dataclass
class CircuitBreaker:
    """Circuit breaker for OpenAI API calls with configurable thresholds."""

    failure_threshold: int = 5          # Failures before opening circuit
    recovery_timeout: float = 30.0      # Seconds before trying half-open
    half_open_max_calls: int = 3        # Test calls allowed in half-open
    success_threshold: int = 2          # Successes needed to close circuit

    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    failure_count: int = field(default=0, init=False)
    success_count: int = field(default=0, init=False)
    last_failure_time: float = field(default=0.0, init=False)
    half_open_calls: int = field(default=0, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def can_execute(self) -> bool:
        """Check if a request should be allowed through."""
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True
            elif self.state == CircuitState.OPEN:
                # Check if recovery timeout has elapsed
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.half_open_calls = 0
                    self.success_count = 0
                    return True
                return False
            else:  # HALF_OPEN
                return self.half_open_calls < self.half_open_max_calls

    def record_success(self):
        """Record a successful API call."""
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                self.half_open_calls += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            elif self.state == CircuitState.CLOSED:
                self.failure_count = 0

    def record_failure(self):
        """Record a failed API call."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                # Any failure in half-open immediately re-opens
                self.state = CircuitState.OPEN
            elif self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

    def get_status(self) -> dict:
        """Get current circuit breaker status for monitoring."""
        return {
            "state": self.state.value,
            "failure_count": self.failure_count,
            "time_since_last_failure": round(time.time() - self.last_failure_time, 1)
            if self.last_failure_time > 0 else None,
        }


# Usage demonstration
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10.0)

print("=== Circuit Breaker Demo ===")
print(f"Initial state: {breaker.get_status()}")

# Simulate successful calls
for i in range(3):
    if breaker.can_execute():
        breaker.record_success()
        print(f"Call {i+1}: SUCCESS — state={breaker.state.value}")

# Simulate failures triggering the circuit to open
for i in range(3):
    if breaker.can_execute():
        breaker.record_failure()
        print(f"Failure {i+1}: FAILED — state={breaker.state.value}")

# Circuit is now OPEN — requests are rejected
can_call = breaker.can_execute()
print(f"\nCircuit OPEN — can_execute(): {can_call}")
print(f"Status: {breaker.get_status()}")

Fallback Strategies

Fallback Hierarchy: When the primary model is unavailable, your application should degrade gracefully rather than returning errors. A well-designed fallback chain: (1) Try the primary model (e.g., GPT-4.1); (2) Fall back to a faster/cheaper model (e.g., GPT-4.1-mini); (3) Return a cached response if semantically similar; (4) Return a templated response acknowledging the delay; (5) Queue the request for async processing and notify the user when ready. Each level trades quality for availability.

2. Rate Limit Engineering

OpenAI enforces rate limits on both requests per minute (RPM) and tokens per minute (TPM). Exceeding these returns a 429 status code. Production systems must proactively manage their request rate to stay within limits, handle throttling gracefully when it occurs, and distribute load across time windows to maximize throughput without hitting ceilings.

Adaptive Rate Limiter

A token bucket rate limiter with adaptive backoff is the gold standard for production API clients. Unlike a simple sleep-between-requests approach, token buckets allow bursts of traffic while maintaining a sustainable average rate. The adaptive component monitors 429 responses and automatically reduces throughput when approaching limits.

import time
import asyncio
from dataclasses import dataclass, field


@dataclass
class TokenBucketRateLimiter:
    """Adaptive token bucket rate limiter for OpenAI API compliance.

    Supports both RPM (requests per minute) and TPM (tokens per minute) limits.
    Automatically backs off when receiving 429 responses.
    """

    max_rpm: int = 500              # Max requests per minute
    max_tpm: int = 200_000          # Max tokens per minute
    burst_multiplier: float = 1.2   # Allow short bursts above average rate

    # Internal state
    request_tokens: float = field(init=False)
    token_tokens: float = field(init=False)
    last_refill: float = field(init=False)
    backoff_factor: float = field(default=1.0, init=False)
    consecutive_429s: int = field(default=0, init=False)

    def __post_init__(self):
        self.request_tokens = self.max_rpm / 60.0 * self.burst_multiplier
        self.token_tokens = self.max_tpm / 60.0 * self.burst_multiplier
        self.last_refill = time.time()

    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_refill
        self.last_refill = now

        # Refill at rate adjusted by backoff factor
        rpm_rate = (self.max_rpm / 60.0) / self.backoff_factor
        tpm_rate = (self.max_tpm / 60.0) / self.backoff_factor

        max_request_tokens = self.max_rpm / 60.0 * self.burst_multiplier
        max_token_tokens = self.max_tpm / 60.0 * self.burst_multiplier

        self.request_tokens = min(max_request_tokens, self.request_tokens + elapsed * rpm_rate)
        self.token_tokens = min(max_token_tokens, self.token_tokens + elapsed * tpm_rate)

    def acquire(self, estimated_tokens: int = 1000) -> dict:
        """Try to acquire permission for a request. Returns wait time if throttled."""
        self._refill()

        if self.request_tokens >= 1.0 and self.token_tokens >= estimated_tokens:
            self.request_tokens -= 1.0
            self.token_tokens -= estimated_tokens
            return {"allowed": True, "wait_seconds": 0}

        # Calculate wait time
        wait_for_rpm = max(0, (1.0 - self.request_tokens) / (self.max_rpm / 60.0))
        wait_for_tpm = max(0, (estimated_tokens - self.token_tokens) / (self.max_tpm / 60.0))
        wait_time = max(wait_for_rpm, wait_for_tpm) * self.backoff_factor

        return {"allowed": False, "wait_seconds": round(wait_time, 2)}

    def record_success(self):
        """Record a successful API response — reduce backoff."""
        self.consecutive_429s = 0
        self.backoff_factor = max(1.0, self.backoff_factor * 0.9)

    def record_rate_limit(self):
        """Record a 429 response — increase backoff exponentially."""
        self.consecutive_429s += 1
        self.backoff_factor = min(8.0, self.backoff_factor * 2.0)

    def get_status(self) -> dict:
        """Get current rate limiter status for monitoring."""
        self._refill()
        return {
            "request_tokens_available": round(self.request_tokens, 2),
            "token_tokens_available": round(self.token_tokens, 0),
            "backoff_factor": round(self.backoff_factor, 2),
            "consecutive_429s": self.consecutive_429s,
            "effective_rpm": round(self.max_rpm / self.backoff_factor, 0),
        }


# Usage demonstration
limiter = TokenBucketRateLimiter(max_rpm=60, max_tpm=100_000)

print("=== Adaptive Rate Limiter Demo ===")
print(f"Initial status: {limiter.get_status()}")

# Simulate a burst of requests
for i in range(5):
    result = limiter.acquire(estimated_tokens=2000)
    if result["allowed"]:
        limiter.record_success()
        print(f"Request {i+1}: ALLOWED")
    else:
        print(f"Request {i+1}: THROTTLED — wait {result['wait_seconds']}s")

# Simulate hitting rate limits
print("\n--- Simulating 429 responses ---")
for i in range(3):
    limiter.record_rate_limit()
    print(f"429 #{i+1}: backoff_factor={limiter.backoff_factor}, effective_rpm={limiter.get_status()['effective_rpm']}")

# After backoff, check status
print(f"\nFinal status: {limiter.get_status()}")

Multi-Tier Rate Limiting Strategy

Architecture Pattern

Three-Layer Rate Limiting

Layer 1 — Per-User Limits: Prevent any single user from consuming your entire API quota. Typical: 20 RPM per free user, 100 RPM per paid user. Enforced at the application layer with user-scoped token buckets.

Layer 2 — Per-Service Limits: Each microservice gets a quota allocation proportional to its traffic needs. Customer-facing chat gets 60% of quota, batch processing gets 30%, internal tools get 10%. Enforced via distributed rate limiting (Redis-backed).

Layer 3 — Global Limits: A safety net at the gateway level ensuring total outbound traffic never exceeds your OpenAI tier limits. This catches misconfigurations in Layer 1/2 and prevents 429 cascades across all services.

Rate LimitingQuota ManagementMulti-Tenant

3. Batch API

The OpenAI Batch API is designed for high-volume, non-latency-sensitive workloads. It processes requests asynchronously with a 24-hour completion window and provides a 50% cost reduction compared to synchronous API calls. This makes it ideal for bulk content generation, dataset annotation, evaluation pipelines, report generation, and any workflow where results aren’t needed immediately.

Batch Processing Pipeline

The Batch API workflow involves: (1) Prepare requests in JSONL format; (2) Upload the file; (3) Create a batch job; (4) Poll for completion; (5) Download and parse results. Each request in the batch is independent — failures in one request don’t affect others.

from openai import OpenAI
import json
import time
import tempfile
import os

client = OpenAI()


def create_batch_requests(prompts: list, model: str = "gpt-4.1-mini") -> str:
    """Create a JSONL file of batch requests and upload it.

    Each line is an independent request with a custom_id for tracking.
    Returns the uploaded file ID.
    """
    # Build JSONL content — each line is a complete API request
    jsonl_lines = []
    for i, prompt in enumerate(prompts):
        request = {
            "custom_id": f"request-{i:04d}",
            "method": "POST",
            "url": "/v1/responses",
            "body": {
                "model": model,
                "input": prompt,
                "max_output_tokens": 500,
            },
        }
        jsonl_lines.append(json.dumps(request))

    jsonl_content = "\n".join(jsonl_lines)

    # Write to temporary file and upload
    tmp_path = os.path.join(tempfile.gettempdir(), "batch_requests.jsonl")
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(jsonl_content)

    # Upload the file to OpenAI
    with open(tmp_path, "rb") as f:
        uploaded_file = client.files.create(file=f, purpose="batch")

    print(f"Uploaded {len(prompts)} requests as file: {uploaded_file.id}")
    return uploaded_file.id


def submit_batch(file_id: str, description: str = "Production batch job") -> str:
    """Submit a batch job and return the batch ID."""
    batch = client.batches.create(
        input_file_id=file_id,
        endpoint="/v1/responses",
        completion_window="24h",
        metadata={"description": description},
    )
    print(f"Batch created: {batch.id} | Status: {batch.status}")
    return batch.id


def poll_batch_status(batch_id: str, poll_interval: int = 30, max_wait: int = 3600) -> dict:
    """Poll batch status until completion or timeout."""
    start_time = time.time()

    while time.time() - start_time < max_wait:
        batch = client.batches.retrieve(batch_id)
        status = batch.status

        print(f"  Status: {status} | "
              f"Completed: {batch.request_counts.completed}/{batch.request_counts.total} | "
              f"Failed: {batch.request_counts.failed}")

        if status == "completed":
            return {
                "status": "completed",
                "output_file_id": batch.output_file_id,
                "error_file_id": batch.error_file_id,
                "total": batch.request_counts.total,
                "completed": batch.request_counts.completed,
                "failed": batch.request_counts.failed,
            }
        elif status in ("failed", "expired", "cancelled"):
            return {"status": status, "error": "Batch did not complete successfully"}

        time.sleep(poll_interval)

    return {"status": "timeout", "error": f"Batch did not complete within {max_wait}s"}


def download_results(output_file_id: str) -> list:
    """Download and parse batch results from the output file."""
    content = client.files.content(output_file_id)
    results = []

    for line in content.text.strip().split("\n"):
        result = json.loads(line)
        results.append({
            "custom_id": result["custom_id"],
            "status": result["response"]["status_code"],
            "output": result["response"]["body"]["output"][0]["content"][0]["text"]
            if result["response"]["status_code"] == 200 else None,
            "error": result.get("error"),
        })

    return results


# Example: Batch-process product descriptions
product_prompts = [
    "Write a concise 2-sentence product description for: Wireless noise-canceling headphones with 30-hour battery life",
    "Write a concise 2-sentence product description for: Ergonomic standing desk with programmable height presets",
    "Write a concise 2-sentence product description for: Smart water bottle that tracks hydration and syncs with fitness apps",
    "Write a concise 2-sentence product description for: Portable solar panel charger rated at 100W for camping",
    "Write a concise 2-sentence product description for: Mechanical keyboard with hot-swappable switches and RGB backlighting",
]

print("=== Batch API Pipeline ===")
print(f"Processing {len(product_prompts)} requests at 50% cost savings\n")

# Step 1: Upload requests
# file_id = create_batch_requests(product_prompts, model="gpt-4.1-mini")

# Step 2: Submit batch
# batch_id = submit_batch(file_id, description="Product descriptions batch")

# Step 3: Poll for completion (in production, use webhooks instead)
# result = poll_batch_status(batch_id, poll_interval=10)

# Step 4: Download results
# if result["status"] == "completed":
#     results = download_results(result["output_file_id"])
#     for r in results:
#         print(f"  {r['custom_id']}: {r['output'][:80]}...")

# Demo output (showing expected flow)
print("Step 1: Upload JSONL file with 5 requests")
print("Step 2: Create batch — status: validating → in_progress")
print("Step 3: Poll every 30s — typically completes in 1-15 minutes")
print("Step 4: Download results — 50% cheaper than synchronous calls")
print(f"\nCost comparison:")
print(f"  Synchronous (5 calls): ~$0.005")
print(f"  Batch API (5 calls):   ~$0.0025 (50% savings)")
print(f"  At 100K calls/day:     Saves ~$50/day")
Batch API Limits & Gotchas: Maximum 50,000 requests per batch. Each request can use up to the model’s max context window. The 24-hour completion window is a guarantee, not an ETA — most batches complete in minutes to hours. Batches cannot be modified after creation (cancel and resubmit if needed). Error handling is per-request: check both the output file AND the error file. In production, prefer webhooks over polling to avoid wasted compute on status checks.

4. Caching Strategies

Caching is the most effective cost-reduction and latency-improvement technique for production AI applications. Many applications see 30-60% cache hit rates because users frequently ask similar questions. A well-designed cache layer can reduce your OpenAI API costs by half while cutting P95 latency from seconds to milliseconds.

Semantic Caching

Unlike exact-match caching (which only works for identical queries), semantic caching uses embeddings to find similar past queries and return cached responses. “How do I reset my password?” and “I forgot my password, how to change it?” are different strings but the same intent — semantic caching handles this.

import hashlib
import json
import time
from dataclasses import dataclass, field


@dataclass
class CacheEntry:
    """A single cached response with metadata."""
    query: str
    response: str
    model: str
    embedding: list
    created_at: float
    ttl: int
    hit_count: int = 0


class SemanticCache:
    """Semantic cache using cosine similarity for intelligent query matching.

    Combines exact-match (fast, hash-based) with semantic similarity (embedding-based)
    to maximize hit rate while maintaining response quality.
    """

    def __init__(self, similarity_threshold: float = 0.92, default_ttl: int = 3600):
        self.similarity_threshold = similarity_threshold
        self.default_ttl = default_ttl
        self.exact_cache: dict = {}       # hash → CacheEntry
        self.semantic_store: list = []     # List of CacheEntry with embeddings
        self.stats = {"hits_exact": 0, "hits_semantic": 0, "misses": 0}

    def _hash_key(self, query: str, model: str) -> str:
        """Generate a deterministic hash for exact-match lookup."""
        normalized = query.strip().lower()
        return hashlib.sha256(f"{model}:{normalized}".encode()).hexdigest()

    def _cosine_similarity(self, vec_a: list, vec_b: list) -> float:
        """Compute cosine similarity between two embedding vectors."""
        dot_product = sum(a * b for a, b in zip(vec_a, vec_b))
        norm_a = sum(a * a for a in vec_a) ** 0.5
        norm_b = sum(b * b for b in vec_b) ** 0.5
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return dot_product / (norm_a * norm_b)

    def get(self, query: str, model: str, query_embedding: list = None) -> dict:
        """Look up a cached response — first exact match, then semantic similarity."""
        # Layer 1: Exact match (O(1) hash lookup)
        cache_key = self._hash_key(query, model)
        if cache_key in self.exact_cache:
            entry = self.exact_cache[cache_key]
            if time.time() - entry.created_at < entry.ttl:
                entry.hit_count += 1
                self.stats["hits_exact"] += 1
                return {"hit": True, "type": "exact", "response": entry.response}
            else:
                del self.exact_cache[cache_key]

        # Layer 2: Semantic similarity (requires embedding)
        if query_embedding:
            best_match = None
            best_score = 0.0

            for entry in self.semantic_store:
                if entry.model != model:
                    continue
                if time.time() - entry.created_at >= entry.ttl:
                    continue
                score = self._cosine_similarity(query_embedding, entry.embedding)
                if score > best_score:
                    best_score = score
                    best_match = entry

            if best_match and best_score >= self.similarity_threshold:
                best_match.hit_count += 1
                self.stats["hits_semantic"] += 1
                return {
                    "hit": True,
                    "type": "semantic",
                    "response": best_match.response,
                    "similarity": round(best_score, 4),
                    "original_query": best_match.query,
                }

        self.stats["misses"] += 1
        return {"hit": False}

    def put(self, query: str, response: str, model: str, embedding: list = None, ttl: int = None):
        """Store a response in both exact and semantic caches."""
        entry = CacheEntry(
            query=query,
            response=response,
            model=model,
            embedding=embedding or [],
            created_at=time.time(),
            ttl=ttl or self.default_ttl,
        )
        # Always store in exact cache
        cache_key = self._hash_key(query, model)
        self.exact_cache[cache_key] = entry
        # Store in semantic cache if embedding provided
        if embedding:
            self.semantic_store.append(entry)

    def get_stats(self) -> dict:
        """Return cache performance statistics."""
        total = sum(self.stats.values())
        hit_rate = (self.stats["hits_exact"] + self.stats["hits_semantic"]) / total if total > 0 else 0
        return {
            **self.stats,
            "total_queries": total,
            "hit_rate": f"{hit_rate:.1%}",
            "exact_entries": len(self.exact_cache),
            "semantic_entries": len(self.semantic_store),
        }


# Usage demonstration with mock embeddings
cache = SemanticCache(similarity_threshold=0.90, default_ttl=3600)

# Simulate storing a response
mock_embedding_1 = [0.1, 0.8, 0.3, 0.5, 0.2]  # Simplified 5-dim embedding
cache.put(
    query="How do I reset my password?",
    response="To reset your password, go to Settings > Security > Change Password.",
    model="gpt-4.1-mini",
    embedding=mock_embedding_1,
)

# Exact match test
result = cache.get("How do I reset my password?", "gpt-4.1-mini")
print(f"Exact match: hit={result['hit']}, type={result.get('type')}")

# Semantic match test (similar embedding = similar question)
mock_embedding_2 = [0.12, 0.79, 0.31, 0.48, 0.21]  # Very similar to embedding_1
result = cache.get("I forgot my password, how to change it?", "gpt-4.1-mini", mock_embedding_2)
print(f"Semantic match: hit={result['hit']}, type={result.get('type')}, similarity={result.get('similarity')}")

# Miss test (different topic)
mock_embedding_3 = [0.9, 0.1, 0.8, 0.1, 0.7]  # Very different embedding
result = cache.get("What are your shipping rates?", "gpt-4.1-mini", mock_embedding_3)
print(f"Different topic: hit={result['hit']}")

print(f"\nCache stats: {cache.get_stats()}")

Prompt Caching with store: true

OpenAI Prompt Caching: OpenAI automatically caches the prefix of prompts that share a common beginning (like system instructions). When you use "store": true in your API request, responses may be served from cache at a reduced cost. For maximum benefit: (1) Keep your system instructions stable and identical across requests; (2) Place dynamic content (user input) at the end of the prompt; (3) Use identical model and parameters. Cached prompt tokens are billed at 50% of the standard input rate. This is most impactful for long system prompts (>1024 tokens) that repeat across many requests.

5. Deployment Patterns

AI applications require specialized deployment strategies because model behavior changes can be subtle and only surface in production traffic. A typo in system instructions or a model version upgrade can silently degrade quality for days before metrics catch it. Blue/green deployments, canary releases, and feature flags give you the ability to test changes with real traffic and roll back instantly.

Model Version Router with Feature Flags

A model router lets you deploy new model versions, prompt changes, or configuration updates to a percentage of traffic. Unlike traditional A/B testing infrastructure, model routers must handle the unique challenge that AI responses are non-deterministic — the same input can produce different quality outputs across runs.

import hashlib
import random
import time
from dataclasses import dataclass, field


@dataclass
class ModelConfig:
    """Configuration for a deployable model version."""
    model: str
    instructions: str
    temperature: float = 1.0
    max_tokens: int = 2000
    version_tag: str = "v1"


@dataclass
class DeploymentRouter:
    """Blue/green and canary deployment router for model versions.

    Routes traffic between model configurations based on percentage splits,
    user cohorts, or feature flags. Tracks metrics per configuration.
    """

    configs: dict = field(default_factory=dict)   # name → ModelConfig
    traffic_split: dict = field(default_factory=dict)  # name → percentage (0-100)
    metrics: dict = field(default_factory=dict)    # name → {latency, errors, count}

    def add_config(self, name: str, config: ModelConfig, traffic_pct: int = 0):
        """Register a model configuration with its traffic allocation."""
        self.configs[name] = config
        self.traffic_split[name] = traffic_pct
        self.metrics[name] = {"count": 0, "errors": 0, "total_latency": 0.0}

    def _get_user_cohort(self, user_id: str) -> float:
        """Deterministic cohort assignment — same user always gets same config."""
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        return (hash_val % 10000) / 100.0  # 0.00 to 99.99

    def route(self, user_id: str) -> tuple:
        """Route a user to a model configuration based on traffic split.

        Returns (config_name, ModelConfig) tuple.
        Uses deterministic hashing so the same user always hits the same config.
        """
        cohort = self._get_user_cohort(user_id)
        cumulative = 0.0

        for name, pct in sorted(self.traffic_split.items()):
            cumulative += pct
            if cohort < cumulative:
                self.metrics[name]["count"] += 1
                return name, self.configs[name]

        # Fallback to first config if splits don't sum to 100
        fallback = list(self.configs.keys())[0]
        self.metrics[fallback]["count"] += 1
        return fallback, self.configs[fallback]

    def record_result(self, config_name: str, latency_ms: float, is_error: bool = False):
        """Record a response metric for a configuration."""
        if config_name in self.metrics:
            self.metrics[config_name]["total_latency"] += latency_ms
            if is_error:
                self.metrics[config_name]["errors"] += 1

    def get_deployment_status(self) -> dict:
        """Get current deployment status with per-config metrics."""
        status = {}
        for name, config in self.configs.items():
            count = self.metrics[name]["count"]
            avg_latency = (
                self.metrics[name]["total_latency"] / count if count > 0 else 0
            )
            error_rate = (
                self.metrics[name]["errors"] / count if count > 0 else 0
            )
            status[name] = {
                "model": config.model,
                "version": config.version_tag,
                "traffic_pct": self.traffic_split[name],
                "requests": count,
                "avg_latency_ms": round(avg_latency, 1),
                "error_rate": f"{error_rate:.2%}",
            }
        return status


# Usage: Blue/Green deployment with canary
router = DeploymentRouter()

# Blue (current production) — gets 90% traffic
router.add_config("blue", ModelConfig(
    model="gpt-4.1-mini",
    instructions="You are a helpful customer support agent. Be concise and professional.",
    temperature=0.7,
    version_tag="v2.3-stable",
), traffic_pct=90)

# Green (new version) — gets 10% canary traffic
router.add_config("green", ModelConfig(
    model="gpt-4.1-mini",
    instructions="You are a helpful customer support agent. Be concise, professional, and empathetic.",
    temperature=0.6,
    version_tag="v2.4-canary",
), traffic_pct=10)

# Simulate routing 20 users
print("=== Canary Deployment Router ===\n")
for i in range(20):
    user_id = f"user_{i:04d}"
    config_name, config = router.route(user_id)
    # Simulate latency
    latency = random.uniform(200, 800) if config_name == "blue" else random.uniform(180, 750)
    router.record_result(config_name, latency)

# Display deployment status
print("Deployment Status:")
for name, status in router.get_deployment_status().items():
    print(f"  [{name.upper()}] {status['version']} | "
          f"Traffic: {status['traffic_pct']}% | "
          f"Requests: {status['requests']} | "
          f"Avg Latency: {status['avg_latency_ms']}ms | "
          f"Errors: {status['error_rate']}")

Deployment Safety Checklist

Before Promoting Canary to Production: (1) Error rate must be within 1.5× of baseline for 24 hours; (2) P95 latency must not exceed baseline by >20%; (3) Quality metrics (BLEU, human eval, or custom scorers) must meet minimum thresholds; (4) No safety regression in red-team test suite; (5) Cost per request within 10% of projected budget; (6) At least 1000 requests processed through canary without incidents. Automate these gates in your CI/CD pipeline — human-gated deployments don’t scale.

6. Scaling & Load Management

OpenAI API calls are I/O-bound (waiting for network responses), not CPU-bound. This means scaling is primarily about concurrent connection management, queue depth, and intelligent request distribution — not raw compute. A single worker can handle hundreds of concurrent requests using async I/O, and horizontal scaling adds more workers when concurrency limits are reached.

Priority Queue Worker

A priority queue ensures that latency-sensitive requests (real-time chat) are processed before batch or background tasks, even during peak load. This prevents batch processing from starving interactive users of their API quota.

import heapq
import time
import threading
from dataclasses import dataclass, field
from enum import IntEnum


class Priority(IntEnum):
    """Request priority levels — lower number = higher priority."""
    CRITICAL = 0    # System health checks, circuit breaker probes
    REALTIME = 1    # User-facing chat, interactive features
    STANDARD = 2    # Normal API requests, background enrichment
    BATCH = 3       # Bulk processing, non-urgent tasks
    LOW = 4         # Analytics, pre-computation, warmup


@dataclass(order=True)
class QueuedRequest:
    """A request in the priority queue with ordering support."""
    priority: int
    timestamp: float = field(compare=False)
    request_id: str = field(compare=False)
    payload: dict = field(compare=False)
    callback: object = field(default=None, compare=False)


class PriorityQueueWorker:
    """Priority-aware request queue with concurrency control.

    Processes high-priority requests first while respecting concurrent
    connection limits to the OpenAI API.
    """

    def __init__(self, max_concurrent: int = 50, max_queue_size: int = 10000):
        self.max_concurrent = max_concurrent
        self.max_queue_size = max_queue_size
        self.queue: list = []  # Min-heap ordered by priority
        self.active_count = 0
        self.processed = 0
        self.rejected = 0
        self._lock = threading.Lock()
        self.stats_by_priority = {p: {"enqueued": 0, "processed": 0, "avg_wait_ms": 0, "total_wait": 0}
                                  for p in Priority}

    def enqueue(self, request_id: str, payload: dict, priority: Priority = Priority.STANDARD) -> dict:
        """Add a request to the priority queue."""
        with self._lock:
            if len(self.queue) >= self.max_queue_size:
                self.rejected += 1
                return {"accepted": False, "reason": "Queue full", "queue_size": len(self.queue)}

            item = QueuedRequest(
                priority=priority.value,
                timestamp=time.time(),
                request_id=request_id,
                payload=payload,
            )
            heapq.heappush(self.queue, item)
            self.stats_by_priority[priority]["enqueued"] += 1

            return {
                "accepted": True,
                "position": len(self.queue),
                "priority": priority.name,
                "estimated_wait_ms": self._estimate_wait(priority),
            }

    def _estimate_wait(self, priority: Priority) -> int:
        """Estimate wait time based on queue depth and priority."""
        # Count items with higher or equal priority ahead in queue
        ahead = sum(1 for item in self.queue if item.priority <= priority.value)
        # Assume ~500ms average processing time per request
        slots_available = max(1, self.max_concurrent - self.active_count)
        return int((ahead / slots_available) * 500)

    def dequeue(self) -> QueuedRequest:
        """Get the next highest-priority request."""
        with self._lock:
            if not self.queue:
                return None
            if self.active_count >= self.max_concurrent:
                return None

            item = heapq.heappop(self.queue)
            self.active_count += 1
            return item

    def complete(self, request: QueuedRequest):
        """Mark a request as completed and update stats."""
        with self._lock:
            self.active_count -= 1
            self.processed += 1
            priority = Priority(request.priority)
            wait_ms = (time.time() - request.timestamp) * 1000
            self.stats_by_priority[priority]["processed"] += 1
            self.stats_by_priority[priority]["total_wait"] += wait_ms

    def get_status(self) -> dict:
        """Get queue status for monitoring dashboards."""
        return {
            "queue_depth": len(self.queue),
            "active_requests": self.active_count,
            "max_concurrent": self.max_concurrent,
            "total_processed": self.processed,
            "total_rejected": self.rejected,
            "utilization": f"{self.active_count / self.max_concurrent:.0%}",
            "by_priority": {
                p.name: {
                    "queued": self.stats_by_priority[p]["enqueued"],
                    "processed": self.stats_by_priority[p]["processed"],
                }
                for p in Priority
            },
        }


# Usage demonstration
worker = PriorityQueueWorker(max_concurrent=10, max_queue_size=1000)

print("=== Priority Queue Worker Demo ===\n")

# Enqueue mixed-priority requests
requests = [
    ("req-001", {"prompt": "Summarize this document"}, Priority.BATCH),
    ("req-002", {"prompt": "User chat message"}, Priority.REALTIME),
    ("req-003", {"prompt": "Generate analytics report"}, Priority.LOW),
    ("req-004", {"prompt": "Another user chat"}, Priority.REALTIME),
    ("req-005", {"prompt": "Health check probe"}, Priority.CRITICAL),
    ("req-006", {"prompt": "Background enrichment"}, Priority.STANDARD),
]

for req_id, payload, priority in requests:
    result = worker.enqueue(req_id, payload, priority)
    print(f"Enqueued {req_id} [{priority.name}] — position: {result['position']}, est. wait: {result['estimated_wait_ms']}ms")

# Process in priority order
print("\n--- Processing Order ---")
while True:
    item = worker.dequeue()
    if not item:
        break
    priority_name = Priority(item.priority).name
    print(f"  Processing: {item.request_id} [{priority_name}]")
    worker.complete(item)

print(f"\nFinal status: {worker.get_status()}")

Graceful Degradation

Resilience Pattern

Graceful Degradation Ladder

Level 0 — Full Service: All features operational, primary model, full response quality.

Level 1 — Reduced Quality: Switch to faster/cheaper model (GPT-4.1-mini instead of GPT-4.1), reduce max_tokens, disable optional enrichments. Users still get responses but at lower quality.

Level 2 — Cached Only: Serve only cached responses. New unique queries return a “service is busy” message with estimated recovery time. Existing cache still provides value.

Level 3 — Static Fallback: Return pre-written templated responses for common query categories. Route users to FAQ, documentation, or human support channels.

Level 4 — Maintenance Mode: Display a maintenance page. Queue all requests for processing when service recovers. Send notification when results are ready.

ResilienceAvailabilitySLA

7. Cost Optimization

OpenAI API costs scale linearly with token usage. At production scale (millions of requests/month), cost optimization becomes a core engineering discipline rather than an afterthought. The strategies below can reduce your OpenAI spend by 60-80% while maintaining response quality for the requests that matter most.

Intelligent Model Router

Not every request needs your most expensive model. A model router classifies incoming requests by complexity and routes simple queries to cheaper models while reserving expensive models for complex reasoning tasks. This single pattern typically saves 40-60% of API costs.

from dataclasses import dataclass, field


@dataclass
class ModelTier:
    """Configuration for a model pricing tier."""
    name: str
    model: str
    cost_per_1k_input: float     # USD per 1K input tokens
    cost_per_1k_output: float    # USD per 1K output tokens
    max_complexity: str          # "simple", "moderate", "complex"


class IntelligentModelRouter:
    """Route requests to the cheapest model capable of handling them.

    Uses heuristics to classify query complexity, then routes to the
    appropriate model tier. Saves 40-60% on API costs for typical workloads.
    """

    def __init__(self):
        self.tiers = [
            ModelTier("mini", "gpt-4.1-mini", 0.0004, 0.0016, "simple"),
            ModelTier("nano", "gpt-4.1-nano", 0.0001, 0.0004, "trivial"),
            ModelTier("standard", "gpt-4.1", 0.002, 0.008, "complex"),
        ]
        self.routing_stats = {"trivial": 0, "simple": 0, "moderate": 0, "complex": 0}
        self.total_cost_actual = 0.0
        self.total_cost_if_always_premium = 0.0

    def classify_complexity(self, query: str, context: dict = None) -> str:
        """Classify query complexity using fast heuristics.

        In production, you might use a lightweight classifier model or
        rule-based system. This demonstrates the heuristic approach.
        """
        query_lower = query.lower()
        word_count = len(query.split())

        # Trivial: Very short, factual lookups
        if word_count < 10 and any(kw in query_lower for kw in ["what is", "define", "who is", "when was"]):
            return "trivial"

        # Complex: Multi-step reasoning, code generation, analysis
        complex_signals = [
            "analyze", "compare", "explain why", "write code", "debug",
            "architecture", "design", "strategy", "evaluate", "trade-off",
        ]
        if any(signal in query_lower for signal in complex_signals):
            return "complex"
        if word_count > 100:  # Long queries often need more reasoning
            return "complex"

        # Moderate: Multi-part questions, summaries
        moderate_signals = ["summarize", "list", "how do i", "steps to", "example of"]
        if any(signal in query_lower for signal in moderate_signals):
            return "moderate"

        # Default: Simple
        return "simple"

    def route(self, query: str, context: dict = None) -> dict:
        """Route a query to the optimal model based on complexity."""
        complexity = self.classify_complexity(query, context)
        self.routing_stats[complexity] += 1

        # Map complexity to model tier
        tier_map = {
            "trivial": self.tiers[1],   # nano
            "simple": self.tiers[0],    # mini
            "moderate": self.tiers[0],  # mini (capable enough)
            "complex": self.tiers[2],   # standard (full power)
        }
        selected_tier = tier_map[complexity]

        # Estimate cost savings
        premium = self.tiers[2]  # Always compare against premium
        est_input_tokens = len(query.split()) * 1.3  # Rough token estimate
        est_output_tokens = 500  # Average output

        actual_cost = (est_input_tokens / 1000 * selected_tier.cost_per_1k_input +
                       est_output_tokens / 1000 * selected_tier.cost_per_1k_output)
        premium_cost = (est_input_tokens / 1000 * premium.cost_per_1k_input +
                        est_output_tokens / 1000 * premium.cost_per_1k_output)

        self.total_cost_actual += actual_cost
        self.total_cost_if_always_premium += premium_cost

        return {
            "model": selected_tier.model,
            "tier": selected_tier.name,
            "complexity": complexity,
            "estimated_cost": f"${actual_cost:.6f}",
            "savings_vs_premium": f"{(1 - actual_cost/premium_cost):.0%}" if premium_cost > 0 else "0%",
        }

    def get_cost_report(self) -> dict:
        """Generate a cost optimization report."""
        total_requests = sum(self.routing_stats.values())
        savings_pct = (1 - self.total_cost_actual / self.total_cost_if_always_premium) if self.total_cost_if_always_premium > 0 else 0
        return {
            "total_requests": total_requests,
            "routing_distribution": self.routing_stats,
            "total_cost_optimized": f"${self.total_cost_actual:.4f}",
            "total_cost_premium_only": f"${self.total_cost_if_always_premium:.4f}",
            "total_savings": f"{savings_pct:.0%}",
        }


# Usage demonstration
router = IntelligentModelRouter()

test_queries = [
    "What is Python?",
    "Define machine learning",
    "How do I install numpy using pip?",
    "Summarize the key differences between REST and GraphQL APIs",
    "Analyze the trade-offs between microservice and monolithic architecture for a fintech startup processing 10M transactions/day with strict latency requirements and regulatory compliance needs",
    "Write code for a distributed rate limiter using Redis with sliding window algorithm",
    "List 5 popular Python web frameworks",
    "Who invented the internet?",
    "Explain why transformer architectures outperform RNNs for long sequences and discuss the computational complexity trade-offs",
    "What is 2+2?",
]

print("=== Intelligent Model Router ===\n")
for query in test_queries:
    result = router.route(query)
    print(f"[{result['tier']:>8}] {result['complexity']:>8} | savings: {result['savings_vs_premium']} | {query[:60]}...")

print(f"\n--- Cost Report ---")
report = router.get_cost_report()
print(f"Total requests: {report['total_requests']}")
print(f"Distribution: {report['routing_distribution']}")
print(f"Optimized cost: {report['total_cost_optimized']}")
print(f"Premium-only cost: {report['total_cost_premium_only']}")
print(f"Total savings: {report['total_savings']}")

Cost Tracking & Budget Enforcement

Without real-time cost tracking, a single runaway feature or misconfigured prompt can blow through your monthly budget in hours. A cost tracker provides per-request cost attribution, daily/monthly budget enforcement, and alerts when spending patterns deviate from projections.

import time
from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class CostTracker:
    """Real-time cost tracking with budget enforcement and alerts.

    Tracks spending per model, per feature, and per user.
    Enforces hard budget limits and sends alerts at configurable thresholds.
    """

    daily_budget: float = 100.0         # USD per day
    monthly_budget: float = 2500.0      # USD per month
    alert_thresholds: list = field(default_factory=lambda: [0.5, 0.75, 0.9, 1.0])

    # Internal tracking
    daily_spend: float = field(default=0.0, init=False)
    monthly_spend: float = field(default=0.0, init=False)
    spend_by_model: dict = field(default_factory=lambda: defaultdict(float), init=False)
    spend_by_feature: dict = field(default_factory=lambda: defaultdict(float), init=False)
    request_count: int = field(default=0, init=False)
    alerts_fired: list = field(default_factory=list, init=False)
    day_start: float = field(default_factory=time.time, init=False)

    # Pricing per 1K tokens (as of 2026)
    PRICING = {
        "gpt-4.1": {"input": 0.002, "output": 0.008},
        "gpt-4.1-mini": {"input": 0.0004, "output": 0.0016},
        "gpt-4.1-nano": {"input": 0.0001, "output": 0.0004},
        "o3-mini": {"input": 0.0011, "output": 0.0044},
    }

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate the USD cost for a single API call."""
        pricing = self.PRICING.get(model, self.PRICING["gpt-4.1-mini"])
        input_cost = (input_tokens / 1000) * pricing["input"]
        output_cost = (output_tokens / 1000) * pricing["output"]
        return input_cost + output_cost

    def record_usage(self, model: str, input_tokens: int, output_tokens: int,
                     feature: str = "default") -> dict:
        """Record API usage and check budget limits."""
        cost = self.calculate_cost(model, input_tokens, output_tokens)

        self.daily_spend += cost
        self.monthly_spend += cost
        self.spend_by_model[model] += cost
        self.spend_by_feature[feature] += cost
        self.request_count += 1

        # Check budget alerts
        daily_pct = self.daily_spend / self.daily_budget
        alerts = []
        for threshold in self.alert_thresholds:
            alert_key = f"daily_{threshold}"
            if daily_pct >= threshold and alert_key not in self.alerts_fired:
                self.alerts_fired.append(alert_key)
                alerts.append(f"Daily budget {threshold:.0%} reached: ${self.daily_spend:.2f}/${self.daily_budget:.2f}")

        # Hard limit enforcement
        budget_exceeded = self.daily_spend >= self.daily_budget

        return {
            "cost": round(cost, 6),
            "daily_spend": round(self.daily_spend, 4),
            "daily_remaining": round(max(0, self.daily_budget - self.daily_spend), 4),
            "daily_pct": f"{daily_pct:.1%}",
            "budget_exceeded": budget_exceeded,
            "alerts": alerts,
        }

    def get_report(self) -> dict:
        """Generate a comprehensive cost report."""
        return {
            "summary": {
                "total_requests": self.request_count,
                "daily_spend": f"${self.daily_spend:.4f}",
                "monthly_spend": f"${self.monthly_spend:.4f}",
                "daily_budget_used": f"{self.daily_spend / self.daily_budget:.1%}",
                "monthly_budget_used": f"{self.monthly_spend / self.monthly_budget:.1%}",
            },
            "by_model": {model: f"${cost:.4f}" for model, cost in self.spend_by_model.items()},
            "by_feature": {feat: f"${cost:.4f}" for feat, cost in self.spend_by_feature.items()},
            "avg_cost_per_request": f"${self.daily_spend / max(1, self.request_count):.6f}",
        }


# Usage demonstration
tracker = CostTracker(daily_budget=50.0, monthly_budget=1500.0)

print("=== Cost Tracker Demo ===\n")

# Simulate various API calls
calls = [
    ("gpt-4.1", 2000, 800, "chat"),
    ("gpt-4.1-mini", 500, 200, "search"),
    ("gpt-4.1-mini", 1000, 500, "chat"),
    ("gpt-4.1-nano", 200, 100, "classification"),
    ("gpt-4.1", 3000, 1500, "analysis"),
    ("gpt-4.1-mini", 800, 300, "search"),
    ("gpt-4.1-nano", 150, 50, "classification"),
]

for model, input_tok, output_tok, feature in calls:
    result = tracker.record_usage(model, input_tok, output_tok, feature)
    print(f"  {model:>14} | {feature:>14} | cost: ${result['cost']:.6f} | daily: {result['daily_pct']}")
    if result["alerts"]:
        for alert in result["alerts"]:
            print(f"    âš  ALERT: {alert}")

print(f"\n--- Cost Report ---")
report = tracker.get_report()
print(f"Total requests: {report['summary']['total_requests']}")
print(f"Daily spend: {report['summary']['daily_spend']} ({report['summary']['daily_budget_used']})")
print(f"By model: {report['by_model']}")
print(f"By feature: {report['by_feature']}")
print(f"Avg cost/request: {report['avg_cost_per_request']}")

Cost Optimization Strategies Summary

StrategyTypical SavingsImplementation EffortQuality Impact
Model routing (cheap for simple, expensive for complex)40-60%MediumMinimal (if classifier is good)
Response caching (exact + semantic)30-50%MediumNone (for cache hits)
Batch API (offline processing)50%LowNone (same models)
Prompt caching (store: true)10-25%Very LowNone
Prompt compression (shorter system instructions)15-30%LowRequires testing
Output length control (max_output_tokens)10-20%Very LowMay truncate useful info
Request deduplication (merge identical in-flight)5-15%MediumNone
Combined (all above)60-80%HighMinimal with good testing

Token Budget Controller

from dataclasses import dataclass


@dataclass
class TokenBudgetController:
    """Enforce per-request token budgets to prevent cost overruns.

    Sets maximum input and output token limits based on the request type,
    truncates oversized inputs, and configures max_output_tokens appropriately.
    """

    # Budget presets by request type
    BUDGETS = {
        "chat": {"max_input": 2000, "max_output": 500, "model": "gpt-4.1-mini"},
        "summarize": {"max_input": 8000, "max_output": 300, "model": "gpt-4.1-mini"},
        "analyze": {"max_input": 4000, "max_output": 2000, "model": "gpt-4.1"},
        "classify": {"max_input": 500, "max_output": 50, "model": "gpt-4.1-nano"},
        "generate": {"max_input": 1000, "max_output": 4000, "model": "gpt-4.1"},
    }

    def apply_budget(self, request_type: str, input_text: str) -> dict:
        """Apply token budget constraints to a request.

        Returns the constrained request parameters and any warnings.
        """
        budget = self.BUDGETS.get(request_type, self.BUDGETS["chat"])
        warnings = []

        # Estimate input tokens (rough: 1 token ≈ 4 characters for English)
        estimated_input_tokens = len(input_text) // 4

        # Truncate input if over budget
        truncated_input = input_text
        if estimated_input_tokens > budget["max_input"]:
            max_chars = budget["max_input"] * 4
            truncated_input = input_text[:max_chars] + "\n\n[Input truncated to fit token budget]"
            warnings.append(f"Input truncated from ~{estimated_input_tokens} to ~{budget['max_input']} tokens")

        # Calculate cost ceiling
        pricing = {
            "gpt-4.1": {"input": 0.002, "output": 0.008},
            "gpt-4.1-mini": {"input": 0.0004, "output": 0.0016},
            "gpt-4.1-nano": {"input": 0.0001, "output": 0.0004},
        }
        model_pricing = pricing.get(budget["model"], pricing["gpt-4.1-mini"])
        max_cost = (budget["max_input"] / 1000 * model_pricing["input"] +
                    budget["max_output"] / 1000 * model_pricing["output"])

        return {
            "model": budget["model"],
            "input_text": truncated_input,
            "max_output_tokens": budget["max_output"],
            "estimated_input_tokens": min(estimated_input_tokens, budget["max_input"]),
            "max_cost_usd": round(max_cost, 6),
            "warnings": warnings,
            "request_type": request_type,
        }


# Usage demonstration
controller = TokenBudgetController()

print("=== Token Budget Controller ===\n")

# Test different request types
test_cases = [
    ("chat", "What is the weather like today?"),
    ("classify", "This product is amazing, best purchase ever!"),
    ("summarize", "A" * 40000),  # Very long input — will be truncated
    ("analyze", "Compare the performance characteristics of PostgreSQL vs MongoDB for a time-series workload with 10M writes/day."),
]

for req_type, input_text in test_cases:
    result = controller.apply_budget(req_type, input_text)
    print(f"[{req_type:>10}] model={result['model']:>14} | "
          f"max_output={result['max_output_tokens']:>5} | "
          f"max_cost=${result['max_cost_usd']:.6f}")
    if result["warnings"]:
        for w in result["warnings"]:
            print(f"             WARNING: {w}")
Key Takeaways: Production engineering for OpenAI applications requires a layered approach: (1) A gateway with circuit breakers prevents cascading failures; (2) Adaptive rate limiting keeps you within API quotas while maximizing throughput; (3) The Batch API saves 50% on offline workloads; (4) Semantic caching eliminates redundant calls; (5) Blue/green deployments with feature flags enable safe model upgrades; (6) Priority queues ensure interactive users always get served first; (7) Model routing sends simple queries to cheap models and complex ones to powerful models, saving 40-60%. Start with caching and model routing (highest ROI, lowest effort), then add the operational infrastructure as traffic grows.

Next in the Series

In Part 16: Observability & Monitoring, we’ll cover structured logging, distributed tracing, latency dashboards, token usage analytics, quality scoring in production, alerting strategies, and OpenTelemetry integration for complete visibility into your OpenAI-powered applications.