RabbitMQ & AMQP
RabbitMQ is one of the most widely deployed open-source message brokers. It implements the Advanced Message Queuing Protocol (AMQP) and provides sophisticated message routing, reliable delivery, and flexible deployment options. While Kafka is optimized for high-throughput event streaming, RabbitMQ excels at complex routing, task queues, and request-reply patterns.
AMQP Protocol
AMQP (Advanced Message Queuing Protocol) is an open standard for message-oriented middleware. It defines a wire-level protocol that ensures interoperability between different implementations.
Core AMQP Concepts
```
Producer ──▶ Exchange ──(binding)──▶ Queue ──▶ Consumer
```

```
┌──────────────────────────────────────────────────────────┐
│                     RabbitMQ Broker                      │
│                                                          │
│  Producer ──▶ ┌──────────┐     ┌─────────┐ ──▶ Consumer  │
│               │ Exchange │ ──▶ │  Queue  │               │
│               │ (routes  │     │ (stores │               │
│               │ messages)│     │messages)│               │
│               └──────────┘     └─────────┘               │
│                    │                                     │
│                    │ binding (routing rule)              │
│                    ▼                                     │
│               ┌─────────┐                                │
│               │  Queue  │ ──▶ Consumer                   │
│               └─────────┘                                │
└──────────────────────────────────────────────────────────┘
```

| Concept | Description |
|---|---|
| Exchange | Receives messages from producers and routes them to queues based on rules |
| Queue | Stores messages until they are consumed |
| Binding | A rule that connects an exchange to a queue with a routing pattern |
| Routing Key | A message attribute that the exchange uses to determine routing |
| Virtual Host | A logical grouping of exchanges, queues, and bindings (like a namespace) |
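To make these concepts concrete, here is a toy in-memory model of the producer → exchange → binding → queue flow, using a direct exchange. This is a pure-Python illustration of the concepts, not how RabbitMQ is actually implemented:

```python
class Queue:
    """Stores messages until they are consumed."""

    def __init__(self, name):
        self.name = name
        self.messages = []


class DirectExchange:
    """Routes messages to queues whose binding key matches exactly."""

    def __init__(self):
        self.bindings = []  # (binding_key, queue) pairs

    def bind(self, queue, binding_key):
        self.bindings.append((binding_key, queue))

    def publish(self, routing_key, body):
        for binding_key, queue in self.bindings:
            if binding_key == routing_key:
                queue.messages.append(body)


payments = Queue('payments')
orders = Queue('orders')

exchange = DirectExchange()
exchange.bind(payments, 'payment.processed')
exchange.bind(orders, 'order.created')

# The producer never talks to a queue directly, only to the exchange
exchange.publish('payment.processed', '{"amount": 99.99}')

print(payments.messages)  # ['{"amount": 99.99}']
print(orders.messages)    # []
```

The key point the model captures: producers are decoupled from queues, and bindings are the only thing connecting the two.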
Exchange Types
RabbitMQ supports four exchange types, each with different routing behavior.
1. Direct Exchange
Routes messages to queues whose binding key exactly matches the message’s routing key.
```
Producer sends: routing_key = "payment.processed"

Direct Exchange
  Binding: "payment.processed" ──▶ Payment Queue ✓
  Binding: "order.created"     ──▶ Order Queue   ✗
  Binding: "payment.failed"    ──▶ Alert Queue   ✗
```

Only the Payment Queue receives the message (exact match).

Use cases: task queues, direct routing to specific consumers.
2. Topic Exchange
Routes messages based on wildcard pattern matching against the routing key.
```
Wildcard patterns:
  *  matches exactly one word
  #  matches zero or more words

Bindings:
  "order.*"    matches "order.created", "order.shipped"
  "order.#"    matches "order.created", "order.item.added"
  "*.critical" matches "payment.critical", "system.critical"
  "#.error"    matches "app.service.error", "error"
```

Example: the producer sends routing_key = "order.payment.failed"

```
Topic Exchange
  Binding: "order.#"   ──▶ Order Queue   ✓ (matches)
  Binding: "order.*"   ──▶ Simple Queue  ✗ (2 words after "order", not 1)
  Binding: "#.failed"  ──▶ Alert Queue   ✓ (matches)
  Binding: "payment.*" ──▶ Payment Queue ✗ (key doesn't start with "payment")
```

Use cases: complex routing based on message categories, selective subscription.
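These matching rules can be simulated in a few lines of Python. This is an illustrative re-implementation of the semantics, not RabbitMQ's actual matcher:

```python
def topic_matches(pattern, routing_key):
    """Topic-exchange matching: '*' matches exactly one word,
    '#' matches zero or more dot-separated words."""
    p_words = pattern.split('.')
    k_words = routing_key.split('.')

    def match(i, j):
        if i == len(p_words):            # pattern exhausted
            return j == len(k_words)
        if p_words[i] == '#':            # '#': try consuming 0..n words
            return any(match(i + 1, j2)
                       for j2 in range(j, len(k_words) + 1))
        if j == len(k_words):            # key exhausted, pattern not
            return False
        if p_words[i] == '*' or p_words[i] == k_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


# The example above: routing_key = "order.payment.failed"
print(topic_matches('order.#', 'order.payment.failed'))    # True
print(topic_matches('order.*', 'order.payment.failed'))    # False
print(topic_matches('#.failed', 'order.payment.failed'))   # True
print(topic_matches('payment.*', 'order.payment.failed'))  # False
```

Note how `#` also matches zero words: `topic_matches('order.#', 'order')` returns `True`, which is why `order.#` subscribes to the whole `order` category including bare "order" events.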
3. Fanout Exchange
Broadcasts every message to all bound queues, ignoring the routing key entirely.
Producer sends any message:
```
Fanout Exchange            ──▶ Queue A ✓ (gets every message)
(ignores routing           ──▶ Queue B ✓ (gets every message)
 key entirely)             ──▶ Queue C ✓ (gets every message)
```

Use cases: broadcasting events to all consumers, pub/sub pattern.
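Fanout behavior is simple to model: delivery ignores the routing key entirely. A pure-Python sketch, with plain lists standing in for queues (illustration only):

```python
class FanoutExchange:
    """Toy fanout exchange: every bound queue gets a copy of each message."""

    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, routing_key, body):
        # The routing key is accepted but completely ignored
        for queue in self.queues:
            queue.append(body)


a, b, c = [], [], []

exchange = FanoutExchange()
for queue in (a, b, c):
    exchange.bind(queue)

exchange.publish('any.routing.key', 'event-1')

print(a, b, c)  # every queue received its own copy of the message
```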
4. Headers Exchange
Routes based on message header attributes instead of the routing key. The x-match binding argument determines whether every bound header must match (all) or a single matching header suffices (any).
Message headers: { "format": "pdf", "type": "report" }
```
Headers Exchange
  Binding: x-match=all, format=pdf, type=report  ──▶ PDF Queue ✓ (both match)
  Binding: x-match=any, format=pdf, type=invoice ──▶ Doc Queue ✓ (format matches)
  Binding: x-match=all, format=csv, type=report  ──▶ CSV Queue ✗ (format doesn't match)
```

Use cases: routing based on multiple message attributes, complex filtering.
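The x-match logic can likewise be simulated in plain Python (illustration only; real headers exchanges also exclude binding keys that start with "x-" from matching, as mirrored here):

```python
def headers_match(binding_args, message_headers):
    """Headers-exchange matching: with x-match=all every bound header
    must match; with x-match=any a single match suffices."""
    mode = binding_args.get('x-match', 'all')
    # Keys beginning with 'x-' (including x-match itself) are not matched
    pairs = [(k, v) for k, v in binding_args.items()
             if not k.startswith('x-')]
    hits = [message_headers.get(k) == v for k, v in pairs]
    return all(hits) if mode == 'all' else any(hits)


headers = {'format': 'pdf', 'type': 'report'}

print(headers_match(
    {'x-match': 'all', 'format': 'pdf', 'type': 'report'}, headers))   # True
print(headers_match(
    {'x-match': 'any', 'format': 'pdf', 'type': 'invoice'}, headers))  # True
print(headers_match(
    {'x-match': 'all', 'format': 'csv', 'type': 'report'}, headers))   # False
```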
Exchange Type Comparison
| Exchange | Routing Based On | Pattern Matching | Performance | Use Case |
|---|---|---|---|---|
| Direct | Exact routing key match | No | Fastest | Task queues, RPC |
| Topic | Wildcard routing key patterns | Yes (*, #) | Fast | Selective pub/sub |
| Fanout | All queues (broadcast) | N/A | Very fast | Broadcasting |
| Headers | Message header values | all or any | Slower | Complex routing |
Working with RabbitMQ
Python (pika):

```python
import pika
import json
import uuid

# Connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare exchange and queues
channel.exchange_declare(
    exchange='orders',
    exchange_type='topic',
    durable=True
)

channel.queue_declare(
    queue='order-processing',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'orders-dlx',
        'x-message-ttl': 300000  # 5 minutes
    }
)

channel.queue_declare(
    queue='order-notifications',
    durable=True
)

# Bindings
channel.queue_bind(
    exchange='orders',
    queue='order-processing',
    routing_key='order.created'
)

channel.queue_bind(
    exchange='orders',
    queue='order-notifications',
    routing_key='order.#'
)

# Publish a message
def publish_order(order_data, routing_key):
    channel.basic_publish(
        exchange='orders',
        routing_key=routing_key,
        body=json.dumps(order_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Persistent
            content_type='application/json',
            message_id=str(uuid.uuid4()),
            headers={
                'source': 'order-service',
                'version': '1.0'
            }
        )
    )

publish_order(
    {'order_id': 'ORD-001', 'total': 99.99},
    'order.created'
)

# Consumer with manual acknowledgment
def on_message(ch, method, properties, body):
    order = json.loads(body)
    try:
        print(f"Processing order: {order['order_id']}")
        # Process the message...

        # Acknowledge successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error: {e}")
        # Reject without requeueing, so the message
        # goes to the dead letter exchange
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=False
        )

# Prefetch count: how many unacked messages a consumer can hold
channel.basic_qos(prefetch_count=10)

channel.basic_consume(
    queue='order-processing',
    on_message_callback=on_message,
    auto_ack=False
)

channel.start_consuming()
```

Node.js (amqplib):

```javascript
const amqplib = require('amqplib');
const crypto = require('crypto');

async function setup() {
  const conn = await amqplib.connect('amqp://localhost');
  const channel = await conn.createChannel();

  // Declare exchange
  await channel.assertExchange('orders', 'topic', { durable: true });

  // Declare queues
  await channel.assertQueue('order-processing', {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': 'orders-dlx',
      'x-message-ttl': 300000
    }
  });

  await channel.assertQueue('order-notifications', { durable: true });

  // Bindings
  await channel.bindQueue('order-processing', 'orders', 'order.created');
  await channel.bindQueue('order-notifications', 'orders', 'order.#');

  return { conn, channel };
}

// Producer
async function publishOrder(channel, order, routingKey) {
  channel.publish(
    'orders',
    routingKey,
    Buffer.from(JSON.stringify(order)),
    {
      persistent: true,
      contentType: 'application/json',
      messageId: crypto.randomUUID(),
      headers: { source: 'order-service', version: '1.0' }
    }
  );
}

// Placeholder for the real business logic
async function processOrder(order) {
  // ...
}

// Consumer
async function startConsumer(channel) {
  // Prefetch: up to 10 unacknowledged messages at a time
  await channel.prefetch(10);

  await channel.consume(
    'order-processing',
    async (msg) => {
      if (!msg) return;

      try {
        const order = JSON.parse(msg.content.toString());
        console.log(`Processing: ${order.order_id}`);

        // Process the order...
        await processOrder(order);

        // Acknowledge
        channel.ack(msg);
      } catch (error) {
        console.error('Error:', error.message);
        // Reject without requeueing (send to DLQ)
        channel.nack(msg, false, false);
      }
    },
    { noAck: false }
  );
}

// Run
(async () => {
  const { channel } = await setup();

  await publishOrder(
    channel,
    { order_id: 'ORD-001', total: 99.99 },
    'order.created'
  );

  await startConsumer(channel);
})();
```

Java (Spring AMQP):

```java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.*;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.*;

import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.UUID;

@Configuration
public class RabbitConfig {

    // Exchange
    @Bean
    public TopicExchange ordersExchange() {
        return new TopicExchange("orders");
    }

    // Queues
    @Bean
    public Queue orderProcessingQueue() {
        return QueueBuilder.durable("order-processing")
            .withArgument("x-dead-letter-exchange", "orders-dlx")
            .withArgument("x-message-ttl", 300000)
            .build();
    }

    @Bean
    public Queue orderNotificationsQueue() {
        return QueueBuilder
            .durable("order-notifications")
            .build();
    }

    // Bindings
    @Bean
    public Binding processingBinding() {
        return BindingBuilder
            .bind(orderProcessingQueue())
            .to(ordersExchange())
            .with("order.created");
    }

    @Bean
    public Binding notificationBinding() {
        return BindingBuilder
            .bind(orderNotificationsQueue())
            .to(ordersExchange())
            .with("order.#");
    }
}

// Producer (Order is the application's message payload class)
@Service
public class OrderPublisher {
    private final RabbitTemplate template;

    public OrderPublisher(RabbitTemplate template) {
        this.template = template;
    }

    public void publishOrder(Order order) {
        template.convertAndSend(
            "orders",
            "order.created",
            order,
            message -> {
                message.getMessageProperties()
                    .setContentType("application/json");
                message.getMessageProperties()
                    .setMessageId(UUID.randomUUID().toString());
                return message;
            }
        );
    }
}

// Consumer
@Component
public class OrderConsumer {

    @RabbitListener(
        queues = "order-processing",
        concurrency = "3-10"
    )
    public void processOrder(
        Order order,
        @Header(AmqpHeaders.DELIVERY_TAG) long tag,
        Channel channel
    ) throws IOException {
        try {
            System.out.println("Processing: " + order.getOrderId());
            // Business logic...
            channel.basicAck(tag, false);
        } catch (Exception e) {
            channel.basicNack(tag, false, false);
        }
    }
}
```

Acknowledgments and Reliability
Message Acknowledgment Flow
```
Broker ──▶ Consumer: "Here is message M"
  │
  ├──▶ Consumer processes M successfully
  │      Consumer sends: basic_ack(delivery_tag)
  │      Broker: removes M from queue ✓
  │
  ├──▶ Consumer fails to process M
  │      Consumer sends: basic_nack(delivery_tag, requeue=true)
  │      Broker: puts M back in queue for redelivery
  │
  └──▶ Consumer fails to process M (after max retries)
         Consumer sends: basic_nack(delivery_tag, requeue=false)
         Broker: sends M to the dead letter exchange
```

Publisher Confirms
To ensure messages are not lost between the producer and the broker, use publisher confirms:
```
Without confirms:
  Producer ──▶ Broker
  (message might be lost if the broker crashes before persisting it)

With confirms:
  Producer ──▶ Broker
  Broker writes the message to disk
  Broker ──▶ Producer: "confirm" (message persisted)
```

If no confirm is received within the timeout, the producer retries sending the message.

Clustering and High Availability
Classic Mirrored Queues (Legacy)
```
┌────────────┐     ┌────────────┐     ┌────────────┐
│   Node A   │     │   Node B   │     │   Node C   │
│            │     │            │     │            │
│  Queue X   │     │  Queue X   │     │  Queue X   │
│  (Leader)  │─────│  (Mirror)  │─────│  (Mirror)  │
│            │     │            │     │            │
└────────────┘     └────────────┘     └────────────┘
```

All queues are replicated to mirror nodes. If the leader fails, a mirror is promoted.

Quorum Queues (Recommended)
Quorum queues use the Raft consensus algorithm for replication, providing stronger guarantees than classic mirrored queues:
```
Quorum Queue (Raft-based):
  - Replicated across N nodes (typically 3 or 5)
  - Leader handles reads and writes
  - Followers replicate via Raft consensus
  - Automatic leader election on failure
  - Stronger data safety guarantees
```

Declare a quorum queue:

```python
channel.queue_declare(
    queue='critical-orders',
    arguments={'x-queue-type': 'quorum'}
)
```

RabbitMQ vs Kafka
| Feature | RabbitMQ | Kafka |
|---|---|---|
| Model | Message broker (smart broker, simple consumer) | Distributed log (simple broker, smart consumer) |
| Message lifecycle | Deleted after consumption | Retained for configured period |
| Routing | Flexible (exchanges, bindings, routing keys) | Topic/partition only |
| Ordering | Per-queue (FIFO) | Per-partition |
| Throughput | Thousands to tens of thousands msg/s | Millions msg/s |
| Replay | Not supported (messages deleted) | Supported (consumers can seek to any offset) |
| Consumer model | Push (broker sends to consumer) | Pull (consumer fetches from broker) |
| Protocol | AMQP, MQTT, STOMP | Custom binary protocol |
| Use case | Complex routing, task queues, RPC | Event streaming, log aggregation, CDC |
| Latency | Microseconds to low milliseconds | Low milliseconds |
| Scaling | Vertical + clustering | Horizontal (add partitions and brokers) |
When to Choose RabbitMQ
- Complex routing logic needed (topic, headers, fanout)
- Request-reply (RPC) pattern
- Task/job queues with priority support
- Message-level TTL and dead letter handling
- Smaller scale with flexible routing
- Multiple protocol support needed (AMQP, MQTT, STOMP)
When to Choose Kafka
- High-throughput event streaming (millions of msg/s)
- Event replay and reprocessing needed
- Multiple consumer groups reading the same data
- Event sourcing architecture
- Stream processing (Kafka Streams)
- Long-term message retention (days to weeks)
- Log aggregation at scale
Summary
| Concept | Key Takeaway |
|---|---|
| AMQP | Open standard protocol for message-oriented middleware |
| Exchanges | Route messages to queues based on type-specific rules |
| Direct Exchange | Exact routing key match |
| Topic Exchange | Wildcard pattern matching on routing keys |
| Fanout Exchange | Broadcast to all bound queues |
| Headers Exchange | Route based on message header attributes |
| Acknowledgments | Manual ack/nack for reliable processing |
| Publisher Confirms | Ensure messages reach the broker |
| Quorum Queues | Raft-based replication for high availability |
| vs Kafka | RabbitMQ for routing and tasks; Kafka for streaming and replay |