flowchart TD
T[Migration Trigger] --> A{What Changed?}
A -->|API Deprecation| B[API Migration]
A -->|New Model Release| C[Model Migration]
A -->|SDK Breaking Change| D[SDK Upgrade]
B --> B1[Map Parameters]
B1 --> B2[Update Response Parsing]
B2 --> B3[Run Eval Suite]
C --> C1[Pin Current Snapshot]
C1 --> C2[Run Evals on New Model]
C2 --> C3{Regression?}
C3 -->|Yes| C4[Adjust Prompts]
C4 --> C2
C3 -->|No| C5[Canary Deploy]
D --> D1[Check Breaking Changes]
D1 --> D2[Update Client Code]
D2 --> D3[Run Integration Tests]
B3 --> E{Pass Threshold?}
C5 --> E
D3 --> E
E -->|Yes| F[Ship to Production]
E -->|No| G[Rollback & Investigate]
G --> A
1. Chat Completions → Responses API Migration
The Responses API is OpenAI’s next-generation interface, replacing Chat Completions as the primary way to interact with models. While Chat Completions remains available, new features (built-in tools, web search, file search, computer use) are Responses-only. Migration requires understanding the exact parameter mapping, response object differences, and feature parity gaps.
Parameter Mapping Reference
| Chat Completions | Responses API | Notes |
|---|---|---|
| messages=[{"role":"system","content":"..."}] | instructions="..." | System prompt becomes top-level parameter |
| messages=[{"role":"user","content":"..."}] | input="..." | Simple string or list of content items |
| response.choices[0].message.content | response.output_text | Convenience accessor for text output |
| max_tokens | max_output_tokens | Renamed for clarity |
| response_format={"type":"json_object"} | text={"format":{"type":"json_schema","schema":{...}}} | Structured outputs with schema enforcement |
| stream=True + for chunk in stream: | stream=True + event types | SSE events with typed deltas |
| tools=[...] + tool_choice | tools=[...] (built-in: web_search, file_search, code_interpreter) | Responses adds built-in tools |
| n=3 (multiple completions) | Not supported | Call multiple times or use Batch API |
| logprobs=True | Not yet available | Use Chat Completions if logprobs needed |
| stop=["END"] | Not directly supported | Use instructions to control stopping |
Chat Completions → Responses Converter
import os
import json
from dataclasses import dataclass, field
from typing import Any, Optional
from openai import OpenAI
@dataclass
class MigrationResult:
    """Outcome of translating one Chat Completions request for the Responses API.

    Bundles the request as supplied, the translated keyword arguments, and
    two lists of human-readable notes collected during translation.
    """

    # Parameters exactly as the caller passed them in.
    original_params: dict
    # Keyword arguments produced for the Responses API.
    converted_params: dict
    # Advisory notes; each instance gets its own fresh list.
    warnings: list = field(default_factory=list)
    # Features of the original request that have no direct equivalent.
    unsupported_features: list = field(default_factory=list)
class ChatCompletionsToResponsesMigrator:
    """Translate Chat Completions request parameters into Responses API form.

    Parameter mapping performed:
    - messages (system/developer/user/assistant) → instructions + input
    - max_tokens / max_completion_tokens → max_output_tokens
    - response_format → text.format
    - tools / tool_choice → flattened function-tool layout
    - stream is carried over (consumers must switch to event-based streaming)
    """

    def __init__(self):
        # Falls back to a placeholder key so the conversion helpers can be
        # exercised without credentials; this class never sends a request.
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))

    def convert_messages_to_input(self, messages: list[dict]) -> dict:
        """Split a Chat Completions ``messages`` list into instructions and input.

        Mapping:
        - system / developer messages → ``instructions`` (joined by a blank line)
        - a single user message → ``input`` as a plain string
        - anything multi-turn → ``input`` as a list of role/content items

        Messages with any other role (for example ``tool``) are not carried
        over. A ``content`` of ``None`` is treated as an empty string.

        Returns:
            ``{"instructions": str | None, "input": str | list[dict]}``.
        """
        instructions = None
        input_items = []
        for msg in messages:
            role = msg.get("role")
            content = msg.get("content")
            if content is None:
                # Assistant tool-call messages legitimately have no content;
                # concatenating None would raise TypeError.
                content = ""
            if role in ("system", "developer"):
                if not content:
                    continue  # nothing to add to the instructions
                if instructions:
                    instructions += "\n\n" + content
                else:
                    instructions = content
            elif role in ("user", "assistant"):
                input_items.append({"role": role, "content": content})

        # A lone user turn is expressed as a bare string for readability.
        if len(input_items) == 1 and input_items[0]["role"] == "user":
            input_value = input_items[0]["content"]
        else:
            input_value = input_items
        return {"instructions": instructions, "input": input_value}

    def convert_response_format(self, response_format: dict) -> dict:
        """Convert ``response_format`` into the value of the ``text`` parameter.

        - ``{"type": "json_object"}`` is passed through as JSON mode. It is NOT
          rewritten into an invented schema, because that would silently
          change the shape of the output the caller receives.
        - ``{"type": "json_schema", "json_schema": {...}}`` is flattened: the
          nested ``name``/``schema`` (and ``strict``/``description`` when the
          caller supplied them) move directly under ``format``.
        - ``{"type": "text"}`` is passed through.

        Returns:
            A dict of the form ``{"format": {...}}``, or ``{}`` when the type
            is not recognised.
        """
        fmt_type = response_format.get("type")
        if fmt_type == "json_object":
            return {"format": {"type": "json_object"}}
        if fmt_type == "json_schema":
            spec = response_format.get("json_schema", {})
            fmt = {
                "type": "json_schema",
                "name": spec.get("name", "response"),
                "schema": spec.get("schema", {}),
            }
            # Only forward optional keys the caller actually set, so the
            # conversion never turns strict mode on behind their back.
            for optional_key in ("strict", "description"):
                if optional_key in spec:
                    fmt[optional_key] = spec[optional_key]
            return {"format": fmt}
        if fmt_type == "text":
            return {"format": {"type": "text"}}
        return {}

    @staticmethod
    def _convert_tools(tools: list) -> list:
        """Flatten Chat Completions function tools into the Responses layout.

        ``{"type": "function", "function": {...}}`` becomes
        ``{"type": "function", **function_fields}``; any other entry is
        returned unchanged.
        """
        flattened = []
        for tool in tools:
            fn = tool.get("function") if isinstance(tool, dict) else None
            if isinstance(fn, dict) and tool.get("type") == "function":
                flattened.append({"type": "function", **fn})
            else:
                flattened.append(tool)
        return flattened

    @staticmethod
    def _convert_tool_choice(tool_choice: Any) -> Any:
        """Flatten a named-function ``tool_choice``; pass other values through."""
        if (
            isinstance(tool_choice, dict)
            and tool_choice.get("type") == "function"
            and isinstance(tool_choice.get("function"), dict)
        ):
            return {"type": "function", **tool_choice["function"]}
        return tool_choice

    @staticmethod
    def _as_literal(value: Any) -> str:
        """Render ``value`` as source text safe to embed in generated code."""
        if isinstance(value, str):
            # json.dumps escapes embedded quotes/newlines and keeps the
            # double-quoted look of the generated snippet.
            return json.dumps(value, ensure_ascii=False)
        return repr(value)

    def convert_params(self, chat_params: dict) -> "MigrationResult":
        """Convert a full set of Chat Completions parameters.

        Args:
            chat_params: Keyword arguments as they would be passed to
                ``client.chat.completions.create``.

        Returns:
            A ``MigrationResult`` holding the original parameters, the
            converted parameters, advisory warnings, and the list of
            features that could not be migrated.
        """
        warnings = []
        unsupported = []
        converted = {"model": chat_params.get("model", "gpt-4o")}

        # messages → instructions + input
        msg_result = self.convert_messages_to_input(chat_params.get("messages", []))
        if msg_result["instructions"]:
            converted["instructions"] = msg_result["instructions"]
        converted["input"] = msg_result["input"]

        # max_tokens / max_completion_tokens → max_output_tokens
        # (max_tokens wins when both are present, matching the original mapping).
        if "max_tokens" in chat_params:
            converted["max_output_tokens"] = chat_params["max_tokens"]
        elif "max_completion_tokens" in chat_params:
            converted["max_output_tokens"] = chat_params["max_completion_tokens"]

        # Parameters whose names are identical in both APIs.
        for same_name in ("temperature", "top_p"):
            if same_name in chat_params:
                converted[same_name] = chat_params[same_name]

        # response_format → text
        if "response_format" in chat_params:
            text_config = self.convert_response_format(chat_params["response_format"])
            if text_config:
                converted["text"] = text_config

        # tools: function definitions are nested in Chat Completions but flat
        # in the Responses API, so they must be reshaped, not just copied.
        if "tools" in chat_params:
            converted["tools"] = self._convert_tools(chat_params["tools"])
            warnings.append("Verify tool definitions work with Responses API format")
        if "tool_choice" in chat_params:
            converted["tool_choice"] = self._convert_tool_choice(chat_params["tool_choice"])

        # stream keeps its name, but the consumer loop has to change.
        if "stream" in chat_params:
            converted["stream"] = chat_params["stream"]
            warnings.append("Stream event format differs: use event types instead of chunk iteration")

        # Features without a direct equivalent. ``or 1`` tolerates n=None.
        if (chat_params.get("n") or 1) > 1:
            unsupported.append(f"n={chat_params['n']} — multiple completions not supported, use multiple calls")
        if chat_params.get("logprobs"):
            # Only flag when actually requested; logprobs=False needs nothing.
            unsupported.append("logprobs — not available in Responses API")
        if "stop" in chat_params:
            unsupported.append(f"stop={chat_params['stop']} — use instructions to control stopping")
        if "presence_penalty" in chat_params:
            warnings.append("presence_penalty — check if supported in current Responses API version")
        if "frequency_penalty" in chat_params:
            warnings.append("frequency_penalty — check if supported in current Responses API version")

        return MigrationResult(
            original_params=chat_params,
            converted_params=converted,
            warnings=warnings,
            unsupported_features=unsupported,
        )

    def show_code_comparison(self, chat_params: dict) -> str:
        """Return a before/after code snippet for ``chat_params``.

        Values are rendered as proper literals, so multi-turn (list) input
        and strings containing quotes still yield valid code.
        """
        p = self.convert_params(chat_params).converted_params
        lit = self._as_literal

        old_code = "\n".join([
            "# === BEFORE: Chat Completions API ===",
            "from openai import OpenAI",
            "client = OpenAI()",
            "response = client.chat.completions.create(",
            f"    model={lit(chat_params.get('model', 'gpt-4o'))},",
            f"    messages={json.dumps(chat_params.get('messages', []), indent=8)},",
            f"    max_tokens={chat_params.get('max_tokens', 1000)},",
            f"    temperature={chat_params.get('temperature', 1.0)},",
            ")",
            "answer = response.choices[0].message.content",
        ])

        new_lines = [
            "# === AFTER: Responses API ===",
            "from openai import OpenAI",
            "client = OpenAI()",
            "response = client.responses.create(",
            f"    model={lit(p.get('model', 'gpt-4o'))},",
        ]
        if p.get("instructions"):
            new_lines.append(f"    instructions={lit(p['instructions'])},")
        new_lines += [
            f"    input={lit(p.get('input', ''))},",
            f"    max_output_tokens={p.get('max_output_tokens', 1000)},",
            f"    temperature={p.get('temperature', 1.0)},",
            ")",
            "answer = response.output_text",
        ]
        return old_code + "\n\n" + "\n".join(new_lines)
# Demonstration: run the migrator against three representative requests.
migrator = ChatCompletionsToResponsesMigrator()
print("=== Chat Completions → Responses API Migration ===\n")

# Example 1: Simple chat completion
simple_params = {
    "model": "gpt-4o",
    "messages": [
        {"role": "system", "content": "You are a helpful coding assistant."},
        {"role": "user", "content": "Write a Python function to calculate fibonacci numbers."},
    ],
    "max_tokens": 500,
    "temperature": 0.7,
}
result = migrator.convert_params(simple_params)
print("--- Example 1: Simple Completion ---")
print(f" Original: messages={len(simple_params['messages'])}, max_tokens={simple_params['max_tokens']}")
print(" Converted:")
# Long values are clipped to 60 characters to keep the report compact.
print(f" instructions: \"{result.converted_params.get('instructions', '')[:60]}...\"")
print(f" input: \"{str(result.converted_params.get('input', ''))[:60]}...\"")
print(f" max_output_tokens: {result.converted_params.get('max_output_tokens')}")
print(f" temperature: {result.converted_params.get('temperature')}")

# Example 2: JSON mode with structured output
json_params = {
    "model": "gpt-4o",
    "messages": [
        {"role": "system", "content": "Extract entities from text. Return JSON."},
        {"role": "user", "content": "Apple Inc. CEO Tim Cook announced new products in Cupertino."},
    ],
    "max_tokens": 300,
    "response_format": {"type": "json_object"},
    "temperature": 0.0,
}
result2 = migrator.convert_params(json_params)
print("\n--- Example 2: JSON Mode Migration ---")
print(' BEFORE: response_format={"type": "json_object"}')
print(f" AFTER: text={json.dumps(result2.converted_params.get('text', {}), indent=4)[:120]}...")

# Example 3: Features that don't migrate cleanly
complex_params = {
    "model": "gpt-4o",
    "messages": [{"role": "user", "content": "Generate 3 different taglines."}],
    "max_tokens": 200,
    "n": 3,
    "logprobs": True,
    "stop": ["\n\n"],
}
result3 = migrator.convert_params(complex_params)
print("\n--- Example 3: Unsupported Features ---")
for issue in result3.unsupported_features:
    # The warning sign was stored as mis-decoded UTF-8; print the real glyph.
    print(f" ⚠ {issue}")

# Show code comparison
print("\n--- Code Comparison ---")
print(migrator.show_code_comparison(simple_params))
2. Model Migration Strategy
Upgrading between model generations (GPT-4 → GPT-4.1 → GPT-5) is not a simple find-and-replace. Each model generation has different strengths, different instruction-following behavior, and different failure modes. An eval-driven approach gives you confidence that the new model performs at least as well as the old one on your specific use cases before you ship the change.
Model Migration Approaches
| Strategy | Risk | Speed | When to Use |
|---|---|---|---|
| Snapshot Pinning | Lowest | Slowest | Regulated environments, no tolerance for behavior change |
| Alias Following | Highest | Fastest | Non-critical applications, testing environments |
| Eval-Gated Upgrade | Low | Moderate | Production systems with quality requirements |
| Canary Deploy | Low | Moderate | High-traffic systems, gradual rollout needed |
| Shadow Testing | Lowest | Slow | Mission-critical systems, zero-downtime requirement |
Eval-Driven Model Migration Test Harness
import os
import json
import time
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI
@dataclass
class EvalCase:
    """One prompt, with its grading metadata, used to compare two models."""

    # Identifier used to match results for the same case across models.
    case_id: str
    # Prompt text for the model under test.
    input_text: str
    # Free-text description of what a good answer looks like.
    expected_behavior: str
    # Grouping label for the case.
    category: str
    # Relative weight of this case; defaults to 1.0.
    weight: float = 1.0
@dataclass
class EvalResult:
    """What was observed when a single eval case ran against one model."""

    # Identifier of the eval case that produced this result.
    case_id: str
    # Name of the model that was evaluated.
    model: str
    # Text the model returned.
    output: str
    # Latency of the run, in milliseconds.
    latency_ms: float
    # Number of tokens consumed by the run.
    tokens_used: int
    # Quality score, 0.0 to 1.0.
    score: float
    # Whether the run counts as a pass.
    passed: bool
@dataclass
class MigrationReport:
    """Side-by-side summary of a source model and a candidate target model."""

    # Model currently in use.
    source_model: str
    # Model being considered as the replacement.
    target_model: str
    # Number of eval cases that were run.
    total_cases: int
    # Aggregate score of the source model.
    source_score: float
    # Aggregate score of the target model.
    target_score: float
    # Cases where the target scored notably worse than the source.
    regression_cases: list
    # Cases where the target scored notably better than the source.
    improvement_cases: list
    # Change in latency from source to target, in percent.
    latency_change_pct: float
    # Change in cost from source to target, in percent.
    cost_change_pct: float
    # Human-readable go/no-go verdict.
    recommendation: str
class ModelMigrationHarness:
    """Eval-driven model migration with regression detection.

    Workflow:
    1. Define eval cases covering critical behaviors
    2. Run all cases against current (source) model
    3. Run all cases against new (target) model
    4. Compare scores, detect regressions
    5. Generate migration report with go/no-go recommendation

    Model calls are simulated. The simulation is seeded from a stable
    checksum of the case/model text, so repeated runs give identical
    scores and therefore an identical recommendation.
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.eval_cases: list[EvalCase] = []
        self.results: dict[str, list[EvalResult]] = {}
        self.pass_threshold = 0.85  # Minimum score for a single case to pass
        self.regression_tolerance = 0.05  # Per-case score drop treated as noise

    @staticmethod
    def _stable_hash(text: str) -> int:
        """Return a checksum of ``text`` that is identical in every process.

        The built-in ``hash()`` is salted per interpreter run for strings,
        which made the simulated scores change from one run to the next.
        """
        import zlib

        return zlib.crc32(text.encode("utf-8"))

    @staticmethod
    def _weighted_mean(values: list, weights: list) -> float:
        """Average ``values`` using ``weights``; plain mean if weights sum to <= 0."""
        total_weight = sum(weights)
        if total_weight <= 0:
            return sum(values) / len(values)
        return sum(v * w for v, w in zip(values, weights)) / total_weight

    def add_eval_cases(self, cases: "list[EvalCase]"):
        """Register evaluation cases for the migration test."""
        self.eval_cases.extend(cases)

    def run_eval(self, model: str, case: "EvalCase") -> "EvalResult":
        """Run a single eval case against a model (simulated).

        Args:
            model: Model name; unknown names use a base quality of 0.85.
            case: The eval case to score.

        Returns:
            An ``EvalResult`` whose ``passed`` flag is derived from the same
            rounded score that is stored in ``score``.
        """
        # Simulated model response (in production, call OpenAI API).
        model_quality = {
            "gpt-4-0613": 0.82,
            "gpt-4o-2024-08-06": 0.88,
            "gpt-4.1-2025-04-14": 0.91,
            "gpt-5-2026-03-01": 0.94,
        }
        base_score = model_quality.get(model, 0.85)
        category_bonus = {"reasoning": 0.03, "coding": 0.05, "instruction_following": 0.02}.get(case.category, 0.0)
        case_seed = self._stable_hash(case.case_id)

        # Only the newer model families receive the per-category bonus.
        if "gpt-5" in model or "gpt-4.1" in model:
            raw_score = base_score + category_bonus + (case_seed % 10) / 100
        else:
            raw_score = base_score + (case_seed % 8) / 100
        score = round(min(1.0, raw_score), 3)

        latency = 800 + (self._stable_hash(model + case.case_id) % 400)  # Simulated latency
        tokens = 150 + (self._stable_hash(case.input_text) % 200)
        return EvalResult(
            case_id=case.case_id,
            model=model,
            output=f"[Simulated {model} output for: {case.input_text[:40]}]",
            latency_ms=latency,
            tokens_used=tokens,
            score=score,
            passed=score >= self.pass_threshold,
        )

    def run_migration_eval(self, source_model: str, target_model: str) -> "MigrationReport":
        """Run every registered case on both models and compare the outcomes.

        Aggregate scores are weighted by ``EvalCase.weight``. With no
        registered cases a BLOCK report is returned instead of dividing by
        zero.

        Returns:
            A ``MigrationReport`` with aggregate scores, per-case
            regressions/improvements beyond ``regression_tolerance``,
            latency and token-cost deltas in percent, and a recommendation.
        """
        if not self.eval_cases:
            self.results[source_model] = []
            self.results[target_model] = []
            return MigrationReport(
                source_model=source_model,
                target_model=target_model,
                total_cases=0,
                source_score=0.0,
                target_score=0.0,
                regression_cases=[],
                improvement_cases=[],
                latency_change_pct=0.0,
                cost_change_pct=0.0,
                recommendation="BLOCK: No eval cases registered — nothing to evaluate",
            )

        source_results = [self.run_eval(source_model, case) for case in self.eval_cases]
        target_results = [self.run_eval(target_model, case) for case in self.eval_cases]
        self.results[source_model] = source_results
        self.results[target_model] = target_results

        # Aggregate scores, honouring each case's weight.
        weights = [case.weight for case in self.eval_cases]
        source_avg = self._weighted_mean([r.score for r in source_results], weights)
        target_avg = self._weighted_mean([r.score for r in target_results], weights)

        # Per-case deltas outside the tolerance band are reported.
        regressions = []
        improvements = []
        for src, tgt in zip(source_results, target_results):
            diff = tgt.score - src.score
            entry = {"case_id": src.case_id, "source_score": src.score, "target_score": tgt.score, "delta": round(diff, 3)}
            if diff < -self.regression_tolerance:
                regressions.append(entry)
            elif diff > self.regression_tolerance:
                improvements.append(entry)

        # Latency comparison (mean per case).
        source_latency = sum(r.latency_ms for r in source_results) / len(source_results)
        target_latency = sum(r.latency_ms for r in target_results) / len(target_results)
        latency_change = ((target_latency - source_latency) / source_latency) * 100

        # Cost comparison (simplified: based on tokens).
        source_tokens = sum(r.tokens_used for r in source_results)
        target_tokens = sum(r.tokens_used for r in target_results)
        cost_change = ((target_tokens - source_tokens) / source_tokens) * 100

        # Recommendation
        if len(regressions) == 0 and target_avg >= source_avg:
            recommendation = "APPROVE: No regressions detected, safe to migrate"
        elif len(regressions) <= 2 and target_avg > source_avg:
            recommendation = "CONDITIONAL: Minor regressions — review specific cases before migrating"
        else:
            recommendation = f"BLOCK: {len(regressions)} regressions detected — investigate before proceeding"

        return MigrationReport(
            source_model=source_model,
            target_model=target_model,
            total_cases=len(self.eval_cases),
            source_score=round(source_avg, 3),
            target_score=round(target_avg, 3),
            regression_cases=regressions,
            improvement_cases=improvements,
            latency_change_pct=round(latency_change, 1),
            cost_change_pct=round(cost_change, 1),
            recommendation=recommendation,
        )
# Demonstration: compare two model snapshots on a small eval suite.
harness = ModelMigrationHarness()

# Define eval cases
harness.add_eval_cases([
    EvalCase("tc-001", "Explain quantum computing in simple terms", "Clear, accurate explanation", "reasoning"),
    EvalCase("tc-002", "Write a binary search in Python with error handling", "Correct, idiomatic code", "coding"),
    EvalCase("tc-003", "Summarize this 500-word article in exactly 3 bullet points", "Exactly 3 bullets", "instruction_following"),
    EvalCase("tc-004", "Given these financials, calculate the EBITDA margin", "Correct calculation", "reasoning"),
    EvalCase("tc-005", "Refactor this function to use async/await", "Working async code", "coding"),
    EvalCase("tc-006", "Respond ONLY in JSON format with keys: name, age, city", "Valid JSON, correct keys", "instruction_following"),
    EvalCase("tc-007", "Identify logical fallacies in this argument", "Correct identification", "reasoning"),
    EvalCase("tc-008", "Write unit tests for this REST endpoint", "Comprehensive test coverage", "coding"),
])
print("=== Eval-Driven Model Migration ===\n")

# Run migration comparison
report = harness.run_migration_eval("gpt-4o-2024-08-06", "gpt-4.1-2025-04-14")
print(f"Source: {report.source_model} (score: {report.source_score})")
print(f"Target: {report.target_model} (score: {report.target_score})")
print(f"Delta: {report.target_score - report.source_score:+.3f}")
print(f"\nCases: {report.total_cases} | Regressions: {len(report.regression_cases)} | Improvements: {len(report.improvement_cases)}")
print(f"Latency change: {report.latency_change_pct:+.1f}%")
print(f"Cost change: {report.cost_change_pct:+.1f}%")

if report.regression_cases:
    # The warning sign was stored as mis-decoded UTF-8; print the real glyph.
    print("\n⚠ Regressions:")
    for r in report.regression_cases:
        print(f" {r['case_id']}: {r['source_score']} → {r['target_score']} ({r['delta']:+.3f})")
if report.improvement_cases:
    print("\n✓ Improvements:")
    for i in report.improvement_cases:
        print(f" {i['case_id']}: {i['source_score']} → {i['target_score']} ({i['delta']:+.3f})")

print(f"\n{'='*50}")
print(f"RECOMMENDATION: {report.recommendation}")
3. SDK Version Upgrades
The OpenAI Python SDK has undergone major breaking changes (v0.x → v1.x introduced a completely new interface). Future major versions will follow. A compatibility layer lets you upgrade incrementally without rewriting your entire codebase at once.
openai.ChatCompletion.create() became client.chat.completions.create(). The global configuration (openai.api_key) became instance-based (OpenAI(api_key=...)). All responses became Pydantic models instead of dicts. Async support moved to AsyncOpenAI. Error classes were restructured. Streaming returns typed chunks instead of raw dicts.
SDK Compatibility Layer
import os
import json
from dataclasses import dataclass, field
from typing import Any, Optional
from openai import OpenAI
@dataclass
class SDKVersionInfo:
    """Describes one SDK version and the interface it exposes."""

    # Version string of the SDK.
    version: str
    # "module-level" (v0.x) or "client-instance" (v1.x+).
    interface_style: str
    # True when responses are Pydantic models.
    has_pydantic_responses: bool
    # True when an async client is available.
    has_async_client: bool
    # Optional deprecation date string; None when not set.
    deprecation_date: Optional[str] = None
    # Optional end-of-life date string; None when not set.
    eol_date: Optional[str] = None
class SDKCompatibilityLayer:
    """Provides a stable interface across SDK version changes.

    Abstracts away SDK-specific differences so application code
    doesn't need to change when the SDK is upgraded.

    Supports:
    - v0.x style (module-level, dict responses)
    - v1.x style (client instance, Pydantic responses)
    - Future v2.x (Responses API as primary)

    The request methods in this class return simulated responses; they do
    not contact the API.
    """

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY", "sk-demo-key")
        self._client = None  # created on first access of ``client``
        self._sdk_version = self._detect_sdk_version()

    def _detect_sdk_version(self) -> "SDKVersionInfo":
        """Detect which SDK version is installed.

        A version starting with ``"0."`` is reported as the legacy
        module-level interface. Everything else, including a missing package
        or a package without ``__version__``, is reported as the
        client-instance interface.
        """
        try:
            import openai
        except ImportError:
            version = "unknown"
        else:
            # getattr: a package without __version__ must not crash detection.
            version = getattr(openai, "__version__", "unknown")

        if version.startswith("0."):
            return SDKVersionInfo(
                version=version,
                interface_style="module-level",
                has_pydantic_responses=False,
                has_async_client=False,
                deprecation_date="2024-01-01",
                eol_date="2024-06-01",
            )
        # v1.x, v2.x or later, and "unknown" all share the same description.
        return SDKVersionInfo(
            version=version,
            interface_style="client-instance",
            has_pydantic_responses=True,
            has_async_client=True,
        )

    @property
    def client(self) -> "OpenAI":
        """Lazy-initialize the client."""
        if self._client is None:
            self._client = OpenAI(api_key=self.api_key)
        return self._client

    @staticmethod
    def _estimate_tokens(content: Any) -> int:
        """Rough token estimate (characters // 4) for one message's content.

        Handles ``None`` (e.g. tool-call messages) and list-style content by
        counting the ``text`` of each dict part.
        """
        if content is None:
            return 0
        if isinstance(content, str):
            return len(content) // 4
        if isinstance(content, list):
            text_chars = sum(
                len(part.get("text") or "") for part in content if isinstance(part, dict)
            )
            return text_chars // 4
        return len(str(content)) // 4

    def chat_completion(
        self,
        model: str,
        messages: list[dict],
        max_tokens: int = 1000,
        temperature: float = 1.0,
        **kwargs,
    ) -> dict:
        """Unified interface for chat completions across SDK versions.

        ``max_tokens``, ``temperature`` and ``**kwargs`` are accepted for
        interface compatibility but are not used by the simulated response.

        Returns a normalized dict regardless of SDK version:
        {
            "content": str,
            "model": str,
            "usage": {"prompt_tokens": int, "completion_tokens": int, "total_tokens": int},
            "finish_reason": str,
            "_sdk_version": str,
            "_interface": str,
        }
        """
        # Simulated response (in production, call actual client)
        simulated_content = f"[Response from {model}]: Processed {len(messages)} messages"
        prompt_tokens = sum(self._estimate_tokens(m.get("content")) for m in messages)
        completion_tokens = len(simulated_content) // 4
        return {
            "content": simulated_content,
            "model": model,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
            "finish_reason": "stop",
            "_sdk_version": self._sdk_version.version,
            "_interface": self._sdk_version.interface_style,
        }

    def responses_create(
        self,
        model: str,
        input: str,
        instructions: Optional[str] = None,
        max_output_tokens: int = 1000,
        temperature: float = 1.0,
        **kwargs,
    ) -> dict:
        """Unified interface for Responses API (v1.x+ only).

        Returns normalized dict:
        {
            "output_text": str,
            "model": str,
            "usage": {"input_tokens": int, "output_tokens": int},
        }

        Raises:
            RuntimeError: If the detected SDK uses the module-level (v0.x)
                interface.
        """
        if self._sdk_version.interface_style == "module-level":
            raise RuntimeError(
                f"Responses API requires SDK v1.x+. Current: {self._sdk_version.version}. "
                f"Upgrade with: pip install --upgrade openai"
            )
        simulated_output = f"[Responses API from {model}]: {input[:50]}"
        return {
            "output_text": simulated_output,
            "model": model,
            "usage": {
                "input_tokens": len(input) // 4,
                "output_tokens": len(simulated_output) // 4,
            },
        }

    def get_upgrade_guide(self) -> dict:
        """Get upgrade recommendations for the current SDK version.

        Returns a dict with ``current_version``, ``status`` and ``action``,
        plus ``steps`` for the legacy interface or ``features_available``
        otherwise.
        """
        info = self._sdk_version
        if info.interface_style == "module-level":
            return {
                "current_version": info.version,
                "status": "DEPRECATED" if info.deprecation_date else "ACTIVE",
                "action": "UPGRADE IMMEDIATELY",
                "steps": [
                    "1. pip install --upgrade openai",
                    "2. Replace openai.ChatCompletion.create() → client.chat.completions.create()",
                    "3. Replace openai.api_key = '...' → client = OpenAI(api_key='...')",
                    "4. Update response access: resp['choices'][0] → resp.choices[0].message.content",
                    "5. Replace openai.error.* → openai.*Error exceptions",
                ],
            }
        return {
            "current_version": info.version,
            "status": "CURRENT",
            "action": "Monitor for deprecation notices",
            "features_available": [
                "Chat Completions API",
                "Responses API",
                "Async client (AsyncOpenAI)",
                "Streaming with typed events",
                "Pydantic response models",
            ],
        }
# Demonstration: exercise the compatibility layer end to end.
compat = SDKCompatibilityLayer()
print("=== SDK Compatibility Layer ===\n")

# Report what the layer detected about the installed SDK.
info = compat._sdk_version
print(f"Detected SDK: v{info.version}")
print(f" Interface: {info.interface_style}")
print(f" Pydantic responses: {info.has_pydantic_responses}")
print(f" Async support: {info.has_async_client}")

# Chat completion through the unified interface.
print("\n--- Chat Completion (unified) ---")
demo_messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is the capital of France?"},
]
result = compat.chat_completion(model="gpt-4o", messages=demo_messages, max_tokens=100)
print(f" Content: {result['content']}")
print(f" Model: {result['model']}")
print(f" Tokens: {result['usage']['total_tokens']}")
print(f" SDK version used: {result['_sdk_version']}")

# Same question through the Responses-style entry point.
print("\n--- Responses API (unified) ---")
resp = compat.responses_create(
    model="gpt-4o",
    input="What is the capital of France?",
    instructions="Answer in one word.",
)
print(f" Output: {resp['output_text']}")

# The guide carries either upgrade steps or a feature list, never both.
print("\n--- Upgrade Guide ---")
guide = compat.get_upgrade_guide()
print(f" Status: {guide['status']}")
print(f" Action: {guide['action']}")
if "steps" in guide:
    for step in guide["steps"]:
        print(f" {step}")
elif "features_available" in guide:
    for feat in guide["features_available"]:
        print(f" ✓ {feat}")
4. Legacy System Integration
Most organizations don’t build greenfield AI systems. They need to integrate OpenAI capabilities behind existing REST APIs, message queues, or RPC interfaces. The adapter pattern provides a clean abstraction: your legacy consumers continue calling the same interface they always have, while the implementation behind it now uses OpenAI. This enables gradual rollout without requiring all clients to change simultaneously.
Adapter Pattern for Legacy APIs
import os
import json
import time
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from openai import OpenAI
@dataclass
class LegacyRequest:
    """An incoming call expressed in the legacy system's request format."""

    # Path that selects the handler, e.g. "/api/v1/classify".
    endpoint: str
    # HTTP-style method name supplied by the caller.
    method: str
    # Request body.
    payload: dict
    # Request headers; each instance gets its own dict.
    headers: dict = field(default_factory=dict)
    # Optional identifier supplied by the caller.
    request_id: Optional[str] = None
@dataclass
class LegacyResponse:
    """A reply shaped the way legacy consumers expect it."""

    # HTTP-style status code.
    status_code: int
    # Response body.
    body: dict
    # Response headers; each instance gets its own dict.
    headers: dict = field(default_factory=dict)
    # Handling time in milliseconds; 0.0 when not measured.
    latency_ms: float = 0.0
class OpenAILegacyAdapter:
    """Wraps OpenAI behind a legacy API interface.

    Enables gradual migration by:
    1. Accepting requests in the legacy format
    2. Translating to OpenAI API calls
    3. Returning responses in the legacy format
    4. Supporting feature flags for A/B testing (AI vs old logic)

    The "AI" code paths in this class are simulations; they do not call
    the API.
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.feature_flags = {
            "use_ai_classification": True,
            "use_ai_summarization": True,
            "use_ai_extraction": False,  # Still using regex for now
        }
        self.legacy_fallbacks: dict[str, Callable] = {}
        self.metrics: list[dict] = []

    def register_legacy_fallback(self, endpoint: str, handler: Callable):
        """Register a fallback handler for when AI is disabled or fails."""
        self.legacy_fallbacks[endpoint] = handler

    @staticmethod
    def _stable_hash(text: str) -> int:
        """Checksum of ``text`` that does not vary between interpreter runs.

        The built-in ``hash()`` is salted per process for strings, which made
        the simulated classification differ from run to run.
        """
        import zlib

        return zlib.crc32(text.encode("utf-8"))

    def _classify_with_ai(self, text: str, categories: list[str]) -> dict:
        """Use OpenAI for text classification (simulated).

        ``categories`` must be non-empty.
        """
        seed = self._stable_hash(text)
        category = categories[seed % len(categories)]
        confidence = 0.85 + (seed % 12) / 100
        return {"category": category, "confidence": round(confidence, 3), "method": "ai"}

    def _classify_with_rules(self, text: str, categories: list[str]) -> dict:
        """Legacy rule-based classification fallback.

        Picks the first category whose name occurs in the text
        (case-insensitive); otherwise defaults to the first category.
        """
        text_lower = text.lower()
        for cat in categories:
            if cat.lower() in text_lower:
                return {"category": cat, "confidence": 0.6, "method": "rules"}
        return {"category": categories[0], "confidence": 0.3, "method": "rules_default"}

    def _summarize_with_ai(self, text: str, max_length: int) -> dict:
        """Use OpenAI for summarization (simulated by truncation plus ellipsis)."""
        summary = text[:max_length] + "..." if len(text) > max_length else text
        return {"summary": summary, "original_length": len(text), "method": "ai"}

    def _summarize_with_truncation(self, text: str, max_length: int) -> dict:
        """Legacy truncation fallback."""
        return {"summary": text[:max_length], "original_length": len(text), "method": "truncation"}

    def handle_request(self, request: "LegacyRequest") -> "LegacyResponse":
        """Process a legacy request, routing to AI or fallback as appropriate.

        Returns:
            - 200 with the handler's body on success
            - 404 for an unknown endpoint
            - 200 with the registered fallback's body when the handler raised
            - 500 when the handler raised and no fallback is registered
        """
        start = time.time()
        try:
            if request.endpoint == "/api/v1/classify":
                response_body = self._handle_classification(request.payload)
            elif request.endpoint == "/api/v1/summarize":
                response_body = self._handle_summarization(request.payload)
            elif request.endpoint == "/api/v1/extract":
                response_body = self._handle_extraction(request.payload)
            else:
                response_body = {"error": "Unknown endpoint", "endpoint": request.endpoint}
                return LegacyResponse(status_code=404, body=response_body)

            latency = (time.time() - start) * 1000
            self.metrics.append({
                "endpoint": request.endpoint,
                "method": response_body.get("method", "unknown"),
                "latency_ms": latency,
                "success": True,
            })
            return LegacyResponse(
                status_code=200,
                body=response_body,
                headers={"X-AI-Method": response_body.get("method", "unknown")},
                latency_ms=latency,
            )
        except Exception as e:
            # Boundary handler: legacy callers expect a response object, so a
            # handler failure is answered by the registered fallback or a 500.
            fallback = self.legacy_fallbacks.get(request.endpoint)
            if fallback:
                response_body = fallback(request.payload)
                response_body["method"] = "fallback"
                latency = (time.time() - start) * 1000
                self.metrics.append({
                    "endpoint": request.endpoint, "method": "fallback",
                    "latency_ms": latency, "success": True, "fallback_reason": str(e),
                })
                return LegacyResponse(
                    status_code=200,
                    body=response_body,
                    # Same header as the primary path, so consumers can tell
                    # which route served them.
                    headers={"X-AI-Method": "fallback"},
                    latency_ms=latency,
                )
            return LegacyResponse(status_code=500, body={"error": str(e)})

    def _handle_classification(self, payload: dict) -> dict:
        """Route classification to AI or rules based on feature flag."""
        text = payload.get("text", "")
        # ``or`` also covers an explicit empty list, which would otherwise
        # trigger a modulo-by-zero / IndexError in the classifiers.
        categories = payload.get("categories") or ["general"]
        if self.feature_flags.get("use_ai_classification"):
            return self._classify_with_ai(text, categories)
        return self._classify_with_rules(text, categories)

    def _handle_summarization(self, payload: dict) -> dict:
        """Route summarization to AI or truncation."""
        text = payload.get("text", "")
        max_length = payload.get("max_length", 200)
        if self.feature_flags.get("use_ai_summarization"):
            return self._summarize_with_ai(text, max_length)
        return self._summarize_with_truncation(text, max_length)

    def _handle_extraction(self, payload: dict) -> dict:
        """Extract e-mail addresses with the legacy regex.

        NOTE(review): the ``use_ai_extraction`` flag is not consulted here;
        extraction always uses the regex path.
        """
        text = payload.get("text", "")
        import re
        emails = re.findall(r'[\w.+-]+@[\w-]+\.[\w.-]+', text)
        return {"entities": emails, "type": "email", "method": "regex"}

    def get_migration_stats(self) -> dict:
        """Show what percentage of traffic is handled by AI vs legacy.

        The returned keys are the same whether or not any request has been
        recorded. With no metrics every counter is zero and the legacy
        ``"total": 0`` key is kept for existing callers.
        """
        if not self.metrics:
            return {
                "total": 0,
                "total_requests": 0,
                "ai_handled": 0,
                "rules_handled": 0,
                "fallback_handled": 0,
                "ai_percentage": 0.0,
                "avg_latency_ms": 0.0,
            }
        total = len(self.metrics)
        ai_count = sum(1 for m in self.metrics if m["method"] == "ai")
        rules_count = sum(1 for m in self.metrics if "rules" in m["method"])
        fallback_count = sum(1 for m in self.metrics if m["method"] == "fallback")
        return {
            "total_requests": total,
            "ai_handled": ai_count,
            "rules_handled": rules_count,
            "fallback_handled": fallback_count,
            "ai_percentage": round(ai_count / total * 100, 1),
            "avg_latency_ms": round(sum(m["latency_ms"] for m in self.metrics) / total, 1),
        }
# Demonstration: push legacy-format requests through the adapter.
adapter = OpenAILegacyAdapter()
print("=== Legacy System Adapter Pattern ===\n")

# Simulate legacy API requests (same format as before AI integration)
requests = [
    LegacyRequest("/api/v1/classify", "POST", {
        "text": "I can't log into my account and need to reset my password",
        "categories": ["billing", "technical", "account", "general"],
    }),
    LegacyRequest("/api/v1/summarize", "POST", {
        "text": "The quarterly earnings report shows a 15% increase in revenue driven primarily by the expansion into Asian markets. Operating margins improved by 3 percentage points due to cost optimization initiatives implemented in Q2.",
        "max_length": 80,
    }),
    LegacyRequest("/api/v1/extract", "POST", {
        "text": "Contact us at support@example.com or sales@example.com for more info.",
    }),
    LegacyRequest("/api/v1/classify", "POST", {
        "text": "Why was I charged twice for my subscription?",
        "categories": ["billing", "technical", "account", "general"],
    }),
]

for req in requests:
    response = adapter.handle_request(req)
    # Prefer the header; fall back to the body when the header is absent.
    method = response.headers.get("X-AI-Method", response.body.get("method", "?"))
    print(f" {req.endpoint} → [{method}] status={response.status_code} latency={response.latency_ms:.1f}ms")

    # Show whichever key result this endpoint produced.
    if "category" in response.body:
        print(f" Category: {response.body['category']} (conf: {response.body.get('confidence', '?')})")
    elif "summary" in response.body:
        print(f" Summary: \"{response.body['summary'][:60]}...\"")
    elif "entities" in response.body:
        print(f" Extracted: {response.body['entities']}")
    print()

# Migration stats
print("--- Migration Statistics ---")
stats = adapter.get_migration_stats()
print(f" Total requests: {stats['total_requests']}")
print(f" AI-handled: {stats['ai_handled']} ({stats['ai_percentage']}%)")
print(f" Rules-handled: {stats['rules_handled']}")
print(f" Avg latency: {stats['avg_latency_ms']}ms")
print(f"\n Feature flags: {json.dumps(adapter.feature_flags, indent=4)}")
5. Multi-Provider Fallback
Production AI systems shouldn’t depend on a single provider. A multi-provider strategy gives you resilience against outages, the ability to optimize cost by routing to cheaper providers for simple tasks, and leverage for vendor negotiations. The key challenge is maintaining a provider-agnostic interface so your application code doesn’t couple to any single provider’s API.
Multi-Provider Client with Failover
import os
import json
import time
import random
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
from openai import OpenAI
class ProviderStatus(Enum):
    """Health state recorded on a provider; each member's value is its lowercase label."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    DOWN = "down"
@dataclass
class ProviderConfig:
    """Configuration for a single AI provider.

    Besides static settings, the instance also carries mutable health
    state (``status`` and ``consecutive_failures``).
    """
    name: str  # Label used in log entries, error messages and responses
    provider_type: str  # "openai", "azure_openai", "anthropic", etc.
    model: str
    api_key_env: str  # presumably the name of the env var holding the API key — TODO confirm
    endpoint: Optional[str] = None
    priority: int = 1  # Lower = higher priority
    max_rpm: int = 500  # presumably a requests-per-minute limit — TODO confirm
    cost_per_1k_tokens: float = 0.01
    status: ProviderStatus = ProviderStatus.HEALTHY
    consecutive_failures: int = 0  # Running count of back-to-back failed calls
    failure_threshold: int = 3  # Failure count at which the circuit is considered open
@dataclass
class ProviderResponse:
    """Normalized response from any provider."""
    content: str
    provider: str  # Name of the provider that produced the response
    model: str
    latency_ms: float
    tokens_used: int
    cost_estimate: float
    was_fallback: bool = False  # Defaults to False; expected to be set by the caller after the call
class MultiProviderClient:
    """Provider-agnostic AI client with automatic failover.

    Features:
    - Priority-based routing (primary → secondary → tertiary)
    - Automatic failover on errors or timeouts
    - Circuit breaker pattern (stop calling failed providers)
    - Cost-aware routing for non-critical requests
    - Health monitoring

    NOTE(review): ``circuit_breaker_recovery_time`` is stored but not read by
    any method of this class, so a tripped circuit only closes again when
    something outside the class resets ``consecutive_failures``.
    """

    def __init__(self):
        self.providers: list[ProviderConfig] = []
        # One entry per provider *attempt* (successful or failed), not per request.
        self.request_log: list[dict] = []
        self.circuit_breaker_recovery_time = 60  # seconds

    def add_provider(self, config: "ProviderConfig"):
        """Register a provider and keep the list sorted by ascending priority."""
        self.providers.append(config)
        self.providers.sort(key=lambda p: p.priority)

    def _is_circuit_open(self, provider: "ProviderConfig") -> bool:
        """Return True once the provider has reached its failure threshold."""
        return provider.consecutive_failures >= provider.failure_threshold

    def _call_provider(self, provider: "ProviderConfig", messages: list[dict], **kwargs) -> "ProviderResponse":
        """Call a specific provider (simulated).

        Raises:
            ConnectionError: the provider's status is DOWN.
            TimeoutError: the provider is DEGRADED and the random draw fails
                (30% of calls).
        """
        # Simulate provider behavior
        if provider.status == ProviderStatus.DOWN:
            raise ConnectionError(f"{provider.name} is down")
        if provider.status == ProviderStatus.DEGRADED and random.random() < 0.3:
            raise TimeoutError(f"{provider.name} timed out")

        # Simulate successful response; tolerate an empty message list instead
        # of failing with IndexError.
        last_content = messages[-1].get("content", "") if messages else ""
        content = f"[{provider.name}/{provider.model}] Response to: {last_content[:40]}"
        latency = 200 + random.randint(0, 500)
        tokens = 100 + random.randint(0, 150)
        cost = tokens / 1000 * provider.cost_per_1k_tokens

        # Reset failure counter on success
        provider.consecutive_failures = 0
        return ProviderResponse(
            content=content,
            provider=provider.name,
            model=provider.model,
            latency_ms=latency,
            tokens_used=tokens,
            cost_estimate=round(cost, 5),
        )

    def complete(
        self,
        messages: list[dict],
        prefer_cost: bool = False,
        **kwargs,
    ) -> "ProviderResponse":
        """Send a completion request with automatic failover.

        Args:
            messages: Chat messages in OpenAI format
            prefer_cost: If True, route to cheapest healthy provider
            **kwargs: Forwarded unchanged to the provider call.

        Returns:
            The first successful response. ``was_fallback`` is True whenever
            the serving provider is not the first choice of the full ranking,
            including when the first choice was skipped because its circuit
            is open.

        Raises:
            RuntimeError: no provider has a closed circuit, or every
                candidate failed (the last provider error is chained).
        """
        # Rank ALL providers first (by cost if cost-optimizing, otherwise by
        # the priority order maintained in add_provider), then drop the ones
        # whose circuit is open. sorted() is stable, so ties keep priority order.
        if prefer_cost:
            ranked = sorted(self.providers, key=lambda p: p.cost_per_1k_tokens)
        else:
            ranked = list(self.providers)
        candidates = [p for p in ranked if not self._is_circuit_open(p)]
        if not candidates:
            raise RuntimeError("All providers are down — no healthy candidates available")

        # The provider that would have served the request if everything were
        # healthy; anything else answering counts as a fallback.
        preferred = ranked[0]
        last_error = None
        for provider in candidates:
            try:
                response = self._call_provider(provider, messages, **kwargs)
            except Exception as e:
                # Deliberately broad: any provider error triggers failover.
                provider.consecutive_failures += 1
                last_error = e
                self.request_log.append({
                    "provider": provider.name,
                    "success": False,
                    "error": str(e),
                    "consecutive_failures": provider.consecutive_failures,
                })
                # If circuit breaker trips, mark the provider as down
                if self._is_circuit_open(provider):
                    provider.status = ProviderStatus.DOWN
                continue

            response.was_fallback = provider is not preferred
            self.request_log.append({
                "provider": provider.name,
                "success": True,
                "was_fallback": response.was_fallback,
                "latency_ms": response.latency_ms,
                "cost": response.cost_estimate,
            })
            return response

        raise RuntimeError(f"All providers failed. Last error: {last_error}") from last_error

    def get_health_status(self) -> dict:
        """Get health status of all providers.

        Returns:
            A dict with a per-provider ``providers`` list plus
            ``total_requests``, ``success_rate`` and ``fallback_rate``.
            The rates are percentages over logged attempts, rounded to one
            decimal place, and are 0.0 when nothing has been logged.
        """
        attempts = len(self.request_log)
        denominator = max(attempts, 1)  # avoid dividing by zero on an empty log
        successes = sum(1 for r in self.request_log if r["success"])
        fallbacks = sum(1 for r in self.request_log if r.get("was_fallback"))
        return {
            "providers": [
                {
                    "name": p.name,
                    "model": p.model,
                    "status": p.status.value,
                    "failures": p.consecutive_failures,
                    "circuit": "OPEN" if self._is_circuit_open(p) else "CLOSED",
                    "cost_per_1k": p.cost_per_1k_tokens,
                    "priority": p.priority,
                }
                for p in self.providers
            ],
            "total_requests": attempts,
            "success_rate": round(successes / denominator * 100, 1),
            "fallback_rate": round(fallbacks / denominator * 100, 1),
        }
# Demonstration: build a three-provider client, then exercise normal routing,
# primary failure, cost-optimized routing and the health report.
client = MultiProviderClient()

# Providers are registered one by one; add_provider keeps them in priority order.
for provider_config in (
    ProviderConfig(
        name="OpenAI",
        provider_type="openai",
        model="gpt-4o",
        api_key_env="OPENAI_API_KEY",
        priority=1,
        cost_per_1k_tokens=0.005,
    ),
    ProviderConfig(
        name="Azure-OpenAI-EastUS",
        provider_type="azure_openai",
        model="gpt-4o",
        api_key_env="AZURE_OPENAI_KEY",
        endpoint="https://myorg-eastus.openai.azure.com",
        priority=2,
        cost_per_1k_tokens=0.005,
    ),
    ProviderConfig(
        name="Azure-OpenAI-WestEU",
        provider_type="azure_openai",
        model="gpt-4o",
        api_key_env="AZURE_OPENAI_KEY_EU",
        endpoint="https://myorg-westeu.openai.azure.com",
        priority=3,
        cost_per_1k_tokens=0.006,
    ),
):
    client.add_provider(provider_config)

print("=== Multi-Provider Failover Client ===\n")

# Phase 1: everything healthy, so the primary provider answers.
print("--- Normal Operation ---")
for i in range(3):
    user_message = {"role": "user", "content": f"Request {i+1}: Explain quantum computing"}
    response = client.complete(messages=[user_message])
    print(f" Request {i+1}: [{response.provider}] latency={response.latency_ms:.0f}ms cost=${response.cost_estimate:.4f}")

# Phase 2: degrade the primary and force its circuit breaker open.
print("\n--- Simulating Primary Provider Failure ---")
primary = client.providers[0]
primary.status = ProviderStatus.DEGRADED
primary.consecutive_failures = 3  # Trip circuit breaker
for i in range(3):
    user_message = {"role": "user", "content": f"Failover request {i+1}"}
    response = client.complete(messages=[user_message])
    if response.was_fallback:
        fallback_indicator = " [FALLBACK]"
    else:
        fallback_indicator = ""
    print(f" Request {i+1}: [{response.provider}]{fallback_indicator} latency={response.latency_ms:.0f}ms")

# Phase 3: ask for the cheapest provider that is still available.
print("\n--- Cost-Optimized Routing ---")
faq_message = {"role": "user", "content": "Simple FAQ: What are your hours?"}
response = client.complete(messages=[faq_message], prefer_cost=True)
print(f" Cheapest available: [{response.provider}] cost=${response.cost_estimate:.4f}/request")

# Phase 4: per-provider health plus aggregate rates.
print("\n--- Provider Health ---")
health = client.get_health_status()
status_icons = {"healthy": "🟢", "degraded": "🟡", "down": "🔴"}
for p in health["providers"]:
    icon = status_icons[p["status"]]
    print(f" {icon} {p['name']:<22} model={p['model']:<8} circuit={p['circuit']:<6} failures={p['failures']}")
print(f"\n Success rate: {health['success_rate']}% | Fallback rate: {health['fallback_rate']}%")
6. Prompt Migration
Prompts that work well on one model generation may not transfer directly to the next. GPT-4 prompts often need adjustment for GPT-4.1 or GPT-5 because newer models follow instructions more literally, handle ambiguity differently, and may need different temperature settings. A systematic prompt migration process tests each prompt against the new model and adapts as needed.
Prompt Migration Toolkit
import os
import json
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI
@dataclass
class PromptVersion:
    """A versioned prompt with its target model and parameters."""
    prompt_id: str  # Registry key; all versions of one prompt share it
    version: int
    model: str  # Model this version of the prompt targets
    system_prompt: str
    temperature: float
    max_tokens: int
    notes: str = ""  # Free-form annotation
@dataclass
class PromptMigrationResult:
    """Result of migrating a prompt to a new model."""
    prompt_id: str
    source_model: str
    target_model: str
    original_score: float
    migrated_score: float
    adjustments_made: list  # Human-readable descriptions of each adjustment
    temperature_change: Optional[float] = None  # Temperature delta; None by default
    prompt_change: Optional[str] = None  # None by default
    recommendation: str = ""
class PromptMigrator:
    """Systematically migrates prompts between model generations.

    Key adjustments when migrating to newer models:
    - Temperature: Newer models are often more capable at temp=0; lower temp
    - Specificity: Newer models follow instructions more literally; be precise
    - Verbosity: Remove hedging language ("try to", "if possible"); just instruct
    - Format: Newer models handle structured output better; use explicit formats
    - Examples: May need fewer few-shot examples (stronger zero-shot)
    """

    # Hedging phrases the analyzer reports.
    _HEDGES = ("try to", "if possible", "attempt to", "do your best to")
    # The same phrases in the exact form migration strips (trailing separator
    # included so no stray space or comma is left behind).
    _HEDGE_REMOVALS = ("try to ", "if possible, ", "attempt to ", "do your best to ")

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        # prompt_id -> versions in registration order (last one is the latest)
        self.prompt_registry: dict[str, list[PromptVersion]] = {}
        self.migration_results: list[PromptMigrationResult] = []

    def register_prompt(self, prompt: "PromptVersion"):
        """Append a prompt version to the registry under its prompt_id."""
        self.prompt_registry.setdefault(prompt.prompt_id, []).append(prompt)

    def analyze_prompt_for_migration(self, prompt: "PromptVersion") -> list[str]:
        """Analyze a prompt and suggest adjustments for a newer model.

        Only ``prompt.system_prompt`` and ``prompt.temperature`` are read.

        Returns:
            A non-empty list of suggestions; when no heuristic fires it holds
            a single "looks clean" entry.
        """
        suggestions = []
        text = prompt.system_prompt.lower()

        # Check for hedging language
        for hedge in self._HEDGES:
            if hedge in text:
                suggestions.append(f"Remove hedging: '{hedge}' → direct instruction")
        # Check temperature
        if prompt.temperature > 0.7:
            suggestions.append(f"Lower temperature: {prompt.temperature} → 0.5-0.7 (newer models more capable at lower temps)")
        # Check for verbose instructions
        if len(prompt.system_prompt) > 1000:
            suggestions.append("Consider condensing: newer models need less prompting for same quality")
        # Check for explicit format instructions
        if "json" in text and "schema" not in text:
            suggestions.append("Add explicit JSON schema: newer models support structured outputs natively")
        # Check for few-shot examples
        if text.count("example:") > 3 or text.count("input:") > 3:
            suggestions.append("Reduce few-shot examples: newer models have stronger zero-shot performance")
        # Check for role reinforcement
        if text.count("you are") > 2 or text.count("remember") > 1:
            suggestions.append("Reduce role reinforcement: newer models maintain persona better")

        if not suggestions:
            suggestions.append("Prompt looks clean — test as-is on new model")
        return suggestions

    @staticmethod
    def _strip_hedging(text: str) -> tuple[str, list[str]]:
        """Remove hedging phrases from text, regardless of letter case.

        A phrase is matched only at a word boundary, so "entry to " is left
        alone. When the removed phrase opened a sentence, the first letter of
        what follows is capitalized so the sentence still starts correctly.

        Returns:
            ``(cleaned_text, removed)`` where ``removed`` lists each phrase
            that was found, stripped of surrounding whitespace.
        """
        import re

        def _drop(match):
            follower = match.group(1)
            preceding = match.string[:match.start()].rstrip()
            starts_sentence = not preceding or preceding[-1] in ".!?"
            return follower.upper() if starts_sentence else follower

        removed = []
        for hedge in PromptMigrator._HEDGE_REMOVALS:
            pattern = re.compile(r"\b" + re.escape(hedge) + r"(\S?)", re.IGNORECASE)
            text, hits = pattern.subn(_drop, text)
            if hits:
                removed.append(hedge.strip())
        return text, removed

    @staticmethod
    def _baseline_score(prompt_id: str) -> float:
        """Return the simulated pre-migration score for a prompt, in [0.82, 0.91].

        Uses CRC32 rather than the built-in ``hash()``: string hashes are
        salted per process, which made the simulated scores change from run
        to run.
        """
        import zlib

        return 0.82 + (zlib.crc32(prompt_id.encode("utf-8")) % 10) / 100

    def migrate_prompt(self, prompt_id: str, target_model: str) -> "PromptMigrationResult":
        """Migrate a prompt to a new model with automatic adjustments.

        The latest registered version is adjusted, scored with a simulated
        evaluation, registered as a new version targeting ``target_model``,
        and the outcome is appended to ``migration_results``.

        Raises:
            ValueError: ``prompt_id`` has no registered versions.
        """
        versions = self.prompt_registry.get(prompt_id, [])
        if not versions:
            raise ValueError(f"Prompt {prompt_id} not found in registry")
        current = versions[-1]  # Latest version

        # Apply automated adjustments
        new_temperature = current.temperature
        adjustments = []

        # Auto-adjust temperature for newer models
        if "gpt-5" in target_model or "gpt-4.1" in target_model:
            if current.temperature > 0.7:
                new_temperature = round(current.temperature * 0.7, 2)
                adjustments.append(f"temperature: {current.temperature} → {new_temperature}")

        # Remove hedging
        new_system_prompt, removed_hedges = self._strip_hedging(current.system_prompt)
        adjustments.extend(f"Removed hedging: '{hedge}'" for hedge in removed_hedges)

        # Simulate scoring (in production, run actual evals)
        original_score = self._baseline_score(prompt_id)
        # Newer models generally score higher with adjusted prompts
        migrated_score = min(0.99, original_score + 0.05 + len(adjustments) * 0.01)

        # Determine recommendation
        if migrated_score >= original_score:
            recommendation = "SAFE TO MIGRATE: Score improved or maintained"
        elif migrated_score >= original_score - 0.03:
            recommendation = "ACCEPTABLE: Minor regression within tolerance"
        else:
            recommendation = "NEEDS WORK: Significant regression — manual prompt tuning required"

        # Round the delta so float noise (e.g. -0.24000000000000005) does not leak out.
        if new_temperature != current.temperature:
            temperature_change = round(new_temperature - current.temperature, 2)
        else:
            temperature_change = None

        result = PromptMigrationResult(
            prompt_id=prompt_id,
            source_model=current.model,
            target_model=target_model,
            original_score=round(original_score, 3),
            migrated_score=round(migrated_score, 3),
            adjustments_made=adjustments,
            temperature_change=temperature_change,
            prompt_change=new_system_prompt if new_system_prompt != current.system_prompt else None,
            recommendation=recommendation,
        )

        # Register new version
        new_version = PromptVersion(
            prompt_id=prompt_id,
            version=current.version + 1,
            model=target_model,
            system_prompt=new_system_prompt,
            temperature=new_temperature,
            max_tokens=current.max_tokens,
            notes=f"Auto-migrated from {current.model}. Adjustments: {len(adjustments)}",
        )
        self.register_prompt(new_version)
        self.migration_results.append(result)
        return result
# Demonstration: register three GPT-4o prompts, migrate each one and print
# what changed.
migrator = PromptMigrator()

# Register existing prompts (currently working on GPT-4o)
current_model = "gpt-4o-2024-08-06"
prompts = [
    PromptVersion(
        prompt_id="customer-classifier",
        version=1,
        model=current_model,
        system_prompt="You are a customer support classifier. Try to categorize the customer's intent into one of these categories: billing, technical, account, general. If possible, also identify the sentiment. Remember, you are classifying customer messages.",
        temperature=0.8,
        max_tokens=100,
    ),
    PromptVersion(
        prompt_id="code-reviewer",
        version=1,
        model=current_model,
        system_prompt="You are an expert code reviewer. Review the following code and provide feedback on: 1) Correctness, 2) Performance, 3) Security, 4) Readability. Be thorough and constructive.",
        temperature=0.3,
        max_tokens=1500,
    ),
    PromptVersion(
        prompt_id="summarizer",
        version=1,
        model=current_model,
        system_prompt="You are a document summarizer. Try to summarize the input text into 3-5 bullet points. If possible, preserve the key metrics and conclusions. Attempt to keep each bullet under 20 words. Remember you must be concise.",
        temperature=0.9,
        max_tokens=500,
    ),
]
for p in prompts:
    migrator.register_prompt(p)

print("=== Prompt Migration Toolkit ===\n")
target = "gpt-4.1-2025-04-14"
print(f"Migrating prompts to: {target}\n")

for prompt in prompts:
    result = migrator.migrate_prompt(prompt.prompt_id, target)
    print(f"--- {result.prompt_id} ---")
    print(f" Score: {result.original_score} → {result.migrated_score} ({result.migrated_score - result.original_score:+.3f})")
    if result.temperature_change:
        print(f" Temperature: {result.temperature_change:+.2f}")
    # Iterating an empty adjustment list prints nothing, so no guard is needed.
    for adj in result.adjustments_made:
        print(f" ✎ {adj}")
    # Suggestions are computed against the ORIGINAL prompt version.
    suggestions = migrator.analyze_prompt_for_migration(prompt)
    print(f" Suggestions: {len(suggestions)}")
    for s in suggestions[:2]:
        print(f" → {s}")
    print(f" {result.recommendation}\n")
7. Testing Migration
Every migration needs a testing strategy. You can’t ship a model upgrade, API migration, or SDK change without quantitative evidence that the system still works. An eval-driven comparison framework runs the same test cases against both the old and new configurations, computes aggregate metrics, identifies regressions per category, and produces a go/no-go report.
Eval-Driven Comparison Framework
import os
import json
import time
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI
@dataclass
class TestCase:
    """A single test case for migration comparison."""
    case_id: str
    category: str  # Grouping key for the per-category breakdown
    input_messages: list[dict]
    expected_contains: list[str]  # Strings that should appear in output
    expected_format: Optional[str] = None  # "json", "markdown", "bullets", etc.
    max_acceptable_latency_ms: float = 5000.0  # NOTE(review): not read by the framework code visible in this file section
    weight: float = 1.0  # NOTE(review): not read by the framework code visible in this file section
@dataclass
class ConfigA:
    """Configuration A (current/old).

    NOTE(review): not instantiated in the visible code; the demonstration
    passes plain dicts to the framework instead.
    """
    name: str
    model: str
    api_style: str  # "chat_completions" or "responses"
    temperature: float
    max_tokens: int
@dataclass
class ConfigB:
    """Configuration B (new/proposed).

    NOTE(review): not instantiated in the visible code; the demonstration
    passes plain dicts to the framework instead.
    """
    name: str
    model: str
    api_style: str  # presumably "chat_completions" or "responses", as for ConfigA
    temperature: float
    max_tokens: int
@dataclass
class ComparisonResult:
    """Result of comparing two configurations on a test case."""
    case_id: str
    category: str
    config_a_score: float
    config_b_score: float
    config_a_latency_ms: float
    config_b_latency_ms: float
    regression: bool  # True when B scored worse than A by more than the threshold
    improvement: bool  # True when B scored better than A by more than the threshold
    delta: float  # config_b_score - config_a_score
class MigrationTestFramework:
    """Automated A/B comparison framework for migration testing.

    Produces a structured report showing:
    - Per-case comparison (old score vs new score)
    - Category-level aggregation
    - Overall regression/improvement detection
    - Latency impact
    - Go/no-go recommendation

    NOTE(review): ``TestCase.weight`` and
    ``TestCase.max_acceptable_latency_ms`` are not consulted by any method
    of this class; every case counts equally.
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.test_cases: list[TestCase] = []
        self.regression_threshold = 0.05  # 5% max acceptable regression
        self.results: list[ComparisonResult] = []

    def add_test_cases(self, cases: "list[TestCase]"):
        """Register test cases for comparison."""
        self.test_cases.extend(cases)

    @staticmethod
    def _stable_hash(text: str) -> int:
        """Return a non-negative hash of text that is identical across runs.

        The built-in ``hash()`` is salted per process for strings, which made
        the simulated comparison produce a different report on every run.
        """
        import zlib

        return zlib.crc32(text.encode("utf-8"))

    @staticmethod
    def _mean(values: list) -> float:
        """Return the arithmetic mean of values, or 0.0 for an empty list."""
        return sum(values) / len(values) if values else 0.0

    def _score_output(self, output: str, test_case: "TestCase") -> float:
        """Score an output against test case expectations.

        Each expected substring (case-insensitive) and the optional format
        requirement is one check; the score is the fraction of checks passed,
        rounded to 3 decimals (0.0 when there are no checks).
        """
        score = 0.0
        checks = 0
        lowered_output = output.lower()

        # Check expected content presence
        for expected in test_case.expected_contains:
            checks += 1
            if expected.lower() in lowered_output:
                score += 1.0

        # Check format compliance
        if test_case.expected_format:
            checks += 1
            if test_case.expected_format == "json":
                try:
                    json.loads(output)
                    score += 1.0
                except (json.JSONDecodeError, TypeError):
                    pass  # not valid JSON: the format check simply earns no credit
            elif test_case.expected_format == "bullets":
                if output.count("- ") >= 2 or output.count("• ") >= 2:
                    score += 1.0
            elif test_case.expected_format == "markdown":
                if "#" in output or "**" in output:
                    score += 1.0

        return round(score / max(checks, 1), 3)

    def _run_config(self, config: dict, test_case: "TestCase") -> tuple[float, float]:
        """Run a test case against a configuration (simulated).

        Returns:
            ``(score, latency_ms)``. Both are derived deterministically from
            the model name and the test case, so repeated runs agree.
        """
        # Simulated output quality (in production, call actual API)
        model_quality = {
            "gpt-4o-2024-08-06": 0.85,
            "gpt-4.1-2025-04-14": 0.90,
            "gpt-5-2026-03-01": 0.93,
        }
        base = model_quality.get(config["model"], 0.85)

        # Simulated output containing expected keywords
        output_parts = [f"Response about {test_case.category}"]
        for exp in test_case.expected_contains[:2]:
            # Buckets 3..9 out of 0..9 pass: a 70% chance of including the keyword
            if self._stable_hash(config["model"] + exp) % 10 > 2:
                output_parts.append(exp)
        output = ". ".join(output_parts)

        score = self._score_output(output, test_case)
        # Blend the content score with the model's baseline quality
        score = min(1.0, score * 0.5 + base * 0.5)
        latency = 300 + self._stable_hash(config["model"] + test_case.case_id) % 600
        return round(score, 3), float(latency)

    def run_comparison(self, config_a: dict, config_b: dict) -> dict:
        """Run all test cases against both configurations and compare.

        Replaces ``self.results`` with one ComparisonResult per registered
        test case and returns the generated report.
        """
        self.results = []
        for tc in self.test_cases:
            score_a, latency_a = self._run_config(config_a, tc)
            score_b, latency_b = self._run_config(config_b, tc)
            delta = score_b - score_a
            self.results.append(ComparisonResult(
                case_id=tc.case_id,
                category=tc.category,
                config_a_score=score_a,
                config_b_score=score_b,
                config_a_latency_ms=latency_a,
                config_b_latency_ms=latency_b,
                regression=delta < -self.regression_threshold,
                improvement=delta > self.regression_threshold,
                delta=round(delta, 3),
            ))
        return self._generate_report(config_a, config_b)

    def _generate_report(self, config_a: dict, config_b: dict) -> dict:
        """Generate a comprehensive comparison report from ``self.results``.

        With no results, all averages are 0.0 and the decision is NO-GO
        (nothing was measured) instead of raising ZeroDivisionError.
        """
        total = len(self.results)
        regressions = [r for r in self.results if r.regression]
        improvements = [r for r in self.results if r.improvement]

        avg_a = self._mean([r.config_a_score for r in self.results])
        avg_b = self._mean([r.config_b_score for r in self.results])
        avg_latency_a = self._mean([r.config_a_latency_ms for r in self.results])
        avg_latency_b = self._mean([r.config_b_latency_ms for r in self.results])
        if avg_latency_a:
            latency_change_pct = round((avg_latency_b - avg_latency_a) / avg_latency_a * 100, 1)
        else:
            latency_change_pct = 0.0  # no baseline latency to compare against

        # Category breakdown
        categories = {}
        for r in self.results:
            bucket = categories.setdefault(
                r.category, {"a_scores": [], "b_scores": [], "regressions": 0}
            )
            bucket["a_scores"].append(r.config_a_score)
            bucket["b_scores"].append(r.config_b_score)
            if r.regression:
                bucket["regressions"] += 1

        category_report = {}
        for cat, data in categories.items():
            cat_avg_a = self._mean(data["a_scores"])
            cat_avg_b = self._mean(data["b_scores"])
            category_report[cat] = {
                "config_a_avg": round(cat_avg_a, 3),
                "config_b_avg": round(cat_avg_b, 3),
                "delta": round(cat_avg_b - cat_avg_a, 3),
                "regressions": data["regressions"],
            }

        # Go/no-go decision
        if total == 0:
            decision = "NO-GO: No test cases were run — nothing to compare"
        elif not regressions and avg_b >= avg_a:
            decision = "GO: No regressions, safe to ship"
        elif len(regressions) <= 1 and avg_b > avg_a - 0.02:
            if regressions:
                decision = "CONDITIONAL GO: Minor regression in 1 case — review and decide"
            else:
                # No single case crossed the threshold, but the average slipped.
                decision = "CONDITIONAL GO: No per-case regressions, but the average score dipped — review and decide"
        else:
            decision = f"NO-GO: {len(regressions)} regressions detected — investigate before shipping"

        return {
            "summary": {
                "config_a": config_a["name"],
                "config_b": config_b["name"],
                "total_cases": total,
                "regressions": len(regressions),
                "improvements": len(improvements),
                "unchanged": total - len(regressions) - len(improvements),
            },
            "scores": {
                "config_a_avg": round(avg_a, 3),
                "config_b_avg": round(avg_b, 3),
                "delta": round(avg_b - avg_a, 3),
            },
            "latency": {
                "config_a_avg_ms": round(avg_latency_a, 1),
                "config_b_avg_ms": round(avg_latency_b, 1),
                "change_pct": latency_change_pct,
            },
            "categories": category_report,
            "regression_details": [
                {"case": r.case_id, "category": r.category, "delta": r.delta}
                for r in regressions
            ],
            "decision": decision,
        }
# Demonstration: compare the current and proposed configurations on eight
# test cases and print the resulting go/no-go report.
framework = MigrationTestFramework()

# Define test cases covering multiple categories
framework.add_test_cases([
    TestCase("tc-001", "classification", [{"role": "user", "content": "I want to cancel my plan"}],
             expected_contains=["billing", "cancel", "account"], weight=1.0),
    TestCase("tc-002", "coding", [{"role": "user", "content": "Write a merge sort in Python"}],
             expected_contains=["def", "merge", "sort"], expected_format="markdown", weight=1.5),
    TestCase("tc-003", "reasoning", [{"role": "user", "content": "What are the pros and cons of microservices?"}],
             expected_contains=["scalability", "complexity", "independent"], weight=1.0),
    TestCase("tc-004", "extraction", [{"role": "user", "content": "Extract entities: Apple CEO Tim Cook in Cupertino"}],
             expected_contains=["Apple", "Tim Cook", "Cupertino"], expected_format="json", weight=1.0),
    TestCase("tc-005", "summarization", [{"role": "user", "content": "Summarize the key points of this quarterly report"}],
             expected_contains=["revenue", "growth", "quarter"], expected_format="bullets", weight=1.0),
    TestCase("tc-006", "instruction_following", [{"role": "user", "content": "List exactly 5 items, numbered 1-5"}],
             expected_contains=["1.", "2.", "3.", "4.", "5."], weight=2.0),
    TestCase("tc-007", "coding", [{"role": "user", "content": "Write a REST API endpoint with error handling"}],
             expected_contains=["async", "try", "except", "status"], expected_format="markdown", weight=1.5),
    TestCase("tc-008", "reasoning", [{"role": "user", "content": "Compare SQL vs NoSQL for a social media app"}],
             expected_contains=["relational", "schema", "scale", "flexible"], weight=1.0),
])

print("=== Migration A/B Test Framework ===\n")

# Compare current config vs proposed config (plain dicts; the framework reads
# the "name" and "model" keys)
config_a = {"name": "Current (GPT-4o)", "model": "gpt-4o-2024-08-06", "api": "chat_completions", "temperature": 0.7}
config_b = {"name": "Proposed (GPT-4.1)", "model": "gpt-4.1-2025-04-14", "api": "responses", "temperature": 0.5}
report = framework.run_comparison(config_a, config_b)

# Print report
print(f"Config A: {report['summary']['config_a']}")
print(f"Config B: {report['summary']['config_b']}")
print(f"Cases: {report['summary']['total_cases']}\n")

print("--- Scores ---")
print(f" Config A avg: {report['scores']['config_a_avg']}")
print(f" Config B avg: {report['scores']['config_b_avg']}")
print(f" Delta: {report['scores']['delta']:+.3f}")

print("\n--- Latency ---")
print(f" Config A: {report['latency']['config_a_avg_ms']}ms")
print(f" Config B: {report['latency']['config_b_avg_ms']}ms")
print(f" Change: {report['latency']['change_pct']:+.1f}%")

print("\n--- Category Breakdown ---")
for cat, data in report["categories"].items():
    indicator = "↑" if data["delta"] > 0 else "↓" if data["delta"] < 0 else "="
    print(f" {cat:<25} {data['config_a_avg']:.3f} → {data['config_b_avg']:.3f} ({data['delta']:+.3f}) {indicator}")

print("\n--- Summary ---")
print(f" Regressions: {report['summary']['regressions']}")
print(f" Improvements: {report['summary']['improvements']}")
print(f" Unchanged: {report['summary']['unchanged']}")

if report["regression_details"]:
    # The warning sign was mis-decoded as "âš" in the original text.
    print("\n ⚠ Regression details:")
    for r in report["regression_details"]:
        print(f" {r['case']}: [{r['category']}] delta={r['delta']:+.3f}")

print(f"\n{'='*50}")
print(f"DECISION: {report['decision']}")
Next in the Series
In Part 20: Capstone Projects, we’ll bring everything together with end-to-end project implementations that combine the patterns from all 19 previous parts — from SDK basics through production architectures, observability, safety, enterprise compliance, and migration — into complete, deployable AI applications.