
System Design Series Part 7: Message Queues & Event-Driven Architecture

January 25, 2026 · Wasil Zafar · 40 min read

Master asynchronous communication patterns for building resilient distributed systems. Learn message queues, event-driven architecture, Kafka, RabbitMQ, and event sourcing patterns.

Table of Contents

  1. Message Queues
  2. Event-Driven Architecture
  3. Technologies
  4. Real-Time Stream Processing
  5. Next Steps

Message Queues

Series Navigation: This is Part 7 of the 15-part System Design Series. Review Part 6: API Design first.

Message queues enable asynchronous communication between services by decoupling the sender from the receiver. This improves system resilience, scalability, and allows services to operate independently.

Key Insight: Message queues turn synchronous failures into asynchronous retries. When a downstream service is down, messages wait safely in the queue.

Why Use Message Queues?

  • Decoupling: Producers and consumers don't need to know about each other
  • Buffering: Handle traffic spikes by queuing requests
  • Resilience: Messages persist even if consumers are temporarily unavailable
  • Scalability: Add more consumers to process messages in parallel
  • Ordering: Guarantee message processing order when needed

Basic Queue Producer/Consumer

# Producer - Sends messages to queue
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a durable queue (survives broker restart)
channel.queue_declare(queue='order_queue', durable=True)

def send_order(order):
    message = json.dumps(order)
    channel.basic_publish(
        exchange='',
        routing_key='order_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # Make message persistent
        )
    )
    print(f"Sent order: {order['id']}")

# Send sample orders
send_order({"id": "order_123", "user": "john", "total": 99.99})
send_order({"id": "order_124", "user": "jane", "total": 149.50})

# Consumer - Processes messages from queue
import pika
import json
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_queue', durable=True)

def process_order(ch, method, properties, body):
    order = json.loads(body)
    print(f"Processing order: {order['id']}")
    
    # Simulate processing time
    time.sleep(2)
    
    # Acknowledge message after successful processing
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"Completed order: {order['id']}")

# Fetch one message at a time (fair dispatch)
channel.basic_qos(prefetch_count=1)

# Start consuming
channel.basic_consume(queue='order_queue', on_message_callback=process_order)
print("Waiting for orders...")
channel.start_consuming()

Queue Patterns

Point-to-Point (Work Queue)

Each message is consumed by exactly one consumer. Multiple consumers can compete for messages to distribute workload.

# Multiple workers compete for messages
# Message goes to only ONE worker

Producer ---> [Queue] ---> Consumer 1
                      ---> Consumer 2
                      ---> Consumer 3

# Each message processed by exactly one consumer
# Great for distributing work across workers

Use cases: Order processing, background jobs, task distribution
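The competing-consumers behavior can be demonstrated without a broker using Python's thread-safe `queue.Queue`; the worker names and messages below are illustrative stand-ins for real consumers.

```python
import queue
import threading

work_queue = queue.Queue()
results = []
results_lock = threading.Lock()

def worker(name):
    # Workers compete for messages; each message is delivered to exactly ONE worker
    while True:
        msg = work_queue.get()
        if msg is None:  # Sentinel: shut this worker down
            work_queue.task_done()
            break
        with results_lock:
            results.append((name, msg))
        work_queue.task_done()

workers = [threading.Thread(target=worker, args=(f"worker-{i}",)) for i in range(3)]
for t in workers:
    t.start()

for i in range(9):
    work_queue.put(f"order_{i}")
for _ in workers:
    work_queue.put(None)  # One shutdown sentinel per worker
for t in workers:
    t.join()

# Every message was processed exactly once, spread across the workers
print(len(results))  # 9
```

Which worker gets which message is nondeterministic, but no message is ever processed twice or dropped, which is exactly the point-to-point guarantee.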

Publish-Subscribe (Fanout)

Each message is delivered to all subscribed consumers. Publishers don't know about subscribers.

# All subscribers receive every message
                          ---> [Queue 1] ---> Email Service
Producer ---> [Exchange] ---> [Queue 2] ---> SMS Service
                          ---> [Queue 3] ---> Push Notification

# Example: User signup triggers multiple notifications
channel.exchange_declare(exchange='user_events', exchange_type='fanout')

def publish_signup(user):
    channel.basic_publish(
        exchange='user_events',
        routing_key='',  # Ignored for fanout
        body=json.dumps({'event': 'signup', 'user': user})
    )

# Each service creates its own queue bound to exchange

Use cases: Notifications, event broadcasting, logging
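The fanout semantics above can be modeled in a few lines of plain Python; this is an in-memory sketch of the delivery behavior, not the pika API.

```python
import json
from collections import defaultdict

class FanoutExchange:
    """Delivers a copy of every published message to all bound queues."""
    def __init__(self):
        self.queues = defaultdict(list)  # queue name -> list of messages

    def bind(self, queue_name):
        self.queues[queue_name]  # Touch the key to create the queue

    def publish(self, body):
        # Routing key is ignored for fanout: every bound queue gets a copy
        for messages in self.queues.values():
            messages.append(body)

exchange = FanoutExchange()
for service in ('email', 'sms', 'push'):
    exchange.bind(service)

exchange.publish(json.dumps({'event': 'signup', 'user': 'john'}))

# All three services received their own copy of the message
print([len(q) for q in exchange.queues.values()])  # [1, 1, 1]
```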

Topic-Based Routing

Messages are routed to queues based on routing key patterns.

# Route messages based on topic pattern
channel.exchange_declare(exchange='orders', exchange_type='topic')

# Publish with routing key
channel.basic_publish(
    exchange='orders',
    routing_key='order.created.electronics',  # topic pattern
    body=json.dumps(order)
)

# Consumer 1: All order events
channel.queue_bind(exchange='orders', queue='all_orders', routing_key='order.#')

# Consumer 2: Only electronics orders  
channel.queue_bind(exchange='orders', queue='electronics', routing_key='order.*.electronics')

# Consumer 3: Only order creation events
channel.queue_bind(exchange='orders', queue='new_orders', routing_key='order.created.*')

# Patterns: * matches one word, # matches zero or more words

Use cases: Event filtering, multi-tenant systems, log aggregation
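The `*`/`#` rules are easy to get wrong, so it helps to test binding keys before deploying them. This is a pure-Python sketch of AMQP topic-matching semantics, not broker code.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP-style topic pattern matches a routing key.

    '*' matches exactly one word; '#' matches zero or more words.
    """
    return _match(pattern.split('.'), routing_key.split('.'))

def _match(pat, words):
    if not pat:
        return not words  # Pattern exhausted: match only if the key is too
    head, rest = pat[0], pat[1:]
    if head == '#':
        # '#' may consume zero or more words
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    if head == '*' or head == words[0]:
        return _match(rest, words[1:])
    return False

print(topic_matches('order.#', 'order.created.electronics'))              # True
print(topic_matches('order.*.electronics', 'order.created.electronics'))  # True
print(topic_matches('order.created.*', 'order.created.electronics'))      # True
print(topic_matches('order.*', 'order.created.electronics'))              # False
```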

Dead Letter Queue (DLQ)

Messages that fail processing are moved to a separate queue for investigation.

# DLQ Configuration
# Create DLQ
channel.queue_declare(queue='order_queue_dlq', durable=True)

# Create main queue with DLQ routing
channel.queue_declare(
    queue='order_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'order_queue_dlq',
        'x-message-ttl': 86400000  # 24 hours TTL
    }
)

# Consumer with error handling
def process_order(ch, method, properties, body):
    try:
        order = json.loads(body)
        process(order)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # Reject message -> goes to DLQ
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        log_error(f"Order failed: {e}")

Use cases: Error investigation, retry mechanisms, audit trails
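A common DLQ policy is retry-with-limit: redeliver a failed message a few times, then park it for investigation. The retry counter below mimics the role of RabbitMQ's x-death header in a simplified in-memory form; the handler and payloads are illustrative.

```python
from collections import deque

MAX_RETRIES = 3
main_queue = deque()
dlq = []

def consume(queue, handler):
    """Process messages, requeueing failures until MAX_RETRIES, then DLQ."""
    while queue:
        message = queue.popleft()
        try:
            handler(message['body'])
        except Exception as exc:
            message['retries'] = message.get('retries', 0) + 1
            if message['retries'] >= MAX_RETRIES:
                dlq.append({**message, 'error': str(exc)})  # Park for investigation
            else:
                queue.append(message)  # Redeliver at the back of the queue

def handler(body):
    if body == 'poison':
        raise ValueError("cannot parse")  # This message always fails

main_queue.extend([{'body': 'order_1'}, {'body': 'poison'}, {'body': 'order_2'}])
consume(main_queue, handler)
print(len(dlq))  # 1 -- the poison message, after 3 failed attempts
```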

Delivery Guarantees

  • At-most-once: a message may be lost but is never duplicated. Trade-off: highest performance, potential data loss.
  • At-least-once: a message is delivered at least once and may be duplicated. Trade-off: no data loss, but consumers must be idempotent.
  • Exactly-once: a message is delivered exactly once. Trade-off: most complex to implement, highest latency.
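Because at-least-once delivery means duplicates will eventually arrive, consumers typically dedupe on a message ID. A minimal sketch follows; in production the seen-ID set would live in Redis or a database with a TTL rather than in process memory.

```python
processed_ids = set()  # In production: Redis set or DB table with a TTL
shipped = []

def handle_once(message):
    """Idempotent consumer: a redelivered message is skipped, not reprocessed."""
    if message['event_id'] in processed_ids:
        return  # Duplicate delivery -- side effect already happened
    shipped.append(message['order_id'])  # The side effect we must not repeat
    processed_ids.add(message['event_id'])

msg = {'event_id': 'evt-42', 'order_id': 'order_123'}
handle_once(msg)
handle_once(msg)  # Broker redelivers the same message

print(shipped)  # ['order_123'] -- the side effect ran exactly once
```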

Event-Driven Architecture

Event-driven architecture (EDA) is a design pattern where the flow of the program is determined by events—significant changes in state that other parts of the system may need to react to.

Event Types

  • Domain Events: Business-significant occurrences (OrderPlaced, UserRegistered)
  • Integration Events: Cross-service communication events
  • System Events: Infrastructure events (ServerStarted, DatabaseConnected)

Event-Driven Design

# Event definitions
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import uuid

# kw_only (Python 3.10+) lets subclasses declare required fields even though
# the base class has a defaulted field; without it, @dataclass raises
# "non-default argument follows default argument" at class definition time
@dataclass(kw_only=True)
class Event:
    event_id: str
    event_type: str
    timestamp: datetime
    correlation_id: Optional[str] = None

@dataclass
class OrderPlaced(Event):
    order_id: str
    user_id: str
    items: list
    total: float

@dataclass
class PaymentProcessed(Event):
    order_id: str
    payment_id: str
    amount: float
    status: str

@dataclass
class InventoryReserved(Event):
    order_id: str
    items: list
    warehouse_id: str

# Event publisher
class EventPublisher:
    def __init__(self, message_queue):
        self.mq = message_queue
    
    def publish(self, event):
        self.mq.publish(
            topic=event.event_type,
            message={
                'event_id': event.event_id,
                'event_type': event.event_type,
                'timestamp': event.timestamp.isoformat(),
                'data': event.__dict__
            }
        )

# Event handler
class OrderEventHandler:
    def handle(self, event):
        if isinstance(event, OrderPlaced):
            # Trigger inventory reservation
            self.inventory_service.reserve(event.order_id, event.items)
            # Trigger payment processing
            self.payment_service.process(event.order_id, event.total)

Choreography vs Orchestration

  • Coordination: in choreography, services react to events independently; in orchestration, a central orchestrator directs services.
  • Coupling: choreography stays loose (event-based); orchestration is tighter (the orchestrator knows all services).
  • Visibility: choreography is distributed and harder to trace; orchestration is centralized and easy to monitor.
  • Complexity: choreography spreads logic across services; orchestration centralizes it in the orchestrator.
  • Best for: choreography suits simple flows and loose coupling; orchestration suits complex workflows needing explicit control.
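Choreography can be sketched with a tiny in-memory event bus: each service subscribes to the events it cares about, and no component coordinates them centrally. The service names and event types here are illustrative.

```python
from collections import defaultdict

class EventBus:
    def __init__(self):
        self.handlers = defaultdict(list)  # event type -> handler functions

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    def publish(self, event_type, payload):
        for handler in self.handlers[event_type]:
            handler(payload)

bus = EventBus()
log = []

# Each service reacts independently; no central orchestrator exists
def inventory_service(event):
    log.append(f"reserve stock for {event['order_id']}")
    bus.publish('InventoryReserved', event)  # Emits its own follow-up event

def payment_service(event):
    log.append(f"charge for {event['order_id']}")

bus.subscribe('OrderPlaced', inventory_service)
bus.subscribe('InventoryReserved', payment_service)

bus.publish('OrderPlaced', {'order_id': 'order_123'})
print(log)  # ['reserve stock for order_123', 'charge for order_123']
```

The trade-off from the comparison above is visible here: the flow "order, then inventory, then payment" exists nowhere as a single piece of code; it emerges from the subscriptions.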

Event Sourcing & CQRS

Event Sourcing

Store all state changes as a sequence of events. Current state is derived by replaying events.

# Event Sourcing Example - Bank Account
class BankAccount:
    def __init__(self, account_id):
        self.account_id = account_id
        self.balance = 0
        self.events = []
    
    # Commands create events
    def deposit(self, amount, timestamp):
        event = DepositEvent(
            account_id=self.account_id,
            amount=amount,
            timestamp=timestamp
        )
        self._apply(event)
        self.events.append(event)
    
    def withdraw(self, amount, timestamp):
        if amount > self.balance:
            raise InsufficientFundsError()
        
        event = WithdrawEvent(
            account_id=self.account_id,
            amount=amount,
            timestamp=timestamp
        )
        self._apply(event)
        self.events.append(event)
    
    # Apply events to update state
    def _apply(self, event):
        if isinstance(event, DepositEvent):
            self.balance += event.amount
        elif isinstance(event, WithdrawEvent):
            self.balance -= event.amount
    
    # Rebuild state from events
    @classmethod
    def rebuild_from_events(cls, account_id, events):
        account = cls(account_id)
        for event in events:
            account._apply(event)
        return account

# Event Store
class EventStore:
    def __init__(self):
        self.events = {}  # account_id -> [events]
    
    def save_events(self, aggregate_id, events, expected_version):
        # Optimistic concurrency check
        current_version = len(self.events.get(aggregate_id, []))
        if current_version != expected_version:
            raise ConcurrencyException()
        
        self.events.setdefault(aggregate_id, []).extend(events)
    
    def get_events(self, aggregate_id):
        return self.events.get(aggregate_id, [])

Benefits: Complete audit trail, temporal queries, debugging, replay
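One of those benefits, temporal queries, falls out of the model directly: replay only the events up to a point in time to reconstruct historical state. A sketch built on simplified deposit/withdraw events (the event classes here are stand-ins for the ones in the example above):

```python
from dataclasses import dataclass

@dataclass
class Deposit:
    amount: float
    timestamp: int

@dataclass
class Withdraw:
    amount: float
    timestamp: int

def balance_at(events, as_of):
    """Rebuild the balance as it was at time `as_of` by replaying events."""
    balance = 0.0
    for event in events:
        if event.timestamp > as_of:
            break  # Events are stored in order; stop at the cutoff
        if isinstance(event, Deposit):
            balance += event.amount
        else:
            balance -= event.amount
    return balance

history = [Deposit(100, timestamp=1), Withdraw(30, timestamp=2), Deposit(50, timestamp=3)]
print(balance_at(history, as_of=2))  # 70.0 -- balance before the final deposit
print(balance_at(history, as_of=3))  # 120.0 -- current balance
```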

CQRS (Command Query Responsibility Segregation)

Separate models for reading (queries) and writing (commands) data.

# CQRS Implementation
# Write Model (Commands) - Handles business logic
class OrderCommandHandler:
    def __init__(self, event_store, event_bus):
        self.event_store = event_store
        self.event_bus = event_bus
    
    def handle_create_order(self, command):
        # Load aggregate from events
        events = self.event_store.get_events(command.order_id)
        order = Order.rebuild_from_events(events)
        
        # Execute business logic
        new_events = order.create(command.user_id, command.items)
        
        # Save new events (pass the version we loaded for the optimistic concurrency check)
        self.event_store.save_events(command.order_id, new_events, expected_version=len(events))
        
        # Publish events for read model update
        for event in new_events:
            self.event_bus.publish(event)

# Read Model (Queries) - Optimized for reading
class OrderReadModel:
    def __init__(self):
        self.orders = {}  # Denormalized view
    
    def handle_order_created(self, event):
        self.orders[event.order_id] = {
            'id': event.order_id,
            'user_id': event.user_id,
            'items': event.items,
            'status': 'created',
            'total': sum(item['price'] for item in event.items)
        }
    
    def handle_order_shipped(self, event):
        self.orders[event.order_id]['status'] = 'shipped'
        self.orders[event.order_id]['shipped_at'] = event.timestamp
    
    # Fast queries on denormalized data
    def get_user_orders(self, user_id):
        return [o for o in self.orders.values() if o['user_id'] == user_id]

When to Use: Event Sourcing + CQRS shine in domains with complex business logic, audit requirements, or high read/write asymmetry. They are overkill for simple CRUD applications.
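Wiring the two sides together is the whole pattern: commands append events to the store, events update the read model, and queries never touch the write path. A compact end-to-end sketch, with deliberately simplified versions of the classes above:

```python
class EventStore:
    def __init__(self):
        self.streams = {}  # aggregate id -> ordered list of events

    def append(self, aggregate_id, event):
        self.streams.setdefault(aggregate_id, []).append(event)

class OrderReadModel:
    def __init__(self):
        self.orders = {}  # Denormalized view, optimized for queries

    def apply(self, event):
        if event['type'] == 'OrderCreated':
            self.orders[event['order_id']] = {'user_id': event['user_id'], 'status': 'created'}
        elif event['type'] == 'OrderShipped':
            self.orders[event['order_id']]['status'] = 'shipped'

store = EventStore()
read_model = OrderReadModel()

def handle_command(event):
    store.append(event['order_id'], event)  # Write side: append-only log
    read_model.apply(event)                 # Propagate to the read side

handle_command({'type': 'OrderCreated', 'order_id': 'o1', 'user_id': 'john'})
handle_command({'type': 'OrderShipped', 'order_id': 'o1'})

print(read_model.orders['o1']['status'])  # shipped
print(len(store.streams['o1']))           # 2 -- the full history is retained
```

In a real system the read model would be updated asynchronously via the event bus, so queries can briefly see stale data (eventual consistency); here the update is synchronous for clarity.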

Technologies

Apache Kafka

Distributed streaming platform for high-throughput, fault-tolerant messaging.

# Kafka Producer
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # Wait for all replicas
    retries=3
)

def send_event(topic, event):
    future = producer.send(topic, value=event)
    record_metadata = future.get(timeout=10)
    print(f"Sent to {record_metadata.topic} partition {record_metadata.partition}")

send_event('orders', {'order_id': '123', 'status': 'created'})

# Kafka Consumer
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # Manual commit after processing -> at-least-once
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(f"Received: {message.value}")
    process_order(message.value)
    consumer.commit()  # Commit after successful processing

Key Features: Partitions for parallelism, replication for durability, log compaction, exactly-once semantics


RabbitMQ

Traditional message broker with flexible routing and multiple protocols (AMQP, MQTT, STOMP).

Key Features: Exchange types (direct, fanout, topic, headers), message acknowledgment, dead letter queues, priority queues


AWS SQS + SNS

Managed messaging services. SQS for queues, SNS for pub/sub.

# AWS SQS
import boto3

sqs = boto3.client('sqs')

# Send message (MessageGroupId and MessageDeduplicationId are FIFO-only
# parameters, so the target queue must be a .fifo queue)
sqs.send_message(
    QueueUrl='https://sqs.region.amazonaws.com/account/queue-name.fifo',
    MessageBody=json.dumps({'order_id': '123'}),
    MessageGroupId='order-group',       # Ordering scope within the FIFO queue
    MessageDeduplicationId='unique-id'  # Dedupes within a 5-minute window
)

# Receive and delete
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20  # Long polling
)

for message in response.get('Messages', []):
    process(message['Body'])
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message['ReceiptHandle']
    )

Choosing the Right Tool

  • Apache Kafka: best for event streaming and high volume; millions of messages/sec; log retention, replay, exactly-once semantics.
  • RabbitMQ: best for complex routing and traditional MQ; tens of thousands of messages/sec; flexible routing, multiple protocols.
  • AWS SQS: best for simple queuing and serverless; thousands of messages/sec; managed, pay-per-use, FIFO option.
  • Redis Streams: best for lightweight streaming; hundreds of thousands of messages/sec; in-memory, consumer groups.
  • Apache Pulsar: best for multi-tenant, geo-replicated setups; millions of messages/sec; tiered storage, functions.

Selection Guide:
  • High throughput streaming: Kafka or Pulsar
  • Complex routing needs: RabbitMQ
  • AWS-native, managed: SQS/SNS
  • Already using Redis: Redis Streams

Real-Time Stream Processing

Key Insight: Stream processing analyzes data in motion, enabling real-time insights and decisions as events flow through the system.

Stream processing differs from batch processing in that it processes data continuously as it arrives, rather than in scheduled batches. This is essential for real-time analytics, fraud detection, and live dashboards.

Stream vs Batch Processing

  • Latency: batch runs in minutes to hours; streaming responds in milliseconds to seconds.
  • Data model: batch works on bounded datasets; streaming handles unbounded data streams.
  • Use cases: batch suits ETL, reports, and ML training; streaming suits real-time analytics and fraud detection.

Windowing Concepts

Windows group streaming events for aggregation:

  • Tumbling Window: Fixed-size, non-overlapping (e.g., count per minute)
  • Sliding Window: Fixed-size, overlapping (e.g., last 5 minutes, updated every minute)
  • Session Window: Dynamic size based on activity gaps
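A tumbling window reduces to integer division of the event timestamp by the window size. A minimal count-per-window sketch, using illustrative epoch-second timestamps:

```python
from collections import Counter

WINDOW_SECONDS = 60

def tumbling_counts(events):
    """Count events per fixed, non-overlapping 60-second window."""
    counts = Counter()
    for timestamp in events:
        # Floor the timestamp to the start of its window
        window_start = (timestamp // WINDOW_SECONDS) * WINDOW_SECONDS
        counts[window_start] += 1
    return dict(counts)

# Events at 0:05, 0:30, 0:59 fall in window [0, 60); the event at 1:10 falls in [60, 120)
print(tumbling_counts([5, 30, 59, 70]))  # {0: 3, 60: 1}
```

Sliding and session windows need more bookkeeping (overlapping buckets and gap timers respectively), but the same timestamp-to-bucket idea underlies all three.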

Kafka Streams

Kafka Streams is a lightweight stream processing library built directly on Apache Kafka. It processes data stored in Kafka topics and writes results back to Kafka.

Key Features

  • No external dependencies: Runs as a library within your application
  • Exactly-once semantics: Guarantees each record is processed exactly once
  • Stateful processing: Built-in state stores for aggregations and joins
  • Windowing: Time-based and session windows for temporal analysis

A minimal topology looks like this (the Properties setup is sketched in; application.id and bootstrap.servers are the two required settings):

// Kafka Streams Example
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

source
    .filter((key, value) -> value.contains("important"))
    .mapValues(value -> value.toUpperCase())
    .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Use Cases: Real-time analytics, fraud detection, event-driven microservices, and stream processing that needs to stay close to Kafka.

Next Steps

Event-Driven Architecture Design Generator

Design your event-driven system with message queues, event schemas, and DLQ policies. Download as Word, Excel, or PDF.

