Skip to content

RabbitMQ & AMQP

RabbitMQ is one of the most widely deployed open-source message brokers. It implements the Advanced Message Queuing Protocol (AMQP) and provides sophisticated message routing, reliable delivery, and flexible deployment options. While Kafka is optimized for high-throughput event streaming, RabbitMQ excels at complex routing, task queues, and request-reply patterns.


AMQP Protocol

AMQP (Advanced Message Queuing Protocol) is an open standard for message-oriented middleware. It defines a wire-level protocol that ensures interoperability between different implementations.

Core AMQP Concepts

Producer ──▶ Exchange ──(binding)──▶ Queue ──▶ Consumer
┌──────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ │
│ Producer ──▶ ┌──────────┐ ┌─────────┐ ──▶ Consumer │
│ │ Exchange │───▶│ Queue │ │
│ │ │ │ │ │
│ │ (routes │ │(stores │ │
│ │ messages)│ │messages)│ │
│ └──────────┘ └─────────┘ │
│ │ │
│ │ binding │
│ │ (routing rule) │
│ ▼ │
│ ┌─────────┐ │
│ │ Queue │ ──▶ Consumer │
│ └─────────┘ │
└──────────────────────────────────────────────────────────┘
ConceptDescription
ExchangeReceives messages from producers and routes them to queues based on rules
QueueStores messages until they are consumed
BindingA rule that connects an exchange to a queue with a routing pattern
Routing KeyA message attribute that the exchange uses to determine routing
Virtual HostA logical grouping of exchanges, queues, and bindings (like a namespace)

Exchange Types

RabbitMQ supports four exchange types, each with different routing behavior.

1. Direct Exchange

Routes messages to queues whose binding key exactly matches the message’s routing key.

Producer sends: routing_key = "payment.processed"
┌──────────────────┐
│ Direct Exchange │
│ │
│ Binding: "payment.processed" ──▶ Payment Queue ✓
│ Binding: "order.created" ──▶ Order Queue ✗
│ Binding: "payment.failed" ──▶ Alert Queue ✗
└──────────────────┘
Only Payment Queue receives the message (exact match).

Use cases: Task queues, direct routing to specific consumers

2. Topic Exchange

Routes messages based on wildcard pattern matching against the routing key.

Wildcard patterns:
* matches exactly one word
# matches zero or more words
Bindings:
"order.*" matches "order.created", "order.shipped"
"order.#" matches "order.created", "order.item.added"
"*.critical" matches "payment.critical", "system.critical"
"#.error" matches "app.service.error", "error"
Example:
Producer sends: routing_key = "order.payment.failed"
┌──────────────────┐
│ Topic Exchange │
│ │
│ Binding: "order.#" ──▶ Order Queue ✓ (matches)
│ Binding: "order.*" ──▶ Simple Queue ✗ (2 words, not 1)
│ Binding: "#.failed" ──▶ Alert Queue ✓ (matches)
│ Binding: "payment.*" ──▶ Payment Queue ✗ (doesn't start with payment)
└──────────────────┘

Use cases: Complex routing based on message categories, selective subscription

3. Fanout Exchange

Broadcasts every message to all bound queues, ignoring the routing key entirely.

Producer sends any message:
┌──────────────────┐
│ Fanout Exchange │
│ │──▶ Queue A ✓ (gets every message)
│ (ignores routing │──▶ Queue B ✓ (gets every message)
│ key entirely) │──▶ Queue C ✓ (gets every message)
└──────────────────┘

Use cases: Broadcasting events to all consumers, pub/sub pattern

4. Headers Exchange

Routes based on message header attributes instead of the routing key. Uses x-match to determine if all headers must match (all) or any header can match (any).

Message headers: { "format": "pdf", "type": "report" }
┌──────────────────────┐
│ Headers Exchange │
│ │
│ Binding: x-match=all, │
│ format=pdf, │──▶ PDF Queue ✓ (both match)
│ type=report │
│ │
│ Binding: x-match=any, │
│ format=pdf, │──▶ Doc Queue ✓ (format matches)
│ type=invoice │
│ │
│ Binding: x-match=all, │
│ format=csv, │──▶ CSV Queue ✗ (format doesn't match)
│ type=report │
└──────────────────────┘

Use cases: Routing based on multiple message attributes, complex filtering

Exchange Type Comparison

ExchangeRouting Based OnPattern MatchingPerformanceUse Case
DirectExact routing key matchNoFastestTask queues, RPC
TopicWildcard routing key patternsYes (*, #)FastSelective pub/sub
FanoutAll queues (broadcast)N/AVery fastBroadcasting
HeadersMessage header valuesall or anySlowerComplex routing

Working with RabbitMQ

import pika
import json
import uuid
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Declare exchange and queues
channel.exchange_declare(
exchange='orders',
exchange_type='topic',
durable=True
)
channel.queue_declare(
queue='order-processing',
durable=True,
arguments={
'x-dead-letter-exchange': 'orders-dlx',
'x-message-ttl': 300000 # 5 minutes
}
)
channel.queue_declare(
queue='order-notifications',
durable=True
)
# Bindings
channel.queue_bind(
exchange='orders',
queue='order-processing',
routing_key='order.created'
)
channel.queue_bind(
exchange='orders',
queue='order-notifications',
routing_key='order.#'
)
# Publish a message
def publish_order(order_data, routing_key):
channel.basic_publish(
exchange='orders',
routing_key=routing_key,
body=json.dumps(order_data),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
content_type='application/json',
message_id=str(uuid.uuid4()),
headers={
'source': 'order-service',
'version': '1.0'
}
)
)
publish_order(
{'order_id': 'ORD-001', 'total': 99.99},
'order.created'
)
# Consumer with manual acknowledgment
def on_message(ch, method, properties, body):
order = json.loads(body)
try:
print(f"Processing order: {order['order_id']}")
# Process the message...
# Acknowledge successful processing
ch.basic_ack(
delivery_tag=method.delivery_tag
)
except Exception as e:
print(f"Error: {e}")
# Reject and requeue (or send to DLQ)
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False # Send to DLQ instead
)
# Set prefetch count (how many unacked messages
# a consumer can hold)
channel.basic_qos(prefetch_count=10)
channel.basic_consume(
queue='order-processing',
on_message_callback=on_message,
auto_ack=False
)
channel.start_consuming()

Acknowledgments and Reliability

Message Acknowledgment Flow

Broker ──▶ Consumer: "Here is message M"
├──▶ Consumer processes M successfully
│ Consumer sends: basic_ack(delivery_tag)
│ Broker: removes M from queue ✓
├──▶ Consumer fails to process M
│ Consumer sends: basic_nack(delivery_tag, requeue=true)
│ Broker: puts M back in queue for redelivery
└──▶ Consumer fails to process M (after max retries)
Consumer sends: basic_nack(delivery_tag, requeue=false)
Broker: sends M to dead letter exchange

Publisher Confirms

To ensure messages are not lost between the producer and the broker, use publisher confirms:

Without confirms:
Producer ──▶ Broker
(Message might be lost if broker crashes before persisting)
With confirms:
Producer ──▶ Broker
Broker writes to disk
Broker ──▶ Producer: "confirm" (message persisted)
If no confirm received after timeout:
Producer retries sending the message.

Clustering and High Availability

Classic Mirrored Queues (Legacy)

┌────────────┐ ┌────────────┐ ┌────────────┐
│ Node A │ │ Node B │ │ Node C │
│ │ │ │ │ │
│ Queue X │ │ Queue X │ │ Queue X │
│ (Leader) │────│ (Mirror) │────│ (Mirror) │
│ │ │ │ │ │
└────────────┘ └────────────┘ └────────────┘
All queues are replicated to mirror nodes.
If the leader fails, a mirror is promoted.

Quorum queues use the Raft consensus algorithm for replication, providing stronger guarantees than classic mirrored queues:

Quorum Queue (Raft-based):
- Replicated across N nodes (typically 3 or 5)
- Leader handles reads and writes
- Followers replicate via Raft consensus
- Automatic leader election on failure
- Stronger data safety guarantees
Declare a quorum queue:
channel.queue_declare(
queue='critical-orders',
arguments={'x-queue-type': 'quorum'}
)

RabbitMQ vs Kafka

FeatureRabbitMQKafka
ModelMessage broker (smart broker, simple consumer)Distributed log (simple broker, smart consumer)
Message lifecycleDeleted after consumptionRetained for configured period
RoutingFlexible (exchanges, bindings, routing keys)Topic/partition only
OrderingPer-queue (FIFO)Per-partition
ThroughputThousands to tens of thousands msg/sMillions msg/s
ReplayNot supported (messages deleted)Supported (consumers can seek to any offset)
Consumer modelPush (broker sends to consumer)Pull (consumer fetches from broker)
ProtocolAMQP, MQTT, STOMPCustom binary protocol
Use caseComplex routing, task queues, RPCEvent streaming, log aggregation, CDC
LatencyMicroseconds to low millisecondsLow milliseconds
ScalingVertical + clusteringHorizontal (add partitions and brokers)

When to Choose RabbitMQ

  • Complex routing logic needed (topic, headers, fanout)
  • Request-reply (RPC) pattern
  • Task/job queues with priority support
  • Message-level TTL and dead letter handling
  • Smaller scale with flexible routing
  • Multiple protocol support needed (AMQP, MQTT, STOMP)

When to Choose Kafka

  • High-throughput event streaming (millions of msg/s)
  • Event replay and reprocessing needed
  • Multiple consumer groups reading the same data
  • Event sourcing architecture
  • Stream processing (Kafka Streams)
  • Long-term message retention (days to weeks)
  • Log aggregation at scale

Summary

ConceptKey Takeaway
AMQPOpen standard protocol for message-oriented middleware
ExchangesRoute messages to queues based on type-specific rules
Direct ExchangeExact routing key match
Topic ExchangeWildcard pattern matching on routing keys
Fanout ExchangeBroadcast to all bound queues
Headers ExchangeRoute based on message header attributes
AcknowledgmentsManual ack/nack for reliable processing
Publisher ConfirmsEnsure messages reach the broker
Quorum QueuesRaft-based replication for high availability
vs KafkaRabbitMQ for routing and tasks; Kafka for streaming and replay