
System Design Series Part 7: Message Queues & Event-Driven

January 25, 2026 · Wasil Zafar · 55 min read

Master asynchronous communication patterns for building resilient distributed systems. Learn message queues, event-driven architecture, Kafka, RabbitMQ, and event sourcing patterns.

Table of Contents

  1. Message Queues
  2. Event-Driven Architecture
  3. Technologies
  4. Stream Processing
  5. Next Steps

Message Queues

Series Navigation: This is Part 7 of the 15-part System Design Series. Review Part 6: API Design first.

Message queues enable asynchronous communication between services by decoupling the sender from the receiver. This improves system resilience, scalability, and allows services to operate independently.

Message queue architecture — producers send messages to a persistent queue that decouples them from consumers, enabling asynchronous processing and resilience

Key Insight: Message queues turn synchronous failures into asynchronous retries. When a downstream service is down, messages wait safely in the queue.

Why Use Message Queues?

  • Decoupling: Producers and consumers don't need to know about each other
  • Buffering: Handle traffic spikes by queuing requests
  • Resilience: Messages persist even if consumers are temporarily unavailable
  • Scalability: Add more consumers to process messages in parallel
  • Ordering: Guarantee message processing order when needed

Basic Queue Producer/Consumer

# Producer - Sends messages to queue
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a durable queue (survives broker restart)
channel.queue_declare(queue='order_queue', durable=True)

def send_order(order):
    message = json.dumps(order)
    channel.basic_publish(
        exchange='',
        routing_key='order_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # Make message persistent
        )
    )
    print(f"Sent order: {order['id']}")

# Send sample orders
send_order({"id": "order_123", "user": "john", "total": 99.99})
send_order({"id": "order_124", "user": "jane", "total": 149.50})

# Consumer - Processes messages from queue
import pika
import json
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='order_queue', durable=True)

def process_order(ch, method, properties, body):
    order = json.loads(body)
    print(f"Processing order: {order['id']}")
    
    # Simulate processing time
    time.sleep(2)
    
    # Acknowledge message after successful processing
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"Completed order: {order['id']}")

# Fetch one message at a time (fair dispatch)
channel.basic_qos(prefetch_count=1)

# Start consuming
channel.basic_consume(queue='order_queue', on_message_callback=process_order)
print("Waiting for orders...")
channel.start_consuming()

Queue Patterns

Point-to-Point (Work Queue)

Each message is consumed by exactly one consumer. Multiple consumers can compete for messages to distribute workload.

# Multiple workers compete for messages
# Message goes to only ONE worker

Producer ---> [Queue] ---> Consumer 1
                      ---> Consumer 2
                      ---> Consumer 3

# Each message processed by exactly one consumer
# Great for distributing work across workers

Use cases: Order processing, background jobs, task distribution
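The fair-dispatch behavior above can be sketched with a toy round-robin assignment. This is purely illustrative — brokers like RabbitMQ do this dispatch for you, and the worker names are made up:

```python
# Toy round-robin dispatch: each message goes to exactly ONE worker
# (a broker performs this fair dispatch for you; names are illustrative).
from itertools import cycle

workers = {"worker_1": [], "worker_2": [], "worker_3": []}
assign = cycle(workers)            # round-robin over worker names

for task_id in range(6):
    workers[next(assign)].append(task_id)

print(workers)
# {'worker_1': [0, 3], 'worker_2': [1, 4], 'worker_3': [2, 5]}
```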

Publish-Subscribe (Fanout)

Each message is delivered to all subscribed consumers. Publishers don't know about subscribers.

# All subscribers receive every message
                           ---> [Queue 1] ---> Email Service
Producer ---> [Exchange] ---> [Queue 2] ---> SMS Service
                           ---> [Queue 3] ---> Push Notification

# Example: User signup triggers multiple notifications
channel.exchange_declare(exchange='user_events', exchange_type='fanout')

def publish_signup(user):
    channel.basic_publish(
        exchange='user_events',
        routing_key='',  # Ignored for fanout
        body=json.dumps({'event': 'signup', 'user': user})
    )

# Each service creates its own queue bound to exchange

Use cases: Notifications, event broadcasting, logging

Topic-Based Routing

Messages are routed to queues based on routing key patterns.

# Route messages based on topic pattern
channel.exchange_declare(exchange='orders', exchange_type='topic')

# Publish with routing key
channel.basic_publish(
    exchange='orders',
    routing_key='order.created.electronics',  # topic pattern
    body=json.dumps(order)
)

# Consumer 1: All order events
channel.queue_bind(exchange='orders', queue='all_orders', routing_key='order.#')

# Consumer 2: Only electronics orders  
channel.queue_bind(exchange='orders', queue='electronics', routing_key='order.*.electronics')

# Consumer 3: Only order creation events
channel.queue_bind(exchange='orders', queue='new_orders', routing_key='order.created.*')

# Patterns: * matches one word, # matches zero or more words

Use cases: Event filtering, multi-tenant systems, log aggregation
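The * and # matching rules can be made concrete with a small standalone matcher. This is a sketch of the semantics only — RabbitMQ implements this inside the broker:

```python
# Sketch of AMQP topic-matching semantics:
# '*' matches exactly one word, '#' matches zero or more words.
def topic_matches(pattern: str, routing_key: str) -> bool:
    return _match(pattern.split('.'), routing_key.split('.'))

def _match(pat, words):
    if not pat:
        return not words                      # both exhausted -> match
    head, rest = pat[0], pat[1:]
    if head == '#':
        # '#' may consume zero or more words: try every split point
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    if head == '*' or head == words[0]:
        return _match(rest, words[1:])        # consume one word
    return False

print(topic_matches('order.#', 'order.created.electronics'))            # True
print(topic_matches('order.*.electronics', 'order.created.electronics'))  # True
print(topic_matches('order.created.*', 'order.created.books'))          # True
print(topic_matches('order.*.electronics', 'order.created.books'))      # False
```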

Dead Letter Queue (DLQ)

Messages that fail processing are moved to a separate queue for investigation.

# DLQ Configuration
# Create DLQ
channel.queue_declare(queue='order_queue_dlq', durable=True)

# Create main queue with DLQ routing
channel.queue_declare(
    queue='order_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': 'order_queue_dlq',
        'x-message-ttl': 86400000  # 24 hours TTL
    }
)

# Consumer with error handling
def process_order(ch, method, properties, body):
    try:
        order = json.loads(body)
        process(order)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # Reject message -> goes to DLQ
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        log_error(f"Order failed: {e}")

Use cases: Error investigation, retry mechanisms, audit trails

Delivery Guarantees

| Guarantee | Description | Trade-off |
|---|---|---|
| At-most-once | Message may be lost but never duplicated | Highest performance, potential data loss |
| At-least-once | Message delivered at least once, may duplicate | No data loss, consumers must be idempotent |
| Exactly-once | Message delivered exactly once | Most complex, highest latency |
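The difference between at-most-once and at-least-once comes down to when the consumer acknowledges. The toy in-memory queue below illustrates this; it is not a real broker, which handles redelivery for you:

```python
# Toy queue illustrating how ack timing determines the delivery guarantee.
from collections import deque

class ToyQueue:
    def __init__(self):
        self.messages = deque()
        self.unacked = {}              # delivery_tag -> message

    def publish(self, msg):
        self.messages.append(msg)

    def deliver(self):
        msg = self.messages.popleft()
        tag = id(msg)
        self.unacked[tag] = msg        # held by broker until acked
        return tag, msg

    def ack(self, tag):
        del self.unacked[tag]

    def requeue_unacked(self):         # what a broker does when a consumer dies
        for msg in self.unacked.values():
            self.messages.appendleft(msg)
        self.unacked.clear()

q = ToyQueue()

# At-most-once: ack BEFORE processing -- a crash loses the message
q.publish({"id": "order_1"})
tag, msg = q.deliver()
q.ack(tag)                             # acked immediately
# ... consumer crashes here: nothing to redeliver
q.requeue_unacked()
print(len(q.messages))                 # 0 -> message lost

# At-least-once: ack AFTER processing -- a crash causes redelivery
q.publish({"id": "order_2"})
tag, msg = q.deliver()
# ... consumer crashes BEFORE acking
q.requeue_unacked()
print(len(q.messages))                 # 1 -> message will be redelivered
```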

Event-Driven Architecture

Event-driven architecture (EDA) is a design pattern where the flow of the program is determined by events—significant changes in state that other parts of the system may need to react to.

Event-driven architecture diagram showing event producers emitting domain events through an event bus to multiple independent consumer services
Event-driven architecture pattern — services communicate through domain events published to an event bus, enabling loose coupling and independent scaling

Event Types

  • Domain Events: Business-significant occurrences (OrderPlaced, UserRegistered)
  • Integration Events: Cross-service communication events
  • System Events: Infrastructure events (ServerStarted, DatabaseConnected)

Event-Driven Design

# Event definitions
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import uuid

# kw_only (Python 3.10+) lets subclasses add required fields
# after this base class's defaulted field
@dataclass(kw_only=True)
class Event:
    event_id: str
    event_type: str
    timestamp: datetime
    correlation_id: Optional[str] = None

@dataclass
class OrderPlaced(Event):
    order_id: str
    user_id: str
    items: list
    total: float

@dataclass
class PaymentProcessed(Event):
    order_id: str
    payment_id: str
    amount: float
    status: str

@dataclass
class InventoryReserved(Event):
    order_id: str
    items: list
    warehouse_id: str

# Event publisher
class EventPublisher:
    def __init__(self, message_queue):
        self.mq = message_queue
    
    def publish(self, event):
        self.mq.publish(
            topic=event.event_type,
            message={
                'event_id': event.event_id,
                'event_type': event.event_type,
                'timestamp': event.timestamp.isoformat(),
                'data': event.__dict__
            }
        )

# Event handler
class OrderEventHandler:
    def handle(self, event):
        if isinstance(event, OrderPlaced):
            # Trigger inventory reservation
            self.inventory_service.reserve(event.order_id, event.items)
            # Trigger payment processing
            self.payment_service.process(event.order_id, event.total)

Choreography vs Orchestration

| Aspect | Choreography | Orchestration |
|---|---|---|
| Coordination | Services react to events independently | Central orchestrator directs services |
| Coupling | Loose (event-based) | Tighter (orchestrator knows all services) |
| Visibility | Distributed, harder to trace | Centralized, easy to monitor |
| Complexity | Logic distributed across services | Logic centralized in orchestrator |
| Best for | Simple flows, loose coupling | Complex workflows, explicit control |
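The contrast can be sketched with two in-memory variants of the same order flow. Service names and the EventBus class here are illustrative, not a real framework:

```python
# Choreography: services subscribe to events; no central coordinator.
class EventBus:
    def __init__(self):
        self.handlers = {}

    def subscribe(self, event_type, handler):
        self.handlers.setdefault(event_type, []).append(handler)

    def publish(self, event_type, data):
        for handler in self.handlers.get(event_type, []):
            handler(data)

log = []
bus = EventBus()
bus.subscribe("OrderPlaced", lambda e: log.append(f"inventory reserved for {e['id']}"))
bus.subscribe("OrderPlaced", lambda e: log.append(f"payment started for {e['id']}"))
bus.publish("OrderPlaced", {"id": "order_123"})   # each service reacts on its own

# Orchestration: one component explicitly drives each step.
class OrderOrchestrator:
    def __init__(self, inventory, payment):
        self.inventory = inventory
        self.payment = payment

    def place_order(self, order):
        self.inventory(order)      # step 1: explicit call, easy to trace
        self.payment(order)        # step 2: flow logic lives here

orch = OrderOrchestrator(
    inventory=lambda o: log.append(f"inventory reserved for {o['id']}"),
    payment=lambda o: log.append(f"payment started for {o['id']}"),
)
orch.place_order({"id": "order_124"})

print(len(log))  # 4
```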

Event Sourcing & CQRS

Event Sourcing

Store all state changes as a sequence of events. Current state is derived by replaying events.

# Event Sourcing Example - Bank Account
class BankAccount:
    def __init__(self, account_id):
        self.account_id = account_id
        self.balance = 0
        self.events = []
    
    # Commands create events
    def deposit(self, amount, timestamp):
        event = DepositEvent(
            account_id=self.account_id,
            amount=amount,
            timestamp=timestamp
        )
        self._apply(event)
        self.events.append(event)
    
    def withdraw(self, amount, timestamp):
        if amount > self.balance:
            raise InsufficientFundsError()
        
        event = WithdrawEvent(
            account_id=self.account_id,
            amount=amount,
            timestamp=timestamp
        )
        self._apply(event)
        self.events.append(event)
    
    # Apply events to update state
    def _apply(self, event):
        if isinstance(event, DepositEvent):
            self.balance += event.amount
        elif isinstance(event, WithdrawEvent):
            self.balance -= event.amount
    
    # Rebuild state from events
    @classmethod
    def rebuild_from_events(cls, account_id, events):
        account = cls(account_id)
        for event in events:
            account._apply(event)
        return account

# Event Store
class EventStore:
    def __init__(self):
        self.events = {}  # account_id -> [events]
    
    def save_events(self, aggregate_id, events, expected_version):
        # Optimistic concurrency check
        current_version = len(self.events.get(aggregate_id, []))
        if current_version != expected_version:
            raise ConcurrencyException()
        
        self.events.setdefault(aggregate_id, []).extend(events)
    
    def get_events(self, aggregate_id):
        return self.events.get(aggregate_id, [])

Benefits: Complete audit trail, temporal queries, debugging, replay
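Temporal queries fall out of the replay mechanism: to see state "as of" any moment, replay only the events up to that timestamp. A standalone sketch mirroring the bank-account example (the minimal event classes here are illustrative):

```python
# Temporal query sketch: derive the balance at any point in time by
# replaying only events up to that timestamp.
from dataclasses import dataclass

@dataclass
class DepositEvent:
    amount: float
    timestamp: int

@dataclass
class WithdrawEvent:
    amount: float
    timestamp: int

def balance_as_of(events, as_of):
    balance = 0
    for event in sorted(events, key=lambda e: e.timestamp):
        if event.timestamp > as_of:
            break                      # ignore everything after the query point
        if isinstance(event, DepositEvent):
            balance += event.amount
        else:
            balance -= event.amount
    return balance

history = [
    DepositEvent(100, timestamp=1),
    WithdrawEvent(30, timestamp=2),
    DepositEvent(50, timestamp=3),
]
print(balance_as_of(history, as_of=2))  # 70  (deposit 100, withdraw 30)
print(balance_as_of(history, as_of=3))  # 120 (all events applied)
```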

CQRS with Event Sourcing

Write path: Commands ---> Write Model ---> Event Store
Event Store ---> published events ---> Read Model (materialized views)
Read path:  Queries ---> Read Model

CQRS (Command Query Responsibility Segregation)

Separate models for reading (queries) and writing (commands) data.

# CQRS Implementation
# Write Model (Commands) - Handles business logic
class OrderCommandHandler:
    def __init__(self, event_store, event_bus):
        self.event_store = event_store
        self.event_bus = event_bus
    
    def handle_create_order(self, command):
        # Load aggregate from events
        events = self.event_store.get_events(command.order_id)
        order = Order.rebuild_from_events(events)
        
        # Execute business logic
        new_events = order.create(command.user_id, command.items)
        
        # Save new events
        self.event_store.save_events(command.order_id, new_events)
        
        # Publish events for read model update
        for event in new_events:
            self.event_bus.publish(event)

# Read Model (Queries) - Optimized for reading
class OrderReadModel:
    def __init__(self):
        self.orders = {}  # Denormalized view
    
    def handle_order_created(self, event):
        self.orders[event.order_id] = {
            'id': event.order_id,
            'user_id': event.user_id,
            'items': event.items,
            'status': 'created',
            'total': sum(item['price'] for item in event.items)
        }
    
    def handle_order_shipped(self, event):
        self.orders[event.order_id]['status'] = 'shipped'
        self.orders[event.order_id]['shipped_at'] = event.timestamp
    
    # Fast queries on denormalized data
    def get_user_orders(self, user_id):
        return [o for o in self.orders.values() if o['user_id'] == user_id]

When to Use: Event Sourcing + CQRS shine in domains with complex business logic, audit requirements, or high read/write asymmetry. Overkill for simple CRUD applications.

Transactional Messaging

A fundamental challenge in event-driven microservices: how do you atomically update a database and publish a message? If you update the DB but the message publish fails, other services never learn about the change. If the publish succeeds but the DB write fails, downstream services act on phantom data.

Transactional Outbox Pattern

Write messages to an outbox table in the same database transaction as the business data. A separate relay process reads the outbox and publishes to the message broker.

# Transactional Outbox Pattern — full implementation
import json
import uuid
import time

class OrderService:
    """Business logic that needs to publish events atomically."""
    
    def __init__(self, db):
        self.db = db  # Database connection
    
    def create_order(self, user_id, items, total):
        order_id = str(uuid.uuid4())
        
        # SINGLE database transaction — both writes or neither
        with self.db.transaction() as tx:
            # 1. Business data write
            tx.execute(
                "INSERT INTO orders (id, user_id, total, status) "
                "VALUES (%s, %s, %s, 'created')",
                (order_id, user_id, total)
            )
            for item in items:
                tx.execute(
                    "INSERT INTO order_items (order_id, product_id, qty) "
                    "VALUES (%s, %s, %s)",
                    (order_id, item["product_id"], item["qty"])
                )
            
            # 2. Outbox write (SAME transaction)
            event_payload = json.dumps({
                "order_id": order_id,
                "user_id": user_id,
                "total": total,
                "items": items
            })
            tx.execute(
                "INSERT INTO outbox "
                "(id, aggregate_type, aggregate_id, event_type, payload) "
                "VALUES (%s, 'Order', %s, 'OrderCreated', %s)",
                (str(uuid.uuid4()), order_id, event_payload)
            )
        
        return order_id

# Outbox table schema:
# CREATE TABLE outbox (
#     id UUID PRIMARY KEY,
#     aggregate_type VARCHAR(255),
#     aggregate_id VARCHAR(255),
#     event_type VARCHAR(255),
#     payload JSONB,
#     created_at TIMESTAMP DEFAULT NOW(),
#     published BOOLEAN DEFAULT FALSE
# );
print("Outbox guarantees: DB write + event are in same transaction")

Relay Strategies: Polling vs Log Tailing

The outbox table needs a relay process to publish events to the broker. Two main approaches:

| Strategy | How It Works | Latency | Complexity | Tools |
|---|---|---|---|---|
| Polling Publisher | Periodically query SELECT * FROM outbox WHERE published = FALSE, publish each event, mark as published | Medium (polling interval, e.g., 500ms–5s) | Low — simple SQL + broker client | Custom code, Spring Integration |
| Transaction Log Tailing (CDC) | Read the database's write-ahead log (WAL/binlog) in real time, extract outbox inserts, and publish as events | Low (near real-time, sub-second) | High — CDC infrastructure required | Debezium, Maxwell, AWS DMS |
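As a sketch of the log-tailing approach, a Debezium Postgres connector can capture outbox inserts and route them with its outbox event router transform. The connection details below are placeholders, and property names should be verified against the Debezium documentation for your version:

```json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "cdc_user",
    "database.password": "secret",
    "database.dbname": "orders_db",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload"
  }
}
```

With this in place, no polling loop is needed — committed outbox rows stream to Kafka topics as soon as they hit the write-ahead log.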

Polling Publisher Implementation

# Polling Publisher — simple relay that polls the outbox
import time

class OutboxPollingRelay:
    """Polls outbox table and publishes events to message broker."""
    
    def __init__(self, db, broker, poll_interval=1.0, batch_size=100):
        self.db = db
        self.broker = broker
        self.poll_interval = poll_interval
        self.batch_size = batch_size
    
    def run(self):
        """Main loop — run as a background thread or separate process."""
        while True:
            published = self.poll_and_publish()
            if published == 0:
                time.sleep(self.poll_interval)
    
    def poll_and_publish(self):
        rows = self.db.query(
            "SELECT id, event_type, aggregate_type, aggregate_id, payload "
            "FROM outbox WHERE published = FALSE "
            "ORDER BY created_at ASC LIMIT %s",
            (self.batch_size,)
        )
        
        for row in rows:
            # Publish to broker (topic derived from aggregate type)
            topic = f"{row['aggregate_type']}.{row['event_type']}"
            self.broker.publish(
                topic=topic,
                key=row["aggregate_id"],
                value=row["payload"],
                headers={"outbox_id": row["id"]}
            )
            
            # Mark as published
            self.db.execute(
                "UPDATE outbox SET published = TRUE WHERE id = %s",
                (row["id"],)
            )
        
        return len(rows)

print("Polling Publisher: simple, reliable, slightly higher latency")
print("Add index: CREATE INDEX idx_outbox_unpublished ON outbox (published, created_at)")

Idempotent Consumer

With at-least-once delivery (the standard guarantee in distributed messaging), consumers must be idempotent — processing the same message multiple times should produce the same result:

Idempotent Consumer Pattern

# Idempotent Consumer — deduplication via processed message tracking
import json

class IdempotentEventHandler:
    """Base class ensuring each message is processed exactly once."""
    
    def __init__(self, db):
        self.db = db
    
    def handle(self, message):
        message_id = message["message_id"]
        
        with self.db.transaction() as tx:
            # Check if already processed (SELECT FOR UPDATE for safety)
            already = tx.query_one(
                "SELECT 1 FROM processed_messages WHERE message_id = %s "
                "FOR UPDATE",
                (message_id,)
            )
            
            if already:
                print(f"Duplicate message {message_id} — skipping")
                return  # Idempotent: no side effects
            
            # Process the message (implemented by subclass)
            self._process(message, tx)
            
            # Record that we processed this message
            tx.execute(
                "INSERT INTO processed_messages (message_id, processed_at) "
                "VALUES (%s, NOW())",
                (message_id,)
            )
    
    def _process(self, message, tx):
        raise NotImplementedError

class PaymentHandler(IdempotentEventHandler):
    def _process(self, message, tx):
        tx.execute(
            "INSERT INTO payments (order_id, amount, status) "
            "VALUES (%s, %s, 'charged')",
            (message["order_id"], message["total"])
        )

# Clean up old records with TTL
# DELETE FROM processed_messages WHERE processed_at < NOW() - INTERVAL '7 days'
print("Strategies: message_id dedup table, DB unique constraint, or idempotent operations")

Key Insight: Not all operations need a dedup table. Some operations are naturally idempotent: SET status = 'shipped' is idempotent (re-applying changes nothing), but balance = balance + amount is NOT (re-applying double-charges). Design for idempotency first; add dedup tracking when needed.
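The distinction is easy to demonstrate with a toy dict standing in for a database row (illustrative only):

```python
# Why SET-style updates are safe to replay but increments are not.
order = {"status": "created"}
account = {"balance": 100}

def mark_shipped(row):
    row["status"] = "shipped"      # idempotent: same result every time

def apply_credit(row, amount):
    row["balance"] += amount       # NOT idempotent: replay double-applies

# Simulate the same message being delivered twice
mark_shipped(order)
mark_shipped(order)
apply_credit(account, 50)
apply_credit(account, 50)

print(order["status"])     # shipped (replay was harmless)
print(account["balance"])  # 200, not 150 -> replay double-credited
```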

Technologies

Apache Kafka

Distributed streaming platform for high-throughput, fault-tolerant messaging.

# Kafka Producer
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # Wait for all replicas
    retries=3
)

def send_event(topic, event):
    future = producer.send(topic, value=event)
    record_metadata = future.get(timeout=10)
    print(f"Sent to {record_metadata.topic} partition {record_metadata.partition}")

send_event('orders', {'order_id': '123', 'status': 'created'})

# Kafka Consumer
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # Manual commit after processing -> at-least-once
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(f"Received: {message.value}")
    process_order(message.value)
    consumer.commit()  # Commit after successful processing

Key Features: Partitions for parallelism, replication for durability, log compaction, exactly-once semantics


RabbitMQ

Traditional message broker with flexible routing and multiple protocols (AMQP, MQTT, STOMP).

Key Features: Exchange types (direct, fanout, topic, headers), message acknowledgment, dead letter queues, priority queues


AWS SQS + SNS

Managed messaging services. SQS for queues, SNS for pub/sub.

# AWS SQS
import boto3
import json

sqs = boto3.client('sqs')
queue_url = 'https://sqs.region.amazonaws.com/account/queue-name.fifo'

# Send message (MessageGroupId/MessageDeduplicationId apply to FIFO queues only)
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({'order_id': '123'}),
    MessageGroupId='order-group',  # For FIFO queues
    MessageDeduplicationId='unique-id'
)

# Receive and delete
response = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20  # Long polling
)

for message in response.get('Messages', []):
    process(message['Body'])
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message['ReceiptHandle']
    )

Choosing the Right Tool

| Tool | Best For | Throughput | Features |
|---|---|---|---|
| Apache Kafka | Event streaming, high volume | Millions/sec | Log retention, replay, exactly-once |
| RabbitMQ | Complex routing, traditional MQ | Tens of thousands/sec | Flexible routing, multiple protocols |
| AWS SQS | Simple queuing, serverless | Thousands/sec | Managed, pay-per-use, FIFO option |
| Redis Streams | Lightweight streaming | Hundreds of thousands/sec | In-memory, consumer groups |
| Apache Pulsar | Multi-tenant, geo-replication | Millions/sec | Tiered storage, functions |

Selection Guide:
  • High throughput streaming: Kafka or Pulsar
  • Complex routing needs: RabbitMQ
  • AWS-native, managed: SQS/SNS
  • Already using Redis: Redis Streams

Real-Time Stream Processing

Key Insight: Stream processing analyzes data in motion, enabling real-time insights and decisions as events flow through the system.

Stream processing differs from batch processing in that it processes data continuously as it arrives, rather than in scheduled batches. This is essential for real-time analytics, fraud detection, and live dashboards.

Stream processing vs batch processing — continuous real-time data flow with millisecond latency compared to scheduled batch jobs with minutes-to-hours delay

Stream vs Batch Processing

| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Data Model | Bounded datasets | Unbounded data streams |
| Use Cases | ETL, reports, ML training | Real-time analytics, fraud detection |

Windowing Concepts

Windows group streaming events for aggregation:

  • Tumbling Window: Fixed-size, non-overlapping (e.g., count per minute)
  • Sliding Window: Fixed-size, overlapping (e.g., last 5 minutes, updated every minute)
  • Session Window: Dynamic size based on activity gaps
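The tumbling case can be sketched in a few lines of plain Python (a simplification — real stream processors also handle late and out-of-order events):

```python
# Tumbling window sketch: fixed-size, non-overlapping buckets.
from collections import defaultdict

def tumbling_counts(event_timestamps, window_size=60):
    """Count events per fixed window; each event lands in exactly one bucket."""
    counts = defaultdict(int)
    for timestamp in event_timestamps:
        window_start = (timestamp // window_size) * window_size
        counts[window_start] += 1
    return dict(counts)

# Event timestamps in seconds
events = [5, 20, 59, 61, 100, 130]
print(tumbling_counts(events))  # {0: 3, 60: 2, 120: 1}
```

A sliding window would instead assign each event to every window that covers it, producing overlapping aggregates.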

Kafka Streams

Kafka Streams is a lightweight stream processing library built directly on Apache Kafka. It processes data stored in Kafka topics and writes results back to Kafka.

Key Features

  • No external dependencies: Runs as a library within your application
  • Exactly-once semantics: Guarantees each record is processed exactly once
  • Stateful processing: Built-in state stores for aggregations and joins
  • Windowing: Time-based and session windows for temporal analysis

// Kafka Streams Example
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

source
    .filter((key, value) -> value.contains("important"))
    .mapValues(value -> value.toUpperCase())
    .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Use Cases: Real-time analytics, fraud detection, event-driven microservices, and stream processing that needs to stay close to Kafka.

Kafka Streams processing topology — lightweight stream processing directly within applications, reading from and writing to Kafka topics

Next Steps

Continue with Part 8 of the System Design Series.