
Replication & Partitioning

Data replication and partitioning are the two fundamental mechanisms for distributing data across multiple machines. Replication keeps copies of the same data on multiple nodes for fault tolerance and read performance. Partitioning (also called sharding) splits data across nodes so that each node is responsible for a subset, enabling horizontal scaling.

Every distributed database uses some combination of these two techniques.


Part 1: Replication

Why Replicate Data?

Goal                 How Replication Helps
-------------------  ---------------------------------------------------------
Fault tolerance      If one node fails, data is still available on other nodes
Read scalability     Spread read traffic across multiple replicas
Geographic locality  Place replicas close to users for lower latency
Offline operation    Mobile devices can work with local replicas

Replication Strategies Overview

┌─────────────────────────────────────────────────────────────┐
│ Replication Strategies │
├──────────────────┬───────────────────┬──────────────────────┤
│ Leader-Follower │ Multi-Leader │ Leaderless │
│ (Primary- │ (Multi-Master) │ (Dynamo-style) │
│ Secondary) │ │ │
├──────────────────┼───────────────────┼──────────────────────┤
│ One leader │ Multiple leaders │ No leader │
│ handles writes │ handle writes │ Any node handles │
│ │ │ writes │
├──────────────────┼───────────────────┼──────────────────────┤
│ PostgreSQL │ CouchDB │ Cassandra │
│ MySQL │ Galera Cluster │ DynamoDB │
│ MongoDB │ Active Directory │ Riak │
│ Redis │ Cosmos DB │ Voldemort │
└──────────────────┴───────────────────┴──────────────────────┘

Leader-Follower Replication

The most common replication strategy. One node is designated as the leader (primary/master). All writes go through the leader, which then replicates changes to followers (secondaries/replicas).

     Writes                    Reads
       │                  ┌──────┴──────┐
       ▼                  ▼             ▼
 ┌───────────┐      ┌───────────┐ ┌───────────┐
 │  Leader   │─────▶│ Follower  │ │ Follower  │
 │ (Primary) │──┐   │     1     │ │     2     │
 └───────────┘  │   └───────────┘ └───────────┘
                │   ┌───────────┐
                └──▶│ Follower  │
                    │     3     │
                    └───────────┘

Replication log: the leader streams changes to all followers

Synchronous vs Asynchronous Replication

Synchronous: The leader waits for the follower to confirm it has received and written the data before acknowledging the write to the client.

Client ──▶ Leader ──▶ Follower (write to disk)
Leader ◀── ACK
Client ◀── ACK (write confirmed)
Pro: Follower is guaranteed to have an up-to-date copy
Con: One slow follower blocks all writes

Asynchronous: The leader acknowledges the write to the client immediately, without waiting for followers.

Client ──▶ Leader ──▶ Follower (write to disk, eventually)
Client ◀── ACK (write confirmed immediately)
Pro: Low write latency; no blocking
Con: Followers may lag; data loss if leader fails before replication

Semi-synchronous: A practical compromise. The leader waits for at least one follower to acknowledge, then replicates to the rest asynchronously.
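
The three acknowledgment policies can be sketched in a few lines, with a thread pool standing in for network replication. The helper names here are made up for illustration; this is not any particular database's API.

```python
import concurrent.futures

# Shared pool standing in for connections to followers (illustrative)
_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

def replicate(leader_write, follower_writes, mode="semi-sync"):
    """Sketch of sync / semi-sync / async acknowledgment policies.

    leader_write: callable that persists the write on the leader.
    follower_writes: one callable per follower, each sending the write.
    """
    leader_write()                       # the leader always persists first
    futures = [_pool.submit(f) for f in follower_writes]
    if mode == "sync":
        # Block until every follower has acknowledged
        concurrent.futures.wait(futures)
    elif mode == "semi-sync":
        # Block until at least one follower has acknowledged;
        # the rest continue in the background
        concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
    # mode == "async": acknowledge the client without waiting at all
    return "ACK"
```

The trade-offs above fall out directly: "sync" inherits the slowest follower's latency, "async" returns before any follower is durable, and "semi-sync" guarantees one up-to-date copy.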

Handling Leader Failure (Failover)

When the leader fails, a follower must be promoted. This process is called failover.

Normal Operation:
Client ──▶ Leader ──▶ Follower 1, Follower 2
Leader Fails:
Client ──▶ Leader ✗
Follower 1 ──▶ elected as new Leader
Follower 2 ──▶ now follows Follower 1
Recovery:
Old Leader comes back ──▶ becomes a Follower

Failover challenges:

  1. Data loss: If async replication was used, the new leader may be behind
  2. Split brain: Two nodes think they are leader simultaneously
  3. Stale reads: Clients reading from a lagging follower see old data
  4. Autoincrement conflicts: New leader may have different autoincrement state
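
To limit the data loss in challenge 1, failover typically promotes the most up-to-date follower. A minimal sketch of that promotion rule (the `Replica` type is illustrative; real failover, e.g. Raft, also adds term numbers, majority votes, and fencing to rule out split brain):

```python
from dataclasses import dataclass

@dataclass
class Replica:
    name: str
    last_applied_sequence: int   # how far this replica has caught up

def elect_new_leader(replicas: list) -> Replica:
    """Promote the most caught-up replica (sketch only).

    Picking the replica with the highest applied sequence number
    minimizes -- but does not eliminate -- data loss under
    asynchronous replication.
    """
    return max(replicas, key=lambda r: r.last_applied_sequence)
```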

Replication Log Implementation

import time
import threading
from dataclasses import dataclass, field
from typing import Any


@dataclass
class LogEntry:
    sequence: int
    operation: str  # 'SET' or 'DELETE'
    key: str
    value: Any = None
    timestamp: float = field(default_factory=time.time)


class ReplicationLog:
    """Write-ahead log for leader-follower replication."""

    def __init__(self):
        self.log: list[LogEntry] = []
        self.sequence = 0
        self.lock = threading.Lock()

    def append(self, operation: str, key: str,
               value: Any = None) -> LogEntry:
        with self.lock:
            self.sequence += 1
            entry = LogEntry(
                sequence=self.sequence,
                operation=operation,
                key=key,
                value=value
            )
            self.log.append(entry)
            return entry

    def get_entries_after(self, sequence: int) -> list[LogEntry]:
        """Get all log entries after a given sequence number.

        Used by followers to catch up."""
        with self.lock:
            return [
                e for e in self.log
                if e.sequence > sequence
            ]


class LeaderNode:
    def __init__(self):
        self.data = {}
        self.replication_log = ReplicationLog()
        self.followers = []

    def write(self, key: str, value: Any):
        """Process a write and replicate it to followers."""
        self.data[key] = value
        entry = self.replication_log.append('SET', key, value)
        # Replicate to followers asynchronously
        for follower in self.followers:
            threading.Thread(
                target=follower.apply_entry,
                args=(entry,)
            ).start()

    def read(self, key: str) -> Any:
        return self.data.get(key)


class FollowerNode:
    def __init__(self):
        self.data = {}
        self.last_applied_sequence = 0

    def apply_entry(self, entry: LogEntry):
        """Apply a replicated log entry (assumes in-order delivery)."""
        if entry.sequence <= self.last_applied_sequence:
            return  # Already applied (idempotent)
        if entry.operation == 'SET':
            self.data[entry.key] = entry.value
        elif entry.operation == 'DELETE':
            self.data.pop(entry.key, None)
        self.last_applied_sequence = entry.sequence

    def read(self, key: str) -> Any:
        """Reads from a follower may return stale data."""
        return self.data.get(key)

Multi-Leader Replication

Multiple nodes accept writes independently. Each leader replicates its changes to all other leaders.

  Data Center A                      Data Center B
 ┌─────────────┐                   ┌─────────────┐
 │  Leader A   │◀─────────────────▶│  Leader B   │
 │  (writes)   │   bidirectional   │  (writes)   │
 │             │    replication    │             │
 └──┬──────────┘                   └──────────┬──┘
    │                                         │
    ▼                                         ▼
┌─────────┐ ┌─────────┐         ┌─────────┐ ┌─────────┐
│Follower │ │Follower │         │Follower │ │Follower │
│   A1    │ │   A2    │         │   B1    │ │   B2    │
└─────────┘ └─────────┘         └─────────┘ └─────────┘

Use cases:

  • Multi-datacenter operation (each datacenter has its own leader)
  • Collaborative editing (each user’s device is a “leader”)
  • Offline-capable applications

Write Conflict Resolution

The biggest challenge with multi-leader replication is handling conflicting writes. Two leaders can independently modify the same data.

Timeline:
T1: Leader A writes key="title", value="Foo"
T1: Leader B writes key="title", value="Bar"
T2: Both leaders replicate to each other
Conflict! Which value wins?

Resolution strategies:

Strategy                 Description                                   Trade-off
-----------------------  --------------------------------------------  --------------------------------------
Last-Write-Wins (LWW)    Use timestamps; latest write wins             Simple but loses data; clock skew risk
Merge values             Concatenate or union conflicting values       No data loss but may produce nonsense
Custom conflict handler  Application-specific logic                    Full control but complex
CRDTs                    Data structures that merge without conflicts  Limited types but automatic
Prompt the user          Show conflicts to the user for resolution     Best accuracy but poor UX
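
As an illustration of the first two strategies, a minimal sketch (the function names and the `(timestamp, value)` representation are made up for this example):

```python
def last_write_wins(versions):
    """Resolve a conflict by timestamp: the newest write wins (sketch).

    versions: list of (timestamp, value) pairs, one per leader.
    Every losing value is silently discarded -- and clock skew
    between leaders can crown the wrong winner.
    """
    return max(versions, key=lambda v: v[0])[1]

def merge_values(versions):
    """Union-style merge: keep every conflicting value instead."""
    return sorted({value for _, value in versions})
```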

Leaderless Replication

No single node is designated as leader. Any node can accept reads and writes. The client sends writes to multiple nodes simultaneously and reads from multiple nodes, using quorum rules to determine the correct value.

             Client
            /   │   \
           /    │    \
          ▼     ▼     ▼
   ┌──────┐  ┌──────┐  ┌──────┐
   │Node 1│  │Node 2│  │Node 3│
   │ x=5  │  │ x=5  │  │ x=3  │   (Node 3 is stale)
   └──────┘  └──────┘  └──────┘
Write: Send to all 3 nodes, require W=2 acknowledgments
Read: Read from all 3 nodes, require R=2 responses
Compare values, return the most recent one

Quorum Reads and Writes

For a system with N replicas: if every write must be acknowledged by W nodes and every read must collect responses from R nodes, then choosing W + R > N forces the read set and the write set to overlap in at least one node, so every read sees at least one copy of the latest acknowledged write.

N = 3 (three replicas)
Write Quorum (W=2):
Client ──▶ Node 1 ✓ ACK
Client ──▶ Node 2 ✓ ACK Write succeeds (2 ACKs >= W)
Client ──▶ Node 3 ✗ FAIL
Read Quorum (R=2):
Client ──▶ Node 1 ✓ returns x=5 (version 7)
Client ──▶ Node 2 ✓ returns x=5 (version 7)
Client ──▶ Node 3 (not needed, R=2 satisfied)
At least one of the R nodes MUST have the latest write
because W + R > N (2 + 2 > 3)
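
The overlap argument can be checked in a few lines. A sketch, with `(version, value)` pairs standing in for real replica responses:

```python
def quorum_ok(n: int, w: int, r: int) -> bool:
    # The overlap condition: any read set and any write set
    # must share at least one node
    return w + r > n

def quorum_read(replies, r: int):
    """Pick the freshest value once R replicas have replied (sketch).

    replies: (version, value) pairs from individual replicas.
    Because W + R > N, at least one of the first R replies carries
    the latest acknowledged write, so the highest version wins.
    """
    if len(replies) < r:
        raise TimeoutError("read quorum not reached")
    version, value = max(replies[:r], key=lambda rep: rep[0])
    return value
```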

Anti-Entropy and Read Repair

Leaderless systems use two mechanisms to keep replicas in sync:

Read Repair: When a client reads from multiple nodes and detects a stale value, it writes the correct value back to the stale node.

Read from 3 nodes:
Node 1: x=5 (version 7) ← up to date
Node 2: x=5 (version 7) ← up to date
Node 3: x=3 (version 5) ← STALE!
Client detects stale value on Node 3.
Client writes x=5 (version 7) back to Node 3. ← Read Repair
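
The read-repair step can be sketched as follows, with each replica modeled as a plain dict mapping key to a `(version, value)` pair; real systems compare versions with vector clocks or similar, not bare integers:

```python
def read_with_repair(replicas: list, key: str):
    """Read from all replicas, return the freshest value, and write
    it back to any stale replica (read repair, as a sketch)."""
    replies = [rep.get(key) for rep in replicas]
    # Tuples compare by version first, so max() finds the freshest copy
    latest = max(r for r in replies if r is not None)
    for rep, reply in zip(replicas, replies):
        if reply != latest:
            rep[key] = latest     # repair the stale replica in passing
    return latest[1]
```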

Anti-Entropy Process: A background process continuously compares data between replicas and copies missing data. Unlike read repair, this does not depend on clients reading the data.


Part 2: Partitioning (Sharding)

Why Partition Data?

When a dataset is too large for a single machine, or when write throughput exceeds what one machine can handle, the data must be split across multiple nodes.

Before Partitioning:
┌───────────────────────────────┐
│ All Data │
│ (too big for one node) │
└───────────────────────────────┘
After Partitioning:
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Partition │ │ Partition │ │ Partition │
│ 1 │ │ 2 │ │ 3 │
│ (A - H) │ │ (I - P) │ │ (Q - Z) │
└───────────┘ └───────────┘ └───────────┘

Partitioning Strategies

1. Range-Based Partitioning

Data is divided into contiguous ranges based on the key.

Key Range:  [A-H]      [I-P]      [Q-Z]
              │          │          │
          ┌───▼───┐  ┌───▼───┐  ┌───▼───┐
          │Node 1 │  │Node 2 │  │Node 3 │
          │       │  │       │  │       │
          │Adams  │  │Jones  │  │Smith  │
          │Baker  │  │Kim    │  │Taylor │
          │Clark  │  │Lee    │  │Wilson │
          └───────┘  └───────┘  └───────┘

Pros: Range queries are efficient, since all keys in a range live on the same node.
Cons: Hot spots if data is not uniformly distributed (for example, time-series data where recent data gets all the writes).
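
Routing a key to its range partition is just a sorted-boundary lookup. A sketch, using hypothetical split points that match the diagram above:

```python
from bisect import bisect_right

# Hypothetical split points: keys before "I" go to Node 1,
# keys before "Q" to Node 2, everything else to Node 3.
BOUNDARIES = ["I", "Q"]
NODES = ["Node 1", "Node 2", "Node 3"]

def range_partition(key: str) -> str:
    """Route a key to its range partition via binary search."""
    return NODES[bisect_right(BOUNDARIES, key)]
```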

2. Hash-Based Partitioning

A hash function determines which partition a key belongs to.

hash(key) mod N = partition number
hash("Alice") mod 3 = 0 → Node 0
hash("Bob") mod 3 = 1 → Node 1
hash("Carol") mod 3 = 2 → Node 2
hash("Dave") mod 3 = 0 → Node 0

Pros: Even distribution of keys; no hot spots (with a good hash function).
Cons: Range queries are impossible, since adjacent keys hash to different partitions.

Problem with simple modular hashing: When you add or remove a node (N changes), almost every key maps to a different partition, requiring massive data migration.

Before (N=3): After adding node (N=4):
hash("Alice") % 3 = 0 hash("Alice") % 4 = 2 ← MOVED!
hash("Bob") % 3 = 1 hash("Bob") % 4 = 1 ← same
hash("Carol") % 3 = 2 hash("Carol") % 4 = 0 ← MOVED!
hash("Dave") % 3 = 0 hash("Dave") % 4 = 3 ← MOVED!
75% of keys must move! This is unacceptable at scale.
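
The scale of the reshuffle is easy to verify empirically. A quick sketch (the key names are arbitrary):

```python
import hashlib

def bucket(key: str, n: int) -> int:
    # Stable hash -- Python's built-in hash() is randomized per process
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % n

keys = [f"user:{i}" for i in range(1000)]
moved = sum(bucket(k, 3) != bucket(k, 4) for k in keys)
print(f"{moved / len(keys):.0%} of keys moved")  # roughly 75%
```

In general, going from N to N+1 partitions with mod hashing moves about N/(N+1) of the keys; consistent hashing (next section) moves only about 1/(N+1).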

Consistent Hashing

Consistent hashing solves the rebalancing problem. Instead of hash mod N, both keys and nodes are mapped onto a circular hash space (a “hash ring”). Each key is assigned to the nearest node going clockwise on the ring.

                   0
          Node C ──┼── Node A
           /       │       \
          /        │        \
         /         │         \
        /          │          \
  270 ──┤      Hash Ring      ├── 90
        \          │          /
         \         │         /
          \        │        /
           \       │       /
            ───────┼── Node B
                  180

Keys are placed on the ring using hash(key).
Each key is served by the first node clockwise from its position.

Adding a Node

When a new node is added, only the keys between the new node and its predecessor need to move:

Before: After adding Node D:
A ──── B ──── C A ── D ── B ──── C
│ │ │ │ │ │ │
[keys] [keys] [keys] [keys][moved][keys][keys]
from B
Only keys between A and D (that were on B) need to move.
Everything else stays in place.

Virtual Nodes

In practice, each physical node is represented by multiple virtual nodes (vnodes) on the ring. This ensures more even distribution.

Physical nodes: A, B, C
Virtual nodes (3 per physical):
Ring: A1 ── B1 ── C1 ── A2 ── B2 ── C2 ── A3 ── B3 ── C3
Now each physical node owns roughly 1/3 of the ring,
and the distribution is much more even than with a single
point per node.

import hashlib
from bisect import bisect_right
from collections import defaultdict


class ConsistentHashRing:
    def __init__(self, nodes=None, virtual_nodes=150):
        self.virtual_nodes = virtual_nodes
        self.ring = {}          # hash -> node name
        self.sorted_keys = []   # sorted hash values
        self.nodes = set()
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key: str) -> int:
        """Generate a consistent hash for a key."""
        return int(
            hashlib.md5(key.encode()).hexdigest(), 16
        )

    def add_node(self, node: str):
        """Add a node with virtual nodes to the ring."""
        self.nodes.add(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode{i}"
            hash_val = self._hash(virtual_key)
            self.ring[hash_val] = node
            self.sorted_keys.append(hash_val)
        self.sorted_keys.sort()

    def remove_node(self, node: str):
        """Remove a node and its virtual nodes."""
        self.nodes.discard(node)
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode{i}"
            hash_val = self._hash(virtual_key)
            del self.ring[hash_val]
            self.sorted_keys.remove(hash_val)

    def get_node(self, key: str) -> str:
        """Find which node a key belongs to."""
        if not self.ring:
            return None
        hash_val = self._hash(key)
        # Find the first node clockwise from the key
        idx = bisect_right(self.sorted_keys, hash_val)
        if idx == len(self.sorted_keys):
            idx = 0  # Wrap around the ring
        return self.ring[self.sorted_keys[idx]]

    def get_distribution(self, keys: list[str]) -> dict:
        """Show how keys are distributed across nodes."""
        distribution = defaultdict(list)
        for key in keys:
            node = self.get_node(key)
            distribution[node].append(key)
        return dict(distribution)


# Usage
ring = ConsistentHashRing(['Node-A', 'Node-B', 'Node-C'])
keys = [f"user:{i}" for i in range(20)]
dist = ring.get_distribution(keys)
for node, assigned_keys in sorted(dist.items()):
    print(f"{node}: {len(assigned_keys)} keys")

# Add a new node -- only some keys move
print("\nAdding Node-D...")
ring.add_node('Node-D')
new_dist = ring.get_distribution(keys)
for node, assigned_keys in sorted(new_dist.items()):
    print(f"{node}: {len(assigned_keys)} keys")

Rebalancing Strategies

When nodes are added or removed, data must be rebalanced. Several strategies exist:

  • Fixed partitions: create many more partitions than nodes and assign
    partitions to nodes. Pros: simple; no data splitting. Cons: the
    partition count must be chosen upfront.
  • Dynamic partitioning: split partitions that grow too large; merge
    small ones. Pros: adapts to data volume. Cons: more complex;
    split/merge overhead.
  • Proportional partitioning: the number of partitions is proportional
    to the number of nodes. Pros: even load per node. Cons: requires
    data movement when nodes change.

Example: Fixed Partitions (Elasticsearch, Riak, Couchbase)

1000 partitions, 3 nodes initially:
Node A: partitions 0-333 (334 partitions)
Node B: partitions 334-666 (333 partitions)
Node C: partitions 667-999 (333 partitions)
Add Node D:
Node A: partitions 0-249 (250 partitions)
Node B: partitions 250-499 (250 partitions)
Node C: partitions 500-749 (250 partitions)
Node D: partitions 750-999 (250 partitions)
Each existing node gives away ~83 partitions to the new node.
No partition is split -- only reassigned.
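
The reassignment above can be sketched as a greedy steal from the most-loaded node. The function and its policy are illustrative, not any particular system's rebalancer:

```python
from collections import Counter

def rebalance(assignment: dict, new_node: str) -> int:
    """Give a new node its fair share of fixed partitions (sketch).

    assignment maps partition id -> node name. Whole partitions are
    taken from whichever node currently holds the most; none is ever
    split. Returns the number of partitions that moved.
    """
    n_nodes = len(set(assignment.values())) + 1
    target = len(assignment) // n_nodes       # fair share for the new node
    counts = Counter(assignment.values())
    for _ in range(target):
        donor = counts.most_common(1)[0][0]   # most-loaded existing node
        part = next(p for p, owner in assignment.items()
                    if owner == donor)
        assignment[part] = new_node
        counts[donor] -= 1
    return target
```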

Secondary Indexes with Partitioning

Secondary indexes complicate partitioning because the index key is different from the partition key.

Document-partitioned indexes (local indexes): Each partition maintains its own index for the data it contains. Queries that use the secondary index must scatter to all partitions (“scatter-gather”).

Partition 1: Partition 2: Partition 3:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Data: │ │ Data: │ │ Data: │
│ ID=1, red │ │ ID=3, blue │ │ ID=5, red │
│ ID=2, blue │ │ ID=4, red │ │ ID=6, green │
│ │ │ │ │ │
│ Index: │ │ Index: │ │ Index: │
│ red → [1] │ │ blue → [3] │ │ red → [5] │
│ blue → [2] │ │ red → [4] │ │ green → [6] │
└──────────────┘ └──────────────┘ └──────────────┘
Query "color=red" must check ALL 3 partitions!

Term-partitioned indexes (global indexes): The index itself is partitioned — each partition holds index entries for a range of indexed values.

Index Partition A Index Partition B
(colors a-m) (colors n-z)
┌──────────────┐ ┌──────────────┐
│ blue → [2,3] │ │ red → [1,4,5]│
│ green → [6] │ │ │
└──────────────┘ └──────────────┘
Query "color=red" only needs Index Partition B.
But writes must update multiple index partitions.
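
The scatter-gather cost of document-partitioned indexes is easy to see in code. A sketch, with each partition's local index modeled as a plain dict mapping term to a list of document ids:

```python
def query_local_indexes(partitions: list, term: str) -> list:
    """Scatter-gather over local indexes: the query must fan out to
    every partition and union the results (sketch)."""
    ids = []
    for index in partitions:       # must visit ALL partitions
        ids.extend(index.get(term, []))
    return sorted(ids)
```

With a term-partitioned (global) index, the same query would hit only the one index partition that owns the term, at the cost of more complex writes.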

Combining Replication and Partitioning

In practice, each partition is replicated across multiple nodes for fault tolerance:

3 partitions, replication factor 3:
Node 1: P1(leader) P2(follower) P3(follower)
Node 2: P1(follower) P2(leader) P3(follower)
Node 3: P1(follower) P2(follower) P3(leader)
Each partition has one leader and two followers.
Each node hosts leaders for some partitions
and followers for others, distributing the load.

Summary

Concept              Key Takeaway
-------------------  ---------------------------------------------------------
Leader-Follower      Simple, one write path; risk of single-leader bottleneck
Multi-Leader         Multiple write paths; must handle conflicts
Leaderless           No single point of failure; quorum-based consistency
Range Partitioning   Good for range queries; risk of hot spots
Hash Partitioning    Even distribution; no range queries
Consistent Hashing   Minimal data movement when adding/removing nodes
Virtual Nodes        Even out the consistent-hashing distribution