Pub/Sub Patterns
The publish/subscribe (pub/sub) pattern is a messaging paradigm where message senders (publishers) do not send messages directly to specific receivers (subscribers). Instead, messages are published to a channel (topic), and all subscribers to that topic receive a copy. This decouples producers from consumers, enabling flexible, scalable event-driven architectures.
Point-to-Point vs Publish/Subscribe
Point-to-Point (Queue)
```
                    ┌──────────┐
Producer ──MSG 1──▶ │          │ ──MSG 1──▶ Consumer A
Producer ──MSG 2──▶ │  Queue   │ ──MSG 2──▶ Consumer B
Producer ──MSG 3──▶ │          │ ──MSG 3──▶ Consumer A
                    └──────────┘
```

Each message is delivered to exactly ONE consumer; consumers compete for messages.

Use cases: work distribution, task processing, load balancing across workers.
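The competing-consumers behavior can be sketched with Python's queue module — a simplified, single-threaded sketch in which round-robin stands in for consumers racing to dequeue:

```python
# Point-to-point sketch: three messages, two competing consumers;
# each message ends up with exactly one consumer.
from queue import Queue, Empty

q = Queue()
for i in range(1, 4):
    q.put(f"MSG {i}")

consumed = {"A": [], "B": []}
consumers = ["A", "B"]
turn = 0
while True:
    try:
        msg = q.get_nowait()
    except Empty:
        break
    # Round-robin stands in for consumers racing on the queue
    consumed[consumers[turn % 2]].append(msg)
    turn += 1

# Every message was delivered to exactly one consumer
assert sum(len(v) for v in consumed.values()) == 3
```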
Publish/Subscribe (Topic)
```
                    ┌──────────┐ ──MSG 1──▶ Subscriber A
Publisher ──MSG 1──▶│          │ ──MSG 1──▶ Subscriber B
                    │  Topic   │ ──MSG 1──▶ Subscriber C
Publisher ──MSG 2──▶│          │ ──MSG 2──▶ Subscriber A
                    └──────────┘ ──MSG 2──▶ Subscriber B
                                 ──MSG 2──▶ Subscriber C
```

Each message is delivered to ALL subscribers; subscribers receive independent copies.

Use cases: event notifications, real-time updates, broadcasting state changes.
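The delivery semantics can be sketched in a few lines — each publish hands an independent copy to every subscriber (subscriber names are illustrative):

```python
# Pub/sub sketch: every subscriber keeps its own inbox;
# publish copies the message into each one.
subscribers = {"A": [], "B": [], "C": []}

def publish(message):
    for inbox in subscribers.values():
        inbox.append(dict(message))  # independent copy per subscriber

publish({"event": "MSG 1"})
publish({"event": "MSG 2"})
```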
Hybrid: Consumer Groups
Modern systems like Kafka combine both patterns using consumer groups:
```
                   ┌──────────┐
                   │          │ ──▶ Consumer A1 ┐ Consumer
Publisher ──MSG──▶ │  Topic   │ ──▶ Consumer A2 ┘ Group A
                   │          │
                   │          │ ──▶ Consumer B1 ┐ Consumer
                   │          │ ──▶ Consumer B2 ┘ Group B
                   └──────────┘
```
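A sketch of this hybrid delivery in Python — every group sees each message, but a key picks exactly one consumer within a group. Group and consumer names are illustrative, and real brokers assign partitions rather than hashing per message:

```python
import zlib
from collections import defaultdict

class ConsumerGroups:
    def __init__(self):
        # group name -> list of consumer callbacks
        self.groups = defaultdict(list)

    def add_consumer(self, group, callback):
        self.groups[group].append(callback)

    def publish(self, key, message):
        # Across groups: pub/sub — every group sees the message.
        # Within a group: point-to-point — the key picks one consumer.
        for consumers in self.groups.values():
            idx = zlib.crc32(key.encode()) % len(consumers)
            consumers[idx](message)

received = defaultdict(list)
broker = ConsumerGroups()
broker.add_consumer('group-a', lambda m: received['A1'].append(m))
broker.add_consumer('group-a', lambda m: received['A2'].append(m))
broker.add_consumer('group-b', lambda m: received['B1'].append(m))

broker.publish('order-1', {'id': 1})
# group-a: exactly one of A1/A2 received it; group-b: B1 received it
```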
Within a consumer group: point-to-point (exactly one consumer gets each message).
Across consumer groups: pub/sub (each group gets every message).

Topic-Based Routing
Hierarchical Topics
Topics can be organized hierarchically using dots or slashes to enable fine-grained subscription:
```
Topic Hierarchy:
  orders.created
  orders.updated
  orders.cancelled
  orders.shipped
  payments.processed
  payments.failed
  payments.refunded
  inventory.low
  inventory.restocked
```
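Wildcard patterns over such hierarchies can be matched by compiling them to regular expressions — a sketch using Python's re module; real brokers differ in syntax (MQTT uses `+`/`#`, RabbitMQ uses `*`/`#`):

```python
import re

def compile_pattern(pattern: str) -> re.Pattern:
    # '*' matches exactly one hierarchy level; literal names
    # match themselves. re.escape turns '*' into '\*' first.
    escaped = re.escape(pattern).replace(r'\*', r'[^.]+')
    return re.compile('^' + escaped + '$')

assert compile_pattern('orders.*').match('orders.created')
assert not compile_pattern('orders.*').match('payments.failed')
assert compile_pattern('*.failed').match('payments.failed')
```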
```
Subscription Patterns:
  "orders.*"       → All order events
  "payments.*"     → All payment events
  "orders.created" → Only new orders
  "*.failed"       → All failure events (if supported)
```

Content-Based Routing
Messages are routed based on their content (headers or body), not just the topic:
```
Message:
{
  "type": "order",
  "region": "US",
  "priority": "high",
  "amount": 500
}
```
```
Routing Rules:
  Rule 1: region = "US" AND priority = "high" → US-Priority Queue
  Rule 2: region = "EU"                       → EU Queue
  Rule 3: amount > 1000                       → Large Order Queue
```

Example: topic-based pub/sub with wildcard subscriptions, in Python using Redis:

```python
import redis
import json
import time

class PubSubBroker:
    """Pub/Sub implementation using Redis."""

    def __init__(self, host='localhost', port=6379):
        self.redis = redis.Redis(
            host=host, port=port,
            decode_responses=True
        )
        self.pubsub = self.redis.pubsub()

    def publish(self, topic: str, message: dict):
        """Publish a message to a topic."""
        envelope = {
            'id': str(time.time_ns()),
            'topic': topic,
            'timestamp': time.time(),
            'body': message
        }
        self.redis.publish(topic, json.dumps(envelope))
        return envelope['id']

    def subscribe(self, pattern: str, handler):
        """
        Subscribe to topics matching a pattern.
        Supports wildcard patterns like 'orders.*'.
        """
        def message_handler(message):
            if message['type'] == 'pmessage':
                data = json.loads(message['data'])
                handler(data)

        self.pubsub.psubscribe(
            **{pattern: message_handler}
        )

    def start_listening(self):
        """Start listening for messages in a thread."""
        thread = self.pubsub.run_in_thread(
            sleep_time=0.01
        )
        return thread

# Usage
broker = PubSubBroker()

# Subscriber 1: All order events
def handle_order(message):
    print(f"[Orders] {message['topic']}: "
          f"{message['body']}")

broker.subscribe('orders.*', handle_order)

# Subscriber 2: Only failed events
def handle_failures(message):
    print(f"[Failures] {message['topic']}: "
          f"{message['body']}")

broker.subscribe('*.failed', handle_failures)

# Start listening
listener = broker.start_listening()

# Publisher
time.sleep(0.1)  # Wait for subscribers to connect
broker.publish('orders.created', {
    'order_id': 'ORD-001',
    'amount': 99.99
})
broker.publish('orders.failed', {
    'order_id': 'ORD-002',
    'reason': 'Payment declined'
})
broker.publish('payments.failed', {
    'payment_id': 'PAY-001',
    'reason': 'Card expired'
})
```

The same idea as an in-process broker in Node.js, built on EventEmitter:

```javascript
const EventEmitter = require('events');

class PubSubBroker extends EventEmitter {
  constructor() {
    super();
    this.subscriptions = new Map();
  }

  publish(topic, message) {
    const envelope = {
      id: Date.now().toString(36),
      topic,
      timestamp: Date.now(),
      body: message
    };

    // Emit to exact topic subscribers
    this.emit(topic, envelope);

    // Check pattern subscribers
    for (const [pattern, handlers] of this.subscriptions) {
      if (this._matchPattern(pattern, topic)) {
        handlers.forEach(handler => handler(envelope));
      }
    }

    return envelope.id;
  }

  subscribe(pattern, handler) {
    if (!this.subscriptions.has(pattern)) {
      this.subscriptions.set(pattern, []);
    }
    this.subscriptions.get(pattern).push(handler);

    // Return an unsubscribe function
    return () => {
      const handlers = this.subscriptions.get(pattern);
      const index = handlers.indexOf(handler);
      if (index > -1) handlers.splice(index, 1);
    };
  }

  _matchPattern(pattern, topic) {
    const regex = new RegExp(
      '^' + pattern
        .replace(/\./g, '\\.')
        .replace(/\*/g, '[^.]+')
        .replace(/#/g, '.*') + '$'
    );
    return regex.test(topic);
  }
}

// Usage
const broker = new PubSubBroker();

// Subscribe to all order events
const unsub1 = broker.subscribe('orders.*', (msg) => {
  console.log(`[Orders] ${msg.topic}:`, msg.body);
});

// Subscribe to all failure events
broker.subscribe('*.failed', (msg) => {
  console.log(`[Failures] ${msg.topic}:`, msg.body);
});

// Publish events
broker.publish('orders.created', {
  orderId: 'ORD-001',
  amount: 99.99
});

broker.publish('orders.failed', {
  orderId: 'ORD-002',
  reason: 'Payment declined'
});

// Unsubscribe
unsub1(); // Stop receiving order events
```

Fan-Out and Fan-In Patterns
Fan-Out
One message triggers multiple independent processing paths:
```
                              ┌──▶ Email Service
                              │
Order Created ──▶ [Topic] ────┼──▶ Inventory Service
                              │
                              ├──▶ Analytics Service
                              │
                              └──▶ Shipping Service
```

One event, four independent consumers, each processing it differently.

Use cases:
- Order processing (notify multiple services)
- User registration (welcome email, setup defaults, analytics)
- Content publishing (CDN invalidation, search index, notifications)
Fan-In
Multiple message sources converge into a single processing pipeline:
```
Sensor A ──▶ ┐
             │
Sensor B ──▶ ├──▶ [Aggregator Queue] ──▶ Processing Service
             │
Sensor C ──▶ ┘
```

Multiple producers; a single consumer aggregates.

Use cases:
- Log aggregation from multiple services
- IoT sensor data collection
- Merging results from parallel processing
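A fan-in sketch in Python: several producer threads feed one aggregator queue, and a single consumer drains it. Sensor names and readings are illustrative:

```python
import threading
from queue import Queue

agg_queue = Queue()

def sensor(name, readings):
    # Each producer pushes into the shared aggregator queue
    for r in readings:
        agg_queue.put((name, r))

threads = [
    threading.Thread(target=sensor, args=(name, [1, 2]))
    for name in ("sensor-a", "sensor-b", "sensor-c")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Single consumer drains and aggregates all sources
totals = {}
while not agg_queue.empty():
    name, value = agg_queue.get()
    totals[name] = totals.get(name, 0) + value
```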
Fan-Out/Fan-In Combined
Scatter-gather pattern: distribute work, then aggregate results.
```
Request ──▶ [Scatter] ──▶ Worker 1 ──▶ ┐
                      ──▶ Worker 2 ──▶ ├──▶ [Gather] ──▶ Response
                      ──▶ Worker 3 ──▶ ┘
```
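The scatter-gather flow can be sketched with a thread pool: submit the request to every worker in parallel, then merge the results. The index names and the search stub are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

def search_index(index_name, query):
    # Stand-in for a real index lookup
    return [f"{index_name}:{query}:hit"]

def scatter_gather(query, indexes):
    with ThreadPoolExecutor(max_workers=len(indexes)) as pool:
        # Scatter: one task per index
        futures = [pool.submit(search_index, idx, query) for idx in indexes]
        # Gather: collect and merge all results
        merged = []
        for f in futures:
            merged.extend(f.result())
    return merged

results = scatter_gather("shoes", ["product", "review", "price"])
```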
Example: search across multiple indexes

```
Query "shoes" ──▶ Product Index ──▶ ┐
              ──▶ Review Index  ──▶ ├──▶ Merge Results
              ──▶ Price Index   ──▶ ┘
```

Message Filtering
Subscribers can filter messages to receive only those they are interested in, reducing unnecessary processing.
Broker-Side Filtering
The broker evaluates filters and only delivers matching messages:
```
Publisher sends to topic "orders":
  { type: "electronics", amount: 500, region: "US" }

Subscriber A filter: type = "electronics" → RECEIVES
Subscriber B filter: region = "EU"        → DOES NOT RECEIVE
Subscriber C filter: amount > 100         → RECEIVES
```

Advantages: reduces network traffic and consumer processing.
Disadvantages: more complex broker; filter evaluation adds latency.
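Broker-side evaluation can be sketched with filters as predicates; the subscriber names and filters match the example above:

```python
message = {"type": "electronics", "amount": 500, "region": "US"}

# Each subscriber registers a filter predicate with the broker
subscriptions = {
    "A": lambda m: m["type"] == "electronics",
    "B": lambda m: m["region"] == "EU",
    "C": lambda m: m["amount"] > 100,
}

# The broker delivers only to subscribers whose filter matches
delivered = [name for name, matches in subscriptions.items()
             if matches(message)]
# A and C receive the message; B does not
```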
Client-Side Filtering
The broker delivers all messages; consumers filter locally:
All messages are delivered to all subscribers; each subscriber discards the ones it does not need.

Pros: simple broker.
Cons: wasted bandwidth and processing.

AWS SNS Message Filtering
Example subscription filter policy:

```json
{
  "order_type": ["electronics", "clothing"],
  "amount": [{"numeric": [">=", 100]}],
  "region": ["US", "CA"],
  "priority": [{"exists": true}]
}
```

Dead Letter Queues (DLQ)
A dead letter queue captures messages that cannot be processed successfully after a configured number of retry attempts. Instead of losing the message or retrying forever, it is moved to a DLQ for investigation and reprocessing.
```
Normal Flow:
Producer ──▶ Main Queue ──▶ Consumer ──▶ SUCCESS

Failure Flow:
Producer ──▶ Main Queue ──▶ Consumer ──▶ FAIL (attempt 1)
                        ──▶ Consumer ──▶ FAIL (attempt 2)
                        ──▶ Consumer ──▶ FAIL (attempt 3)
                        ──▶ Dead Letter Queue
                                  │
              Admin investigates and either fixes and
              replays or discards the message
```

Why Messages End Up in DLQ
| Reason | Example |
|---|---|
| Malformed message | Invalid JSON, missing required fields |
| Processing error | Business logic exception, null pointer |
| External dependency failure | Database down, API unreachable (after retries) |
| Schema mismatch | Producer sends v2 format, consumer expects v1 |
| Timeout | Processing takes longer than the visibility timeout |
| Poison message | A specific message that always causes a crash |
An in-memory queue with DLQ semantics in Python:

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class Message:
    id: str
    body: dict
    attempt: int = 0
    max_attempts: int = 3

class QueueWithDLQ:
    def __init__(self, name: str, max_retries: int = 3):
        self.name = name
        self.max_retries = max_retries
        self.main_queue: list[Message] = []
        self.dead_letter_queue: list[Message] = []
        self.processing: list[Message] = []

    def enqueue(self, message_id: str, body: dict):
        msg = Message(
            id=message_id,
            body=body,
            max_attempts=self.max_retries
        )
        self.main_queue.append(msg)

    def dequeue(self) -> Message | None:
        if not self.main_queue:
            return None
        msg = self.main_queue.pop(0)
        msg.attempt += 1
        self.processing.append(msg)
        return msg

    def acknowledge(self, message: Message):
        """Mark message as successfully processed."""
        self.processing.remove(message)
        logger.info(
            f"Message {message.id} acknowledged"
        )

    def reject(self, message: Message):
        """Reject a message: retry or move to DLQ."""
        self.processing.remove(message)

        if message.attempt >= message.max_attempts:
            # Max retries exceeded → DLQ
            self.dead_letter_queue.append(message)
            logger.warning(
                f"Message {message.id} moved to DLQ "
                f"after {message.attempt} attempts"
            )
        else:
            # Re-enqueue for retry
            self.main_queue.append(message)
            logger.info(
                f"Message {message.id} requeued "
                f"(attempt {message.attempt})"
            )

    def replay_dlq(self):
        """Move all DLQ messages back to the main queue."""
        count = len(self.dead_letter_queue)
        while self.dead_letter_queue:
            msg = self.dead_letter_queue.pop(0)
            msg.attempt = 0  # Reset attempts
            self.main_queue.append(msg)
        logger.info(f"Replayed {count} messages from DLQ")

    @property
    def dlq_count(self) -> int:
        return len(self.dead_letter_queue)

# Usage
queue = QueueWithDLQ('orders', max_retries=3)

# Enqueue messages
queue.enqueue('msg-1', {'order': 'ORD-001'})
queue.enqueue('msg-2', {'order': 'INVALID'})

# Process messages
def process_message(msg: Message) -> bool:
    if msg.body.get('order') == 'INVALID':
        raise ValueError("Invalid order format")
    print(f"Processed: {msg.body}")
    return True

while msg := queue.dequeue():
    try:
        process_message(msg)
        queue.acknowledge(msg)
    except Exception as e:
        print(f"Failed: {e}")
        queue.reject(msg)

print(f"DLQ count: {queue.dlq_count}")
```

Terraform configuration for an SQS queue with a DLQ and a CloudWatch alarm:

```hcl
# Main queue with dead letter queue
resource "aws_sqs_queue" "orders" {
  name                       = "orders"
  visibility_timeout_seconds = 60
  message_retention_seconds  = 1209600  # 14 days

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_dlq.arn
    maxReceiveCount     = 3  # Move to DLQ after 3 failures
  })
}

# Dead letter queue
resource "aws_sqs_queue" "orders_dlq" {
  name                      = "orders-dlq"
  message_retention_seconds = 1209600  # 14 days
}

# CloudWatch alarm for DLQ messages
resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
  alarm_name          = "orders-dlq-messages"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "Messages in orders DLQ"

  dimensions = {
    QueueName = aws_sqs_queue.orders_dlq.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

# Allow the DLQ to be used as the dead-letter target of the orders queue
# (the redrive allow policy is attached to the DLQ and lists the source queues)
resource "aws_sqs_queue_redrive_allow_policy" "orders" {
  queue_url = aws_sqs_queue.orders_dlq.id

  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue"
    sourceQueueArns   = [aws_sqs_queue.orders.arn]
  })
}
```

Backpressure Handling
Backpressure occurs when a producer generates messages faster than consumers can process them. Without backpressure handling, queues grow unbounded, memory is exhausted, and the system crashes.
```
Producer rate: 1000 msg/s
Consumer rate:  200 msg/s

Without backpressure:
  Queue depth grows by 800 msg/s
  After 1 hour: 2,880,000 unprocessed messages
  Eventually: out of memory, system crash

With backpressure:
  Queue depth reaches a threshold
  System takes corrective action
```

Backpressure Strategies
| Strategy | How It Works | Trade-off |
|---|---|---|
| Drop oldest | Discard oldest messages when queue is full | Data loss; good for real-time metrics |
| Drop newest | Reject new messages when queue is full | Producer must handle rejections |
| Block producer | Producer blocks until space is available | Can cascade upstream |
| Rate limiting | Limit producer’s message rate | Producer must adapt |
| Scale consumers | Auto-scale consumers based on queue depth | Costs money; has scaling lag |
| Sampling | Process every Nth message when overloaded | Approximate results acceptable |
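Two of these strategies — drop oldest and drop newest — can be sketched with bounded buffers in Python (buffer sizes are illustrative):

```python
from collections import deque

# Strategy 1 — drop oldest: a deque with maxlen evicts from the left
buf = deque(maxlen=3)
for i in range(5):
    buf.append(i)           # 0 and 1 are silently discarded

# Strategy 2 — drop newest: reject when the buffer is full
class BoundedQueue:
    def __init__(self, maxsize):
        self.items, self.maxsize = [], maxsize

    def offer(self, item) -> bool:
        if len(self.items) >= self.maxsize:
            return False    # producer must handle the rejection
        self.items.append(item)
        return True

q = BoundedQueue(3)
accepted = [q.offer(i) for i in range(5)]
```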
Auto-Scaling Consumers Based on Queue Depth
| Queue Depth | Consumer Instances |
|---|---|
| 0–100 | 2 (minimum) |
| 100–500 | 5 |
| 500–1000 | 10 |
| 1000–5000 | 20 |
| 5000+ | 50 (maximum) |
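The depth-to-instances mapping above can be expressed as a simple step function; the thresholds mirror the table and are illustrative:

```python
def desired_consumers(queue_depth: int) -> int:
    # Thresholds mirror the scaling table above
    if queue_depth <= 100:
        return 2       # minimum
    if queue_depth <= 500:
        return 5
    if queue_depth <= 1000:
        return 10
    if queue_depth <= 5000:
        return 20
    return 50          # maximum
```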
```
Scaling Policy:
  IF queue_depth > 500 for 5 minutes  → scale up
  IF queue_depth < 100 for 15 minutes → scale down
```

Message Ordering
Maintaining message order is one of the biggest challenges in pub/sub systems:
| Ordering Guarantee | Description | Performance |
|---|---|---|
| No ordering | Messages may arrive in any order | Highest throughput |
| Per-partition ordering | Messages with the same key are ordered within a partition | Good throughput |
| Total ordering | All messages globally ordered | Lowest throughput |
Per-partition ordering (Kafka approach):
```
Producer sends: M1(user=A), M2(user=B), M3(user=A), M4(user=B)

Partition 0 (user A): M1, M3  → ordered within partition
Partition 1 (user B): M2, M4  → ordered within partition
```
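Key-to-partition assignment can be sketched as a stable hash — the same key always maps to the same partition, which is what preserves per-key order. Kafka's default partitioner uses murmur2; crc32 here is only illustrative:

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # Stable hash: identical keys always land in the same partition
    return zlib.crc32(key.encode()) % num_partitions

p_a = partition_for("user-A", 2)
assert partition_for("user-A", 2) == p_a   # same key, same partition
```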
```
Consumer of partition 0 sees: M1 before M3 (guaranteed)
Consumer of partition 1 sees: M2 before M4 (guaranteed)
But: no guarantee about ordering between M1 and M2
```

Summary
| Concept | Key Takeaway |
|---|---|
| Point-to-Point | One consumer per message; competing consumers for load distribution |
| Pub/Sub | All subscribers receive every message; event broadcasting |
| Topic Routing | Hierarchical topics with wildcard subscriptions |
| Fan-Out | One event triggers multiple independent processing paths |
| Fan-In | Multiple sources converge into a single processing pipeline |
| Dead Letter Queue | Captures messages that fail processing for investigation |
| Backpressure | Strategies for handling producers faster than consumers |
| Message Ordering | Per-partition ordering balances order guarantees with throughput |