Pub/Sub Patterns
Deep dive into publish/subscribe patterns, fan-out/fan-in, dead letter queues, and backpressure handling.
In a distributed system, services need to communicate. The simplest approach — synchronous HTTP calls — works for many cases but introduces tight coupling, cascading failures, and scalability bottlenecks. Message queues and event streaming provide an alternative: asynchronous communication that decouples producers from consumers, absorbs load spikes, and enables more resilient architectures.
The caller sends a request and waits for a response before continuing.
```
Synchronous:

Client ──▶ Service A ──▶ Service B ──▶ Service C
                                           │
Client ◀── Service A ◀── Service B ◀───────┘
                 (waits at every step)
```
Total latency = latency(A) + latency(B) + latency(C)
```
If Service C is slow or down:

Client ──▶ Service A ──▶ Service B ──▶ Service C (TIMEOUT)
Client ◀── ERROR ◀────── ERROR ◀────── ERROR     (cascading failure)
```
The caller sends a message and continues immediately. The message is processed later by a consumer.
```
Asynchronous:

Client ──▶ Service A ──▶ Message Queue ──▶ Service B
              │                        └──▶ Service C
Client ◀── ACK
(immediate)

Service B and C process messages independently, at their own pace.
```
| Aspect | Synchronous | Asynchronous |
|---|---|---|
| Coupling | Tight — caller knows about callee | Loose — producer does not know consumers |
| Latency | Sum of all service latencies | Producer returns immediately |
| Failure handling | Cascading failures | Queue buffers messages during failures |
| Scalability | Limited by slowest service | Consumers scale independently |
| Complexity | Simple to reason about | More complex (eventual consistency, ordering) |
| Debugging | Straightforward stack traces | Harder to trace across async boundaries |
| Use case | Real-time queries, user-facing APIs | Background processing, notifications, ETL |
```
Without queue (tight coupling):

Order Service ──directly calls──▶ Email Service
Order Service ──directly calls──▶ Inventory Service
Order Service ──directly calls──▶ Analytics Service
```
Adding a new consumer requires changing Order Service. If Email Service is down, Order Service fails or must handle it.
```
With queue (loose coupling):

Order Service ──▶ [Message Queue] ──▶ Email Service
                                  ──▶ Inventory Service
                                  ──▶ Analytics Service
```
Adding a new consumer requires NO changes to Order Service. If Email Service is down, messages wait in the queue.

```
Without queue:

Traffic spike:    ████████████████████████ (100 req/s)
Server capacity:  ██████████ (50 req/s)

Result: requests dropped or the server crashes
```

```
With queue:

Traffic spike:    ████████████████████████ (100 req/s)
                      │
               ┌──────▼──────┐
               │   Message   │  Buffer absorbs the spike
               │    Queue    │
               └──────┬──────┘
                      │
Server processing: ██████████ (50 req/s, steady rate)

Result: all messages processed, just with some delay
```
If a consumer crashes, messages remain in the queue and are processed when the consumer recovers. No data is lost.
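The load-leveling behavior can be sketched with a bounded in-memory queue (a toy stand-in for a broker, illustrative only — a real broker also persists messages):

```python
import queue
import threading

# Hypothetical in-memory stand-in for a broker queue.
buffer = queue.Queue(maxsize=1000)   # the buffer that absorbs the spike
processed = []

def consumer():
    # Drains at its own steady pace, regardless of producer bursts.
    while True:
        msg = buffer.get()
        if msg is None:              # sentinel: shut down
            break
        processed.append(msg)

t = threading.Thread(target=consumer)
t.start()

# Producer bursts 100 requests "at once"; the queue absorbs them.
for i in range(100):
    buffer.put(f"req-{i}")
buffer.put(None)                     # enqueued after the backlog

t.join()
print(len(processed))                # → 100: nothing dropped, just delayed
```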
A single message can be delivered to multiple consumers, enabling event-driven architectures.
Each message is consumed by exactly one consumer. Multiple consumers can listen to the same queue, but each message goes to only one of them (competing consumers pattern).
```
Producer ──▶ [Queue] ──▶ Consumer A
                     ──▶ Consumer B   (only one gets each message)
                     ──▶ Consumer C

Message 1 → Consumer A
Message 2 → Consumer B
Message 3 → Consumer C
Message 4 → Consumer A  (round-robin or other distribution)
```
Use cases: Task processing, job queues, order processing
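The competing-consumers pattern can be sketched in-memory (illustrative only; `queue.Queue` stands in for a broker queue): three workers share one queue, and each message is taken by exactly one of them.

```python
import queue
import threading
from collections import Counter

work = queue.Queue()
received = Counter()        # message -> number of times it was processed
lock = threading.Lock()

def worker():
    while True:
        msg = work.get()
        if msg is None:
            work.put(None)  # re-post the sentinel so the other workers stop too
            break
        with lock:
            received[msg] += 1

workers = [threading.Thread(target=worker) for _ in range(3)]
for t in workers:
    t.start()

for i in range(9):
    work.put(f"msg-{i}")
work.put(None)              # shutdown signal

for t in workers:
    t.join()

# Each message was consumed by exactly one worker.
assert sum(received.values()) == 9
assert all(count == 1 for count in received.values())
```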
Each message is delivered to all subscribers. Every consumer that subscribes to the topic receives a copy of every message.
```
Publisher ──▶ [Topic] ──▶ Subscriber A (gets ALL messages)
                      ──▶ Subscriber B (gets ALL messages)
                      ──▶ Subscriber C (gets ALL messages)

Message 1 → A, B, C
Message 2 → A, B, C
Message 3 → A, B, C
```
Use cases: Event notifications, real-time updates, event-driven architecture
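A toy fan-out sketch (the `Topic` class here is hypothetical, not any broker's API): publishing appends a copy of the message to every subscriber's inbox.

```python
class Topic:
    """Toy pub/sub topic: every subscriber receives a copy of every message."""
    def __init__(self):
        self.subscribers = {}          # name -> inbox (list of messages)

    def subscribe(self, name):
        self.subscribers[name] = []    # register with an empty inbox

    def publish(self, message):
        for inbox in self.subscribers.values():
            inbox.append(message)      # fan out a copy to everyone

topic = Topic()
for name in ("email", "inventory", "analytics"):
    topic.subscribe(name)

for i in range(3):
    topic.publish(f"order-{i}")

# Every subscriber received all three messages.
assert all(inbox == ["order-0", "order-1", "order-2"]
           for inbox in topic.subscribers.values())
```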
| Feature | Point-to-Point (Queue) | Pub/Sub (Topic) |
|---|---|---|
| Delivery | One consumer per message | All subscribers get every message |
| Scaling | Add consumers to process faster | Each subscriber processes independently |
| Message lifecycle | Removed after consumption | Retained for all subscribers |
| Use case | Work distribution | Event broadcasting |
One of the most important aspects of message systems is the delivery guarantee: how many times will a message be delivered?
The message is delivered zero or one times. It may be lost but will never be duplicated.
```
Producer ──▶ Broker: "Here is message M"
Broker ──▶ ACK (message stored)

Broker ──▶ Consumer: "Here is message M"
Consumer processes M
Consumer crashes BEFORE acknowledging
Broker considers M delivered (it was sent)

Result: M is lost. Not retried. At-most-once.
```
Implementation: Fire-and-forget. No acknowledgments from consumers.
Use cases: Metrics collection, logging (where occasional loss is acceptable)
The message is delivered one or more times. It will never be lost but may be duplicated.
```
Producer ──▶ Broker: "Here is message M"
Broker ──▶ ACK (message stored)

Broker ──▶ Consumer: "Here is message M"
Consumer processes M
Consumer crashes BEFORE acknowledging
Broker: "No ACK received. Redeliver."
Broker ──▶ Consumer: "Here is message M" (again!)
Consumer processes M AGAIN

Result: M is processed twice. At-least-once.
```
Implementation: Consumer must acknowledge after processing. Broker redelivers unacknowledged messages.
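The redelivery loop can be sketched with a toy broker (illustrative, not any specific product): unacknowledged messages are requeued, so a consumer that crashes after processing but before acking causes a duplicate.

```python
class ToyBroker:
    """Illustrative at-least-once broker: redelivers until acked."""
    def __init__(self):
        self.queue = []
        self.unacked = {}            # delivery tag -> in-flight message
        self.next_tag = 0

    def publish(self, msg):
        self.queue.append(msg)

    def deliver(self):
        msg = self.queue.pop(0)
        self.next_tag += 1
        self.unacked[self.next_tag] = msg
        return self.next_tag, msg

    def ack(self, tag):
        del self.unacked[tag]        # processing confirmed; drop the message

    def requeue_unacked(self):
        # Consumer died without acking: put its messages back for redelivery.
        self.queue.extend(self.unacked.values())
        self.unacked.clear()

broker = ToyBroker()
broker.publish("M")

seen = []
tag, msg = broker.deliver()
seen.append(msg)                     # consumer processes M...
broker.requeue_unacked()             # ...but crashes before acking

tag, msg = broker.deliver()          # broker redelivers M
seen.append(msg)
broker.ack(tag)                      # this time the ack lands

assert seen == ["M", "M"]            # processed twice: at-least-once
```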
Use cases: Most business-critical operations (with idempotent consumers)
The message is delivered exactly one time. No loss, no duplication. This is the holy grail of messaging but is extremely difficult to achieve in practice.
True exactly-once delivery is impossible in the general case (network failures can always cause ambiguity).
In practice, "exactly-once" means: At-least-once delivery + Idempotent processing = Effectively exactly-once semantics
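The idempotent-processing half of that equation can be sketched with an in-memory dedup set (in production the seen-IDs store and the side effect would live in one durable, transactional store):

```python
processed_ids = set()   # in production: a database table or Redis SET, not memory
balance = 0             # the side effect we must not apply twice

def handle(message):
    """Apply the message's effect at most once per message ID."""
    global balance
    if message["id"] in processed_ids:
        return          # duplicate delivery: safely ignored
    balance += message["amount"]
    processed_ids.add(message["id"])

msg = {"id": "evt-42", "amount": 100}
handle(msg)
handle(msg)             # redelivered by an at-least-once broker

assert balance == 100   # effect applied exactly once
```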
Kafka achieves this with:
- Idempotent producers (deduplication on the broker)
- Transactional producers (atomic writes to multiple partitions)
- Consumer offset management within the transaction

| Guarantee | Lost Messages | Duplicate Messages | Complexity | Performance |
|---|---|---|---|---|
| At-most-once | Possible | Never | Lowest | Highest |
| At-least-once | Never | Possible | Medium | Medium |
| Exactly-once | Never | Never | Highest | Lowest |
```
┌────────────────────────────────────────────────────────────┐
│                  Message Broker Landscape                  │
├────────────────┬───────────┬──────────┬────────────────────┤
│ Traditional    │ Streaming │ Cloud    │ Lightweight        │
│ Brokers        │ Platforms │ Managed  │ Brokers            │
├────────────────┼───────────┼──────────┼────────────────────┤
│ RabbitMQ       │ Kafka     │ AWS SQS  │ Redis Streams      │
│ ActiveMQ       │ Pulsar    │ AWS SNS  │ ZeroMQ             │
│ IBM MQ         │ Redpanda  │ AWS      │ NATS               │
│                │ NATS      │ Kinesis  │                    │
│                │ JetStream │ Azure    │                    │
│                │           │ Service  │                    │
│                │           │ Bus      │                    │
│                │           │ GCP      │                    │
│                │           │ Pub/Sub  │                    │
└────────────────┴───────────┴──────────┴────────────────────┘
```
| Technology | Best For | Not Ideal For |
|---|---|---|
| RabbitMQ | Complex routing, task queues, request-reply | Very high throughput stream processing |
| Kafka | High-throughput event streaming, log aggregation | Simple task queues, request-reply |
| AWS SQS | Simple queuing on AWS, serverless architectures | Complex routing, ordering across partitions |
| Redis Streams | Lightweight streaming, already using Redis | Large-scale production streaming |
| NATS | Low-latency, lightweight messaging | Durable message storage requirements |
| Pulsar | Multi-tenancy, geo-replication | Small deployments (operational overhead) |
Python (Redis lists):

```python
import json
import time
import uuid
from threading import Thread

import redis


class SimpleMessageQueue:
    """Simple message queue using Redis lists."""

    def __init__(self, redis_url='localhost', port=6379):
        self.redis = redis.Redis(
            host=redis_url,
            port=port,
            decode_responses=True
        )

    def publish(self, queue_name: str, message: dict):
        """Add a message to the queue."""
        envelope = {
            'id': str(uuid.uuid4()),
            'timestamp': time.time(),
            'body': message
        }
        self.redis.rpush(queue_name, json.dumps(envelope))
        return envelope['id']

    def consume(self, queue_name: str, timeout: int = 0):
        """
        Consume a message from the queue.
        Blocks until a message is available.
        Uses BLPOP for atomic dequeue.
        """
        result = self.redis.blpop(queue_name, timeout=timeout)
        if result:
            _, message_json = result
            return json.loads(message_json)
        return None

    def consume_with_ack(self, queue_name: str, timeout: int = 0):
        """
        Consume with an at-least-once guarantee.
        The message moves to a processing list and must be
        acknowledged after processing.
        """
        processing_queue = f"{queue_name}:processing"
        # Note: BRPOPLPUSH pops from the tail while publish() pushes to the
        # tail (RPUSH), so this delivers newest-first. For FIFO, LPUSH in
        # publish() or use BLMOVE with LEFT/RIGHT on Redis >= 6.2.
        result = self.redis.brpoplpush(
            queue_name, processing_queue, timeout=timeout
        )
        if result:
            return json.loads(result)
        return None

    def acknowledge(self, queue_name: str, message: dict):
        """Acknowledge a processed message."""
        processing_queue = f"{queue_name}:processing"
        self.redis.lrem(processing_queue, 1, json.dumps(message))


# Usage
queue = SimpleMessageQueue()

# Producer
for i in range(5):
    msg_id = queue.publish('orders', {
        'order_id': f'ORD-{i}',
        'amount': 99.99
    })
    print(f"Published message: {msg_id}")

# Consumer
def worker(queue_instance, queue_name):
    while True:
        message = queue_instance.consume_with_ack(queue_name, timeout=5)
        if message:
            print(f"Processing: {message['body']}")
            # Process the message...
            queue_instance.acknowledge(queue_name, message)
            print(f"Acknowledged: {message['id']}")

Thread(target=worker, args=(queue, 'orders')).start()
```

Node.js (BullMQ):

```javascript
const { Queue, Worker } = require('bullmq');

// Connection to Redis
const connection = {
  host: 'localhost',
  port: 6379
};

// Create a queue
const orderQueue = new Queue('orders', { connection });

// Producer: add jobs to the queue
async function publishOrders() {
  for (let i = 0; i < 5; i++) {
    const job = await orderQueue.add(
      'process-order',
      {
        orderId: `ORD-${i}`,
        amount: 99.99,
        customer: `customer-${i}`
      },
      {
        // Job options
        attempts: 3,            // retry up to 3 times
        backoff: {
          type: 'exponential',
          delay: 1000           // 1s, 2s, 4s
        },
        removeOnComplete: 100,  // keep last 100 completed jobs
        removeOnFail: 500
      }
    );
    console.log(`Published job: ${job.id}`);
  }
}

// Consumer: process jobs
const worker = new Worker(
  'orders',
  async (job) => {
    console.log(`Processing order: ${job.data.orderId}`);

    // Simulate processing
    await processOrder(job.data);

    // Report progress
    await job.updateProgress(100);

    return { processed: true };
  },
  {
    connection,
    concurrency: 5  // process 5 jobs at a time
  }
);

worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

worker.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

async function processOrder(data) {
  // Business logic here
  return new Promise(r => setTimeout(r, 1000));
}

publishOrders();
```

Java (Spring AMQP + RabbitMQ):

```java
import java.io.IOException;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

// Classes shown together for brevity; each goes in its own file.

// Configuration
@Configuration
public class QueueConfig {

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("orders")
            .withArgument("x-dead-letter-exchange", "dlx")
            .withArgument("x-dead-letter-routing-key", "orders.dead")
            .build();
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("orders.dead").build();
    }
}

// Producer
@Service
public class OrderProducer {

    private final RabbitTemplate rabbitTemplate;

    public OrderProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void publishOrder(OrderMessage order) {
        rabbitTemplate.convertAndSend("orders", order);
        System.out.println("Published order: " + order.getOrderId());
    }
}

// Consumer
// Manual acks require: spring.rabbitmq.listener.simple.acknowledge-mode=manual
@Service
public class OrderConsumer {

    @RabbitListener(queues = "orders")
    public void processOrder(OrderMessage order,
                             Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        try {
            System.out.println("Processing: " + order.getOrderId());
            // Business logic
            processOrderLogic(order);

            // Manual acknowledgment
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // Reject without requeueing (routes to the DLQ)
            channel.basicNack(tag, false, false);
        }
    }
}
```