
Event-Driven Architecture

Event-driven architecture (EDA) is a design paradigm where the flow of the program is determined by events — significant changes in state that are published, detected, and consumed by decoupled components. Instead of one service directly calling another, services communicate by producing and reacting to events, resulting in systems that are loosely coupled, scalable, and resilient.


Event-Driven vs. Request-Driven Architecture

Most developers are familiar with request-driven (synchronous) architecture. Understanding the shift to event-driven thinking is essential.

Request-Driven (Synchronous):

┌──────────┐   POST /orders    ┌──────────┐  reserveStock()  ┌──────────┐
│  Client  │──────────────────►│  Order   │─────────────────►│Inventory │
│          │◄──────────────────│ Service  │◄─────────────────│ Service  │
└──────────┘   201 Created     └──────────┘    { success }   └──────────┘

- Caller waits for a response
- Tight temporal coupling (both must be online)
- Simple to reason about
- Cascading failures possible
Event-Driven (Asynchronous):

┌──────────┐   POST /orders    ┌──────────┐   OrderPlaced   ┌─────────────┐
│  Client  │──────────────────►│  Order   │────event───────►│   Message   │
│          │◄──────────────────│ Service  │                 │   Broker    │
└──────────┘   202 Accepted    └──────────┘                 └──┬──────┬───┘
                                                               │      │
                                                       consume │      │ consume
                                                               ▼      ▼
                                                        ┌─────────┐  ┌────────────┐
                                                        │Inventory│  │Notification│
                                                        │ Service │  │  Service   │
                                                        └─────────┘  └────────────┘

- Publisher does not wait for consumers
- Loose coupling (consumers can be offline temporarily)
- Harder to reason about (eventual consistency)
- Better fault isolation
Aspect           | Request-Driven                  | Event-Driven
-----------------|---------------------------------|------------------------------------------
Coupling         | Tight — caller knows the callee | Loose — publisher does not know consumers
Response         | Synchronous — caller waits      | Asynchronous — fire and forget
Consistency      | Immediate                       | Eventual
Scalability      | Limited by synchronous chains   | Highly scalable with parallel consumers
Failure handling | Cascading failures              | Isolated failures, messages are buffered
Debugging        | Simple stack traces             | Distributed tracing required
Best for         | Queries, immediate validation   | Workflows, notifications, data pipelines

Types of Events

Not all events are created equal. Understanding the different types helps you design cleaner systems.

Domain Events

Represent something meaningful that happened in the business domain. Named in past tense.

  • OrderPlaced — a customer placed an order
  • PaymentProcessed — payment was successfully charged
  • InventoryDepleted — stock reached zero for a product

Domain events are the primary building block of event-driven systems. They carry the facts about what happened, along with enough data for consumers to act.
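As a concrete sketch, a domain event can be modeled as an immutable dataclass with a past-tense name. The field names here are illustrative, not a standard:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)  # events are immutable facts about the past
class OrderPlaced:
    order_id: str
    customer_id: str
    total: float
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


event = OrderPlaced(order_id="ORD-123", customer_id="C-1", total=59.97)
```

Freezing the dataclass enforces the "facts cannot change" property: any attempt to mutate a published event raises an error.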

Integration Events

Integration events are domain events that cross service boundaries: the subset published to the message broker for other services to consume.

  • Typically carry less data than internal domain events (to avoid leaking internals)
  • Must have a stable schema (since multiple services depend on them)
  • Should be versioned to allow for schema evolution
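The stable-schema and versioning points above can be sketched as a versioned envelope around the event data. The envelope fields here are an assumption of this sketch, not a standard format:

```python
import json


def make_integration_event(event_type: str, version: int, data: dict) -> str:
    """Wrap event data in a versioned envelope before publishing it."""
    envelope = {
        "event_type": event_type,
        "schema_version": version,  # consumers can dispatch on this
        "data": data,               # keep minimal: no internal fields leak out
    }
    return json.dumps(envelope)


payload = make_integration_event("OrderPlaced", 2, {"order_id": "ORD-123"})
```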

Commands

Unlike events (which describe something that happened), commands express an intent to do something. Named in imperative form.

  • ProcessPayment — a request to charge a customer
  • SendNotification — a request to deliver an email or SMS
  • ReserveInventory — a request to hold stock for an order

Commands are directed at a specific consumer (point-to-point), while events are broadcast to all interested consumers (pub/sub).

Event: "Something happened" → Zero or more consumers react
Command: "Do this thing" → Exactly one consumer processes it
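The delivery difference can be illustrated with a toy in-memory bus (a sketch, not a production dispatcher): events fan out to every subscriber, while a command is routed to exactly one registered handler.

```python
class MessageBus:
    """Toy bus contrasting pub/sub (events) with point-to-point (commands)."""

    def __init__(self):
        self._event_subscribers = {}  # event name -> list of handlers
        self._command_handlers = {}   # command name -> exactly one handler

    def subscribe(self, event_name, handler):
        self._event_subscribers.setdefault(event_name, []).append(handler)

    def register(self, command_name, handler):
        if command_name in self._command_handlers:
            raise ValueError(f"{command_name} already has a handler")
        self._command_handlers[command_name] = handler

    def publish(self, event_name, data):
        # Zero or more consumers react
        for handler in self._event_subscribers.get(event_name, []):
            handler(data)

    def send(self, command_name, data):
        # Exactly one consumer processes it
        self._command_handlers[command_name](data)


bus = MessageBus()
seen = []
bus.subscribe("OrderPlaced", lambda d: seen.append(("inventory", d)))
bus.subscribe("OrderPlaced", lambda d: seen.append(("notify", d)))
bus.register("ProcessPayment", lambda d: seen.append(("payment", d)))

bus.publish("OrderPlaced", {"order_id": "ORD-123"})  # fans out to both
bus.send("ProcessPayment", {"order_id": "ORD-123"})  # routed to one handler
```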

Message Brokers

A message broker is the infrastructure that transports messages between producers and consumers. The choice of broker affects durability, ordering, throughput, and delivery guarantees.

Key Concepts

Producer ──► Topic / Queue ──► Consumer

┌──────────┐     ┌─────────────────────────────────┐     ┌──────────┐
│ Producer │────►│          Message Broker         │────►│ Consumer │
│          │     │                                 │     │          │
│ publish  │     │    ┌────┬────┬────┬────┬────┐   │     │ consume  │
│ message  │     │    │ m1 │ m2 │ m3 │ m4 │ m5 │   │     │ message  │
│          │     │    └────┴────┴────┴────┴────┘   │     │          │
└──────────┘     │           Topic / Queue         │     └──────────┘
                 └─────────────────────────────────┘

Pub/Sub vs. Point-to-Point

Pub/Sub (Topic):

  One message → delivered to ALL subscribers

  Producer ──► Topic ──┬──► Consumer A (Inventory)
                       ├──► Consumer B (Notification)
                       └──► Consumer C (Analytics)

  Use case: Broadcasting domain events

Point-to-Point (Queue):

  One message → delivered to exactly ONE consumer

  Producer ──► Queue ──┬──► Consumer A (if A is busy...)
                       └──► Consumer B (...B picks it up)

  Use case: Task distribution, command processing

Broker Comparison

Feature         | Apache Kafka                              | RabbitMQ                                 | Amazon SQS
----------------|-------------------------------------------|------------------------------------------|------------------------------------
Model           | Distributed log (append-only)             | Traditional message queue                | Managed queue service
Ordering        | Per-partition ordering                    | Per-queue ordering                       | Best-effort (FIFO available)
Retention       | Configurable (days/weeks/forever)         | Until consumed and acknowledged          | Up to 14 days
Throughput      | Very high (millions/sec)                  | High (tens of thousands/sec)             | High (managed, auto-scaling)
Replay          | Yes — consumers can re-read old messages  | No — messages are deleted after ack      | No
Consumer groups | Built-in (partition assignment)           | Via competing consumers                  | Built-in
Best for        | Event sourcing, streaming, data pipelines | Task queues, RPC, traditional messaging  | Serverless, AWS-native applications
Ops complexity  | High (self-managed cluster)               | Medium                                   | None (fully managed)

Event Sourcing

Event sourcing is a pattern where instead of storing the current state of an entity, you store the sequence of events that led to that state. The current state is derived by replaying all events from the beginning.

Traditional vs. Event-Sourced Storage

Traditional (State-Based):

┌──────────────────────────────┐
│ orders table                 │
│                              │
│ id: ORD-123                  │
│ status: SHIPPED              │
│ total: $59.97                │
│ items: [{...}, {...}]        │
│ updated_at: 2025-01-15       │
└──────────────────────────────┘

"What is the current state?"
✓ Easy to query
✗ History is lost

Event-Sourced:

┌────────────────────────────────────────────────────┐
│ order_events table                                 │
│                                                    │
│ 1. OrderCreated    { customer: "C-1", items: [] }  │
│ 2. ItemAdded       { product: "P-1", qty: 2 }      │
│ 3. ItemAdded       { product: "P-2", qty: 1 }      │
│ 4. OrderPlaced     { total: $59.97 }               │
│ 5. PaymentReceived { amount: $59.97 }              │
│ 6. OrderShipped    { tracking: "TRK-456" }         │
└────────────────────────────────────────────────────┘

"What happened to produce the current state?"
✓ Complete audit trail
✓ Can rebuild state at any point in time
✗ Querying current state requires replay (use snapshots or CQRS)

Rebuilding State from Events

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List


class EventType(Enum):
    ORDER_CREATED = "OrderCreated"
    ITEM_ADDED = "ItemAdded"
    ORDER_PLACED = "OrderPlaced"
    PAYMENT_RECEIVED = "PaymentReceived"
    ORDER_SHIPPED = "OrderShipped"
    ORDER_CANCELLED = "OrderCancelled"


@dataclass
class Event:
    """An immutable record of something that happened."""
    event_type: EventType
    data: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.utcnow)
    version: int = 0


@dataclass
class OrderState:
    """The current state, rebuilt from events."""
    order_id: str = ""
    customer_id: str = ""
    items: List[Dict] = field(default_factory=list)
    status: str = "UNKNOWN"
    total: float = 0.0
    tracking_number: str = ""


class OrderAggregate:
    """Rebuilds order state by applying events sequentially."""

    def __init__(self):
        self._state = OrderState()
        self._events: List[Event] = []

    @property
    def state(self) -> OrderState:
        return self._state

    def load_from_history(self, events: List[Event]) -> None:
        """Rebuild state by replaying all past events."""
        for event in events:
            self._apply(event)
            self._events.append(event)

    def _apply(self, event: Event) -> None:
        """Apply a single event to update the state."""
        handler = {
            EventType.ORDER_CREATED: self._on_order_created,
            EventType.ITEM_ADDED: self._on_item_added,
            EventType.ORDER_PLACED: self._on_order_placed,
            EventType.PAYMENT_RECEIVED: self._on_payment_received,
            EventType.ORDER_SHIPPED: self._on_order_shipped,
            EventType.ORDER_CANCELLED: self._on_order_cancelled,
        }.get(event.event_type)
        if handler:
            handler(event.data)

    def _on_order_created(self, data: dict) -> None:
        self._state.order_id = data["order_id"]
        self._state.customer_id = data["customer_id"]
        self._state.status = "CREATED"

    def _on_item_added(self, data: dict) -> None:
        self._state.items.append({
            "product_id": data["product_id"],
            "quantity": data["quantity"],
            "price": data["price"],
        })

    def _on_order_placed(self, data: dict) -> None:
        self._state.total = data["total"]
        self._state.status = "PLACED"

    def _on_payment_received(self, data: dict) -> None:
        self._state.status = "PAID"

    def _on_order_shipped(self, data: dict) -> None:
        self._state.tracking_number = data["tracking_number"]
        self._state.status = "SHIPPED"

    def _on_order_cancelled(self, data: dict) -> None:
        self._state.status = "CANCELLED"


# Usage: rebuild state from stored events
events = [
    Event(EventType.ORDER_CREATED, {
        "order_id": "ORD-123",
        "customer_id": "C-1",
    }),
    Event(EventType.ITEM_ADDED, {
        "product_id": "P-1", "quantity": 2, "price": 19.99,
    }),
    Event(EventType.ITEM_ADDED, {
        "product_id": "P-2", "quantity": 1, "price": 19.99,
    }),
    Event(EventType.ORDER_PLACED, {"total": 59.97}),
    Event(EventType.PAYMENT_RECEIVED, {"amount": 59.97}),
    Event(EventType.ORDER_SHIPPED, {"tracking_number": "TRK-456"}),
]

order = OrderAggregate()
order.load_from_history(events)
print(order.state)
# OrderState(order_id='ORD-123', customer_id='C-1',
#            items=[...], status='SHIPPED', total=59.97,
#            tracking_number='TRK-456')

CQRS with Event Sourcing

CQRS and event sourcing are often used together. Events are the write model (the source of truth), and one or more read models (projections) are derived from those events for efficient querying.

┌──────────┐   Command    ┌──────────────┐    Append    ┌──────────────┐
│  Client  │─────────────►│   Command    │─────────────►│    Event     │
│          │              │   Handler    │    event     │    Store     │
└──────────┘              └──────────────┘              └──────┬───────┘
                                                Publish events │
                          ┌────────────────────────────────────┘
                          │
                   ┌──────▼──────┐
                   │  Event Bus  │
                   └──┬───────┬──┘
                      │       │
              ┌───────▼──┐ ┌──▼────────┐
              │Projection│ │Projection │
              │ Handler  │ │ Handler   │
              │ (Orders) │ │ (Search)  │
              └────┬─────┘ └────┬──────┘
                   │            │
              ┌────▼─────┐  ┌───▼───────┐
              │ Read DB  │  │ Elastic-  │
              │ (Postgres│  │ search    │
              │  views)  │  │ index     │
              └──────────┘  └───────────┘
                   ▲            ▲
                   │            │
              ┌────┴────────────┴───┐
              │      Query API      │◄──── Client (reads)
              └─────────────────────┘

How it works:

  1. A command is received and validated
  2. The command handler loads the aggregate by replaying its events
  3. The aggregate produces new events and appends them to the event store
  4. Projection handlers consume the events and update read-optimized views
  5. Query APIs read from the projections (not the event store)
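Steps 4 and 5 above can be sketched with an in-memory SQLite read model. The table layout and event names are illustrative, not part of any framework:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE order_view (order_id TEXT PRIMARY KEY, status TEXT, total REAL)"
)


def project(event: dict) -> None:
    """Projection handler: fold one event into the read-optimized view."""
    etype, data = event["event_type"], event["data"]
    if etype == "OrderPlaced":
        conn.execute(
            "INSERT INTO order_view VALUES (?, 'PLACED', ?)",
            (data["order_id"], data["total"]),
        )
    elif etype == "OrderShipped":
        conn.execute(
            "UPDATE order_view SET status = 'SHIPPED' WHERE order_id = ?",
            (data["order_id"],),
        )


for e in [
    {"event_type": "OrderPlaced", "data": {"order_id": "ORD-123", "total": 59.97}},
    {"event_type": "OrderShipped", "data": {"order_id": "ORD-123"}},
]:
    project(e)

# The query API reads this view, never the event store
row = conn.execute("SELECT status, total FROM order_view").fetchone()
```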

Event Publishing and Consumption Patterns

Publishing Events

import json
from datetime import datetime

from confluent_kafka import Producer


class EventPublisher:
    """Publishes domain events to Kafka topics."""

    def __init__(self, bootstrap_servers: str):
        self._producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "acks": "all",  # Wait for all replicas to acknowledge
        })

    def publish(
        self, topic: str, event_type: str, data: dict, key: str
    ) -> None:
        """Publish an event with a partition key for ordering."""
        event = {
            "event_type": event_type,
            "data": data,
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": {
                "source": "order-service",
                "version": 1,
            },
        }
        self._producer.produce(
            topic=topic,
            key=key.encode("utf-8"),
            value=json.dumps(event).encode("utf-8"),
            callback=self._delivery_report,
        )
        self._producer.flush()

    @staticmethod
    def _delivery_report(err, msg):
        if err:
            print(f"Delivery failed: {err}")
        else:
            print(f"Event delivered to {msg.topic()} "
                  f"[partition {msg.partition()}]")


# Usage
publisher = EventPublisher("localhost:9092")
publisher.publish(
    topic="order-events",
    event_type="OrderPlaced",
    data={
        "order_id": "ORD-123",
        "customer_id": "C-1",
        "total": 59.97,
        "items": [
            {"product_id": "P-1", "quantity": 2},
            {"product_id": "P-2", "quantity": 1},
        ],
    },
    key="ORD-123",  # Ensures all events for this order go to the same partition
)

Consuming Events

import json

from confluent_kafka import Consumer, KafkaError


class EventConsumer:
    """Consumes events from Kafka and dispatches to handlers."""

    def __init__(self, bootstrap_servers: str, group_id: str):
        self._consumer = Consumer({
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # Manual commit for reliability
        })
        self._handlers = {}

    def register_handler(self, event_type: str, handler):
        """Register a handler function for an event type."""
        self._handlers[event_type] = handler

    def subscribe(self, topics: list):
        self._consumer.subscribe(topics)

    def start(self):
        """Main consumption loop."""
        try:
            while True:
                msg = self._consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        print(f"Consumer error: {msg.error()}")
                    continue
                event = json.loads(msg.value().decode("utf-8"))
                event_type = event["event_type"]
                handler = self._handlers.get(event_type)
                if handler:
                    try:
                        handler(event["data"])
                        # Commit only after successful processing
                        self._consumer.commit(asynchronous=False)
                    except Exception as e:
                        print(f"Error processing {event_type}: {e}")
                        # Do not commit -- message will be reprocessed
                else:
                    print(f"No handler for event type: {event_type}")
                    self._consumer.commit(asynchronous=False)
        finally:
            self._consumer.close()


# Handler functions
def handle_order_placed(data: dict):
    print(f"Reserving stock for order {data['order_id']}")
    for item in data["items"]:
        print(f"  Reserve {item['quantity']}x {item['product_id']}")


def handle_order_cancelled(data: dict):
    print(f"Releasing stock for order {data['order_id']}")


# Usage
consumer = EventConsumer("localhost:9092", "inventory-service")
consumer.register_handler("OrderPlaced", handle_order_placed)
consumer.register_handler("OrderCancelled", handle_order_cancelled)
consumer.subscribe(["order-events"])
consumer.start()

Eventual Consistency

In event-driven systems, data is eventually consistent — after an event is published, it takes some time for all consumers to process it and update their state. During that window, different services may have temporarily different views of the data.

Strategies for Handling Eventual Consistency

  1. Optimistic UI: Show the user an optimistic result (“Your order is being processed”) and update later when confirmation arrives.

  2. Polling / Long-polling: The client periodically checks for the final state.

  3. WebSockets / Server-Sent Events: Push the final state to the client in real time.

  4. Compensation: If a downstream action fails, publish a compensating event to undo the upstream action.

Timeline of Eventual Consistency:

T0: Order Service saves order          → status: PENDING
T1: OrderPlaced event published        → in the broker
T2: Inventory Service processes event  → stock reserved
T3: Read model updated                 → status: CONFIRMED

┌───────────────────────────────────────────────┐
│ T0 ─── T3: Data is inconsistent               │
│ After T3:  All views are consistent           │
└───────────────────────────────────────────────┘
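The compensation strategy above can be sketched as follows. This is a toy example: `publish` and `reserve_stock` stand in for real messaging and inventory infrastructure.

```python
def place_order(publish, reserve_stock, order_id: str) -> None:
    """If a downstream step fails, emit a compensating event
    instead of attempting a distributed rollback."""
    publish("OrderPlaced", {"order_id": order_id})
    try:
        reserve_stock(order_id)
    except Exception as exc:
        # Compensating event: undoes the effect of OrderPlaced downstream
        publish("OrderCancelled", {"order_id": order_id, "reason": str(exc)})


published = []


def failing_reserve(order_id):
    raise RuntimeError("out of stock")


place_order(lambda t, d: published.append((t, d)), failing_reserve, "ORD-123")
# published now ends with a compensating OrderCancelled event
```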

Idempotency

Because messages can be delivered more than once (due to retries, network issues, or broker rebalancing), consumers must be idempotent — processing the same message twice must produce the same result as processing it once.

Strategies for Idempotency

Strategy                | Description                                | Example
------------------------|--------------------------------------------|-----------------------------------------------------------
Idempotency key         | Store processed event IDs; skip duplicates | Check event_id in a processed_events table
Upsert operations       | Use INSERT ... ON CONFLICT UPDATE          | Database upsert instead of plain insert
Natural idempotency     | Operation is inherently idempotent         | Setting a status to "SHIPPED" (not incrementing a counter)
Deduplication at broker | Kafka enable.idempotence=true              | Producer-level deduplication
# Idempotent consumer using an idempotency key
# (ProcessedEvent is an ORM model whose primary key is the event ID)
class IdempotentHandler:
    def __init__(self, db_session):
        self._db = db_session

    def handle(self, event_id: str, event_data: dict) -> None:
        # Check if already processed
        if self._db.query(ProcessedEvent).get(event_id):
            print(f"Event {event_id} already processed, skipping")
            return
        # Process the event
        self._do_business_logic(event_data)
        # Mark as processed (in the same transaction as the business logic)
        self._db.add(ProcessedEvent(id=event_id))
        self._db.commit()
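The upsert strategy from the table can be sketched with SQLite (the table name is illustrative): replaying a duplicate event leaves the same end state instead of creating a second row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE stock_reservations (order_id TEXT PRIMARY KEY, qty INTEGER)"
)


def handle_reserve(order_id: str, qty: int) -> None:
    # Upsert: a redelivered event overwrites the row instead of duplicating it
    conn.execute(
        "INSERT INTO stock_reservations (order_id, qty) VALUES (?, ?) "
        "ON CONFLICT(order_id) DO UPDATE SET qty = excluded.qty",
        (order_id, qty),
    )


handle_reserve("ORD-123", 2)
handle_reserve("ORD-123", 2)  # duplicate delivery: same end state, one row
```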

Dead Letter Queues

When a consumer repeatedly fails to process a message (due to bugs, invalid data, or downstream failures), the message is moved to a dead letter queue (DLQ) instead of blocking the pipeline.

Normal Flow:

  Producer ──► Topic ──► Consumer ✓ (success)

Failure Flow:

  Producer ──► Topic ──► Consumer ✗ (failure, retry)
                     ──► Consumer ✗ (failure, retry)
                     ──► Consumer ✗ (failure, max retries exceeded)
                                  │
                                  ▼
                        ┌──────────────────┐
                        │   Dead Letter    │
                        │   Queue (DLQ)    │
                        │                  │
                        │ Failed messages  │
                        │ + error details  │
                        │ + original msg   │
                        └──────────────────┘

  Ops team investigates, fixes the bug,
  and replays messages from the DLQ

Best practices for DLQs:

  • Include the original message, error details, and retry count
  • Set up alerts when messages land in the DLQ
  • Build tooling to inspect and replay DLQ messages after fixing the issue
  • Consider a maximum retry count (e.g., 3-5) before sending to DLQ
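A minimal retry-then-DLQ loop following these practices might look like this. It is a sketch: real brokers and client libraries typically provide retry and DLQ routing natively.

```python
def consume_with_dlq(message, handler, send_to_dlq, max_retries=3):
    """Retry the handler; after max_retries failures, route to the DLQ."""
    last_error = None
    for _ in range(max_retries):
        try:
            handler(message)
            return True  # processed successfully
        except Exception as exc:
            last_error = exc
    send_to_dlq({
        "original_message": message,  # keep the payload so it can be replayed
        "error": str(last_error),     # error details for investigation
        "retry_count": max_retries,
    })
    return False
```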

Schema Evolution

As your system evolves, event schemas will change. Poorly managed schema changes can break consumers. There are three schema evolution strategies:

Strategy            | Description                  | Rule
--------------------|------------------------------|--------------------------------------------------
Backward compatible | New schema can read old data | Only add optional fields, never remove or rename
Forward compatible  | Old schema can read new data | Only remove optional fields, never add required
Fully compatible    | Both directions work         | Only add or remove optional fields

Best Practices

  1. Use a schema registry (e.g., Confluent Schema Registry) to validate schemas before publishing
  2. Never remove required fields from an event — add new optional fields instead
  3. Version your events (OrderPlaced_v1, OrderPlaced_v2) when breaking changes are unavoidable
  4. Use Avro, Protobuf, or JSON Schema for schema definition and validation
  5. Maintain a consumer contract — producers must not break the expectations of existing consumers
Schema Evolution Example:
v1: OrderPlaced { orderId, customerId, total }
v2: OrderPlaced { orderId, customerId, total, currency? } // Added optional field
v3: OrderPlaced { orderId, customerId, total, currency?, shippingAddress? }
Consumers reading v1 events can ignore unknown fields (currency, shippingAddress).
Consumers expecting v3 must handle missing optional fields gracefully.
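A tolerant consumer along these lines can be sketched as follows; the default currency is an assumption of this sketch, not part of the schema:

```python
def handle_order_placed(data: dict) -> str:
    """Tolerant reader: default missing optional fields, ignore unknown ones."""
    currency = data.get("currency", "USD")  # optional since v2; absent in v1
    return f"Order {data['orderId']}: {data['total']} {currency}"


v1_event = {"orderId": "ORD-1", "customerId": "C-1", "total": 59.97}
v3_event = {"orderId": "ORD-2", "customerId": "C-1", "total": 10.0,
            "currency": "EUR", "shippingAddress": "221B Baker St"}
```

The same handler processes both versions: `shippingAddress` is simply ignored, and the missing `currency` in v1 falls back to a default.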

When to Use Event-Driven Architecture

Good Fit

  • Multiple consumers need to react to the same event (notifications, analytics, search indexing)
  • Temporal decoupling is important — services can be offline and catch up later
  • Audit trail is required — event sourcing provides a natural log of all changes
  • High throughput — event streaming handles millions of events per second
  • Complex workflows spanning multiple services (order fulfillment, onboarding)

Poor Fit

  • Simple CRUD applications where synchronous REST is sufficient
  • Strong consistency is required everywhere (banking transactions between accounts)
  • Small teams that cannot afford the operational overhead of a message broker
  • Debugging simplicity is a priority — distributed event flows are hard to trace

Next Steps