System Design Series Part 9: Rate Limiting & Security

January 25, 2026 Wasil Zafar 35 min read

Master rate limiting algorithms and security patterns for protecting your APIs and systems. Learn token bucket, leaky bucket, sliding window, and authentication best practices.

Table of Contents

  1. Rate Limiting
  2. Security Patterns
  3. DDoS Protection
  4. Next Steps

Rate Limiting

Series Navigation: This is Part 9 of the 15-part System Design Series. Review Part 8: CAP Theorem first.

Rate limiting controls the number of requests a client can make to your API within a given time window. It protects your services from abuse, ensures fair usage, and maintains system stability under load.

Key Insight: Rate limiting isn't just about security—it's about fairness. Without limits, one misbehaving client can degrade service for everyone.

Why Rate Limit?

  • Prevent abuse: Stop malicious actors from overwhelming your system
  • Ensure fairness: Give all users equal access to resources
  • Control costs: Limit expensive operations and API calls
  • Protect downstream: Prevent cascading failures to dependent services
  • Maintain SLAs: Guarantee performance for paying customers

Rate Limiting Algorithms

Token Bucket

Tokens are added to a bucket at a fixed rate, up to a maximum capacity. Each request consumes a token. If the bucket is empty, the request is rejected.

# Token Bucket Algorithm
import time
import threading

class TokenBucket:
    def __init__(self, capacity, fill_rate):
        """
        capacity: Maximum tokens in bucket
        fill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.fill_rate = fill_rate
        self.tokens = capacity
        # Monotonic clock: unaffected by NTP or manual wall-clock adjustments
        self.last_time = time.monotonic()
        self.lock = threading.Lock()
    
    def _refill(self):
        """Add tokens based on elapsed time"""
        now = time.monotonic()
        elapsed = now - self.last_time
        tokens_to_add = elapsed * self.fill_rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_time = now
    
    def consume(self, tokens=1):
        """Try to consume tokens. Returns True if successful."""
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

# Usage
bucket = TokenBucket(capacity=100, fill_rate=10)  # 100 max, 10/sec refill

for i in range(120):
    if bucket.consume():
        print(f"Request {i}: Allowed")
    else:
        print(f"Request {i}: Rate limited")

Pros: Allows bursts up to capacity, smooth rate limiting

Cons: Memory per user, complexity for distributed systems

Used by: AWS API Gateway, Stripe
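The "memory per user" cost comes from keeping one bucket per client. A minimal sketch of that bookkeeping is below, assuming a single process and no locking; `PerClientLimiter` and the injectable `clock` parameter are illustrative names, not part of any library, and the fake clock is only there to make the example deterministic.

```python
import time


class TokenBucket:
    """Token bucket with an injectable clock so behaviour can be tested."""

    def __init__(self, capacity, fill_rate, clock=time.monotonic):
        self.capacity = capacity
        self.fill_rate = fill_rate
        self.clock = clock
        self.tokens = float(capacity)
        self.last_time = clock()

    def consume(self, tokens=1):
        # Refill based on elapsed time, then try to spend tokens.
        now = self.clock()
        elapsed = now - self.last_time
        self.tokens = min(self.capacity, self.tokens + elapsed * self.fill_rate)
        self.last_time = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


class PerClientLimiter:
    """Hypothetical registry: lazily creates one bucket per client ID."""

    def __init__(self, capacity, fill_rate, clock=time.monotonic):
        self.capacity = capacity
        self.fill_rate = fill_rate
        self.clock = clock
        self.buckets = {}

    def allow(self, client_id):
        bucket = self.buckets.get(client_id)
        if bucket is None:
            bucket = TokenBucket(self.capacity, self.fill_rate, self.clock)
            self.buckets[client_id] = bucket
        return bucket.consume()


# Fake clock so the example does not depend on real time.
fake_now = [0.0]
limiter = PerClientLimiter(capacity=2, fill_rate=1, clock=lambda: fake_now[0])

results = [limiter.allow("alice") for _ in range(3)]  # burst of 3 at t=0
other = limiter.allow("bob")                          # bob has his own bucket
fake_now[0] = 1.0                                     # one second later
after_refill = limiter.allow("alice")                 # one token has refilled
```

In a real service the dictionary would also need eviction of idle clients, otherwise memory grows with every client ID ever seen.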

Leaky Bucket

Requests enter a queue and are processed at a constant rate. Overflow is rejected.

# Leaky Bucket Algorithm
import time
import threading
from collections import deque

class LeakyBucket:
    def __init__(self, capacity, leak_rate):
        """
        capacity: Maximum queue size
        leak_rate: Requests processed per second
        """
        self.capacity = capacity
        self.leak_rate = leak_rate
        self.queue = deque()
        self.lock = threading.Lock()
        
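        # Start leaking thread (daemon, so it cannot keep the process alive)
        self.running = True
        self.leak_thread = threading.Thread(target=self._leak, daemon=True)
        self.leak_thread.start()
    
    def stop(self):
        """Stop the background thread"""
        self.running = False
        self.leak_thread.join()
    
    def _leak(self):
        """Process requests at constant rate"""
        while self.running:
            time.sleep(1 / self.leak_rate)
            with self.lock:
                request = self.queue.popleft() if self.queue else None
            # Process outside the lock so add() is never blocked by a slow handler
            if request is not None:
                self._process(request)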
    
    def _process(self, request):
        print(f"Processing: {request}")
    
    def add(self, request):
        """Add request to queue if space available"""
        with self.lock:
            if len(self.queue) < self.capacity:
                self.queue.append(request)
                return True
            return False  # Queue full, reject

# Usage
bucket = LeakyBucket(capacity=10, leak_rate=5)  # 10 queue, 5/sec process

Pros: Smooths out bursts, predictable output rate

Cons: Can introduce latency, no burst allowance
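The queue-based version above needs a background thread. A common alternative is the "leaky bucket as a meter" variant, which keeps only a fill level and rejects instead of queueing. The sketch below is one way to write it, assuming single-threaded use; `LeakyBucketMeter` and its `clock` parameter are illustrative names.

```python
import time


class LeakyBucketMeter:
    """Leaky bucket as a meter: tracks a fill level instead of a queue."""

    def __init__(self, capacity, leak_rate, clock=time.monotonic):
        self.capacity = capacity    # maximum fill level
        self.leak_rate = leak_rate  # units drained per second
        self.clock = clock
        self.level = 0.0
        self.last_time = clock()

    def allow(self):
        # Drain the bucket for the time that has passed since the last call.
        now = self.clock()
        leaked = (now - self.last_time) * self.leak_rate
        self.level = max(0.0, self.level - leaked)
        self.last_time = now
        # Each accepted request adds one unit; reject if it would overflow.
        if self.level + 1 <= self.capacity:
            self.level += 1
            return True
        return False


fake_now = [0.0]
meter = LeakyBucketMeter(capacity=2, leak_rate=1, clock=lambda: fake_now[0])

burst = [meter.allow() for _ in range(3)]  # third request overflows
fake_now[0] = 1.0                          # one unit has leaked out
later = meter.allow()
```

Because nothing is queued, this variant adds no latency, but it also gives up the smoothed output rate that the queue provides.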

Fixed Window Counter

Count requests in fixed time windows. Reset counter when window expires.

# Fixed Window Counter
import time
import redis

class FixedWindowCounter:
    def __init__(self, redis_client, limit, window_seconds):
        self.redis = redis_client
        self.limit = limit
        self.window = window_seconds
    
    def is_allowed(self, client_id):
        """Check if request is allowed"""
        # Window key based on current time
        window_key = f"rate:{client_id}:{int(time.time() // self.window)}"
        
        # Increment and set the expiry in one MULTI/EXEC transaction, so a
        # crash between the two commands cannot leave a key without a TTL
        pipe = self.redis.pipeline()
        pipe.incr(window_key)
        pipe.expire(window_key, self.window)
        count, _ = pipe.execute()
        
        return count <= self.limit

# Usage
redis_client = redis.Redis()
limiter = FixedWindowCounter(redis_client, limit=100, window_seconds=60)

# Each user gets 100 requests per minute
# (process_request and return_429_error are placeholders for your own handlers)
if limiter.is_allowed("user_123"):
    process_request()
else:
    return_429_error()

Pros: Simple, low memory

Cons: Burst at window boundaries (2x limit possible)
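To see where the 2x figure comes from, here is a small in-memory simulation of the same counting logic with explicit timestamps instead of Redis. `InMemoryFixedWindow` is an illustrative stand-in, not the class above.

```python
from collections import defaultdict


class InMemoryFixedWindow:
    """Same counting rule as the Redis version, with time passed in explicitly."""

    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        self.counts = defaultdict(int)

    def is_allowed(self, client_id, now):
        key = (client_id, int(now // self.window))
        self.counts[key] += 1
        return self.counts[key] <= self.limit


limiter = InMemoryFixedWindow(limit=100, window_seconds=60)

# 100 requests just before the boundary (window 0) ...
late = sum(limiter.is_allowed("user_123", 59.9) for _ in range(100))
# ... and 100 more just after it (window 1).
early = sum(limiter.is_allowed("user_123", 60.1) for _ in range(100))

accepted_in_200ms = late + early
```

All 200 requests are accepted within 0.2 seconds, even though the stated limit is 100 per minute, because each batch falls into a different window.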

Sliding Window Log

Track timestamps of all requests. Count requests in sliding window.

# Sliding Window Log
import time
import redis

class SlidingWindowLog:
    def __init__(self, redis_client, limit, window_seconds):
        self.redis = redis_client
        self.limit = limit
        self.window = window_seconds
    
    def is_allowed(self, client_id):
        """Check if request is allowed"""
        now = time.time()
        window_start = now - self.window
        key = f"rate:{client_id}"
        
        # Use Redis sorted set (timestamp as score)
        pipe = self.redis.pipeline()
        
        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)
        
        # Count current entries
        pipe.zcard(key)
        
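        # Add current request. Note: the entry is added even if the request
        # ends up rejected, so a client that keeps retrying while limited
        # stays limited until it backs off.
        pipe.zadd(key, {str(now): now})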
        
        # Set expiration
        pipe.expire(key, self.window)
        
        results = pipe.execute()
        count = results[1]
        
        return count < self.limit

# Usage - More accurate but more memory
limiter = SlidingWindowLog(redis_client, limit=100, window_seconds=60)

Pros: Most accurate, no boundary issues

Cons: Higher memory (stores all timestamps)
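The same idea can be sketched without Redis, using one deque of timestamps per client. This version records only accepted requests, which is a design choice that differs from the pipeline above. `InMemorySlidingLog` is an illustrative name, and time is passed in explicitly to keep the example deterministic.

```python
from collections import defaultdict, deque


class InMemorySlidingLog:
    """Sliding window log: one timestamp per accepted request."""

    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        self.logs = defaultdict(deque)

    def is_allowed(self, client_id, now):
        log = self.logs[client_id]
        # Drop timestamps that have aged out of the window.
        while log and log[0] <= now - self.window:
            log.popleft()
        if len(log) < self.limit:
            log.append(now)
            return True
        return False


limiter = InMemorySlidingLog(limit=2, window_seconds=60)
decisions = [
    limiter.is_allowed("u", 0.0),   # accepted
    limiter.is_allowed("u", 30.0),  # accepted
    limiter.is_allowed("u", 59.0),  # rejected: two requests in the last 60 s
    limiter.is_allowed("u", 60.0),  # accepted: the t=0 entry has aged out
]
```

Memory grows with the limit itself: a client allowed 10,000 requests per window can hold up to 10,000 timestamps.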

Sliding Window Counter

Hybrid approach: weighted combination of current and previous window counters.

# Sliding Window Counter (Best of both worlds)
import time
import redis

class SlidingWindowCounter:
    def __init__(self, redis_client, limit, window_seconds):
        self.redis = redis_client
        self.limit = limit
        self.window = window_seconds
    
    def is_allowed(self, client_id):
        """Weighted count from current and previous window"""
        now = time.time()
        current_window = int(now // self.window)
        previous_window = current_window - 1
        
        # Position within current window (0 to 1)
        position = (now % self.window) / self.window
        
        # Get counts from both windows
        current_key = f"rate:{client_id}:{current_window}"
        previous_key = f"rate:{client_id}:{previous_window}"
        
        current_count = int(self.redis.get(current_key) or 0)
        previous_count = int(self.redis.get(previous_key) or 0)
        
        # Weighted count
        # More weight to previous window at start of current window
        weighted_count = previous_count * (1 - position) + current_count
        
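        if weighted_count < self.limit:
            # Increment current window. The read-then-increment sequence is
            # not atomic, so concurrent requests can slightly exceed the limit.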
            self.redis.incr(current_key)
            self.redis.expire(current_key, self.window * 2)
            return True
        return False

# Example: At 30 seconds into 60-second window
# weighted = previous * 0.5 + current * 1.0
# Smooths the boundary between windows

Pros: Close to sliding-log accuracy, low memory, smooth boundaries

Cons: Approximate (assumes the previous window's requests were evenly spread)
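A worked example of the weighting may help. The helper below isolates the formula from the class above; the counts are made-up numbers chosen for illustration.

```python
def weighted_count(previous_count, current_count, position):
    """Estimate of requests in the sliding window ending now.

    position is how far we are into the current window, from 0 to 1.
    """
    return previous_count * (1 - position) + current_count


# 15 seconds into a 60-second window: position = 0.25.
# The previous window saw 80 requests, the current one 30 so far.
estimate = weighted_count(previous_count=80, current_count=30, position=0.25)
# 80 * 0.75 + 30 = 90.0

allowed = estimate < 100  # with a limit of 100, the request is accepted
```

The estimate assumes the 80 earlier requests were spread evenly across the previous window. If they all arrived in its final seconds, the true count in the last 60 seconds would be 110, which is the approximation error this algorithm trades for low memory.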

Algorithm Comparison

Algorithm        Burst       Memory    Accuracy   Use Case
Token Bucket     Allows      Low       High       API rate limiting
Leaky Bucket     No burst    Low       High       Traffic shaping
Fixed Window     At edges    Very Low  Medium     Simple limits
Sliding Log      Controlled  High      Very High  Precise limits
Sliding Counter  Controlled  Low       High       Production APIs

Security Patterns

Authentication vs Authorization

  • Authentication: Who are you? (Identity verification)
  • Authorization: What can you do? (Permission check)
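In an HTTP API the distinction shows up in the status code: a failed authentication is a 401, a failed authorization is a 403. A minimal sketch, where `access_status` and the shape of the `user` dictionary are assumptions made for illustration:

```python
def access_status(user, required_permission):
    """Map the two checks onto HTTP status codes."""
    if user is None:
        # Authentication failed: we do not know who is calling.
        return 401
    if required_permission not in user["permissions"]:
        # Authenticated, but not allowed to do this.
        return 403
    return 200


anonymous = access_status(None, "delete")
viewer = access_status({"id": "u1", "permissions": {"read"}}, "delete")
admin = access_status({"id": "u2", "permissions": {"read", "delete"}}, "delete")
```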

JWT Authentication

# JWT (JSON Web Token) Authentication
import os
import jwt  # PyJWT
from datetime import datetime, timedelta, timezone
from functools import wraps
from flask import Flask, request, jsonify

app = Flask(__name__)

# Load the signing key from the environment instead of hard-coding it
# (JWT_SECRET_KEY is an assumed variable name)
SECRET_KEY = os.environ["JWT_SECRET_KEY"]
ALGORITHM = "HS256"

class AuthError(Exception):
    """Raised when a token is expired or invalid"""

def create_token(user_id, roles):
    """Generate JWT token"""
    now = datetime.now(timezone.utc)  # datetime.utcnow() is deprecated
    payload = {
        "sub": str(user_id),  # the JWT spec expects a string subject
        "roles": roles,
        "iat": now,
        "exp": now + timedelta(hours=1)
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token):
    """Verify and decode JWT token"""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise AuthError("Token expired")
    except jwt.InvalidTokenError:
        raise AuthError("Invalid token")

def require_auth(f):
    """Decorator to require authentication"""
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return jsonify({"error": "Missing token"}), 401
        
        token = auth_header.split(" ", 1)[1]
        try:
            request.user = verify_token(token)
        except AuthError as exc:
            # Without this, a bad token would surface as a 500
            return jsonify({"error": str(exc)}), 401
        return f(*args, **kwargs)
    return decorated

@app.route("/api/orders")
@require_auth
def get_orders():
    user_id = request.user["sub"]
    return get_user_orders(user_id)
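To show what the library is doing under the hood, here is a standard-library sketch of HS256 signing and verification. It deliberately skips expiry checks and algorithm pinning, so treat it as an illustration of the token format rather than a replacement for PyJWT. The function names are illustrative.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(payload: dict, secret: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify(token: str, secret: bytes):
    """Return the payload if the signature matches, otherwise None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = hmac.new(
            secret, f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
        ).digest()
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
            return None
        return json.loads(b64url_decode(payload_b64))
    except ValueError:
        # Wrong number of segments, bad base64, or bad JSON.
        return None


token = sign({"sub": "user_123", "roles": ["viewer"]}, b"demo-secret")
good = verify(token, b"demo-secret")
wrong_key = verify(token, b"other-secret")

# Swap in a forged payload while keeping the original signature.
head, _, sig = token.split(".")
forged_payload = b64url(json.dumps({"sub": "user_123", "roles": ["admin"]}).encode())
tampered = verify(f"{head}.{forged_payload}.{sig}", b"demo-secret")
```

The payload is only encoded, not encrypted: anyone holding the token can read the claims, so nothing secret belongs in them.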

API Key Authentication

# API Key Authentication with Rate Limiting
import hashlib
import redis
from functools import wraps
from flask import request, jsonify

class APIKeyAuth:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def verify_key(self, api_key):
        """Verify API key and return associated metadata"""
        # Hash key for storage lookup
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        
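        # Get key metadata
        raw = self.redis.hgetall(f"apikey:{key_hash}")
        if not raw:
            return None
        
        # redis-py returns bytes unless the client was created with
        # decode_responses=True, so normalise keys and values to str
        def to_str(x):
            return x.decode() if isinstance(x, bytes) else x
        key_data = {to_str(k): to_str(v) for k, v in raw.items()}
        
        return {
            "client_id": key_data.get("client_id"),
            "tier": key_data.get("tier"),  # free, pro, enterprise
            "rate_limit": int(key_data.get("rate_limit", 100))
        }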
    
    def create_key(self, client_id, tier="free"):
        """Create new API key"""
        import secrets
        api_key = f"sk_{secrets.token_hex(32)}"
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        
        rate_limits = {"free": 100, "pro": 1000, "enterprise": 10000}
        
        self.redis.hset(f"apikey:{key_hash}", mapping={
            "client_id": client_id,
            "tier": tier,
            "rate_limit": rate_limits[tier]
        })
        
        return api_key  # Return only once, store securely

# Module-level instance used by the decorator below
auth = APIKeyAuth(redis.Redis())

def require_api_key(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        api_key = request.headers.get("X-API-Key")
        if not api_key:
            return jsonify({"error": "API key required"}), 401
        
        key_data = auth.verify_key(api_key)
        if not key_data:
            return jsonify({"error": "Invalid API key"}), 401
        
        request.client = key_data
        return f(*args, **kwargs)
    return decorated
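The reason for hashing before storage is that a leaked database then exposes no usable keys. The in-memory sketch below shows the same flow with a plain dictionary standing in for Redis; `key_store` and the tier limits mirror the example above and are otherwise illustrative.

```python
import hashlib
import secrets

RATE_LIMITS = {"free": 100, "pro": 1000, "enterprise": 10000}

# Maps SHA-256 hex digest -> metadata. The raw key is never stored.
key_store = {}


def create_key(client_id, tier="free"):
    """Generate a key, store only its hash, and return the raw key once."""
    api_key = f"sk_{secrets.token_hex(32)}"
    digest = hashlib.sha256(api_key.encode()).hexdigest()
    key_store[digest] = {
        "client_id": client_id,
        "tier": tier,
        "rate_limit": RATE_LIMITS[tier],
    }
    return api_key


def verify_key(api_key):
    """Hash the presented key and look the digest up."""
    digest = hashlib.sha256(api_key.encode()).hexdigest()
    return key_store.get(digest)


issued = create_key("acme", tier="pro")
found = verify_key(issued)
missing = verify_key("sk_not_a_real_key")
```

A fast hash such as SHA-256 is reasonable here because the key carries 256 bits of randomness; passwords chosen by users need a slow password hash instead.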

Authorization

Role-Based Access Control (RBAC)

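# RBAC Implementation
# Assumes `app` and `require_auth` from the JWT section, plus a module-level
# `rbac = RBAC(redis_client)` instance
from functools import wraps
from flask import request, jsonify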

# Define permissions per role
ROLE_PERMISSIONS = {
    "admin": ["read", "write", "delete", "manage_users"],
    "editor": ["read", "write"],
    "viewer": ["read"]
}

class RBAC:
    def __init__(self, redis_client):
        self.redis = redis_client
    
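    def get_user_roles(self, user_id):
        """Get user's assigned roles"""
        members = self.redis.smembers(f"user:{user_id}:roles")
        # redis-py returns bytes unless decode_responses=True; normalise to
        # str so the roles match the keys of ROLE_PERMISSIONS
        return {m.decode() if isinstance(m, bytes) else m for m in members}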
    
    def has_permission(self, user_id, permission):
        """Check if user has specific permission"""
        roles = self.get_user_roles(user_id)
        for role in roles:
            if permission in ROLE_PERMISSIONS.get(role, []):
                return True
        return False
    
    def assign_role(self, user_id, role):
        """Assign role to user"""
        self.redis.sadd(f"user:{user_id}:roles", role)

def require_permission(permission):
    """Decorator to require specific permission"""
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            user_id = request.user["sub"]
            if not rbac.has_permission(user_id, permission):
                return jsonify({"error": "Permission denied"}), 403
            return f(*args, **kwargs)
        return decorated
    return decorator

@app.route("/api/users", methods=["DELETE"])
@require_auth
@require_permission("manage_users")
def delete_user():
    # Only admins can reach here
    pass
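The permission check itself does not need Redis to reason about. Here is a small in-memory sketch of the same lookup, using sets and treating unknown roles as granting nothing; `permissions_for` is an illustrative helper name.

```python
ROLE_PERMISSIONS = {
    "admin": {"read", "write", "delete", "manage_users"},
    "editor": {"read", "write"},
    "viewer": {"read"},
}


def permissions_for(roles):
    """Union of the permissions granted by each role; unknown roles grant none."""
    granted = set()
    for role in roles:
        granted |= ROLE_PERMISSIONS.get(role, set())
    return granted


def has_permission(roles, permission):
    return permission in permissions_for(roles)


editor_can_write = has_permission(["editor"], "write")
editor_can_delete = has_permission(["editor"], "delete")
multi_role = permissions_for(["viewer", "editor"])
unknown_role = permissions_for(["intern"])
```

Treating an unknown role as empty rather than raising keeps the check fail-closed: a typo in a role name removes access instead of granting it.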

Attribute-Based Access Control (ABAC)

# ABAC - More flexible than RBAC
# Decisions based on attributes of user, resource, action, and context

class Policy:
    def __init__(self, name, conditions, effect="allow"):
        self.name = name
        self.conditions = conditions
        self.effect = effect  # "allow" or "deny"
    
    def matches(self, user, resource, action, context):
        """True when every condition holds for this request"""
        return all(condition(user, resource, action, context)
                   for condition in self.conditions)

class ABAC:
    def __init__(self):
        self.policies = []
    
    def add_policy(self, policy):
        self.policies.append(policy)
    
    def is_allowed(self, user, resource, action, context=None):
        """Deny overrides allow; no matching policy means deny"""
        context = context or {}
        matched = [p for p in self.policies
                   if p.matches(user, resource, action, context)]
        if any(p.effect == "deny" for p in matched):
            return False
        return any(p.effect == "allow" for p in matched)

# Example policies
abac = ABAC()

# Policy: Users can only access their own orders
abac.add_policy(Policy(
    name="own_orders",
    conditions=[
        lambda u, r, a, c: r["type"] == "order",
        lambda u, r, a, c: r["owner_id"] == u["id"]
    ]
))

# Policy: Admins can access all orders
abac.add_policy(Policy(
    name="admin_all_orders",
    conditions=[
        lambda u, r, a, c: "admin" in u["roles"],
        lambda u, r, a, c: r["type"] == "order"
    ]
))

# Policy: Working hours only for non-admins.
# Written as a deny rule: as an allow rule it would grant every non-admin
# access to any resource during working hours.
abac.add_policy(Policy(
    name="business_hours",
    effect="deny",
    conditions=[
        lambda u, r, a, c: "admin" not in u["roles"],
        lambda u, r, a, c: not (9 <= c.get("hour", -1) <= 17)
    ]
))

DDoS Protection

Attack Types

  • Volumetric: Flood with traffic (UDP flood, DNS amplification)
  • Protocol: Exploit protocol weaknesses (SYN flood, Ping of Death)
  • Application: Target application layer (HTTP flood, Slowloris)

Defense Layers

# Multi-layer DDoS defense architecture

# Layer 1: CDN/Edge (Cloudflare, AWS Shield)
# - Absorb volumetric attacks
# - Geographic distribution
# - Anycast routing

# Layer 2: Load Balancer
# - Connection limits per IP
# - SYN cookies for SYN flood
# - Rate limiting

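# Layer 3: Application
# (get_request_rate, is_known_bad_agent, impossible_travel and block_ip are
# application-specific helpers that are not shown here)
class ApplicationDDoSProtection: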
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def check_request(self, ip, user_agent, request_path):
        """Multi-signal threat detection"""
        score = 0
        
        # Check request rate
        rate = self.get_request_rate(ip)
        if rate > 100:  # requests per second
            score += 30
        
        # Check for suspicious patterns
        if self.is_known_bad_agent(user_agent):
            score += 50
        
        # Check geographic velocity
        if self.impossible_travel(ip):
            score += 40
        
        # Check request patterns
        if self.is_automated(ip):
            score += 20
        
        # Decision
        if score >= 100:
            self.block_ip(ip)
            return "block"
        elif score >= 50:
            return "challenge"  # CAPTCHA
        else:
            return "allow"
    
    def is_automated(self, ip):
        """Detect bot-like behavior"""
        # Check for uniform request timing (last 100 timestamps)
        timestamps = self.redis.lrange(f"req_times:{ip}", 0, 99)
        if len(timestamps) < 10:
            return False
        
        intervals = []
        for i in range(1, len(timestamps)):
            intervals.append(float(timestamps[i]) - float(timestamps[i-1]))
        
        # Bots often have very uniform timing.
        # Compute the mean once, then the population variance.
        mean = sum(intervals) / len(intervals)
        variance = sum((x - mean) ** 2 for x in intervals) / len(intervals)
        return variance < 0.01  # Suspiciously uniform
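The timing heuristic is easier to reason about as a pure function. The sketch below applies the same variance test to plain lists of timestamps; `looks_automated` and the sample data are illustrative, and the 0.01 threshold is carried over from the code above rather than tuned.

```python
from statistics import pvariance


def looks_automated(timestamps, min_samples=10, threshold=0.01):
    """Flag request streams whose inter-arrival times are nearly constant."""
    if len(timestamps) < min_samples:
        return False
    intervals = [later - earlier
                 for earlier, later in zip(timestamps, timestamps[1:])]
    return pvariance(intervals) < threshold


# A script firing every 500 ms: every interval is identical.
bot = [i * 0.5 for i in range(20)]

# A person clicking around: short and long gaps mixed together.
human = [0.0, 0.4, 2.1, 2.3, 5.0, 5.2, 9.7, 10.1, 14.0, 14.2, 19.5]

bot_flagged = looks_automated(bot)
human_flagged = looks_automated(human)
too_few = looks_automated([0.0, 0.5, 1.0])
```

A bot that adds random jitter to its delays defeats this check on its own, which is why the scoring above combines it with other signals.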

DDoS Mitigation Strategies

Strategy                  Layer           Effectiveness
CDN/Anycast               Network         High for volumetric
Rate Limiting             Application     Medium
CAPTCHA                   Application     High for bots
Web Application Firewall  Application     High for known patterns
Auto-scaling              Infrastructure  Medium (costly)

Best Practices

Security Best Practices:
  • Defense in depth: Multiple layers of security
  • Least privilege: Grant minimum necessary permissions
  • Fail securely: Default to deny on errors
  • Audit everything: Log all security-relevant events
  • Rotate secrets: Regular key/token rotation
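"Fail securely" is the practice that is easiest to get wrong in code, because an unhandled exception in a permission check can skip the check entirely. One way to enforce it is a wrapper that turns any error into a denial. The sketch below is illustrative; `fail_closed` and `can_read` are hypothetical names.

```python
import logging
from functools import wraps

logger = logging.getLogger("security")


def fail_closed(check):
    """Wrap a permission check so that any exception results in a denial."""
    @wraps(check)
    def wrapper(*args, **kwargs):
        try:
            return bool(check(*args, **kwargs))
        except Exception:
            # Log for the audit trail, then deny by default.
            logger.exception("authorization check failed; denying access")
            return False
    return wrapper


@fail_closed
def can_read(user, document):
    return document["owner_id"] == user["id"]


owner = can_read({"id": 1}, {"owner_id": 1})
stranger = can_read({"id": 2}, {"owner_id": 1})
malformed = can_read({"id": 1}, {})  # KeyError inside the check
```

The malformed document raises a `KeyError` inside the check, and the caller still gets a plain `False` instead of an exception it might mishandle.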

Security Headers

# Essential Security Headers (Flask example)
from flask import Flask
from flask_talisman import Talisman

app = Flask(__name__)
Talisman(app, 
    force_https=True,
    strict_transport_security=True,
    content_security_policy={
        'default-src': "'self'",
        'script-src': ["'self'", 'cdn.example.com'],
        'style-src': ["'self'", "'unsafe-inline'"]
    }
)

# Manual header setting
@app.after_request
def set_security_headers(response):
    # Prevent clickjacking
    response.headers['X-Frame-Options'] = 'DENY'
    
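    # X-XSS-Protection is deprecated and ignored by modern browsers;
    # disable the legacy filter and rely on the Content-Security-Policy instead
    response.headers['X-XSS-Protection'] = '0'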
    
    # Prevent MIME sniffing
    response.headers['X-Content-Type-Options'] = 'nosniff'
    
    # Referrer policy
    response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
    
    return response

Input Validation & SQL Injection Prevention

# NEVER do this (SQL Injection vulnerable)
query = f"SELECT * FROM users WHERE id = {user_id}"

# ALWAYS use parameterized queries
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))

# For ORMs, use built-in protection
user = User.query.filter_by(id=user_id).first()

# Input validation (Pydantic v2; on v1 use @validator instead of @field_validator)
from pydantic import BaseModel, field_validator
import re

class UserInput(BaseModel):
    username: str
    email: str
    
    @field_validator('username')
    @classmethod
    def validate_username(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]{3,20}$', v):
            raise ValueError('Invalid username format')
        return v
    
    @field_validator('email')
    @classmethod
    def validate_email(cls, v):
        # Basic shape check only; it does not prove the address exists
        if not re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', v):
            raise ValueError('Invalid email format')
        return v
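The difference between the two query styles can be demonstrated with the standard library's `sqlite3` module and an in-memory database. The table and the malicious input are made up for the demonstration. Note that `sqlite3` uses `?` as its placeholder, whereas the `%s` style shown above belongs to other drivers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(1, "alice"), (2, "bob")],
)

# Attacker-controlled input that was supposed to be a numeric ID.
user_id = "1 OR 1=1"

# Vulnerable: the input becomes part of the SQL text, so the WHERE clause
# turns into "id = 1 OR 1=1" and matches every row.
unsafe_rows = conn.execute(
    f"SELECT name FROM users WHERE id = {user_id}"
).fetchall()

# Safe: the input is bound as a single value and compared to id as data.
safe_rows = conn.execute(
    "SELECT name FROM users WHERE id = ?", (user_id,)
).fetchall()

conn.close()
```

The interpolated query returns both users, while the parameterized one returns none, because no row has an ID equal to the literal string `1 OR 1=1`.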

Next Steps

Rate Limiting & API Security Policy Generator

Define your rate limiting tiers, throttling algorithms, and API security policies. Download as Word, Excel, or PDF.
