Project Scenario: MediaFlow Corp
MediaFlow Corp is a digital media company operating 12 content brands across news, entertainment, and education verticals. It publishes 2,000+ pieces of content daily across web, mobile, email, and social channels, serving 45 million monthly active users.
Current Challenges
- Content Silos: Each brand has its own CMS, taxonomy, and publishing workflow
- No Personalization: Same homepage for all 45M users regardless of behavior
- Batch Analytics: Engagement reports available next-day, preventing real-time optimization
- Manual Curation: Editorial team manually curates homepages for each brand (12 teams × 3 shifts)
- Data Waste: 500TB of behavioral data collected but only 3% analyzed
System Requirements
The pipeline must handle the full lifecycle from content creation to personalized delivery:
- Ingestion: Unify 12 CMS systems into a single content lake with metadata enrichment
- Processing: Real-time stream processing of 50,000 user events/second
- Intelligence: ML models scoring content relevance per user in <50ms
- Delivery: Personalized content feeds with A/B testing and explainability
- Analytics: Real-time dashboards showing engagement, revenue, and model performance
Content Ingestion Layer
The content ingestion layer normalizes content from 12 heterogeneous CMS platforms into a unified content model. Each source publishes content events to Kafka; enrichment processors add NLP-derived metadata (topics, entities, sentiment, reading level).
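For the database-backed CMS platforms (WordPress and Drupal both run on MySQL), CDC connectors can be registered through the Kafka Connect REST API. Below is a minimal sketch assuming Debezium's MySQL connector, with illustrative host names, credentials, and an abbreviated subset of the required settings:

```python
# Registering a CDC connector via the Kafka Connect REST API.
# A sketch only: host names, credentials, and topic names are illustrative,
# and a real Debezium config needs more settings (server id, schema history).
import requests

connector = {
    "name": "wordpress-brands-cdc",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "wp-db.internal",
        "database.port": "3306",
        "database.user": "cdc_reader",
        "database.password": "********",
        "topic.prefix": "cms.wordpress",
        "table.include.list": "wordpress.wp_posts",
    },
}

resp = requests.post("http://connect:8083/connectors", json=connector, timeout=10)
resp.raise_for_status()
print(f"Connector registered: {resp.json()['name']}")
```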
flowchart LR
subgraph Sources["Content Sources"]
CMS1["WordPress
(5 brands)"]
CMS2["Drupal
(4 brands)"]
CMS3["Custom CMS
(3 brands)"]
UGC["User-Generated
Content"]
end
subgraph Ingestion["Ingestion Layer"]
KC["Kafka Connect
CDC Connectors"]
NORM["Content
Normalizer"]
ENR["NLP
Enrichment"]
end
subgraph Lake["Data Lake"]
RAW["Raw Zone"]
ENR2["Enriched Zone"]
FEAT["Feature Store"]
end
subgraph ML["ML Platform"]
TRAIN["Model Training
(Offline)"]
SERVE["Model Serving
(Online)"]
AB["A/B Testing
Engine"]
end
subgraph Delivery["Delivery"]
API["Content API"]
FEED["Personalized
Feeds"]
PUSH["Push
Notifications"]
end
CMS1 --> KC
CMS2 --> KC
CMS3 --> KC
UGC --> KC
KC --> NORM
NORM --> ENR
ENR --> RAW
RAW --> ENR2
ENR2 --> FEAT
FEAT --> TRAIN
TRAIN --> SERVE
SERVE --> AB
AB --> API
API --> FEED
API --> PUSH
style KC fill:#3B9797,color:#fff
style NORM fill:#3B9797,color:#fff
style ENR fill:#16476A,color:#fff
style FEAT fill:#132440,color:#fff
style SERVE fill:#BF092F,color:#fff
style AB fill:#BF092F,color:#fff
Content Normalization Pipeline (Kafka Consumer)
"""
Content ingestion pipeline: Kafka consumer that normalizes content
from multiple CMS sources and enriches with NLP metadata.
"""
import json
import hashlib
from datetime import datetime
# Simulated Kafka message processing
def process_content_event(raw_event):
    """
    Normalize content from any CMS into a unified schema.
    In production, this runs as a Kafka Streams / Flink job.
    """
    # Resolve the body once: some CMSs use "body", others "content"
    body_text = raw_event.get("body") or raw_event.get("content", "")
    word_count = len(body_text.split())
    # Unified content schema
    normalized = {
        "content_id": generate_content_id(raw_event),
        "title": (raw_event.get("title") or "").strip(),
        "body": body_text,
        "source_cms": raw_event.get("source", "unknown"),
        "brand": raw_event.get("brand", ""),
        "author": (raw_event.get("author") or {}).get("name", "Unknown"),
        "published_at": normalize_timestamp(raw_event.get("published")),
        "categories": normalize_taxonomy(raw_event.get("categories", [])),
        "media_assets": extract_media(raw_event),
        "metadata": {
            "word_count": word_count,
            "reading_time_min": max(1, word_count // 200),  # ~200 wpm
            "language": detect_language(body_text),
            "ingested_at": datetime.utcnow().isoformat()
        }
    }
# NLP enrichment (simulated — real implementation uses spaCy/HuggingFace)
enrichment = enrich_content(normalized["body"])
normalized["nlp"] = enrichment
return normalized
def generate_content_id(event):
"""Deterministic ID from source + original ID for deduplication."""
source = event.get("source", "unknown")
original_id = str(event.get("id", event.get("post_id", "")))
return hashlib.sha256(f"{source}:{original_id}".encode()).hexdigest()[:16]
def normalize_timestamp(ts):
"""Handle multiple timestamp formats from different CMSs."""
if ts is None:
return datetime.utcnow().isoformat()
    if isinstance(ts, (int, float)):
        # Epoch seconds -> naive UTC, matching the utcnow() fallback above
        return datetime.utcfromtimestamp(ts).isoformat()
return ts # Assume ISO format string
def normalize_taxonomy(categories):
"""Map CMS-specific taxonomies to unified category model."""
taxonomy_map = {
"tech": "technology", "sci": "science", "biz": "business",
"sports": "sports", "entertainment": "entertainment",
"politics": "news", "world": "news", "local": "news"
}
return [taxonomy_map.get(c.lower(), c.lower()) for c in categories]
def extract_media(event):
"""Extract and catalog all media assets."""
media = []
for asset in event.get("images", []) + event.get("media", []):
media.append({
"type": asset.get("type", "image"),
"url": asset.get("url", ""),
"alt_text": asset.get("alt", "")
})
return media
def detect_language(text):
"""Simplified language detection."""
return "en" # In production: use langdetect or fasttext
def enrich_content(body):
"""NLP enrichment: topics, entities, sentiment, readability."""
words = body.split() if body else []
return {
"topics": ["technology", "ai"], # From topic model
"entities": [], # From NER model
"sentiment_score": 0.65, # From sentiment model
"readability_grade": min(12, max(6, len(words) // 50)),
"embedding_vector_id": "vec_" + hashlib.md5(body[:100].encode()).hexdigest()[:8]
}
# === Demo execution ===
sample_event = {
"id": 42871,
"source": "wordpress",
"brand": "TechDaily",
"title": "How AI Is Reshaping Content Delivery in 2026",
"body": "Artificial intelligence continues to transform how digital media companies deliver personalized experiences to their audiences. " * 20,
"author": {"name": "Sarah Chen", "id": 105},
"published": "2026-04-28T09:30:00Z",
"categories": ["tech", "AI"],
"images": [{"url": "/img/ai-content.jpg", "type": "image", "alt": "AI content"}]
}
result = process_content_event(sample_event)
print("=== NORMALIZED CONTENT EVENT ===\n")
print(json.dumps(result, indent=2, default=str))
Data Lake Architecture
MediaFlow's data lake uses a medallion architecture with clear boundaries between raw ingestion, curated enrichment, and consumption-ready datasets. This enables both batch analytics and real-time feature serving from the same data estate.
- Bronze (Raw): Immutable event log — every content publish, user click, and system event exactly as received
- Silver (Enriched): Cleaned, deduplicated, schema-enforced data with NLP enrichments and joined references
- Gold (Consumption): Aggregated datasets optimized for specific use cases — dashboards, ML features, reporting
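A minimal PySpark sketch of the Bronze-to-Silver hop, assuming Delta Lake is configured on the cluster; the bucket paths and column names are illustrative, not MediaFlow's actual schema:

```python
# Bronze -> Silver promotion: dedupe, basic schema enforcement, typed timestamps.
# Paths and columns are illustrative assumptions.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("bronze-to-silver").getOrCreate()

bronze = spark.read.format("delta").load("s3://mediaflow-lake/bronze/content_events")

silver = (
    bronze
    .dropDuplicates(["content_id"])                       # drop replayed events
    .filter(F.col("title").isNotNull())                   # minimal quality gate
    .withColumn("published_at", F.to_timestamp("published_at"))
    .withColumn("ingest_date", F.to_date("ingested_at"))  # partition key
)

(silver.write.format("delta")
    .mode("overwrite")
    .partitionBy("ingest_date")
    .save("s3://mediaflow-lake/silver/content"))
```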
Stream Processing Design
Real-time user behavior flows through Kafka topics and is processed by Spark Structured Streaming to maintain user profiles and compute real-time features for the recommendation engine:
$$\text{User Score}(u, c) = \alpha \cdot \text{TopicAffinity}(u, c) + \beta \cdot \text{Recency}(c) + \gamma \cdot \text{Engagement}(u)$$
Where $\alpha + \beta + \gamma = 1$ and weights are tuned per brand via Bayesian optimization.
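In code, the formula reduces to a small function. The default weights below mirror the 0.5/0.3/0.2 split used by the scoring service later in this section, and the 0.95-per-hour decay is the same illustrative constant:

```python
def user_score(topic_affinity: float, hours_since_publish: float,
               engagement_rate: float,
               alpha: float = 0.5, beta: float = 0.3, gamma: float = 0.2,
               decay: float = 0.95) -> float:
    """Weighted relevance score; alpha + beta + gamma must sum to 1."""
    recency = decay ** hours_since_publish  # fresher content scores higher
    return alpha * topic_affinity + beta * recency + gamma * engagement_rate

print(user_score(topic_affinity=0.82, hours_since_publish=3, engagement_rate=0.20))
```

The profile-maintenance job itself can be sketched with Spark Structured Streaming's Kafka source; broker, topic, and field names below are assumptions for illustration:

```python
# Hedged sketch of the profile-update job (Spark Structured Streaming + Kafka).
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StringType, TimestampType

spark = SparkSession.builder.appName("user-profiles").getOrCreate()

schema = (StructType()
          .add("user_id", StringType())
          .add("content_id", StringType())
          .add("event_type", StringType())
          .add("ts", TimestampType()))

events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")
          .option("subscribe", "user-events")
          .load()
          .select(F.from_json(F.col("value").cast("string"), schema).alias("e"))
          .select("e.*"))

# Sliding per-user activity counts; in production these land in the feature store.
profiles = (events
            .withWatermark("ts", "10 minutes")
            .groupBy(F.window("ts", "1 hour", "5 minutes"), "user_id")
            .agg(F.count("*").alias("events_last_hour")))

query = profiles.writeStream.outputMode("update").format("console").start()
```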
ML Model Serving
The ML serving infrastructure must deliver personalized content scores in under 50ms at 50,000 requests/second. We use a two-tier architecture: a feature store for precomputed user vectors and a lightweight scoring service that combines features at request time.
flowchart TB
subgraph Request["Request Path (< 50ms)"]
REQ["API Request
user_id + context"]
FS["Feature Store
(Redis)"]
SCORE["Scoring Service
(FastAPI)"]
CACHE["Result Cache
(CDN Edge)"]
end
subgraph Offline["Offline Pipeline (Hourly)"]
TRAIN["Model Training
(SageMaker)"]
REG["Model Registry
(MLflow)"]
FEAT["Feature
Computation
(Spark)"]
end
REQ --> FS
FS --> SCORE
SCORE --> CACHE
FEAT --> FS
TRAIN --> REG
REG -->|"Deploy"| SCORE
style REQ fill:#3B9797,color:#fff
style SCORE fill:#BF092F,color:#fff
style FS fill:#132440,color:#fff
style TRAIN fill:#16476A,color:#fff
FastAPI Model Endpoint
"""
FastAPI model serving endpoint for content recommendations.
Scores content items for a given user in real-time.
"""
import hashlib
from datetime import datetime

import numpy as np
# Simulated model serving (in production: FastAPI + Redis + ML model)
class RecommendationService:
"""Real-time content scoring service."""
def __init__(self):
# Simulated model weights (in production: loaded from MLflow registry)
self.topic_weights = np.random.rand(50) # 50-dim topic space
self.recency_decay = 0.95 # Exponential decay per hour
def get_user_features(self, user_id):
"""Fetch precomputed user features from feature store (Redis)."""
        # Simulated feature vector, stable per user. Python's built-in hash()
        # is salted per process, so derive the seed from a stable digest.
        seed = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 2**32
        np.random.seed(seed)
return {
"topic_affinity": np.random.rand(50), # 50-dim topic preferences
"avg_session_min": np.random.uniform(2, 15),
"preferred_length": np.random.choice(["short", "medium", "long"]),
"active_hours": list(range(9, 22)), # Typical active hours
"engagement_rate": np.random.uniform(0.05, 0.35)
}
def score_content(self, user_features, content_items):
"""Score content items for relevance to user."""
scores = []
now = datetime.utcnow()
for item in content_items:
# Topic affinity score (cosine similarity)
item_topics = np.random.rand(50) # Simulated content embedding
topic_score = np.dot(user_features["topic_affinity"], item_topics)
topic_score /= (np.linalg.norm(user_features["topic_affinity"]) *
np.linalg.norm(item_topics) + 1e-8)
# Recency score (exponential decay)
hours_old = (now - item["published_at"]).total_seconds() / 3600
recency_score = self.recency_decay ** hours_old
# Engagement prediction
engagement_score = user_features["engagement_rate"] * 2.0
# Weighted combination
final_score = (0.5 * topic_score +
0.3 * recency_score +
0.2 * engagement_score)
scores.append({
"content_id": item["id"],
"title": item["title"],
"score": round(float(final_score), 4),
"breakdown": {
"topic_relevance": round(float(topic_score), 3),
"recency": round(float(recency_score), 3),
"engagement": round(float(engagement_score), 3)
}
})
# Sort by score descending
scores.sort(key=lambda x: x["score"], reverse=True)
return scores
# === Demo execution ===
service = RecommendationService()
# Simulate user request
user_id = "user_8472"
user_features = service.get_user_features(user_id)
# Simulate content candidates
content_items = [
{"id": f"c_{i}", "title": f"Article {i}: {t}",
"published_at": datetime(2026, 4, 30, 10 - i, 0)}
for i, t in enumerate([
"AI Transforms Media Delivery",
"Cloud Cost Optimization Guide",
"New JavaScript Framework Released",
"Data Lake Best Practices",
"Remote Work Productivity Tips"
])
]
# Score and rank
results = service.score_content(user_features, content_items)
print(f"=== PERSONALIZED FEED for {user_id} ===\n")
print(f"User engagement rate: {user_features['engagement_rate']:.2%}")
print(f"Preferred content length: {user_features['preferred_length']}\n")
for i, item in enumerate(results, 1):
print(f" #{i} [{item['score']:.4f}] {item['title']}")
b = item['breakdown']
print(f" Topic: {b['topic_relevance']:.3f} | "
f"Recency: {b['recency']:.3f} | "
f"Engagement: {b['engagement']:.3f}")
Personalization Engine
Feature Store & Recommendation Logic
The personalization engine maintains real-time user profiles by processing clickstream events. The feature store serves as the bridge between offline model training and online scoring — ensuring training features exactly match serving features (preventing training-serving skew).
Feature Categories
| Feature Type | Update Frequency | Examples | Storage |
|---|---|---|---|
| Static | Daily | Demographics, registration date | Feast offline |
| Batch | Hourly | Topic affinities, engagement rates | Feast online |
| Real-time | Per event | Last 5 articles read, session depth | Redis streams |
| Contextual | Per request | Time of day, device, location | Request context |
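At request time these four categories are merged into a single feature dictionary. The sketch below collapses the Feast online store and Redis streams into one Redis instance for brevity; key names and field layout are illustrative:

```python
# Online feature assembly: an illustrative sketch using redis-py.
import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def assemble_features(user_id: str, context: dict) -> dict:
    """Merge hourly batch features, real-time features, and request context."""
    batch = r.hgetall(f"feat:batch:{user_id}")          # topic affinities, rates
    recent = r.lrange(f"feat:recent:{user_id}", 0, 4)   # last 5 articles read
    return {
        "topic_affinity": json.loads(batch.get("topic_affinity", "[]")),
        "engagement_rate": float(batch.get("engagement_rate", 0.0)),
        "recent_reads": recent,
        "hour_of_day": context.get("hour"),             # contextual, per request
        "device": context.get("device"),
    }
```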
Real-Time Analytics
Key Metrics Dashboard
The analytics layer provides real-time visibility into content performance, user engagement, and model health. Metrics stream from Kafka through Flink aggregations into a time-series database (InfluxDB) powering Grafana dashboards.
- Content Velocity: Articles published/hour per brand (target: 15-25/hour aggregate)
- Personalization Lift: CTR of personalized vs. editorial picks (target: +40% lift)
- Model Latency: p99 scoring time (SLA: <50ms)
- Feature Freshness: Time since last feature store update (target: <5 min)
- Revenue/Session: Ad revenue per personalized session vs. baseline
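As a toy illustration of the windowed aggregations behind these dashboards (the real job runs in Flink), here is the core of one such metric, click-through rate per one-minute tumbling window, in plain Python:

```python
# Tumbling-window CTR: the core logic of the dashboard metric, in miniature.
from collections import defaultdict
from datetime import datetime

WINDOW_SEC = 60  # one-minute tumbling windows

def window_key(ts: datetime) -> int:
    return int(ts.timestamp()) // WINDOW_SEC

def ctr_by_window(events):
    """events: iterable of (timestamp, event_type), type in {impression, click}."""
    counts = defaultdict(lambda: {"impression": 0, "click": 0})
    for ts, etype in events:
        counts[window_key(ts)][etype] += 1
    return {w: c["click"] / max(1, c["impression"]) for w, c in counts.items()}
```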
Conclusion & Architecture Summary
This capstone designed a complete Content + Data + AI pipeline for MediaFlow Corp. The system handles the full lifecycle:
- Content Ingestion: Kafka Connect normalizes 2,000+ daily articles from 12 CMS platforms into a unified content lake with NLP enrichment
- Data Lake: Medallion architecture (Bronze/Silver/Gold) on cloud storage with Delta Lake for ACID transactions
- ML Serving: Two-tier architecture — offline feature computation via Spark, online scoring via FastAPI with <50ms latency
- Personalization: Feature store bridging offline training and online serving, eliminating training-serving skew
- Real-Time Analytics: Streaming metrics through Kafka → Flink → InfluxDB → Grafana for live operational visibility
The architecture is projected to deliver a +40% engagement lift through personalization and a 60% reduction in manual curation effort through algorithmic curation, transforming MediaFlow from a manual content operation into an AI-powered media platform.