OpenAI SDK Track Part 16: Observability & Monitoring

May 25, 2026 | Wasil Zafar | 30 min read

Gain complete visibility into your OpenAI-powered applications with structured logging, distributed tracing via OpenTelemetry, latency percentile dashboards, real-time cost monitoring, output quality scoring, and battle-tested incident response runbooks — all with executable Python code.

Table of Contents

  1. OpenAI Dashboard Traces
  2. Custom Logging
  3. Metrics & Alerting
  4. Distributed Tracing
  5. Cost Monitoring
  6. Quality Monitoring
  7. Incident Response
What You’ll Learn: Production AI systems are opaque by default — you can’t improve what you can’t measure. This article builds a complete observability stack for OpenAI applications: leveraging the built-in dashboard traces for request inspection, implementing structured logging that captures token usage and latency at every call site, building metrics pipelines with p50/p95 latency percentiles and alerting thresholds, integrating OpenTelemetry for distributed tracing across microservices, constructing real-time cost dashboards with budget alerts, monitoring output quality with hallucination detection and drift scoring, and establishing incident response runbooks for the most common failure modes.
Observability Pipeline for OpenAI Applications
                flowchart TD
                    A[OpenAI API Call] --> B[Instrumentation Layer]
                    B --> C[Structured Logs]
                    B --> D[Metrics Collector]
                    B --> E[Trace Spans]
                    B --> F[Cost Events]
                    C --> G[Log Aggregator]
                    D --> H[Time-Series DB]
                    E --> I[Trace Backend]
                    F --> J[Cost Dashboard]
                    G --> K[Search & Analysis]
                    H --> L[Dashboards & Alerts]
                    I --> M[Request Waterfall]
                    J --> N[Budget Alerts]
                    K --> O[Incident Detection]
                    L --> O
                    M --> O
                    N --> O
                    O --> P[Runbook Automation]
                    P --> Q[PagerDuty / Slack]
            

1. OpenAI Dashboard Traces

OpenAI provides built-in trace visualization in the API dashboard at platform.openai.com. Every API request is logged with metadata including model, token counts, latency, status, and any user-provided identifiers. Before building custom observability, understand what’s available out of the box — the dashboard traces are often sufficient for debugging individual requests and understanding usage patterns.

Trace Metadata & Filtering

The OpenAI dashboard supports filtering traces by user (set via the user parameter), model name, status code, and time range. By consistently passing structured user identifiers, you can trace any issue back to a specific user session. The metadata field on requests enables custom key-value pairs for even richer filtering.

| Filter | Parameter | Example | Use Case |
|---|---|---|---|
| User ID | user | "user_abc123" | Debug specific user issues |
| Model | Auto-captured | "gpt-4.1" | Compare model performance |
| Status | Auto-captured | 200, 429, 500 | Find errors, rate limits |
| Time Range | Dashboard filter | Last 24 hours | Incident correlation |
| Metadata | metadata | {"feature": "chat"} | Feature-level attribution |

import time
import hashlib
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TraceableRequest:
    """Build OpenAI API requests with full trace metadata for dashboard filtering.

    Attaches user_id, session_id, and custom metadata to every request,
    enabling filtering and debugging in the OpenAI dashboard.
    """

    user_id: str
    session_id: str
    feature: str = "default"
    environment: str = "production"

    def build_request_params(
        self,
        model: str,
        messages: list,
        extra_metadata: Optional[dict] = None,
    ) -> dict:
        """Construct request parameters with observability metadata."""
        # Generate a stable user hash for privacy (don't send raw user IDs)
        user_hash = hashlib.sha256(
            f"{self.user_id}:{self.environment}".encode()
        ).hexdigest()[:16]

        params = {
            "model": model,
            "messages": messages,
            "user": f"user_{user_hash}",
            "metadata": {
                "session_id": self.session_id,
                "feature": self.feature,
                "environment": self.environment,
                "request_time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                **(extra_metadata or {}),
            },
        }
        return params

    def build_session_context(self) -> dict:
        """Return session context for trace correlation across requests."""
        return {
            "user_id": self.user_id,
            "session_id": self.session_id,
            "feature": self.feature,
            "environment": self.environment,
        }


# Usage demonstration
tracer = TraceableRequest(
    user_id="customer_42",
    session_id="sess_abc123def456",
    feature="document_summarizer",
    environment="production",
)

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarize this quarterly report."},
]

params = tracer.build_request_params(
    model="gpt-4.1",
    messages=messages,
    # The API documents metadata values as strings, so the page count is quoted
    extra_metadata={"document_type": "financial_report", "page_count": "12"},
)

print("=== Traceable Request Parameters ===")
for key, value in params.items():
    if key == "messages":
        print(f"  {key}: [{len(value)} messages]")
    elif key == "metadata":
        print(f"  {key}:")
        for mk, mv in value.items():
            print(f"    {mk}: {mv}")
    else:
        print(f"  {key}: {value}")

print(f"\n=== Session Context ===")
ctx = tracer.build_session_context()
for k, v in ctx.items():
    print(f"  {k}: {v}")
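
The metadata field accepts only a limited number of short string pairs, so it can help to normalize values before attaching them to a request. The sketch below is a hypothetical helper, not part of the OpenAI SDK; the limits it enforces (16 pairs, 64-character keys, 512-character string values) are assumptions based on the API documentation at the time of writing and should be checked against the current reference.

```python
# Assumed limits for the OpenAI `metadata` field; verify against the
# current API reference before relying on them.
MAX_PAIRS = 16
MAX_KEY_LEN = 64
MAX_VALUE_LEN = 512


def sanitize_metadata(raw: dict) -> dict:
    """Coerce metadata into string pairs that fit the assumed limits.

    Hypothetical helper: stringifies keys and values, truncates overlong
    ones, and keeps only the first MAX_PAIRS entries.
    """
    clean = {}
    for key, value in raw.items():
        if len(clean) >= MAX_PAIRS:
            break
        clean[str(key)[:MAX_KEY_LEN]] = str(value)[:MAX_VALUE_LEN]
    return clean


example = sanitize_metadata({
    "feature": "document_summarizer",
    "page_count": 12,        # int is converted to the string "12"
    "notes": "x" * 600,      # truncated to 512 characters
})
print(example["page_count"], len(example["notes"]))
```

Silent truncation is a deliberate simplification here; a stricter variant could raise an error instead so that oversized metadata is caught in testing.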

2. Custom Logging

The OpenAI dashboard shows individual requests, but production systems need aggregated views: average latency over time, token usage trends, error rate patterns, and cost accumulation. Structured logging with consistent schemas enables powerful querying in log aggregators like Elasticsearch, Datadog, or CloudWatch. Every API call should emit a structured log event capturing timing, tokens, cost, and outcome.

Structured Log Schema

A well-designed log schema enables ad-hoc querying without schema migrations. The key principle: log everything you might want to filter or aggregate on, but keep the schema flat for easy indexing. Nested objects are acceptable for metadata, but the primary dimensions (model, latency, tokens, status) should be top-level fields.

Logging Best Practices: (1) Use structured JSON, never plain text; (2) Include a correlation ID linking all logs within a single user request; (3) Log token counts separately for input and output (cost attribution); (4) Record wall-clock latency as seen by your caller, not just API response time; (5) Include at most the first 200 characters of the prompt for debugging, and never log full user content in production, since even a short preview can contain PII and may need redaction; (6) Use log levels consistently: DEBUG for request/response bodies, INFO for metrics, WARNING for retries and rate limits, ERROR for failures.
import json
import time
import logging
import uuid
from dataclasses import dataclass, field, asdict
from typing import Optional, Any


# Configure structured JSON logging
class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON for log aggregators."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Merge any extra structured data
        if hasattr(record, "structured_data"):
            log_data.update(record.structured_data)
        return json.dumps(log_data, default=str)


# Set up logger with JSON output
logger = logging.getLogger("openai_observability")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.handlers = [handler]


@dataclass
class LLMCallMetrics:
    """Structured metrics for a single OpenAI API call."""

    request_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    correlation_id: str = ""
    model: str = ""
    feature: str = ""
    status: str = "success"  # success | error | timeout | rate_limited

    # Timing
    latency_ms: float = 0.0
    time_to_first_token_ms: Optional[float] = None

    # Token usage
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0

    # Cost (estimated)
    estimated_cost_usd: float = 0.0

    # Error details
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    retry_count: int = 0

    def to_log_record(self) -> dict:
        """Convert to flat dictionary for structured logging."""
        data = asdict(self)
        data["total_tokens"] = self.input_tokens + self.output_tokens
        return {k: v for k, v in data.items() if v is not None and v != ""}


# Map call status to a log level: rate limits and retries are warnings,
# any other non-success status is an error.
_STATUS_LEVELS = {"success": logging.INFO, "rate_limited": logging.WARNING}


def log_llm_call(metrics: LLMCallMetrics):
    """Emit a structured log event for an LLM API call."""
    level = _STATUS_LEVELS.get(metrics.status, logging.ERROR)
    # `extra` attaches structured_data to the LogRecord for JSONFormatter
    logger.log(level, "llm_call_%s", metrics.status,
               extra={"structured_data": metrics.to_log_record()})


# Simulate logging several API calls
print("=== Structured LLM Call Logs ===\n")

# Successful call
metrics_success = LLMCallMetrics(
    correlation_id="corr_user42_req1",
    model="gpt-4.1",
    feature="chat",
    status="success",
    latency_ms=847.3,
    time_to_first_token_ms=234.1,
    input_tokens=1250,
    output_tokens=380,
    estimated_cost_usd=0.0043,
)
log_llm_call(metrics_success)

# Rate-limited call with retry
metrics_retry = LLMCallMetrics(
    correlation_id="corr_user42_req2",
    model="gpt-4.1-mini",
    feature="summarizer",
    status="rate_limited",
    latency_ms=2340.0,
    input_tokens=4500,
    output_tokens=0,
    retry_count=2,
    error_type="RateLimitError",
    error_message="Rate limit exceeded. Retry after 3s.",
)
log_llm_call(metrics_retry)

# Timeout
metrics_timeout = LLMCallMetrics(
    correlation_id="corr_user99_req1",
    model="gpt-4.1",
    feature="code_review",
    status="timeout",
    latency_ms=30000.0,
    input_tokens=8200,
    output_tokens=0,
    error_type="TimeoutError",
    error_message="Request timed out after 30s",
)
log_llm_call(metrics_timeout)

print("\n=== Queryable Fields ===")
print("Filter examples for log aggregator:")
print('  status:error AND model:"gpt-4.1"')
print("  latency_ms:>2000 AND feature:chat")
print("  retry_count:>0")
print('  error_type:"RateLimitError"')
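
Best practices (4) and (5) above can be sketched with a small context manager that measures wall-clock latency around any call and a helper that truncates the prompt preview. The names timed_llm_call and preview are hypothetical, and the sleep stands in for a real API call; this is a minimal illustration rather than a complete instrumentation layer.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_llm_call(model: str, feature: str):
    """Yield a dict that is filled with latency and status on exit."""
    event = {"model": model, "feature": feature, "status": "success"}
    start = time.perf_counter()  # monotonic clock, suitable for durations
    try:
        yield event
    except Exception as exc:
        event["status"] = "error"
        event["error_type"] = type(exc).__name__
        raise
    finally:
        # Recorded on both the success and the failure path
        event["latency_ms"] = round((time.perf_counter() - start) * 1000, 1)


def preview(prompt: str, limit: int = 200) -> str:
    """Return at most `limit` characters of the prompt for debug logs."""
    return prompt if len(prompt) <= limit else prompt[:limit] + "..."


# Usage: the sleep is a placeholder for the actual API request
with timed_llm_call("gpt-4.1", "chat") as event:
    time.sleep(0.01)
    event["prompt_preview"] = preview("Summarize this quarterly report. " * 20)

print(event)
```

Because the latency is written in the finally clause, failed calls still produce a timing value, which matters when timeouts are the failure you are investigating.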

3. Metrics & Alerting

Logs tell you what happened; metrics tell you how the system is behaving right now. For OpenAI applications, four signals matter most (an adaptation of the classic SRE "golden signals"): latency (p50/p95/p99), error rate (percentage of non-200 responses), token throughput (tokens per minute consumed versus quota), and cost rate (USD per hour). Each metric needs a baseline value, alert thresholds, and escalation rules.

Key Metrics & Thresholds

| Metric | Calculation | Warning Threshold | Critical Threshold | Action |
|---|---|---|---|---|
| p50 Latency | Median response time | >2s (GPT-4.1) | >5s | Check model load, reduce prompt |
| p95 Latency | 95th percentile | >8s | >15s | Scale horizontally, add timeout |
| Error Rate | Non-200 / Total % | >2% | >5% | Check quotas, circuit breaker |
| 429 Rate | Rate limits / Total % | >1% | >5% | Reduce concurrency, backoff |
| Token Throughput | Tokens/min consumed | >80% quota | >95% quota | Request quota increase |
| Cost Rate | USD/hour rolling | >budget × 1.2 | >budget × 1.5 | Route to cheaper models |
| Quality Score | Avg output quality | <0.7 | <0.5 | Check prompt drift, model changes |

import time
import statistics
from dataclasses import dataclass, field
from collections import deque
from enum import Enum


class AlertSeverity(Enum):
    OK = "ok"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class MetricsCollector:
    """Collects and computes observability metrics for OpenAI API calls.

    Maintains sliding windows for latency percentiles, error rates,
    token throughput, and cost tracking with configurable alert thresholds.
    """

    window_seconds: int = 300  # 5-minute sliding window

    # Sliding window storage
    latencies: deque = field(default_factory=lambda: deque(maxlen=10000))
    errors: deque = field(default_factory=lambda: deque(maxlen=10000))
    tokens: deque = field(default_factory=lambda: deque(maxlen=10000))
    costs: deque = field(default_factory=lambda: deque(maxlen=10000))

    # Alert thresholds
    p95_latency_warn: float = 8.0       # seconds
    p95_latency_crit: float = 15.0
    error_rate_warn: float = 0.02       # 2%
    error_rate_crit: float = 0.05       # 5%
    cost_hourly_warn: float = 50.0      # USD/hour
    cost_hourly_crit: float = 100.0

    # Counters
    total_requests: int = field(default=0, init=False)
    total_errors: int = field(default=0, init=False)

    def record_call(self, latency_s: float, tokens_used: int,
                    cost_usd: float, is_error: bool = False):
        """Record metrics from a single API call."""
        now = time.time()
        self.latencies.append((now, latency_s))
        self.tokens.append((now, tokens_used))
        self.costs.append((now, cost_usd))
        self.errors.append((now, 1 if is_error else 0))
        self.total_requests += 1
        if is_error:
            self.total_errors += 1

    def _window_values(self, data: deque) -> list:
        """Get values within the current time window."""
        cutoff = time.time() - self.window_seconds
        return [v for t, v in data if t > cutoff]

    def get_latency_percentiles(self) -> dict:
        """Calculate p50, p95, p99 latency from the sliding window."""
        values = self._window_values(self.latencies)
        if not values:
            return {"p50": 0, "p95": 0, "p99": 0, "count": 0}
        sorted_vals = sorted(values)
        n = len(sorted_vals)
        return {
            "p50": sorted_vals[int(n * 0.50)],
            "p95": sorted_vals[int(n * 0.95)] if n > 20 else sorted_vals[-1],
            "p99": sorted_vals[int(n * 0.99)] if n > 100 else sorted_vals[-1],
            "count": n,
        }

    def get_error_rate(self) -> float:
        """Calculate error rate within the sliding window."""
        values = self._window_values(self.errors)
        if not values:
            return 0.0
        return sum(values) / len(values)

    def get_token_throughput(self) -> float:
        """Calculate tokens per minute within the sliding window."""
        values = self._window_values(self.tokens)
        if not values:
            return 0.0
        window_minutes = self.window_seconds / 60.0
        return sum(values) / window_minutes

    def get_cost_rate(self) -> float:
        """Calculate USD per hour within the sliding window."""
        values = self._window_values(self.costs)
        if not values:
            return 0.0
        window_hours = self.window_seconds / 3600.0
        return sum(values) / window_hours

    def evaluate_alerts(self) -> list:
        """Evaluate all metrics against alert thresholds."""
        alerts = []
        percentiles = self.get_latency_percentiles()
        error_rate = self.get_error_rate()
        cost_rate = self.get_cost_rate()

        # Latency alerts
        p95 = percentiles["p95"]
        if p95 > self.p95_latency_crit:
            alerts.append(("p95_latency", AlertSeverity.CRITICAL,
                           f"p95={p95:.1f}s > {self.p95_latency_crit}s"))
        elif p95 > self.p95_latency_warn:
            alerts.append(("p95_latency", AlertSeverity.WARNING,
                           f"p95={p95:.1f}s > {self.p95_latency_warn}s"))

        # Error rate alerts
        if error_rate > self.error_rate_crit:
            alerts.append(("error_rate", AlertSeverity.CRITICAL,
                           f"error_rate={error_rate:.1%} > {self.error_rate_crit:.1%}"))
        elif error_rate > self.error_rate_warn:
            alerts.append(("error_rate", AlertSeverity.WARNING,
                           f"error_rate={error_rate:.1%} > {self.error_rate_warn:.1%}"))

        # Cost alerts
        if cost_rate > self.cost_hourly_crit:
            alerts.append(("cost_rate", AlertSeverity.CRITICAL,
                           f"cost=${cost_rate:.2f}/hr > ${self.cost_hourly_crit}/hr"))
        elif cost_rate > self.cost_hourly_warn:
            alerts.append(("cost_rate", AlertSeverity.WARNING,
                           f"cost=${cost_rate:.2f}/hr > ${self.cost_hourly_warn}/hr"))

        return alerts

    def get_dashboard(self) -> dict:
        """Generate a complete metrics dashboard snapshot."""
        percentiles = self.get_latency_percentiles()
        return {
            "latency": percentiles,
            "error_rate": f"{self.get_error_rate():.2%}",
            "token_throughput_per_min": round(self.get_token_throughput()),
            "cost_per_hour_usd": f"${self.get_cost_rate():.2f}",
            "total_requests": self.total_requests,
            "total_errors": self.total_errors,
            "active_alerts": [(name, sev.value, msg)
                              for name, sev, msg in self.evaluate_alerts()],
        }


# Usage demonstration — simulate production traffic
import random
random.seed(42)
collector = MetricsCollector(window_seconds=300)

print("=== Simulating 200 API Calls ===\n")
for i in range(200):
    # Simulate realistic latency distribution (log-normal)
    latency = random.lognormvariate(0.5, 0.8)  # median ~1.6s
    tokens = random.randint(500, 8000)
    cost = tokens * 0.000015  # ~$15/1M tokens
    is_error = random.random() < 0.03  # 3% error rate

    collector.record_call(latency, tokens, cost, is_error)

dashboard = collector.get_dashboard()
print("--- Metrics Dashboard ---")
print(f"  Latency p50: {dashboard['latency']['p50']:.2f}s")
print(f"  Latency p95: {dashboard['latency']['p95']:.2f}s")
print(f"  Latency p99: {dashboard['latency']['p99']:.2f}s")
print(f"  Error Rate: {dashboard['error_rate']}")
print(f"  Token Throughput: {dashboard['token_throughput_per_min']} tokens/min")
print(f"  Cost Rate: {dashboard['cost_per_hour_usd']}/hour")
print(f"  Total Requests: {dashboard['total_requests']}")
print(f"  Total Errors: {dashboard['total_errors']}")
print(f"\n--- Active Alerts ---")
if dashboard["active_alerts"]:
    for name, severity, message in dashboard["active_alerts"]:
        print(f"  [{severity.upper()}] {name}: {message}")
else:
    print("  No active alerts — all metrics within thresholds")
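
The collector above picks percentiles by indexing into a sorted list. As an alternative sketch, the standard library's statistics.quantiles interpolates between samples, which gives smoother values on small windows. The function name latency_percentiles is hypothetical and the sample data is synthetic.

```python
import statistics


def latency_percentiles(latencies: list) -> dict:
    """Compute p50/p95/p99 with linear interpolation between samples."""
    if len(latencies) < 2:
        # Fall back to the single value (or zero) for tiny windows
        only = latencies[0] if latencies else 0.0
        return {"p50": only, "p95": only, "p99": only}
    # n=100 yields 99 cut points; cuts[i - 1] is the i-th percentile
    cuts = statistics.quantiles(latencies, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


sample = [float(x) for x in range(1, 102)]  # synthetic latencies 1.0 .. 101.0
print(latency_percentiles(sample))
```

The "inclusive" method treats the window as the full population, which fits a sliding window of observed requests; the default "exclusive" method extrapolates beyond the observed range and would report slightly higher tail values.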

4. Distributed Tracing

In microservice architectures, a single user request may traverse multiple services before reaching the OpenAI API. Distributed tracing with OpenTelemetry creates a unified view of the entire request lifecycle — from the initial HTTP request through your business logic, prompt construction, API call, response parsing, and back to the user. Each step is a “span” with timing, attributes, and parent-child relationships.

OpenTelemetry Span Attributes

The OpenTelemetry Semantic Conventions for GenAI define standard attribute names for LLM spans. By following these conventions, your traces are compatible with any OpenTelemetry-compatible backend (Jaeger, Tempo, Honeycomb, Datadog) without custom configuration.

Key Span Attributes for LLM Calls: gen_ai.system = "openai", gen_ai.request.model = model name, gen_ai.response.model = actual model used, gen_ai.usage.input_tokens, gen_ai.usage.output_tokens, gen_ai.response.finish_reasons = ["stop"], server.address = "api.openai.com". These attributes enable cross-vendor querying and standardized dashboards regardless of your tracing backend. The GenAI conventions are still evolving, so confirm attribute names against the semantic-conventions version your backend supports.
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional
from contextlib import contextmanager


@dataclass
class Span:
    """Represents a single span in a distributed trace (OpenTelemetry-compatible)."""

    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)
    status: str = "OK"

    def set_attribute(self, key: str, value):
        """Set a span attribute following OpenTelemetry semantic conventions."""
        self.attributes[key] = value

    def end(self):
        """Mark the span as complete."""
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return (time.time() - self.start_time) * 1000


@dataclass
class LLMTracer:
    """Distributed tracer for OpenAI API calls with OpenTelemetry-compatible spans.

    Creates hierarchical spans following GenAI semantic conventions:
    - Root span: User request (HTTP handler)
    - Child span: Prompt construction
    - Child span: OpenAI API call (with LLM-specific attributes)
    - Child span: Response processing
    """

    service_name: str = "ai-service"
    spans: list = field(default_factory=list)
    _current_trace_id: str = field(default_factory=lambda: uuid.uuid4().hex[:32])

    @contextmanager
    def start_span(self, name: str, parent: Optional[Span] = None):
        """Context manager to create and auto-close a span."""
        span = Span(
            name=name,
            trace_id=self._current_trace_id,
            parent_span_id=parent.span_id if parent else None,
        )
        span.set_attribute("service.name", self.service_name)
        self.spans.append(span)
        try:
            yield span
        except Exception as e:
            span.status = "ERROR"
            span.set_attribute("error.type", type(e).__name__)
            span.set_attribute("error.message", str(e))
            raise
        finally:
            span.end()

    def create_llm_span(self, parent: Span, model: str,
                        input_tokens: int, output_tokens: int,
                        latency_ms: float, finish_reason: str = "stop") -> Span:
        """Create an LLM-specific span with GenAI semantic convention attributes."""
        span = Span(
            name="openai.chat.completions",
            trace_id=self._current_trace_id,
            parent_span_id=parent.span_id,
        )
        # GenAI Semantic Conventions (OpenTelemetry)
        span.set_attribute("gen_ai.system", "openai")
        span.set_attribute("gen_ai.request.model", model)
        span.set_attribute("gen_ai.response.model", model)
        span.set_attribute("gen_ai.usage.input_tokens", input_tokens)
        span.set_attribute("gen_ai.usage.output_tokens", output_tokens)
        span.set_attribute("gen_ai.response.finish_reasons", [finish_reason])
        span.set_attribute("server.address", "api.openai.com")
        span.set_attribute("server.port", 443)

        # Custom attributes for deeper observability
        span.set_attribute("llm.latency_ms", latency_ms)
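        # Estimated cost at GPT-4.1 rates ($2.00 input / $8.00 output per 1M tokens);
        # other models need their own rates (see MODEL_PRICING in section 5)
        span.set_attribute("llm.cost_usd", (input_tokens * 2.00 + output_tokens * 8.00) / 1_000_000)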

        span.end_time = span.start_time + (latency_ms / 1000)
        self.spans.append(span)
        return span

    def get_trace_summary(self) -> dict:
        """Generate a human-readable trace summary."""
        total_duration = 0
        if self.spans:
            start = min(s.start_time for s in self.spans)
            end = max(s.end_time or s.start_time for s in self.spans)
            total_duration = (end - start) * 1000

        return {
            "trace_id": self._current_trace_id,
            "total_spans": len(self.spans),
            "total_duration_ms": round(total_duration, 1),
            "spans": [
                {
                    "name": s.name,
                    "duration_ms": round(s.duration_ms, 1),
                    "parent": s.parent_span_id[:8] if s.parent_span_id else None,
                    "status": s.status,
                    "llm_model": s.attributes.get("gen_ai.request.model"),
                }
                for s in self.spans
            ],
        }


# Usage demonstration — trace a complete user request
tracer = LLMTracer(service_name="document-summarizer")

print("=== Distributed Trace Example ===\n")

with tracer.start_span("handle_request") as root_span:
    root_span.set_attribute("http.method", "POST")
    root_span.set_attribute("http.route", "/api/summarize")
    root_span.set_attribute("user.id", "user_42")

    # Span: Prompt construction
    with tracer.start_span("build_prompt", parent=root_span) as prompt_span:
        prompt_span.set_attribute("prompt.template", "summarize_v2")
        prompt_span.set_attribute("prompt.input_length", 4200)
        time.sleep(0.005)  # Simulate prompt building

    # Span: OpenAI API call
    llm_span = tracer.create_llm_span(
        parent=root_span,
        model="gpt-4.1",
        input_tokens=1850,
        output_tokens=420,
        latency_ms=1234.5,
        finish_reason="stop",
    )

    # Span: Response processing
    with tracer.start_span("process_response", parent=root_span) as proc_span:
        proc_span.set_attribute("response.format", "markdown")
        proc_span.set_attribute("response.word_count", 312)
        time.sleep(0.002)  # Simulate processing

# Print trace summary
summary = tracer.get_trace_summary()
print(f"Trace ID: {summary['trace_id']}")
print(f"Total Spans: {summary['total_spans']}")
print(f"Total Duration: {summary['total_duration_ms']}ms\n")

print("--- Span Waterfall ---")
for span in summary["spans"]:
    indent = "  " if span["parent"] else ""
    model_info = f" [{span['llm_model']}]" if span["llm_model"] else ""
    print(f"{indent}{span['name']}{model_info} — {span['duration_ms']}ms [{span['status']}]")
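
The tracer above keeps every span in one process. For a trace to continue across services, the trace ID has to travel with the outgoing request, and OpenTelemetry's default propagator does this with the W3C traceparent HTTP header. The sketch below builds and parses that header by hand to show the format; make_traceparent and parse_traceparent are hypothetical helpers, and in a real deployment the OpenTelemetry SDK would normally handle propagation for you.

```python
import re
import secrets
from typing import Optional

# Format: version-traceid-parentid-flags, lowercase hex (W3C Trace Context, version 00)
_TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def make_traceparent(trace_id: Optional[str] = None, sampled: bool = True) -> str:
    """Build a traceparent header value for an outgoing request."""
    trace_id = trace_id or secrets.token_hex(16)  # 32 hex characters
    span_id = secrets.token_hex(8)                # 16 hex characters
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def parse_traceparent(header: str) -> Optional[dict]:
    """Return the trace context from a header value, or None if malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if not match:
        return None
    trace_id, parent_span_id, flags = match.groups()
    # All-zero IDs are invalid according to the specification
    if set(trace_id) == {"0"} or set(parent_span_id) == {"0"}:
        return None
    return {
        "trace_id": trace_id,
        "parent_span_id": parent_span_id,
        "sampled": bool(int(flags, 16) & 0x01),
    }


# Service A creates the header; service B parses it and continues the trace
outgoing = make_traceparent()
incoming = parse_traceparent(outgoing)
downstream = make_traceparent(trace_id=incoming["trace_id"])
print(outgoing)
print(downstream)
```

Both headers share the same trace ID but carry different span IDs, which is what lets a trace backend stitch spans from separate services into one waterfall.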

5. Cost Monitoring

OpenAI costs can escalate rapidly in production: a single prompt change that doubles output length roughly doubles your output-token spend overnight, and output tokens are billed at a higher rate than input tokens. Real-time cost monitoring with budget alerts prevents bill shock. The key is attributing costs to specific features, teams, and users so you can optimize the most expensive call paths first.
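
To make the effect of output length concrete, here is a small worked example using the GPT-4.1 rates from this section's pricing table. The token counts are illustrative assumptions, not measurements, and call_cost is a hypothetical helper.

```python
# GPT-4.1 rates in USD per 1M tokens, taken from this section's pricing table
INPUT_RATE, OUTPUT_RATE = 2.00, 8.00


def call_cost(input_tokens: int, output_tokens: int) -> float:
    """Cost of one call in USD at the rates above."""
    return (input_tokens * INPUT_RATE + output_tokens * OUTPUT_RATE) / 1_000_000


# Illustrative token counts: same prompt, output length doubled
before = call_cost(input_tokens=2000, output_tokens=500)
after = call_cost(input_tokens=2000, output_tokens=1000)

print(f"before: ${before:.4f}  after: ${after:.4f}  ratio: {after / before:.2f}x")
# prints: before: $0.0080  after: $0.0120  ratio: 1.50x
```

In this example the output portion doubles from $0.004 to $0.008 while the input portion stays at $0.004, so the call becomes 1.5 times as expensive. The closer a workload is to output-dominated, the closer that ratio gets to 2.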

Cost Attribution Model

Every API call should be tagged with a cost center (feature, team, or user). This enables drill-down from “we spent $5,000 yesterday” to “the document summarizer for Team Alpha consumed 60% of budget due to a prompt regression.” Without attribution, cost optimization is guesswork.

| Model | Input Cost ($/1M tokens) | Output Cost ($/1M tokens) | Typical Use Case |
|---|---|---|---|
| GPT-4.1 | $2.00 | $8.00 | Complex reasoning, code generation |
| GPT-4.1-mini | $0.40 | $1.60 | Chat, summarization, classification |
| GPT-4.1-nano | $0.10 | $0.40 | Simple extraction, routing, tagging |
| o3 | $2.00 | $8.00 | Multi-step reasoning, math, planning |
| o4-mini | $1.10 | $4.40 | Balanced reasoning tasks |

import time
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Optional


# Pricing per 1M tokens (as of May 2026)
MODEL_PRICING = {
    "gpt-4.1":      {"input": 2.00, "output": 8.00},
    "gpt-4.1-mini": {"input": 0.40, "output": 1.60},
    "gpt-4.1-nano": {"input": 0.10, "output": 0.40},
    "o3":           {"input": 2.00, "output": 8.00},
    "o4-mini":      {"input": 1.10, "output": 4.40},
}


@dataclass
class CostMonitor:
    """Real-time cost monitoring with budget alerts and attribution.

    Tracks costs by feature, team, and model with configurable
    budget thresholds and alert escalation.
    """

    daily_budget_usd: float = 500.0
    alert_warn_pct: float = 0.80    # Alert at 80% of budget
    alert_crit_pct: float = 0.95    # Critical at 95% of budget

    # Cost accumulators
    costs_by_feature: dict = field(default_factory=lambda: defaultdict(float))
    costs_by_team: dict = field(default_factory=lambda: defaultdict(float))
    costs_by_model: dict = field(default_factory=lambda: defaultdict(float))
    total_cost_today: float = field(default=0.0, init=False)
    call_count: int = field(default=0, init=False)

    def record_usage(self, model: str, input_tokens: int, output_tokens: int,
                     feature: str = "unknown", team: str = "unknown"):
        """Record token usage and calculate cost."""
        pricing = MODEL_PRICING.get(model, {"input": 2.00, "output": 8.00})
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        total_cost = input_cost + output_cost

        self.costs_by_feature[feature] += total_cost
        self.costs_by_team[team] += total_cost
        self.costs_by_model[model] += total_cost
        self.total_cost_today += total_cost
        self.call_count += 1

        # Check budget alerts
        alerts = self._check_budget_alerts()
        return {"cost_usd": total_cost, "alerts": alerts}

    def _check_budget_alerts(self) -> list:
        """Check if spending has crossed alert thresholds."""
        alerts = []
        pct_used = self.total_cost_today / self.daily_budget_usd

        if pct_used >= self.alert_crit_pct:
            alerts.append({
                "severity": "CRITICAL",
                "message": f"Daily budget {pct_used:.0%} consumed (${self.total_cost_today:.2f}/${self.daily_budget_usd})",
                "action": "Throttle non-critical requests, route to cheaper models",
            })
        elif pct_used >= self.alert_warn_pct:
            alerts.append({
                "severity": "WARNING",
                "message": f"Daily budget {pct_used:.0%} consumed (${self.total_cost_today:.2f}/${self.daily_budget_usd})",
                "action": "Monitor closely, prepare model downgrade",
            })
        return alerts

    def get_cost_report(self) -> dict:
        """Generate a comprehensive cost attribution report."""
        return {
            "total_cost_today": f"${self.total_cost_today:.4f}",
            "budget_remaining": f"${self.daily_budget_usd - self.total_cost_today:.4f}",
            "budget_used_pct": f"{(self.total_cost_today / self.daily_budget_usd):.1%}",
            "total_calls": self.call_count,
            "avg_cost_per_call": f"${self.total_cost_today / max(self.call_count, 1):.6f}",
            "by_feature": {k: f"${v:.4f}" for k, v in
                           sorted(self.costs_by_feature.items(), key=lambda x: -x[1])},
            "by_team": {k: f"${v:.4f}" for k, v in
                        sorted(self.costs_by_team.items(), key=lambda x: -x[1])},
            "by_model": {k: f"${v:.4f}" for k, v in
                         sorted(self.costs_by_model.items(), key=lambda x: -x[1])},
        }


# Usage demonstration — simulate a day of production traffic
import random
random.seed(42)

monitor = CostMonitor(daily_budget_usd=100.0)

# Simulate varied usage across features and teams
usage_patterns = [
    ("chat", "platform", "gpt-4.1-mini", (800, 2000), (200, 800)),
    ("summarizer", "content", "gpt-4.1", (3000, 8000), (500, 2000)),
    ("classifier", "ml-team", "gpt-4.1-nano", (200, 500), (10, 50)),
    ("code_review", "engineering", "gpt-4.1", (5000, 15000), (1000, 4000)),
    ("search", "platform", "gpt-4.1-mini", (500, 1500), (100, 400)),
]

print("=== Cost Monitoring Simulation (500 calls) ===\n")
for _ in range(500):
    feature, team, model, input_range, output_range = random.choice(usage_patterns)
    input_tokens = random.randint(*input_range)
    output_tokens = random.randint(*output_range)
    result = monitor.record_usage(model, input_tokens, output_tokens, feature, team)

# Print report
report = monitor.get_cost_report()
print("--- Daily Cost Report ---")
print(f"  Total Cost: {report['total_cost_today']}")
print(f"  Budget Used: {report['budget_used_pct']}")
print(f"  Budget Remaining: {report['budget_remaining']}")
print(f"  Total Calls: {report['total_calls']}")
print(f"  Avg Cost/Call: {report['avg_cost_per_call']}")

print(f"\n--- Cost by Feature ---")
for feature, cost in report["by_feature"].items():
    print(f"  {feature:>15}: {cost}")

print(f"\n--- Cost by Team ---")
for team, cost in report["by_team"].items():
    print(f"  {team:>15}: {cost}")

print(f"\n--- Cost by Model ---")
for model, cost in report["by_model"].items():
    print(f"  {model:>15}: {cost}")
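
The report above returns display strings, which are convenient for printing but awkward to compare against thresholds. As a minimal sketch, the helper below works on raw floats and maps spend to the 80% and 95% budget alert tiers used in this article. `budget_alert_level` and its tier names are assumptions for illustration, not part of `CostMonitor`.

```python
def budget_alert_level(total_cost_usd: float, daily_budget_usd: float,
                       warn_at: float = 0.80, critical_at: float = 0.95) -> str:
    """Map spend to an alert tier. Hypothetical helper; tier names are assumptions."""
    if daily_budget_usd <= 0:
        raise ValueError("daily_budget_usd must be positive")
    used = total_cost_usd / daily_budget_usd  # fraction of the daily budget spent
    if used >= critical_at:
        return "critical"
    if used >= warn_at:
        return "warning"
    return "ok"


for spent in (42.0, 83.5, 97.0):
    print(f"${spent:.2f} of $100.00 -> {budget_alert_level(spent, 100.0)}")
```

Keeping the numeric value separate from its formatted form also makes it easy to feed the same figure into an alert rule as a plain float.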

6. Quality Monitoring

Monitoring latency and errors tells you the system is running; quality monitoring tells you it’s running correctly. LLM outputs can degrade silently — hallucination rates increase, responses drift from expected formats, or model updates change behavior. Quality monitoring detects these regressions before users complain, using automated scoring, user feedback loops, and drift detection.
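
Of the three techniques named above, the user feedback loop is the simplest to prototype. The sketch below keeps a rolling window of thumbs-up/thumbs-down ratings and reports the satisfaction rate once enough ratings have arrived. `FeedbackTracker`, the window size, and the minimum sample count are all hypothetical choices, not something the rest of this article depends on.

```python
from collections import deque


class FeedbackTracker:
    """Rolling thumbs-up rate over the most recent ratings (hypothetical helper)."""

    def __init__(self, window: int = 200, min_samples: int = 20):
        self.ratings = deque(maxlen=window)  # True = thumbs up, False = thumbs down
        self.min_samples = min_samples

    def record(self, thumbs_up: bool) -> None:
        self.ratings.append(bool(thumbs_up))

    def satisfaction_rate(self):
        """Return the thumbs-up fraction, or None until enough ratings arrive."""
        if len(self.ratings) < self.min_samples:
            return None
        return sum(self.ratings) / len(self.ratings)


tracker = FeedbackTracker(window=50, min_samples=10)
for i in range(30):
    tracker.record(i % 5 != 0)  # every fifth rating is a thumbs down
print(f"Satisfaction rate: {tracker.satisfaction_rate():.0%}")
```

Returning `None` below the minimum sample count avoids alerting on a handful of early ratings.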

Quality Dimensions

Silent Degradation Risk: Unlike traditional APIs, where failures are usually binary (works or doesn’t), LLM quality degrades on a spectrum. A model might still return 200 OK but produce subtly worse outputs after a provider-side update. Without quality monitoring, you won’t know until user complaints spike, which can take days. Automated quality scoring can surface that degradation within minutes, provided the scores are evaluated continuously.
import time
import re
import statistics
from dataclasses import dataclass, field
from collections import deque
from typing import Optional


@dataclass
class QualityScorer:
    """Automated quality scoring for LLM outputs.

    Implements multiple quality dimensions:
    - Format compliance (JSON validity, Markdown structure, length bounds)
    - Hallucination-risk heuristics (absolute claims, unverifiable specifics, hedging language)
    - Relevance scoring (keyword overlap with input)
    - Drift detection (recent composite scores vs. a baseline)

    The scores are cheap lexical heuristics, not a verdict on factual accuracy.
    """

    # Sliding window for drift detection
    scores: deque = field(default_factory=lambda: deque(maxlen=1000))
    baseline_mean: Optional[float] = None
    baseline_std: Optional[float] = None

    def score_format_compliance(self, output: str, expected_format: str = "text") -> float:
        """Score whether output matches expected format constraints."""
        score = 1.0

        if expected_format == "json":
            import json  # local import keeps this snippet self-contained
            try:
                json.loads(output)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                score = 0.0
        elif expected_format == "markdown":
            has_headers = bool(re.search(r'^#{1,6}\s', output, re.MULTILINE))
            has_structure = bool(re.search(r'(\*|-|\d+\.)\s', output))
            score = 0.5 * has_headers + 0.5 * has_structure

        # Length sanity check (too short or too long is suspicious)
        word_count = len(output.split())
        if word_count < 10:
            score *= 0.5  # Suspiciously short
        elif word_count > 5000:
            score *= 0.8  # Possibly over-verbose

        return round(score, 3)

    def score_hallucination_risk(self, output: str) -> float:
        """Estimate hallucination risk based on linguistic markers.

        Higher score = lower hallucination risk (better quality).
        """
        # Hedging language indicates uncertainty awareness (good)
        hedging_markers = [
            "may", "might", "could", "possibly", "likely", "approximately",
            "it appears", "based on", "according to", "generally",
        ]
        # Match whole words so "may" is not counted inside "dismay"
        hedging_count = sum(1 for m in hedging_markers
                            if re.search(rf"\b{re.escape(m)}\b", output.lower()))

        # Absolute claims without citations (risky)
        absolute_markers = [
            "always", "never", "definitely", "certainly", "all",
            "everyone knows", "it is a fact", "undoubtedly",
        ]
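        # Match whole words so "all" is not counted inside "small" or "generally"
        absolute_count = sum(1 for m in absolute_markers
                             if re.search(rf"\b{re.escape(m)}\b", output.lower()))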

        # Specific numbers/dates without context (risky if unverifiable)
        # No trailing \b after "%": a boundary there only matches when a letter follows
        specific_claims = len(re.findall(r'\b\d{4}\b|\b\d+(?:\.\d+)?%|\$[\d,]+', output))

        # Score: more hedging = better, more absolutes = worse
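        # Split after sentence-ending punctuation followed by whitespace, so decimals
        # such as "2.3" and the trailing period do not inflate the sentence count
        sentences = max(len([s for s in re.split(r'(?<=[.!?])\s+', output) if s.strip()]), 1)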
        hedging_density = hedging_count / sentences
        absolute_density = absolute_count / sentences
        claim_density = specific_claims / sentences

        score = 1.0
        score -= absolute_density * 0.3   # Penalize absolute claims
        score -= claim_density * 0.1      # Mild penalty for unverifiable specifics
        score += hedging_density * 0.1    # Reward appropriate hedging
        return round(max(0.0, min(1.0, score)), 3)

    def score_relevance(self, output: str, input_text: str) -> float:
        """Score output relevance to the input using keyword overlap."""
        # Extract meaningful words (>3 chars, not stopwords)
        stopwords = {"the", "and", "for", "that", "this", "with", "from", "have", "been"}
        input_words = {w.lower() for w in re.findall(r'\b\w{4,}\b', input_text)} - stopwords
        output_words = {w.lower() for w in re.findall(r'\b\w{4,}\b', output)} - stopwords

        if not input_words:
            return 0.5  # Can't assess relevance without input context

        # What fraction of input concepts appear in output?
        overlap = input_words & output_words
        recall = len(overlap) / len(input_words)

        return round(min(1.0, recall * 1.5), 3)  # Scale up, cap at 1.0

    def compute_composite_score(self, output: str, input_text: str,
                                expected_format: str = "text") -> dict:
        """Compute all quality dimensions and a composite score."""
        format_score = self.score_format_compliance(output, expected_format)
        hallucination_score = self.score_hallucination_risk(output)
        relevance_score = self.score_relevance(output, input_text)

        # Weighted composite
        composite = (
            format_score * 0.3 +
            hallucination_score * 0.4 +
            relevance_score * 0.3
        )

        # Track for drift detection
        self.scores.append((time.time(), composite))

        return {
            "composite": round(composite, 3),
            "format_compliance": format_score,
            "hallucination_risk": hallucination_score,
            "relevance": relevance_score,
            "drift_detected": self._check_drift(composite),
        }

    def _check_drift(self, current_score: float) -> bool:
        """Detect quality drift using statistical process control."""
        if len(self.scores) < 50:
            return False  # Not enough data

        recent_scores = [s for _, s in list(self.scores)[-50:]]

        # Establish baseline from first 80% of data
        if self.baseline_mean is None:
            baseline_data = [s for _, s in list(self.scores)[:int(len(self.scores) * 0.8)]]
            if len(baseline_data) >= 30:
                self.baseline_mean = statistics.mean(baseline_data)
                self.baseline_std = statistics.stdev(baseline_data)

        if self.baseline_mean is None or self.baseline_std is None:
            return False

        # Alert if recent mean is >2 std deviations below baseline
        recent_mean = statistics.mean(recent_scores)
        threshold = self.baseline_mean - (2 * self.baseline_std)
        return recent_mean < threshold

    def set_baseline(self, scores: list):
        """Manually set quality baseline from historical data."""
        self.baseline_mean = statistics.mean(scores)
        self.baseline_std = statistics.stdev(scores)


# Usage demonstration
scorer = QualityScorer()
scorer.set_baseline([0.78, 0.82, 0.80, 0.75, 0.83, 0.79, 0.81, 0.77, 0.84, 0.80])

print("=== Quality Scoring Examples ===\n")

# Example 1: Good quality output
good_output = """Based on the quarterly financial report, revenue grew approximately 15% year-over-year,
reaching $2.3 billion. The growth was primarily driven by the cloud services division,
which may have benefited from increased enterprise adoption. Operating margins improved
slightly, though this could be partially attributed to one-time cost reductions."""

input_text = "Summarize the quarterly financial report focusing on revenue growth and margins"

result = scorer.compute_composite_score(good_output, input_text)
print("--- Example 1: Well-structured summary ---")
for k, v in result.items():
    print(f"  {k}: {v}")

# Example 2: Poor quality (hallucination markers)
poor_output = """The company definitely achieved 47.3% growth in Q3 2026, making it
the fastest growing company in history. Everyone knows this was caused by their
revolutionary AI product that always outperforms all competitors. The CEO certainly
confirmed they will never face any challenges."""

result2 = scorer.compute_composite_score(poor_output, input_text)
print("\n--- Example 2: High hallucination risk ---")
for k, v in result2.items():
    print(f"  {k}: {v}")

# Example 3: Irrelevant output
irrelevant_output = """Here is a recipe for chocolate cake. Preheat oven to 350F.
Mix flour, sugar, and cocoa powder. Add eggs and milk. Bake for 30 minutes."""

result3 = scorer.compute_composite_score(irrelevant_output, input_text)
print("\n--- Example 3: Irrelevant output ---")
for k, v in result3.items():
    print(f"  {k}: {v}")
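
To make the drift rule concrete, the sketch below reproduces the arithmetic behind `_check_drift` using the same ten baseline values passed to `set_baseline()` above. The two sample windows and the `is_drifting` helper are invented for illustration. With this baseline the alert threshold works out to roughly 0.74.

```python
import statistics

# Same baseline values passed to set_baseline() in the demo above
baseline = [0.78, 0.82, 0.80, 0.75, 0.83, 0.79, 0.81, 0.77, 0.84, 0.80]

baseline_mean = statistics.mean(baseline)
baseline_std = statistics.stdev(baseline)      # sample standard deviation
threshold = baseline_mean - 2 * baseline_std   # same rule as _check_drift


def is_drifting(recent_scores):
    """True when the mean of the recent window falls below the threshold."""
    return statistics.mean(recent_scores) < threshold


print(f"mean={baseline_mean:.3f} std={baseline_std:.3f} threshold={threshold:.3f}")
print("healthy window drifting? ", is_drifting([0.79, 0.81, 0.78, 0.80]))
print("degraded window drifting?", is_drifting([0.70, 0.72, 0.69, 0.71]))
```

A tight baseline like this one produces a narrow band, so a drop of about 0.06 in the windowed mean is enough to trip the check. A noisier baseline widens the band and makes the check less sensitive.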

7. Incident Response

Even with comprehensive monitoring, incidents will happen. Pre-written runbooks are often the difference between a 5-minute resolution and a 2-hour outage. For OpenAI-powered applications, the most common incidents are rate limit exhaustion, elevated latency, quality degradation, and unexpected cost spikes. Each needs a specific diagnosis and remediation playbook.

Runbook: Common Failure Modes

| Incident | Symptoms | Diagnosis | Immediate Action | Escalation |
|---|---|---|---|---|
| Rate Limit Exhaustion | 429 rate >5%, queue depth growing | Check RPM/TPM usage vs. quota | Reduce concurrency, enable backoff | Request quota increase from OpenAI |
| Elevated Latency | p95 >15s, timeouts increasing | Check OpenAI status page, prompt length | Reduce max_tokens, route to mini | Switch to fallback provider |
| Quality Degradation | Quality score <0.5, user complaints | Compare outputs to baseline, check model version | Pin model version, revert prompt changes | Evaluate alternative models |
| Cost Spike | Hourly cost >1.5× hourly budget (daily budget / 24) | Check token counts per request, feature attribution | Route to nano model, disable non-critical features | Implement hard spending cap |
| API Outage | 5xx rate >50%, circuit breaker open | Check status.openai.com | Serve cached responses, queue requests | Activate DR plan, notify users |
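
The symptom thresholds in the table can be turned into trigger checks fairly directly. The sketch below is one way to do that, assuming a flat metrics dictionary. The metric key names (`rate_429`, `hourly_cost_usd`, and so on) and the `matching_triggers` helper are hypothetical. Only the numeric thresholds come from the table.

```python
# Thresholds copied from the runbook table; metric names are illustrative assumptions.
TRIGGER_CHECKS = {
    "rate_limit_exhaustion": lambda m: m.get("rate_429", 0.0) > 0.05,
    "elevated_latency": lambda m: m.get("p95_latency_s", 0.0) > 15.0,
    "quality_degradation": lambda m: m.get("quality_score", 1.0) < 0.5,
    "cost_spike": lambda m: (
        m.get("hourly_cost_usd", 0.0) > 1.5 * m.get("hourly_budget_usd", float("inf"))
    ),
    "api_outage": lambda m: m.get("rate_5xx", 0.0) > 0.50,
}


def matching_triggers(metrics: dict) -> list:
    """Return the names of every trigger whose threshold is breached."""
    return [name for name, check in TRIGGER_CHECKS.items() if check(metrics)]


snapshot = {
    "rate_429": 0.08,                  # 8% of requests rate-limited
    "p95_latency_s": 9.2,
    "quality_score": 0.71,
    "hourly_cost_usd": 7.0,
    "hourly_budget_usd": 100.0 / 24,   # $100 daily budget spread over 24 hours
    "rate_5xx": 0.01,
}
print(matching_triggers(snapshot))
```

Missing metrics fall back to safe defaults, so an incomplete snapshot never raises and never triggers an incident on its own.
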
Incident Response Principles: (1) Detect within 1 minute (automated alerts); (2) Acknowledge within 5 minutes (on-call rotation); (3) Mitigate within 15 minutes (pre-written runbooks); (4) Resolve within 1 hour (root cause fix); (5) Post-mortem within 24 hours (prevent recurrence). The goal is mitigation speed, not perfection — a degraded response is always better than no response.
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class IncidentSeverity(Enum):
    SEV1 = "sev1"  # Total outage, all users affected
    SEV2 = "sev2"  # Major degradation, many users affected
    SEV3 = "sev3"  # Minor degradation, some users affected
    SEV4 = "sev4"  # Low impact, cosmetic or edge case


class IncidentStatus(Enum):
    DETECTED = "detected"
    ACKNOWLEDGED = "acknowledged"
    MITIGATING = "mitigating"
    RESOLVED = "resolved"


@dataclass
class Runbook:
    """Pre-defined incident response playbook."""

    name: str
    trigger_condition: str
    diagnosis_steps: list
    mitigation_actions: list
    escalation_path: list
    expected_resolution_min: int


@dataclass
class IncidentManager:
    """Incident registry that pairs triggers with pre-written runbooks.

    The caller decides when a threshold has been crossed and passes the
    trigger name to detect_incident(), which snapshots the metrics and
    attaches the matching runbook. Status transitions after DETECTED and
    SLA timing are not implemented in this example.
    """

    runbooks: dict = field(default_factory=dict)
    active_incidents: list = field(default_factory=list)

    def register_runbook(self, trigger: str, runbook: Runbook):
        """Register a runbook for a specific failure trigger."""
        self.runbooks[trigger] = runbook

    def detect_incident(self, trigger: str, metrics: dict,
                        severity: IncidentSeverity = IncidentSeverity.SEV3) -> dict:
        """Detect an incident and execute the matching runbook."""
        runbook = self.runbooks.get(trigger)
        if not runbook:
            return {"status": "no_runbook", "trigger": trigger}

        incident = {
            "id": f"INC-{int(time.time()) % 100000:05d}",
            "trigger": trigger,
            "severity": severity.value,
            "status": IncidentStatus.DETECTED.value,
            "detected_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "metrics_snapshot": metrics,
            "runbook": runbook.name,
            "diagnosis_steps": runbook.diagnosis_steps,
            "mitigation_actions": runbook.mitigation_actions,
            "escalation_path": runbook.escalation_path,
            "expected_resolution_min": runbook.expected_resolution_min,
        }
        self.active_incidents.append(incident)
        return incident

    def get_status_page(self) -> dict:
        """Generate a status page summary."""
        return {
            "overall_status": "degraded" if self.active_incidents else "operational",
            "active_incidents": len(self.active_incidents),
            "incidents": [
                {
                    "id": inc["id"],
                    "severity": inc["severity"],
                    "trigger": inc["trigger"],
                    "status": inc["status"],
                    "runbook": inc["runbook"],
                }
                for inc in self.active_incidents
            ],
        }


# Define runbooks for common OpenAI failure modes
manager = IncidentManager()

manager.register_runbook("rate_limit_exhaustion", Runbook(
    name="Rate Limit Exhaustion",
    trigger_condition="429 error rate > 5% for 2+ minutes",
    diagnosis_steps=[
        "1. Check current RPM/TPM usage in OpenAI dashboard",
        "2. Identify which feature is consuming the most quota",
        "3. Check for request amplification (retry storms)",
        "4. Verify rate limiter configuration matches current quota",
    ],
    mitigation_actions=[
        "A. Reduce max concurrency by 50%",
        "B. Enable exponential backoff with jitter",
        "C. Route non-critical requests to batch queue",
        "D. Temporarily disable lowest-priority features",
    ],
    escalation_path=[
        "L1: On-call engineer — apply mitigation actions",
        "L2: Platform team — request emergency quota increase",
        "L3: Account manager — negotiate higher tier",
    ],
    expected_resolution_min=15,
))

manager.register_runbook("quality_degradation", Runbook(
    name="Quality Degradation",
    trigger_condition="Composite quality score < 0.5 for 10+ minutes",
    diagnosis_steps=[
        "1. Compare recent outputs against golden set examples",
        "2. Check if OpenAI deployed a model update (changelog)",
        "3. Review recent prompt template changes in git log",
        "4. Check input data quality (garbage in = garbage out)",
    ],
    mitigation_actions=[
        "A. Pin to a specific model snapshot (dated version)",
        "B. Revert last prompt template change",
        "C. Enable output validation with retry on low score",
        "D. Route critical paths to a known-good model version",
    ],
    escalation_path=[
        "L1: ML engineer — diagnose prompt/model issue",
        "L2: Product team — assess user impact, communicate",
        "L3: Leadership — decide on feature flag rollback",
    ],
    expected_resolution_min=30,
))

manager.register_runbook("cost_spike", Runbook(
    name="Unexpected Cost Spike",
    trigger_condition="Hourly cost > 1.5x (daily budget / 24)",
    diagnosis_steps=[
        "1. Check cost attribution by feature — which spiked?",
        "2. Look for increased token counts per request",
        "3. Check for traffic spike (legitimate or attack)",
        "4. Verify no infinite loops or retry amplification",
    ],
    mitigation_actions=[
        "A. Route all traffic to cheapest viable model (nano/mini)",
        "B. Enable response length caps (max_tokens reduction)",
        "C. Activate request deduplication / caching",
        "D. Rate-limit the offending feature specifically",
    ],
    escalation_path=[
        "L1: On-call engineer — immediate cost reduction",
        "L2: Finance + Engineering — assess damage, set hard cap",
        "L3: Leadership — approve emergency spending or shutdown",
    ],
    expected_resolution_min=10,
))

# Simulate incident detection
print("=== Incident Response Simulation ===\n")

# Detect a rate limit incident
incident = manager.detect_incident(
    trigger="rate_limit_exhaustion",
    metrics={"429_rate": 0.08, "queue_depth": 342, "rpm_used": 480, "rpm_limit": 500},
    severity=IncidentSeverity.SEV2,
)

print(f"--- Incident Detected: {incident['id']} ---")
print(f"  Severity: {incident['severity']}")
print(f"  Trigger: {incident['trigger']}")
print(f"  Detected: {incident['detected_at']}")
print(f"  Runbook: {incident['runbook']}")
print(f"  Expected Resolution: {incident['expected_resolution_min']} min")

print(f"\n  Diagnosis Steps:")
for step in incident["diagnosis_steps"]:
    print(f"    {step}")

print(f"\n  Mitigation Actions:")
for action in incident["mitigation_actions"]:
    print(f"    {action}")

print(f"\n  Escalation Path:")
for level in incident["escalation_path"]:
    print(f"    {level}")

# Status page
print(f"\n--- Status Page ---")
status = manager.get_status_page()
print(f"  Overall: {status['overall_status']}")
print(f"  Active Incidents: {status['active_incidents']}")
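
The simulation stops at detection. As a rough sketch of the rest of the lifecycle, the code below advances an incident through the remaining statuses and checks each stage against the acknowledge (5 min), mitigate (15 min), and resolve (1 hour) targets from the response principles. The `timeline` field, `advance_incident`, and `sla_breaches` are assumptions added for illustration. The incident dictionaries built by `detect_incident` do not carry a timeline.

```python
import time
from typing import Optional

# Targets in seconds since detection, taken from the response principles above
SLA_TARGETS_S = {"acknowledged": 5 * 60, "mitigating": 15 * 60, "resolved": 60 * 60}
STATUS_ORDER = ["detected", "acknowledged", "mitigating", "resolved"]


def advance_incident(incident: dict, new_status: str,
                     now: Optional[float] = None) -> dict:
    """Move an incident forward and record when the new stage was reached."""
    current = incident.get("status", "detected")
    if STATUS_ORDER.index(new_status) <= STATUS_ORDER.index(current):
        raise ValueError(f"cannot move from {current} to {new_status}")
    now = time.time() if now is None else now
    incident["status"] = new_status
    incident.setdefault("timeline", {})[new_status] = now
    return incident


def sla_breaches(incident: dict) -> list:
    """Return the stages that were reached later than their target."""
    detected = incident["timeline"]["detected"]
    return [stage for stage, limit in SLA_TARGETS_S.items()
            if stage in incident["timeline"]
            and incident["timeline"][stage] - detected > limit]


# Timestamps are seconds since detection, chosen to keep the example readable
inc = {"id": "INC-DEMO", "status": "detected", "timeline": {"detected": 0.0}}
advance_incident(inc, "acknowledged", now=180.0)   # 3 min: within target
advance_incident(inc, "mitigating", now=1200.0)    # 20 min: misses the 15 min target
advance_incident(inc, "resolved", now=3000.0)      # 50 min: within target
print(inc["status"], sla_breaches(inc))
```

If this were wired into `IncidentManager`, the resolve step would also need to remove the incident from `active_incidents`. Otherwise `get_status_page` keeps reporting "degraded".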

Alerting Integration Patterns

Alert Routing Rules: Not all alerts are equal. Route SEV1/SEV2 to PagerDuty (pages on-call immediately). Route SEV3 to Slack #incidents channel (acknowledged within 15 min). Route SEV4 to Jira backlog (addressed next sprint). Include in every alert: (1) What broke (metric + current value); (2) Since when (first breach timestamp); (3) Impact (users/features affected); (4) Runbook link (one-click access to remediation steps). Alert fatigue is real — tune thresholds to fire fewer than 5 alerts per on-call shift.
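
The routing rules and the four required alert fields can be captured in a small lookup and payload builder. This is a sketch under stated assumptions: `SEVERITY_CHANNELS`, `build_alert`, the payload key names, and the example timestamp and impact text are hypothetical. The severity-to-channel mapping and the runbook URL follow the article.

```python
# Severity-to-channel mapping from the routing rules above
SEVERITY_CHANNELS = {
    "SEV1": "pagerduty",
    "SEV2": "pagerduty",
    "SEV3": "slack",
    "SEV4": "jira",
}


def build_alert(severity: str, metric: str, current_value: float,
                first_breach_at: str, impact: str, runbook_url: str) -> dict:
    """Assemble an alert carrying the four fields every alert should include."""
    if severity not in SEVERITY_CHANNELS:
        raise ValueError(f"unknown severity: {severity}")
    return {
        "channel": SEVERITY_CHANNELS[severity],
        "what_broke": f"{metric}={current_value}",  # (1) metric + current value
        "since": first_breach_at,                   # (2) first breach timestamp
        "impact": impact,                           # (3) users/features affected
        "runbook": runbook_url,                     # (4) link to remediation steps
    }


alert = build_alert(
    severity="SEV3",
    metric="p95_latency_s",
    current_value=12.3,
    first_breach_at="2026-05-25T10:42:00Z",         # illustrative timestamp
    impact="chat and search responses are slow",    # illustrative impact text
    runbook_url="https://runbooks.internal/openai/high-latency",
)
for key, value in alert.items():
    print(f"{key}: {value}")
```

Rejecting unknown severities up front keeps a typo from silently dropping an alert.
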
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
from enum import Enum


class AlertChannel(Enum):
    PAGERDUTY = "pagerduty"    # SEV1/SEV2: immediate page
    SLACK = "slack"            # SEV3: team notification
    EMAIL = "email"            # Informational: async notification
    JIRA = "jira"              # SEV4: backlog ticket for the next sprint


@dataclass
class AlertRule:
    """Defines when and how to send an alert."""

    name: str
    metric: str
    condition: str              # e.g., "> 0.05" or "< 0.5"
    window_seconds: int         # Evaluation window
    channel: AlertChannel
    severity: str
    runbook_url: str
    cooldown_seconds: int = 300  # Don't re-alert within this period
    last_fired: float = field(default=0.0, init=False)

    def should_fire(self, current_value: float) -> bool:
        """Evaluate if the alert condition is met.

        Supports ">", ">=", "<", and "<="; any other operator never fires.
        """
        # Check two-character operators first so ">=" is not parsed as ">"
        for op in (">=", "<=", ">", "<"):
            if self.condition.startswith(op):
                threshold = float(self.condition[len(op):].strip())
                return {
                    ">": current_value > threshold,
                    ">=": current_value >= threshold,
                    "<": current_value < threshold,
                    "<=": current_value <= threshold,
                }[op]
        return False

    def can_fire(self) -> bool:
        """Check if cooldown period has elapsed."""
        return (time.time() - self.last_fired) > self.cooldown_seconds


@dataclass
class AlertRouter:
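    """Routes alerts to appropriate channels based on severity and type.

    Implements per-rule cooldown periods so a breaching metric does not
    re-alert on every evaluation. Cross-rule deduplication, escalation, and
    window_seconds enforcement are not implemented in this example.
    """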

    rules: list = field(default_factory=list)
    fired_alerts: list = field(default_factory=list)

    def add_rule(self, rule: AlertRule):
        """Register an alert rule."""
        self.rules.append(rule)

    def evaluate(self, metrics: dict) -> list:
        """Evaluate all rules against current metrics and fire alerts."""
        fired = []
        for rule in self.rules:
            value = metrics.get(rule.metric)
            if value is None:
                continue

            if rule.should_fire(value) and rule.can_fire():
                alert = {
                    "rule": rule.name,
                    "metric": rule.metric,
                    "current_value": value,
                    "condition": rule.condition,
                    "channel": rule.channel.value,
                    "severity": rule.severity,
                    "runbook": rule.runbook_url,
                    "fired_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),  # UTC, same format as incidents
                    "message": f"[{rule.severity}] {rule.name}: {rule.metric}={value} ({rule.condition})",
                }
                rule.last_fired = time.time()
                fired.append(alert)
                self.fired_alerts.append(alert)

        return fired


# Configure alert rules for OpenAI observability
router = AlertRouter()

router.add_rule(AlertRule(
    name="High Error Rate",
    metric="error_rate",
    condition="> 0.05",
    window_seconds=120,
    channel=AlertChannel.PAGERDUTY,
    severity="SEV2",
    runbook_url="https://runbooks.internal/openai/high-error-rate",
))

router.add_rule(AlertRule(
    name="Elevated Latency",
    metric="p95_latency_s",
    condition="> 10.0",
    window_seconds=300,
    channel=AlertChannel.SLACK,
    severity="SEV3",
    runbook_url="https://runbooks.internal/openai/high-latency",
))

router.add_rule(AlertRule(
    name="Budget Warning",
    metric="budget_used_pct",
    condition="> 0.80",
    window_seconds=600,
    channel=AlertChannel.SLACK,
    severity="SEV3",
    runbook_url="https://runbooks.internal/openai/cost-spike",
))

router.add_rule(AlertRule(
    name="Quality Degradation",
    metric="quality_score",
    condition="< 0.6",
    window_seconds=600,
    channel=AlertChannel.PAGERDUTY,
    severity="SEV2",
    runbook_url="https://runbooks.internal/openai/quality-drop",
))

router.add_rule(AlertRule(
    name="Rate Limit Pressure",
    metric="rate_limit_429_pct",
    condition="> 0.03",
    window_seconds=120,
    channel=AlertChannel.SLACK,
    severity="SEV3",
    runbook_url="https://runbooks.internal/openai/rate-limits",
))

# Simulate metrics evaluation
current_metrics = {
    "error_rate": 0.07,          # 7% — above threshold!
    "p95_latency_s": 12.3,      # 12.3s — above threshold!
    "budget_used_pct": 0.65,    # 65% — OK
    "quality_score": 0.72,      # 0.72 — OK
    "rate_limit_429_pct": 0.04, # 4% — above threshold!
}

print("=== Alert Evaluation ===\n")
print("Current Metrics:")
for k, v in current_metrics.items():
    print(f"  {k}: {v}")

print("\n--- Fired Alerts ---")
alerts = router.evaluate(current_metrics)
if alerts:
    for alert in alerts:
        print(f"\n  {alert['message']}")
        print(f"    Channel: {alert['channel']}")
        print(f"    Runbook: {alert['runbook']}")
        print(f"    Fired at: {alert['fired_at']}")
else:
    print("  No alerts fired — all metrics within thresholds")

print(f"\n--- Summary ---")
print(f"  Rules evaluated: {len(router.rules)}")
print(f"  Alerts fired: {len(alerts)}")
print(f"  Channels notified: {sorted(set(a['channel'] for a in alerts))}")  # sorted for stable output
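
The rules above carry a `window_seconds` field, and the runbook triggers are phrased as "for 2+ minutes", but `evaluate` fires on a single reading. One way to honor the window is sketched below: an alert condition is considered met only when every sample in the trailing window breaches the threshold. `SustainedCondition` is a hypothetical helper, and the sample readings are made up.

```python
from collections import deque


class SustainedCondition:
    """True only when every sample in the last `window_seconds` exceeds the threshold."""

    def __init__(self, threshold: float, window_seconds: float):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.samples = deque()  # (timestamp, value) pairs, oldest first

    def observe(self, timestamp: float, value: float) -> bool:
        self.samples.append((timestamp, value))
        # Drop samples that have aged out of the window
        while timestamp - self.samples[0][0] > self.window_seconds:
            self.samples.popleft()
        # Require the retained samples to span the full window before firing
        covers_window = timestamp - self.samples[0][0] >= self.window_seconds
        return covers_window and all(v > self.threshold for _, v in self.samples)


# 429 rate sampled every 30 seconds; the rule is "> 5% for 2+ minutes"
readings = [(0, 0.08), (30, 0.07), (60, 0.09), (90, 0.06), (120, 0.08), (150, 0.02)]
cond = SustainedCondition(threshold=0.05, window_seconds=120)
for ts, rate in readings:
    print(f"t={ts:>3}s rate={rate:.2f} sustained={cond.observe(ts, rate)}")
```

The condition turns true at t=120s, once two full minutes of breaching samples have accumulated, and resets as soon as one reading drops back under the threshold. This trades a slower first alert for fewer pages on momentary spikes.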

Conclusion

Key Takeaways: Observability for OpenAI applications requires a multi-layered approach: (1) Leverage built-in dashboard traces for quick debugging with user_id and metadata tagging; (2) Implement structured JSON logging with correlation IDs for cross-service request tracing; (3) Track the four golden signals — p50/p95 latency, error rate, token throughput, and cost rate — with tiered alert thresholds; (4) Use OpenTelemetry with GenAI semantic conventions for distributed tracing across microservices; (5) Build cost attribution by feature/team/model with budget alerts at 80% and 95%; (6) Monitor output quality automatically with format compliance, hallucination scoring, and drift detection; (7) Pre-write incident runbooks for rate limits, latency spikes, quality degradation, and cost overruns. Start with structured logging and cost monitoring (immediate value, low effort), then layer in distributed tracing and quality scoring as your system matures.

Next in the Series

In Part 17: Enterprise & Compliance, we’ll cover SOC 2 compliance for AI applications, data residency controls, audit logging, role-based access to models, enterprise SSO integration, data processing agreements, and governance frameworks for responsible AI deployment in regulated industries.