flowchart TD
A[OpenAI API Call] --> B[Instrumentation Layer]
B --> C[Structured Logs]
B --> D[Metrics Collector]
B --> E[Trace Spans]
B --> F[Cost Events]
C --> G[Log Aggregator]
D --> H[Time-Series DB]
E --> I[Trace Backend]
F --> J[Cost Dashboard]
G --> K[Search & Analysis]
H --> L[Dashboards & Alerts]
I --> M[Request Waterfall]
J --> N[Budget Alerts]
K --> O[Incident Detection]
L --> O
M --> O
N --> O
O --> P[Runbook Automation]
P --> Q[PagerDuty / Slack]
1. OpenAI Dashboard Traces
OpenAI provides built-in request logs and trace views in the API dashboard at platform.openai.com. Stored requests appear with metadata including model, token counts, latency, status, and any user-provided identifiers. Chat Completions requests are only stored when store is set to true; the Responses API stores by default. Before building custom observability, understand what is available out of the box: the dashboard is often sufficient for debugging individual requests and understanding usage patterns.
Trace Metadata & Filtering
The OpenAI dashboard supports filtering traces by user (set via the user parameter), model name, status code, and time range. If you pass structured user identifiers consistently, you can trace any issue back to a specific user session. The metadata field accepts up to 16 custom key-value pairs (string keys up to 64 characters, string values up to 512 characters) for even richer filtering.
| Filter | Parameter | Example | Use Case |
|---|---|---|---|
| User ID | user | "user_abc123" | Debug specific user issues |
| Model | Auto-captured | "gpt-4.1" | Compare model performance |
| Status | Auto-captured | 200, 429, 500 | Find errors, rate limits |
| Time Range | Dashboard filter | Last 24 hours | Incident correlation |
| Metadata | metadata | {"feature": "chat"} | Feature-level attribution |
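The API documents metadata values as strings, with at most 16 pairs, keys up to 64 characters and values up to 512 characters. A non-string tag such as an integer page count can therefore be rejected. The sketch below shows one way to guard against that before sending. sanitize_metadata is a hypothetical helper, and the limits are copied from the API reference at the time of writing, so re-check them before relying on this.

```python
from typing import Any

# Limits as documented in the OpenAI API reference for `metadata` (assumption:
# verify against current docs; they may change).
MAX_METADATA_PAIRS = 16
MAX_KEY_LEN = 64
MAX_VALUE_LEN = 512


def sanitize_metadata(raw: dict[str, Any]) -> dict[str, str]:
    """Coerce metadata values to strings and enforce the documented size limits.

    Raises ValueError instead of silently dropping fields, so a bad tag is
    caught in development rather than rejected by the API in production.
    """
    if len(raw) > MAX_METADATA_PAIRS:
        raise ValueError(f"metadata has {len(raw)} pairs; max is {MAX_METADATA_PAIRS}")
    clean: dict[str, str] = {}
    for key, value in raw.items():
        key = str(key)
        if len(key) > MAX_KEY_LEN:
            raise ValueError(f"metadata key too long: {key[:20]}...")
        text = str(value)
        if len(text) > MAX_VALUE_LEN:
            raise ValueError(f"metadata value for {key!r} exceeds {MAX_VALUE_LEN} chars")
        clean[key] = text
    return clean


print(sanitize_metadata({"feature": "chat", "page_count": 12}))
# {'feature': 'chat', 'page_count': '12'}
```

Raising an error here is a deliberate design choice. Truncating values would hide tagging bugs that later make dashboard filters return incomplete results.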
import time
import hashlib
from dataclasses import dataclass
from typing import Optional


@dataclass
class TraceableRequest:
    """Build OpenAI API requests with full trace metadata for dashboard filtering.

    Attaches a pseudonymous user identifier, session_id, and custom metadata
    to every request, enabling filtering and debugging in the OpenAI dashboard.
    """
    user_id: str
    session_id: str
    feature: str = "default"
    environment: str = "production"

    def build_request_params(
        self,
        model: str,
        messages: list,
        extra_metadata: Optional[dict] = None,
    ) -> dict:
        """Construct Chat Completions parameters with observability metadata."""
        # Send a stable hash instead of the raw user ID (pseudonymous, not anonymous)
        user_hash = hashlib.sha256(
            f"{self.user_id}:{self.environment}".encode()
        ).hexdigest()[:16]
        metadata = {
            "session_id": self.session_id,
            "feature": self.feature,
            "environment": self.environment,
            "request_time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            **(extra_metadata or {}),
        }
        params = {
            "model": model,
            "messages": messages,
            "user": f"user_{user_hash}",
            # Chat Completions only appear in the dashboard logs when stored
            "store": True,
            # Metadata values must be strings, so coerce e.g. page_count=12 -> "12"
            "metadata": {k: str(v) for k, v in metadata.items()},
        }
        return params

    def build_session_context(self) -> dict:
        """Return session context for trace correlation across requests."""
        return {
            "user_id": self.user_id,
            "session_id": self.session_id,
            "feature": self.feature,
            "environment": self.environment,
        }


# Usage demonstration
tracer = TraceableRequest(
    user_id="customer_42",
    session_id="sess_abc123def456",
    feature="document_summarizer",
    environment="production",
)

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarize this quarterly report."},
]

params = tracer.build_request_params(
    model="gpt-4.1",
    messages=messages,
    extra_metadata={"document_type": "financial_report", "page_count": 12},
)

print("=== Traceable Request Parameters ===")
for key, value in params.items():
    if key == "messages":
        print(f"  {key}: [{len(value)} messages]")
    elif key == "metadata":
        print(f"  {key}:")
        for mk, mv in value.items():
            print(f"    {mk}: {mv}")
    else:
        print(f"  {key}: {value}")

print("\n=== Session Context ===")
ctx = tracer.build_session_context()
for k, v in ctx.items():
    print(f"  {k}: {v}")
2. Custom Logging
The OpenAI dashboard shows individual requests, but production systems need aggregated views: average latency over time, token usage trends, error rate patterns, and cost accumulation. Structured logging with consistent schemas enables powerful querying in log aggregators like Elasticsearch, Datadog, or CloudWatch. Every API call should emit a structured log event capturing timing, tokens, cost, and outcome.
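Every call, including calls that raise, should emit exactly one event. One way to guarantee that is to wrap the call in a context manager that times it and emits in a finally clause. This is a minimal sketch. timed_llm_call and the emit callback are illustrative names and are not part of the OpenAI SDK. In real code the token counts would come from the response's usage object.

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def timed_llm_call(emit: Callable[[dict], None], model: str, feature: str) -> Iterator[dict]:
    """Measure one API call and emit exactly one structured event, even on failure.

    `emit` is a hypothetical sink (logger, queue, exporter). The caller fills in
    token counts on the yielded dict once the response arrives.
    """
    event = {"model": model, "feature": feature, "status": "success",
             "input_tokens": 0, "output_tokens": 0}
    start = time.perf_counter()  # Monotonic clock, unaffected by wall-clock changes
    try:
        yield event
    except Exception as exc:
        event["status"] = "error"
        event["error_type"] = type(exc).__name__
        raise  # Never swallow the caller's exception
    finally:
        event["latency_ms"] = round((time.perf_counter() - start) * 1000, 1)
        emit(event)


events: list[dict] = []
with timed_llm_call(events.append, model="gpt-4.1-mini", feature="chat") as ev:
    # A real API call would go here; simulated usage is recorded instead.
    ev["input_tokens"], ev["output_tokens"] = 420, 96
print(events[0]["status"], events[0]["input_tokens"])  # success 420
```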
Structured Log Schema
A well-designed log schema enables ad-hoc querying without schema migrations. The key principle: log everything you might want to filter or aggregate on, but keep the schema flat for easy indexing. Nested objects are acceptable for metadata, but the primary dimensions (model, latency, tokens, status) should be top-level fields.
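When nested metadata is unavoidable, one option is to flatten it into dotted keys before emitting. Most log aggregators can then index it as ordinary top-level fields. The flatten helper below is an illustrative sketch, not part of any logging library.

```python
from typing import Any


def flatten(record: dict[str, Any], parent: str = "", sep: str = ".") -> dict[str, Any]:
    """Flatten nested dicts into dotted top-level keys.

    Only dicts are expanded; lists and scalars are kept as leaf values.
    """
    flat: dict[str, Any] = {}
    for key, value in record.items():
        full_key = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))  # Recurse into sub-objects
        else:
            flat[full_key] = value
    return flat


event = {"model": "gpt-4.1", "latency_ms": 847.3,
         "metadata": {"feature": "chat", "ab": {"variant": "B"}}}
print(flatten(event))
# {'model': 'gpt-4.1', 'latency_ms': 847.3, 'metadata.feature': 'chat', 'metadata.ab.variant': 'B'}
```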
import json
import logging
import uuid
from dataclasses import dataclass, field, asdict
from typing import Optional


# Configure structured JSON logging
class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON for log aggregators."""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Merge any structured data attached via `extra=`
        if hasattr(record, "structured_data"):
            log_data.update(record.structured_data)
        return json.dumps(log_data, default=str)


# Set up logger with JSON output
logger = logging.getLogger("openai_observability")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.handlers = [handler]
logger.propagate = False  # Avoid duplicate lines if the root logger has handlers


@dataclass
class LLMCallMetrics:
    """Structured metrics for a single OpenAI API call."""
    request_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    correlation_id: str = ""
    model: str = ""
    feature: str = ""
    status: str = "success"  # success | error | timeout | rate_limited

    # Timing
    latency_ms: float = 0.0
    time_to_first_token_ms: Optional[float] = None

    # Token usage
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0  # Derived in to_log_record()

    # Cost (estimated)
    estimated_cost_usd: float = 0.0

    # Error details
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    retry_count: int = 0

    def to_log_record(self) -> dict:
        """Convert to a flat dictionary, dropping unset (None or empty) fields."""
        data = asdict(self)
        data["total_tokens"] = self.input_tokens + self.output_tokens
        return {k: v for k, v in data.items() if v is not None and v != ""}


def log_llm_call(metrics: LLMCallMetrics) -> None:
    """Emit a structured log event for an LLM API call."""
    level = logging.INFO if metrics.status == "success" else logging.ERROR
    # `extra` sets record.structured_data, which JSONFormatter merges in
    logger.log(level, f"llm_call_{metrics.status}",
               extra={"structured_data": metrics.to_log_record()})


# Simulate logging several API calls
print("=== Structured LLM Call Logs ===\n")

# Successful call
metrics_success = LLMCallMetrics(
    correlation_id="corr_user42_req1",
    model="gpt-4.1",
    feature="chat",
    status="success",
    latency_ms=847.3,
    time_to_first_token_ms=234.1,
    input_tokens=1250,
    output_tokens=380,
    estimated_cost_usd=0.0043,
)
log_llm_call(metrics_success)

# Rate-limited call with retry
metrics_retry = LLMCallMetrics(
    correlation_id="corr_user42_req2",
    model="gpt-4.1-mini",
    feature="summarizer",
    status="rate_limited",
    latency_ms=2340.0,
    input_tokens=4500,
    output_tokens=0,
    retry_count=2,
    error_type="RateLimitError",
    error_message="Rate limit exceeded. Retry after 3s.",
)
log_llm_call(metrics_retry)

# Timeout
metrics_timeout = LLMCallMetrics(
    correlation_id="corr_user99_req1",
    model="gpt-4.1",
    feature="code_review",
    status="timeout",
    latency_ms=30000.0,
    input_tokens=8200,
    output_tokens=0,
    error_type="TimeoutError",
    error_message="Request timed out after 30s",
)
log_llm_call(metrics_timeout)

print("\n=== Queryable Fields ===")
print("Filter examples for log aggregator:")
print('  status:error AND model:"gpt-4.1"')
print("  latency_ms:>2000 AND feature:chat")
print("  retry_count:>0")
print('  error_type:"RateLimitError"')
3. Metrics & Alerting
Logs tell you what happened; metrics tell you how the system is behaving right now. For OpenAI applications, an LLM-specific adaptation of the SRE "four golden signals" (latency, traffic, errors, saturation) is: latency (p50/p95/p99), error rate (percentage of non-200 responses), token throughput (tokens/minute consumed vs. quota), and cost rate (USD/hour). Each metric needs baseline values, alert thresholds, and escalation rules.
Key Metrics & Thresholds
| Metric | Calculation | Warning Threshold | Critical Threshold | Action |
|---|---|---|---|---|
| p50 Latency | Median response time | >2s (GPT-4.1) | >5s | Check model load, reduce prompt |
| p95 Latency | 95th percentile | >8s | >15s | Scale horizontally, add timeout |
| Error Rate | Non-200 / Total % | >2% | >5% | Check quotas, circuit breaker |
| 429 Rate | Rate limits / Total % | >1% | >5% | Reduce concurrency, backoff |
| Token Throughput | Tokens/min consumed | >80% quota | >95% quota | Request quota increase |
| Cost Rate | USD/hour rolling | >budget × 1.2 | >budget × 1.5 | Route to cheaper models |
| Quality Score | Avg output quality | <0.7 | <0.5 | Check prompt drift, model changes |
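The table mixes alert directions. Every row alerts when its value rises, except Quality Score, which alerts when it falls. The sketch below is one threshold check that handles both directions. The classify helper and Severity enum are illustrative names.

```python
from enum import Enum


class Severity(Enum):
    OK = "ok"
    WARNING = "warning"
    CRITICAL = "critical"


def classify(value: float, warn: float, crit: float, higher_is_worse: bool = True) -> Severity:
    """Map a metric value onto warning/critical thresholds (strict comparisons).

    Latency, error rate, and cost alert when they rise above a threshold.
    The quality score alerts when it falls below one, so the comparison is flipped.
    """
    if not higher_is_worse:
        # Negate everything so "falling below" becomes "rising above"
        value, warn, crit = -value, -warn, -crit
    if value > crit:
        return Severity.CRITICAL
    if value > warn:
        return Severity.WARNING
    return Severity.OK


print(classify(0.03, warn=0.02, crit=0.05))                        # Severity.WARNING
print(classify(0.45, warn=0.7, crit=0.5, higher_is_worse=False))  # Severity.CRITICAL
```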
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum


class AlertSeverity(Enum):
    OK = "ok"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class MetricsCollector:
    """Collects and computes observability metrics for OpenAI API calls.

    Maintains sliding windows for latency percentiles, error rates,
    token throughput, and cost tracking with configurable alert thresholds.
    """
    window_seconds: int = 300  # 5-minute sliding window

    # Sliding window storage: (timestamp, value) pairs
    latencies: deque = field(default_factory=lambda: deque(maxlen=10000))
    errors: deque = field(default_factory=lambda: deque(maxlen=10000))
    tokens: deque = field(default_factory=lambda: deque(maxlen=10000))
    costs: deque = field(default_factory=lambda: deque(maxlen=10000))

    # Alert thresholds
    p95_latency_warn: float = 8.0  # seconds
    p95_latency_crit: float = 15.0
    error_rate_warn: float = 0.02  # 2%
    error_rate_crit: float = 0.05  # 5%
    cost_hourly_warn: float = 50.0  # USD/hour
    cost_hourly_crit: float = 100.0

    # Lifetime counters
    total_requests: int = field(default=0, init=False)
    total_errors: int = field(default=0, init=False)

    def record_call(self, latency_s: float, tokens_used: int,
                    cost_usd: float, is_error: bool = False):
        """Record metrics from a single API call."""
        now = time.time()
        self.latencies.append((now, latency_s))
        self.tokens.append((now, tokens_used))
        self.costs.append((now, cost_usd))
        self.errors.append((now, 1 if is_error else 0))
        self.total_requests += 1
        if is_error:
            self.total_errors += 1

    def _window_values(self, data: deque) -> list:
        """Get values within the current time window."""
        cutoff = time.time() - self.window_seconds
        return [v for t, v in data if t > cutoff]

    def get_latency_percentiles(self) -> dict:
        """Calculate p50, p95, p99 latency from the sliding window."""
        values = self._window_values(self.latencies)
        if not values:
            return {"p50": 0, "p95": 0, "p99": 0, "count": 0}
        sorted_vals = sorted(values)
        n = len(sorted_vals)
        # Index-based percentiles; small samples fall back to the maximum
        return {
            "p50": sorted_vals[int(n * 0.50)],
            "p95": sorted_vals[int(n * 0.95)] if n > 20 else sorted_vals[-1],
            "p99": sorted_vals[int(n * 0.99)] if n > 100 else sorted_vals[-1],
            "count": n,
        }

    def get_error_rate(self) -> float:
        """Calculate error rate within the sliding window."""
        values = self._window_values(self.errors)
        if not values:
            return 0.0
        return sum(values) / len(values)

    def get_token_throughput(self) -> float:
        """Calculate tokens per minute within the sliding window.

        Divides by the full window length, so it underestimates until the
        collector has been running for at least window_seconds.
        """
        values = self._window_values(self.tokens)
        if not values:
            return 0.0
        window_minutes = self.window_seconds / 60.0
        return sum(values) / window_minutes

    def get_cost_rate(self) -> float:
        """Calculate USD per hour within the sliding window."""
        values = self._window_values(self.costs)
        if not values:
            return 0.0
        window_hours = self.window_seconds / 3600.0
        return sum(values) / window_hours

    def evaluate_alerts(self) -> list:
        """Evaluate all metrics against alert thresholds."""
        alerts = []
        percentiles = self.get_latency_percentiles()
        error_rate = self.get_error_rate()
        cost_rate = self.get_cost_rate()

        # Latency alerts
        p95 = percentiles["p95"]
        if p95 > self.p95_latency_crit:
            alerts.append(("p95_latency", AlertSeverity.CRITICAL,
                           f"p95={p95:.1f}s > {self.p95_latency_crit}s"))
        elif p95 > self.p95_latency_warn:
            alerts.append(("p95_latency", AlertSeverity.WARNING,
                           f"p95={p95:.1f}s > {self.p95_latency_warn}s"))

        # Error rate alerts
        if error_rate > self.error_rate_crit:
            alerts.append(("error_rate", AlertSeverity.CRITICAL,
                           f"error_rate={error_rate:.1%} > {self.error_rate_crit:.1%}"))
        elif error_rate > self.error_rate_warn:
            alerts.append(("error_rate", AlertSeverity.WARNING,
                           f"error_rate={error_rate:.1%} > {self.error_rate_warn:.1%}"))

        # Cost alerts
        if cost_rate > self.cost_hourly_crit:
            alerts.append(("cost_rate", AlertSeverity.CRITICAL,
                           f"cost=${cost_rate:.2f}/hr > ${self.cost_hourly_crit}/hr"))
        elif cost_rate > self.cost_hourly_warn:
            alerts.append(("cost_rate", AlertSeverity.WARNING,
                           f"cost=${cost_rate:.2f}/hr > ${self.cost_hourly_warn}/hr"))

        return alerts

    def get_dashboard(self) -> dict:
        """Generate a complete metrics dashboard snapshot."""
        percentiles = self.get_latency_percentiles()
        return {
            "latency": percentiles,
            "error_rate": f"{self.get_error_rate():.2%}",
            "token_throughput_per_min": round(self.get_token_throughput()),
            "cost_per_hour_usd": f"${self.get_cost_rate():.2f}",
            "total_requests": self.total_requests,
            "total_errors": self.total_errors,
            "active_alerts": [(name, sev.value, msg)
                              for name, sev, msg in self.evaluate_alerts()],
        }


# Usage demonstration: simulate production traffic
import random
random.seed(42)

collector = MetricsCollector(window_seconds=300)

print("=== Simulating 200 API Calls ===\n")
for i in range(200):
    # Simulate a realistic latency distribution (log-normal)
    latency = random.lognormvariate(0.5, 0.8)  # median ~1.6s
    tokens = random.randint(500, 8000)
    cost = tokens * 0.000015  # ~$15/1M tokens
    is_error = random.random() < 0.03  # 3% error rate
    collector.record_call(latency, tokens, cost, is_error)

dashboard = collector.get_dashboard()
print("--- Metrics Dashboard ---")
print(f"  Latency p50: {dashboard['latency']['p50']:.2f}s")
print(f"  Latency p95: {dashboard['latency']['p95']:.2f}s")
print(f"  Latency p99: {dashboard['latency']['p99']:.2f}s")
print(f"  Error Rate: {dashboard['error_rate']}")
print(f"  Token Throughput: {dashboard['token_throughput_per_min']} tokens/min")
print(f"  Cost Rate: {dashboard['cost_per_hour_usd']}/hour")
print(f"  Total Requests: {dashboard['total_requests']}")
print(f"  Total Errors: {dashboard['total_errors']}")

print("\n--- Active Alerts ---")
if dashboard["active_alerts"]:
    for name, severity, message in dashboard["active_alerts"]:
        print(f"  [{severity.upper()}] {name}: {message}")
else:
    print("  No active alerts: all metrics within thresholds")
4. Distributed Tracing
In microservice architectures, a single user request may traverse multiple services before reaching the OpenAI API. Distributed tracing with OpenTelemetry creates a unified view of the entire request lifecycle — from the initial HTTP request through your business logic, prompt construction, API call, response parsing, and back to the user. Each step is a “span” with timing, attributes, and parent-child relationships.
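Spans from different services join one trace through context propagation. Each outgoing request carries the W3C Trace Context traceparent header, in the format version-traceid-parentid-flags. The sketch below hand-rolls that header to show the mechanics. It accepts only version 00, and the helper names are illustrative. In practice, OpenTelemetry's propagators (opentelemetry.propagate.inject and extract) handle this for you.

```python
import re
import secrets

# version "00", 32-hex trace id, 16-hex parent span id, 2-hex flags (lowercase)
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def make_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Build the header a service sends downstream so the next hop joins the trace."""
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"


def parse_traceparent(header: str):
    """Return (trace_id, parent_span_id, sampled), or None for a malformed header."""
    match = TRACEPARENT_RE.match(header.strip())
    if not match:
        return None
    trace_id, parent_id, flags = match.groups()
    if trace_id == "0" * 32 or parent_id == "0" * 16:
        return None  # All-zero IDs are invalid per the spec
    return trace_id, parent_id, bool(int(flags, 16) & 0x01)  # Bit 0 = sampled


# Upstream service (e.g. an API gateway) starts the trace...
trace_id, gateway_span = secrets.token_hex(16), secrets.token_hex(8)
header = make_traceparent(trace_id, gateway_span)

# ...and the downstream LLM service continues it under a new child span.
incoming_trace, parent, sampled = parse_traceparent(header)
child_span = secrets.token_hex(8)
print(incoming_trace == trace_id, parent == gateway_span, sampled)  # True True True
```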
OpenTelemetry Span Attributes
The OpenTelemetry Semantic Conventions for Generative AI define standard attribute names for LLM spans. These conventions are still marked as in development, so attribute names can change between releases. Following them keeps your traces portable across OpenTelemetry-compatible backends (Jaeger, Tempo, Honeycomb, Datadog) with little or no custom configuration.
Key attributes include:

- gen_ai.system = "openai"
- gen_ai.request.model = the requested model name
- gen_ai.response.model = the model that actually served the request
- gen_ai.usage.input_tokens and gen_ai.usage.output_tokens
- gen_ai.response.finish_reasons = ["stop"]
- server.address = "api.openai.com"

These attributes enable cross-vendor querying and standardized dashboards regardless of your tracing backend.
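Most of these values come straight from the API response. The sketch below maps a Chat Completions response body onto span attributes. It assumes the body is a plain dict, such as parsed JSON or completion.model_dump() from the openai Python SDK. The sample response is illustrative, not real output. gen_ai.operation.name and gen_ai.response.id are further convention attributes that the list above does not cover.

```python
from typing import Any


def genai_attributes(request_model: str, response: dict[str, Any]) -> dict[str, Any]:
    """Translate a Chat Completions response body into GenAI span attributes."""
    usage = response.get("usage") or {}
    return {
        "gen_ai.operation.name": "chat",
        "gen_ai.system": "openai",
        "gen_ai.request.model": request_model,
        # The served model can be a dated snapshot of the requested alias
        "gen_ai.response.model": response.get("model", request_model),
        "gen_ai.response.id": response.get("id"),
        "gen_ai.usage.input_tokens": usage.get("prompt_tokens", 0),
        "gen_ai.usage.output_tokens": usage.get("completion_tokens", 0),
        "gen_ai.response.finish_reasons": [
            choice.get("finish_reason") for choice in response.get("choices", [])
        ],
        "server.address": "api.openai.com",
    }


# Illustrative response body (shape mirrors the Chat Completions JSON)
sample = {
    "id": "chatcmpl-123",
    "model": "gpt-4.1-2025-04-14",
    "usage": {"prompt_tokens": 1850, "completion_tokens": 420, "total_tokens": 2270},
    "choices": [{"index": 0, "finish_reason": "stop"}],
}
attrs = genai_attributes("gpt-4.1", sample)
print(attrs["gen_ai.response.model"], attrs["gen_ai.usage.output_tokens"])  # gpt-4.1-2025-04-14 420
```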
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional

# Per-1M-token prices (input, output) in USD, matching the pricing table in section 5
PRICE_PER_M = {"gpt-4.1": (2.00, 8.00), "gpt-4.1-mini": (0.40, 1.60)}


@dataclass
class Span:
    """Represents a single span in a distributed trace (OpenTelemetry-compatible)."""
    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)
    status: str = "OK"

    def set_attribute(self, key: str, value):
        """Set a span attribute following OpenTelemetry semantic conventions."""
        self.attributes[key] = value

    def end(self):
        """Mark the span as complete."""
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        end = self.end_time if self.end_time is not None else time.time()
        return (end - self.start_time) * 1000


@dataclass
class LLMTracer:
    """Distributed tracer for OpenAI API calls with OpenTelemetry-compatible spans.

    Creates hierarchical spans following GenAI semantic conventions:
    - Root span: User request (HTTP handler)
    - Child span: Prompt construction
    - Child span: OpenAI API call (with LLM-specific attributes)
    - Child span: Response processing
    """
    service_name: str = "ai-service"
    spans: list = field(default_factory=list)
    _current_trace_id: str = field(default_factory=lambda: uuid.uuid4().hex)  # 32 hex chars

    @contextmanager
    def start_span(self, name: str, parent: Optional[Span] = None):
        """Context manager that creates a span and always closes it."""
        span = Span(
            name=name,
            trace_id=self._current_trace_id,
            parent_span_id=parent.span_id if parent else None,
        )
        span.set_attribute("service.name", self.service_name)
        self.spans.append(span)
        try:
            yield span
        except Exception as e:
            span.status = "ERROR"
            span.set_attribute("error.type", type(e).__name__)
            span.set_attribute("error.message", str(e))
            raise
        finally:
            span.end()

    def create_llm_span(self, parent: Span, model: str,
                        input_tokens: int, output_tokens: int,
                        latency_ms: float, finish_reason: str = "stop") -> Span:
        """Record a completed LLM call as a span with GenAI convention attributes.

        Timing is supplied by the caller, so the end time is derived from
        latency_ms rather than measured.
        """
        span = Span(
            name="openai.chat.completions",
            trace_id=self._current_trace_id,
            parent_span_id=parent.span_id,
        )
        span.set_attribute("service.name", self.service_name)
        # GenAI Semantic Conventions (OpenTelemetry)
        span.set_attribute("gen_ai.operation.name", "chat")
        span.set_attribute("gen_ai.system", "openai")
        span.set_attribute("gen_ai.request.model", model)
        span.set_attribute("gen_ai.response.model", model)
        span.set_attribute("gen_ai.usage.input_tokens", input_tokens)
        span.set_attribute("gen_ai.usage.output_tokens", output_tokens)
        span.set_attribute("gen_ai.response.finish_reasons", [finish_reason])
        span.set_attribute("server.address", "api.openai.com")
        span.set_attribute("server.port", 443)
        # Custom attributes for deeper observability
        price_in, price_out = PRICE_PER_M.get(model, (2.00, 8.00))
        span.set_attribute("llm.latency_ms", latency_ms)
        span.set_attribute("llm.cost_usd",
                           (input_tokens * price_in + output_tokens * price_out) / 1_000_000)
        span.end_time = span.start_time + (latency_ms / 1000)
        self.spans.append(span)
        return span

    def get_trace_summary(self) -> dict:
        """Generate a human-readable trace summary."""
        total_duration = 0
        if self.spans:
            start = min(s.start_time for s in self.spans)
            end = max(s.end_time or s.start_time for s in self.spans)
            total_duration = (end - start) * 1000
        return {
            "trace_id": self._current_trace_id,
            "total_spans": len(self.spans),
            "total_duration_ms": round(total_duration, 1),
            "spans": [
                {
                    "name": s.name,
                    "duration_ms": round(s.duration_ms, 1),
                    "parent": s.parent_span_id[:8] if s.parent_span_id else None,
                    "status": s.status,
                    "llm_model": s.attributes.get("gen_ai.request.model"),
                }
                for s in self.spans
            ],
        }


# Usage demonstration: trace a complete user request
tracer = LLMTracer(service_name="document-summarizer")

print("=== Distributed Trace Example ===\n")
with tracer.start_span("handle_request") as root_span:
    root_span.set_attribute("http.method", "POST")
    root_span.set_attribute("http.route", "/api/summarize")
    root_span.set_attribute("user.id", "user_42")

    # Span: Prompt construction
    with tracer.start_span("build_prompt", parent=root_span) as prompt_span:
        prompt_span.set_attribute("prompt.template", "summarize_v2")
        prompt_span.set_attribute("prompt.input_length", 4200)
        time.sleep(0.005)  # Simulate prompt building

    # Span: OpenAI API call. Its timing is simulated, so in this demo it can
    # extend past the root span in the waterfall.
    llm_span = tracer.create_llm_span(
        parent=root_span,
        model="gpt-4.1",
        input_tokens=1850,
        output_tokens=420,
        latency_ms=1234.5,
        finish_reason="stop",
    )

    # Span: Response processing
    with tracer.start_span("process_response", parent=root_span) as proc_span:
        proc_span.set_attribute("response.format", "markdown")
        proc_span.set_attribute("response.word_count", 312)
        time.sleep(0.002)  # Simulate processing

# Print trace summary
summary = tracer.get_trace_summary()
print(f"Trace ID: {summary['trace_id']}")
print(f"Total Spans: {summary['total_spans']}")
print(f"Total Duration: {summary['total_duration_ms']}ms\n")
print("--- Span Waterfall ---")
for span in summary["spans"]:
    indent = "  " if span["parent"] else ""
    model_info = f" [{span['llm_model']}]" if span["llm_model"] else ""
    print(f"{indent}{span['name']}{model_info}: {span['duration_ms']}ms [{span['status']}]")
5. Cost Monitoring
OpenAI costs can escalate rapidly in production. A single prompt engineering change that doubles output length doubles the output-token share of your bill overnight. Output tokens cost 4× as much as input tokens on GPT-4.1, so output-heavy call paths come close to doubling in total cost. Real-time cost monitoring with budget alerts prevents bill shock. The key is attributing costs to specific features, teams, and users so you can optimize the most expensive call paths first.
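Threshold alerts fire only after spend has already accumulated. A complementary early-warning check extrapolates the burn rate so far to the end of the day. The monitor in this section does not implement this check; it is an assumption added here for illustration. project_daily_spend is a hypothetical helper, and it assumes roughly uniform traffic across the day.

```python
def project_daily_spend(spent_so_far: float, elapsed_hours: float, day_hours: float = 24.0) -> float:
    """Linearly extrapolate today's spend from the burn rate so far.

    Real traffic is usually diurnal, so treat this as an early-warning signal
    rather than a forecast.
    """
    if elapsed_hours <= 0:
        return 0.0
    return spent_so_far / elapsed_hours * day_hours


budget = 500.0
projected = project_daily_spend(spent_so_far=300.0, elapsed_hours=9.0)
print(f"projected ${projected:.2f} vs budget ${budget:.2f}")  # projected $800.00 vs budget $500.00
```

In this example, only 60% of the budget is spent at 09:00, so an 80% threshold would not fire yet. The projection shows the day heading for roughly 160% of budget.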
Cost Attribution Model
Every API call should be tagged with a cost center (feature, team, or user). This enables drill-down from “we spent $5,000 yesterday” to “the document summarizer for Team Alpha consumed 60% of budget due to a prompt regression.” Without attribution, cost optimization is guesswork.
| Model | Input Cost ($/1M tokens) | Output Cost ($/1M tokens) | Typical Use Case |
|---|---|---|---|
| GPT-4.1 | $2.00 | $8.00 | Complex reasoning, code generation |
| GPT-4.1-mini | $0.40 | $1.60 | Chat, summarization, classification |
| GPT-4.1-nano | $0.10 | $0.40 | Simple extraction, routing, tagging |
| o3 | $2.00 | $8.00 | Multi-step reasoning, math, planning |
| o4-mini | $1.10 | $4.40 | Balanced reasoning tasks |
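Here is a worked example using the pricing table. A GPT-4.1 call with 2,000 input tokens and 500 output tokens costs $0.004 + $0.004 = $0.008. Doubling the output to 1,000 tokens raises that to $0.012, a 1.5× increase. The more output-dominated a workload is, the closer a 2× output change comes to doubling its bill. The prices below are copied from the table; check current pricing before reusing them.

```python
# Per-1M-token prices (input, output) in USD, from the table above
PRICES = {"gpt-4.1": (2.00, 8.00), "gpt-4.1-mini": (0.40, 1.60)}


def call_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of one call: tokens x price-per-million, summed over input and output."""
    price_in, price_out = PRICES[model]
    return (input_tokens * price_in + output_tokens * price_out) / 1_000_000


before = call_cost("gpt-4.1", 2000, 500)   # $0.004 input + $0.004 output
after = call_cost("gpt-4.1", 2000, 1000)   # output length doubled
print(f"${before:.4f} -> ${after:.4f} ({after / before:.2f}x)")  # $0.0080 -> $0.0120 (1.50x)
```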
from collections import defaultdict
from dataclasses import dataclass, field

# Pricing per 1M tokens (as of May 2026)
MODEL_PRICING = {
    "gpt-4.1":      {"input": 2.00, "output": 8.00},
    "gpt-4.1-mini": {"input": 0.40, "output": 1.60},
    "gpt-4.1-nano": {"input": 0.10, "output": 0.40},
    "o3":           {"input": 2.00, "output": 8.00},
    "o4-mini":      {"input": 1.10, "output": 4.40},
}


@dataclass
class CostMonitor:
    """Real-time cost monitoring with budget alerts and attribution.

    Tracks costs by feature, team, and model with configurable
    budget thresholds and alert escalation.
    """
    daily_budget_usd: float = 500.0
    alert_warn_pct: float = 0.80  # Alert at 80% of budget
    alert_crit_pct: float = 0.95  # Critical at 95% of budget

    # Cost accumulators
    costs_by_feature: dict = field(default_factory=lambda: defaultdict(float))
    costs_by_team: dict = field(default_factory=lambda: defaultdict(float))
    costs_by_model: dict = field(default_factory=lambda: defaultdict(float))
    total_cost_today: float = field(default=0.0, init=False)
    call_count: int = field(default=0, init=False)

    def record_usage(self, model: str, input_tokens: int, output_tokens: int,
                     feature: str = "unknown", team: str = "unknown") -> dict:
        """Record token usage, attribute its cost, and return any budget alerts."""
        # Unknown models fall back to GPT-4.1 rates rather than being priced at $0
        pricing = MODEL_PRICING.get(model, {"input": 2.00, "output": 8.00})
        input_cost = (input_tokens / 1_000_000) * pricing["input"]
        output_cost = (output_tokens / 1_000_000) * pricing["output"]
        total_cost = input_cost + output_cost

        self.costs_by_feature[feature] += total_cost
        self.costs_by_team[team] += total_cost
        self.costs_by_model[model] += total_cost
        self.total_cost_today += total_cost
        self.call_count += 1

        # Check budget alerts
        alerts = self._check_budget_alerts()
        return {"cost_usd": total_cost, "alerts": alerts}

    def _check_budget_alerts(self) -> list:
        """Check if spending has crossed alert thresholds."""
        alerts = []
        pct_used = self.total_cost_today / self.daily_budget_usd
        if pct_used >= self.alert_crit_pct:
            alerts.append({
                "severity": "CRITICAL",
                "message": f"Daily budget {pct_used:.0%} consumed (${self.total_cost_today:.2f}/${self.daily_budget_usd:.2f})",
                "action": "Throttle non-critical requests, route to cheaper models",
            })
        elif pct_used >= self.alert_warn_pct:
            alerts.append({
                "severity": "WARNING",
                "message": f"Daily budget {pct_used:.0%} consumed (${self.total_cost_today:.2f}/${self.daily_budget_usd:.2f})",
                "action": "Monitor closely, prepare model downgrade",
            })
        return alerts

    def get_cost_report(self) -> dict:
        """Generate a comprehensive cost attribution report."""
        return {
            "total_cost_today": f"${self.total_cost_today:.4f}",
            "budget_remaining": f"${self.daily_budget_usd - self.total_cost_today:.4f}",
            "budget_used_pct": f"{(self.total_cost_today / self.daily_budget_usd):.1%}",
            "total_calls": self.call_count,
            "avg_cost_per_call": f"${self.total_cost_today / max(self.call_count, 1):.6f}",
            "by_feature": {k: f"${v:.4f}" for k, v in
                           sorted(self.costs_by_feature.items(), key=lambda x: -x[1])},
            "by_team": {k: f"${v:.4f}" for k, v in
                        sorted(self.costs_by_team.items(), key=lambda x: -x[1])},
            "by_model": {k: f"${v:.4f}" for k, v in
                         sorted(self.costs_by_model.items(), key=lambda x: -x[1])},
        }


# Usage demonstration: simulate a day of production traffic
import random
random.seed(42)

monitor = CostMonitor(daily_budget_usd=100.0)

# Simulate varied usage across features and teams:
# (feature, team, model, input token range, output token range)
usage_patterns = [
    ("chat", "platform", "gpt-4.1-mini", (800, 2000), (200, 800)),
    ("summarizer", "content", "gpt-4.1", (3000, 8000), (500, 2000)),
    ("classifier", "ml-team", "gpt-4.1-nano", (200, 500), (10, 50)),
    ("code_review", "engineering", "gpt-4.1", (5000, 15000), (1000, 4000)),
    ("search", "platform", "gpt-4.1-mini", (500, 1500), (100, 400)),
]

print("=== Cost Monitoring Simulation (500 calls) ===\n")
for _ in range(500):
    feature, team, model, input_range, output_range = random.choice(usage_patterns)
    input_tokens = random.randint(*input_range)
    output_tokens = random.randint(*output_range)
    result = monitor.record_usage(model, input_tokens, output_tokens, feature, team)

# Print report
report = monitor.get_cost_report()
print("--- Daily Cost Report ---")
print(f"  Total Cost: {report['total_cost_today']}")
print(f"  Budget Used: {report['budget_used_pct']}")
print(f"  Budget Remaining: {report['budget_remaining']}")
print(f"  Total Calls: {report['total_calls']}")
print(f"  Avg Cost/Call: {report['avg_cost_per_call']}")

print("\n--- Cost by Feature ---")
for feature, cost in report["by_feature"].items():
    print(f"  {feature:>15}: {cost}")

print("\n--- Cost by Team ---")
for team, cost in report["by_team"].items():
    print(f"  {team:>15}: {cost}")

print("\n--- Cost by Model ---")
for model, cost in report["by_model"].items():
    print(f"  {model:>15}: {cost}")
6. Quality Monitoring
Monitoring latency and errors tells you the system is running; quality monitoring tells you it’s running correctly. LLM outputs can degrade silently — hallucination rates increase, responses drift from expected formats, or model updates change behavior. Quality monitoring detects these regressions before users complain, using automated scoring, user feedback loops, and drift detection.
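User feedback loops usually collect thumbs-up/down ratings, and those ratios are noisy on small samples. A raw positive rate ranks a prompt version with 3/3 positive votes above one with 950/1000. A standard fix, swapped in here rather than taken from this guide's code, is to compare versions by the lower bound of the Wilson score interval. wilson_lower_bound is an illustrative name.

```python
import math


def wilson_lower_bound(positive: int, total: int, z: float = 1.96) -> float:
    """Lower bound of the Wilson score interval for a positive-feedback rate.

    z=1.96 corresponds to ~95% confidence. With few ratings the bound stays
    low, so small samples cannot outrank well-established ones.
    """
    if total == 0:
        return 0.0
    p = positive / total
    z2 = z * z
    centre = p + z2 / (2 * total)
    margin = z * math.sqrt(p * (1 - p) / total + z2 / (4 * total * total))
    return (centre - margin) / (1 + z2 / total)


print(round(wilson_lower_bound(3, 3), 2), round(wilson_lower_bound(950, 1000), 2))  # 0.44 0.93
```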
Quality Dimensions
import json
import re
import statistics
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class QualityScorer:
    """Automated quality scoring for LLM outputs.

    Implements several heuristic quality dimensions:
    - Format compliance (JSON validity, markdown structure, length bounds)
    - Hallucination risk (absolute vs. hedging language, unverifiable specifics)
    - Relevance (keyword overlap with the input)
    - Drift detection on the composite score
    """
    # Sliding window for drift detection: (timestamp, composite score)
    scores: deque = field(default_factory=lambda: deque(maxlen=1000))
    baseline_mean: Optional[float] = None
    baseline_std: Optional[float] = None

    def score_format_compliance(self, output: str, expected_format: str = "text") -> float:
        """Score whether output matches expected format constraints."""
        score = 1.0
        if expected_format == "json":
            try:
                json.loads(output)
            except json.JSONDecodeError:
                score = 0.0
        elif expected_format == "markdown":
            has_headers = bool(re.search(r'^#{1,6}\s', output, re.MULTILINE))
            has_structure = bool(re.search(r'(\*|-|\d+\.)\s', output))
            score = 0.5 * has_headers + 0.5 * has_structure

        # Length sanity check (too short or too long is suspicious)
        word_count = len(output.split())
        if word_count < 10:
            score *= 0.5  # Suspiciously short
        elif word_count > 5000:
            score *= 0.8  # Possibly over-verbose
        return round(score, 3)

    def score_hallucination_risk(self, output: str) -> float:
        """Estimate hallucination risk based on linguistic markers.

        Higher score = lower hallucination risk (better quality). This is a
        crude heuristic, not a factuality check.
        """
        text = output.lower()

        def count_markers(markers: list) -> int:
            # Whole-word/phrase matches only, so "all" does not match "overall"
            return sum(1 for m in markers if re.search(rf"\b{re.escape(m)}\b", text))

        # Hedging language indicates uncertainty awareness (good)
        hedging_count = count_markers([
            "may", "might", "could", "possibly", "likely", "approximately",
            "it appears", "based on", "according to", "generally",
        ])
        # Absolute claims without citations (risky)
        absolute_count = count_markers([
            "always", "never", "definitely", "certainly", "all",
            "everyone knows", "it is a fact", "undoubtedly",
        ])
        # Specific years, percentages, and dollar amounts (risky if unverifiable)
        specific_claims = len(re.findall(
            r'\b\d{4}\b|\b\d+(?:\.\d+)?%|\$\d[\d,]*(?:\.\d+)?', output))

        # Count non-empty sentences; requiring whitespace or end-of-text after
        # the punctuation avoids splitting decimals such as "2.3"
        sentences = max(len([s for s in re.split(r'[.!?]+(?:\s+|$)', output)
                             if s.strip()]), 1)
        hedging_density = hedging_count / sentences
        absolute_density = absolute_count / sentences
        claim_density = specific_claims / sentences

        # Score: more hedging = better, more absolutes = worse
        score = 1.0
        score -= absolute_density * 0.3  # Penalize absolute claims
        score -= claim_density * 0.1     # Mild penalty for unverifiable specifics
        score += hedging_density * 0.1   # Reward appropriate hedging
        return round(max(0.0, min(1.0, score)), 3)

    def score_relevance(self, output: str, input_text: str) -> float:
        """Score output relevance to the input using keyword overlap."""
        # Extract meaningful words (4+ chars, not stopwords)
        stopwords = {"that", "this", "with", "from", "have", "been"}
        input_words = {w.lower() for w in re.findall(r'\b\w{4,}\b', input_text)} - stopwords
        output_words = {w.lower() for w in re.findall(r'\b\w{4,}\b', output)} - stopwords
        if not input_words:
            return 0.5  # Can't assess relevance without input context
        # What fraction of input concepts appear in the output?
        overlap = input_words & output_words
        recall = len(overlap) / len(input_words)
        return round(min(1.0, recall * 1.5), 3)  # Scale up, cap at 1.0

    def compute_composite_score(self, output: str, input_text: str,
                                expected_format: str = "text") -> dict:
        """Compute all quality dimensions and a composite score."""
        format_score = self.score_format_compliance(output, expected_format)
        hallucination_score = self.score_hallucination_risk(output)
        relevance_score = self.score_relevance(output, input_text)

        # Weighted composite
        composite = (
            format_score * 0.3 +
            hallucination_score * 0.4 +
            relevance_score * 0.3
        )

        # Track for drift detection
        self.scores.append((time.time(), composite))

        return {
            "composite": round(composite, 3),
            "format_compliance": format_score,
            "hallucination_risk": hallucination_score,
            "relevance": relevance_score,
            "drift_detected": self._check_drift(composite),
        }

    def _check_drift(self, current_score: float) -> bool:
        """Detect quality drift using a simple statistical process control rule."""
        if len(self.scores) < 50:
            return False  # Not enough data
        recent_scores = [s for _, s in list(self.scores)[-50:]]

        # Establish baseline from the first 80% of data if none was set manually
        if self.baseline_mean is None:
            baseline_data = [s for _, s in list(self.scores)[:int(len(self.scores) * 0.8)]]
            if len(baseline_data) >= 30:
                self.baseline_mean = statistics.mean(baseline_data)
                self.baseline_std = statistics.stdev(baseline_data)

        if self.baseline_mean is None or self.baseline_std is None:
            return False

        # Alert if the recent mean is more than 2 standard deviations below baseline
        recent_mean = statistics.mean(recent_scores)
        threshold = self.baseline_mean - (2 * self.baseline_std)
        return recent_mean < threshold

    def set_baseline(self, scores: list):
        """Manually set quality baseline from historical data."""
        self.baseline_mean = statistics.mean(scores)
        self.baseline_std = statistics.stdev(scores)


# Usage demonstration
scorer = QualityScorer()
scorer.set_baseline([0.78, 0.82, 0.80, 0.75, 0.83, 0.79, 0.81, 0.77, 0.84, 0.80])

print("=== Quality Scoring Examples ===\n")

# Example 1: Good quality output
good_output = """Based on the quarterly financial report, revenue grew approximately 15% year-over-year,
reaching $2.3 billion. The growth was primarily driven by the cloud services division,
which may have benefited from increased enterprise adoption. Operating margins improved
slightly, though this could be partially attributed to one-time cost reductions."""
input_text = "Summarize the quarterly financial report focusing on revenue growth and margins"
result = scorer.compute_composite_score(good_output, input_text)
print("--- Example 1: Well-structured summary ---")
for k, v in result.items():
    print(f"  {k}: {v}")

# Example 2: Poor quality (hallucination markers)
poor_output = """The company definitely achieved 47.3% growth in Q3 2026, making it
the fastest growing company in history. Everyone knows this was caused by their
revolutionary AI product that always outperforms all competitors. The CEO certainly
confirmed they will never face any challenges."""
result2 = scorer.compute_composite_score(poor_output, input_text)
print("\n--- Example 2: High hallucination risk ---")
for k, v in result2.items():
    print(f"  {k}: {v}")

# Example 3: Irrelevant output
irrelevant_output = """Here is a recipe for chocolate cake. Preheat oven to 350F.
Mix flour, sugar, and cocoa powder. Add eggs and milk. Bake for 30 minutes."""
result3 = scorer.compute_composite_score(irrelevant_output, input_text)
print("\n--- Example 3: Irrelevant output ---")
for k, v in result3.items():
print(f" {k}: {v}")
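The `_check_drift` method above flags drift when the mean of the last 50 composite scores falls below a lower control limit of the baseline mean minus two standard deviations. A minimal standalone sketch of that same one-sided check (the function name `spc_drift` and the score streams are hypothetical; the baseline reuses the ten values passed to `set_baseline` above) shows how a 50-score window dilutes a sudden drop:

```python
import statistics


def spc_drift(baseline: list, recent: list, k: float = 2.0) -> bool:
    """One-sided control-limit check: True when the mean of `recent` falls
    more than k sample standard deviations below the mean of `baseline`."""
    if len(baseline) < 2 or not recent:
        return False  # statistics.stdev needs at least two data points
    lower_limit = statistics.mean(baseline) - k * statistics.stdev(baseline)
    return statistics.mean(recent) < lower_limit


baseline = [0.78, 0.82, 0.80, 0.75, 0.83, 0.79, 0.81, 0.77, 0.84, 0.80]

# Hypothetical stream: steady at 0.80, with the last n_low of 50 scores at 0.55.
for n_low in (10, 20):
    window = [0.80] * (50 - n_low) + [0.55] * n_low
    print(n_low, spc_drift(baseline, window))
# 10 False  (window mean 0.75 is above the ~0.744 lower limit)
# 20 True   (window mean 0.70 is below it)
```

A quality collapse affecting the newest fifth of the window does not trip the alarm here, so a shorter window (or a second, faster check) may be worth considering when quick detection matters more than stability.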
7. Incident Response
Even with comprehensive monitoring, incidents will happen. What often separates a 5-minute resolution from a 2-hour outage is a runbook written before the incident rather than during it. For OpenAI-powered applications, the most common incidents are rate limit exhaustion, elevated latency, quality degradation, and unexpected cost spikes; the table below also covers a full API outage. Each needs its own diagnosis steps and remediation playbook.
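Rate limit exhaustion tends to get worse when every client retries on the same schedule, which is why enabling backoff appears as the immediate action in the table below and retry storms appear in the runbook's diagnosis steps. Here is a minimal sketch of "full jitter" exponential backoff. `RateLimitError`, `backoff_delays`, and `call_with_backoff` are hypothetical names, not part of the OpenAI SDK; map `RateLimitError` onto whatever exception your client raises for HTTP 429.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical stand-in for the exception your client raises on HTTP 429."""


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 60.0,
                   rng: Optional[random.Random] = None) -> list:
    """Full-jitter backoff: retry n waits a uniform random time in
    [0, min(cap, base * 2**n)] seconds, so clients that failed together
    do not all retry at the same instant."""
    rng = rng if rng is not None else random.Random()
    return [rng.uniform(0, min(cap, base * 2 ** n)) for n in range(max_retries)]


def call_with_backoff(fn: Callable[[], T], max_retries: int = 5,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying on RateLimitError up to max_retries times."""
    for delay in backoff_delays(max_retries):
        try:
            return fn()
        except RateLimitError:
            sleep(delay)
    return fn()  # Last attempt: a RateLimitError here propagates to the caller


# Usage: a fake call that is rate-limited twice, then succeeds.
attempts = {"n": 0}


def flaky_call() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimitError("429 Too Many Requests")
    return "ok"


print(call_with_backoff(flaky_call, sleep=lambda s: None))  # prints "ok"
```

Injecting `sleep` keeps the usage example instant and testable; in production you would keep the default `time.sleep`. Backoff only spreads retries out, so it pairs with the table's other action, reducing concurrency.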
Runbook: Common Failure Modes
| Incident | Symptoms | Diagnosis | Immediate Action | Escalation |
|---|---|---|---|---|
| Rate Limit Exhaustion | 429 rate >5%, queue depth growing | Check RPM/TPM usage vs. quota | Reduce concurrency, enable backoff | Request quota increase from OpenAI |
| Elevated Latency | p95 >15s, timeouts increasing | Check OpenAI status page, prompt length | Reduce max_tokens, route to a mini model | Switch to fallback provider |
| Quality Degradation | Quality score <0.5, user complaints | Compare outputs to baseline, check model version | Pin model version, revert prompt changes | Evaluate alternative models |
| Cost Spike | Hourly cost >1.5× hourly budget (daily budget / 24) | Check token counts per request, feature attribution | Route to a nano model, disable non-critical features | Implement hard spending cap |
| API Outage | 5xx rate >50%, circuit breaker open | Check status.openai.com | Serve cached responses, queue requests | Activate DR plan, notify users |
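The symptom thresholds in this table can be turned into a first-pass classifier that names the runbook to open. This is a sketch under stated assumptions: the `MetricsSnapshot` field names are hypothetical, `elevated_latency` and `api_outage` are hypothetical trigger keys (only `rate_limit_exhaustion`, `quality_degradation`, and `cost_spike` get runbooks in the code that follows), and the cost check reads the budget as the hourly share of a daily budget, matching the cost-spike runbook's trigger condition.

```python
from dataclasses import dataclass


@dataclass
class MetricsSnapshot:
    """Hypothetical metrics snapshot; field names are illustrative."""
    rate_429: float        # fraction of requests returning 429
    p95_latency_s: float   # 95th-percentile latency, seconds
    quality_score: float   # composite quality score, 0..1
    hourly_cost: float     # spend in the last hour, USD
    daily_budget: float    # daily budget, USD
    rate_5xx: float        # fraction of requests returning 5xx


def classify(m: MetricsSnapshot) -> list:
    """Return the trigger names whose table thresholds are crossed."""
    triggers = []
    if m.rate_5xx > 0.50:
        triggers.append("api_outage")
    if m.rate_429 > 0.05:
        triggers.append("rate_limit_exhaustion")
    if m.p95_latency_s > 15.0:
        triggers.append("elevated_latency")
    if m.quality_score < 0.5:
        triggers.append("quality_degradation")
    # Hourly spend above 1.5x the hourly share of the daily budget
    if m.hourly_cost > 1.5 * (m.daily_budget / 24):
        triggers.append("cost_spike")
    return triggers


snap = MetricsSnapshot(rate_429=0.08, p95_latency_s=9.0, quality_score=0.72,
                       hourly_cost=70.0, daily_budget=1000.0, rate_5xx=0.01)
print(classify(snap))  # ['rate_limit_exhaustion', 'cost_spike']
```

With a $1,000 daily budget the hourly share is about $41.67, so the cost-spike threshold works out to $62.50 per hour. A real system would also require each condition to persist for some time before opening an incident.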
```python
import time
from dataclasses import dataclass, field
from enum import Enum


class IncidentSeverity(Enum):
    SEV1 = "sev1"  # Total outage, all users affected
    SEV2 = "sev2"  # Major degradation, many users affected
    SEV3 = "sev3"  # Minor degradation, some users affected
    SEV4 = "sev4"  # Low impact, cosmetic or edge case


class IncidentStatus(Enum):
    DETECTED = "detected"
    ACKNOWLEDGED = "acknowledged"
    MITIGATING = "mitigating"
    RESOLVED = "resolved"


@dataclass
class Runbook:
    """Pre-defined incident response playbook."""
    name: str
    trigger_condition: str
    diagnosis_steps: list
    mitigation_actions: list
    escalation_path: list
    expected_resolution_min: int


@dataclass
class IncidentManager:
    """Incident tracking with runbook lookup and escalation paths.

    When monitoring detects a crossed threshold, the caller passes the
    trigger name and a metrics snapshot; the manager records the incident
    together with the matching runbook's diagnosis steps, mitigation
    actions, escalation path, and expected resolution time. Incidents are
    recorded as DETECTED; moving them through later statuses is left to
    the caller.
    """
    runbooks: dict = field(default_factory=dict)
    active_incidents: list = field(default_factory=list)

    def register_runbook(self, trigger: str, runbook: Runbook):
        """Register a runbook for a specific failure trigger."""
        self.runbooks[trigger] = runbook

    def detect_incident(self, trigger: str, metrics: dict,
                        severity: IncidentSeverity = IncidentSeverity.SEV3) -> dict:
        """Record an incident and attach the matching runbook."""
        runbook = self.runbooks.get(trigger)
        if not runbook:
            return {"status": "no_runbook", "trigger": trigger}
        incident = {
            # Clock-derived demo ID: incidents detected in the same second collide
            "id": f"INC-{int(time.time()) % 100000:05d}",
            "trigger": trigger,
            "severity": severity.value,
            "status": IncidentStatus.DETECTED.value,
            "detected_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "metrics_snapshot": metrics,
            "runbook": runbook.name,
            "diagnosis_steps": runbook.diagnosis_steps,
            "mitigation_actions": runbook.mitigation_actions,
            "escalation_path": runbook.escalation_path,
            "expected_resolution_min": runbook.expected_resolution_min,
        }
        self.active_incidents.append(incident)
        return incident

    def get_status_page(self) -> dict:
        """Generate a status page summary."""
        return {
            "overall_status": "degraded" if self.active_incidents else "operational",
            "active_incidents": len(self.active_incidents),
            "incidents": [
                {
                    "id": inc["id"],
                    "severity": inc["severity"],
                    "trigger": inc["trigger"],
                    "status": inc["status"],
                    "runbook": inc["runbook"],
                }
                for inc in self.active_incidents
            ],
        }


# Define runbooks for common OpenAI failure modes
manager = IncidentManager()

manager.register_runbook("rate_limit_exhaustion", Runbook(
    name="Rate Limit Exhaustion",
    trigger_condition="429 error rate > 5% for 2+ minutes",
    diagnosis_steps=[
        "1. Check current RPM/TPM usage in OpenAI dashboard",
        "2. Identify which feature is consuming the most quota",
        "3. Check for request amplification (retry storms)",
        "4. Verify rate limiter configuration matches current quota",
    ],
    mitigation_actions=[
        "A. Reduce max concurrency by 50%",
        "B. Enable exponential backoff with jitter",
        "C. Route non-critical requests to batch queue",
        "D. Temporarily disable lowest-priority features",
    ],
    escalation_path=[
        "L1: On-call engineer — apply mitigation actions",
        "L2: Platform team — request emergency quota increase",
        "L3: Account manager — negotiate higher tier",
    ],
    expected_resolution_min=15,
))

manager.register_runbook("quality_degradation", Runbook(
    name="Quality Degradation",
    trigger_condition="Composite quality score < 0.5 for 10+ minutes",
    diagnosis_steps=[
        "1. Compare recent outputs against golden set examples",
        "2. Check if OpenAI deployed a model update (changelog)",
        "3. Review recent prompt template changes in git log",
        "4. Check input data quality (garbage in = garbage out)",
    ],
    mitigation_actions=[
        "A. Pin to a specific model snapshot (dated version)",
        "B. Revert last prompt template change",
        "C. Enable output validation with retry on low score",
        "D. Route critical paths to a known-good model version",
    ],
    escalation_path=[
        "L1: ML engineer — diagnose prompt/model issue",
        "L2: Product team — assess user impact, communicate",
        "L3: Leadership — decide on feature flag rollback",
    ],
    expected_resolution_min=30,
))

manager.register_runbook("cost_spike", Runbook(
    name="Unexpected Cost Spike",
    trigger_condition="Hourly cost > 1.5x daily budget / 24",
    diagnosis_steps=[
        "1. Check cost attribution by feature — which spiked?",
        "2. Look for increased token counts per request",
        "3. Check for traffic spike (legitimate or attack)",
        "4. Verify no infinite loops or retry amplification",
    ],
    mitigation_actions=[
        "A. Route all traffic to cheapest viable model (nano/mini)",
        "B. Enable response length caps (max_tokens reduction)",
        "C. Activate request deduplication / caching",
        "D. Rate-limit the offending feature specifically",
    ],
    escalation_path=[
        "L1: On-call engineer — immediate cost reduction",
        "L2: Finance + Engineering — assess damage, set hard cap",
        "L3: Leadership — approve emergency spending or shutdown",
    ],
    expected_resolution_min=10,
))

# Simulate incident detection
print("=== Incident Response Simulation ===\n")

# Detect a rate limit incident
incident = manager.detect_incident(
    trigger="rate_limit_exhaustion",
    metrics={"429_rate": 0.08, "queue_depth": 342, "rpm_used": 480, "rpm_limit": 500},
    severity=IncidentSeverity.SEV2,
)
print(f"--- Incident Detected: {incident['id']} ---")
print(f" Severity: {incident['severity']}")
print(f" Trigger: {incident['trigger']}")
print(f" Detected: {incident['detected_at']}")
print(f" Runbook: {incident['runbook']}")
print(f" Expected Resolution: {incident['expected_resolution_min']} min")
print("\n Diagnosis Steps:")
for step in incident["diagnosis_steps"]:
    print(f" {step}")
print("\n Mitigation Actions:")
for action in incident["mitigation_actions"]:
    print(f" {action}")
print("\n Escalation Path:")
for level in incident["escalation_path"]:
    print(f" {level}")

# Status page
print("\n--- Status Page ---")
status = manager.get_status_page()
print(f" Overall: {status['overall_status']}")
print(f" Active Incidents: {status['active_incidents']}")
```
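`IncidentManager` records every incident as DETECTED and never advances it, even though `IncidentStatus` defines later stages and each runbook carries `expected_resolution_min`. Below is a sketch of a hypothetical lifecycle tracker (the `IncidentLifecycle` class and `TRANSITIONS` table are illustrative, not part of the original design) that enforces the status order and compares elapsed time against that expected resolution time. It redeclares `IncidentStatus` so it runs on its own.

```python
import time
from enum import Enum
from typing import Callable, Optional


class IncidentStatus(Enum):
    DETECTED = "detected"
    ACKNOWLEDGED = "acknowledged"
    MITIGATING = "mitigating"
    RESOLVED = "resolved"


# Allowed forward transitions; anything else is rejected.
TRANSITIONS = {
    IncidentStatus.DETECTED: {IncidentStatus.ACKNOWLEDGED},
    IncidentStatus.ACKNOWLEDGED: {IncidentStatus.MITIGATING, IncidentStatus.RESOLVED},
    IncidentStatus.MITIGATING: {IncidentStatus.RESOLVED},
    IncidentStatus.RESOLVED: set(),
}


class IncidentLifecycle:
    """Hypothetical tracker: timestamps each status and checks the runbook SLA."""

    def __init__(self, expected_resolution_min: int,
                 clock: Callable[[], float] = time.time):
        self.expected_resolution_min = expected_resolution_min
        self.clock = clock
        self.status = IncidentStatus.DETECTED
        self.timestamps = {IncidentStatus.DETECTED: clock()}

    def advance(self, new_status: IncidentStatus) -> None:
        if new_status not in TRANSITIONS[self.status]:
            raise ValueError(f"cannot go from {self.status.value} to {new_status.value}")
        self.status = new_status
        self.timestamps[new_status] = self.clock()

    def minutes_to_resolve(self) -> Optional[float]:
        if IncidentStatus.RESOLVED not in self.timestamps:
            return None
        start = self.timestamps[IncidentStatus.DETECTED]
        return (self.timestamps[IncidentStatus.RESOLVED] - start) / 60

    def breached_sla(self) -> bool:
        """True if time to resolution (or time open so far) exceeds the SLA."""
        if IncidentStatus.RESOLVED in self.timestamps:
            end = self.timestamps[IncidentStatus.RESOLVED]
        else:
            end = self.clock()
        return (end - self.timestamps[IncidentStatus.DETECTED]) / 60 > self.expected_resolution_min


# Usage with a fake clock (seconds): detect, acknowledge, mitigate, resolve.
ticks = iter([0.0, 120.0, 300.0, 1200.0])
inc = IncidentLifecycle(expected_resolution_min=15, clock=lambda: next(ticks))
inc.advance(IncidentStatus.ACKNOWLEDGED)
inc.advance(IncidentStatus.MITIGATING)
inc.advance(IncidentStatus.RESOLVED)
print(inc.minutes_to_resolve(), inc.breached_sla())  # 20.0 True
```

Injecting the clock keeps the example deterministic. Here the rate-limit runbook's 15-minute expectation is exceeded by a 20-minute resolution, which is the kind of signal that could drive escalation to the next level in `escalation_path`.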
Alerting Integration Patterns
```python
import time
from dataclasses import dataclass, field
from enum import Enum


class AlertChannel(Enum):
    PAGERDUTY = "pagerduty"  # SEV1/SEV2: immediate page
    SLACK = "slack"          # SEV3: team notification
    EMAIL = "email"          # SEV4: async notification
    JIRA = "jira"            # Low priority: ticket creation


@dataclass
class AlertRule:
    """Defines when and how to send an alert."""
    name: str
    metric: str
    condition: str       # "> threshold" or "< threshold", e.g. "> 0.05" or "< 0.5"
    window_seconds: int  # Intended evaluation window (not applied by AlertRouter.evaluate)
    channel: AlertChannel
    severity: str
    runbook_url: str
    cooldown_seconds: int = 300  # Don't re-alert within this period
    last_fired: float = field(default=0.0, init=False)

    def should_fire(self, current_value: float) -> bool:
        """Evaluate if the alert condition is met."""
        # Parse condition; only strict ">" and "<" are supported
        if self.condition.startswith(">"):
            threshold = float(self.condition[1:].strip())
            return current_value > threshold
        elif self.condition.startswith("<"):
            threshold = float(self.condition[1:].strip())
            return current_value < threshold
        return False

    def can_fire(self) -> bool:
        """Check if cooldown period has elapsed."""
        return (time.time() - self.last_fired) > self.cooldown_seconds


@dataclass
class AlertRouter:
    """Evaluates alert rules and tags each fired alert with its channel.

    A per-rule cooldown keeps a persistent breach from re-alerting on every
    evaluation. Delivery to PagerDuty, Slack, email, or Jira is left to the
    caller.
    """
    rules: list = field(default_factory=list)
    fired_alerts: list = field(default_factory=list)

    def add_rule(self, rule: AlertRule):
        """Register an alert rule."""
        self.rules.append(rule)

    def evaluate(self, metrics: dict) -> list:
        """Evaluate all rules against current metrics and fire alerts."""
        fired = []
        for rule in self.rules:
            value = metrics.get(rule.metric)
            if value is None:
                continue
            if rule.should_fire(value) and rule.can_fire():
                alert = {
                    "rule": rule.name,
                    "metric": rule.metric,
                    "current_value": value,
                    "condition": rule.condition,
                    "channel": rule.channel.value,
                    "severity": rule.severity,
                    "runbook": rule.runbook_url,
                    "fired_at": time.strftime("%H:%M:%S"),
                    "message": f"[{rule.severity}] {rule.name}: {rule.metric}={value} ({rule.condition})",
                }
                rule.last_fired = time.time()
                fired.append(alert)
                self.fired_alerts.append(alert)
        return fired


# Configure alert rules for OpenAI observability
router = AlertRouter()
router.add_rule(AlertRule(
    name="High Error Rate",
    metric="error_rate",
    condition="> 0.05",
    window_seconds=120,
    channel=AlertChannel.PAGERDUTY,
    severity="SEV2",
    runbook_url="https://runbooks.internal/openai/high-error-rate",
))
router.add_rule(AlertRule(
    name="Elevated Latency",
    metric="p95_latency_s",
    condition="> 10.0",
    window_seconds=300,
    channel=AlertChannel.SLACK,
    severity="SEV3",
    runbook_url="https://runbooks.internal/openai/high-latency",
))
router.add_rule(AlertRule(
    name="Budget Warning",
    metric="budget_used_pct",
    condition="> 0.80",
    window_seconds=600,
    channel=AlertChannel.SLACK,
    severity="SEV3",
    runbook_url="https://runbooks.internal/openai/cost-spike",
))
router.add_rule(AlertRule(
    name="Quality Degradation",
    metric="quality_score",
    condition="< 0.6",
    window_seconds=600,
    channel=AlertChannel.PAGERDUTY,
    severity="SEV2",
    runbook_url="https://runbooks.internal/openai/quality-drop",
))
router.add_rule(AlertRule(
    name="Rate Limit Pressure",
    metric="rate_limit_429_pct",
    condition="> 0.03",
    window_seconds=120,
    channel=AlertChannel.SLACK,
    severity="SEV3",
    runbook_url="https://runbooks.internal/openai/rate-limits",
))

# Simulate metrics evaluation
current_metrics = {
    "error_rate": 0.07,          # 7% — above threshold!
    "p95_latency_s": 12.3,       # 12.3s — above threshold!
    "budget_used_pct": 0.65,     # 65% — OK
    "quality_score": 0.72,       # 0.72 — OK
    "rate_limit_429_pct": 0.04,  # 4% — above threshold!
}
print("=== Alert Evaluation ===\n")
print("Current Metrics:")
for k, v in current_metrics.items():
    print(f" {k}: {v}")

print("\n--- Fired Alerts ---")
alerts = router.evaluate(current_metrics)
if alerts:
    for alert in alerts:
        print(f"\n {alert['message']}")
        print(f" Channel: {alert['channel']}")
        print(f" Runbook: {alert['runbook']}")
        print(f" Fired at: {alert['fired_at']}")
else:
    print(" No alerts fired — all metrics within thresholds")

print("\n--- Summary ---")
print(f" Rules evaluated: {len(router.rules)}")
print(f" Alerts fired: {len(alerts)}")
print(f" Channels notified: {sorted({a['channel'] for a in alerts})}")
```
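`AlertRule` declares `window_seconds`, but `AlertRouter.evaluate` compares only the single value it is handed, so one noisy sample can page someone. One way to honor the window, sketched here with a hypothetical `SustainedCondition` helper, is to fire only after the condition has held continuously for the whole window, which mirrors runbook conditions such as "429 error rate > 5% for 2+ minutes". A rolling average over the window is a common alternative design.

```python
from typing import Callable, Optional


class SustainedCondition:
    """Hypothetical helper: fires once `breaches(value)` has been true on every
    sample for at least window_seconds; any healthy sample resets the clock."""

    def __init__(self, breaches: Callable[[float], bool], window_seconds: float):
        self.breaches = breaches
        self.window_seconds = window_seconds
        self.breach_started: Optional[float] = None

    def observe(self, ts: float, value: float) -> bool:
        if not self.breaches(value):
            self.breach_started = None  # Breach streak broken
            return False
        if self.breach_started is None:
            self.breach_started = ts    # First sample of a new breach streak
        return ts - self.breach_started >= self.window_seconds


# Error rate sampled every 30 s against "> 0.05" with a 120 s window.
cond = SustainedCondition(breaches=lambda v: v > 0.05, window_seconds=120)
samples = [(0, 0.07), (30, 0.08), (60, 0.02), (90, 0.06),
           (120, 0.07), (150, 0.09), (180, 0.06), (210, 0.08)]
results = [cond.observe(ts, v) for ts, v in samples]
print(results)  # [False, False, False, False, False, False, False, True]
```

The dip at 60 s resets the streak, so the alert waits until 210 s, a full 120 s after the breach resumed at 90 s. In the router you could pass `rule.should_fire` as `breaches` and `rule.window_seconds` as the window; once the condition is sustained, `observe` keeps returning True on every sample, so the rule's cooldown still decides how often a page actually goes out.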
Conclusion
Next in the Series
In Part 17: Enterprise & Compliance, we’ll cover SOC 2 compliance for AI applications, data residency controls, audit logging, role-based access to models, enterprise SSO integration, data processing agreements, and governance frameworks for responsible AI deployment in regulated industries.