Replication & Partitioning
Data replication and partitioning are the two fundamental mechanisms for distributing data across multiple machines. Replication keeps copies of the same data on multiple nodes for fault tolerance and read performance. Partitioning (also called sharding) splits data across nodes so that each node is responsible for a subset, enabling horizontal scaling.
Every distributed database uses some combination of these two techniques.
Part 1: Replication
Why Replicate Data?
| Goal | How Replication Helps |
|---|---|
| Fault tolerance | If one node fails, data is still available on other nodes |
| Read scalability | Spread read traffic across multiple replicas |
| Geographic locality | Place replicas close to users for lower latency |
| Offline operation | Mobile devices can work with local replicas |
Replication Strategies Overview
| Strategy | Who Handles Writes | Example Systems |
|---|---|---|
| Leader-Follower (Primary-Secondary) | One leader handles all writes | PostgreSQL, MySQL, MongoDB, Redis |
| Multi-Leader (Multi-Master) | Multiple leaders handle writes | CouchDB, Galera Cluster, Active Directory, Cosmos DB |
| Leaderless (Dynamo-style) | No leader; any node handles writes | Cassandra, DynamoDB, Riak, Voldemort |

Leader-Follower Replication
The most common replication strategy. One node is designated as the leader (primary/master). All writes go through the leader, which then replicates changes to followers (secondaries/replicas).
```
   Writes                  Reads
     │                  ┌────┴────┐
     ▼                  ▼         ▼
┌─────────┐        ┌─────────┐ ┌─────────┐
│ Leader  │───────▶│Follower │ │Follower │
│(Primary)│────┐   │    1    │ │    2    │
└─────────┘    │   └─────────┘ └─────────┘
               │   ┌─────────┐
               └──▶│Follower │
                   │    3    │
                   └─────────┘
```
Replication Log: the leader streams changes to all followers.

Synchronous vs Asynchronous Replication
Synchronous: The leader waits for the follower to confirm it has received and written the data before acknowledging the write to the client.
```
Client ──▶ Leader ──▶ Follower (write to disk)
                         │
                         ▼
           Leader ◀──── ACK
             │
             ▼
Client ◀── ACK (write confirmed)
```
- Pro: Follower is guaranteed to have an up-to-date copy
- Con: One slow follower blocks all writes

Asynchronous: The leader acknowledges the write to the client immediately, without waiting for followers.
```
Client ──▶ Leader ──▶ Follower (write to disk, eventually)
             │
             ▼
Client ◀── ACK (write confirmed immediately)
```
- Pro: Low write latency; no blocking
- Con: Followers may lag; data loss if leader fails before replication

Semi-synchronous: A practical compromise. The leader waits for at least one follower to acknowledge, then replicates to the rest asynchronously.
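As a sketch of the semi-synchronous pattern (the `Follower` and `SemiSyncLeader` classes below are illustrative, not from any particular database): the leader returns as soon as the first follower acknowledges, while replication to the rest continues in the background.

```python
import concurrent.futures
import time

class Follower:
    """Hypothetical follower that applies a write after some delay."""
    def __init__(self, name, delay):
        self.name = name
        self.delay = delay
        self.data = {}

    def apply(self, key, value):
        time.sleep(self.delay)  # simulate network + disk latency
        self.data[key] = value
        return self.name

class SemiSyncLeader:
    def __init__(self, followers):
        self.data = {}
        self.followers = followers
        self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def write(self, key, value):
        self.data[key] = value
        futures = [self.pool.submit(f.apply, key, value)
                   for f in self.followers]
        # Semi-synchronous: block only until ONE follower has acknowledged;
        # the remaining futures keep running asynchronously.
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED
        )
        return next(iter(done)).result()  # name of the acking follower

leader = SemiSyncLeader([Follower("f1", 0.01), Follower("f2", 0.2)])
acked_by = leader.write("x", 42)
print(acked_by)  # the fast follower acks first
```

The write latency here tracks the fastest follower, not the slowest, which is exactly the trade-off semi-synchronous replication buys.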
Handling Leader Failure (Failover)
When the leader fails, a follower must be promoted. This process is called failover.
```
Normal Operation:
  Client ──▶ Leader ──▶ Follower 1, Follower 2

Leader Fails:
  Client ──▶ Leader ✗
  Follower 1 ──▶ elected as new Leader
  Follower 2 ──▶ now follows Follower 1

Recovery:
  Old Leader comes back ──▶ becomes a Follower
```

Failover challenges:
- Data loss: If async replication was used, the new leader may be behind
- Split brain: Two nodes think they are leader simultaneously
- Stale reads: Clients reading from a lagging follower see old data
- Autoincrement conflicts: New leader may have different autoincrement state
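One common promotion heuristic, assuming each follower reports how far it got through the replication log, is to elect the most up-to-date survivor. A minimal sketch (the `elect_new_leader` helper and its inputs are hypothetical):

```python
def elect_new_leader(followers):
    """Pick the most up-to-date follower to minimize lost writes.

    followers: list of (name, last_applied_sequence) pairs reported
    by the surviving nodes. With asynchronous replication even the
    winner may still be missing the old leader's most recent writes.
    """
    if not followers:
        raise RuntimeError("no follower available for promotion")
    name, seq = max(followers, key=lambda f: f[1])
    return name, seq

survivors = [("follower-1", 41), ("follower-2", 45), ("follower-3", 43)]
leader, seq = elect_new_leader(survivors)
print(leader, seq)  # follower-2 45
```

Real systems also need a consensus step so that only one node wins the election; this sketch only shows the selection rule.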
Replication Log Implementation
```python
import time
import threading
from dataclasses import dataclass, field
from typing import Any


@dataclass
class LogEntry:
    sequence: int
    operation: str  # 'SET' or 'DELETE'
    key: str
    value: Any = None
    timestamp: float = field(default_factory=time.time)


class ReplicationLog:
    """Write-ahead log for leader-follower replication."""

    def __init__(self):
        self.log: list[LogEntry] = []
        self.sequence = 0
        self.lock = threading.Lock()

    def append(self, operation: str, key: str, value: Any = None) -> LogEntry:
        with self.lock:
            self.sequence += 1
            entry = LogEntry(
                sequence=self.sequence,
                operation=operation,
                key=key,
                value=value
            )
            self.log.append(entry)
            return entry

    def get_entries_after(self, sequence: int) -> list[LogEntry]:
        """Get all log entries after a given sequence number.

        Used by followers to catch up."""
        with self.lock:
            return [e for e in self.log if e.sequence > sequence]


class LeaderNode:
    def __init__(self):
        self.data = {}
        self.replication_log = ReplicationLog()
        self.followers = []

    def write(self, key: str, value: Any):
        """Process a write and replicate to followers."""
        self.data[key] = value
        entry = self.replication_log.append('SET', key, value)

        # Replicate to followers asynchronously
        for follower in self.followers:
            threading.Thread(
                target=follower.apply_entry,
                args=(entry,)
            ).start()

    def read(self, key: str) -> Any:
        return self.data.get(key)


class FollowerNode:
    def __init__(self):
        self.data = {}
        self.last_applied_sequence = 0

    def apply_entry(self, entry: LogEntry):
        """Apply a replicated log entry."""
        if entry.sequence <= self.last_applied_sequence:
            return  # Already applied (idempotent)

        if entry.operation == 'SET':
            self.data[entry.key] = entry.value
        elif entry.operation == 'DELETE':
            self.data.pop(entry.key, None)

        self.last_applied_sequence = entry.sequence

    def read(self, key: str) -> Any:
        """Reads from a follower may return stale data."""
        return self.data.get(key)
```

```javascript
class LogEntry {
  constructor(sequence, operation, key, value = null) {
    this.sequence = sequence;
    this.operation = operation; // 'SET' or 'DELETE'
    this.key = key;
    this.value = value;
    this.timestamp = Date.now();
  }
}

class ReplicationLog {
  constructor() {
    this.log = [];
    this.sequence = 0;
  }

  append(operation, key, value = null) {
    this.sequence++;
    const entry = new LogEntry(this.sequence, operation, key, value);
    this.log.push(entry);
    return entry;
  }

  getEntriesAfter(sequence) {
    return this.log.filter(e => e.sequence > sequence);
  }
}

class LeaderNode {
  constructor() {
    this.data = new Map();
    this.replicationLog = new ReplicationLog();
    this.followers = [];
  }

  async write(key, value) {
    this.data.set(key, value);
    const entry = this.replicationLog.append('SET', key, value);

    // Replicate to all followers
    const replicationPromises = this.followers.map(
      follower => follower.applyEntry(entry)
    );

    // Semi-synchronous: wait for at least one.
    // (Skip the wait when there are no followers yet --
    // Promise.any([]) would reject with an AggregateError.)
    if (replicationPromises.length > 0) {
      await Promise.any(replicationPromises);
    }
  }

  read(key) {
    return this.data.get(key) ?? null;
  }
}

class FollowerNode {
  constructor() {
    this.data = new Map();
    this.lastAppliedSequence = 0;
  }

  async applyEntry(entry) {
    if (entry.sequence <= this.lastAppliedSequence) {
      return; // Idempotent -- already applied
    }

    if (entry.operation === 'SET') {
      this.data.set(entry.key, entry.value);
    } else if (entry.operation === 'DELETE') {
      this.data.delete(entry.key);
    }

    this.lastAppliedSequence = entry.sequence;
  }

  read(key) {
    // May return stale data
    return this.data.get(key) ?? null;
  }
}
```

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.*;

public class ReplicationExample {

    record LogEntry(
        long sequence,
        String operation,
        String key,
        String value,
        long timestamp
    ) {}

    static class ReplicationLog {
        private final List<LogEntry> log = new CopyOnWriteArrayList<>();
        private final AtomicLong sequence = new AtomicLong(0);

        public LogEntry append(String op, String key, String value) {
            long seq = sequence.incrementAndGet();
            LogEntry entry = new LogEntry(
                seq, op, key, value, System.currentTimeMillis()
            );
            log.add(entry);
            return entry;
        }

        public List<LogEntry> getEntriesAfter(long seq) {
            return log.stream()
                .filter(e -> e.sequence() > seq)
                .toList();
        }
    }

    static class LeaderNode {
        private final ConcurrentHashMap<String, String> data =
            new ConcurrentHashMap<>();
        private final ReplicationLog replicationLog = new ReplicationLog();
        private final List<FollowerNode> followers =
            new CopyOnWriteArrayList<>();
        private final ExecutorService executor =
            Executors.newFixedThreadPool(4);

        public void write(String key, String value) {
            data.put(key, value);
            LogEntry entry = replicationLog.append("SET", key, value);

            // Async replication to followers
            for (FollowerNode follower : followers) {
                executor.submit(() -> follower.applyEntry(entry));
            }
        }

        public String read(String key) {
            return data.get(key);
        }
    }

    static class FollowerNode {
        private final ConcurrentHashMap<String, String> data =
            new ConcurrentHashMap<>();
        private volatile long lastAppliedSequence = 0;

        public synchronized void applyEntry(LogEntry entry) {
            if (entry.sequence() <= lastAppliedSequence) {
                return; // Idempotent
            }
            if ("SET".equals(entry.operation())) {
                data.put(entry.key(), entry.value());
            } else if ("DELETE".equals(entry.operation())) {
                data.remove(entry.key());
            }
            lastAppliedSequence = entry.sequence();
        }

        public String read(String key) {
            return data.get(key); // May be stale
        }
    }
}
```

Multi-Leader Replication
Multiple nodes accept writes independently. Each leader replicates its changes to all other leaders.
```
   Data Center A                   Data Center B
  ┌─────────────┐                ┌─────────────┐
  │  Leader A   │◀──────────────▶│  Leader B   │
  │  (writes)   │  bidirectional │  (writes)   │
  │             │  replication   │             │
  └──┬──────────┘                └──────────┬──┘
     │                                      │
     ▼                                      ▼
┌─────────┐ ┌─────────┐         ┌─────────┐ ┌─────────┐
│Follower │ │Follower │         │Follower │ │Follower │
│   A1    │ │   A2    │         │   B1    │ │   B2    │
└─────────┘ └─────────┘         └─────────┘ └─────────┘
```

Use cases:
- Multi-datacenter operation (each datacenter has its own leader)
- Collaborative editing (each user’s device is a “leader”)
- Offline-capable applications
Write Conflict Resolution
The biggest challenge with multi-leader replication is handling conflicting writes. Two leaders can independently modify the same data.
```
Timeline:
  T1: Leader A writes key="title", value="Foo"
  T1: Leader B writes key="title", value="Bar"
  T2: Both leaders replicate to each other

  Conflict! Which value wins?
```

Resolution strategies:
| Strategy | Description | Trade-off |
|---|---|---|
| Last-Write-Wins (LWW) | Use timestamps; latest write wins | Simple but loses data; clock skew risk |
| Merge values | Concatenate or union conflicting values | No data loss but may produce nonsense |
| Custom conflict handler | Application-specific logic | Full control but complex |
| CRDTs | Data structures that merge without conflicts | Limited types but automatic |
| Prompt the user | Show conflicts to the user for resolution | Best accuracy but poor UX |
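As a concrete illustration of last-write-wins, here is a minimal sketch over hypothetical `(value, timestamp)` pairs. Note that the losing write is silently discarded, and clock skew between leaders can make the "wrong" write win:

```python
def lww_resolve(a, b):
    """Last-write-wins: keep the value with the later timestamp.

    a, b: (value, timestamp) pairs from two conflicting leaders.
    The losing write is silently discarded -- this is the data-loss
    risk noted in the table above.
    """
    return a if a[1] >= b[1] else b

from_leader_a = ("Foo", 1700000010.0)
from_leader_b = ("Bar", 1700000012.5)  # later, according to its own clock
winner = lww_resolve(from_leader_a, from_leader_b)
print(winner[0])  # Bar
```

If Leader B's clock had been a few seconds fast, "Bar" would win even though "Foo" was written later in real time, which is why LWW is only safe when losing occasional writes is acceptable.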
Leaderless Replication
No single node is designated as leader. Any node can accept reads and writes. The client sends writes to multiple nodes simultaneously and reads from multiple nodes, using quorum rules to determine the correct value.
```
              Client
             /  |  \
            /   |   \
           ▼    ▼    ▼
      ┌──────┐ ┌──────┐ ┌──────┐
      │Node 1│ │Node 2│ │Node 3│
      │ x=5  │ │ x=5  │ │ x=3  │   (Node 3 is stale)
      └──────┘ └──────┘ └──────┘

Write: Send to all 3 nodes, require W=2 acknowledgments
Read:  Read from all 3 nodes, require R=2 responses
       Compare values, return the most recent one
```

Quorum Reads and Writes
For a system with N replicas, if W nodes acknowledge a write and R nodes respond to a read, then the system guarantees strong consistency when W + R > N.
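The quorum condition is simple arithmetic; a quick sketch to check a few configurations:

```python
def is_strongly_consistent(n, w, r):
    """Quorum rule: every read overlaps every write when W + R > N."""
    return w + r > n

assert is_strongly_consistent(3, 2, 2)      # classic N=3, W=2, R=2
assert is_strongly_consistent(5, 3, 3)
assert not is_strongly_consistent(3, 1, 2)  # W=1: a read can miss the write
assert not is_strongly_consistent(3, 2, 1)  # R=1: may hit only stale nodes
print("quorum checks pass")
```

The overlap guarantee follows from the pigeonhole principle: the W nodes that acknowledged the write and the R nodes that answered the read cannot be disjoint sets when W + R > N.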
```
N = 3 (three replicas)

Write Quorum (W=2):
  Client ──▶ Node 1   ✓ ACK
  Client ──▶ Node 2   ✓ ACK       Write succeeds (2 ACKs >= W)
  Client ──▶ Node 3   ✗ FAIL

Read Quorum (R=2):
  Client ──▶ Node 1   ✓ returns x=5 (version 7)
  Client ──▶ Node 2   ✓ returns x=5 (version 7)
  Client ──▶ Node 3   (not needed, R=2 satisfied)
```

At least one of the R nodes MUST have the latest write because W + R > N (2 + 2 > 3).

Anti-Entropy and Read Repair
Leaderless systems use two mechanisms to keep replicas in sync:
Read Repair: When a client reads from multiple nodes and detects a stale value, it writes the correct value back to the stale node.
```
Read from 3 nodes:
  Node 1: x=5 (version 7)   ← up to date
  Node 2: x=5 (version 7)   ← up to date
  Node 3: x=3 (version 5)   ← STALE!

Client detects the stale value on Node 3.
Client writes x=5 (version 7) back to Node 3.   ← Read Repair
```

Anti-Entropy Process: A background process continuously compares data between replicas and copies missing data. Unlike read repair, this does not depend on clients reading the data.
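A minimal sketch of a quorum read with read repair (the `Replica` class and version numbers are illustrative): the client takes the highest-versioned response and writes it back to any replica that returned something older.

```python
class Replica:
    """Hypothetical replica storing key -> (value, version)."""
    def __init__(self, name):
        self.name = name
        self.store = {}

    def get(self, key):
        return self.store.get(key)  # (value, version) or None

    def put(self, key, value, version):
        current = self.store.get(key)
        if current is None or version > current[1]:
            self.store[key] = (value, version)


def quorum_read_with_repair(replicas, key, r=2):
    responses = [(rep, rep.get(key)) for rep in replicas]
    answered = [resp for _, resp in responses if resp is not None]
    if len(answered) < r:
        raise RuntimeError("read quorum not met")
    # The highest version among the responses wins.
    value, version = max(answered, key=lambda v: v[1])
    # Read repair: push the winning version to any stale replica.
    for rep, resp in responses:
        if resp is None or resp[1] < version:
            rep.put(key, value, version)
    return value


n1, n2, n3 = Replica("n1"), Replica("n2"), Replica("n3")
n1.put("x", 5, 7)
n2.put("x", 5, 7)
n3.put("x", 3, 5)  # Node 3 missed the latest write

print(quorum_read_with_repair([n1, n2, n3], "x"))  # 5
print(n3.store["x"])  # (5, 7) -- repaired
```

Real systems use vector clocks or dotted version vectors rather than a single integer version, but the repair mechanism is the same shape.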
Part 2: Partitioning (Sharding)
Why Partition Data?
When a dataset is too large for a single machine, or when write throughput exceeds what one machine can handle, the data must be split across multiple nodes.
```
Before Partitioning:
┌───────────────────────────────┐
│           All Data            │
│    (too big for one node)     │
└───────────────────────────────┘

After Partitioning:
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Partition │ │ Partition │ │ Partition │
│     1     │ │     2     │ │     3     │
│  (A - H)  │ │  (I - P)  │ │  (Q - Z)  │
└───────────┘ └───────────┘ └───────────┘
```

Partitioning Strategies
1. Range-Based Partitioning
Data is divided into contiguous ranges based on the key.
```
Key Range:    [A-H]       [I-P]       [Q-Z]
                │           │           │
            ┌───▼───┐   ┌───▼───┐   ┌───▼───┐
            │Node 1 │   │Node 2 │   │Node 3 │
            │       │   │       │   │       │
            │Adams  │   │Jones  │   │Smith  │
            │Baker  │   │Kim    │   │Taylor │
            │Clark  │   │Lee    │   │Wilson │
            └───────┘   └───────┘   └───────┘
```

Pros: Range queries are efficient (all keys in a range are on the same node)
Cons: Hot spots if data is not uniformly distributed (for example, time-series data where recent data gets all the writes)
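Routing a key to its range partition is an ordered lookup over the partition boundaries. A minimal sketch using Python's `bisect`, with boundaries matching the diagram above (the helper names are illustrative):

```python
from bisect import bisect_left

# Inclusive upper bound of each partition's key range, per the diagram.
BOUNDARIES = ["H", "P", "Z"]
NODES = ["Node 1", "Node 2", "Node 3"]

def route(key):
    """Route a key to the partition whose range contains it."""
    # bisect_left finds the first boundary >= the key's leading letter.
    idx = bisect_left(BOUNDARIES, key[0].upper())
    return NODES[min(idx, len(NODES) - 1)]

print(route("Adams"))  # Node 1
print(route("Kim"))    # Node 2
print(route("Smith"))  # Node 3
```

Because the boundaries are sorted, a range scan such as "all names from Kim to Lee" touches a single partition, which is the strength of this scheme.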
2. Hash-Based Partitioning
A hash function determines which partition a key belongs to.
hash(key) mod N = partition number
```
hash("Alice") mod 3 = 0  → Node 0
hash("Bob")   mod 3 = 1  → Node 1
hash("Carol") mod 3 = 2  → Node 2
hash("Dave")  mod 3 = 0  → Node 0
```

Pros: Even distribution of keys; no hot spots (with a good hash function)
Cons: Range queries are impossible (adjacent keys hash to different partitions)
Problem with simple modular hashing: When you add or remove a node (N changes), almost every key maps to a different partition, requiring massive data migration.
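This migration cost is easy to measure empirically. The sketch below hashes 10,000 hypothetical keys and counts how many land in a different partition when N goes from 3 to 4; the expected fraction is N/(N+1), here about 75%:

```python
import hashlib

def partition(key, n):
    """Simple mod-N partitioning over a stable hash."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % n

keys = [f"user:{i}" for i in range(10_000)]
moved = sum(1 for k in keys if partition(k, 3) != partition(k, 4))
print(f"{moved / len(keys):.0%} of keys moved")  # roughly 75%
```

A key stays put only when its hash gives the same remainder mod 3 and mod 4, i.e. when hash mod 12 is 0, 1, or 2, which is 3 cases out of 12.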
```
Before (N=3):            After adding a node (N=4):
hash("Alice") % 3 = 0    hash("Alice") % 4 = 2   ← MOVED!
hash("Bob")   % 3 = 1    hash("Bob")   % 4 = 1   ← same
hash("Carol") % 3 = 2    hash("Carol") % 4 = 0   ← MOVED!
hash("Dave")  % 3 = 0    hash("Dave")  % 4 = 3   ← MOVED!

75% of keys must move! This is unacceptable at scale.
```

Consistent Hashing
Consistent hashing solves the rebalancing problem. Instead of hash mod N, both keys and nodes are mapped onto a circular hash space (a “hash ring”). Each key is assigned to the nearest node going clockwise on the ring.
```
                0
                │
    Node C ─────┼───── Node A
         \      │      /
          \     │     /
  270 ─────  Hash Ring  ───── 90
          /     │     \
         /      │      \
        ────────┼────────
             Node B
                │
               180
```

Keys are placed on the ring using hash(key). Each key is served by the first node clockwise from its position.

Adding a Node
When a new node is added, only the keys between the new node and its predecessor need to move:
```
Before:                     After adding Node D:

A ──── B ──── C             A ── D ── B ──── C
│      │      │             │    │    │      │
[keys] [keys] [keys]     [keys][moved][keys][keys]
                               from B
```

Only keys between A and D (that were on B) need to move. Everything else stays in place.

Virtual Nodes
In practice, each physical node is represented by multiple virtual nodes (vnodes) on the ring. This ensures more even distribution.
Physical nodes: A, B, C
Virtual nodes (3 per physical):

```
Ring: A1 ── B1 ── C1 ── A2 ── B2 ── C2 ── A3 ── B3 ── C3
```
Now each physical node owns roughly 1/3 of the ring, and the distribution is much more even than with a single point per node.

```python
import hashlib
from bisect import bisect_right
from collections import defaultdict


class ConsistentHashRing:
    def __init__(self, nodes=None, virtual_nodes=150):
        self.virtual_nodes = virtual_nodes
        self.ring = {}          # hash -> node name
        self.sorted_keys = []   # sorted hash values
        self.nodes = set()

        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key: str) -> int:
        """Generate a consistent hash for a key."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        """Add a node with virtual nodes to the ring."""
        self.nodes.add(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode{i}"
            hash_val = self._hash(virtual_key)
            self.ring[hash_val] = node
            self.sorted_keys.append(hash_val)
        self.sorted_keys.sort()

    def remove_node(self, node: str):
        """Remove a node and its virtual nodes."""
        self.nodes.discard(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode{i}"
            hash_val = self._hash(virtual_key)
            del self.ring[hash_val]
            self.sorted_keys.remove(hash_val)

    def get_node(self, key: str) -> str:
        """Find which node a key belongs to."""
        if not self.ring:
            return None

        hash_val = self._hash(key)
        # Find the first node clockwise from the key
        idx = bisect_right(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0  # Wrap around the ring
        return self.ring[self.sorted_keys[idx]]

    def get_distribution(self, keys: list[str]) -> dict:
        """Show how keys are distributed across nodes."""
        distribution = defaultdict(list)
        for key in keys:
            node = self.get_node(key)
            distribution[node].append(key)
        return dict(distribution)


# Usage
ring = ConsistentHashRing(['Node-A', 'Node-B', 'Node-C'])

keys = [f"user:{i}" for i in range(20)]
dist = ring.get_distribution(keys)
for node, assigned_keys in sorted(dist.items()):
    print(f"{node}: {len(assigned_keys)} keys")

# Add a new node -- only some keys move
print("\nAdding Node-D...")
ring.add_node('Node-D')
new_dist = ring.get_distribution(keys)
for node, assigned_keys in sorted(new_dist.items()):
    print(f"{node}: {len(assigned_keys)} keys")
```

```javascript
const crypto = require('crypto');

class ConsistentHashRing {
  constructor(nodes = [], virtualNodes = 150) {
    this.virtualNodes = virtualNodes;
    this.ring = new Map();   // hash -> node name
    this.sortedKeys = [];    // sorted hash values
    this.nodes = new Set();

    for (const node of nodes) {
      this.addNode(node);
    }
  }

  _hash(key) {
    const hash = crypto.createHash('md5').update(key).digest('hex');
    return parseInt(hash.substring(0, 8), 16);
  }

  addNode(node) {
    this.nodes.add(node);
    for (let i = 0; i < this.virtualNodes; i++) {
      const virtualKey = `${node}:vnode${i}`;
      const hashVal = this._hash(virtualKey);
      this.ring.set(hashVal, node);
      this.sortedKeys.push(hashVal);
    }
    this.sortedKeys.sort((a, b) => a - b);
  }

  removeNode(node) {
    this.nodes.delete(node);
    for (let i = 0; i < this.virtualNodes; i++) {
      const virtualKey = `${node}:vnode${i}`;
      const hashVal = this._hash(virtualKey);
      this.ring.delete(hashVal);
      const idx = this.sortedKeys.indexOf(hashVal);
      if (idx !== -1) this.sortedKeys.splice(idx, 1);
    }
  }

  getNode(key) {
    if (this.ring.size === 0) return null;

    const hashVal = this._hash(key);

    // Binary search for the first node clockwise
    let low = 0, high = this.sortedKeys.length;
    while (low < high) {
      const mid = (low + high) >>> 1;
      if (this.sortedKeys[mid] <= hashVal) {
        low = mid + 1;
      } else {
        high = mid;
      }
    }

    // Wrap around if needed
    const idx = low % this.sortedKeys.length;
    return this.ring.get(this.sortedKeys[idx]);
  }

  getDistribution(keys) {
    const dist = {};
    for (const key of keys) {
      const node = this.getNode(key);
      if (!dist[node]) dist[node] = [];
      dist[node].push(key);
    }
    return dist;
  }
}

// Usage
const ring = new ConsistentHashRing(['Node-A', 'Node-B', 'Node-C']);

const keys = Array.from({ length: 20 }, (_, i) => `user:${i}`);
console.log('Before:', ring.getDistribution(keys));

ring.addNode('Node-D');
console.log('After:', ring.getDistribution(keys));
```

```java
import java.security.MessageDigest;
import java.util.*;

public class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;
    private final Set<String> nodes = new HashSet<>();

    public ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    private long hash(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(key.getBytes("UTF-8"));
            return ((long) (digest[0] & 0xFF) << 24)
                 | ((long) (digest[1] & 0xFF) << 16)
                 | ((long) (digest[2] & 0xFF) << 8)
                 | ((long) (digest[3] & 0xFF));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void addNode(String node) {
        nodes.add(node);
        for (int i = 0; i < virtualNodes; i++) {
            long h = hash(node + ":vnode" + i);
            ring.put(h, node);
        }
    }

    public void removeNode(String node) {
        nodes.remove(node);
        for (int i = 0; i < virtualNodes; i++) {
            long h = hash(node + ":vnode" + i);
            ring.remove(h);
        }
    }

    public String getNode(String key) {
        if (ring.isEmpty()) return null;
        long h = hash(key);
        // Find first node clockwise
        Map.Entry<Long, String> entry = ring.ceilingEntry(h);
        if (entry == null) {
            entry = ring.firstEntry(); // wrap around
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        ConsistentHashRing ring = new ConsistentHashRing(150);

        ring.addNode("Node-A");
        ring.addNode("Node-B");
        ring.addNode("Node-C");

        Map<String, Integer> dist = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            String node = ring.getNode("key:" + i);
            dist.merge(node, 1, Integer::sum);
        }
        System.out.println("Distribution: " + dist);
    }
}
```

Rebalancing Strategies
When nodes are added or removed, data must be rebalanced. Several strategies exist:
| Strategy | Description | Pros | Cons |
|---|---|---|---|
| Fixed partitions | Create many more partitions than nodes; assign partitions to nodes | Simple; no data splitting | Must choose partition count upfront |
| Dynamic partitioning | Split partitions that grow too large; merge small ones | Adapts to data volume | More complex; split/merge overhead |
| Proportional partitioning | Number of partitions proportional to number of nodes | Even load per node | Requires data movement when nodes change |
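A minimal sketch of the fixed-partition strategy (the 1,000-partition count and round-robin assignment are illustrative): a key's partition number never changes, so a rebalance only remaps partitions to nodes.

```python
import hashlib

NUM_PARTITIONS = 1000  # chosen up front, never changes

def partition_of(key):
    """A key's partition is fixed for the life of the cluster."""
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % NUM_PARTITIONS

def assign(nodes):
    """Spread partitions round-robin over the current nodes."""
    return {p: nodes[p % len(nodes)] for p in range(NUM_PARTITIONS)}

three = assign(["A", "B", "C"])
four = assign(["A", "B", "C", "D"])

key = "user:42"
p = partition_of(key)
# The key's partition never changes -- only which node hosts it may.
print(p, three[p], four[p])
```

Rebalancing therefore moves whole partitions between nodes; no key is ever rehashed, which keeps the migration bounded and predictable.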
Example: Fixed Partitions (Elasticsearch, Riak, Couchbase)
1000 partitions, 3 nodes initially:
```
Node A: partitions   0-333  (334 partitions)
Node B: partitions 334-666  (333 partitions)
Node C: partitions 667-999  (333 partitions)
```
Add Node D:
```
Node A: partitions   0-249  (250 partitions)
Node B: partitions 250-499  (250 partitions)
Node C: partitions 500-749  (250 partitions)
Node D: partitions 750-999  (250 partitions)
```

Each existing node gives away ~83 partitions to the new node. No partition is split -- only reassigned.

Secondary Indexes with Partitioning
Secondary indexes complicate partitioning because the index key is different from the partition key.
Document-partitioned indexes (local indexes): Each partition maintains its own index for the data it contains. Queries that use the secondary index must scatter to all partitions (“scatter-gather”).
```
Partition 1:        Partition 2:        Partition 3:
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│ Data:        │    │ Data:        │    │ Data:        │
│  ID=1, red   │    │  ID=3, blue  │    │  ID=5, red   │
│  ID=2, blue  │    │  ID=4, red   │    │  ID=6, green │
│              │    │              │    │              │
│ Index:       │    │ Index:       │    │ Index:       │
│  red  → [1]  │    │  blue → [3]  │    │  red   → [5] │
│  blue → [2]  │    │  red  → [4]  │    │  green → [6] │
└──────────────┘    └──────────────┘    └──────────────┘

Query "color=red" must check ALL 3 partitions!
```

Term-partitioned indexes (global indexes): The index itself is partitioned: each partition holds index entries for a range of indexed values.
```
Index Partition A        Index Partition B
(colors a-m)             (colors n-z)
┌──────────────┐         ┌──────────────┐
│ blue  → [2,3]│         │ red → [1,4,5]│
│ green → [6]  │         │              │
└──────────────┘         └──────────────┘

Query "color=red" only needs Index Partition B.
But writes must update multiple index partitions.
```

Combining Replication and Partitioning
In practice, each partition is replicated across multiple nodes for fault tolerance:
3 partitions, replication factor 3:
```
Node 1:  P1 (leader)    P2 (follower)  P3 (follower)
Node 2:  P1 (follower)  P2 (leader)    P3 (follower)
Node 3:  P1 (follower)  P2 (follower)  P3 (leader)
```

Each partition has one leader and two followers. Each node hosts leaders for some partitions and followers for others, distributing the load.

Summary
| Concept | Key Takeaway |
|---|---|
| Leader-Follower | Simple, one write path; risk of single leader bottleneck |
| Multi-Leader | Multiple write paths; must handle conflicts |
| Leaderless | No single point of failure; quorum-based consistency |
| Range Partitioning | Good for range queries; risk of hot spots |
| Hash Partitioning | Even distribution; no range queries |
| Consistent Hashing | Minimal data movement when adding/removing nodes |
| Virtual Nodes | Even out consistent hashing distribution |