Distributed Transactions
In a monolithic application with a single database, transactions are straightforward — the database engine handles ACID guarantees. But in a distributed system where data spans multiple services and databases, maintaining consistency across all of them becomes one of the hardest problems in software engineering.
The Problem
Consider an e-commerce order that involves three services:
```
┌────────────┐   ┌────────────┐   ┌────────────┐
│   Order    │   │ Inventory  │   │  Payment   │
│  Service   │   │  Service   │   │  Service   │
└─────┬──────┘   └─────┬──────┘   └─────┬──────┘
      │                │                │
┌─────▼──────┐   ┌─────▼──────┐   ┌─────▼──────┐
│  Order DB  │   │Inventory DB│   │ Payment DB │
└────────────┘   └────────────┘   └────────────┘
```
To place an order, ALL THREE must succeed:

1. Create order record
2. Reserve inventory (decrement stock)
3. Charge customer payment
If payment fails AFTER inventory is reserved, we must undo the inventory reservation. Without distributed transaction management, partial failures lead to inconsistent state: orders without payments, inventory reserved but never fulfilled, or customers charged without receiving their order.
Two-Phase Commit (2PC)
Two-phase commit is the classic protocol for distributed transactions. A central coordinator ensures that all participants either commit or abort together.
Protocol
```
Phase 1: Prepare (Voting)
─────────────────────────
Coordinator ──▶ Participant A: "Can you commit?"
Coordinator ──▶ Participant B: "Can you commit?"
Coordinator ──▶ Participant C: "Can you commit?"
```
Each participant:

- Acquires locks on the data
- Writes changes to a durable log (WAL)
- Responds YES (prepared) or NO (abort)
```
Phase 2: Commit or Abort (Decision)
───────────────────────────────────
If ALL participants voted YES:
    Coordinator ──▶ All: "COMMIT"
    All participants make changes permanent
    and release locks.

If ANY participant voted NO:
    Coordinator ──▶ All: "ABORT"
    All participants roll back changes
    and release locks.
```

Detailed Walkthrough
```
               Coordinator
                    │
   ┌────────────────┼────────────────┐
   │                │                │
   ▼                ▼                ▼
┌────────┐     ┌────────┐      ┌────────┐
│Order DB│     │Inv. DB │      │Pay. DB │
└────────┘     └────────┘      └────────┘
```
```
Step 1: Coordinator sends PREPARE to all three
Step 2: Order DB → YES (order row locked and logged)
Step 3: Inventory DB → YES (stock row locked and logged)
Step 4: Payment DB → YES (payment row locked and logged)
Step 5: Coordinator writes COMMIT to its own log
Step 6: Coordinator sends COMMIT to all three
Step 7: All three commit and release locks
Step 8: Coordinator records completion
```
```
If Step 4 was NO (e.g., insufficient funds):
Step 5: Coordinator writes ABORT to its log
Step 6: Coordinator sends ABORT to all three
Step 7: All three roll back and release locks
```

The Blocking Problem
The critical flaw of 2PC: if the coordinator crashes after participants have voted YES but before sending the COMMIT or ABORT decision, participants are blocked. They cannot commit (they do not know the decision) and they cannot abort (the coordinator might have decided to commit).
Timeline of failure:
```
Participant A: Voted YES ──── waiting... ──── BLOCKED
Participant B: Voted YES ──── waiting... ──── BLOCKED
Coordinator:   Received YES ── CRASH ✗
```

Participants hold locks indefinitely! No other transactions can access the locked data. Recovery requires the coordinator to restart and resend the decision.

2PC Implementation
```python
from enum import Enum
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

class Vote(Enum):
    YES = "yes"
    NO = "no"

class Decision(Enum):
    COMMIT = "commit"
    ABORT = "abort"

@dataclass
class Participant:
    name: str
    prepared: bool = False

    def prepare(self, transaction_id: str) -> Vote:
        """Phase 1: Can this participant commit?"""
        try:
            # Acquire locks, validate constraints, write to WAL
            self._acquire_locks(transaction_id)
            self._write_to_wal(transaction_id)
            self.prepared = True
            logger.info(f"{self.name}: PREPARED for {transaction_id}")
            return Vote.YES
        except Exception as e:
            logger.error(f"{self.name}: Cannot prepare - {e}")
            return Vote.NO

    def commit(self, transaction_id: str):
        """Phase 2: Make changes permanent."""
        self._apply_changes(transaction_id)
        self._release_locks(transaction_id)
        self.prepared = False
        logger.info(f"{self.name}: COMMITTED {transaction_id}")

    def abort(self, transaction_id: str):
        """Phase 2: Roll back changes."""
        self._rollback_changes(transaction_id)
        self._release_locks(transaction_id)
        self.prepared = False
        logger.info(f"{self.name}: ABORTED {transaction_id}")

    # Implementation stubs
    def _acquire_locks(self, txn_id): pass
    def _write_to_wal(self, txn_id): pass
    def _apply_changes(self, txn_id): pass
    def _rollback_changes(self, txn_id): pass
    def _release_locks(self, txn_id): pass

class TwoPhaseCommitCoordinator:
    def __init__(self, participants: list[Participant]):
        self.participants = participants
        self.transaction_log = []

    def execute(self, transaction_id: str) -> Decision:
        """Execute a distributed transaction using 2PC."""
        # Phase 1: Prepare
        logger.info(f"Phase 1: Preparing {transaction_id}")
        votes = {}
        for participant in self.participants:
            votes[participant.name] = participant.prepare(transaction_id)

        # Decision
        all_yes = all(v == Vote.YES for v in votes.values())

        if all_yes:
            # Phase 2: Commit
            decision = Decision.COMMIT
            self.transaction_log.append((transaction_id, decision))
            logger.info(f"Phase 2: Committing {transaction_id}")
            for participant in self.participants:
                participant.commit(transaction_id)
        else:
            # Phase 2: Abort
            decision = Decision.ABORT
            self.transaction_log.append((transaction_id, decision))
            logger.info(f"Phase 2: Aborting {transaction_id}")
            for participant in self.participants:
                if participant.prepared:
                    participant.abort(transaction_id)

        return decision

# Usage
order_db = Participant("OrderDB")
inventory_db = Participant("InventoryDB")
payment_db = Participant("PaymentDB")

coordinator = TwoPhaseCommitCoordinator(
    [order_db, inventory_db, payment_db]
)

result = coordinator.execute("txn-001")
print(f"Transaction result: {result.value}")
```

```javascript
class Participant {
  constructor(name) {
    this.name = name;
    this.prepared = false;
  }

  async prepare(transactionId) {
    try {
      await this.acquireLocks(transactionId);
      await this.writeToWAL(transactionId);
      this.prepared = true;
      console.log(`${this.name}: PREPARED for ${transactionId}`);
      return 'YES';
    } catch (error) {
      console.error(`${this.name}: Cannot prepare - ${error.message}`);
      return 'NO';
    }
  }

  async commit(transactionId) {
    await this.applyChanges(transactionId);
    await this.releaseLocks(transactionId);
    this.prepared = false;
    console.log(`${this.name}: COMMITTED ${transactionId}`);
  }

  async abort(transactionId) {
    await this.rollbackChanges(transactionId);
    await this.releaseLocks(transactionId);
    this.prepared = false;
    console.log(`${this.name}: ABORTED ${transactionId}`);
  }

  // Stubs
  async acquireLocks(txnId) {}
  async writeToWAL(txnId) {}
  async applyChanges(txnId) {}
  async rollbackChanges(txnId) {}
  async releaseLocks(txnId) {}
}

class TwoPhaseCommitCoordinator {
  constructor(participants) {
    this.participants = participants;
    this.transactionLog = [];
  }

  async execute(transactionId) {
    // Phase 1: Prepare
    console.log(`Phase 1: Preparing ${transactionId}`);
    const votes = await Promise.all(
      this.participants.map(async (p) => ({
        name: p.name,
        vote: await p.prepare(transactionId)
      }))
    );

    const allYes = votes.every(v => v.vote === 'YES');

    if (allYes) {
      // Phase 2: Commit
      this.transactionLog.push({ id: transactionId, decision: 'COMMIT' });
      console.log(`Phase 2: Committing ${transactionId}`);
      await Promise.all(
        this.participants.map(p => p.commit(transactionId))
      );
      return 'COMMIT';
    } else {
      // Phase 2: Abort
      this.transactionLog.push({ id: transactionId, decision: 'ABORT' });
      console.log(`Phase 2: Aborting ${transactionId}`);
      await Promise.all(
        this.participants
          .filter(p => p.prepared)
          .map(p => p.abort(transactionId))
      );
      return 'ABORT';
    }
  }
}

// Usage
const coordinator = new TwoPhaseCommitCoordinator([
  new Participant('OrderDB'),
  new Participant('InventoryDB'),
  new Participant('PaymentDB')
]);

coordinator.execute('txn-001')
  .then(result => console.log(`Result: ${result}`));
```

```java
import java.util.*;
import java.util.concurrent.*;

public class TwoPhaseCommit {

    enum Vote { YES, NO }
    enum Decision { COMMIT, ABORT }

    static class Participant {
        private final String name;
        private boolean prepared = false;

        Participant(String name) {
            this.name = name;
        }

        Vote prepare(String txnId) {
            try {
                acquireLocks(txnId);
                writeToWAL(txnId);
                prepared = true;
                System.out.printf("%s: PREPARED for %s%n", name, txnId);
                return Vote.YES;
            } catch (Exception e) {
                System.err.printf(
                    "%s: Cannot prepare - %s%n", name, e.getMessage());
                return Vote.NO;
            }
        }

        void commit(String txnId) {
            applyChanges(txnId);
            releaseLocks(txnId);
            prepared = false;
            System.out.printf("%s: COMMITTED %s%n", name, txnId);
        }

        void abort(String txnId) {
            rollbackChanges(txnId);
            releaseLocks(txnId);
            prepared = false;
            System.out.printf("%s: ABORTED %s%n", name, txnId);
        }

        boolean isPrepared() { return prepared; }

        // Stubs
        void acquireLocks(String txnId) {}
        void writeToWAL(String txnId) {}
        void applyChanges(String txnId) {}
        void rollbackChanges(String txnId) {}
        void releaseLocks(String txnId) {}
    }

    static class Coordinator {
        private final List<Participant> participants;

        Coordinator(List<Participant> participants) {
            this.participants = participants;
        }

        Decision execute(String txnId) {
            // Phase 1: Prepare
            Map<String, Vote> votes = new HashMap<>();
            for (Participant p : participants) {
                votes.put(p.name, p.prepare(txnId));
            }

            boolean allYes = votes.values().stream()
                .allMatch(v -> v == Vote.YES);

            if (allYes) {
                // Phase 2: Commit
                for (Participant p : participants) {
                    p.commit(txnId);
                }
                return Decision.COMMIT;
            } else {
                // Phase 2: Abort prepared participants
                for (Participant p : participants) {
                    if (p.isPrepared()) {
                        p.abort(txnId);
                    }
                }
                return Decision.ABORT;
            }
        }
    }

    public static void main(String[] args) {
        var coordinator = new Coordinator(List.of(
            new Participant("OrderDB"),
            new Participant("InventoryDB"),
            new Participant("PaymentDB")
        ));
        var result = coordinator.execute("txn-001");
        System.out.println("Result: " + result);
    }
}
```

Three-Phase Commit (3PC)
Three-phase commit adds a pre-commit phase to reduce the blocking window of 2PC. However, it is rarely used in practice because it still does not handle network partitions correctly and adds latency.
```
Phase 1: Can Commit? (same as 2PC Prepare)
    Coordinator ──▶ Participants: "Can you commit?"
    Participants ──▶ Coordinator: YES or NO

Phase 2: Pre-Commit (new phase)
    If all YES:
        Coordinator ──▶ Participants: "PRE-COMMIT"
        Participants ──▶ Coordinator: ACK
    If any NO:
        Coordinator ──▶ Participants: ABORT

Phase 3: Do Commit
    Coordinator ──▶ Participants: "COMMIT"
    Participants make changes permanent.
```

Key difference from 2PC: If the coordinator crashes during Phase 2, participants know they can safely abort (they have not yet committed). If it crashes during Phase 3, participants know the decision was COMMIT (because pre-commit was received), so they can proceed.
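The three phases can be sketched in Python. This is a minimal single-process sketch, not a real protocol implementation: the `Participant3PC` class, its state names, and `three_phase_commit` are all illustrative, and failure handling between phases is omitted.

```python
from enum import Enum

class Decision(Enum):
    COMMIT = "commit"
    ABORT = "abort"

class Participant3PC:
    """Illustrative 3PC participant; state names are hypothetical."""
    def __init__(self, name: str):
        self.name = name
        self.state = "INIT"

    def can_commit(self, txn_id: str) -> bool:
        self.state = "READY"          # Phase 1: lock resources and vote
        return True

    def pre_commit(self, txn_id: str):
        self.state = "PRE_COMMITTED"  # Phase 2: decision is now known here

    def do_commit(self, txn_id: str):
        self.state = "COMMITTED"      # Phase 3: apply changes, release locks

    def abort(self, txn_id: str):
        self.state = "ABORTED"

def three_phase_commit(participants, txn_id: str) -> Decision:
    # Phase 1: Can Commit?
    if not all(p.can_commit(txn_id) for p in participants):
        for p in participants:
            p.abort(txn_id)
        return Decision.ABORT

    # Phase 2: Pre-Commit. Once every participant is PRE_COMMITTED,
    # a coordinator crash is survivable: the survivors know the
    # decision was COMMIT and can finish on their own.
    for p in participants:
        p.pre_commit(txn_id)

    # Phase 3: Do Commit
    for p in participants:
        p.do_commit(txn_id)
    return Decision.COMMIT

parts = [Participant3PC(n) for n in ("OrderDB", "InventoryDB", "PaymentDB")]
result = three_phase_commit(parts, "txn-001")
print(result)  # Decision.COMMIT
```

The extra `PRE_COMMITTED` state is the whole point: it separates "I could commit" from "the commit decision is known", which is what lets participants make progress without the coordinator.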
Why 3PC is rarely used:
- Still vulnerable to network partitions (split-brain)
- Higher latency (three round trips instead of two)
- Saga pattern is preferred in microservices
The Saga Pattern
The saga pattern is the modern approach to distributed transactions in microservice architectures. Instead of locking resources across services, a saga breaks the transaction into a sequence of local transactions, each with a corresponding compensating transaction that can undo its effects.
```
Forward Flow (success):
    T1 ──▶ T2 ──▶ T3 ──▶ T4 ──▶ DONE

Compensating Flow (T3 fails):
    T1 ──▶ T2 ──▶ T3 ✗
              ◀── C2 ◀── C1
    (Compensating transactions undo T1 and T2)

T = Local transaction
C = Compensating transaction
```

E-Commerce Saga Example
Order Saga:
```
Step 1: Create Order (status: PENDING)
        Compensate: Cancel Order (status: CANCELLED)

Step 2: Reserve Inventory (-1 stock)
        Compensate: Release Inventory (+1 stock)

Step 3: Process Payment (charge customer)
        Compensate: Refund Payment (credit customer)

Step 4: Confirm Order (status: CONFIRMED)
        No compensation needed (final step)

If Step 3 (Payment) fails:
    Execute C2: Release Inventory (+1 stock)
    Execute C1: Cancel Order (status: CANCELLED)
```

Choreography-Based Saga
In choreography, each service listens for events and decides what to do next. There is no central coordinator. Services communicate through events (typically via a message broker).
```
┌────────────┐    OrderCreated     ┌────────────┐
│   Order    │────────────────────▶│ Inventory  │
│  Service   │                     │  Service   │
└────────────┘                     └──────┬─────┘
      ▲                                   │
      │                 InventoryReserved │
      │                                   │
      │                            ┌──────▼─────┐
      │      PaymentProcessed      │  Payment   │
      └────────────────────────────│  Service   │
                                   └────────────┘
```

Each service publishes events. Other services subscribe and react. No central coordinator.

Choreography implementation:
```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Event:
    type: str
    data: dict
    saga_id: str

class EventBus:
    """Simple in-memory event bus for demonstration."""
    def __init__(self):
        self.handlers: dict[str, list[Callable]] = {}

    def subscribe(self, event_type: str, handler: Callable):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    def publish(self, event: Event):
        handlers = self.handlers.get(event.type, [])
        for handler in handlers:
            handler(event)

# Global event bus
bus = EventBus()

class OrderService:
    def __init__(self):
        self.orders = {}
        bus.subscribe('PaymentProcessed', self.on_payment_processed)
        bus.subscribe('PaymentFailed', self.on_payment_failed)
        bus.subscribe('InventoryFailed', self.on_inventory_failed)

    def create_order(self, order_id: str, items: list, customer_id: str):
        self.orders[order_id] = {
            'id': order_id,
            'items': items,
            'customer_id': customer_id,
            'status': 'PENDING'
        }
        bus.publish(Event(
            type='OrderCreated',
            data=self.orders[order_id],
            saga_id=order_id
        ))

    def on_payment_processed(self, event: Event):
        order_id = event.saga_id
        self.orders[order_id]['status'] = 'CONFIRMED'
        print(f"Order {order_id} CONFIRMED")

    def on_payment_failed(self, event: Event):
        order_id = event.saga_id
        self.orders[order_id]['status'] = 'CANCELLED'
        print(f"Order {order_id} CANCELLED (payment failed)")

    def on_inventory_failed(self, event: Event):
        order_id = event.saga_id
        self.orders[order_id]['status'] = 'CANCELLED'
        print(f"Order {order_id} CANCELLED (no inventory)")

class InventoryService:
    def __init__(self):
        self.stock = {'widget': 10, 'gadget': 5}
        bus.subscribe('OrderCreated', self.on_order_created)
        bus.subscribe('PaymentFailed', self.on_payment_failed)

    def on_order_created(self, event: Event):
        order = event.data
        # Try to reserve inventory
        can_fulfill = all(
            self.stock.get(item, 0) > 0
            for item in order['items']
        )
        if can_fulfill:
            for item in order['items']:
                self.stock[item] -= 1
            bus.publish(Event(
                type='InventoryReserved',
                data={'items': order['items']},
                saga_id=event.saga_id
            ))
        else:
            bus.publish(Event(
                type='InventoryFailed',
                data={'reason': 'Out of stock'},
                saga_id=event.saga_id
            ))

    def on_payment_failed(self, event: Event):
        # Compensating action: release reserved inventory
        for item in event.data.get('items', []):
            self.stock[item] = self.stock.get(item, 0) + 1
        print(f"Inventory released for saga {event.saga_id}")

class PaymentService:
    def __init__(self):
        bus.subscribe('InventoryReserved', self.on_inventory_reserved)

    def on_inventory_reserved(self, event: Event):
        # Try to process payment
        success = self.charge_customer(event.saga_id)
        if success:
            bus.publish(Event(
                type='PaymentProcessed',
                data={'amount': 99.99},
                saga_id=event.saga_id
            ))
        else:
            bus.publish(Event(
                type='PaymentFailed',
                data={
                    'reason': 'Insufficient funds',
                    'items': event.data['items']
                },
                saga_id=event.saga_id
            ))

    def charge_customer(self, saga_id: str) -> bool:
        # Simulated payment processing
        return True

# Wire up and run
order_svc = OrderService()
inventory_svc = InventoryService()
payment_svc = PaymentService()

order_svc.create_order('order-001', ['widget'], 'cust-42')
```

```javascript
const EventEmitter = require('events');

class EventBus extends EventEmitter {}
const bus = new EventBus();

class OrderService {
  constructor() {
    this.orders = new Map();

    bus.on('PaymentProcessed', (event) => {
      this.orders.get(event.sagaId).status = 'CONFIRMED';
      console.log(`Order ${event.sagaId} CONFIRMED`);
    });

    bus.on('PaymentFailed', (event) => {
      this.orders.get(event.sagaId).status = 'CANCELLED';
      console.log(`Order ${event.sagaId} CANCELLED`);
    });

    bus.on('InventoryFailed', (event) => {
      this.orders.get(event.sagaId).status = 'CANCELLED';
      console.log(`Order ${event.sagaId} CANCELLED`);
    });
  }

  createOrder(orderId, items, customerId) {
    const order = { id: orderId, items, customerId, status: 'PENDING' };
    this.orders.set(orderId, order);

    bus.emit('OrderCreated', {
      type: 'OrderCreated',
      data: order,
      sagaId: orderId
    });
  }
}

class InventoryService {
  constructor() {
    this.stock = { widget: 10, gadget: 5 };

    bus.on('OrderCreated', (event) => {
      const canFulfill = event.data.items.every(
        item => (this.stock[item] || 0) > 0
      );

      if (canFulfill) {
        event.data.items.forEach(item => this.stock[item]--);
        bus.emit('InventoryReserved', {
          data: { items: event.data.items },
          sagaId: event.sagaId
        });
      } else {
        bus.emit('InventoryFailed', {
          data: { reason: 'Out of stock' },
          sagaId: event.sagaId
        });
      }
    });

    // Compensating action
    bus.on('PaymentFailed', (event) => {
      (event.data.items || []).forEach(
        item => this.stock[item] = (this.stock[item] || 0) + 1
      );
      console.log(`Inventory released for ${event.sagaId}`);
    });
  }
}

class PaymentService {
  constructor() {
    bus.on('InventoryReserved', (event) => {
      const success = this.chargeCustomer(event.sagaId);
      if (success) {
        bus.emit('PaymentProcessed', {
          data: { amount: 99.99 },
          sagaId: event.sagaId
        });
      } else {
        bus.emit('PaymentFailed', {
          data: {
            reason: 'Insufficient funds',
            items: event.data.items
          },
          sagaId: event.sagaId
        });
      }
    });
  }

  chargeCustomer(sagaId) {
    return true;
  }
}

// Wire up and run
const orderSvc = new OrderService();
const inventorySvc = new InventoryService();
const paymentSvc = new PaymentService();

orderSvc.createOrder('order-001', ['widget'], 'cust-42');
```

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class ChoreographySaga {

    record Event(String type, Map<String, Object> data, String sagaId) {}

    static class EventBus {
        private final Map<String, List<Consumer<Event>>> handlers =
            new ConcurrentHashMap<>();

        void subscribe(String type, Consumer<Event> handler) {
            handlers
                .computeIfAbsent(type, k -> new ArrayList<>())
                .add(handler);
        }

        void publish(Event event) {
            handlers.getOrDefault(event.type(), List.of())
                .forEach(h -> h.accept(event));
        }
    }

    static final EventBus bus = new EventBus();

    static class OrderService {
        Map<String, Map<String, Object>> orders = new ConcurrentHashMap<>();

        OrderService() {
            bus.subscribe("PaymentProcessed", e -> {
                orders.get(e.sagaId()).put("status", "CONFIRMED");
                System.out.println("Order " + e.sagaId() + " CONFIRMED");
            });
            bus.subscribe("PaymentFailed", e -> {
                orders.get(e.sagaId()).put("status", "CANCELLED");
                System.out.println("Order " + e.sagaId() + " CANCELLED");
            });
        }

        void createOrder(String orderId, List<String> items) {
            var order = new HashMap<String, Object>();
            order.put("id", orderId);
            order.put("items", items);
            order.put("status", "PENDING");
            orders.put(orderId, order);

            bus.publish(new Event("OrderCreated", order, orderId));
        }
    }

    static class InventoryService {
        Map<String, Integer> stock =
            new ConcurrentHashMap<>(Map.of("widget", 10, "gadget", 5));

        @SuppressWarnings("unchecked")
        InventoryService() {
            bus.subscribe("OrderCreated", e -> {
                var items = (List<String>) e.data().get("items");
                boolean canFulfill = items.stream()
                    .allMatch(i -> stock.getOrDefault(i, 0) > 0);
                if (canFulfill) {
                    items.forEach(i -> stock.merge(i, -1, Integer::sum));
                    bus.publish(new Event(
                        "InventoryReserved",
                        Map.of("items", items),
                        e.sagaId()));
                } else {
                    bus.publish(new Event(
                        "InventoryFailed",
                        Map.of("reason", "Out of stock"),
                        e.sagaId()));
                }
            });
        }
    }

    public static void main(String[] args) {
        var orderSvc = new OrderService();
        var inventorySvc = new InventoryService();
        orderSvc.createOrder("order-001", List.of("widget"));
    }
}
```

Orchestration-Based Saga
In orchestration, a central saga orchestrator directs the sequence of local transactions. It explicitly tells each service what to do and handles compensations.
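The core of an orchestrator can be sketched in a few lines of Python. This is a minimal single-process sketch: the `SagaStep` structure and the step functions (`create_order`, `reserve_inventory`, etc.) are illustrative stand-ins for calls to real services, not a framework API.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class SagaStep:
    name: str
    action: Callable[[str], None]        # forward local transaction
    compensation: Callable[[str], None]  # semantically undoes the action

class SagaOrchestrator:
    """Runs steps in order; on failure, compensates completed steps in reverse."""
    def __init__(self, steps: list[SagaStep]):
        self.steps = steps

    def execute(self, saga_id: str) -> str:
        completed = []
        for step in self.steps:
            try:
                step.action(saga_id)
                completed.append(step)
            except Exception as e:
                print(f"{step.name} failed ({e}); compensating")
                # Undo finished steps in reverse order
                for done in reversed(completed):
                    done.compensation(saga_id)
                return "ABORTED"
        return "COMPLETED"

# Hypothetical local transactions for the order saga
stock = {'widget': 1}

def create_order(sid): print(f"{sid}: order PENDING")
def cancel_order(sid): print(f"{sid}: order CANCELLED")
def reserve_inventory(sid): stock['widget'] -= 1
def release_inventory(sid): stock['widget'] += 1
def charge_payment(sid): raise RuntimeError("insufficient funds")
def refund_payment(sid): pass

saga = SagaOrchestrator([
    SagaStep("CreateOrder", create_order, cancel_order),
    SagaStep("ReserveInventory", reserve_inventory, release_inventory),
    SagaStep("ChargePayment", charge_payment, refund_payment),
])

result = saga.execute("order-001")
print(result, stock)  # ABORTED {'widget': 1} -- inventory was released
```

Because the flow lives in one place, the failure path (compensate in reverse order) is explicit and testable, which is the main argument for orchestration over choreography.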
```
              ┌───────────────┐
              │     Saga      │
 ┌───────────▶│ Orchestrator  │◀────────────┐
 │            └───┬───┬───┬───┘             │
 │                │   │   │                 │
 │   ┌────────────┘   │   └────────────┐    │
 │   │                │                │    │
 │   ▼                ▼                ▼    │
┌┴──────────┐   ┌────────────┐   ┌─────────┴──┐
│   Order   │   │ Inventory  │   │  Payment   │
│  Service  │   │  Service   │   │  Service   │
└───────────┘   └────────────┘   └────────────┘
```

The orchestrator knows the full saga flow:

1. Tell OrderService to create order
2. Tell InventoryService to reserve
3. Tell PaymentService to charge
4. If any step fails, tell previous services to compensate

Choreography vs Orchestration
| Aspect | Choreography | Orchestration |
|---|---|---|
| Coordination | Decentralized (event-driven) | Centralized (orchestrator) |
| Coupling | Services are loosely coupled | Services coupled to orchestrator |
| Complexity | Distributed across services | Centralized in orchestrator |
| Debugging | Hard (events scattered across services) | Easier (single place to see saga state) |
| Single point of failure | None | Orchestrator |
| Scalability | Better (no bottleneck) | Orchestrator can be a bottleneck |
| Best for | Simple sagas with few steps | Complex sagas with many steps and branches |
Compensating Transactions
A compensating transaction undoes the effect of a previously committed transaction. Unlike a database rollback, a compensation is a new forward action that semantically reverses the original.
Designing Compensating Transactions
| Original Action | Compensating Action |
|---|---|
| Create order | Cancel order |
| Reserve inventory | Release inventory |
| Charge credit card | Issue refund |
| Send confirmation email | Send cancellation email |
| Create shipping label | Void shipping label |
| Debit account | Credit account |
| Grant access permission | Revoke access permission |

Rules for compensating transactions:
- Must be idempotent: Running the compensation twice should have the same effect as running it once
- Must eventually succeed: Compensations should retry until they succeed (with exponential backoff)
- Cannot fail permanently: If a compensation cannot be applied, you have an inconsistent state that requires manual intervention
- Must be commutative where possible: The order of compensations should not matter
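The idempotency and retry rules above can be combined in a small sketch. This is illustrative only: `release_inventory`, the `released` ledger, and `compensate_with_retry` are hypothetical names, and a real system would persist the ledger rather than keep it in memory.

```python
import time

stock = {'widget': 9}
released = set()  # ledger of compensations already applied

def release_inventory(saga_id: str, item: str):
    """Idempotent compensation: applying it twice has the effect of once."""
    if (saga_id, item) in released:
        return  # already compensated; do nothing
    stock[item] = stock.get(item, 0) + 1
    released.add((saga_id, item))

def compensate_with_retry(saga_id: str, item: str,
                          attempts: int = 5, base_delay: float = 0.01) -> bool:
    """Retry with exponential backoff until the compensation succeeds."""
    for attempt in range(attempts):
        try:
            release_inventory(saga_id, item)
            return True
        except Exception:
            time.sleep(base_delay * 2 ** attempt)
    return False  # exhausted retries: flag for manual intervention

compensate_with_retry('saga-1', 'widget')
compensate_with_retry('saga-1', 'widget')  # duplicate retry: no double release
print(stock)  # {'widget': 10}
```

The ledger check is what makes the retry loop safe: without it, a redelivered "compensate" message would release the same unit of stock twice.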
Idempotency Patterns
In distributed systems, messages can be delivered multiple times due to retries, network issues, or at-least-once delivery guarantees. Idempotency ensures that processing the same message multiple times has the same effect as processing it once.
Why Idempotency Matters
```
Client ──▶ Server: "Transfer $100 from A to B"
Server processes the transfer.
Server sends ACK ── ✗ (network fails, ACK lost)
Client retries: "Transfer $100 from A to B"

Without idempotency: $200 transferred (double charge!)
With idempotency:    $100 transferred (correct)
```

Idempotency Key Pattern
The most common approach: clients include a unique idempotency key with each request. The server tracks which keys have been processed.
```python
import uuid
import time

class IdempotencyStore:
    """Store for tracking processed idempotency keys."""

    def __init__(self, ttl_seconds: int = 86400):
        self.store = {}  # key -> (result, timestamp)
        self.ttl = ttl_seconds

    def get(self, key: str):
        """Get cached result for an idempotency key."""
        if key in self.store:
            result, timestamp = self.store[key]
            if time.time() - timestamp < self.ttl:
                return result
            else:
                del self.store[key]
        return None

    def set(self, key: str, result):
        """Cache result for an idempotency key."""
        self.store[key] = (result, time.time())

    def is_in_progress(self, key: str) -> bool:
        """Check if a request is currently being processed."""
        return key in self.store and self.store[key][0] is None

idempotency_store = IdempotencyStore()

class PaymentService:
    def __init__(self):
        self.balances = {'account_a': 1000, 'account_b': 500}

    def transfer(self, idempotency_key: str, from_account: str,
                 to_account: str, amount: float) -> dict:
        # Check if already processed
        cached_result = idempotency_store.get(idempotency_key)
        if cached_result is not None:
            print(f"Returning cached result for {idempotency_key}")
            return cached_result

        # Reject retries of a request that is still being processed
        # (get() returns None for both "unknown" and "in progress")
        if idempotency_store.is_in_progress(idempotency_key):
            return {'status': 'pending',
                    'error': 'Request already in progress'}

        # Mark as in progress
        idempotency_store.set(idempotency_key, None)

        try:
            # Process the transfer
            if self.balances.get(from_account, 0) < amount:
                result = {
                    'status': 'failed',
                    'error': 'Insufficient funds'
                }
            else:
                self.balances[from_account] -= amount
                self.balances[to_account] = \
                    self.balances.get(to_account, 0) + amount
                result = {
                    'status': 'success',
                    'from': from_account,
                    'to': to_account,
                    'amount': amount
                }

            # Cache the result
            idempotency_store.set(idempotency_key, result)
            return result

        except Exception:
            # Remove in-progress marker on failure
            # so the request can be retried
            idempotency_store.store.pop(idempotency_key, None)
            raise

# Usage
service = PaymentService()
key = str(uuid.uuid4())

# First call - processes the transfer
result1 = service.transfer(key, 'account_a', 'account_b', 100)
print(f"First call: {result1}")

# Second call (retry) - returns cached result
result2 = service.transfer(key, 'account_a', 'account_b', 100)
print(f"Second call: {result2}")

# Balance is correct: only transferred once
print(f"Balances: {service.balances}")
# account_a: 900, account_b: 600
```

```javascript
const { randomUUID } = require('crypto');

class IdempotencyStore {
  constructor(ttlMs = 86400000) {  // 24 hours
    this.store = new Map();
    this.ttlMs = ttlMs;
  }

  get(key) {
    const entry = this.store.get(key);
    if (!entry) return null;

    if (Date.now() - entry.timestamp > this.ttlMs) {
      this.store.delete(key);
      return null;
    }
    return entry.result;
  }

  set(key, result) {
    this.store.set(key, { result, timestamp: Date.now() });
  }

  delete(key) {
    this.store.delete(key);
  }
}

const idempotencyStore = new IdempotencyStore();

class PaymentService {
  constructor() {
    this.balances = { account_a: 1000, account_b: 500 };
  }

  async transfer(idempotencyKey, from, to, amount) {
    // Check if already processed
    const cached = idempotencyStore.get(idempotencyKey);
    if (cached !== null) {
      console.log(`Returning cached result for ${idempotencyKey}`);
      return cached;
    }

    // Mark as in progress
    idempotencyStore.set(idempotencyKey, null);

    try {
      let result;
      if ((this.balances[from] || 0) < amount) {
        result = { status: 'failed', error: 'Insufficient funds' };
      } else {
        this.balances[from] -= amount;
        this.balances[to] = (this.balances[to] || 0) + amount;
        result = { status: 'success', from, to, amount };
      }

      idempotencyStore.set(idempotencyKey, result);
      return result;
    } catch (error) {
      idempotencyStore.delete(idempotencyKey);
      throw error;
    }
  }
}

// Usage
const service = new PaymentService();
const key = randomUUID();

(async () => {
  const r1 = await service.transfer(key, 'account_a', 'account_b', 100);
  console.log('First:', r1);

  const r2 = await service.transfer(key, 'account_a', 'account_b', 100);
  console.log('Second:', r2);  // cached result

  console.log('Balances:', service.balances);
  // account_a: 900, account_b: 600
})();
```

```java
import java.util.*;
import java.util.concurrent.*;

public class IdempotentPayment {

    record CachedResult(Map<String, Object> result, long timestamp) {}

    static class IdempotencyStore {
        private final ConcurrentHashMap<String, CachedResult> store =
            new ConcurrentHashMap<>();
        private final long ttlMs;

        IdempotencyStore(long ttlMs) {
            this.ttlMs = ttlMs;
        }

        Map<String, Object> get(String key) {
            CachedResult entry = store.get(key);
            if (entry == null) return null;
            if (System.currentTimeMillis() - entry.timestamp() > ttlMs) {
                store.remove(key);
                return null;
            }
            return entry.result();
        }

        void set(String key, Map<String, Object> result) {
            store.put(key, new CachedResult(
                result, System.currentTimeMillis()));
        }

        void remove(String key) {
            store.remove(key);
        }
    }

    static class PaymentService {
        final ConcurrentHashMap<String, Double> balances =
            new ConcurrentHashMap<>(Map.of(
                "account_a", 1000.0,
                "account_b", 500.0
            ));
        final IdempotencyStore idempotencyStore =
            new IdempotencyStore(86400000);

        Map<String, Object> transfer(
            String key, String from, String to, double amount
        ) {
            // Check cache
            var cached = idempotencyStore.get(key);
            if (cached != null) {
                System.out.println("Returning cached: " + key);
                return cached;
            }

            // Process
            Map<String, Object> result;
            if (balances.getOrDefault(from, 0.0) < amount) {
                result = Map.of(
                    "status", "failed",
                    "error", "Insufficient funds"
                );
            } else {
                balances.merge(from, -amount, Double::sum);
                balances.merge(to, amount, Double::sum);
                result = Map.of(
                    "status", "success",
                    "amount", amount
                );
            }

            idempotencyStore.set(key, result);
            return result;
        }
    }

    public static void main(String[] args) {
        var service = new PaymentService();
        var key = UUID.randomUUID().toString();

        var r1 = service.transfer(key, "account_a", "account_b", 100);
        System.out.println("First: " + r1);

        var r2 = service.transfer(key, "account_a", "account_b", 100);
        System.out.println("Second: " + r2);

        System.out.println("Balances: " + service.balances);
    }
}
```

Other Idempotency Techniques
| Technique | How It Works | Best For |
|---|---|---|
| Idempotency keys | Unique client-generated key per request | API endpoints, payment processing |
| Database constraints | Unique constraints prevent duplicate inserts | Data creation operations |
| Conditional writes | Only write if version/ETag matches | Concurrent updates |
| Deduplication table | Track processed message IDs | Message consumers |
| Natural idempotency | Operations that are inherently idempotent (e.g., SET) | State-setting operations |
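The deduplication-table technique from the table above is easy to sketch with SQLite, whose primary-key constraint rejects duplicate inserts. The table and message names here are illustrative; the important detail is that the dedup insert and the business effect commit in the same database transaction.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE processed_messages (message_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE payments (id INTEGER PRIMARY KEY, amount REAL)")

def handle_message(message_id: str, amount: float) -> bool:
    """Apply a message's effect exactly once per message_id."""
    try:
        # One transaction: dedup record + business effect commit together,
        # or roll back together.
        with conn:
            conn.execute(
                "INSERT INTO processed_messages VALUES (?)",
                (message_id,))
            conn.execute(
                "INSERT INTO payments (amount) VALUES (?)",
                (amount,))
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: primary key rejected it, skip

handle_message('msg-1', 99.99)
handle_message('msg-1', 99.99)  # at-least-once redelivery, ignored
count = conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0]
print(count)  # 1
```

If the dedup insert lived in a separate store (say, Redis) from the payments table, a crash between the two writes could lose the effect while remembering the message; keeping both in one transaction closes that gap.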
Choosing the Right Approach
| Criteria | 2PC | Saga (Choreography) | Saga (Orchestration) |
|---|---|---|---|
| Consistency | Strong (ACID) | Eventual | Eventual |
| Latency | High (blocking locks) | Low (async) | Medium |
| Complexity | Medium | High (distributed logic) | Medium (centralized logic) |
| Scalability | Limited (lock contention) | High | High |
| Isolation | Full (locks held during transaction) | None (intermediate states visible) | None |
| Use case | Same-database transactions | Microservices with simple flows | Microservices with complex flows |
Summary
| Concept | Key Takeaway |
|---|---|
| 2PC | Strong consistency but blocking; avoid across service boundaries |
| 3PC | Reduces blocking but adds latency; rarely used |
| Saga Pattern | Modern approach; sequence of local transactions with compensations |
| Choreography | Event-driven sagas; decentralized but hard to debug |
| Orchestration | Centralized saga coordinator; easier to understand and debug |
| Compensating Transactions | Forward actions that semantically undo previous steps |
| Idempotency | Essential for safe retries in distributed systems |