
Event Sourcing & CQRS

Event sourcing and CQRS are architectural patterns that fundamentally change how applications store and retrieve data. Instead of storing the current state, event sourcing stores a sequence of state-changing events. CQRS separates the read and write sides of an application into different models. Together, they enable systems that are auditable, scalable, and capable of rebuilding state from history.


Event Sourcing

The Core Idea

Traditional systems store the current state of an entity. Event sourcing stores the sequence of events that led to the current state.

```text
Traditional (State-Based):
┌───────────────────────────────────┐
│ Account: ACC-001                  │
│ Balance: $750                     │
│ Last Updated: 2024-01-15          │
└───────────────────────────────────┘
(How did we get to $750? We don't know.)

Event Sourced:
┌───────────────────────────────────┐
│ Event 1: AccountCreated           │
│   { balance: $0 }                 │
│ Event 2: MoneyDeposited           │
│   { amount: $1000 }               │
│ Event 3: MoneyWithdrawn           │
│   { amount: $200 }                │
│ Event 4: MoneyWithdrawn           │
│   { amount: $50 }                 │
└───────────────────────────────────┘
Current state: $0 + $1000 - $200 - $50 = $750
(Full history. Auditable. Reproducible.)
```
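The arithmetic on the last line is a left fold: start from an initial state and apply each event in order. The minimal sketch below makes that concrete. It assumes events are plain `(event_type, data)` tuples, a simplification of the `Event` class used later, and shows that replaying only a prefix of the history yields an earlier state.

```python
from functools import reduce

# The four events from the diagram above, as (event_type, data) pairs.
events = [
    ("AccountCreated", {"balance": 0}),
    ("MoneyDeposited", {"amount": 1000}),
    ("MoneyWithdrawn", {"amount": 200}),
    ("MoneyWithdrawn", {"amount": 50}),
]


def apply(balance: int, event: tuple[str, dict]) -> int:
    """Return the balance after applying a single event."""
    event_type, data = event
    if event_type == "AccountCreated":
        return data["balance"]
    if event_type == "MoneyDeposited":
        return balance + data["amount"]
    if event_type == "MoneyWithdrawn":
        return balance - data["amount"]
    return balance  # unknown event types leave state unchanged


# Current state is a fold over the whole history...
current = reduce(apply, events, 0)
# ...and any earlier state is a fold over a prefix of it.
after_deposit = reduce(apply, events[:2], 0)
print(current, after_deposit)  # 750 1000
```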

Analogy: Git vs File Saves

Event sourcing is like Git version control for your data:

```text
File save (state-based):        Git (event-sourced):
  Save v1: "Hello"                Commit 1: Create file "Hello"
  Save v2: "Hello World"          Commit 2: Append " World"
  Save v3: "Hello World!"         Commit 3: Append "!"

With file saves:                With Git:
  Only v3 exists.                 All history is preserved.
  Cannot undo or audit.           Can check out any version.
                                  Can see who changed what.
```

Event Store

An event store is an append-only database optimized for storing and retrieving event sequences:

```text
Event Store:
┌────────────────────────────────────────────────────────┐
│ Stream: Account-ACC-001                                │
├─────┬────────────────┬──────────────┬──────────────────┤
│ Seq │ Event Type     │ Data         │ Timestamp        │
├─────┼────────────────┼──────────────┼──────────────────┤
│ 1   │ AccountCreated │ balance: 0   │ 2024-01-01 09:00 │
│ 2   │ MoneyDeposited │ amount: 1000 │ 2024-01-05 14:30 │
│ 3   │ MoneyWithdrawn │ amount: 200  │ 2024-01-10 10:15 │
│ 4   │ MoneyWithdrawn │ amount: 50   │ 2024-01-15 16:45 │
└─────┴────────────────┴──────────────┴──────────────────┘
```

Key properties:
- Append-only (events are never modified or deleted)
- Ordered by sequence number within each stream
- Each stream represents one aggregate/entity
```python
from dataclasses import dataclass
from datetime import datetime, timezone
import uuid


@dataclass
class Event:
    event_id: str
    stream_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int


class ConcurrencyError(Exception):
    """Raised when a stream is not at the expected version."""


class EventStore:
    """Simple in-memory event store."""

    def __init__(self):
        self.streams: dict[str, list[Event]] = {}
        self.all_events: list[Event] = []  # global order across streams

    def append(self, stream_id: str, event_type: str,
               data: dict, expected_version: int = -1) -> Event:
        """
        Append an event to a stream.

        Uses optimistic concurrency: if expected_version >= 0, the append
        fails unless the stream is at exactly that version. Pass -1 to
        skip the check.
        """
        stream = self.streams.setdefault(stream_id, [])
        current_version = len(stream)
        if expected_version >= 0 and current_version != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, "
                f"but stream is at version {current_version}"
            )
        event = Event(
            event_id=str(uuid.uuid4()),
            stream_id=stream_id,
            event_type=event_type,
            data=data,
            timestamp=datetime.now(timezone.utc),
            version=current_version + 1,  # versions start at 1
        )
        stream.append(event)
        self.all_events.append(event)
        return event

    def read_stream(self, stream_id: str,
                    from_version: int = 0) -> list[Event]:
        """Read events with version > from_version from one stream."""
        events = self.streams.get(stream_id, [])
        return [e for e in events if e.version > from_version]

    def read_all(self, from_position: int = 0) -> list[Event]:
        """Read all events across all streams, in append order."""
        return self.all_events[from_position:]


# Bank Account Aggregate
class BankAccount:
    def __init__(self):
        self.account_id = None
        self.balance = 0
        self.is_open = False
        self.version = 0            # number of events applied so far
        self._pending_events = []   # new events not yet persisted

    @classmethod
    def create(cls, account_id: str, initial_deposit: float = 0):
        account = cls()
        account._apply_event('AccountCreated', {
            'account_id': account_id,
            'initial_deposit': initial_deposit,
        })
        return account

    def deposit(self, amount: float):
        if not self.is_open:
            raise ValueError("Account is closed")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        self._apply_event('MoneyDeposited', {'amount': amount})

    def withdraw(self, amount: float):
        if not self.is_open:
            raise ValueError("Account is closed")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        self._apply_event('MoneyWithdrawn', {'amount': amount})

    def close(self):
        if not self.is_open:
            raise ValueError("Account is already closed")
        if self.balance != 0:
            raise ValueError("Cannot close account with balance")
        self._apply_event('AccountClosed', {})

    def _apply_event(self, event_type: str, data: dict):
        """Apply a new event to state and queue it for persistence."""
        self._handle_event(event_type, data)
        self._pending_events.append((event_type, data))

    def _handle_event(self, event_type: str, data: dict):
        """Update state from an event (used for new and replayed events)."""
        if event_type == 'AccountCreated':
            self.account_id = data['account_id']
            self.balance = data.get('initial_deposit', 0)
            self.is_open = True
        elif event_type == 'MoneyDeposited':
            self.balance += data['amount']
        elif event_type == 'MoneyWithdrawn':
            self.balance -= data['amount']
        elif event_type == 'AccountClosed':
            self.is_open = False
        self.version += 1

    @classmethod
    def load_from_events(cls, events: list[Event]):
        """Rebuild account state by replaying stored events."""
        account = cls()
        for event in events:
            account._handle_event(event.event_type, event.data)
        return account

    def get_pending_events(self):
        """Return and clear the events recorded since the last call."""
        events = self._pending_events.copy()
        self._pending_events.clear()
        return events


# Usage
store = EventStore()

# Create account and perform operations
account = BankAccount.create('ACC-001', 1000)
account.deposit(500)
account.withdraw(200)

# Persist events
for event_type, data in account.get_pending_events():
    store.append('ACC-001', event_type, data)

# Later: rebuild account from events
events = store.read_stream('ACC-001')
rebuilt = BankAccount.load_from_events(events)
print(f"Balance: ${rebuilt.balance}")  # Balance: $1300
```
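The `expected_version` check in `append` is what stops two writers from both acting on the same stale state (the usage above omits it, so the default of `-1` disables the check). The self-contained sketch below isolates the idea with a hypothetical, stripped-down `Stream` class rather than the full `EventStore`: both writers read version 1, the first append succeeds, and the second is rejected instead of being applied against a balance it never saw.

```python
class ConcurrencyError(Exception):
    pass


class Stream:
    """A single append-only stream with an expected-version check."""

    def __init__(self) -> None:
        self.events: list[str] = []

    @property
    def version(self) -> int:
        return len(self.events)

    def append(self, event: str, expected_version: int) -> None:
        # Reject the write if someone else appended since we read.
        if self.version != expected_version:
            raise ConcurrencyError(
                f"expected {expected_version}, stream at {self.version}"
            )
        self.events.append(event)


stream = Stream()
stream.append("AccountCreated", expected_version=0)

# Two writers read the stream while it is at version 1...
seen_by_a = stream.version
seen_by_b = stream.version

stream.append("MoneyWithdrawn", expected_version=seen_by_a)  # succeeds

conflict = None
try:
    # ...so writer B's append, based on a stale read, is rejected.
    stream.append("MoneyWithdrawn", expected_version=seen_by_b)
except ConcurrencyError as exc:
    conflict = str(exc)

print(conflict)  # expected 1, stream at 2
```

On a conflict, the usual response is to reload the aggregate, re-run the command's validation against the fresh state, and retry.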

Projections

A projection (also called a read model or materialized view) transforms events into a queryable format optimized for reads. While the event store is the source of truth, projections provide the views that applications actually query.

```text
Event Store (source of truth):
  [AccountCreated] → [Deposited $1000] → [Withdrawn $200]

Projection 1: Account Balance View
┌──────────┬─────────┬────────────┐
│ Account  │ Balance │ Updated At │
├──────────┼─────────┼────────────┤
│ ACC-001  │ $800    │ 2024-01-15 │
└──────────┴─────────┴────────────┘

Projection 2: Transaction History View
┌──────────┬────────────┬────────┬────────────┐
│ Account  │ Type       │ Amount │ Date       │
├──────────┼────────────┼────────┼────────────┤
│ ACC-001  │ Deposit    │ $1000  │ 2024-01-05 │
│ ACC-001  │ Withdrawal │ $200   │ 2024-01-15 │
└──────────┴────────────┴────────┴────────────┘

Projection 3: Daily Summary View
┌────────────┬──────────┬─────────────┬────────┐
│ Date       │ Deposits │ Withdrawals │ Net    │
├────────────┼──────────┼─────────────┼────────┤
│ 2024-01-05 │ $1000    │ $0          │ +$1000 │
│ 2024-01-15 │ $0       │ $200        │ -$200  │
└────────────┴──────────┴─────────────┴────────┘
```

Different projections from the SAME events. You can add new projections at any time and rebuild them from the event history.
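To make "same events, different views" concrete, here is a minimal sketch of Projections 1 and 3 as pure functions over one event list. The event dictionaries and the names `project_balances` and `project_daily_summary` are illustrative assumptions; a real projection would usually consume `Event` objects incrementally and persist its output.

```python
from collections import defaultdict

# Events from the diagram above, simplified to dictionaries.
events = [
    {"type": "AccountCreated", "account": "ACC-001"},
    {"type": "MoneyDeposited", "account": "ACC-001", "date": "2024-01-05", "amount": 1000},
    {"type": "MoneyWithdrawn", "account": "ACC-001", "date": "2024-01-15", "amount": 200},
]


def project_balances(events: list[dict]) -> dict[str, int]:
    """Projection 1: current balance per account."""
    balances: dict[str, int] = {}
    for e in events:
        if e["type"] == "AccountCreated":
            balances[e["account"]] = 0
        elif e["type"] == "MoneyDeposited":
            balances[e["account"]] += e["amount"]
        elif e["type"] == "MoneyWithdrawn":
            balances[e["account"]] -= e["amount"]
    return balances


def project_daily_summary(events: list[dict]) -> dict[str, dict[str, int]]:
    """Projection 3: deposits, withdrawals, and net change per day."""
    totals = defaultdict(lambda: {"deposits": 0, "withdrawals": 0})
    for e in events:
        if e["type"] == "MoneyDeposited":
            totals[e["date"]]["deposits"] += e["amount"]
        elif e["type"] == "MoneyWithdrawn":
            totals[e["date"]]["withdrawals"] += e["amount"]
    # ISO date strings sort chronologically.
    return {
        day: {**t, "net": t["deposits"] - t["withdrawals"]}
        for day, t in sorted(totals.items())
    }


print(project_balances(events))
print(project_daily_summary(events))
```

Adding a new view later means writing one more function like these and running it over the full history.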

Snapshots

As event streams grow long, rebuilding state from all events becomes slow. Snapshots cache the aggregate state at a point in time, so you only need to replay events after the snapshot.

```text
Without snapshots:
  Load account ACC-001:
    Replay events 1, 2, 3, 4, 5, ... 10,000
  Time: O(n), where n = total events in the stream

With snapshots:
  Load snapshot at event 9,900: balance = $5,432
  Replay events 9,901 through 10,000 (100 events)
  Time: O(1) to load the snapshot + O(k) to replay the
        k events recorded after it (here k = 100)

Snapshot strategy:
  Create a snapshot every N events (e.g., every 100)
  or every T time period (e.g., daily)
```
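```python
# Snapshot example
from datetime import datetime, timezone


class SnapshotStore:
    """In-memory store that keeps the latest snapshot per stream."""

    def __init__(self):
        self.snapshots = {}  # stream_id -> latest snapshot

    def save_snapshot(self, stream_id: str, version: int, state: dict):
        self.snapshots[stream_id] = {
            'version': version,  # last event folded into this snapshot
            'state': state,
            'timestamp': datetime.now(timezone.utc),
        }

    def load_snapshot(self, stream_id: str):
        return self.snapshots.get(stream_id)


def account_from_snapshot(state: dict, version: int) -> BankAccount:
    """Restore a BankAccount from snapshot state.

    BankAccount has no from_snapshot method, so this helper sets its
    fields directly. `state` is assumed to hold account_id, balance,
    and is_open.
    """
    account = BankAccount()
    account.account_id = state['account_id']
    account.balance = state['balance']
    account.is_open = state['is_open']
    account.version = version  # keep version numbering continuous
    return account


# Loading with snapshots
def load_account(account_id, event_store, snapshot_store):
    # Try to load a snapshot first
    snapshot = snapshot_store.load_snapshot(account_id)
    if snapshot:
        account = account_from_snapshot(
            snapshot['state'], snapshot['version']
        )
        # Only replay events recorded after the snapshot
        events = event_store.read_stream(
            account_id, from_version=snapshot['version']
        )
    else:
        account = BankAccount()
        events = event_store.read_stream(account_id)
    # Replay remaining events
    for event in events:
        account._handle_event(event.event_type, event.data)
    return account
```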
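The "every N events" strategy can be checked with a small, self-contained simulation. It assumes each event is just a signed balance change and keeps every snapshot (the `SnapshotStore` above keeps only the latest); `SNAPSHOT_EVERY` and both helper functions are hypothetical. Loading replays only the events after the newest snapshot.

```python
SNAPSHOT_EVERY = 100  # assumed policy: snapshot every N events


def record_with_snapshots(deltas: list[int], every: int) -> dict[int, int]:
    """Apply signed balance changes, snapshotting every `every` events.

    Returns {version: balance}; versions start at 1 like the EventStore.
    """
    snapshots: dict[int, int] = {}
    balance = 0
    for version, delta in enumerate(deltas, start=1):
        balance += delta
        if version % every == 0:
            snapshots[version] = balance
    return snapshots


def load(deltas: list[int], snapshots: dict[int, int]) -> tuple[int, int]:
    """Return (balance, events_replayed) starting from the latest snapshot."""
    version = max(snapshots, default=0)
    balance = snapshots.get(version, 0)
    tail = deltas[version:]  # events with version > snapshot version
    return balance + sum(tail), len(tail)


deltas = [1] * 10_050  # 10,050 deposits of $1 each
snapshots = record_with_snapshots(deltas, SNAPSHOT_EVERY)
balance, replayed = load(deltas, snapshots)
print(balance, replayed)  # 10050 50
```

The result matches a full replay (`sum(deltas)`), but only 50 events are replayed instead of 10,050.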

CQRS (Command Query Responsibility Segregation)

CQRS separates the command (write) model from the query (read) model. Commands modify state; queries read state. Each side can be independently optimized.

```text
Traditional CRUD:
┌────────────────────────────┐
│     Single Data Model      │
│                            │
│   CREATE ─┐                │
│   READ   ─┤── Same model   │
│   UPDATE ─┤   Same DB      │
│   DELETE ─┘                │
└────────────────────────────┘

CQRS:
┌────────────────────┐        ┌────────────────────┐
│    Command Side    │        │     Query Side     │
│                    │        │                    │
│ Commands ──▶       │        │ Queries ──▶        │
│   Domain Model     │        │   Read Models      │
│   Write Database   │        │   Read Database    │
│   (normalized)     │        │   (denormalized)   │
│                    │        │                    │
└─────────┬──────────┘        └─────────▲──────────┘
          │                             │
          │        Events / Sync        │
          └─────────────────────────────┘
```

Why Separate Reads and Writes?

| Aspect | Writes | Reads |
|---|---|---|
| Frequency | Often less frequent | Often much more frequent |
| Data model | Normalized for integrity | Denormalized for performance |
| Scaling | Scale for write throughput | Scale for read throughput |
| Optimization | Transactions, constraints | Caching, materialized views |
| Consistency | Must be immediately consistent | Can tolerate eventual consistency |
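Eventual consistency on the read side is easiest to see in code. In this sketch the `WriteSide` and `ReadSide` classes are hypothetical simplifications: the read model keeps a checkpoint into the write log, much like `EventStore.read_all(from_position)`, and only reflects new writes once it catches up. In a real system `catch_up` would run asynchronously, so queries issued in between can see stale data.

```python
class WriteSide:
    """Command side: appends facts to a log; no read-optimized state."""

    def __init__(self) -> None:
        self.log: list[tuple[str, int]] = []  # (account_id, signed amount)

    def deposit(self, account_id: str, amount: int) -> None:
        self.log.append((account_id, amount))


class ReadSide:
    """Query side: a denormalized balance table built from the log."""

    def __init__(self) -> None:
        self.balances: dict[str, int] = {}
        self.position = 0  # checkpoint: log entries already processed

    def catch_up(self, log: list[tuple[str, int]]) -> None:
        # Process only entries past the checkpoint, so re-running is safe.
        for account_id, amount in log[self.position:]:
            self.balances[account_id] = self.balances.get(account_id, 0) + amount
        self.position = len(log)


writes, reads = WriteSide(), ReadSide()
writes.deposit("ACC-001", 1000)
before = reads.balances.get("ACC-001")  # read model lags the write
reads.catch_up(writes.log)
after = reads.balances.get("ACC-001")   # visible once the projector catches up
print(before, after)  # None 1000
```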

Combining Event Sourcing + CQRS

Event sourcing and CQRS are natural partners. The event store serves as the write model, and projections serve as the read models:

```text
┌────────────────────────────────────────────────────────────────┐
│                     ES + CQRS Architecture                     │
│                                                                │
│  Command Side:                    Query Side:                  │
│                                                                │
│  Client ──▶ Command Handler       Client ──▶ Query Handler     │
│                    │                               │           │
│                    ▼                               ▼           │
│            Domain Aggregate               Read Model (View)    │
│                    │                               ▲           │
│                    ▼                               │           │
│               Event Store ────events─────▶ Projection Engine   │
│              (append-only)              (builds read models)   │
│                                                                │
│  Write DB:                        Read DB:                     │
│    Event Store                      PostgreSQL views           │
│    (Kafka, EventStoreDB)            Elasticsearch (search)     │
│                                     Redis (cache)              │
│                                     DynamoDB (key-value)       │
└────────────────────────────────────────────────────────────────┘
```
```python
from dataclasses import dataclass


# Commands
@dataclass
class CreateAccount:
    account_id: str
    owner: str
    initial_deposit: float


@dataclass
class DepositMoney:
    account_id: str
    amount: float


@dataclass
class WithdrawMoney:
    account_id: str
    amount: float


# Command Handler (Write Side)
class AccountCommandHandler:
    def __init__(self, event_store: EventStore,
                 snapshot_store: SnapshotStore):
        self.event_store = event_store
        self.snapshot_store = snapshot_store

    def handle(self, command):
        if isinstance(command, CreateAccount):
            return self._create_account(command)
        elif isinstance(command, DepositMoney):
            return self._deposit(command)
        elif isinstance(command, WithdrawMoney):
            return self._withdraw(command)
        raise TypeError(f"Unknown command: {command!r}")

    def _create_account(self, cmd: CreateAccount):
        account = BankAccount.create(cmd.account_id, cmd.initial_deposit)
        # A new account's stream must still be empty (version 0)
        self._save_events(cmd.account_id, account.get_pending_events(),
                          expected_version=0)

    def _deposit(self, cmd: DepositMoney):
        account = self._load_account(cmd.account_id)
        # Capture the stored version BEFORE mutating: every new event
        # also increments account.version.
        expected_version = account.version
        account.deposit(cmd.amount)
        self._save_events(cmd.account_id, account.get_pending_events(),
                          expected_version=expected_version)

    def _withdraw(self, cmd: WithdrawMoney):
        account = self._load_account(cmd.account_id)
        expected_version = account.version  # stored version, pre-mutation
        account.withdraw(cmd.amount)
        self._save_events(cmd.account_id, account.get_pending_events(),
                          expected_version=expected_version)

    def _load_account(self, account_id):
        events = self.event_store.read_stream(account_id)
        return BankAccount.load_from_events(events)

    def _save_events(self, stream_id, events, expected_version):
        # Each append fails if another writer changed the stream meanwhile
        for event_type, data in events:
            self.event_store.append(stream_id, event_type, data,
                                    expected_version=expected_version)
            expected_version += 1


# Projection (Read Side)
class AccountBalanceProjection:
    """Builds a read model of account balances."""

    def __init__(self):
        self.balances = {}  # account_id -> balance info

    def handle_event(self, event: Event):
        """Process an event and update the read model."""
        if event.event_type == 'AccountCreated':
            self.balances[event.stream_id] = {
                'account_id': event.stream_id,
                'balance': event.data.get('initial_deposit', 0),
                'last_updated': event.timestamp,
            }
            return
        row = self.balances.get(event.stream_id)
        if row is None:
            return  # ignore events for accounts not yet created
        if event.event_type == 'MoneyDeposited':
            row['balance'] += event.data['amount']
        elif event.event_type == 'MoneyWithdrawn':
            row['balance'] -= event.data['amount']
        else:
            return  # other events do not affect this view
        row['last_updated'] = event.timestamp

    def get_balance(self, account_id: str):
        return self.balances.get(account_id)

    def get_all_balances(self):
        return list(self.balances.values())


# Query Handler (Read Side)
class AccountQueryHandler:
    def __init__(self, balance_projection: AccountBalanceProjection):
        self.balance_projection = balance_projection

    def get_balance(self, account_id: str):
        return self.balance_projection.get_balance(account_id)

    def get_all_accounts(self):
        return self.balance_projection.get_all_balances()


# Wire everything together
store = EventStore()
snapshots = SnapshotStore()
balance_projection = AccountBalanceProjection()

# Command side
cmd_handler = AccountCommandHandler(store, snapshots)

# Process commands
cmd_handler.handle(CreateAccount('ACC-001', 'Alice', 1000))
cmd_handler.handle(DepositMoney('ACC-001', 500))
cmd_handler.handle(WithdrawMoney('ACC-001', 200))

# Build projections from events
for event in store.read_all():
    balance_projection.handle_event(event)

# Query side
query_handler = AccountQueryHandler(balance_projection)
balance = query_handler.get_balance('ACC-001')
print(f"Balance: ${balance['balance']}")  # Balance: $1300
```

Benefits and Challenges

Benefits

| Benefit | Description |
|---|---|
| Complete audit trail | Every state change is recorded; full history always available |
| Temporal queries | Query the state of the system at any point in time |
| Event replay | Rebuild state; after fixing a bug in event-handling logic, replay events through the corrected code |
| New projections | Add new read models retroactively from existing events |
| Debugging | Reproduce state-related bugs by replaying the exact event sequence |
| Decoupling | Events are a natural integration point between services |
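A temporal query is a replay that stops at a point in time. Using the four events from the Event Store table earlier, this sketch (the tuple layout and `balance_as_of` are illustrative) answers "what was the balance on January 12?" without any extra storage.

```python
from datetime import datetime

# (timestamp, event_type, amount) for the events in the Event Store table
events = [
    (datetime(2024, 1, 1, 9, 0), "AccountCreated", 0),
    (datetime(2024, 1, 5, 14, 30), "MoneyDeposited", 1000),
    (datetime(2024, 1, 10, 10, 15), "MoneyWithdrawn", 200),
    (datetime(2024, 1, 15, 16, 45), "MoneyWithdrawn", 50),
]


def balance_as_of(events, as_of: datetime) -> int:
    """Replay only the events recorded at or before `as_of`."""
    balance = 0
    for timestamp, event_type, amount in events:
        if timestamp > as_of:
            break  # events are stored in order, so we can stop early
        if event_type == "MoneyDeposited":
            balance += amount
        elif event_type == "MoneyWithdrawn":
            balance -= amount
    return balance


print(balance_as_of(events, datetime(2024, 1, 12)))   # 800
print(balance_as_of(events, datetime(2024, 12, 31)))  # 750
```

With snapshots, the same query could start from the latest snapshot taken before `as_of` instead of from the first event.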

Challenges

| Challenge | Mitigation |
|---|---|
| Complexity | Start simple; only use ES where audit/history is truly needed |
| Event schema evolution | Use versioned events, upcasting transformers |
| Eventual consistency | Educate team; design UX to handle read lag |
| Storage growth | Use snapshots; archive old events to cold storage |
| Learning curve | Invest in training; start with one bounded context |
| Query complexity | Build purpose-specific projections; do not query events directly |
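Upcasting converts old event shapes to the current schema as events are read, so stored events stay untouched and aggregate code only handles the newest version. The sketch below assumes a hypothetical v2 of `MoneyDeposited` that adds a `currency` field, with `"USD"` assumed as the default for v1 events; the registry keyed by `(event_type, schema_version)` lets upcasters chain from v1 to v2 to v3 as the schema keeps evolving.

```python
def upcast_deposit_v1_to_v2(event: dict) -> dict:
    """Hypothetical v1 -> v2 MoneyDeposited: v2 adds a 'currency' field."""
    data = {**event["data"], "currency": "USD"}  # assumed default for old events
    return {**event, "schema_version": 2, "data": data}


# (event_type, schema_version) -> function producing the next version
UPCASTERS = {
    ("MoneyDeposited", 1): upcast_deposit_v1_to_v2,
}


def upcast(event: dict) -> dict:
    """Apply upcasters until no newer version exists for this event."""
    while True:
        key = (event["type"], event.get("schema_version", 1))
        step = UPCASTERS.get(key)
        if step is None:
            return event
        event = step(event)  # builds a new dict; the stored event is unchanged


stored = {"type": "MoneyDeposited", "data": {"amount": 1000}}  # written as v1
print(upcast(stored))
```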

Event Store Technologies

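| Technology | Type | Description |
|---|---|---|
| EventStoreDB | Purpose-built | Built specifically for event sourcing; projections built in |
| Apache Kafka | Event streaming | Durable, partitioned log; sometimes used as an event store, though it has no built-in per-stream optimistic concurrency check |
| PostgreSQL | Relational + events | Append-only table with sequence numbers |
| DynamoDB | NoSQL | Partition by stream ID, sort by version |
| Axon Server | Purpose-built | Event store for the Java ecosystem; designed to pair with the Axon Framework for CQRS |
| Marten | Library | .NET library using PostgreSQL as event store |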
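For the "append-only table with sequence numbers" approach, a unique constraint on `(stream_id, version)` provides optimistic concurrency: two writers that read the same version both try to insert the same next version, and the database rejects the second. This sketch uses Python's built-in `sqlite3` as a stand-in for PostgreSQL so it runs anywhere; the table layout and the `append`/`read_stream` helpers are assumptions, not a standard schema.

```python
import json
import sqlite3


class ConcurrencyError(Exception):
    pass


# SQLite stands in for PostgreSQL; the schema idea carries over.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        position   INTEGER PRIMARY KEY AUTOINCREMENT,  -- global order
        stream_id  TEXT    NOT NULL,
        version    INTEGER NOT NULL,                   -- order within a stream
        event_type TEXT    NOT NULL,
        data       TEXT    NOT NULL,                   -- JSON payload
        UNIQUE (stream_id, version)
    )
""")


def append(stream_id: str, expected_version: int,
           event_type: str, data: dict) -> None:
    """Insert version expected_version + 1; the UNIQUE constraint rejects
    a second writer that read the same version."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (stream_id, version, event_type, data) "
                "VALUES (?, ?, ?, ?)",
                (stream_id, expected_version + 1, event_type, json.dumps(data)),
            )
    except sqlite3.IntegrityError as exc:
        raise ConcurrencyError(
            f"{stream_id} has already moved past version {expected_version}"
        ) from exc


def read_stream(stream_id: str) -> list[tuple[int, str, dict]]:
    rows = conn.execute(
        "SELECT version, event_type, data FROM events "
        "WHERE stream_id = ? ORDER BY version",
        (stream_id,),
    )
    return [(v, t, json.loads(d)) for v, t, d in rows]


append("ACC-001", 0, "AccountCreated", {"initial_deposit": 1000})
append("ACC-001", 1, "MoneyDeposited", {"amount": 500})

rejected = False
try:
    # A writer that still believes the stream is at version 1
    append("ACC-001", 1, "MoneyWithdrawn", {"amount": 200})
except ConcurrencyError as exc:
    rejected = True
    print("rejected:", exc)
print(read_stream("ACC-001"))
```

In PostgreSQL the same constraint raises a unique-violation error that the application would map to a concurrency conflict. The sketch assumes callers pass a version they actually read; it does not detect a gap caused by an `expected_version` that is too high.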

Summary

| Concept | Key Takeaway |
|---|---|
| Event Sourcing | Store state as a sequence of events, not current state |
| Event Store | Append-only database for event sequences |
| Projections | Materialized views built from events for query optimization |
| Snapshots | Cached aggregate state to avoid replaying all events |
| CQRS | Separate read and write models for independent optimization |
| ES + CQRS | Events as the write model, projections as read models |
| Benefits | Audit trail, temporal queries, event replay, new projections |
| Challenges | Complexity, eventual consistency, schema evolution |