Event-Driven Architecture
Event-driven architecture (EDA) is a design paradigm where the flow of the program is determined by events — significant changes in state that are published, detected, and consumed by decoupled components. Instead of one service directly calling another, services communicate by producing and reacting to events, resulting in systems that are loosely coupled, scalable, and resilient.
Event-Driven vs. Request-Driven Architecture
Most developers are familiar with request-driven (synchronous) architecture. Understanding the shift to event-driven thinking is essential.
Request-Driven (Synchronous):

```
┌──────────┐   POST /orders    ┌──────────┐  reserveStock()  ┌──────────┐
│  Client  │──────────────────►│  Order   │─────────────────►│Inventory │
│          │◄──────────────────│ Service  │◄─────────────────│ Service  │
└──────────┘   201 Created     └──────────┘    { success }   └──────────┘
```
- Caller waits for a response
- Tight temporal coupling (both must be online)
- Simple to reason about
- Cascading failures possible
Event-Driven (Asynchronous):

```
┌──────────┐   POST /orders    ┌──────────┐   OrderPlaced    ┌─────────────┐
│  Client  │──────────────────►│  Order   │─────event───────►│   Message   │
│          │◄──────────────────│ Service  │                  │   Broker    │
└──────────┘   202 Accepted    └──────────┘                  └──┬──────┬───┘
                                                                │      │
                                                        consume │      │ consume
                                                                ▼      ▼
                                                         ┌─────────┐ ┌────────────┐
                                                         │Inventory│ │Notification│
                                                         │ Service │ │  Service   │
                                                         └─────────┘ └────────────┘
```
- Publisher does not wait for consumers
- Loose coupling (consumers can be offline temporarily)
- Harder to reason about (eventual consistency)
- Better fault isolation

| Aspect | Request-Driven | Event-Driven |
|---|---|---|
| Coupling | Tight — caller knows the callee | Loose — publisher does not know consumers |
| Response | Synchronous — caller waits | Asynchronous — fire and forget |
| Consistency | Immediate | Eventual |
| Scalability | Limited by synchronous chains | Highly scalable with parallel consumers |
| Failure handling | Cascading failures | Isolated failures, messages are buffered |
| Debugging | Simple stack traces | Distributed tracing required |
| Best for | Queries, immediate validation | Workflows, notifications, data pipelines |
Types of Events
Not all events are created equal. Understanding the different types helps you design cleaner systems.
Domain Events
Represent something meaningful that happened in the business domain. Named in past tense.
- OrderPlaced — a customer placed an order
- PaymentProcessed — payment was successfully charged
- InventoryDepleted — stock reached zero for a product
Domain events are the primary building block of event-driven systems. They carry the facts about what happened, along with enough data for consumers to act.
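As a minimal sketch (the class and field names here are illustrative, not a prescribed format), a domain event can be modeled as an immutable record: named in the past tense, uniquely identified, and carrying enough facts for consumers to act without calling back to the producer.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict
from uuid import uuid4


@dataclass(frozen=True)
class DomainEvent:
    """An immutable fact: named in past tense, carries its own data."""
    name: str                 # e.g. "OrderPlaced"
    data: Dict[str, Any]      # the facts consumers need to act
    event_id: str = field(default_factory=lambda: str(uuid4()))
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


# A consumer can act on this without asking the order service anything
event = DomainEvent("OrderPlaced", {"order_id": "ORD-123", "total": 59.97})
```

Freezing the dataclass enforces that events are facts about the past: once recorded, they are never modified.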
Integration Events
Integration events are domain events that cross service boundaries — the subset that is published to the message broker for other services to consume.
- Typically carry less data than internal domain events (to avoid leaking internals)
- Must have a stable schema (since multiple services depend on them)
- Should be versioned to allow for schema evolution
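One way to apply these rules — a hedged sketch with hypothetical field names — is to map the rich internal domain event onto a slimmer, explicitly versioned integration payload before publishing, so internals never leak into the public contract:

```python
def to_integration_event(internal: dict) -> dict:
    """Project an internal OrderPlaced event onto the public, versioned schema.

    Internal-only fields (here, a hypothetical warehouse allocation) are
    deliberately dropped so other services cannot come to depend on them.
    """
    return {
        "event_type": "OrderPlaced",
        "version": 1,  # bump when the public schema changes
        "data": {
            "order_id": internal["order_id"],
            "customer_id": internal["customer_id"],
            "total": internal["total"],
        },
    }


internal_event = {
    "order_id": "ORD-123",
    "customer_id": "C-1",
    "total": 59.97,
    "warehouse_allocation": {"zone": "A3"},  # internal detail, not published
}
public_event = to_integration_event(internal_event)
```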
Commands
Unlike events (which describe something that happened), commands express an intent to do something. Named in imperative form.
- ProcessPayment — a request to charge a customer
- SendNotification — a request to deliver an email or SMS
- ReserveInventory — a request to hold stock for an order
Commands are directed at a specific consumer (point-to-point), while events are broadcast to all interested consumers (pub/sub).
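The distinction can be made concrete with a toy in-memory bus (a sketch for illustration only, not a real broker): events fan out to every subscriber, while a command has exactly one registered handler.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class InMemoryBus:
    """Toy bus contrasting pub/sub events with point-to-point commands."""

    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
        self._command_handlers: Dict[str, Callable] = {}

    def subscribe(self, event_name: str, handler: Callable) -> None:
        self._subscribers[event_name].append(handler)  # many allowed

    def register(self, command_name: str, handler: Callable) -> None:
        if command_name in self._command_handlers:
            raise ValueError(f"{command_name} already has a handler")
        self._command_handlers[command_name] = handler  # exactly one

    def publish(self, event_name: str, data: dict) -> int:
        """Broadcast: every subscriber reacts. Returns the number notified."""
        for handler in self._subscribers[event_name]:
            handler(data)
        return len(self._subscribers[event_name])

    def send(self, command_name: str, data: dict):
        """Point-to-point: exactly one handler processes the command."""
        return self._command_handlers[command_name](data)


bus = InMemoryBus()
seen = []
bus.subscribe("OrderPlaced", lambda d: seen.append(("inventory", d["order_id"])))
bus.subscribe("OrderPlaced", lambda d: seen.append(("notification", d["order_id"])))
bus.register("ProcessPayment", lambda d: f"charged {d['amount']}")

notified = bus.publish("OrderPlaced", {"order_id": "ORD-123"})  # both react
result = bus.send("ProcessPayment", {"amount": 59.97})          # one handler
```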
```
Event:   "Something happened" → Zero or more consumers react
Command: "Do this thing"      → Exactly one consumer processes it
```

Message Brokers
A message broker is the infrastructure that transports messages between producers and consumers. The choice of broker affects durability, ordering, throughput, and delivery guarantees.
Key Concepts
Producer ──► Topic / Queue ──► Consumer
```
┌──────────┐     ┌─────────────────────────────────┐     ┌──────────┐
│ Producer │────►│         Message Broker          │────►│ Consumer │
│          │     │                                 │     │          │
│ publish  │     │   ┌────┬────┬────┬────┬────┐    │     │ consume  │
│ message  │     │   │ m1 │ m2 │ m3 │ m4 │ m5 │    │     │ message  │
│          │     │   └────┴────┴────┴────┴────┘    │     │          │
└──────────┘     │          Topic / Queue          │     └──────────┘
                 └─────────────────────────────────┘
```

Pub/Sub vs. Point-to-Point
Pub/Sub (Topic): One message → delivered to ALL subscribers
```
Producer ──► Topic ──┬──► Consumer A (Inventory)
                     ├──► Consumer B (Notification)
                     └──► Consumer C (Analytics)
```
Use case: Broadcasting domain events
Point-to-Point (Queue): One message → delivered to exactly ONE consumer
```
Producer ──► Queue ──► Consumer A   (if A is busy...)
               └─────► Consumer B   (...B picks it up)
```
Use case: Task distribution, command processing

Broker Comparison
| Feature | Apache Kafka | RabbitMQ | Amazon SQS |
|---|---|---|---|
| Model | Distributed log (append-only) | Traditional message queue | Managed queue service |
| Ordering | Per-partition ordering | Per-queue ordering | Best-effort (FIFO available) |
| Retention | Configurable (days/weeks/forever) | Until consumed and acknowledged | Up to 14 days |
| Throughput | Very high (millions/sec) | High (tens of thousands/sec) | High (managed, auto-scaling) |
| Replay | Yes — consumers can re-read old messages | No — messages are deleted after ack | No |
| Consumer groups | Built-in (partition assignment) | Via competing consumers | Built-in |
| Best for | Event sourcing, streaming, data pipelines | Task queues, RPC, traditional messaging | Serverless, AWS-native applications |
| Ops complexity | High (self-managed cluster) | Medium | None (fully managed) |
Event Sourcing
Event sourcing is a pattern where instead of storing the current state of an entity, you store the sequence of events that led to that state. The current state is derived by replaying all events from the beginning.
Traditional vs. Event-Sourced Storage
Traditional (State-Based):

```
┌──────────────────────────────┐
│ orders table                 │
│                              │
│ id: ORD-123                  │
│ status: SHIPPED              │
│ total: $59.97                │
│ items: [{...}, {...}]        │
│ updated_at: 2025-01-15       │
└──────────────────────────────┘

"What is the current state?"
✓ Easy to query
✗ History is lost
```
Event-Sourced:

```
┌────────────────────────────────────────────────────┐
│ order_events table                                 │
│                                                    │
│ 1. OrderCreated    { customer: "C-1", items: [] }  │
│ 2. ItemAdded       { product: "P-1", qty: 2 }      │
│ 3. ItemAdded       { product: "P-2", qty: 1 }      │
│ 4. OrderPlaced     { total: $59.97 }               │
│ 5. PaymentReceived { amount: $59.97 }              │
│ 6. OrderShipped    { tracking: "TRK-456" }         │
└────────────────────────────────────────────────────┘

"What happened to produce the current state?"
✓ Complete audit trail
✓ Can rebuild state at any point in time
✗ Querying current state requires replay (use snapshots or CQRS)
```

Rebuilding State from Events
```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List


class EventType(Enum):
    ORDER_CREATED = "OrderCreated"
    ITEM_ADDED = "ItemAdded"
    ORDER_PLACED = "OrderPlaced"
    PAYMENT_RECEIVED = "PaymentReceived"
    ORDER_SHIPPED = "OrderShipped"
    ORDER_CANCELLED = "OrderCancelled"


@dataclass
class Event:
    """An immutable record of something that happened."""
    event_type: EventType
    data: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.utcnow)
    version: int = 0


@dataclass
class OrderState:
    """The current state, rebuilt from events."""
    order_id: str = ""
    customer_id: str = ""
    items: List[Dict] = field(default_factory=list)
    status: str = "UNKNOWN"
    total: float = 0.0
    tracking_number: str = ""


class OrderAggregate:
    """Rebuilds order state by applying events sequentially."""

    def __init__(self):
        self._state = OrderState()
        self._events: List[Event] = []

    @property
    def state(self) -> OrderState:
        return self._state

    def load_from_history(self, events: List[Event]) -> None:
        """Rebuild state by replaying all past events."""
        for event in events:
            self._apply(event)
            self._events.append(event)

    def _apply(self, event: Event) -> None:
        """Apply a single event to update the state."""
        handler = {
            EventType.ORDER_CREATED: self._on_order_created,
            EventType.ITEM_ADDED: self._on_item_added,
            EventType.ORDER_PLACED: self._on_order_placed,
            EventType.PAYMENT_RECEIVED: self._on_payment_received,
            EventType.ORDER_SHIPPED: self._on_order_shipped,
            EventType.ORDER_CANCELLED: self._on_order_cancelled,
        }.get(event.event_type)

        if handler:
            handler(event.data)

    def _on_order_created(self, data: dict) -> None:
        self._state.order_id = data["order_id"]
        self._state.customer_id = data["customer_id"]
        self._state.status = "CREATED"

    def _on_item_added(self, data: dict) -> None:
        self._state.items.append({
            "product_id": data["product_id"],
            "quantity": data["quantity"],
            "price": data["price"],
        })

    def _on_order_placed(self, data: dict) -> None:
        self._state.total = data["total"]
        self._state.status = "PLACED"

    def _on_payment_received(self, data: dict) -> None:
        self._state.status = "PAID"

    def _on_order_shipped(self, data: dict) -> None:
        self._state.tracking_number = data["tracking_number"]
        self._state.status = "SHIPPED"

    def _on_order_cancelled(self, data: dict) -> None:
        self._state.status = "CANCELLED"


# Usage: rebuild state from stored events
events = [
    Event(EventType.ORDER_CREATED, {
        "order_id": "ORD-123",
        "customer_id": "C-1",
    }),
    Event(EventType.ITEM_ADDED, {
        "product_id": "P-1",
        "quantity": 2,
        "price": 19.99,
    }),
    Event(EventType.ITEM_ADDED, {
        "product_id": "P-2",
        "quantity": 1,
        "price": 19.99,
    }),
    Event(EventType.ORDER_PLACED, {"total": 59.97}),
    Event(EventType.PAYMENT_RECEIVED, {"amount": 59.97}),
    Event(EventType.ORDER_SHIPPED, {"tracking_number": "TRK-456"}),
]

order = OrderAggregate()
order.load_from_history(events)
print(order.state)
# OrderState(order_id='ORD-123', customer_id='C-1',
#            items=[...], status='SHIPPED', total=59.97,
#            tracking_number='TRK-456')
```

```javascript
// Event types as constants
const EventType = Object.freeze({
  ORDER_CREATED: "OrderCreated",
  ITEM_ADDED: "ItemAdded",
  ORDER_PLACED: "OrderPlaced",
  PAYMENT_RECEIVED: "PaymentReceived",
  ORDER_SHIPPED: "OrderShipped",
  ORDER_CANCELLED: "OrderCancelled",
});

// An immutable event record
function createEvent(type, data) {
  return Object.freeze({
    eventType: type,
    data: { ...data },
    timestamp: new Date().toISOString(),
  });
}

class OrderAggregate {
  constructor() {
    this.state = {
      orderId: "",
      customerId: "",
      items: [],
      status: "UNKNOWN",
      total: 0,
      trackingNumber: "",
    };
    this.events = [];
  }

  /** Rebuild state by replaying all past events. */
  loadFromHistory(events) {
    for (const event of events) {
      this.#apply(event);
      this.events.push(event);
    }
  }

  #apply(event) {
    const handlers = {
      [EventType.ORDER_CREATED]: (data) => {
        this.state.orderId = data.orderId;
        this.state.customerId = data.customerId;
        this.state.status = "CREATED";
      },
      [EventType.ITEM_ADDED]: (data) => {
        this.state.items.push({
          productId: data.productId,
          quantity: data.quantity,
          price: data.price,
        });
      },
      [EventType.ORDER_PLACED]: (data) => {
        this.state.total = data.total;
        this.state.status = "PLACED";
      },
      [EventType.PAYMENT_RECEIVED]: () => {
        this.state.status = "PAID";
      },
      [EventType.ORDER_SHIPPED]: (data) => {
        this.state.trackingNumber = data.trackingNumber;
        this.state.status = "SHIPPED";
      },
      [EventType.ORDER_CANCELLED]: () => {
        this.state.status = "CANCELLED";
      },
    };

    const handler = handlers[event.eventType];
    if (handler) handler(event.data);
  }
}

// Usage: rebuild state from stored events
const events = [
  createEvent(EventType.ORDER_CREATED, {
    orderId: "ORD-123",
    customerId: "C-1",
  }),
  createEvent(EventType.ITEM_ADDED, {
    productId: "P-1",
    quantity: 2,
    price: 19.99,
  }),
  createEvent(EventType.ITEM_ADDED, {
    productId: "P-2",
    quantity: 1,
    price: 19.99,
  }),
  createEvent(EventType.ORDER_PLACED, { total: 59.97 }),
  createEvent(EventType.PAYMENT_RECEIVED, { amount: 59.97 }),
  createEvent(EventType.ORDER_SHIPPED, { trackingNumber: "TRK-456" }),
];

const order = new OrderAggregate();
order.loadFromHistory(events);
console.log(order.state);
// { orderId: 'ORD-123', customerId: 'C-1',
//   items: [...], status: 'SHIPPED', total: 59.97,
//   trackingNumber: 'TRK-456' }
```

CQRS with Event Sourcing
CQRS and event sourcing are often used together. Events are the write model (the source of truth), and one or more read models (projections) are derived from those events for efficient querying.
```
┌──────────┐   Command    ┌──────────────┐   Append    ┌──────────────┐
│  Client  │─────────────►│   Command    │────────────►│    Event     │
│          │              │   Handler    │   event     │    Store     │
└──────────┘              └──────────────┘             └──────┬───────┘
                                                              │ Publish events
                                      ┌───────────────────────┘
                                      ▼
                               ┌──────────────┐
                               │  Event Bus   │
                               └──┬───────┬───┘
                                  │       │
                          ┌───────▼──┐ ┌──▼────────┐
                          │Projection│ │Projection │
                          │ Handler  │ │ Handler   │
                          │ (Orders) │ │ (Search)  │
                          └────┬─────┘ └────┬──────┘
                               │            │
                          ┌────▼─────┐ ┌────▼──────┐
                          │ Read DB  │ │ Elastic-  │
                          │ (Postgres│ │ search    │
                          │  views)  │ │ index     │
                          └──────────┘ └───────────┘
                               ▲            ▲
                               │            │
                          ┌────┴────────────┴──┐
                          │     Query API      │◄──── Client (reads)
                          └────────────────────┘
```

How it works:
1. A command is received and validated
2. The command handler loads the aggregate by replaying its events
3. The aggregate produces new events and appends them to the event store
4. Projection handlers consume the events and update read-optimized views
5. Query APIs read from the projections (not the event store)
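The projection side of these steps can be sketched in a few lines (an in-memory dict stands in for a real read database; the class and event names are illustrative):

```python
class OrderSummaryProjection:
    """Consumes order events and maintains a read-optimized view."""

    def __init__(self):
        self.rows = {}  # stand-in for a table of order summaries

    def on_event(self, event_type: str, data: dict) -> None:
        """Projection handler: fold each event into the read model."""
        if event_type == "OrderCreated":
            self.rows[data["order_id"]] = {"status": "CREATED", "total": 0.0}
        elif event_type == "OrderPlaced":
            self.rows[data["order_id"]]["total"] = data["total"]
            self.rows[data["order_id"]]["status"] = "PLACED"
        elif event_type == "OrderShipped":
            self.rows[data["order_id"]]["status"] = "SHIPPED"

    def get(self, order_id: str) -> dict:
        """The query API reads from here, never from the event store."""
        return self.rows[order_id]


projection = OrderSummaryProjection()
for etype, data in [
    ("OrderCreated", {"order_id": "ORD-123"}),
    ("OrderPlaced", {"order_id": "ORD-123", "total": 59.97}),
    ("OrderShipped", {"order_id": "ORD-123"}),
]:
    projection.on_event(etype, data)
```

Because the projection is derived purely from events, it can be dropped and rebuilt from the event store at any time, which is how new read models are added to a running system.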
Event Publishing and Consumption Patterns
Publishing Events
```python
import json
from datetime import datetime

from confluent_kafka import Producer


class EventPublisher:
    """Publishes domain events to Kafka topics."""

    def __init__(self, bootstrap_servers: str):
        self._producer = Producer({
            "bootstrap.servers": bootstrap_servers,
            "acks": "all",  # Wait for all replicas to acknowledge
        })

    def publish(
        self, topic: str, event_type: str, data: dict, key: str
    ) -> None:
        """Publish an event with a partition key for ordering."""
        event = {
            "event_type": event_type,
            "data": data,
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": {
                "source": "order-service",
                "version": 1,
            },
        }
        self._producer.produce(
            topic=topic,
            key=key.encode("utf-8"),
            value=json.dumps(event).encode("utf-8"),
            callback=self._delivery_report,
        )
        self._producer.flush()

    @staticmethod
    def _delivery_report(err, msg):
        if err:
            print(f"Delivery failed: {err}")
        else:
            print(f"Event delivered to {msg.topic()} "
                  f"[partition {msg.partition()}]")


# Usage
publisher = EventPublisher("localhost:9092")
publisher.publish(
    topic="order-events",
    event_type="OrderPlaced",
    data={
        "order_id": "ORD-123",
        "customer_id": "C-1",
        "total": 59.97,
        "items": [
            {"product_id": "P-1", "quantity": 2},
            {"product_id": "P-2", "quantity": 1},
        ],
    },
    key="ORD-123",  # Ensures all events for this order go to same partition
)
```

```javascript
const { Kafka } = require("kafkajs");

class EventPublisher {
  constructor(brokers) {
    const kafka = new Kafka({
      clientId: "order-service",
      brokers,
    });
    this.producer = kafka.producer();
  }

  async connect() {
    await this.producer.connect();
  }

  async publish(topic, eventType, data, key) {
    const event = {
      eventType,
      data,
      timestamp: new Date().toISOString(),
      metadata: {
        source: "order-service",
        version: 1,
      },
    };

    await this.producer.send({
      topic,
      messages: [
        {
          key, // Partition key for ordering
          value: JSON.stringify(event),
        },
      ],
    });

    console.log(`Event ${eventType} published to ${topic}`);
  }

  async disconnect() {
    await this.producer.disconnect();
  }
}

// Usage
async function main() {
  const publisher = new EventPublisher(["localhost:9092"]);
  await publisher.connect();

  await publisher.publish(
    "order-events",
    "OrderPlaced",
    {
      orderId: "ORD-123",
      customerId: "C-1",
      total: 59.97,
      items: [
        { productId: "P-1", quantity: 2 },
        { productId: "P-2", quantity: 1 },
      ],
    },
    "ORD-123"
  );

  await publisher.disconnect();
}

main().catch(console.error);
```

Consuming Events
```python
import json

from confluent_kafka import Consumer, KafkaError


class EventConsumer:
    """Consumes events from Kafka and dispatches to handlers."""

    def __init__(self, bootstrap_servers: str, group_id: str):
        self._consumer = Consumer({
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # Manual commit for reliability
        })
        self._handlers = {}

    def register_handler(self, event_type: str, handler):
        """Register a handler function for an event type."""
        self._handlers[event_type] = handler

    def subscribe(self, topics: list):
        self._consumer.subscribe(topics)

    def start(self):
        """Main consumption loop."""
        try:
            while True:
                msg = self._consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        print(f"Consumer error: {msg.error()}")
                    continue

                event = json.loads(msg.value().decode("utf-8"))
                event_type = event["event_type"]
                handler = self._handlers.get(event_type)

                if handler:
                    try:
                        handler(event["data"])
                        # Commit only after successful processing
                        self._consumer.commit(asynchronous=False)
                    except Exception as e:
                        print(f"Error processing {event_type}: {e}")
                        # Do not commit -- message will be reprocessed
                else:
                    print(f"No handler for event type: {event_type}")
                    self._consumer.commit(asynchronous=False)
        finally:
            self._consumer.close()


# Handler functions
def handle_order_placed(data: dict):
    print(f"Reserving stock for order {data['order_id']}")
    for item in data["items"]:
        print(f"  Reserve {item['quantity']}x {item['product_id']}")


def handle_order_cancelled(data: dict):
    print(f"Releasing stock for order {data['order_id']}")


# Usage
consumer = EventConsumer("localhost:9092", "inventory-service")
consumer.register_handler("OrderPlaced", handle_order_placed)
consumer.register_handler("OrderCancelled", handle_order_cancelled)
consumer.subscribe(["order-events"])
consumer.start()
```

```javascript
const { Kafka } = require("kafkajs");

class EventConsumer {
  constructor(brokers, groupId) {
    const kafka = new Kafka({
      clientId: "inventory-service",
      brokers,
    });
    this.consumer = kafka.consumer({ groupId });
    this.handlers = new Map();
  }

  registerHandler(eventType, handler) {
    this.handlers.set(eventType, handler);
  }

  async start(topics) {
    await this.consumer.connect();

    for (const topic of topics) {
      await this.consumer.subscribe({ topic, fromBeginning: true });
    }

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const event = JSON.parse(message.value.toString());
        const handler = this.handlers.get(event.eventType);

        if (handler) {
          try {
            await handler(event.data);
            console.log(
              `Processed ${event.eventType} from ${topic}:${partition}`
            );
          } catch (error) {
            console.error(`Error processing ${event.eventType}:`, error);
            // In production: send to a dead letter queue
          }
        } else {
          console.log(`No handler for: ${event.eventType}`);
        }
      },
    });
  }
}

// Handler functions
function handleOrderPlaced(data) {
  console.log(`Reserving stock for order ${data.orderId}`);
  for (const item of data.items) {
    console.log(`  Reserve ${item.quantity}x ${item.productId}`);
  }
}

function handleOrderCancelled(data) {
  console.log(`Releasing stock for order ${data.orderId}`);
}

// Usage
const consumer = new EventConsumer(
  ["localhost:9092"],
  "inventory-service"
);
consumer.registerHandler("OrderPlaced", handleOrderPlaced);
consumer.registerHandler("OrderCancelled", handleOrderCancelled);
consumer.start(["order-events"]).catch(console.error);
```

Eventual Consistency
In event-driven systems, data is eventually consistent — after an event is published, it takes some time for all consumers to process it and update their state. During that window, different services may have temporarily different views of the data.
Strategies for Handling Eventual Consistency
- Optimistic UI: Show the user an optimistic result (“Your order is being processed”) and update later when confirmation arrives.
- Polling / Long-polling: The client periodically checks for the final state.
- WebSockets / Server-Sent Events: Push the final state to the client in real time.
- Compensation: If a downstream action fails, publish a compensating event to undo the upstream action.
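The compensation strategy can be sketched as follows (a simplified saga-style step; the function names and the OrderCancelled compensating event are illustrative, and the stock-reservation and publish functions are injected so the sketch stays broker-agnostic):

```python
def process_order_placed(data: dict, reserve_stock, publish) -> bool:
    """Attempt the downstream action; on failure, emit a compensating event."""
    try:
        reserve_stock(data["items"])
        return True
    except RuntimeError as exc:
        # Undo the upstream action by publishing a compensating event
        publish("OrderCancelled", {
            "order_id": data["order_id"],
            "reason": f"inventory: {exc}",
        })
        return False


# Simulate a failed reservation to show the compensating event being emitted
published = []

def failing_reserve(items):
    raise RuntimeError("out of stock")

ok = process_order_placed(
    {"order_id": "ORD-123", "items": [{"product_id": "P-1", "quantity": 2}]},
    reserve_stock=failing_reserve,
    publish=lambda name, data: published.append((name, data)),
)
```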
Timeline of Eventual Consistency:
```
T0: Order Service saves order          → status: PENDING
T1: OrderPlaced event published        → in the broker
T2: Inventory Service processes event  → stock reserved
T3: Read model updated                 → status: CONFIRMED

─────────────────────────────────────────────────
│ T0 ─── T3: Data is inconsistent               │
│ After T3:  All views are consistent           │
─────────────────────────────────────────────────
```

Idempotency
Because messages can be delivered more than once (due to retries, network issues, or broker rebalancing), consumers must be idempotent — processing the same message twice must produce the same result as processing it once.
Strategies for Idempotency
| Strategy | Description | Example |
|---|---|---|
| Idempotency key | Store processed event IDs; skip duplicates | Check event_id in a processed_events table |
| Upsert operations | Use INSERT ... ON CONFLICT DO UPDATE | Database upsert instead of plain insert |
| Natural idempotency | Operation is inherently idempotent | Setting a status to “SHIPPED” (not incrementing a counter) |
| Deduplication at broker | Kafka enable.idempotence=true | Producer-level deduplication |
```python
# Idempotent consumer using an idempotency key
class IdempotentHandler:
    def __init__(self, db_session):
        self._db = db_session

    def handle(self, event_id: str, event_data: dict) -> None:
        # Check if already processed
        if self._db.query(ProcessedEvent).get(event_id):
            print(f"Event {event_id} already processed, skipping")
            return

        # Process the event
        self._do_business_logic(event_data)

        # Mark as processed (in same transaction)
        self._db.add(ProcessedEvent(id=event_id))
        self._db.commit()
```

Dead Letter Queues
When a consumer repeatedly fails to process a message (due to bugs, invalid data, or downstream failures), the message is moved to a dead letter queue (DLQ) instead of blocking the pipeline.
```
Normal Flow:
  Producer ──► Topic ──► Consumer ✓ (success)

Failure Flow:
  Producer ──► Topic ──► Consumer ✗ (failure, retry)
                    ──► Consumer ✗ (failure, retry)
                    ──► Consumer ✗ (failure, max retries exceeded)
                              │
                              ▼
                    ┌──────────────────┐
                    │   Dead Letter    │
                    │   Queue (DLQ)    │
                    │                  │
                    │ Failed messages  │
                    │ + error details  │
                    │ + original msg   │
                    └──────────────────┘
                              │
        Ops team investigates, fixes bug, replays messages from DLQ
```

Best practices for DLQs:
- Include the original message, error details, and retry count
- Set up alerts when messages land in the DLQ
- Build tooling to inspect and replay DLQ messages after fixing the issue
- Consider a maximum retry count (e.g., 3-5) before sending to DLQ
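These practices can be combined into a small retry-then-DLQ wrapper — a broker-agnostic sketch where the handler and DLQ sink are injected, and all names are illustrative:

```python
def consume_with_dlq(message: dict, handler, send_to_dlq,
                     max_retries: int = 3) -> bool:
    """Retry a handler up to max_retries, then route to a DLQ with context."""
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            handler(message)
            return True
        except Exception as exc:  # broad by design: any failure counts
            last_error = exc
    # Max retries exceeded: preserve the original message plus error details
    send_to_dlq({
        "original_message": message,   # full payload, so it can be replayed
        "error": str(last_error),
        "retry_count": max_retries,
    })
    return False


# Simulate a permanently failing handler to show the DLQ entry
dlq = []

def broken_handler(msg):
    raise ValueError("bad payload")

delivered = consume_with_dlq(
    {"event_type": "OrderPlaced", "data": {"order_id": "ORD-123"}},
    handler=broken_handler,
    send_to_dlq=dlq.append,
)
```

In a real deployment the retries would be spaced with backoff and `send_to_dlq` would publish to a dedicated topic or queue, but the shape of the logic is the same.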
Schema Evolution
As your system evolves, event schemas will change. Poorly managed schema changes can break consumers. There are three schema evolution strategies:
| Strategy | Description | Rule |
|---|---|---|
| Backward compatible | New schema can read old data | Only add optional fields, never remove or rename |
| Forward compatible | Old schema can read new data | Only remove optional fields, never add required |
| Full compatible | Both directions work | Only add or remove optional fields |
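On the consumer side, these compatibility rules often reduce to one habit: tolerate unknown fields and default missing optional ones. A sketch with hypothetical field names:

```python
def parse_order_placed(payload: dict) -> dict:
    """Tolerant reader: ignore unknown fields, default missing optional ones."""
    return {
        "order_id": payload["order_id"],             # required in every version
        "total": payload["total"],                   # required in every version
        "currency": payload.get("currency", "USD"),  # optional, added later
    }


# An old-schema event (no currency) and a newer one with extra fields
v1_event = {"order_id": "ORD-1", "total": 10.0}
v3_event = {"order_id": "ORD-2", "total": 20.0, "currency": "EUR",
            "shipping_address": {"city": "Oslo"}}  # unknown field, ignored

old = parse_order_placed(v1_event)
new = parse_order_placed(v3_event)
```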
Best Practices
- Use a schema registry (e.g., Confluent Schema Registry) to validate schemas before publishing
- Never remove required fields from an event — add new optional fields instead
- Version your events (OrderPlaced_v1, OrderPlaced_v2) when breaking changes are unavoidable
- Use Avro, Protobuf, or JSON Schema for schema definition and validation
- Maintain a consumer contract — producers must not break the expectations of existing consumers
Schema Evolution Example:
```
v1: OrderPlaced { orderId, customerId, total }
v2: OrderPlaced { orderId, customerId, total, currency? }                    // Added optional field
v3: OrderPlaced { orderId, customerId, total, currency?, shippingAddress? }
```

Consumers built for v1 can safely ignore the unknown fields (currency, shippingAddress) in newer events. Consumers expecting v3 must handle missing optional fields gracefully when reading older events.

When to Use Event-Driven Architecture
Good Fit
- Multiple consumers need to react to the same event (notifications, analytics, search indexing)
- Temporal decoupling is important — services can be offline and catch up later
- Audit trail is required — event sourcing provides a natural log of all changes
- High throughput — event streaming handles millions of events per second
- Complex workflows spanning multiple services (order fulfillment, onboarding)
Poor Fit
- Simple CRUD applications where synchronous REST is sufficient
- Strong consistency is required everywhere (banking transactions between accounts)
- Small teams that cannot afford the operational overhead of a message broker
- Debugging simplicity is a priority — distributed event flows are hard to trace