Module 18: Distributed Data Scaling
When your dataset outgrows a single machine — or when your read/write volume exceeds what one database can handle — you must partition (shard) your data across multiple nodes. Sharding is the foundational technique that enables horizontal scaling of data systems, but it introduces profound complexity around data placement, query routing, and rebalancing.
Sharding Strategies
There are four primary approaches to distributing data across shards, each with distinct trade-offs:
flowchart TD
A[Data Partitioning Strategy] --> B[Range-Based]
A --> C[Hash-Based]
A --> D[Directory-Based]
A --> E[Composite]
B --> B1[Shard by date range
or alphabetical range]
B --> B2["✅ Range queries efficient
❌ Hot spots on recent data"]
C --> C1["hash(key) mod N
or consistent hashing"]
C --> C2["✅ Even distribution
❌ Range queries need scatter-gather"]
D --> D1[Lookup table maps
key → shard]
D --> D2["✅ Flexible placement
❌ Lookup can become a bottleneck"]
E --> E1[Combine strategies
e.g., hash + range]
E --> E2["✅ Best of both
❌ Complex routing logic"]
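1. Range-Based Sharding: Data is partitioned by contiguous ranges of the shard key (e.g., user IDs 1–1,000,000 on shard 1, 1,000,001–2,000,000 on shard 2). Range queries on the shard key are natural: if orders are sharded by order date, "get all orders from January" hits a single shard. The downside is that with a monotonically increasing key (timestamps, auto-increment IDs), new writes concentrate on the latest shard, creating hot spots.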
2. Hash-Based Sharding: A hash function maps each key to a shard number. Distribution is uniform regardless of key patterns. However, range queries require scatter-gather across all shards — you lose data locality.
3. Directory-Based Sharding: A central lookup service maps each key (or key range) to a specific shard. This gives maximum flexibility, because you can move individual keys between shards without changing the hash function. The lookup service can become a bottleneck and a single point of failure unless it is replicated and clients cache its mappings.
4. Composite Sharding: Combines strategies — e.g., hash by tenant ID (first level), then range by timestamp (second level). This gives even tenant distribution while preserving time-range queries within a tenant.
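The routing logic for range-based and composite sharding is short enough to sketch. This example assumes hypothetical shard names and range boundaries, and a two-level layout of 4 tenant buckets, each range-partitioned by month. It is illustrative only and does not reproduce any particular database's scheme.

```python
import hashlib
from bisect import bisect_right
from datetime import date

# Range-based: each shard owns keys from its lower bound (inclusive) up to the
# next shard's lower bound (exclusive). Boundaries and names are hypothetical.
RANGE_LOWER_BOUNDS = [1, 1_000_001, 2_000_001]
RANGE_SHARDS = ["shard-1", "shard-2", "shard-3"]


def range_shard(user_id: int) -> str:
    """Binary-search for the range that contains user_id."""
    idx = bisect_right(RANGE_LOWER_BOUNDS, user_id) - 1
    if idx < 0:
        raise ValueError(f"user_id {user_id} is below the first range")
    return RANGE_SHARDS[idx]


# Composite: hash the tenant ID to pick a bucket (even spread across tenants),
# then range-partition by month inside that bucket (time-range locality).
TENANT_BUCKETS = 4


def _stable_hash(value: str) -> int:
    # Python's built-in hash() is randomized per process, so use a digest.
    return int(hashlib.sha256(value.encode()).hexdigest(), 16)


def composite_partition(tenant_id: str, created: date) -> str:
    bucket = _stable_hash(tenant_id) % TENANT_BUCKETS
    return f"bucket-{bucket}/{created:%Y-%m}"


print(range_shard(42))           # shard-1
print(range_shard(1_500_000))    # shard-2
print(composite_partition("acme", date(2024, 1, 15)))
```

All of one tenant's January rows share a single partition, so a one-month query for that tenant touches one place, while different tenants spread across the hash buckets.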
"""
Hash-Based Sharding: Consistent Hashing Implementation
Consistent hashing ensures that when nodes are added/removed,
only ~1/N of keys need to be remapped (vs. most keys with mod N).
"""
import hashlib
from bisect import bisect_right


class ConsistentHashRing:
    """
    A consistent hash ring with virtual nodes for even distribution.
    Virtual nodes (vnodes) ensure that each physical node owns
    many small segments of the ring, so load stays balanced and the
    keys that move when a node joins or leaves are spread across all
    other nodes instead of landing on a single neighbor.
    """

    def __init__(self, nodes=None, virtual_nodes=150):
        self.virtual_nodes = virtual_nodes
        self.ring = {}          # hash_value -> node_name
        self.sorted_keys = []   # sorted hash values for binary search
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """MD5 hash -> integer ring position (placement only, not security)"""
        digest = hashlib.md5(key.encode()).hexdigest()
        return int(digest, 16)

    def add_node(self, node):
        """Add a node with virtual_nodes replicas on the ring"""
        for i in range(self.virtual_nodes):
            vnode_key = f"{node}:vnode{i}"
            hash_val = self._hash(vnode_key)
            self.ring[hash_val] = node
            self.sorted_keys.append(hash_val)
        self.sorted_keys.sort()

    def remove_node(self, node):
        """Remove a node and all its virtual nodes from the ring"""
        for i in range(self.virtual_nodes):
            vnode_key = f"{node}:vnode{i}"
            hash_val = self._hash(vnode_key)
            del self.ring[hash_val]
            self.sorted_keys.remove(hash_val)

    def get_node(self, key):
        """Find which node owns this key (first vnode clockwise from its hash)"""
        if not self.ring:
            return None
        hash_val = self._hash(key)
        # Find the first vnode position strictly after this hash
        idx = bisect_right(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0  # Wrap around the ring
        return self.ring[self.sorted_keys[idx]]


# Demo: Distribute user data across shards
ring = ConsistentHashRing(
    nodes=["shard-us-east-1", "shard-us-west-2", "shard-eu-west-1"],
    virtual_nodes=150
)

# Route some keys
test_keys = ["user:1001", "user:2002", "user:3003", "order:5001", "order:5002"]
for key in test_keys:
    shard = ring.get_node(key)
    print(f"  {key} → {shard}")

# Add a new shard: only ~25% of keys migrate (the new node's ~1/4 share)
print("\n--- Adding shard-ap-south-1 ---")
ring.add_node("shard-ap-south-1")
for key in test_keys:
    shard = ring.get_node(key)
    print(f"  {key} → {shard}")
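To put a number on the mod-N comparison in the docstring, this sketch counts how many keys change owner under plain `hash(key) mod N` when a cluster grows from 3 to 4 nodes. It assumes integer keys stand in for hash values. With uniformly distributed hashes the fraction comes out about the same.

```python
def moved_fraction_mod_n(num_keys: int, old_n: int, new_n: int) -> float:
    """Fraction of keys whose shard changes when going from old_n to new_n shards."""
    moved = sum(1 for h in range(num_keys) if h % old_n != h % new_n)
    return moved / num_keys


print(moved_fraction_mod_n(12_000, 3, 4))  # 0.75
```

Under mod N, 75% of keys move. With the consistent hash ring, only the roughly 25% that the new node takes over move.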
Hot Partition Problems
Even with perfect hash distribution, real workloads are skewed. A celebrity's profile, a viral post, a flash sale item — certain keys receive orders of magnitude more traffic than others. When a hot key lands on a single shard, that shard becomes the bottleneck for the entire system.
flowchart TD
A[Monitor Per-Partition Metrics] --> B{Partition Load
> 3× Average?}
B -->|No| A
B -->|Yes| C[Identify Hot Keys]
C --> D{Single Key
or Key Range?}
D -->|Single Key| E[Apply Key-Level Mitigations]
E --> E1[Request Coalescing
Deduplicate concurrent reads]
E --> E2[Local Cache
Cache hot key in-memory]
E --> E3[Virtual Partitions
Append random suffix to key]
D -->|Key Range| F[Apply Partition-Level Mitigations]
F --> F1[Split Partition
Divide hot range in two]
F --> F2[Read Replicas
Add read-only copies]
F --> F3[Rebalance
Move partition to larger node]
E1 --> G[Verify Load Balanced]
E2 --> G
E3 --> G
F1 --> G
F2 --> G
F3 --> G
Mitigation strategies for hot partitions:
- Virtual partitions (key splitting): Append a random suffix (0–9) to hot keys, spreading one logical key across 10 physical locations. Reads must scatter-gather across all suffixes and merge results.
- Request coalescing: When 1000 requests arrive for the same key simultaneously, execute ONE read and share the result with all waiters. Cloudflare and Varnish use this pattern.
- Hot-key caching: Maintain an in-memory cache of known hot keys with very short TTLs (1–5 seconds). The cache absorbs the traffic spikes while still reflecting updates within seconds.
- Adaptive routing: Detect hot partitions in real-time and dynamically reroute a portion of reads to replica shards.
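Request coalescing is the easiest of these mitigations to get subtly wrong, so here is a minimal asyncio sketch of it. `CoalescingLoader` and `slow_fetch` are hypothetical names. The first caller for a key starts the backend read, and every concurrent caller awaits that same in-flight future.

```python
import asyncio
from typing import Awaitable, Callable, Dict


class CoalescingLoader:
    """Collapse concurrent reads for the same key into one backend call."""

    def __init__(self, fetch: Callable[[str], Awaitable[str]]):
        self._fetch = fetch
        self._in_flight: Dict[str, asyncio.Future] = {}

    async def get(self, key: str) -> str:
        fut = self._in_flight.get(key)
        if fut is None:
            # First caller: start the backend read and publish its future.
            fut = asyncio.ensure_future(self._fetch(key))
            self._in_flight[key] = fut
            # Drop the entry once it completes so later reads fetch fresh data.
            fut.add_done_callback(lambda _f, k=key: self._in_flight.pop(k, None))
        # shield() stops one cancelled waiter from cancelling the shared read.
        return await asyncio.shield(fut)


backend_calls = 0


async def slow_fetch(key: str) -> str:
    global backend_calls
    backend_calls += 1
    await asyncio.sleep(0.05)  # simulate a database round trip
    return f"value-of-{key}"


async def main() -> list:
    loader = CoalescingLoader(slow_fetch)
    # 1000 concurrent requests for the same hot key
    return await asyncio.gather(*(loader.get("celebrity:42") for _ in range(1000)))


results = asyncio.run(main())
print(len(results), backend_calls)  # 1000 1
```

The entry is removed as soon as the read finishes. Coalescing therefore only deduplicates requests that overlap in time, and it never serves stale data the way a cache can.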
#!/bin/bash
# Hot Partition Monitor: detect and alert on partition skew
# Monitors DynamoDB or similar partition-based systems (needs bash 4+ and bc)

# Configuration (integers, because bash arithmetic cannot handle 3.0)
HOT_MULTIPLIER=3     # Alert when partition load > 3× average
WARM_MULTIPLIER=2    # Warn when partition load > 2× average
CHECK_INTERVAL=30    # Seconds between checks when run continuously (see end)

# Simulated partition metrics (replace with real CloudWatch/Prometheus queries)
declare -A partition_reads
partition_reads=(
  ["partition-001"]=1200
  ["partition-002"]=1100
  ["partition-003"]=15800   # ← Hot partition!
  ["partition-004"]=980
  ["partition-005"]=1050
  ["partition-006"]=1300
)

check_partitions() {
  local total=0 count=0 hot_found=0 reads partition ratio average

  # Calculate average (one very hot partition inflates the mean;
  # a median baseline is more robust when skew is extreme)
  for reads in "${partition_reads[@]}"; do
    total=$((total + reads))
    count=$((count + 1))
  done
  average=$((total / count))
  echo "Average reads/partition: ${average}"
  echo "Hot threshold: $((average * HOT_MULTIPLIER))"
  echo ""

  # Detect hot and warm partitions
  for partition in "${!partition_reads[@]}"; do
    reads=${partition_reads[$partition]}
    ratio=$(echo "scale=1; $reads / $average" | bc)
    if (( reads > average * HOT_MULTIPLIER )); then
      echo "🔴 HOT: ${partition} at ${reads} reads (${ratio}× average)"
      echo "   Action: Enable request coalescing + add read replica"
      hot_found=1
    elif (( reads > average * WARM_MULTIPLIER )); then
      echo "⚠️ WARM: ${partition} at ${reads} reads (${ratio}× average)"
    fi
  done

  if [ "$hot_found" -eq 0 ]; then
    echo "✅ No hot partitions detected: load is balanced"
  fi
}

echo "=== Hot Partition Monitor ==="
echo "Threshold: ${HOT_MULTIPLIER}× average load"
echo ""
check_partitions

# For continuous monitoring, refresh partition_reads from your metrics backend
# inside the loop and run:
#   while true; do check_partitions; sleep "$CHECK_INTERVAL"; done
Rebalancing Without Downtime
As data grows or nodes are added/removed, partitions must be rebalanced — moving data between nodes to maintain even distribution. The challenge: doing this without downtime or data loss.
Safe rebalancing procedure:
- Phase 1 (Double-write): New writes go to BOTH the old shard and the new shard. Reads continue from the old shard.
- Phase 2 (Backfill): Copy historical data from the old shard to the new shard in background batches. Double-writes are already landing on the new shard, so the backfill must never overwrite a row with an older version. Compare a version number or update timestamp before each write.
- Phase 3 (Verify): Compare checksums between the old and new shards to confirm they match.
- Phase 4 (Cutover): Switch reads to the new shard and stop writing to the old one, which becomes read-only.
- Phase 5 (Cleanup): After a safety period, decommission the old shard's copy of the migrated data.
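The phases can be sketched in memory to show where the ordering matters. This sketch assumes each shard is a dict of `key -> (version, value)` and that writers attach a monotonically increasing version. `ShardMigration` and its methods are hypothetical names. The version check in the backfill is what stops a stale historical batch from overwriting a newer double-written row.

```python
import hashlib
from typing import Dict, List, Tuple

Row = Tuple[int, str]  # (version, value)


def checksum(shard: Dict[str, Row]) -> str:
    """Digest of a shard's contents; keys are sorted so insertion order is irrelevant."""
    digest = hashlib.sha256()
    for key in sorted(shard):
        version, value = shard[key]
        digest.update(f"{key}|{version}|{value}\n".encode())
    return digest.hexdigest()


class ShardMigration:
    """Double-write -> backfill -> verify -> cutover -> cleanup, for one key range."""

    def __init__(self, old: Dict[str, Row]):
        self.old = old
        self.new: Dict[str, Row] = {}
        self.cut_over = False  # Phase 1 (double-write) is active until cutover

    def write(self, key: str, version: int, value: str) -> None:
        if self.cut_over:
            self.new[key] = (version, value)  # old shard is read-only now
        else:
            self.old[key] = (version, value)  # Phase 1: write to BOTH shards
            self.new[key] = (version, value)

    def read(self, key: str) -> Row:
        return (self.new if self.cut_over else self.old)[key]

    def backfill_batch(self, rows: List[Tuple[str, Row]]) -> None:
        # Phase 2: the batch was read from the old shard earlier and may be stale,
        # so never overwrite a newer version that a double-write already stored.
        for key, (version, value) in rows:
            current = self.new.get(key)
            if current is None or current[0] < version:
                self.new[key] = (version, value)

    def cutover(self) -> None:
        # Phases 3 and 4: verify checksums, then switch reads and writes.
        if checksum(self.old) != checksum(self.new):
            raise RuntimeError("shards diverged; rerun backfill before cutover")
        self.cut_over = True

    def cleanup(self) -> None:
        # Phase 5: after the safety period, drop the old shard's copy.
        self.old.clear()


old_shard = {"user:1": (1, "alice"), "user:2": (1, "bob")}
migration = ShardMigration(old_shard)
stale_batch = list(old_shard.items())      # background job reads a batch...
migration.write("user:1", 2, "alice-v2")   # ...while a live write lands
migration.backfill_batch(stale_batch)      # the stale copy must not clobber it
migration.cutover()
print(migration.read("user:1"))            # (2, 'alice-v2')
```

Without the version check, the backfill would restore `(1, 'alice')` on the new shard. The checksum comparison would then refuse the cutover, and the live write would be lost if the check were skipped.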
Replication & Partition Tolerance
Sharding distributes data for capacity. Replication copies data for availability and durability. In a distributed system, you need both — sharding without replication means each shard is a single point of failure.
- Leader-follower (primary-secondary): One node accepts writes, replicates to followers. Simple, but the leader is a bottleneck for writes.
- Multi-leader: Multiple nodes accept writes for different regions. Requires conflict resolution when the same record is written in two regions simultaneously.
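- Leaderless (Dynamo-style): Any replica can accept reads and writes. With N replicas, a write waits for W acknowledgments and a read queries R replicas. Choosing W + R > N makes every read quorum overlap every write quorum, so each read reaches at least one replica holding the latest acknowledged write. Availability is high, but concurrent conflicting writes still need resolution (e.g., versioning or vector clocks).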
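The quorum condition can be checked by brute force. This sketch uses replica indices as hypothetical labels. It enumerates every write set of size W and every read set of size R among N replicas and tests whether each pair shares a replica.

```python
from itertools import combinations


def quorums_always_overlap(n: int, w: int, r: int) -> bool:
    """True if every W-replica write set intersects every R-replica read set."""
    replicas = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(replicas, w)
        for read_set in combinations(replicas, r)
    )


for n, w, r in [(3, 2, 2), (3, 1, 3), (3, 1, 1), (5, 3, 2)]:
    print(f"N={n} W={w} R={r}: W+R>N is {w + r > n}, "
          f"overlap is {quorums_always_overlap(n, w, r)}")
```

The two columns always agree. Overlap alone does not tell the reader which of the returned copies is newest, though, so replicas still need version metadata. Sloppy quorums with hinted handoff also weaken the guarantee.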
Module 19: High Throughput Systems
Once data is distributed, the next challenge is processing it at scale. High-throughput systems must handle millions of events per second — requiring deep understanding of parallelism, concurrency, and event streaming architectures.
Parallelism vs Concurrency
These terms are often confused but represent fundamentally different concepts:
- Concurrency: Dealing with many things at once. Multiple tasks make progress by interleaving execution on shared resources. A single-core CPU running async I/O is concurrent but NOT parallel.
- Parallelism: Doing many things at once. Multiple tasks execute simultaneously on separate resources. 8 CPU cores processing 8 requests simultaneously is parallel.
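Here is a quick way to see concurrency without parallelism. In this asyncio sketch, five simulated I/O waits of 0.1 s each all run on one OS thread. They still finish in roughly 0.1 s total, because the event loop interleaves them while each one is waiting. The task names are arbitrary.

```python
import asyncio
import threading
import time


async def fake_io(name: str, thread_ids: list) -> str:
    thread_ids.append(threading.get_ident())  # record which OS thread runs this task
    await asyncio.sleep(0.1)                  # yields to the event loop while "waiting"
    return name


async def main() -> tuple:
    thread_ids: list = []
    start = time.perf_counter()
    names = await asyncio.gather(*(fake_io(f"req-{i}", thread_ids) for i in range(5)))
    return names, len(set(thread_ids)), time.perf_counter() - start


names, threads_used, elapsed = asyncio.run(main())
# One thread, and elapsed is close to 0.1 s rather than 0.5 s
print(names, threads_used, f"{elapsed:.2f}s")
```

CPU-bound work gets no benefit from this, because nothing yields control. That case needs parallelism, for example `concurrent.futures.ProcessPoolExecutor` spread across cores.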
Event Streaming at Scale
Event streaming platforms (Kafka, Kinesis, Pulsar) are the backbone of high-throughput data pipelines. They decouple producers from consumers, provide durability, and enable replay — but scaling them requires understanding partitioned consumption.
Partitioned consumers: A Kafka topic is divided into partitions. Each partition is consumed by exactly ONE consumer in a consumer group. This means:
- Maximum parallelism = number of partitions
- Ordering is guaranteed WITHIN a partition, not across partitions
- Adding consumers beyond the partition count provides no benefit
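- Choose the partition count generously up front: Kafka lets you add partitions later, but doing so changes which partition each key hashes to, which breaks per-key ordering across the change, and partitions cannot be removed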
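A simplified model makes these rules concrete. It assumes `zlib.crc32` as the key hash (Kafka's default partitioner actually uses murmur2) and a simple modulo assignment of partitions to consumers, similar in spirit to Kafka's round-robin assignor. It shows why consumers beyond the partition count sit idle and why adding partitions moves keys.

```python
import zlib
from typing import Dict, List


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32.
    return zlib.crc32(key.encode()) % num_partitions


def assign(partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Each partition goes to exactly one consumer in the group."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for p in range(partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


group = [f"consumer-{i}" for i in range(8)]
assignment = assign(6, group)
idle = [c for c, parts in assignment.items() if not parts]
print(idle)  # ['consumer-6', 'consumer-7']: more consumers than partitions

keys = [f"user:{i}" for i in range(1000)]
moved = sum(partition_for(k, 6) != partition_for(k, 12) for k in keys)
print(f"{moved / len(keys):.0%} of keys change partition when going 6 -> 12")
```

Every record for a given key lands on the same partition, which is what preserves per-key order. After a partition-count change, roughly half of the keys in this example start landing elsewhere. Their old and new records then live on different partitions, and ordering is only guaranteed within each one.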
Exactly-Once Semantics
In distributed systems, message delivery comes with one of three guarantees:
- At-most-once: Fire and forget. Messages may be lost. Fastest but unreliable.
- At-least-once: Retry until acknowledged. Messages may be duplicated. Requires idempotent consumers.
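- Exactly-once: Each message's effect is applied exactly once. This requires transactional coordination between source, stream, and sink (for example, Kafka transactions that commit consumed offsets and produced records atomically), or at-least-once delivery combined with idempotent processing.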
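Under at-least-once delivery, the usual safety net is an idempotent consumer. This minimal sketch assumes each message carries a unique `message_id` and that the dedup record and the side effect commit together. Here both live in one process; in production they would share one database transaction. `IdempotentConsumer` is a hypothetical name.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class IdempotentConsumer:
    processed_ids: Set[str] = field(default_factory=set)
    balances: Dict[str, int] = field(default_factory=dict)

    def handle(self, message_id: str, account: str, amount: int) -> bool:
        """Apply a credit once; return False for a redelivered duplicate."""
        if message_id in self.processed_ids:
            return False  # already applied: acknowledge and skip
        # In a real system, record the ID and apply the effect in ONE transaction;
        # a crash between the two would reintroduce duplicates or losses.
        self.balances[account] = self.balances.get(account, 0) + amount
        self.processed_ids.add(message_id)
        return True


consumer = IdempotentConsumer()
deliveries = [("m1", "acct-1", 50), ("m2", "acct-1", 25), ("m1", "acct-1", 50)]  # m1 retried
for msg in deliveries:
    consumer.handle(*msg)
print(consumer.balances)  # {'acct-1': 75}
```

The set of processed IDs grows without bound here. A real deployment would keep IDs only for longer than the maximum redelivery window.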
Module 20: Internet-Scale Architectures
Internet-scale systems serve users globally — hundreds of millions of requests per second, sub-100ms latency worldwide, 99.99% availability. This requires geographic distribution at every layer: traffic routing, compute, and data.
Global Traffic Routing
Getting user requests to the nearest healthy datacenter requires multiple layers of routing:
- GeoDNS: DNS resolves the same domain to different IP addresses based on the client's geographic location. Simple but coarse-grained (DNS caching means slow failover).
- Anycast: Multiple servers announce the same IP address, and BGP routing delivers each packet to the nearest one in network terms. Widely used by CDNs and public DNS resolvers (e.g., Cloudflare, and Google Public DNS at 8.8.8.8).
- Global Load Balancers: Managed global load balancers route on latency, health, and capacity, not just geography. AWS Global Accelerator operates at layer 4 (TCP/UDP), while GCP's global external Application Load Balancer operates at layer 7.
flowchart TD
U1[Users: Americas] --> GLB[Global Load Balancer
Anycast + Health Checks]
U2[Users: Europe] --> GLB
U3[Users: Asia-Pacific] --> GLB
GLB --> R1[Region: US-East]
GLB --> R2[Region: EU-West]
GLB --> R3[Region: AP-Southeast]
R1 --> APP1[App Cluster]
R1 --> DB1[(Primary DB
+ Local Read Replicas)]
R2 --> APP2[App Cluster]
R2 --> DB2[(Primary DB
+ Local Read Replicas)]
R3 --> APP3[App Cluster]
R3 --> DB3[(Primary DB
+ Local Read Replicas)]
DB1 <-->|Async Replication
+ Conflict Resolution| DB2
DB2 <-->|Async Replication
+ Conflict Resolution| DB3
DB3 <-->|Async Replication
+ Conflict Resolution| DB1
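The decision a latency-aware global load balancer makes can be sketched as "nearest healthy region with spare capacity". The `Region` fields, the latency figures, and the 90% utilization cutoff below are hypothetical inputs, not any provider's actual algorithm.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Region:
    name: str
    healthy: bool
    utilization: float  # fraction of serving capacity in use, 0.0 to 1.0


def pick_region(latency_ms: Dict[str, float], regions: List[Region],
                max_utilization: float = 0.9) -> Optional[str]:
    """Lowest-latency region that is healthy and below the utilization cap."""
    candidates = [
        r for r in regions
        if r.healthy and r.utilization < max_utilization and r.name in latency_ms
    ]
    if not candidates:
        return None  # caller might serve a degraded or static response instead
    return min(candidates, key=lambda r: latency_ms[r.name]).name


regions = [
    Region("us-east", healthy=True, utilization=0.55),
    Region("eu-west", healthy=False, utilization=0.40),  # failing health checks
    Region("ap-southeast", healthy=True, utilization=0.70),
]
# Measured round-trip times from a European client
eu_client = {"us-east": 85.0, "eu-west": 12.0, "ap-southeast": 160.0}
print(pick_region(eu_client, regions))  # us-east: EU is unhealthy, next-closest wins
```

Once `eu-west` passes its health checks again, the same client is routed back to it, with no DNS TTL to wait out.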
Edge Computing
Edge computing moves computation FROM centralized datacenters TO the network edge — closer to users. This eliminates the round-trip latency to origin servers for certain workloads.
flowchart LR
subgraph Edge ["Edge Layer (300+ PoPs)"]
E1[Edge Function
Auth, Routing, A/B]
E2[Edge Cache
Static + Dynamic]
E3[Edge WAF
Security Rules]
end
subgraph Regional ["Regional Layer (10-20 Regions)"]
R1[App Servers]
R2[Regional Cache]
R3[Read Replicas]
end
subgraph Origin ["Origin Layer (2-3 Primary)"]
O1[Primary Database]
O2[Event Processing]
O3[Batch Jobs]
end
User --> E1
E1 --> E2
E2 -->|Cache Miss| R1
R1 --> R2
R2 -->|Cache Miss| O1
E3 --> E1
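The edge → regional → origin fall-through in the diagram is a tiered cache. This minimal sketch assumes plain dicts for the cache tiers and a hypothetical `origin_fetch` function. Real CDNs layer TTLs, eviction, and request coalescing on top.

```python
from typing import Callable, Dict, Tuple


def tiered_get(key: str, edge: Dict[str, str], regional: Dict[str, str],
               origin_fetch: Callable[[str], str]) -> Tuple[str, str]:
    """Return (value, tier that served it), filling faster tiers on the way back."""
    if key in edge:
        return edge[key], "edge"
    if key in regional:
        edge[key] = regional[key]   # populate the edge for the next request
        return regional[key], "regional"
    value = origin_fetch(key)       # slowest path: round trip to the origin
    regional[key] = value
    edge[key] = value
    return value, "origin"


def origin_fetch(path: str) -> str:
    return f"<html>{path}</html>"   # stand-in for a slow origin request


edge_cache: Dict[str, str] = {}
regional_cache: Dict[str, str] = {"/pricing": "<html>pricing</html>"}

print(tiered_get("/home", edge_cache, regional_cache, origin_fetch)[1])     # origin
print(tiered_get("/home", edge_cache, regional_cache, origin_fetch)[1])     # edge
print(tiered_get("/pricing", edge_cache, regional_cache, origin_fetch)[1])  # regional
```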
Edge compute platforms:
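- Cloudflare Workers: V8 isolates, globally distributed, with isolate startup measured in milliseconds rather than the hundreds of milliseconds typical of containers. Best for request transformation, auth, and routing logic.
- AWS Lambda@Edge / CloudFront Functions: Run in CloudFront's network (CloudFront Functions at edge locations, Lambda@Edge at regional edge caches). Tight execution limits: Lambda@Edge allows 5 s for viewer-triggered functions (30 s for origin-triggered ones), and CloudFront Functions are designed for sub-millisecond execution.
- Deno Deploy / Vercel Edge Functions: JavaScript/TypeScript runtimes at the edge built on Web-standard APIs rather than the full Node.js runtime. Good for API responses and server-side rendering.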
# Multi-Region Istio Configuration (Kubernetes service mesh)
# Route to the same service in 3 regions with weighted endpoints
apiVersion: networking.istio.io/v1beta1
kind: ServiceEntry
metadata:
  name: global-payment-service
  namespace: payments
spec:
  hosts:
    - payments.global.internal
  location: MESH_EXTERNAL
  ports:
    - number: 443
      name: https
      protocol: TLS
  resolution: DNS
  endpoints:
    # US-East (primary for Americas)
    - address: payments.us-east-1.internal
      locality: us-east-1/us-east-1a
      weight: 40
      ports:
        https: 443
    # EU-West (primary for Europe, GDPR-compliant)
    - address: payments.eu-west-1.internal
      locality: eu-west-1/eu-west-1a
      weight: 30
      ports:
        https: 443
    # AP-Southeast (primary for Asia-Pacific)
    - address: payments.ap-southeast-1.internal
      locality: ap-southeast-1/ap-southeast-1a
      weight: 30
      ports:
        https: 443
---
# Locality-aware load balancing: prefer local, fall back to remote
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: payment-locality-lb
  namespace: payments
spec:
  host: payments.global.internal
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 1000
      # HTTP pool settings apply only to traffic the proxy parses as HTTP;
      # with protocol TLS (passthrough) the connection is proxied as opaque TCP.
      http:
        h2UpgradePolicy: UPGRADE
        maxRequestsPerConnection: 100
    loadBalancer:
      localityLbSetting:
        enabled: true
        failover:
          - from: us-east-1
            to: eu-west-1       # US fails over to EU
          - from: eu-west-1
            to: us-east-1       # EU fails over to US (check data-residency rules first)
          - from: ap-southeast-1
            to: us-east-1       # APAC fails over to US
    # Outlier detection must be configured for locality failover to take effect
    outlierDetection:
      consecutive5xxErrors: 3
      interval: 10s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
Multi-Region Systems
Multi-region architectures come in two primary patterns:
Active-Passive: One region handles all traffic. The passive region is a warm standby that takes over during failure. Simpler to implement — no conflict resolution needed. Downside: failover takes minutes, and the passive region's resources are idle (wasted cost).
Active-Active: All regions serve traffic simultaneously. Each region can handle any request independently. Benefits: lower latency (users hit the nearest region), no wasted standby capacity, and fast failover, since traffic is simply routed away from a failed region. Challenge: data consistency across regions requires conflict resolution.
Data Sovereignty & Conflict Resolution
Multi-region systems must respect data sovereignty laws — regulations that require certain data to remain within geographic boundaries:
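- GDPR (EU): Personal data of people in the EU may be transferred outside the European Economic Area only to countries with an EU adequacy decision or under appropriate safeguards (such as Standard Contractual Clauses). Requires data residency and transfer controls.
- Data localization laws: Russia (Federal Law 242-FZ) requires personal data of Russian citizens to be stored in databases located in Russia. China's PIPL requires critical information infrastructure operators and large-volume processors to store personal data domestically. India's Digital Personal Data Protection Act (2023) lets the government restrict transfers to specific countries.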
- Implementation: Route user data to region-specific shards based on user nationality/residence. Use metadata tags to prevent cross-border replication of restricted data.
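Residency controls boil down to a filter on where a record may be stored and replicated. This minimal sketch assumes a hypothetical policy table that maps a `residency` metadata tag to the regions allowed to hold the data. Which regions a given jurisdiction actually permits is a legal decision that this code does not settle.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List

ALL_REGIONS = ["us-east-1", "eu-west-1", "eu-central-1", "ap-southeast-1"]

# Hypothetical policy table: residency tag -> regions allowed to hold the data.
ALLOWED_REGIONS: Dict[str, FrozenSet[str]] = {
    "eu": frozenset({"eu-west-1", "eu-central-1"}),
    "unrestricted": frozenset(ALL_REGIONS),
}


@dataclass
class Record:
    key: str
    residency: str  # metadata tag set when the record is created


def replication_targets(record: Record, home_region: str) -> List[str]:
    """Regions (other than home) this record may be replicated to."""
    allowed = ALLOWED_REGIONS.get(record.residency)
    if allowed is None:  # fail closed on unknown tags
        raise ValueError(f"no residency policy for tag {record.residency!r}")
    if home_region not in allowed:
        raise ValueError(f"{record.key} may not be stored in {home_region}")
    return [r for r in ALL_REGIONS if r in allowed and r != home_region]


print(replication_targets(Record("user:eu:7", "eu"), "eu-west-1"))
# ['eu-central-1']
print(replication_targets(Record("sku:123", "unrestricted"), "us-east-1"))
# ['eu-west-1', 'eu-central-1', 'ap-southeast-1']
```

Failing closed on unknown tags is deliberate. A missing tag should block replication, not allow it everywhere.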
Conflict resolution for multi-region writes:
- Last-Writer-Wins (LWW): Timestamp-based; the write with the latest timestamp wins. Simple, but it silently discards the other of two concurrent writes, and clock skew between regions can let an older write win.
- Vector clocks: Track causal ordering across nodes. Detect true conflicts (concurrent writes) vs. sequential updates. Return conflicts to the application for resolution.
- CRDTs (Conflict-free Replicated Data Types): Data structures mathematically guaranteed to converge regardless of operation order. No conflicts possible — every concurrent update can be merged automatically.
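Two concurrent writes show the difference between LWW and vector clocks clearly. This sketch represents a vector clock as a dict of node → counter, which is a common textbook representation rather than any particular database's format. It classifies one clock relative to another as ordered or concurrent.

```python
from typing import Dict

VectorClock = Dict[str, int]


def compare(a: VectorClock, b: VectorClock) -> str:
    """Return 'before', 'after', 'equal', or 'concurrent' for clock a relative to b."""
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"


# Both regions start from the same version, then update independently.
base = {"us": 1, "eu": 1}
us_write = {"us": 2, "eu": 1}   # US increments its own entry
eu_write = {"us": 1, "eu": 2}   # EU increments its own entry

print(compare(base, us_write))      # before: a sequential update, safe to overwrite
print(compare(us_write, eu_write))  # concurrent: a true conflict for the application

# LWW would keep whichever write carries the larger wall-clock timestamp and
# silently drop the other, even though neither region saw the other's update.
```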
"""
CRDT: G-Counter (Grow-Only Counter)
A G-Counter allows increment operations from multiple nodes
without coordination. Each node maintains its own counter.
The global value is the SUM of all node counters.
Merge rule: for each node, take the MAX of the two values.
This guarantees convergence regardless of network order.
"""
import json
from typing import Dict


class GCounter:
    """
    Grow-only counter CRDT: supports increment and merge.
    Properties:
    - Commutative: merge(A, B) == merge(B, A)
    - Associative: merge(merge(A, B), C) == merge(A, merge(B, C))
    - Idempotent: merge(A, A) == A
    These properties guarantee convergence in any network topology.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts: Dict[str, int] = {}

    def increment(self, amount: int = 1):
        """Increment this node's counter (grow-only, so amount must be >= 0)"""
        if amount < 0:
            raise ValueError("G-Counter cannot decrement; use a PN-Counter instead")
        current = self.counts.get(self.node_id, 0)
        self.counts[self.node_id] = current + amount

    def value(self) -> int:
        """Global counter value = sum of all node counters"""
        return sum(self.counts.values())

    def merge(self, other: 'GCounter'):
        """
        Merge another counter into this one.
        Take the MAX for each node: no increment is lost or double-counted.
        """
        for node_id, count in other.counts.items():
            self.counts[node_id] = max(
                self.counts.get(node_id, 0),
                count
            )

    def state(self) -> str:
        """Serializable state for network transfer"""
        return json.dumps(self.counts)


# Demo: Three regions independently counting page views
us_counter = GCounter("us-east-1")
eu_counter = GCounter("eu-west-1")
ap_counter = GCounter("ap-south-1")

# Each region receives traffic independently
us_counter.increment(1000)  # 1000 views from US
eu_counter.increment(750)   # 750 views from EU
ap_counter.increment(500)   # 500 views from APAC

# Merge all counters (order doesn't matter!)
us_counter.merge(eu_counter)
us_counter.merge(ap_counter)
print(f"Total page views (merged at US): {us_counter.value()}")
# Output: 2250

# EU merges in a different order and gets the same result (commutativity)
eu_counter.merge(ap_counter)
eu_counter.merge(us_counter)
print(f"Total page views (merged at EU): {eu_counter.value()}")
# Output: 2250 (guaranteed identical)

# Idempotent: merging the same state twice has no effect
eu_counter.merge(us_counter)
print(f"After redundant merge: {eu_counter.value()}")
# Output: 2250 (still the same)
Case Studies
Instagram: Sharding PostgreSQL to Billions of Rows
Instagram — Sharding PostgreSQL Without an ORM
Instagram's data (photos, likes, comments, follows) grew beyond what a single PostgreSQL instance could handle. Rather than migrating to a NoSQL database, they chose to shard PostgreSQL — keeping the SQL query model while distributing data across thousands of logical shards.
Their Sharding Strategy:
- Logical shards: Data is divided into several thousand logical shards. Each logical shard is a PostgreSQL schema (namespace). Multiple logical shards live on a single physical PostgreSQL instance.
- ID generation: Instagram needed globally unique, roughly time-ordered IDs without coordination. Their solution packs three fields into each 64-bit ID: timestamp_ms (41 bits, milliseconds since a custom epoch) + logical_shard_id (13 bits) + auto_increment (10 bits). This embeds the shard location IN the ID itself, so no lookup is needed to find the logical shard.
- Shard routing: Extract the logical shard ID from the entity ID (bits 10–22, counting from the least significant bit, i.e. (id >> 10) & 0x1FFF), then map logical shard → physical host via a lookup table. No consistent hashing is needed because the shard is encoded in the ID.
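- Rebalancing: Moving logical shards between physical hosts uses PostgreSQL's streaming replication. Streaming replication copies an entire instance rather than a single schema, so the new host starts as a full replica. Once it is promoted, the routing-table entries for the logical shards it should own are switched to it, and downtime is limited to that brief switchover.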
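The 64-bit layout can be sketched with plain bit arithmetic. The field widths come from the description above. The epoch value and the 4-host routing table are assumptions for illustration. Instagram's published approach generated these IDs inside PostgreSQL, so treat this application-side Python as a model of the bit layout only.

```python
import time

# Field widths from the layout above; the epoch is an assumed custom epoch.
EPOCH_MS = 1_293_840_000_000          # 2011-01-01T00:00:00Z in milliseconds
TIMESTAMP_BITS, SHARD_BITS, SEQ_BITS = 41, 13, 10

# Hypothetical routing table: 8,192 logical shards spread over 4 physical hosts.
LOGICAL_TO_HOST = {s: f"pg-{s // 2048}" for s in range(1 << SHARD_BITS)}


def make_id(shard_id: int, sequence: int, now_ms: int) -> int:
    """Pack timestamp (41 bits) | logical shard (13 bits) | sequence (10 bits)."""
    if not 0 <= shard_id < (1 << SHARD_BITS):
        raise ValueError("shard_id must fit in 13 bits")
    elapsed = now_ms - EPOCH_MS
    if not 0 <= elapsed < (1 << TIMESTAMP_BITS):
        raise ValueError("timestamp outside the 41-bit window")
    seq = sequence % (1 << SEQ_BITS)  # e.g. a per-shard auto-increment, mod 1024
    return (elapsed << (SHARD_BITS + SEQ_BITS)) | (shard_id << SEQ_BITS) | seq


def shard_of(entity_id: int) -> int:
    """Logical shard sits in bits 10-22, counting from the least significant bit."""
    return (entity_id >> SEQ_BITS) & ((1 << SHARD_BITS) - 1)


def created_at_ms(entity_id: int) -> int:
    return (entity_id >> (SHARD_BITS + SEQ_BITS)) + EPOCH_MS


photo_id = make_id(shard_id=1341, sequence=5001, now_ms=int(time.time() * 1000))
shard = shard_of(photo_id)
print(shard, LOGICAL_TO_HOST[shard])  # 1341 pg-0
```

Routing is two cheap steps: a shift-and-mask to get the logical shard, then a table lookup to get the host. Moving a logical shard changes one entry in that table and never touches the IDs.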
Why This Works:
- Few cross-shard queries are needed: Instagram's data model is user-centric (a user's data lives on the same shard)
- PostgreSQL gives ACID transactions within a shard — no eventual consistency issues for single-user operations
- Logical shards (thousands) decouple from physical hosts (hundreds) — rebalancing is moving logical units between physical machines
Scale: This architecture has supported Instagram from 30 million users to over 2 billion, handling billions of rows across thousands of PostgreSQL shards.
Cloudflare Workers: Computing at the Edge
Cloudflare Workers — Serverless at 300+ Edge Locations
Cloudflare Workers runs JavaScript/WASM code at every one of Cloudflare's 300+ Points of Presence. Code executes close to users worldwide, with no region selection, no capacity planning, and cold starts small enough to be largely invisible.
Architecture:
- V8 Isolates (not containers): Each Worker runs in a V8 isolate, the same technology that isolates browser tabs. An isolate starts in a few milliseconds (vs 100ms+ for containers), and thousands of isolates share a single process.
- Global deployment by default: When you deploy a Worker, it's immediately available at ALL edge locations. No region selection, no multi-region configuration, no replication setup.
- Durable Objects: Strongly consistent, single-threaded actors pinned to a specific edge location. Enable coordination (counters, rate limiters, WebSocket state) without a traditional database.
- KV (Key-Value) Store: Eventually consistent, globally replicated key-value storage. Reads come from the nearest edge (fast), while writes propagate globally; Cloudflare documents that changes can take 60 seconds or more to become visible in other locations.
- R2 (Object Storage): S3-compatible object storage with no egress fees. Accessible from any edge Worker.
Use Cases at Scale:
- Discord: Uses Workers for rate limiting and bot detection at the edge — 140 million monthly users served without origin round trips for blocked requests
- Shopify: Runs storefront rendering at the edge — sub-50ms page loads globally
- Authentication: JWT validation at the edge means unauthorized requests never reach the origin
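Edge authentication usually means verifying a token's signature before forwarding the request. Here is a Python sketch of the idea using HS256 JWT verification with only the standard library; a Worker would do the same in JavaScript with the Web Crypto API. The shared secret and claims are placeholders. A production check should also validate `aud`/`iss`, and a maintained JWT library is usually the safer choice.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def _b64url_decode(part: str) -> bytes:
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))


def _sign(signing_input: str, secret: bytes) -> bytes:
    return hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Mint a token (stands in for the identity provider in this sketch)."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url_encode(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}"
    return f"{signing_input}.{_b64url_encode(_sign(signing_input, secret))}"


def verify_hs256(token: str, secret: bytes, now: Optional[float] = None) -> Optional[dict]:
    """Return the claims if signature and expiry check out; otherwise None (reject)."""
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
            return None  # refuse "none" and any algorithm we did not expect
        expected = _sign(f"{header_b64}.{payload_b64}", secret)
        if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
            return None  # bad signature
        claims = json.loads(_b64url_decode(payload_b64))
        current = time.time() if now is None else now
        if claims.get("exp", 0) <= current:
            return None  # expired, or no expiry claim at all
        return claims
    except (ValueError, AttributeError, TypeError):
        return None  # malformed token


SECRET = b"placeholder-secret"  # hypothetical shared secret
token = sign_hs256({"sub": "user-1", "exp": time.time() + 60}, SECRET)
print(verify_hs256(token, SECRET))        # claims dict: forward to origin
print(verify_hs256(token + "x", SECRET))  # None: reject at the edge
```

Every rejected request ends at the edge, so forged or expired tokens never consume origin capacity.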
Lesson: Edge computing isn't just about caching static assets. With compute at the edge, you can run authentication, authorization, personalization, and even full API responses within 5-20ms of any user — turning 200ms origin latency into a non-issue.
Conclusion & Next Steps
Modules 18–20 covered the engineering of systems that operate at internet scale — from distributing data across shards to serving users globally.
The key takeaways:
- Sharding is a one-way door. Once you shard, cross-shard operations become expensive. Choose your shard key based on your most common access patterns — not your data model.
- Hot partitions are inevitable. No hash function eliminates skew from real workloads. Monitor per-partition metrics and have mitigation strategies (coalescing, caching, virtual partitions) ready.
- Rebalancing must be online. Zero-downtime data migration uses double-write → backfill → verify → cutover. Never stop writes during a migration.
- I/O-bound services need concurrency, not parallelism. Most web services spend their time waiting on network or disk. Lightweight concurrency (Go goroutines, the Node.js event loop, Python asyncio) usually scales better than a thread-per-request model.
- Exactly-once is "effectively-once." Always design idempotent consumers regardless of delivery guarantees. Deduplication is your safety net.
- Active-active is worth the complexity. It brings lower latency, no wasted standby capacity, and fast failover. But classify your data: not everything needs multi-region write capability.
- CRDTs eliminate conflicts by design. When you can model your data as a CRDT (counters, sets, registers), multi-region convergence is mathematically guaranteed without coordination.
- Edge computing changes the game. Moving compute to 300+ PoPs can turn 200ms origin responses into 5ms edge responses. Authentication, routing, and personalization are strong candidates for the edge.
Next in the Series
In Part 12: Resilience Engineering, we'll explore how to design systems that survive failure — failure domains, blast radius reduction, retries with backoff, circuit breakers, bulkheads, and timeout strategies that prevent cascading failures.