Apache Kafka
Apache Kafka is a distributed event streaming platform used by thousands of companies for high-throughput, fault-tolerant, real-time data pipelines and event-driven architectures. Originally built at LinkedIn, Kafka handles trillions of events per day at companies like Netflix, Uber, and Airbnb.
Unlike traditional message queues that delete messages after consumption, Kafka retains messages in a durable, ordered log. This enables multiple consumers to read the same data, replay events, and build real-time and batch processing pipelines.
Architecture Overview
```
┌──────────────────────────────────────────────────────────────┐
│                        Kafka Cluster                         │
│                                                              │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐       │
│  │  Broker 0   │    │  Broker 1   │    │  Broker 2   │       │
│  │             │    │             │    │             │       │
│  │  Topic A    │    │  Topic A    │    │  Topic A    │       │
│  │  Partition 0│    │  Partition 1│    │  Partition 2│       │
│  │  (Leader)   │    │  (Leader)   │    │  (Leader)   │       │
│  │             │    │             │    │             │       │
│  │  Topic A    │    │  Topic A    │    │  Topic A    │       │
│  │  Partition 1│    │  Partition 0│    │  Partition 0│       │
│  │  (Replica)  │    │  (Replica)  │    │  (Replica)  │       │
│  └─────────────┘    └─────────────┘    └─────────────┘       │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                   ZooKeeper / KRaft                    │  │
│  │           (cluster metadata and coordination)          │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘
          ▲                                   │
          │                                   ▼
     ┌─────────┐                       ┌──────────────┐
     │Producers│                       │  Consumers   │
     │         │                       │  (Consumer   │
     │  App A  │                       │   Groups)    │
     │  App B  │                       │    App X     │
     └─────────┘                       │    App Y     │
                                       └──────────────┘
```

Core Components
| Component | Description |
|---|---|
| Broker | A single Kafka server that stores data and serves clients |
| Cluster | A group of brokers working together |
| Topic | A named feed of messages (like a database table) |
| Partition | An ordered, immutable sequence of messages within a topic |
| Producer | An application that publishes messages to topics |
| Consumer | An application that reads messages from topics |
| Consumer Group | A set of consumers that cooperatively consume from a topic |
| ZooKeeper/KRaft | Manages cluster metadata, broker coordination, leader election |
Topics and Partitions
Topics
A topic is a logical channel for messages. Topics are multi-subscriber: multiple consumer groups can read from the same topic independently.
Partitions
Each topic is divided into partitions. Partitions are the unit of parallelism and are distributed across brokers.
Topic "orders" with 3 partitions:
```
Partition 0: [msg0] [msg3] [msg6] [msg9]    ──▶ offset
Partition 1: [msg1] [msg4] [msg7]           ──▶ offset
Partition 2: [msg2] [msg5] [msg8] [msg10]   ──▶ offset
```
- Messages are appended to partitions in order.
- Each message has a unique offset within its partition.
- Messages are NOT ordered across partitions.
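The append-and-offset behavior can be sketched with a toy in-memory model. This is illustrative only (a real broker persists partition segments to disk); the `TopicLog` class and its names are hypothetical:

```python
class TopicLog:
    """Toy model of a topic: each partition is an append-only list,
    and a message's offset is simply its index in that partition."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]
        self._next = 0  # round-robin cursor for keyless messages

    def append(self, msg: str):
        """Append round-robin and return (partition, offset)."""
        p = self._next % len(self.partitions)
        self._next += 1
        self.partitions[p].append(msg)
        return p, len(self.partitions[p]) - 1


log = TopicLog(num_partitions=3)
placements = [log.append(f"msg{i}") for i in range(7)]
# Offsets are per-partition, not global: msg0 and msg3 both land in
# partition 0, at offsets 0 and 1 respectively.
```

Note that reading partition 0 alone yields msg0, msg3, msg6 in order, but nothing about their order relative to msg1 in partition 1, which mirrors the per-partition ordering guarantee above.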
Offset: a sequential ID for each message within a partition.

```
Partition 0: offset 0, 1, 2, 3, ...
Partition 1: offset 0, 1, 2, ...
```

Partition Assignment
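As a sketch of the key-based rule described below: hashing the key and taking it modulo the partition count gives a stable mapping. Hedged: Kafka's real default partitioner uses murmur2 hashing; the md5-based hash here is only a deterministic stand-in.

```python
import hashlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition.

    Stand-in hash for illustration; Kafka's default partitioner
    actually uses murmur2 on the key bytes."""
    h = int.from_bytes(hashlib.md5(key).digest()[:4], 'big')
    return h % num_partitions


# The same key always lands on the same partition, which is
# exactly what guarantees per-key ordering.
p1 = choose_partition(b'CUST-42', 6)
p2 = choose_partition(b'CUST-42', 6)
assert p1 == p2
```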
How does Kafka decide which partition a message goes to?
1. Explicit partition: the producer specifies the partition number.
2. Key-based: partition = hash(key) % num_partitions. The same key always maps to the same partition, which guarantees ordering per key.
3. Round-robin: with no key, messages are distributed evenly across partitions.

Replication
Each partition is replicated across multiple brokers for fault tolerance:
Topic "orders", Partition 0, Replication Factor = 3:
```
Broker 0: Partition 0 (LEADER)   ◀── All reads/writes
Broker 1: Partition 0 (FOLLOWER) ◀── Replicates from leader
Broker 2: Partition 0 (FOLLOWER) ◀── Replicates from leader
```
If Broker 0 fails: Broker 1 or Broker 2 is elected new leader. No data loss (if in-sync replicas are up to date).
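Failover can be sketched with a toy model. The function and names here are hypothetical; in a real cluster the controller drives leader election using cluster metadata:

```python
def elect_leader(failed_leader: str, isr: set) -> str:
    """Pick a new leader for a partition whose leader failed.

    Only in-sync replicas are eligible, which is why no committed
    data is lost: every ISR member already has all acknowledged
    messages. (Toy rule: pick the lowest-named survivor.)"""
    candidates = sorted(isr - {failed_leader})
    if not candidates:
        raise RuntimeError("no in-sync replica available for election")
    return candidates[0]


# Partition 0: leader broker-0, with broker-1 and broker-2 in the ISR.
isr = {'broker-0', 'broker-1', 'broker-2'}
new_leader = elect_leader('broker-0', isr)  # broker-0 just failed
```

If the ISR has shrunk to the failed leader alone, no safe election is possible; the real broker setting `unclean.leader.election.enable` controls whether an out-of-sync replica may take over (risking data loss).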
ISR (In-Sync Replicas): followers that are caught up with the leader. Only ISR members can be elected leader.

Producers
Producers publish messages to Kafka topics. Key configuration options affect durability, throughput, and latency.
Producer Acknowledgments (acks)
acks=0 ("fire and forget"): the producer sends and does not wait for any acknowledgment. Fastest, but data loss is possible.

acks=1 ("leader acknowledged"): the producer waits for the leader to write the message to its log. If the leader crashes before replication, data can be lost.
acks=all (or -1, "all in-sync replicas acknowledged"): the producer waits until every ISR replica has written the message. Strongest durability, highest latency.

```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode(),
    key_serializer=lambda k: k.encode() if k else None,
    acks='all',              # Wait for all in-sync replicas
    retries=3,               # Retry on transient failure
    batch_size=16384,        # Batch messages (16 KB)
    linger_ms=10,            # Wait up to 10 ms to fill a batch
    compression_type='gzip',
)

# Send a message with a key.
# The key determines the partition: same key -> same partition.
def publish_order(order):
    future = producer.send(
        'orders',
        key=order['customer_id'],
        value=order,
        headers=[
            ('source', b'order-service'),
            ('version', b'1.0'),
        ],
    )

    # Block until the message is sent (or use a callback)
    metadata = future.get(timeout=10)
    print(
        f"Sent to partition {metadata.partition} "
        f"at offset {metadata.offset}"
    )

# Publish an order
publish_order({
    'order_id': 'ORD-001',
    'customer_id': 'CUST-42',
    'items': ['widget', 'gadget'],
    'total': 149.99,
})

# Flush any buffered messages
producer.flush()
producer.close()
```

```javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer({
  allowAutoTopicCreation: false,
  idempotent: true,  // Exactly-once producer
});

async function publishOrders() {
  await producer.connect();

  // Send a single message
  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: 'CUST-42',
        value: JSON.stringify({
          orderId: 'ORD-001',
          customerId: 'CUST-42',
          items: ['widget', 'gadget'],
          total: 149.99,
        }),
        headers: { source: 'order-service', version: '1.0' },
      },
    ],
  });

  // Send a batch of messages across topics
  await producer.sendBatch({
    topicMessages: [
      {
        topic: 'orders',
        messages: [
          { key: 'CUST-1', value: JSON.stringify({ orderId: 'ORD-002', total: 29.99 }) },
          { key: 'CUST-2', value: JSON.stringify({ orderId: 'ORD-003', total: 59.99 }) },
        ],
      },
      {
        topic: 'analytics',
        messages: [
          { value: JSON.stringify({ event: 'order_created', count: 2 }) },
        ],
      },
    ],
  });

  await producer.disconnect();
}

publishOrders().catch(console.error);
```

```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.google.gson.Gson;

import java.util.Properties;

public class OrderProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

        Gson gson = new Gson();

        try (var producer = new KafkaProducer<String, String>(props)) {
            var order = new Order("ORD-001", "CUST-42", 149.99);

            ProducerRecord<String, String> record = new ProducerRecord<>(
                "orders", order.customerId(), gson.toJson(order));

            // Asynchronous send with callback
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Failed: " + exception.getMessage());
                } else {
                    System.out.printf("Sent to partition %d offset %d%n",
                        metadata.partition(), metadata.offset());
                }
            });

            producer.flush();
        }
    }

    record Order(String orderId, String customerId, double total) {}
}
```

Consumers and Consumer Groups
Consumer Groups
A consumer group is a set of consumers that cooperatively read from a topic. Each partition is assigned to exactly one consumer within the group.
Topic "orders" with 4 partitions, consumer group "order-processing":
```
Partition 0 ──▶ Consumer A
Partition 1 ──▶ Consumer A
Partition 2 ──▶ Consumer B
Partition 3 ──▶ Consumer C
```
3 consumers share 4 partitions. Consumer A handles 2 partitions.
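The "one consumer ends up with two partitions" situation can be reproduced with a toy round-robin assignor. This is a sketch only: Kafka's actual assignors (range, round-robin, sticky, cooperative-sticky) are pluggable via consumer configuration, and the real protocol runs through the group coordinator.

```python
def assign_partitions(partitions: list, consumers: list) -> dict:
    """Deal partitions to consumers round-robin, like cards.

    Each partition goes to exactly one consumer in the group.
    With more consumers than partitions, some consumers get
    nothing and sit idle."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


# 4 partitions, 3 consumers: one consumer handles 2 partitions.
three = assign_partitions([0, 1, 2, 3], ['A', 'B', 'C'])

# 4 partitions, 5 consumers: consumer E is idle.
five = assign_partitions([0, 1, 2, 3], ['A', 'B', 'C', 'D', 'E'])
```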
If Consumer B crashes: Partition 2 is reassigned (rebalanced) to A or C.
Adding Consumer D: Partitions are rebalanced across A, B, C, D (one partition per consumer).
Adding Consumer E (5 consumers > 4 partitions): Consumer E sits idle (no partition to consume from).

Offset Management
Each consumer tracks its position in each partition using an offset:
```
Partition 0: [0] [1] [2] [3] [4] [5] [6] [7] [8]
                          ▲                   ▲
                          │                   │
                 committed offset      current position
                 (last processed)      (reading here)
```
If consumer crashes and restarts: Resumes from committed offset (3). Messages 4-8 are reprocessed.
If using auto-commit: Offsets committed periodically (default: 5s). Messages between last commit and crash are reprocessed.
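The crash-and-restart behavior above can be simulated with a toy model (hypothetical names; simplified in that Kafka actually stores the offset of the *next* record to read, while this sketch treats the committed offset as the last processed message, matching the diagram above):

```python
def replay_after_crash(partition_log: list, committed_offset: int) -> list:
    """Messages a consumer sees again after restarting.

    On restart the consumer resumes from the last committed offset,
    so everything after it is redelivered and reprocessed."""
    return partition_log[committed_offset + 1:]


partition = ['m0', 'm1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8']

# Crash with committed offset 3: messages at offsets 4..8 come back.
redelivered = replay_after_crash(partition, committed_offset=3)
```

This redelivery is why consumers should be idempotent: at-least-once delivery means a message may be processed more than once.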
If using manual commit: you control exactly when offsets are committed. More control, but more responsibility.

```python
from kafka import KafkaConsumer
import json

def process_order(order):
    print(f"Order {order['order_id']} processed")

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order-processing',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

try:
    for message in consumer:
        print(
            f"Partition: {message.partition}, "
            f"Offset: {message.offset}, "
            f"Key: {message.key}"
        )

        order = message.value
        print(f"Processing order: {order['order_id']}")

        # Process the message
        process_order(order)

        # Manual commit after successful processing
        consumer.commit()

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```

```javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-processor',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'order-processing' });

async function processOrder(order) {
  console.log(`Order ${order.orderId} processed`);
}

async function consume() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: true });

  // A run() call takes either eachMessage or eachBatch, not both.
  // eachBatch is used here because it gives manual offset control.
  await consumer.run({
    autoCommit: false,  // Manual offset management
    eachBatch: async ({
      batch, resolveOffset, heartbeat, commitOffsetsIfNecessary
    }) => {
      for (const message of batch.messages) {
        const order = JSON.parse(message.value.toString());
        const key = message.key?.toString();

        console.log(
          `Partition: ${batch.partition}, ` +
          `Offset: ${message.offset}, ` +
          `Key: ${key}`
        );
        console.log(`Processing: ${order.orderId}`);

        // Process the order
        await processOrder(order);

        // Mark the offset as processed, then commit
        resolveOffset(message.offset);
        await commitOffsetsIfNecessary();

        // Heartbeat to prevent a rebalance during long processing
        await heartbeat();
      }
    },
  });
}

consume().catch(console.error);
```

Kafka Streams
Kafka Streams is a client library for building stream processing applications that transform, aggregate, and join data from Kafka topics.
```
Input Topic ──▶ Kafka Streams Application ──▶ Output Topic
```
Stream Processing Operations:
- filter: remove unwanted messages
- map: transform messages
- flatMap: one-to-many transformation
- groupBy: group messages by key
- aggregate/count/reduce: stateful operations
- join: combine streams by key within a time window
- windowed operations: process data in time windows

Stream Processing Example
Use Case: Real-time order analytics
Input: "orders" topic

```
{ orderId: "ORD-1", category: "electronics", amount: 299 }
{ orderId: "ORD-2", category: "books",       amount: 29 }
{ orderId: "ORD-3", category: "electronics", amount: 599 }
```
Processing:
1. Filter: only orders > $50
2. Group by: category
3. Aggregate: sum amounts per category in 5-minute windows
Output: "order-analytics" topic

```
{ category: "electronics", window: "14:00-14:05", total: 898 }
```

```java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.*;

import java.time.Duration;
import java.util.Properties;

public class OrderAnalytics {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read from the "orders" topic
        KStream<String, String> orders = builder.stream("orders");

        // Process: parse, filter, group, window, aggregate
        KTable<Windowed<String>, Double> analytics = orders
            // Deserialize JSON
            .mapValues(OrderAnalytics::parseOrder)
            // Keep only orders over $50
            .filter((key, order) -> order.amount() > 50)
            // Group by category
            .groupBy((key, order) -> order.category())
            // 5-minute tumbling window
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            // Sum amounts per category per window
            .aggregate(
                () -> 0.0,
                (key, order, total) -> total + order.amount(),
                Materialized.with(Serdes.String(), Serdes.Double())
            );

        // Write results to the output topic
        analytics.toStream()
            .map((windowedKey, total) -> KeyValue.pair(
                windowedKey.key(),
                String.format(
                    "{\"category\":\"%s\",\"total\":%.2f}",
                    windowedKey.key(), total)))
            .to("order-analytics");

        // Start the stream processing
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    record Order(String orderId, String category, double amount) {}

    static Order parseOrder(String json) {
        // Parse JSON to Order (simplified)
        return new Order("ORD-1", "electronics", 299.0);
    }
}
```

```python
# Faust: Python stream processing (a Kafka Streams equivalent)
import faust

app = faust.App(
    'order-analytics',
    broker='kafka://localhost:9092',
    value_serializer='json',
)

class Order(faust.Record):
    order_id: str
    category: str
    amount: float

# Input topic
orders_topic = app.topic('orders', value_type=Order)

# Output topic
analytics_topic = app.topic('order-analytics')

# State: running totals per category
category_totals = app.Table('category-totals', default=float)

@app.agent(orders_topic)
async def process_orders(orders):
    """Stream processor: aggregate orders by category."""
    async for order in orders:
        if order.amount > 50:
            category_totals[order.category] += order.amount
            await analytics_topic.send(
                key=order.category,
                value={
                    'category': order.category,
                    'running_total': category_totals[order.category],
                },
            )

if __name__ == '__main__':
    app.main()
```

Exactly-Once Semantics
Kafka provides exactly-once semantics (EOS) through three mechanisms:
1. Idempotent Producer: Producer assigns sequence numbers to messages. Broker deduplicates by (producerId, sequence). Duplicate sends (from retries) are silently dropped.
2. Transactional Producer: Atomic writes across multiple partitions. Either ALL messages in a transaction are committed, or NONE are.
3. Consumer read_committed: The consumer reads only messages from committed transactions. Uncommitted and aborted messages are invisible.

End-to-End Exactly-Once
Exactly-once from consume to produce:
```
Consumer reads from input topic
  ├── Process message
  ├── Produce to output topic  ┐
  └── Commit consumer offset   ┘  ATOMIC (transactional)
```
If ANY step fails, the entire transaction is aborted. On retry, the consumer rereads and reprocesses, but the output is not duplicated because the failed transaction was aborted.

Kafka Use Cases
| Use Case | Description | Example Companies |
|---|---|---|
| Event streaming | Real-time event pipelines between systems | Netflix, LinkedIn |
| Log aggregation | Collect logs from all services into a central platform | All large tech companies |
| Metrics collection | Real-time operational metrics | Twitter, Datadog |
| Stream processing | Real-time data transformation and analytics | Uber (surge pricing) |
| Event sourcing | Store all state changes as events | Financial systems |
| CDC (Change Data Capture) | Replicate database changes to other systems | Debezium + Kafka |
| Microservice communication | Async communication between services | Most microservice architectures |
Kafka Operations Essentials
```shell
# Create a topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --partitions 6 \
  --replication-factor 3

# List topics
kafka-topics.sh --list \
  --bootstrap-server localhost:9092

# Describe a topic
kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic orders

# Produce messages from the CLI
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --property key.separator=: \
  --property parse.key=true

# Consume messages from the CLI
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning \
  --group test-consumer

# Check consumer group lag
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group order-processing
```

Summary
| Concept | Key Takeaway |
|---|---|
| Topics & Partitions | Topics are split into ordered partitions for parallelism |
| Replication | Each partition is replicated across brokers for fault tolerance |
| Consumer Groups | Consumers in a group share partitions for parallel processing |
| Offsets | Track consumer position; enable replay and exactly-once |
| acks=all | Strongest durability guarantee for producers |
| Kafka Streams | Client library for stateful stream processing |
| Exactly-Once | Achieved through idempotent producers and transactions |
| Retention | Messages persist for a configurable period (not deleted on consumption) |