Event Sourcing & CQRS
Event sourcing and CQRS are architectural patterns that fundamentally change how applications store and retrieve data. Instead of storing the current state, event sourcing stores a sequence of state-changing events. CQRS separates the read and write sides of an application into different models. Together, they enable systems that are auditable, scalable, and capable of rebuilding state from history.
Event Sourcing
The Core Idea
Traditional systems store the current state of an entity. Event sourcing stores the sequence of events that led to the current state.
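Put differently, current state is a fold over the event history. A minimal sketch, assuming events are plain `(event_type, amount)` tuples and using a hypothetical `apply_event` function (neither is a prescribed format):

```python
from functools import reduce

# Hypothetical event history for one account: (event_type, amount) pairs.
events = [
    ("AccountCreated", 0),
    ("MoneyDeposited", 1000),
    ("MoneyWithdrawn", 200),
    ("MoneyWithdrawn", 50),
]


def apply_event(balance: int, event: tuple[str, int]) -> int:
    """Return the balance after applying a single event."""
    event_type, amount = event
    if event_type == "AccountCreated":
        return amount
    if event_type == "MoneyDeposited":
        return balance + amount
    if event_type == "MoneyWithdrawn":
        return balance - amount
    return balance  # unknown event types leave the state unchanged


# Replaying the full history yields the current state.
balance = reduce(apply_event, events, 0)
print(balance)  # 750
```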
Traditional (State-Based):

    ┌───────────────────────────────────┐
    │ Account: ACC-001                  │
    │ Balance: $750                     │
    │ Last Updated: 2024-01-15          │
    └───────────────────────────────────┘
    (How did we get to $750? We don't know.)

Event Sourced:

    ┌───────────────────────────────────┐
    │ Event 1: AccountCreated           │
    │   { balance: $0 }                 │
    │ Event 2: MoneyDeposited           │
    │   { amount: $1000 }               │
    │ Event 3: MoneyWithdrawn           │
    │   { amount: $200 }                │
    │ Event 4: MoneyWithdrawn           │
    │   { amount: $50 }                 │
    └───────────────────────────────────┘
    Current state: $0 + $1000 - $200 - $50 = $750
    (Full history. Auditable. Reproducible.)

Analogy: Git vs File Saves
Event sourcing is like Git version control for your data:
    File Save (state-based):      Git (event-sourced):
    Save v1: "Hello"              Commit 1: Create file "Hello"
    Save v2: "Hello World"        Commit 2: Append " World"
    Save v3: "Hello World!"       Commit 3: Append "!"

    With file saves:              With Git:
    Only v3 exists.               All history is preserved.
    Cannot undo or audit.         Can checkout any version.
                                  Can see who changed what.

Event Store
An event store is an append-only database optimized for storing and retrieving event sequences:
Event Store:

    ┌────────────────────────────────────────────────────────┐
    │ Stream: Account-ACC-001                                │
    ├─────┬────────────────┬──────────────┬──────────────────┤
    │ Seq │ Event Type     │ Data         │ Timestamp        │
    ├─────┼────────────────┼──────────────┼──────────────────┤
    │ 1   │ AccountCreated │ balance: 0   │ 2024-01-01 09:00 │
    │ 2   │ MoneyDeposited │ amount: 1000 │ 2024-01-05 14:30 │
    │ 3   │ MoneyWithdrawn │ amount: 200  │ 2024-01-10 10:15 │
    │ 4   │ MoneyWithdrawn │ amount: 50   │ 2024-01-15 16:45 │
    └─────┴────────────────┴──────────────┴──────────────────┘
Key properties:

- Append-only (events are never modified or deleted)
- Ordered by sequence number within each stream
- Each stream represents one aggregate/entity

A simple in-memory implementation in Python:

    import uuid
    from dataclasses import dataclass
    from datetime import datetime, timezone


    @dataclass
    class Event:
        event_id: str
        stream_id: str
        event_type: str
        data: dict
        timestamp: datetime
        version: int


    class ConcurrencyError(Exception):
        pass


    class EventStore:
        """Simple in-memory event store."""

        def __init__(self):
            self.streams: dict[str, list[Event]] = {}
            self.all_events: list[Event] = []

        def append(self, stream_id: str, event_type: str,
                   data: dict, expected_version: int = -1):
            """
            Append an event to a stream.
            Uses optimistic concurrency with expected_version;
            the default of -1 skips the check.
            """
            if stream_id not in self.streams:
                self.streams[stream_id] = []

            current_version = len(self.streams[stream_id])

            if expected_version >= 0 and \
                    current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but stream is at version {current_version}"
                )

            event = Event(
                event_id=str(uuid.uuid4()),
                stream_id=stream_id,
                event_type=event_type,
                data=data,
                timestamp=datetime.now(timezone.utc),
                version=current_version + 1
            )

            self.streams[stream_id].append(event)
            self.all_events.append(event)
            return event

        def read_stream(self, stream_id: str,
                        from_version: int = 0) -> list[Event]:
            """Read events with version > from_version from a stream."""
            events = self.streams.get(stream_id, [])
            return [e for e in events if e.version > from_version]

        def read_all(self, from_position: int = 0) -> list[Event]:
            """Read all events across all streams."""
            return self.all_events[from_position:]


    # Bank Account Aggregate
    class BankAccount:
        def __init__(self):
            self.account_id = None
            self.balance = 0
            self.is_open = False
            self.version = 0
            self._pending_events = []

        @classmethod
        def create(cls, account_id: str, initial_deposit: float = 0):
            account = cls()
            account._apply_event('AccountCreated', {
                'account_id': account_id,
                'initial_deposit': initial_deposit
            })
            return account

        def deposit(self, amount: float):
            if not self.is_open:
                raise ValueError("Account is closed")
            if amount <= 0:
                raise ValueError("Amount must be positive")
            self._apply_event('MoneyDeposited', {'amount': amount})

        def withdraw(self, amount: float):
            if not self.is_open:
                raise ValueError("Account is closed")
            if amount <= 0:
                raise ValueError("Amount must be positive")
            if amount > self.balance:
                raise ValueError("Insufficient funds")
            self._apply_event('MoneyWithdrawn', {'amount': amount})

        def close(self):
            if self.balance != 0:
                raise ValueError("Cannot close account with balance")
            self._apply_event('AccountClosed', {})

        def _apply_event(self, event_type: str, data: dict):
            """Apply a new event and record it as pending."""
            self._handle_event(event_type, data)
            self._pending_events.append((event_type, data))

        def _handle_event(self, event_type: str, data: dict):
            """Update state based on an event (new or replayed)."""
            if event_type == 'AccountCreated':
                self.account_id = data['account_id']
                self.balance = data.get('initial_deposit', 0)
                self.is_open = True
            elif event_type == 'MoneyDeposited':
                self.balance += data['amount']
            elif event_type == 'MoneyWithdrawn':
                self.balance -= data['amount']
            elif event_type == 'AccountClosed':
                self.is_open = False
            self.version += 1

        @classmethod
        def load_from_events(cls, events: list[Event]):
            """Rebuild account state from events."""
            account = cls()
            for event in events:
                account._handle_event(event.event_type, event.data)
            return account

        def get_pending_events(self):
            """Return the pending events and clear the pending list."""
            events = self._pending_events.copy()
            self._pending_events.clear()
            return events


    # Usage
    store = EventStore()

    # Create account and perform operations
    account = BankAccount.create('ACC-001', 1000)
    account.deposit(500)
    account.withdraw(200)

    # Persist events
    for event_type, data in account.get_pending_events():
        store.append('ACC-001', event_type, data)

    # Later: rebuild account from events
    events = store.read_stream('ACC-001')
    rebuilt = BankAccount.load_from_events(events)
    print(f"Balance: ${rebuilt.balance}")  # $1300

The same store and aggregate in JavaScript:

    class EventStore {
      constructor() {
        this.streams = new Map();
        this.allEvents = [];
      }
      append(streamId, eventType, data, expectedVersion = -1) {
        if (!this.streams.has(streamId)) {
          this.streams.set(streamId, []);
        }

        const stream = this.streams.get(streamId);
        const currentVersion = stream.length;

        // Optimistic concurrency check; -1 skips it.
        if (expectedVersion >= 0 && currentVersion !== expectedVersion) {
          throw new Error(
            `Concurrency error: expected ${expectedVersion}` +
            `, got ${currentVersion}`
          );
        }

        const event = {
          eventId: crypto.randomUUID(),
          streamId,
          eventType,
          data,
          timestamp: new Date().toISOString(),
          version: currentVersion + 1
        };

        stream.push(event);
        this.allEvents.push(event);
        return event;
      }

      readStream(streamId, fromVersion = 0) {
        const stream = this.streams.get(streamId) || [];
        return stream.filter(e => e.version > fromVersion);
      }
    }

    class BankAccount {
      constructor() {
        this.accountId = null;
        this.balance = 0;
        this.isOpen = false;
        this.version = 0;
        this.pendingEvents = [];
      }

      static create(accountId, initialDeposit = 0) {
        const account = new BankAccount();
        account._applyEvent('AccountCreated', { accountId, initialDeposit });
        return account;
      }

      deposit(amount) {
        if (!this.isOpen) throw new Error('Account closed');
        if (amount <= 0) throw new Error('Invalid amount');
        this._applyEvent('MoneyDeposited', { amount });
      }

      withdraw(amount) {
        if (!this.isOpen) throw new Error('Account closed');
        if (amount <= 0) throw new Error('Invalid amount');
        if (amount > this.balance) {
          throw new Error('Insufficient funds');
        }
        this._applyEvent('MoneyWithdrawn', { amount });
      }

      // Apply a new event and record it as pending.
      _applyEvent(eventType, data) {
        this._handleEvent(eventType, data);
        this.pendingEvents.push({ eventType, data });
      }

      // Update state based on an event (new or replayed).
      _handleEvent(eventType, data) {
        switch (eventType) {
          case 'AccountCreated':
            this.accountId = data.accountId;
            this.balance = data.initialDeposit || 0;
            this.isOpen = true;
            break;
          case 'MoneyDeposited':
            this.balance += data.amount;
            break;
          case 'MoneyWithdrawn':
            this.balance -= data.amount;
            break;
          case 'AccountClosed':
            this.isOpen = false;
            break;
        }
        this.version++;
      }

      static fromEvents(events) {
        const account = new BankAccount();
        for (const event of events) {
          account._handleEvent(event.eventType, event.data);
        }
        return account;
      }
    }

    // Usage
    const store = new EventStore();
    const account = BankAccount.create('ACC-001', 1000);
    account.deposit(500);
    account.withdraw(200);

    for (const { eventType, data } of account.pendingEvents) {
      store.append('ACC-001', eventType, data);
    }

    const events = store.readStream('ACC-001');
    const rebuilt = BankAccount.fromEvents(events);
    console.log(`Balance: $${rebuilt.balance}`); // $1300

Projections
A projection (also called a read model or materialized view) transforms events into a queryable format optimized for reads. While the event store is the source of truth, projections provide the views that applications actually query.
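As an illustration, a per-day summary is one view a projection might build. The sketch below assumes events are `(event_type, amount, day)` tuples and uses a hypothetical `build_daily_summary` function; a real projection would consume events from the store instead of a list.

```python
from collections import defaultdict
from datetime import date

# Hypothetical events: (event_type, amount, day). The layout is assumed.
events = [
    ("MoneyDeposited", 1000, date(2024, 1, 5)),
    ("MoneyWithdrawn", 200, date(2024, 1, 15)),
]


def build_daily_summary(events):
    """Fold events into {day: {'deposits', 'withdrawals', 'net'}}."""
    summary = defaultdict(lambda: {"deposits": 0, "withdrawals": 0, "net": 0})
    for event_type, amount, day in events:
        row = summary[day]
        if event_type == "MoneyDeposited":
            row["deposits"] += amount
            row["net"] += amount
        elif event_type == "MoneyWithdrawn":
            row["withdrawals"] += amount
            row["net"] -= amount
    return dict(summary)


daily = build_daily_summary(events)
print(daily[date(2024, 1, 15)])  # {'deposits': 0, 'withdrawals': 200, 'net': -200}
```

Because the view is derived purely from events, it can be discarded and rebuilt at any time by running the same function over the full history.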
Event Store (source of truth):

    [AccountCreated] → [Deposited $1000] → [Withdrawn $200]

Projection 1: Account Balance View

    ┌──────────┬─────────┬────────────┐
    │ Account  │ Balance │ Updated At │
    ├──────────┼─────────┼────────────┤
    │ ACC-001  │ $800    │ 2024-01-15 │
    └──────────┴─────────┴────────────┘

Projection 2: Transaction History View

    ┌──────────┬────────────┬────────┬────────────┐
    │ Account  │ Type       │ Amount │ Date       │
    ├──────────┼────────────┼────────┼────────────┤
    │ ACC-001  │ Deposit    │ $1000  │ 2024-01-05 │
    │ ACC-001  │ Withdrawal │ $200   │ 2024-01-15 │
    └──────────┴────────────┴────────┴────────────┘

Projection 3: Daily Summary View

    ┌────────────┬──────────┬─────────────┬────────┐
    │ Date       │ Deposits │ Withdrawals │ Net    │
    ├────────────┼──────────┼─────────────┼────────┤
    │ 2024-01-05 │ $1000    │ $0          │ +$1000 │
    │ 2024-01-15 │ $0       │ $200        │ -$200  │
    └────────────┴──────────┴─────────────┴────────┘

Different projections from the SAME events. You can add new projections at any time and rebuild them from the event history.

Snapshots
As event streams grow long, rebuilding state from all events becomes slow. Snapshots cache the aggregate state at a point in time, so you only need to replay events after the snapshot.
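A self-contained sketch of the idea, deliberately simplified: each event is just a signed amount, and `SNAPSHOT_EVERY`, `append`, and `load_balance` are hypothetical names chosen for this example.

```python
SNAPSHOT_EVERY = 100  # assumed policy: snapshot after every 100 events

events: list[int] = []                    # simplified stream of signed amounts
snapshot = {"version": 0, "balance": 0}   # latest cached state


def load_balance() -> tuple[int, int]:
    """Return (balance, number_of_events_replayed)."""
    tail = events[snapshot["version"]:]   # only events after the snapshot
    return snapshot["balance"] + sum(tail), len(tail)


def append(amount: int) -> None:
    """Append an event and refresh the snapshot every SNAPSHOT_EVERY events."""
    events.append(amount)
    if len(events) % SNAPSHOT_EVERY == 0:
        balance, _ = load_balance()       # previous snapshot + recent tail
        snapshot.update(version=len(events), balance=balance)


for _ in range(250):
    append(10)

balance, replayed = load_balance()
print(balance, replayed)  # 2500 50
```

Loading replays only the 50 events written after the last snapshot (taken at event 200), not all 250.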
Without snapshots:

    Load account ACC-001:
    Replay events 1, 2, 3, 4, 5, ... 10,000
    Time: O(n) where n = total events

With snapshots:

    Load snapshot at event 9,900: balance = $5,432
    Replay events 9,901 through 10,000 (100 events)
    Time: O(1) for the snapshot + O(k) for the k events since the snapshot (here k = 100)
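Snapshot strategy: create a snapshot every N events (e.g., every 100) or every T time period (e.g., daily).

    # Snapshot example
    from datetime import datetime, timezone


    class SnapshotStore:
        def __init__(self):
            self.snapshots = {}  # stream_id -> snapshot

        def save_snapshot(self, stream_id: str,
                          version: int, state: dict):
            self.snapshots[stream_id] = {
                'version': version,
                'state': state,
                'timestamp': datetime.now(timezone.utc)
            }

        def load_snapshot(self, stream_id: str):
            return self.snapshots.get(stream_id)


    def account_from_snapshot(snapshot):
        """Restore a BankAccount from a saved snapshot.

        BankAccount defines no from_snapshot() method, so this helper
        sets the fields directly. The state keys ('account_id',
        'balance', 'is_open') are assumed to match the aggregate's
        attribute names.
        """
        account = BankAccount()
        state = snapshot['state']
        account.account_id = state['account_id']
        account.balance = state['balance']
        account.is_open = state['is_open']
        account.version = snapshot['version']
        return account


    # Loading with snapshots
    def load_account(account_id, event_store, snapshot_store):
        # Try to load snapshot first
        snapshot = snapshot_store.load_snapshot(account_id)

        if snapshot:
            account = account_from_snapshot(snapshot)
            # Only replay events after snapshot
            events = event_store.read_stream(
                account_id,
                from_version=snapshot['version']
            )
        else:
            account = BankAccount()
            events = event_store.read_stream(account_id)

        # Replay remaining events
        for event in events:
            account._handle_event(event.event_type, event.data)

        return account

CQRS (Command Query Responsibility Segregation)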
CQRS separates the command (write) model from the query (read) model. Commands modify state; queries read state. Each side can be independently optimized.
Traditional CRUD:

    ┌────────────────────────────┐
    │     Single Data Model      │
    │                            │
    │  CREATE ─┐                 │
    │  READ   ─┤── Same model    │
    │  UPDATE ─┤   Same DB       │
    │  DELETE ─┘                 │
    └────────────────────────────┘

CQRS:

    ┌─────────────────────┐        ┌─────────────────────┐
    │    Command Side     │        │     Query Side      │
    │                     │        │                     │
    │  Commands ──▶       │        │  Queries ──▶        │
    │  Domain Model       │        │  Read Models        │
    │  Write Database     │        │  Read Database      │
    │  (normalized)       │        │  (denormalized)     │
    │                     │        │                     │
    └──────────┬──────────┘        └──────────▲──────────┘
               │                              │
               │        Events / Sync         │
               └──────────────────────────────┘

Why Separate Reads and Writes?
| Aspect | Writes | Reads |
|---|---|---|
| Frequency | Often less frequent | Often much more frequent |
| Data model | Normalized for integrity | Denormalized for performance |
| Scaling | Scale for write throughput | Scale for read throughput |
| Optimization | Transactions, constraints | Caching, materialized views |
| Consistency | Must be immediately consistent | Can tolerate eventual consistency |
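
The last row is worth seeing concretely. In the sketch below, the read side catches up with an append-only log through a checkpoint, so a query issued between a write and the next catch-up returns stale data. `BalanceReadModel` and the `(account_id, delta)` event layout are assumptions made for this example.

```python
class BalanceReadModel:
    """Read model that catches up from an event log using a checkpoint."""

    def __init__(self):
        self.balances: dict[str, int] = {}
        self.position = 0  # index of the next unread event in the log

    def catch_up(self, log: list[tuple[str, int]]) -> int:
        """Apply events not yet seen; return how many were processed."""
        new_events = log[self.position:]
        for account_id, delta in new_events:
            self.balances[account_id] = self.balances.get(account_id, 0) + delta
        self.position = len(log)
        return len(new_events)


log = [("ACC-001", 1000)]               # write side: append-only log
read_model = BalanceReadModel()
read_model.catch_up(log)

log.append(("ACC-001", -200))           # a write the read side has not seen yet
stale = read_model.balances["ACC-001"]  # read model still lags behind
read_model.catch_up(log)
fresh = read_model.balances["ACC-001"]  # consistent again after catching up
print(stale, fresh)  # 1000 800
```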
Combining Event Sourcing + CQRS
Event sourcing and CQRS are natural partners. The event store serves as the write model, and projections serve as the read models:
    ┌────────────────────────────────────────────────────────────────┐
    │                     ES + CQRS Architecture                     │
    │                                                                │
    │  Command Side:                 Query Side:                     │
    │                                                                │
    │  Client ──▶ Command Handler    Client ──▶ Query Handler        │
    │                    │                            │              │
    │                    ▼                            ▼              │
    │            Domain Aggregate             Read Model (View)      │
    │                    │                            ▲              │
    │                    ▼                            │              │
    │               Event Store ───events───▶ Projection Engine      │
    │              (append-only)            (builds read models)     │
    │                                                                │
    │  Write DB:                     Read DB:                        │
    │    Event Store                   PostgreSQL views              │
    │    (Kafka, EventStoreDB)         Elasticsearch (search)        │
    │                                  Redis (cache)                 │
    │                                  DynamoDB (key-value)          │
    └────────────────────────────────────────────────────────────────┘

A Python implementation of both sides, reusing the EventStore, SnapshotStore, and BankAccount classes from earlier:

    from dataclasses import dataclass


    # Commands
    @dataclass
    class CreateAccount:
        account_id: str
        owner: str
        initial_deposit: float


    @dataclass
    class DepositMoney:
        account_id: str
        amount: float


    @dataclass
    class WithdrawMoney:
        account_id: str
        amount: float


    # Command Handler (Write Side)
    class AccountCommandHandler:
        def __init__(self, event_store: EventStore,
                     snapshot_store: SnapshotStore):
            self.event_store = event_store
            self.snapshot_store = snapshot_store

        def handle(self, command):
            if isinstance(command, CreateAccount):
                return self._create_account(command)
            elif isinstance(command, DepositMoney):
                return self._deposit(command)
            elif isinstance(command, WithdrawMoney):
                return self._withdraw(command)
            raise ValueError(f"Unknown command: {command!r}")

        def _create_account(self, cmd: CreateAccount):
            account = BankAccount.create(
                cmd.account_id, cmd.initial_deposit
            )
            self._save_events(
                cmd.account_id,
                account.get_pending_events(),
                expected_version=0
            )

        def _deposit(self, cmd: DepositMoney):
            account = self._load_account(cmd.account_id)
            # Capture the stream version before applying new events:
            # account.version also counts events not yet persisted.
            loaded_version = account.version
            account.deposit(cmd.amount)
            self._save_events(
                cmd.account_id,
                account.get_pending_events(),
                expected_version=loaded_version
            )

        def _withdraw(self, cmd: WithdrawMoney):
            account = self._load_account(cmd.account_id)
            loaded_version = account.version
            account.withdraw(cmd.amount)
            self._save_events(
                cmd.account_id,
                account.get_pending_events(),
                expected_version=loaded_version
            )

        def _load_account(self, account_id):
            events = self.event_store.read_stream(account_id)
            return BankAccount.load_from_events(events)

        def _save_events(self, stream_id, events, expected_version):
            for event_type, data in events:
                self.event_store.append(
                    stream_id, event_type, data,
                    expected_version=expected_version
                )
                expected_version += 1


    # Projection (Read Side)
    class AccountBalanceProjection:
        """Builds a read model of account balances."""

        def __init__(self):
            self.balances = {}  # account_id -> balance info

        def handle_event(self, event: Event):
            """Process an event and update the read model."""
            if event.event_type == 'AccountCreated':
                self.balances[event.stream_id] = {
                    'account_id': event.stream_id,
                    'balance': event.data.get('initial_deposit', 0),
                    'last_updated': event.timestamp
                }
            elif event.event_type == 'MoneyDeposited':
                if event.stream_id in self.balances:
                    row = self.balances[event.stream_id]
                    row['balance'] += event.data['amount']
                    row['last_updated'] = event.timestamp
            elif event.event_type == 'MoneyWithdrawn':
                if event.stream_id in self.balances:
                    row = self.balances[event.stream_id]
                    row['balance'] -= event.data['amount']
                    row['last_updated'] = event.timestamp

        def get_balance(self, account_id: str):
            return self.balances.get(account_id)

        def get_all_balances(self):
            return list(self.balances.values())


    # Query Handler (Read Side)
    class AccountQueryHandler:
        def __init__(self, balance_projection: AccountBalanceProjection):
            self.balance_projection = balance_projection

        def get_balance(self, account_id: str):
            return self.balance_projection.get_balance(account_id)

        def get_all_accounts(self):
            return self.balance_projection.get_all_balances()


    # Wire everything together
    store = EventStore()
    snapshots = SnapshotStore()
    balance_projection = AccountBalanceProjection()

    # Command side
    cmd_handler = AccountCommandHandler(store, snapshots)

    # Process commands
    cmd_handler.handle(CreateAccount('ACC-001', 'Alice', 1000))
    cmd_handler.handle(DepositMoney('ACC-001', 500))
    cmd_handler.handle(WithdrawMoney('ACC-001', 200))

    # Build projections from events
    for event in store.read_all():
        balance_projection.handle_event(event)

    # Query side
    query_handler = AccountQueryHandler(balance_projection)
    balance = query_handler.get_balance('ACC-001')
    print(f"Balance: ${balance['balance']}")  # $1300

The same wiring in JavaScript:

    // Command Handler (Write Side)
    class AccountCommandHandler {
      constructor(eventStore) {
        this.eventStore = eventStore;
      }
      async handle(command) {
        switch (command.type) {
          case 'CreateAccount':
            return this.createAccount(command);
          case 'DepositMoney':
            return this.deposit(command);
          case 'WithdrawMoney':
            return this.withdraw(command);
          default:
            throw new Error(`Unknown command: ${command.type}`);
        }
      }

      createAccount(cmd) {
        const account = BankAccount.create(
          cmd.accountId, cmd.initialDeposit
        );
        this.saveEvents(cmd.accountId, account.pendingEvents);
      }

      deposit(cmd) {
        const account = this.loadAccount(cmd.accountId);
        account.deposit(cmd.amount);
        this.saveEvents(cmd.accountId, account.pendingEvents);
      }

      withdraw(cmd) {
        const account = this.loadAccount(cmd.accountId);
        account.withdraw(cmd.amount);
        this.saveEvents(cmd.accountId, account.pendingEvents);
      }

      loadAccount(accountId) {
        const events = this.eventStore.readStream(accountId);
        return BankAccount.fromEvents(events);
      }

      saveEvents(streamId, events) {
        for (const { eventType, data } of events) {
          this.eventStore.append(streamId, eventType, data);
        }
      }
    }

    // Projection (Read Side)
    class AccountBalanceProjection {
      constructor() {
        this.balances = new Map();
      }

      handleEvent(event) {
        switch (event.eventType) {
          case 'AccountCreated':
            this.balances.set(event.streamId, {
              accountId: event.streamId,
              balance: event.data.initialDeposit || 0,
              lastUpdated: event.timestamp
            });
            break;
          case 'MoneyDeposited': {
            const acc = this.balances.get(event.streamId);
            if (acc) {
              acc.balance += event.data.amount;
              acc.lastUpdated = event.timestamp;
            }
            break;
          }
          case 'MoneyWithdrawn': {
            const acc = this.balances.get(event.streamId);
            if (acc) {
              acc.balance -= event.data.amount;
              acc.lastUpdated = event.timestamp;
            }
            break;
          }
        }
      }

      getBalance(accountId) {
        return this.balances.get(accountId);
      }
    }

    // Usage
    const store = new EventStore();
    const cmdHandler = new AccountCommandHandler(store);
    const projection = new AccountBalanceProjection();

    // handle() contains no await, so each command is applied
    // before the next statement runs.
    cmdHandler.handle({
      type: 'CreateAccount',
      accountId: 'ACC-001',
      initialDeposit: 1000
    });
    cmdHandler.handle({
      type: 'DepositMoney',
      accountId: 'ACC-001',
      amount: 500
    });

    // Build projection
    for (const event of store.allEvents) {
      projection.handleEvent(event);
    }

    console.log(projection.getBalance('ACC-001'));

Benefits and Challenges
Benefits
| Benefit | Description |
|---|---|
| Complete audit trail | Every state change is recorded; full history always available |
| Temporal queries | Query the state of the system at any point in time |
| Event replay | Rebuild state, fix bugs by replaying corrected logic |
| New projections | Add new read models retroactively from existing events |
| Debugging | Reproduce any bug by replaying the exact event sequence |
| Decoupling | Events are a natural integration point between services |
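
To make the temporal-query row concrete: state at a past moment is simply a replay that stops early. The sketch below assumes a time-ordered list of `(timestamp, event_type, amount)` tuples and a hypothetical `balance_as_of` helper.

```python
from datetime import datetime

# Hypothetical history, ordered by time: (timestamp, event_type, amount).
history = [
    (datetime(2024, 1, 1, 9, 0), "AccountCreated", 0),
    (datetime(2024, 1, 5, 14, 30), "MoneyDeposited", 1000),
    (datetime(2024, 1, 10, 10, 15), "MoneyWithdrawn", 200),
    (datetime(2024, 1, 15, 16, 45), "MoneyWithdrawn", 50),
]


def balance_as_of(history, moment: datetime) -> int:
    """Replay events up to and including `moment`."""
    balance = 0
    for timestamp, event_type, amount in history:
        if timestamp > moment:
            break  # history is ordered, so later events can be ignored
        if event_type == "MoneyDeposited":
            balance += amount
        elif event_type == "MoneyWithdrawn":
            balance -= amount
    return balance


print(balance_as_of(history, datetime(2024, 1, 12)))  # 800
print(balance_as_of(history, datetime(2024, 1, 31)))  # 750
```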
Challenges
| Challenge | Mitigation |
|---|---|
| Complexity | Start simple; only use ES where audit/history is truly needed |
| Event schema evolution | Use versioned events, upcasting transformers |
| Eventual consistency | Educate team; design UX to handle read lag |
| Storage growth | Use snapshots; archive old events to cold storage |
| Learning curve | Invest in training; start with one bounded context |
| Query complexity | Build purpose-specific projections; do not query events directly |
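
Upcasting, mentioned in the schema evolution row, can be sketched as a function applied when events are read, leaving stored events untouched. The schema change here (a v2 `MoneyDeposited` that adds a `currency` field, defaulting old events to `"USD"`) is invented purely for illustration, as are the `schema_version` key and the `upcast` name.

```python
def upcast(event: dict) -> dict:
    """Return the event in the latest (v2) shape.

    Assumed schema change for illustration: v1 MoneyDeposited events
    carried only 'amount'; v2 adds an explicit 'currency' field.
    """
    if event["type"] == "MoneyDeposited" and event.get("schema_version", 1) == 1:
        return {
            **event,
            "schema_version": 2,
            "data": {**event["data"], "currency": "USD"},  # assumed default
        }
    return event


stored = [
    {"type": "MoneyDeposited", "data": {"amount": 1000}},  # written as v1
    {"type": "MoneyDeposited", "schema_version": 2,
     "data": {"amount": 500, "currency": "EUR"}},
]

# Handlers only ever see the latest shape; the stored events stay immutable.
upcasted = [upcast(e) for e in stored]
print(upcasted[0]["data"])  # {'amount': 1000, 'currency': 'USD'}
```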
Event Store Technologies
| Technology | Type | Description |
|---|---|---|
| EventStoreDB | Purpose-built | Built specifically for event sourcing; projections built in |
| Apache Kafka | Event streaming | Durable log; often used as an event store, though without per-stream optimistic concurrency checks |
| PostgreSQL | Relational + events | Append-only table with sequence numbers |
| DynamoDB | NoSQL | Partition by stream ID, sort by version |
| Axon Server | Purpose-built | Java ecosystem; includes CQRS framework |
| Marten | Library | .NET library using PostgreSQL as event store |
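
The relational row ("append-only table with sequence numbers") can be sketched as follows. Python's built-in `sqlite3` stands in for PostgreSQL so the example runs without a server; the table layout, column names, and `append` function are assumptions, not a schema from any particular product.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        position   INTEGER PRIMARY KEY AUTOINCREMENT,  -- global order
        stream_id  TEXT    NOT NULL,
        version    INTEGER NOT NULL,                   -- order within a stream
        event_type TEXT    NOT NULL,
        data       TEXT    NOT NULL,
        UNIQUE (stream_id, version)
    )
""")


def append(stream_id: str, expected_version: int,
           event_type: str, data: dict) -> bool:
    """Insert the next event; return False if that version already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (stream_id, version, event_type, data) "
                "VALUES (?, ?, ?, ?)",
                (stream_id, expected_version + 1, event_type, json.dumps(data)),
            )
        return True
    except sqlite3.IntegrityError:
        return False


first = append("ACC-001", 0, "AccountCreated", {"initial_deposit": 1000})
second = append("ACC-001", 1, "MoneyDeposited", {"amount": 500})
conflict = append("ACC-001", 1, "MoneyWithdrawn", {"amount": 200})  # stale version
print(first, second, conflict)  # True True False
```

The unique constraint on `(stream_id, version)` gives optimistic concurrency for free: two writers that loaded the same version cannot both append. It rejects duplicates only; it does not by itself prevent gaps in the version sequence.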
Summary
| Concept | Key Takeaway |
|---|---|
| Event Sourcing | Store state as a sequence of events, not current state |
| Event Store | Append-only database for event sequences |
| Projections | Materialized views built from events for query optimization |
| Snapshots | Cached aggregate state to avoid replaying all events |
| CQRS | Separate read and write models for independent optimization |
| ES + CQRS | Events as the write model, projections as read models |
| Benefits | Audit trail, temporal queries, event replay, new projections |
| Challenges | Complexity, eventual consistency, schema evolution |