Back to AI App Dev Series

OpenAI SDK Track Part 19: Migration & Legacy Integration

May 25, 2026Wasil Zafar30 min read

Migrate from Chat Completions to the Responses API with exact parameter mapping, upgrade between model generations using eval-driven confidence, handle SDK version changes gracefully, integrate OpenAI behind legacy systems with adapter patterns, build multi-provider fallbacks across OpenAI and Azure OpenAI, migrate prompts between model generations, and run regression testing to validate every change — all with executable Python code.

Table of Contents

  1. Chat Completions → Responses API
  2. Model Migration Strategy
  3. SDK Version Upgrades
  4. Legacy System Integration
  5. Multi-Provider Fallback
  6. Prompt Migration
  7. Testing Migration
What You’ll Learn: Migration is inevitable in AI engineering. OpenAI evolves its APIs (Chat Completions → Responses), releases new model generations (GPT-4 → GPT-4.1 → GPT-5), and deprecates old SDK versions. Meanwhile, your production systems need to keep running. This article covers seven migration patterns: API-level migration with exact parameter mapping, eval-driven model upgrades, SDK version management, adapter patterns for legacy integration, multi-provider failover strategies, prompt migration between model generations, and regression testing frameworks that give you confidence to ship changes.
Migration Decision Workflow
                flowchart TD
                    T[Migration Trigger] --> A{What Changed?}

                    A -->|API Deprecation| B[API Migration]
                    A -->|New Model Release| C[Model Migration]
                    A -->|SDK Breaking Change| D[SDK Upgrade]

                    B --> B1[Map Parameters]
                    B1 --> B2[Update Response Parsing]
                    B2 --> B3[Run Eval Suite]

                    C --> C1[Pin Current Snapshot]
                    C1 --> C2[Run Evals on New Model]
                    C2 --> C3{Regression?}
                    C3 -->|Yes| C4[Adjust Prompts]
                    C4 --> C2
                    C3 -->|No| C5[Canary Deploy]

                    D --> D1[Check Breaking Changes]
                    D1 --> D2[Update Client Code]
                    D2 --> D3[Run Integration Tests]

                    B3 --> E{Pass Threshold?}
                    C5 --> E
                    D3 --> E

                    E -->|Yes| F[Ship to Production]
                    E -->|No| G[Rollback & Investigate]
                    G --> A
            

1. Chat Completions → Responses API Migration

The Responses API is OpenAI’s next-generation interface, replacing Chat Completions as the primary way to interact with models. While Chat Completions remains available, new features (built-in tools, web search, file search, computer use) are Responses-only. Migration requires understanding the exact parameter mapping, response object differences, and feature parity gaps.

Parameter Mapping Reference

Chat CompletionsResponses APINotes
messages=[{"role":"system","content":"..."}]instructions="..."System prompt becomes top-level parameter
messages=[{"role":"user","content":"..."}]input="..."Simple string or list of content items
response.choices[0].message.contentresponse.output_textConvenience accessor for text output
max_tokensmax_output_tokensRenamed for clarity
response_format={"type":"json_object"}text={"format":{"type":"json_schema","schema":{...}}}Structured outputs with schema enforcement
stream=True + for chunk in stream:stream=True + event typesSSE events with typed deltas
tools=[...] + tool_choicetools=[...] (built-in: web_search, file_search, code_interpreter)Responses adds built-in tools
n=3 (multiple completions)Not supportedCall multiple times or use Batch API
logprobs=TrueNot yet availableUse Chat Completions if logprobs needed
stop=["END"]Not directly supportedUse instructions to control stopping

Chat Completions → Responses Converter

import os
import json
from dataclasses import dataclass, field
from typing import Any, Optional
from openai import OpenAI


@dataclass
class MigrationResult:
    """Result of converting a Chat Completions call to Responses API."""
    original_params: dict
    converted_params: dict
    warnings: list = field(default_factory=list)
    unsupported_features: list = field(default_factory=list)


class ChatCompletionsToResponsesMigrator:
    """Converts Chat Completions API calls to Responses API format.

    Handles the exact parameter mapping:
    - messages (system/user/assistant) → instructions + input
    - max_tokens → max_output_tokens
    - response_format → text.format
    - stream chunk iteration → event-based streaming
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))

    def convert_messages_to_input(self, messages: list[dict]) -> dict:
        """Convert Chat Completions messages array to Responses API parameters.

        Mapping:
        - {"role": "system", "content": "..."} → instructions="..."
        - {"role": "user", "content": "..."} → input="..."
        - Multi-turn conversations → input as list of items
        """
        instructions = None
        input_items = []

        for msg in messages:
            role = msg.get("role")
            content = msg.get("content", "")

            if role == "system":
                # System messages become the instructions parameter
                if instructions:
                    instructions += "\n\n" + content
                else:
                    instructions = content

            elif role == "user":
                input_items.append({"role": "user", "content": content})

            elif role == "assistant":
                input_items.append({"role": "assistant", "content": content})

        # Simplify: if single user message, use string input
        if len(input_items) == 1 and input_items[0]["role"] == "user":
            input_value = input_items[0]["content"]
        else:
            input_value = input_items

        return {"instructions": instructions, "input": input_value}

    def convert_response_format(self, response_format: dict) -> dict:
        """Convert response_format to Responses API text.format.

        Chat Completions: response_format={"type": "json_object"}
        Responses API:    text={"format": {"type": "json_schema", "schema": {...}}}
        """
        fmt_type = response_format.get("type")

        if fmt_type == "json_object":
            # Basic JSON mode — Responses API requires a schema
            return {
                "format": {
                    "type": "json_schema",
                    "name": "response",
                    "strict": True,
                    "schema": {
                        "type": "object",
                        "properties": {
                            "result": {"type": "string"}
                        },
                        "required": ["result"],
                        "additionalProperties": False,
                    },
                }
            }
        elif fmt_type == "json_schema":
            # Already has schema — pass through with name field
            schema = response_format.get("json_schema", {})
            return {
                "format": {
                    "type": "json_schema",
                    "name": schema.get("name", "response"),
                    "strict": schema.get("strict", True),
                    "schema": schema.get("schema", {}),
                }
            }
        elif fmt_type == "text":
            return {"format": {"type": "text"}}

        return {}

    def convert_params(self, chat_params: dict) -> MigrationResult:
        """Convert full Chat Completions parameters to Responses API format."""
        warnings = []
        unsupported = []
        converted = {}

        # Model mapping
        converted["model"] = chat_params.get("model", "gpt-4o")

        # Messages → instructions + input
        messages = chat_params.get("messages", [])
        msg_result = self.convert_messages_to_input(messages)
        if msg_result["instructions"]:
            converted["instructions"] = msg_result["instructions"]
        converted["input"] = msg_result["input"]

        # max_tokens → max_output_tokens
        if "max_tokens" in chat_params:
            converted["max_output_tokens"] = chat_params["max_tokens"]

        # temperature (same parameter name)
        if "temperature" in chat_params:
            converted["temperature"] = chat_params["temperature"]

        # top_p (same parameter name)
        if "top_p" in chat_params:
            converted["top_p"] = chat_params["top_p"]

        # response_format → text
        if "response_format" in chat_params:
            text_config = self.convert_response_format(chat_params["response_format"])
            if text_config:
                converted["text"] = text_config

        # tools (function calling — similar but not identical)
        if "tools" in chat_params:
            converted["tools"] = chat_params["tools"]
            warnings.append("Verify tool definitions work with Responses API format")

        # stream (same parameter)
        if "stream" in chat_params:
            converted["stream"] = chat_params["stream"]
            warnings.append("Stream event format differs: use event types instead of chunk iteration")

        # Unsupported features
        if "n" in chat_params and chat_params["n"] > 1:
            unsupported.append(f"n={chat_params['n']} — multiple completions not supported, use multiple calls")
        if "logprobs" in chat_params:
            unsupported.append("logprobs — not available in Responses API")
        if "stop" in chat_params:
            unsupported.append(f"stop={chat_params['stop']} — use instructions to control stopping")
        if "presence_penalty" in chat_params:
            warnings.append("presence_penalty — check if supported in current Responses API version")
        if "frequency_penalty" in chat_params:
            warnings.append("frequency_penalty — check if supported in current Responses API version")

        return MigrationResult(
            original_params=chat_params,
            converted_params=converted,
            warnings=warnings,
            unsupported_features=unsupported,
        )

    def show_code_comparison(self, chat_params: dict) -> str:
        """Generate side-by-side code showing old vs new API usage."""
        result = self.convert_params(chat_params)
        p = result.converted_params

        old_code = f"""# === BEFORE: Chat Completions API ===
from openai import OpenAI
client = OpenAI()

response = client.chat.completions.create(
    model="{chat_params.get('model', 'gpt-4o')}",
    messages={json.dumps(chat_params.get('messages', []), indent=8)},
    max_tokens={chat_params.get('max_tokens', 1000)},
    temperature={chat_params.get('temperature', 1.0)},
)
answer = response.choices[0].message.content"""

        instructions_line = f'\n    instructions="{p.get("instructions", "")}",' if p.get("instructions") else ""
        new_code = f"""# === AFTER: Responses API ===
from openai import OpenAI
client = OpenAI()

response = client.responses.create(
    model="{p.get('model', 'gpt-4o')}",{instructions_line}
    input="{p.get('input', '')}",
    max_output_tokens={p.get('max_output_tokens', 1000)},
    temperature={p.get('temperature', 1.0)},
)
answer = response.output_text"""

        return old_code + "\n\n" + new_code


# Demonstration
migrator = ChatCompletionsToResponsesMigrator()

print("=== Chat Completions → Responses API Migration ===\n")

# Example 1: Simple chat completion
simple_params = {
    "model": "gpt-4o",
    "messages": [
        {"role": "system", "content": "You are a helpful coding assistant."},
        {"role": "user", "content": "Write a Python function to calculate fibonacci numbers."},
    ],
    "max_tokens": 500,
    "temperature": 0.7,
}

result = migrator.convert_params(simple_params)
print("--- Example 1: Simple Completion ---")
print(f"  Original: messages={len(simple_params['messages'])}, max_tokens={simple_params['max_tokens']}")
print(f"  Converted:")
print(f"    instructions: \"{result.converted_params.get('instructions', '')[:60]}...\"")
print(f"    input: \"{str(result.converted_params.get('input', ''))[:60]}...\"")
print(f"    max_output_tokens: {result.converted_params.get('max_output_tokens')}")
print(f"    temperature: {result.converted_params.get('temperature')}")

# Example 2: JSON mode with structured output
json_params = {
    "model": "gpt-4o",
    "messages": [
        {"role": "system", "content": "Extract entities from text. Return JSON."},
        {"role": "user", "content": "Apple Inc. CEO Tim Cook announced new products in Cupertino."},
    ],
    "max_tokens": 300,
    "response_format": {"type": "json_object"},
    "temperature": 0.0,
}

result2 = migrator.convert_params(json_params)
print("\n--- Example 2: JSON Mode Migration ---")
print(f"  BEFORE: response_format={{\"type\": \"json_object\"}}")
print(f"  AFTER:  text={json.dumps(result2.converted_params.get('text', {}), indent=4)[:120]}...")

# Example 3: Features that don't migrate cleanly
complex_params = {
    "model": "gpt-4o",
    "messages": [{"role": "user", "content": "Generate 3 different taglines."}],
    "max_tokens": 200,
    "n": 3,
    "logprobs": True,
    "stop": ["\n\n"],
}

result3 = migrator.convert_params(complex_params)
print("\n--- Example 3: Unsupported Features ---")
for issue in result3.unsupported_features:
    print(f"  âš  {issue}")

# Show code comparison
print("\n--- Code Comparison ---")
print(migrator.show_code_comparison(simple_params))

2. Model Migration Strategy

Upgrading between model generations (GPT-4 → GPT-4.1 → GPT-5) is not a simple find-and-replace. Each model generation has different strengths, different instruction-following behavior, and different failure modes. An eval-driven approach gives you confidence that the new model performs at least as well as the old one on your specific use cases before you ship the change.

Model Migration Approaches

StrategyRiskSpeedWhen to Use
Snapshot PinningLowestSlowestRegulated environments, no tolerance for behavior change
Alias FollowingHighestFastestNon-critical applications, testing environments
Eval-Gated UpgradeLowModerateProduction systems with quality requirements
Canary DeployLowModerateHigh-traffic systems, gradual rollout needed
Shadow TestingLowestSlowMission-critical systems, zero-downtime requirement

Eval-Driven Model Migration Test Harness

import os
import json
import time
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI


@dataclass
class EvalCase:
    """A single evaluation case for model comparison."""
    case_id: str
    input_text: str
    expected_behavior: str
    category: str
    weight: float = 1.0


@dataclass
class EvalResult:
    """Result of running one eval case against a model."""
    case_id: str
    model: str
    output: str
    latency_ms: float
    tokens_used: int
    score: float  # 0.0 to 1.0
    passed: bool


@dataclass
class MigrationReport:
    """Comprehensive report comparing two models."""
    source_model: str
    target_model: str
    total_cases: int
    source_score: float
    target_score: float
    regression_cases: list
    improvement_cases: list
    latency_change_pct: float
    cost_change_pct: float
    recommendation: str


class ModelMigrationHarness:
    """Eval-driven model migration with regression detection.

    Workflow:
    1. Define eval cases covering critical behaviors
    2. Run all cases against current (source) model
    3. Run all cases against new (target) model
    4. Compare scores, detect regressions
    5. Generate migration report with go/no-go recommendation
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.eval_cases: list[EvalCase] = []
        self.results: dict[str, list[EvalResult]] = {}
        self.pass_threshold = 0.85  # Minimum score to pass
        self.regression_tolerance = 0.05  # Max acceptable regression

    def add_eval_cases(self, cases: list[EvalCase]):
        """Register evaluation cases for the migration test."""
        self.eval_cases.extend(cases)

    def run_eval(self, model: str, case: EvalCase) -> EvalResult:
        """Run a single eval case against a model (simulated)."""
        start = time.time()

        # Simulated model response (in production, call OpenAI API)
        # Different models have different quality characteristics
        model_quality = {
            "gpt-4-0613": 0.82,
            "gpt-4o-2024-08-06": 0.88,
            "gpt-4.1-2025-04-14": 0.91,
            "gpt-5-2026-03-01": 0.94,
        }

        base_score = model_quality.get(model, 0.85)
        # Add variance per category
        category_bonus = {"reasoning": 0.03, "coding": 0.05, "instruction_following": 0.02}.get(case.category, 0.0)
        # Newer models get category bonuses
        if "gpt-5" in model or "gpt-4.1" in model:
            score = min(1.0, base_score + category_bonus + (hash(case.case_id) % 10) / 100)
        else:
            score = min(1.0, base_score + (hash(case.case_id) % 8) / 100)

        latency = 800 + (hash(model + case.case_id) % 400)  # Simulated latency
        tokens = 150 + (hash(case.input_text) % 200)

        return EvalResult(
            case_id=case.case_id,
            model=model,
            output=f"[Simulated {model} output for: {case.input_text[:40]}]",
            latency_ms=latency,
            tokens_used=tokens,
            score=round(score, 3),
            passed=score >= self.pass_threshold,
        )

    def run_migration_eval(self, source_model: str, target_model: str) -> MigrationReport:
        """Run full migration evaluation comparing source and target models."""
        source_results = []
        target_results = []

        for case in self.eval_cases:
            source_results.append(self.run_eval(source_model, case))
            target_results.append(self.run_eval(target_model, case))

        self.results[source_model] = source_results
        self.results[target_model] = target_results

        # Calculate aggregate scores
        source_avg = sum(r.score for r in source_results) / len(source_results)
        target_avg = sum(r.score for r in target_results) / len(target_results)

        # Identify regressions and improvements
        regressions = []
        improvements = []
        for src, tgt in zip(source_results, target_results):
            diff = tgt.score - src.score
            if diff < -self.regression_tolerance:
                regressions.append({"case_id": src.case_id, "source_score": src.score, "target_score": tgt.score, "delta": round(diff, 3)})
            elif diff > self.regression_tolerance:
                improvements.append({"case_id": src.case_id, "source_score": src.score, "target_score": tgt.score, "delta": round(diff, 3)})

        # Latency comparison
        source_latency = sum(r.latency_ms for r in source_results) / len(source_results)
        target_latency = sum(r.latency_ms for r in target_results) / len(target_results)
        latency_change = ((target_latency - source_latency) / source_latency) * 100

        # Cost comparison (simplified: based on tokens)
        source_tokens = sum(r.tokens_used for r in source_results)
        target_tokens = sum(r.tokens_used for r in target_results)
        cost_change = ((target_tokens - source_tokens) / source_tokens) * 100

        # Recommendation
        if len(regressions) == 0 and target_avg >= source_avg:
            recommendation = "APPROVE: No regressions detected, safe to migrate"
        elif len(regressions) <= 2 and target_avg > source_avg:
            recommendation = "CONDITIONAL: Minor regressions — review specific cases before migrating"
        else:
            recommendation = f"BLOCK: {len(regressions)} regressions detected — investigate before proceeding"

        return MigrationReport(
            source_model=source_model,
            target_model=target_model,
            total_cases=len(self.eval_cases),
            source_score=round(source_avg, 3),
            target_score=round(target_avg, 3),
            regression_cases=regressions,
            improvement_cases=improvements,
            latency_change_pct=round(latency_change, 1),
            cost_change_pct=round(cost_change, 1),
            recommendation=recommendation,
        )


# Demonstration
harness = ModelMigrationHarness()

# Define eval cases
harness.add_eval_cases([
    EvalCase("tc-001", "Explain quantum computing in simple terms", "Clear, accurate explanation", "reasoning"),
    EvalCase("tc-002", "Write a binary search in Python with error handling", "Correct, idiomatic code", "coding"),
    EvalCase("tc-003", "Summarize this 500-word article in exactly 3 bullet points", "Exactly 3 bullets", "instruction_following"),
    EvalCase("tc-004", "Given these financials, calculate the EBITDA margin", "Correct calculation", "reasoning"),
    EvalCase("tc-005", "Refactor this function to use async/await", "Working async code", "coding"),
    EvalCase("tc-006", "Respond ONLY in JSON format with keys: name, age, city", "Valid JSON, correct keys", "instruction_following"),
    EvalCase("tc-007", "Identify logical fallacies in this argument", "Correct identification", "reasoning"),
    EvalCase("tc-008", "Write unit tests for this REST endpoint", "Comprehensive test coverage", "coding"),
])

print("=== Eval-Driven Model Migration ===\n")

# Run migration comparison
report = harness.run_migration_eval("gpt-4o-2024-08-06", "gpt-4.1-2025-04-14")

print(f"Source: {report.source_model} (score: {report.source_score})")
print(f"Target: {report.target_model} (score: {report.target_score})")
print(f"Delta:  {report.target_score - report.source_score:+.3f}")
print(f"\nCases: {report.total_cases} | Regressions: {len(report.regression_cases)} | Improvements: {len(report.improvement_cases)}")
print(f"Latency change: {report.latency_change_pct:+.1f}%")
print(f"Cost change: {report.cost_change_pct:+.1f}%")

if report.regression_cases:
    print("\nâš  Regressions:")
    for r in report.regression_cases:
        print(f"  {r['case_id']}: {r['source_score']} → {r['target_score']} ({r['delta']:+.3f})")

if report.improvement_cases:
    print("\n✓ Improvements:")
    for i in report.improvement_cases:
        print(f"  {i['case_id']}: {i['source_score']} → {i['target_score']} ({i['delta']:+.3f})")

print(f"\n{'='*50}")
print(f"RECOMMENDATION: {report.recommendation}")

3. SDK Version Upgrades

The OpenAI Python SDK has undergone major breaking changes (v0.x → v1.x introduced a completely new interface). Future major versions will follow. A compatibility layer lets you upgrade incrementally without rewriting your entire codebase at once.

Key Breaking Changes (v0.x → v1.x): openai.ChatCompletion.create() became client.chat.completions.create(). The global configuration (openai.api_key) became instance-based (OpenAI(api_key=...)). All responses became Pydantic models instead of dicts. Async support moved to AsyncOpenAI. Error classes were restructured. Streaming returns typed chunks instead of raw dicts.

SDK Compatibility Layer

import os
import json
from dataclasses import dataclass, field
from typing import Any, Optional
from openai import OpenAI


@dataclass
class SDKVersionInfo:
    """Information about an SDK version and its interface."""
    version: str
    interface_style: str  # "module-level" (v0.x) or "client-instance" (v1.x+)
    has_pydantic_responses: bool
    has_async_client: bool
    deprecation_date: Optional[str] = None
    eol_date: Optional[str] = None


class SDKCompatibilityLayer:
    """Provides a stable interface across SDK version changes.

    Abstracts away SDK-specific differences so application code
    doesn't need to change when the SDK is upgraded.

    Supports:
    - v0.x style (module-level, dict responses)
    - v1.x style (client instance, Pydantic responses)
    - Future v2.x (Responses API as primary)
    """

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY", "sk-demo-key")
        self._client = None
        self._sdk_version = self._detect_sdk_version()

    def _detect_sdk_version(self) -> SDKVersionInfo:
        """Detect which SDK version is installed."""
        try:
            import openai
            version = openai.__version__

            if version.startswith("0."):
                return SDKVersionInfo(
                    version=version,
                    interface_style="module-level",
                    has_pydantic_responses=False,
                    has_async_client=False,
                    deprecation_date="2024-01-01",
                    eol_date="2024-06-01",
                )
            elif version.startswith("1."):
                return SDKVersionInfo(
                    version=version,
                    interface_style="client-instance",
                    has_pydantic_responses=True,
                    has_async_client=True,
                )
            else:
                # v2.x or later
                return SDKVersionInfo(
                    version=version,
                    interface_style="client-instance",
                    has_pydantic_responses=True,
                    has_async_client=True,
                )
        except ImportError:
            return SDKVersionInfo(
                version="unknown",
                interface_style="client-instance",
                has_pydantic_responses=True,
                has_async_client=True,
            )

    @property
    def client(self) -> OpenAI:
        """Lazy-initialize the client."""
        if self._client is None:
            self._client = OpenAI(api_key=self.api_key)
        return self._client

    def chat_completion(
        self,
        model: str,
        messages: list[dict],
        max_tokens: int = 1000,
        temperature: float = 1.0,
        **kwargs,
    ) -> dict:
        """Unified interface for chat completions across SDK versions.

        Returns a normalized dict regardless of SDK version:
        {
            "content": str,
            "model": str,
            "usage": {"prompt_tokens": int, "completion_tokens": int, "total_tokens": int},
            "finish_reason": str,
        }
        """
        # Simulated response (in production, call actual client)
        simulated_content = f"[Response from {model}]: Processed {len(messages)} messages"
        prompt_tokens = sum(len(m.get("content", "")) // 4 for m in messages)
        completion_tokens = len(simulated_content) // 4

        return {
            "content": simulated_content,
            "model": model,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
            "finish_reason": "stop",
            "_sdk_version": self._sdk_version.version,
            "_interface": self._sdk_version.interface_style,
        }

    def responses_create(
        self,
        model: str,
        input: str,
        instructions: Optional[str] = None,
        max_output_tokens: int = 1000,
        temperature: float = 1.0,
        **kwargs,
    ) -> dict:
        """Unified interface for Responses API (v1.x+ only).

        Returns normalized dict:
        {
            "output_text": str,
            "model": str,
            "usage": {...},
        }
        """
        if self._sdk_version.interface_style == "module-level":
            raise RuntimeError(
                f"Responses API requires SDK v1.x+. Current: {self._sdk_version.version}. "
                f"Upgrade with: pip install --upgrade openai"
            )

        simulated_output = f"[Responses API from {model}]: {input[:50]}"

        return {
            "output_text": simulated_output,
            "model": model,
            "usage": {
                "input_tokens": len(input) // 4,
                "output_tokens": len(simulated_output) // 4,
            },
        }

    def get_upgrade_guide(self) -> dict:
        """Get upgrade recommendations for the current SDK version."""
        info = self._sdk_version

        if info.interface_style == "module-level":
            return {
                "current_version": info.version,
                "status": "DEPRECATED" if info.deprecation_date else "ACTIVE",
                "action": "UPGRADE IMMEDIATELY",
                "steps": [
                    "1. pip install --upgrade openai",
                    "2. Replace openai.ChatCompletion.create() → client.chat.completions.create()",
                    "3. Replace openai.api_key = '...' → client = OpenAI(api_key='...')",
                    "4. Update response access: resp['choices'][0] → resp.choices[0].message.content",
                    "5. Replace openai.error.* → openai.*Error exceptions",
                ],
            }
        else:
            return {
                "current_version": info.version,
                "status": "CURRENT",
                "action": "Monitor for deprecation notices",
                "features_available": [
                    "Chat Completions API",
                    "Responses API",
                    "Async client (AsyncOpenAI)",
                    "Streaming with typed events",
                    "Pydantic response models",
                ],
            }


# Demonstration
compat = SDKCompatibilityLayer()

print("=== SDK Compatibility Layer ===\n")

# Check SDK version
info = compat._sdk_version
print(f"Detected SDK: v{info.version}")
print(f"  Interface: {info.interface_style}")
print(f"  Pydantic responses: {info.has_pydantic_responses}")
print(f"  Async support: {info.has_async_client}")

# Use unified interface
print("\n--- Chat Completion (unified) ---")
result = compat.chat_completion(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the capital of France?"},
    ],
    max_tokens=100,
)
print(f"  Content: {result['content']}")
print(f"  Model: {result['model']}")
print(f"  Tokens: {result['usage']['total_tokens']}")
print(f"  SDK version used: {result['_sdk_version']}")

# Use Responses API
print("\n--- Responses API (unified) ---")
resp = compat.responses_create(
    model="gpt-4o",
    input="What is the capital of France?",
    instructions="Answer in one word.",
)
print(f"  Output: {resp['output_text']}")

# Get upgrade guide
print("\n--- Upgrade Guide ---")
guide = compat.get_upgrade_guide()
print(f"  Status: {guide['status']}")
print(f"  Action: {guide['action']}")
if "steps" in guide:
    for step in guide["steps"]:
        print(f"    {step}")
elif "features_available" in guide:
    for feat in guide["features_available"]:
        print(f"    ✓ {feat}")

4. Legacy System Integration

Most organizations don’t build greenfield AI systems. They need to integrate OpenAI capabilities behind existing REST APIs, message queues, or RPC interfaces. The adapter pattern provides a clean abstraction: your legacy consumers continue calling the same interface they always have, while the implementation behind it now uses OpenAI. This enables gradual rollout without requiring all clients to change simultaneously.

Adapter Pattern for Legacy APIs

import os
import json
import time
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from openai import OpenAI


@dataclass
class LegacyRequest:
    """Represents a request in the legacy system's format."""
    endpoint: str
    method: str
    payload: dict
    headers: dict = field(default_factory=dict)
    request_id: Optional[str] = None


@dataclass
class LegacyResponse:
    """Response in the legacy system's expected format."""
    status_code: int
    body: dict
    headers: dict = field(default_factory=dict)
    latency_ms: float = 0.0


class OpenAILegacyAdapter:
    """Wraps OpenAI behind a legacy API interface.

    Enables gradual migration by:
    1. Accepting requests in the legacy format
    2. Translating to OpenAI API calls
    3. Returning responses in the legacy format
    4. Supporting feature flags for A/B testing (AI vs old logic)
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.feature_flags = {
            "use_ai_classification": True,
            "use_ai_summarization": True,
            "use_ai_extraction": False,  # Still using regex for now
        }
        self.legacy_fallbacks: dict[str, Callable] = {}
        self.metrics: list[dict] = []

    def register_legacy_fallback(self, endpoint: str, handler: Callable):
        """Register a fallback handler for when AI is disabled or fails."""
        self.legacy_fallbacks[endpoint] = handler

    def _classify_with_ai(self, text: str, categories: list[str]) -> dict:
        """Use OpenAI for text classification."""
        # Simulated AI classification
        category = categories[hash(text) % len(categories)]
        confidence = 0.85 + (hash(text) % 12) / 100
        return {"category": category, "confidence": round(confidence, 3), "method": "ai"}

    def _classify_with_rules(self, text: str, categories: list[str]) -> dict:
        """Legacy rule-based classification fallback."""
        text_lower = text.lower()
        for cat in categories:
            if cat.lower() in text_lower:
                return {"category": cat, "confidence": 0.6, "method": "rules"}
        return {"category": categories[0], "confidence": 0.3, "method": "rules_default"}

    def _summarize_with_ai(self, text: str, max_length: int) -> dict:
        """Use OpenAI for summarization."""
        summary = text[:max_length] + "..." if len(text) > max_length else text
        return {"summary": summary, "original_length": len(text), "method": "ai"}

    def _summarize_with_truncation(self, text: str, max_length: int) -> dict:
        """Legacy truncation fallback."""
        return {"summary": text[:max_length], "original_length": len(text), "method": "truncation"}

    def handle_request(self, request: LegacyRequest) -> LegacyResponse:
        """Process a legacy request, routing to AI or fallback as appropriate."""
        start = time.time()

        try:
            if request.endpoint == "/api/v1/classify":
                response_body = self._handle_classification(request.payload)
            elif request.endpoint == "/api/v1/summarize":
                response_body = self._handle_summarization(request.payload)
            elif request.endpoint == "/api/v1/extract":
                response_body = self._handle_extraction(request.payload)
            else:
                response_body = {"error": "Unknown endpoint", "endpoint": request.endpoint}
                return LegacyResponse(status_code=404, body=response_body)

            latency = (time.time() - start) * 1000
            self.metrics.append({
                "endpoint": request.endpoint,
                "method": response_body.get("method", "unknown"),
                "latency_ms": latency,
                "success": True,
            })

            return LegacyResponse(
                status_code=200,
                body=response_body,
                headers={"X-AI-Method": response_body.get("method", "unknown")},
                latency_ms=latency,
            )

        except Exception as e:
            # Fallback to legacy handler on AI failure
            fallback = self.legacy_fallbacks.get(request.endpoint)
            if fallback:
                response_body = fallback(request.payload)
                response_body["method"] = "fallback"
                latency = (time.time() - start) * 1000
                self.metrics.append({
                    "endpoint": request.endpoint, "method": "fallback",
                    "latency_ms": latency, "success": True, "fallback_reason": str(e),
                })
                return LegacyResponse(status_code=200, body=response_body, latency_ms=latency)

            return LegacyResponse(status_code=500, body={"error": str(e)})

    def _handle_classification(self, payload: dict) -> dict:
        """Route classification to AI or rules based on feature flag."""
        text = payload.get("text", "")
        categories = payload.get("categories", ["general"])

        if self.feature_flags.get("use_ai_classification"):
            return self._classify_with_ai(text, categories)
        else:
            return self._classify_with_rules(text, categories)

    def _handle_summarization(self, payload: dict) -> dict:
        """Route summarization to AI or truncation."""
        text = payload.get("text", "")
        max_length = payload.get("max_length", 200)

        if self.feature_flags.get("use_ai_summarization"):
            return self._summarize_with_ai(text, max_length)
        else:
            return self._summarize_with_truncation(text, max_length)

    def _handle_extraction(self, payload: dict) -> dict:
        """Route extraction to AI or regex."""
        text = payload.get("text", "")
        # Feature flag OFF — use legacy regex extraction
        import re
        emails = re.findall(r'[\w.+-]+@[\w-]+\.[\w.-]+', text)
        return {"entities": emails, "type": "email", "method": "regex"}

    def get_migration_stats(self) -> dict:
        """Show what percentage of traffic is handled by AI vs legacy."""
        if not self.metrics:
            return {"total": 0}

        total = len(self.metrics)
        ai_count = sum(1 for m in self.metrics if m["method"] == "ai")
        rules_count = sum(1 for m in self.metrics if "rules" in m["method"])
        fallback_count = sum(1 for m in self.metrics if m["method"] == "fallback")

        return {
            "total_requests": total,
            "ai_handled": ai_count,
            "rules_handled": rules_count,
            "fallback_handled": fallback_count,
            "ai_percentage": round(ai_count / total * 100, 1),
            "avg_latency_ms": round(sum(m["latency_ms"] for m in self.metrics) / total, 1),
        }


# Demonstration
adapter = OpenAILegacyAdapter()

print("=== Legacy System Adapter Pattern ===\n")

# Simulate legacy API requests (same format as before AI integration)
requests = [
    LegacyRequest("/api/v1/classify", "POST", {
        "text": "I can't log into my account and need to reset my password",
        "categories": ["billing", "technical", "account", "general"],
    }),
    LegacyRequest("/api/v1/summarize", "POST", {
        "text": "The quarterly earnings report shows a 15% increase in revenue driven primarily by the expansion into Asian markets. Operating margins improved by 3 percentage points due to cost optimization initiatives implemented in Q2.",
        "max_length": 80,
    }),
    LegacyRequest("/api/v1/extract", "POST", {
        "text": "Contact us at support@example.com or sales@example.com for more info.",
    }),
    LegacyRequest("/api/v1/classify", "POST", {
        "text": "Why was I charged twice for my subscription?",
        "categories": ["billing", "technical", "account", "general"],
    }),
]

for req in requests:
    response = adapter.handle_request(req)
    method = response.headers.get("X-AI-Method", response.body.get("method", "?"))
    print(f"  {req.endpoint} → [{method}] status={response.status_code} latency={response.latency_ms:.1f}ms")
    # Show key result
    if "category" in response.body:
        print(f"    Category: {response.body['category']} (conf: {response.body.get('confidence', '?')})")
    elif "summary" in response.body:
        print(f"    Summary: \"{response.body['summary'][:60]}...\"")
    elif "entities" in response.body:
        print(f"    Extracted: {response.body['entities']}")
    print()

# Migration stats
print("--- Migration Statistics ---")
stats = adapter.get_migration_stats()
print(f"  Total requests: {stats['total_requests']}")
print(f"  AI-handled: {stats['ai_handled']} ({stats['ai_percentage']}%)")
print(f"  Rules-handled: {stats['rules_handled']}")
print(f"  Avg latency: {stats['avg_latency_ms']}ms")
print(f"\n  Feature flags: {json.dumps(adapter.feature_flags, indent=4)}")

5. Multi-Provider Fallback

Production AI systems shouldn’t depend on a single provider. A multi-provider strategy gives you resilience against outages, the ability to optimize cost by routing to cheaper providers for simple tasks, and leverage for vendor negotiations. The key challenge is maintaining a provider-agnostic interface so your application code doesn’t couple to any single provider’s API.

Multi-Provider Client with Failover

import os
import json
import time
import random
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
from openai import OpenAI


class ProviderStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    DOWN = "down"


@dataclass
class ProviderConfig:
    """Configuration for a single AI provider."""
    name: str
    provider_type: str  # "openai", "azure_openai", "anthropic", etc.
    model: str
    api_key_env: str
    endpoint: Optional[str] = None
    priority: int = 1  # Lower = higher priority
    max_rpm: int = 500
    cost_per_1k_tokens: float = 0.01
    status: ProviderStatus = ProviderStatus.HEALTHY
    consecutive_failures: int = 0
    failure_threshold: int = 3


@dataclass
class ProviderResponse:
    """Normalized response from any provider."""
    content: str
    provider: str
    model: str
    latency_ms: float
    tokens_used: int
    cost_estimate: float
    was_fallback: bool = False


class MultiProviderClient:
    """Provider-agnostic AI client with automatic failover.

    Features:
    - Priority-based routing (primary → secondary → tertiary)
    - Automatic failover on errors or timeouts
    - Circuit breaker pattern (stop calling failed providers)
    - Cost-aware routing for non-critical requests
    - Health monitoring and recovery
    """

    def __init__(self):
        self.providers: list[ProviderConfig] = []
        self.request_log: list[dict] = []
        self.circuit_breaker_recovery_time = 60  # seconds

    def add_provider(self, config: ProviderConfig):
        """Register a provider in priority order."""
        self.providers.append(config)
        self.providers.sort(key=lambda p: p.priority)

    def _is_circuit_open(self, provider: ProviderConfig) -> bool:
        """Check if circuit breaker is open (provider marked as down)."""
        return provider.consecutive_failures >= provider.failure_threshold

    def _call_provider(self, provider: ProviderConfig, messages: list[dict], **kwargs) -> ProviderResponse:
        """Call a specific provider (simulated)."""
        start = time.time()

        # Simulate provider behavior
        if provider.status == ProviderStatus.DOWN:
            raise ConnectionError(f"{provider.name} is down")

        if provider.status == ProviderStatus.DEGRADED and random.random() < 0.3:
            raise TimeoutError(f"{provider.name} timed out")

        # Simulate successful response
        content = f"[{provider.name}/{provider.model}] Response to: {messages[-1].get('content', '')[:40]}"
        latency = 200 + random.randint(0, 500)
        tokens = 100 + random.randint(0, 150)
        cost = tokens / 1000 * provider.cost_per_1k_tokens

        # Reset failure counter on success
        provider.consecutive_failures = 0

        return ProviderResponse(
            content=content,
            provider=provider.name,
            model=provider.model,
            latency_ms=latency,
            tokens_used=tokens,
            cost_estimate=round(cost, 5),
        )

    def complete(
        self,
        messages: list[dict],
        prefer_cost: bool = False,
        **kwargs,
    ) -> ProviderResponse:
        """Send a completion request with automatic failover.

        Args:
            messages: Chat messages in OpenAI format
            prefer_cost: If True, route to cheapest healthy provider
        """
        # Sort providers by cost if cost-optimizing, otherwise by priority
        if prefer_cost:
            candidates = sorted(
                [p for p in self.providers if not self._is_circuit_open(p)],
                key=lambda p: p.cost_per_1k_tokens,
            )
        else:
            candidates = [p for p in self.providers if not self._is_circuit_open(p)]

        if not candidates:
            raise RuntimeError("All providers are down — no healthy candidates available")

        last_error = None
        for i, provider in enumerate(candidates):
            try:
                response = self._call_provider(provider, messages, **kwargs)
                response.was_fallback = (i > 0)

                self.request_log.append({
                    "provider": provider.name,
                    "success": True,
                    "was_fallback": response.was_fallback,
                    "latency_ms": response.latency_ms,
                    "cost": response.cost_estimate,
                })
                return response

            except (ConnectionError, TimeoutError, Exception) as e:
                provider.consecutive_failures += 1
                last_error = e

                self.request_log.append({
                    "provider": provider.name,
                    "success": False,
                    "error": str(e),
                    "consecutive_failures": provider.consecutive_failures,
                })

                # If circuit breaker trips, skip this provider
                if self._is_circuit_open(provider):
                    provider.status = ProviderStatus.DOWN

                continue

        raise RuntimeError(f"All providers failed. Last error: {last_error}")

    def get_health_status(self) -> dict:
        """Get health status of all providers."""
        return {
            "providers": [
                {
                    "name": p.name,
                    "model": p.model,
                    "status": p.status.value,
                    "failures": p.consecutive_failures,
                    "circuit": "OPEN" if self._is_circuit_open(p) else "CLOSED",
                    "cost_per_1k": p.cost_per_1k_tokens,
                    "priority": p.priority,
                }
                for p in self.providers
            ],
            "total_requests": len(self.request_log),
            "success_rate": round(
                sum(1 for r in self.request_log if r["success"]) / max(len(self.request_log), 1) * 100, 1
            ),
            "fallback_rate": round(
                sum(1 for r in self.request_log if r.get("was_fallback")) / max(len(self.request_log), 1) * 100, 1
            ),
        }


# Demonstration
client = MultiProviderClient()

# Register providers in priority order
client.add_provider(ProviderConfig(
    name="OpenAI",
    provider_type="openai",
    model="gpt-4o",
    api_key_env="OPENAI_API_KEY",
    priority=1,
    cost_per_1k_tokens=0.005,
))

client.add_provider(ProviderConfig(
    name="Azure-OpenAI-EastUS",
    provider_type="azure_openai",
    model="gpt-4o",
    api_key_env="AZURE_OPENAI_KEY",
    endpoint="https://myorg-eastus.openai.azure.com",
    priority=2,
    cost_per_1k_tokens=0.005,
))

client.add_provider(ProviderConfig(
    name="Azure-OpenAI-WestEU",
    provider_type="azure_openai",
    model="gpt-4o",
    api_key_env="AZURE_OPENAI_KEY_EU",
    endpoint="https://myorg-westeu.openai.azure.com",
    priority=3,
    cost_per_1k_tokens=0.006,
))

print("=== Multi-Provider Failover Client ===\n")

# Normal operation — primary provider handles requests
print("--- Normal Operation ---")
for i in range(3):
    response = client.complete(
        messages=[{"role": "user", "content": f"Request {i+1}: Explain quantum computing"}]
    )
    print(f"  Request {i+1}: [{response.provider}] latency={response.latency_ms:.0f}ms cost=${response.cost_estimate:.4f}")

# Simulate primary provider degradation
print("\n--- Simulating Primary Provider Failure ---")
client.providers[0].status = ProviderStatus.DEGRADED
client.providers[0].consecutive_failures = 3  # Trip circuit breaker

for i in range(3):
    response = client.complete(
        messages=[{"role": "user", "content": f"Failover request {i+1}"}]
    )
    fallback_indicator = " [FALLBACK]" if response.was_fallback else ""
    print(f"  Request {i+1}: [{response.provider}]{fallback_indicator} latency={response.latency_ms:.0f}ms")

# Cost-optimized routing
print("\n--- Cost-Optimized Routing ---")
response = client.complete(
    messages=[{"role": "user", "content": "Simple FAQ: What are your hours?"}],
    prefer_cost=True,
)
print(f"  Cheapest available: [{response.provider}] cost=${response.cost_estimate:.4f}/request")

# Health status
print("\n--- Provider Health ---")
health = client.get_health_status()
for p in health["providers"]:
    icon = {"healthy": "🟢", "degraded": "🟡", "down": "🔴"}[p["status"]]
    print(f"  {icon} {p['name']:<22} model={p['model']:<8} circuit={p['circuit']:<6} failures={p['failures']}")
print(f"\n  Success rate: {health['success_rate']}% | Fallback rate: {health['fallback_rate']}%")

6. Prompt Migration

Prompts that work well on one model generation may not transfer directly to the next. GPT-4 prompts often need adjustment for GPT-4.1 or GPT-5 because newer models follow instructions more literally, handle ambiguity differently, and may need different temperature settings. A systematic prompt migration process tests each prompt against the new model and adapts as needed.

Prompt Migration Toolkit

import os
import json
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI


@dataclass
class PromptVersion:
    """A versioned prompt with its target model and parameters."""
    prompt_id: str
    version: int
    model: str
    system_prompt: str
    temperature: float
    max_tokens: int
    notes: str = ""


@dataclass
class PromptMigrationResult:
    """Result of migrating a prompt to a new model."""
    prompt_id: str
    source_model: str
    target_model: str
    original_score: float
    migrated_score: float
    adjustments_made: list
    temperature_change: Optional[float] = None
    prompt_change: Optional[str] = None
    recommendation: str = ""


class PromptMigrator:
    """Systematically migrates prompts between model generations.

    Key adjustments when migrating to newer models:
    - Temperature: Newer models are often more capable at temp=0; lower temp
    - Specificity: Newer models follow instructions more literally; be precise
    - Verbosity: Remove hedging language ("try to", "if possible"); just instruct
    - Format: Newer models handle structured output better; use explicit formats
    - Examples: May need fewer few-shot examples (stronger zero-shot)
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.prompt_registry: dict[str, list[PromptVersion]] = {}
        self.migration_results: list[PromptMigrationResult] = []

    def register_prompt(self, prompt: PromptVersion):
        """Register a prompt version in the registry."""
        if prompt.prompt_id not in self.prompt_registry:
            self.prompt_registry[prompt.prompt_id] = []
        self.prompt_registry[prompt.prompt_id].append(prompt)

    def analyze_prompt_for_migration(self, prompt: PromptVersion) -> list[str]:
        """Analyze a prompt and suggest adjustments for a newer model."""
        suggestions = []
        text = prompt.system_prompt.lower()

        # Check for hedging language
        hedges = ["try to", "if possible", "attempt to", "do your best to"]
        for hedge in hedges:
            if hedge in text:
                suggestions.append(f"Remove hedging: '{hedge}' → direct instruction")

        # Check temperature
        if prompt.temperature > 0.7:
            suggestions.append(f"Lower temperature: {prompt.temperature} → 0.5-0.7 (newer models more capable at lower temps)")

        # Check for verbose instructions
        if len(prompt.system_prompt) > 1000:
            suggestions.append("Consider condensing: newer models need less prompting for same quality")

        # Check for explicit format instructions
        if "json" in text and "schema" not in text:
            suggestions.append("Add explicit JSON schema: newer models support structured outputs natively")

        # Check for few-shot examples
        if text.count("example:") > 3 or text.count("input:") > 3:
            suggestions.append("Reduce few-shot examples: newer models have stronger zero-shot performance")

        # Check for role reinforcement
        if text.count("you are") > 2 or text.count("remember") > 1:
            suggestions.append("Reduce role reinforcement: newer models maintain persona better")

        if not suggestions:
            suggestions.append("Prompt looks clean — test as-is on new model")

        return suggestions

    def migrate_prompt(self, prompt_id: str, target_model: str) -> PromptMigrationResult:
        """Migrate a prompt to a new model with automatic adjustments."""
        versions = self.prompt_registry.get(prompt_id, [])
        if not versions:
            raise ValueError(f"Prompt {prompt_id} not found in registry")

        current = versions[-1]  # Latest version
        suggestions = self.analyze_prompt_for_migration(current)

        # Apply automated adjustments
        new_system_prompt = current.system_prompt
        new_temperature = current.temperature
        adjustments = []

        # Auto-adjust temperature for newer models
        if "gpt-5" in target_model or "gpt-4.1" in target_model:
            if current.temperature > 0.7:
                new_temperature = round(current.temperature * 0.7, 2)
                adjustments.append(f"temperature: {current.temperature} → {new_temperature}")

        # Remove hedging
        for hedge in ["try to ", "if possible, ", "attempt to "]:
            if hedge in new_system_prompt.lower():
                new_system_prompt = new_system_prompt.replace(hedge, "").replace(hedge.capitalize(), "")
                adjustments.append(f"Removed hedging: '{hedge.strip()}'")

        # Simulate scoring (in production, run actual evals)
        original_score = 0.82 + (hash(prompt_id) % 10) / 100
        # Newer models generally score higher with adjusted prompts
        migrated_score = min(0.99, original_score + 0.05 + len(adjustments) * 0.01)

        # Determine recommendation
        if migrated_score >= original_score:
            recommendation = "SAFE TO MIGRATE: Score improved or maintained"
        elif migrated_score >= original_score - 0.03:
            recommendation = "ACCEPTABLE: Minor regression within tolerance"
        else:
            recommendation = "NEEDS WORK: Significant regression — manual prompt tuning required"

        result = PromptMigrationResult(
            prompt_id=prompt_id,
            source_model=current.model,
            target_model=target_model,
            original_score=round(original_score, 3),
            migrated_score=round(migrated_score, 3),
            adjustments_made=adjustments,
            temperature_change=new_temperature - current.temperature if new_temperature != current.temperature else None,
            prompt_change=new_system_prompt if new_system_prompt != current.system_prompt else None,
            recommendation=recommendation,
        )

        # Register new version
        new_version = PromptVersion(
            prompt_id=prompt_id,
            version=current.version + 1,
            model=target_model,
            system_prompt=new_system_prompt,
            temperature=new_temperature,
            max_tokens=current.max_tokens,
            notes=f"Auto-migrated from {current.model}. Adjustments: {len(adjustments)}",
        )
        self.register_prompt(new_version)
        self.migration_results.append(result)

        return result


# Demonstration
migrator = PromptMigrator()

# Register existing prompts (currently working on GPT-4o)
prompts = [
    PromptVersion(
        prompt_id="customer-classifier",
        version=1,
        model="gpt-4o-2024-08-06",
        system_prompt="You are a customer support classifier. Try to categorize the customer's intent into one of these categories: billing, technical, account, general. If possible, also identify the sentiment. Remember, you are classifying customer messages.",
        temperature=0.8,
        max_tokens=100,
    ),
    PromptVersion(
        prompt_id="code-reviewer",
        version=1,
        model="gpt-4o-2024-08-06",
        system_prompt="You are an expert code reviewer. Review the following code and provide feedback on: 1) Correctness, 2) Performance, 3) Security, 4) Readability. Be thorough and constructive.",
        temperature=0.3,
        max_tokens=1500,
    ),
    PromptVersion(
        prompt_id="summarizer",
        version=1,
        model="gpt-4o-2024-08-06",
        system_prompt="You are a document summarizer. Try to summarize the input text into 3-5 bullet points. If possible, preserve the key metrics and conclusions. Attempt to keep each bullet under 20 words. Remember you must be concise.",
        temperature=0.9,
        max_tokens=500,
    ),
]

for p in prompts:
    migrator.register_prompt(p)

print("=== Prompt Migration Toolkit ===\n")

target = "gpt-4.1-2025-04-14"
print(f"Migrating prompts to: {target}\n")

for prompt in prompts:
    result = migrator.migrate_prompt(prompt.prompt_id, target)
    print(f"--- {result.prompt_id} ---")
    print(f"  Score: {result.original_score} → {result.migrated_score} ({result.migrated_score - result.original_score:+.3f})")
    if result.temperature_change:
        print(f"  Temperature: {result.temperature_change:+.2f}")
    if result.adjustments_made:
        for adj in result.adjustments_made:
            print(f"  ✎ {adj}")
    suggestions = migrator.analyze_prompt_for_migration(prompt)
    print(f"  Suggestions: {len(suggestions)}")
    for s in suggestions[:2]:
        print(f"    → {s}")
    print(f"  {result.recommendation}\n")

7. Testing Migration

Every migration needs a testing strategy. You can’t ship a model upgrade, API migration, or SDK change without quantitative evidence that the system still works. An eval-driven comparison framework runs the same test cases against both the old and new configurations, computes aggregate metrics, identifies regressions per category, and produces a go/no-go report.

Testing Strategy: Run your existing eval suite against both configurations (old and new) simultaneously. Compare per-case scores, aggregate metrics, latency distributions, and cost. Set a regression threshold (e.g., no more than 2% score drop on any category) and automate the go/no-go decision. Integrate into CI/CD so every model or prompt change gets validated before merge.

Eval-Driven Comparison Framework

import os
import json
import time
from dataclasses import dataclass, field
from typing import Optional
from openai import OpenAI


@dataclass
class TestCase:
    """A single test case for migration comparison."""
    case_id: str
    category: str
    input_messages: list[dict]
    expected_contains: list[str]  # Strings that should appear in output
    expected_format: Optional[str] = None  # "json", "markdown", "bullets", etc.
    max_acceptable_latency_ms: float = 5000.0
    weight: float = 1.0


@dataclass
class ConfigA:
    """Configuration A (current/old)."""
    name: str
    model: str
    api_style: str  # "chat_completions" or "responses"
    temperature: float
    max_tokens: int


@dataclass
class ConfigB:
    """Configuration B (new/proposed)."""
    name: str
    model: str
    api_style: str
    temperature: float
    max_tokens: int


@dataclass
class ComparisonResult:
    """Result of comparing two configurations on a test case."""
    case_id: str
    category: str
    config_a_score: float
    config_b_score: float
    config_a_latency_ms: float
    config_b_latency_ms: float
    regression: bool
    improvement: bool
    delta: float


class MigrationTestFramework:
    """Automated A/B comparison framework for migration testing.

    Produces a structured report showing:
    - Per-case comparison (old score vs new score)
    - Category-level aggregation
    - Overall regression/improvement detection
    - Latency and cost impact
    - Go/no-go recommendation
    """

    def __init__(self):
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-demo-key"))
        self.test_cases: list[TestCase] = []
        self.regression_threshold = 0.05  # 5% max acceptable regression
        self.results: list[ComparisonResult] = []

    def add_test_cases(self, cases: list[TestCase]):
        """Register test cases for comparison."""
        self.test_cases.extend(cases)

    def _score_output(self, output: str, test_case: TestCase) -> float:
        """Score an output against test case expectations."""
        score = 0.0
        checks = 0

        # Check expected content presence
        for expected in test_case.expected_contains:
            checks += 1
            if expected.lower() in output.lower():
                score += 1.0

        # Check format compliance
        if test_case.expected_format:
            checks += 1
            if test_case.expected_format == "json":
                try:
                    json.loads(output)
                    score += 1.0
                except (json.JSONDecodeError, TypeError):
                    pass
            elif test_case.expected_format == "bullets":
                if output.count("- ") >= 2 or output.count("• ") >= 2:
                    score += 1.0
            elif test_case.expected_format == "markdown":
                if "#" in output or "**" in output:
                    score += 1.0

        return round(score / max(checks, 1), 3)

    def _run_config(self, config: dict, test_case: TestCase) -> tuple[float, float]:
        """Run a test case against a configuration (simulated)."""
        start = time.time()

        # Simulated output quality (in production, call actual API)
        model_quality = {
            "gpt-4o-2024-08-06": 0.85,
            "gpt-4.1-2025-04-14": 0.90,
            "gpt-5-2026-03-01": 0.93,
        }

        base = model_quality.get(config["model"], 0.85)
        # Simulated output containing expected keywords
        output_parts = [f"Response about {test_case.category}"]
        for exp in test_case.expected_contains[:2]:
            if hash(config["model"] + exp) % 10 > 2:  # 80% chance of including expected content
                output_parts.append(exp)

        output = ". ".join(output_parts)
        score = self._score_output(output, test_case)
        # Boost score based on model quality
        score = min(1.0, score * 0.5 + base * 0.5)

        latency = 300 + hash(config["model"] + test_case.case_id) % 600
        return round(score, 3), float(latency)

    def run_comparison(self, config_a: dict, config_b: dict) -> dict:
        """Run all test cases against both configurations and compare."""
        self.results = []

        for tc in self.test_cases:
            score_a, latency_a = self._run_config(config_a, tc)
            score_b, latency_b = self._run_config(config_b, tc)

            delta = score_b - score_a
            result = ComparisonResult(
                case_id=tc.case_id,
                category=tc.category,
                config_a_score=score_a,
                config_b_score=score_b,
                config_a_latency_ms=latency_a,
                config_b_latency_ms=latency_b,
                regression=delta < -self.regression_threshold,
                improvement=delta > self.regression_threshold,
                delta=round(delta, 3),
            )
            self.results.append(result)

        return self._generate_report(config_a, config_b)

    def _generate_report(self, config_a: dict, config_b: dict) -> dict:
        """Generate a comprehensive comparison report."""
        total = len(self.results)
        regressions = [r for r in self.results if r.regression]
        improvements = [r for r in self.results if r.improvement]

        avg_a = sum(r.config_a_score for r in self.results) / total
        avg_b = sum(r.config_b_score for r in self.results) / total

        avg_latency_a = sum(r.config_a_latency_ms for r in self.results) / total
        avg_latency_b = sum(r.config_b_latency_ms for r in self.results) / total

        # Category breakdown
        categories = {}
        for r in self.results:
            if r.category not in categories:
                categories[r.category] = {"a_scores": [], "b_scores": [], "regressions": 0}
            categories[r.category]["a_scores"].append(r.config_a_score)
            categories[r.category]["b_scores"].append(r.config_b_score)
            if r.regression:
                categories[r.category]["regressions"] += 1

        category_report = {}
        for cat, data in categories.items():
            cat_avg_a = sum(data["a_scores"]) / len(data["a_scores"])
            cat_avg_b = sum(data["b_scores"]) / len(data["b_scores"])
            category_report[cat] = {
                "config_a_avg": round(cat_avg_a, 3),
                "config_b_avg": round(cat_avg_b, 3),
                "delta": round(cat_avg_b - cat_avg_a, 3),
                "regressions": data["regressions"],
            }

        # Go/no-go decision
        if len(regressions) == 0 and avg_b >= avg_a:
            decision = "GO: No regressions, safe to ship"
        elif len(regressions) <= 1 and avg_b > avg_a - 0.02:
            decision = "CONDITIONAL GO: Minor regression in 1 case — review and decide"
        else:
            decision = f"NO-GO: {len(regressions)} regressions detected — investigate before shipping"

        return {
            "summary": {
                "config_a": config_a["name"],
                "config_b": config_b["name"],
                "total_cases": total,
                "regressions": len(regressions),
                "improvements": len(improvements),
                "unchanged": total - len(regressions) - len(improvements),
            },
            "scores": {
                "config_a_avg": round(avg_a, 3),
                "config_b_avg": round(avg_b, 3),
                "delta": round(avg_b - avg_a, 3),
            },
            "latency": {
                "config_a_avg_ms": round(avg_latency_a, 1),
                "config_b_avg_ms": round(avg_latency_b, 1),
                "change_pct": round((avg_latency_b - avg_latency_a) / avg_latency_a * 100, 1),
            },
            "categories": category_report,
            "regression_details": [
                {"case": r.case_id, "category": r.category, "delta": r.delta}
                for r in regressions
            ],
            "decision": decision,
        }


# Demonstration
framework = MigrationTestFramework()

# Define test cases covering multiple categories
framework.add_test_cases([
    TestCase("tc-001", "classification", [{"role": "user", "content": "I want to cancel my plan"}],
             expected_contains=["billing", "cancel", "account"], weight=1.0),
    TestCase("tc-002", "coding", [{"role": "user", "content": "Write a merge sort in Python"}],
             expected_contains=["def", "merge", "sort"], expected_format="markdown", weight=1.5),
    TestCase("tc-003", "reasoning", [{"role": "user", "content": "What are the pros and cons of microservices?"}],
             expected_contains=["scalability", "complexity", "independent"], weight=1.0),
    TestCase("tc-004", "extraction", [{"role": "user", "content": "Extract entities: Apple CEO Tim Cook in Cupertino"}],
             expected_contains=["Apple", "Tim Cook", "Cupertino"], expected_format="json", weight=1.0),
    TestCase("tc-005", "summarization", [{"role": "user", "content": "Summarize the key points of this quarterly report"}],
             expected_contains=["revenue", "growth", "quarter"], expected_format="bullets", weight=1.0),
    TestCase("tc-006", "instruction_following", [{"role": "user", "content": "List exactly 5 items, numbered 1-5"}],
             expected_contains=["1.", "2.", "3.", "4.", "5."], weight=2.0),
    TestCase("tc-007", "coding", [{"role": "user", "content": "Write a REST API endpoint with error handling"}],
             expected_contains=["async", "try", "except", "status"], expected_format="markdown", weight=1.5),
    TestCase("tc-008", "reasoning", [{"role": "user", "content": "Compare SQL vs NoSQL for a social media app"}],
             expected_contains=["relational", "schema", "scale", "flexible"], weight=1.0),
])

print("=== Migration A/B Test Framework ===\n")

# Compare current config vs proposed config
config_a = {"name": "Current (GPT-4o)", "model": "gpt-4o-2024-08-06", "api": "chat_completions", "temperature": 0.7}
config_b = {"name": "Proposed (GPT-4.1)", "model": "gpt-4.1-2025-04-14", "api": "responses", "temperature": 0.5}

report = framework.run_comparison(config_a, config_b)

# Print report
print(f"Config A: {report['summary']['config_a']}")
print(f"Config B: {report['summary']['config_b']}")
print(f"Cases: {report['summary']['total_cases']}\n")

print("--- Scores ---")
print(f"  Config A avg: {report['scores']['config_a_avg']}")
print(f"  Config B avg: {report['scores']['config_b_avg']}")
print(f"  Delta: {report['scores']['delta']:+.3f}")

print(f"\n--- Latency ---")
print(f"  Config A: {report['latency']['config_a_avg_ms']}ms")
print(f"  Config B: {report['latency']['config_b_avg_ms']}ms")
print(f"  Change: {report['latency']['change_pct']:+.1f}%")

print(f"\n--- Category Breakdown ---")
for cat, data in report["categories"].items():
    indicator = "↑" if data["delta"] > 0 else "↓" if data["delta"] < 0 else "="
    print(f"  {cat:<25} {data['config_a_avg']:.3f} → {data['config_b_avg']:.3f} ({data['delta']:+.3f}) {indicator}")

print(f"\n--- Summary ---")
print(f"  Regressions: {report['summary']['regressions']}")
print(f"  Improvements: {report['summary']['improvements']}")
print(f"  Unchanged: {report['summary']['unchanged']}")

if report["regression_details"]:
    print(f"\n  âš  Regression details:")
    for r in report["regression_details"]:
        print(f"    {r['case']}: [{r['category']}] delta={r['delta']:+.3f}")

print(f"\n{'='*50}")
print(f"DECISION: {report['decision']}")

Next in the Series

In Part 20: Capstone Projects, we’ll bring everything together with end-to-end project implementations that combine the patterns from all 19 previous parts — from SDK basics through production architectures, observability, safety, enterprise compliance, and migration — into complete, deployable AI applications.