Stream Processing
Stream processing is the practice of processing data continuously as it arrives, rather than waiting to collect it into batches. In a world where users expect real-time dashboards, instant fraud detection, and sub-second recommendations, stream processing has become an essential capability in the modern data stack.
Why Stream Processing
Batch processing introduces inherent latency — data is only as fresh as the last batch run. For many use cases, this delay is unacceptable.
| Use Case | Required Latency | Why Batch Falls Short |
|---|---|---|
| Fraud detection | Milliseconds | Must block the transaction before it completes |
| Real-time dashboards | Seconds | Executives expect live business metrics |
| IoT monitoring | Seconds | Equipment failures must be detected immediately |
| Recommendation engines | Milliseconds | Recommendations based on current session behavior |
| Ride-sharing pricing | Seconds | Surge pricing must reflect current demand |
| Log anomaly detection | Seconds | Security breaches need immediate alerting |
Batch Processing Timeline:

```
Events:             ──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──
Batch window:         ├──────────── 1 hour ────────────┤
Results available:                                     here (1 hour stale)
```

Stream Processing Timeline:

```
Events:   ──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──
            │  │  │  │  │  │  │  │  │  │  │  │  │  │  │
Results:    ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼
          (each event processed within milliseconds)
```

Core Concepts
Events, Streams, and Tables
| Concept | Description | Example |
|---|---|---|
| Event | An immutable fact that something happened at a point in time | "User 42 clicked 'Add to Cart' at 14:32:05" |
| Stream | An unbounded, ordered sequence of events | All click events from the website |
| Table | A materialized view of a stream at a point in time | Current inventory count per product |
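The stream-to-table relationship can be sketched in a few lines of Python: folding a stream of change events produces the table's current state. The event shape here is illustrative, not from any particular system.

```python
from collections import defaultdict

# Hypothetical inventory events: each is an immutable fact.
events = [
    {"product": "widget", "delta": +10},  # restock
    {"product": "widget", "delta": -3},   # sale
    {"product": "gadget", "delta": +5},
    {"product": "widget", "delta": -2},
]

def materialize(stream):
    """Fold a stream of change events into a table of current state."""
    table = defaultdict(int)
    for event in stream:
        table[event["product"]] += event["delta"]
    return dict(table)

print(materialize(events))  # {'widget': 5, 'gadget': 5}
```

A table, in this view, is just a stream compacted down to the latest state per key.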
Event Time vs Processing Time
This distinction is critical for correct stream processing:
```
Event Time:      when the event ACTUALLY occurred (embedded in the data)
Processing Time: when the event ARRIVES at the processing system

Example: a mobile app records a click at 14:32:05 (event time).
         The phone was offline, so the event arrives at 14:45:00 (processing time).
         Difference: 13 minutes of delay.

──────────────────────────────────────────────▶ time
     14:32:05                    14:45:00
   (event time)              (processing time)
        │                           │
        ▼                           ▼
  User clicked               System received
   the button                   the event
```

| Aspect | Event Time | Processing Time |
|---|---|---|
| Definition | When the event happened | When the system processes it |
| Source | Embedded in the event payload | System clock at processing |
| Deterministic | Yes — same input always same result | No — depends on system load |
| Late data | Must be handled explicitly | Not an issue (everything is “on time”) |
| Use for | Correct business logic | System monitoring and throughput |
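As a small sketch (with an illustrative event shape and timestamp), a consumer can stamp each event with processing time on arrival and measure its lag relative to event time:

```python
from datetime import datetime, timezone

def ingest(event):
    """Stamp an event with processing time on arrival and compute its lag."""
    processing_time = datetime.now(timezone.utc)
    event_time = datetime.fromisoformat(event["event_time"])
    lag = (processing_time - event_time).total_seconds()
    return {**event,
            "processing_time": processing_time.isoformat(),
            "lag_seconds": lag}

# The offline-phone example above: the click happened well before it arrived.
click = {"user": 42, "action": "add_to_cart",
         "event_time": "2024-06-01T14:32:05+00:00"}
enriched = ingest(click)
```

Monitoring this lag per event is a common way to decide how much out-of-orderness a pipeline must tolerate.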
Windowing
Since streams are infinite, we need a way to bound computations. Windows group events into finite chunks for aggregation.
Tumbling Windows
Fixed-size, non-overlapping windows. Every event belongs to exactly one window.
```
Events:    ──●─●──●───●──●─●──●───●──●─●──●───●──
Window 1:  ├────────┤
           0s      10s
Window 2:            ├────────┤
                    10s      20s
Window 3:                      ├────────┤
                              20s      30s
```

- Each window: exactly 10 seconds
- Overlap: none
- Use case: compute the number of page views per minute.
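Assigning an event to its tumbling window is simple arithmetic, as in this hypothetical helper:

```python
def tumbling_window(event_ts: float, size: float) -> tuple:
    """Map a timestamp to the (start, end) of its tumbling window.

    Window boundaries are aligned to multiples of the window size,
    so every timestamp lands in exactly one window.
    """
    start = event_ts - (event_ts % size)
    return (start, start + size)

# 10-second windows: each event belongs to exactly one window.
assert tumbling_window(3.2, 10) == (0.0, 10.0)
assert tumbling_window(17.0, 10) == (10.0, 20.0)
```

An aggregator can then group events by this `(start, end)` key to compute per-window counts or sums.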
Sliding Windows
Fixed-size windows that advance by a slide interval, creating overlap. An event may belong to multiple windows.
```
Events:    ──●─●──●───●──●─●──●───●──●─●──●───●──
Window 1:  ├──────────────┤               (0s - 10s)
Window 2:        ├──────────────┤         (5s - 15s)
Window 3:              ├──────────────┤   (10s - 20s)
```

- Window size: 10 seconds
- Slide interval: 5 seconds
- Overlap: 5 seconds
- Use case: compute a 5-minute moving average of stock prices, updated every 30 seconds.
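Because windows overlap, one event can belong to several of them. A hypothetical assignment function, under the assumption that windows never start before time 0:

```python
def sliding_windows(event_ts, size, slide):
    """Return every (start, end) sliding window that contains the timestamp."""
    # The latest window that can contain the event starts at the
    # largest slide-aligned point <= event_ts.
    start = event_ts - (event_ts % slide)
    windows = []
    while start > event_ts - size and start >= 0:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

# size=10, slide=5: an event at t=12 falls into windows [5,15) and [10,20).
print(sliding_windows(12, 10, 5))  # [(5, 15), (10, 20)]
```

With size 10 and slide 5, every event is counted in two windows; this duplication is what makes moving averages update smoothly.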
Session Windows
Variable-size windows defined by a gap duration. A session ends when no events arrive for the specified gap.
```
Events:     ──●─●──●─────────────────●──●─●───────────────●──
Session 1:  ├──────┤
                    (gap > threshold)
Session 2:                           ├───────┤
                                              (gap > threshold)
Session 3:                                                ├──┤
```

- Gap timeout: e.g., 30 minutes of inactivity
- Use case: group user interactions into browsing sessions (a session ends after 30 minutes of inactivity).
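The gap rule can be sketched directly: walk the events in time order and start a new session whenever the quiet period exceeds the threshold. The timestamps below are illustrative seconds.

```python
def sessionize(timestamps, gap):
    """Group event timestamps into sessions separated by > gap of inactivity."""
    sessions = []
    current = []
    for ts in sorted(timestamps):
        if current and ts - current[-1] > gap:
            sessions.append(current)  # quiet period exceeded: close session
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions

# 30-minute gap (1800 s): a long quiet stretch splits the clicks in two.
clicks = [0, 120, 300, 5000, 5100]
print(sessionize(clicks, 1800))  # [[0, 120, 300], [5000, 5100]]
```

Real engines do this per key (e.g., per user) and incrementally, but the grouping logic is the same.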
Windowing Comparison
| Window Type | Size | Overlap | Windows per Event | Use Case |
|---|---|---|---|---|
| Tumbling | Fixed | None | Exactly one | Periodic aggregation |
| Sliding | Fixed | Yes | Multiple | Moving averages |
| Session | Variable | None | One, per activity burst | User session analysis |
| Global | Infinite | N/A | One (all events) | Unbounded aggregation |
Apache Spark Structured Streaming
Spark Structured Streaming treats a live data stream as a continuously appending table. It uses the same DataFrame API as batch Spark, making it easy to transition from batch to streaming.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    window, col, count, sum as spark_sum, avg, from_json
)
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType
)

spark = SparkSession.builder \
    .appName("StreamProcessing") \
    .getOrCreate()

# Read from Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "latest") \
    .load()

# Schema for the JSON order events in the Kafka value
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", TimestampType()),
])

# Parse the Kafka value (JSON)
orders = raw_stream \
    .selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

# Tumbling window: revenue per 5-minute window
revenue_per_window = orders \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window(col("event_time"), "5 minutes")) \
    .agg(
        count("order_id").alias("order_count"),
        spark_sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
    )

# Write results to console (for development)
query = revenue_per_window.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="30 seconds") \
    .start()

# Write results to Delta Lake (for production)
production_query = revenue_per_window.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/revenue/") \
    .trigger(processingTime="1 minute") \
    .toTable("realtime_revenue")

query.awaitTermination()
```

```java
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.*;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.functions.*;

public class StreamProcessor {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .appName("StreamProcessing")
            .getOrCreate();

        // Read from Kafka
        Dataset<Row> rawStream = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "kafka:9092")
            .option("subscribe", "orders")
            .option("startingOffsets", "latest")
            .load();

        // Define schema for order events
        StructType schema = new StructType()
            .add("order_id", DataTypes.StringType)
            .add("customer_id", DataTypes.StringType)
            .add("amount", DataTypes.DoubleType)
            .add("event_time", DataTypes.TimestampType);

        // Parse JSON and apply schema
        Dataset<Row> orders = rawStream
            .selectExpr("CAST(value AS STRING) AS json")
            .select(from_json(col("json"), schema).alias("data"))
            .select("data.*");

        // Tumbling window aggregation with watermark
        Dataset<Row> revenuePerWindow = orders
            .withWatermark("event_time", "10 minutes")
            .groupBy(window(col("event_time"), "5 minutes"))
            .agg(
                count("order_id").alias("order_count"),
                sum("amount").alias("total_revenue"),
                avg("amount").alias("avg_order_value")
            );

        // Write to console
        StreamingQuery query = revenuePerWindow.writeStream()
            .outputMode("update")
            .format("console")
            .option("truncate", false)
            .trigger(Trigger.ProcessingTime("30 seconds"))
            .start();

        query.awaitTermination();
    }
}
```

Apache Flink
Apache Flink is a distributed stream processing framework designed for true event-time processing with low latency. Unlike Spark’s micro-batch model, Flink processes events one at a time with native streaming.
Spark Streaming vs Flink
| Feature | Spark Structured Streaming | Apache Flink |
|---|---|---|
| Processing model | Micro-batch (default) or continuous | True event-at-a-time |
| Latency | Seconds (micro-batch) | Milliseconds |
| Event time support | Yes (with watermarks) | First-class, deeply integrated |
| State management | Limited | Rich, queryable state |
| Exactly-once | Yes (with checkpointing) | Yes (with checkpointing) |
| API | DataFrame/SQL | DataStream + Table/SQL |
| Ecosystem | Huge (Spark ecosystem) | Growing |
| Best for | Teams already using Spark | Ultra-low latency, complex event processing |
Flink Example
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;

public class OrderStreamProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing for exactly-once semantics
        env.enableCheckpointing(60000); // every 60 seconds

        // Read from Kafka (Order and OrderDeserializer are application classes)
        KafkaSource<Order> source = KafkaSource.<Order>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("orders")
            .setGroupId("order-processor")
            .setValueOnlyDeserializer(new OrderDeserializer())
            .build();

        DataStream<Order> orders = env.fromSource(
            source,
            WatermarkStrategy
                .<Order>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                .withTimestampAssigner(
                    (order, timestamp) -> order.getEventTime()
                ),
            "Kafka Orders"
        );

        // Tumbling window: order count and revenue per 5 minutes
        // (OrderAggregator is an application-defined AggregateFunction)
        DataStream<WindowResult> results = orders
            .keyBy(Order::getRegion)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(2))
            .aggregate(new OrderAggregator());

        // Session window: group orders by customer session
        // (SessionAnalyzer is an application-defined ProcessWindowFunction)
        DataStream<SessionResult> sessions = orders
            .keyBy(Order::getCustomerId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .process(new SessionAnalyzer());

        // Sink wiring elided; in production, write results to the
        // warehouse (e.g. a JDBC sink) and sessions to a downstream
        // Kafka topic. print() stands in for those sinks here.
        results.print();
        sessions.print();

        env.execute("Order Stream Processing");
    }
}
```

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Set up the streaming environment
env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60000)  # every 60 seconds

t_env = StreamTableEnvironment.create(env)

# Define Kafka source with DDL
t_env.execute_sql("""
    CREATE TABLE orders (
        order_id STRING,
        customer_id STRING,
        amount DOUBLE,
        region STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Tumbling window aggregation
t_env.execute_sql("""
    SELECT
        window_start,
        window_end,
        region,
        COUNT(order_id) AS order_count,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value
    FROM TABLE(
        TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, window_end, region
""").print()
```

Watermarks
Watermarks solve the problem of late-arriving data in event-time processing. A watermark is a timestamp that tells the system: “I believe all events with a timestamp earlier than this have arrived.”
Event timeline (actual arrival order):

```
Time ──▶    T=0     T=5     T=10    T=15    T=20    T=25
             │       │       │       │       │       │
Events:     e1      e3      e5      e2*     e6      e4*
           (T=1)   (T=6)   (T=11)  (T=4)   (T=21)  (T=8)
```

* e2 and e4 arrived out of order, well after their event time

Watermark with a 10-minute tolerance (watermark = max event time seen - 10 minutes):

```
After e5 arrives:  max event time = T=11, watermark = T=1
e2 arrives next:   event time T=4 > watermark T=1  → ACCEPTED
After e6 arrives:  max event time = T=21, watermark = T=11
e4 arrives next:   event time T=8 < watermark T=11 → LATE → dropped or side output
```

Watermark Strategies
| Strategy | Description | Trade-off |
|---|---|---|
| Bounded out-of-orderness | Watermark = max event time - tolerance | Handles known delays; tolerance too small drops events, too large increases latency |
| Monotonically increasing | Watermark = max event time (no tolerance) | Lowest latency, but any out-of-order event is late |
| Custom | Application-specific logic | Maximum flexibility, most complex to implement |
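The bounded out-of-orderness strategy is small enough to sketch in full. The class and method names here are illustrative, not any framework's API:

```python
class BoundedOutOfOrdernessWatermark:
    """Watermark = max event time seen so far minus a fixed tolerance."""

    def __init__(self, tolerance):
        self.tolerance = tolerance
        self.max_event_time = float("-inf")

    def observe(self, event_time):
        """Advance the high-water mark as events stream in."""
        self.max_event_time = max(self.max_event_time, event_time)

    def current(self):
        return self.max_event_time - self.tolerance

    def is_late(self, event_time):
        """An event is late if its timestamp is behind the watermark."""
        return event_time < self.current()

# Replaying the timeline above with tolerance = 10:
wm = BoundedOutOfOrdernessWatermark(tolerance=10)
for t in [1, 6, 11]:
    wm.observe(t)
assert wm.current() == 1     # max event time 11 - 10
assert not wm.is_late(4)     # e2 (T=4) still accepted
wm.observe(21)               # e6 arrives
assert wm.is_late(8)         # e4 (T=8) is now late
```

Note that the watermark only moves forward; out-of-order events never pull it back.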
Exactly-Once Semantics
One of the most critical guarantees in stream processing is delivery semantics — how many times each event is processed.
| Guarantee | Description | Trade-off |
|---|---|---|
| At-most-once | Events may be lost, never duplicated | Fastest, lowest overhead; unacceptable for financial data |
| At-least-once | Events are never lost, may be duplicated | Requires downstream deduplication |
| Exactly-once | Events are processed precisely once | Most complex, some performance overhead |
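The downstream deduplication that at-least-once delivery requires can be sketched with a set of seen event IDs. This keeps the set unbounded for clarity; real systems cap it with TTLs or probabilistic structures.

```python
def deduplicate(events, seen=None):
    """Drop redelivered events by tracking already-seen event IDs,
    turning at-least-once delivery into effectively-once processing."""
    seen = set() if seen is None else seen
    out = []
    for event in events:
        if event["event_id"] not in seen:
            seen.add(event["event_id"])
            out.append(event)
    return out

# "a" was redelivered after a retry; only the first copy survives.
batch = [{"event_id": "a"}, {"event_id": "b"}, {"event_id": "a"}]
print(deduplicate(batch))  # [{'event_id': 'a'}, {'event_id': 'b'}]
```

Passing the same `seen` set across batches extends the guarantee across restarts, provided the set itself is persisted with the checkpoint.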
How Exactly-Once Works
```
Exactly-Once via Checkpointing

1. The processing engine periodically snapshots its state
   (offsets, aggregation state, window contents).

2. On failure, the engine restores from the last
   checkpoint and replays events from that offset.

3. Combined with idempotent sinks, this ensures each
   event affects the output exactly once.

┌──────┐    Checkpoint    ┌──────┐    Checkpoint
│State │──────(C1)───────▶│State │──────(C2)───────▶ ...
│  v1  │                  │  v2  │
└──────┘                  └──────┘
                                          │
                                    Failure here
                                          │
                                          ▼
                             Restore from C2
                             Replay from offset at C2
                             Results: identical to no-failure run
```

End-to-End Exactly-Once
True exactly-once requires coordination across the entire pipeline — source, processing engine, and sink.
| Component | Requirement |
|---|---|
| Source | Must be replayable (Kafka stores offsets; can replay from any position) |
| Engine | Must checkpoint state consistently (Flink and Spark support this) |
| Sink | Must be idempotent (upserts, deduplication keys) or support transactions |
```python
def write_to_sink_idempotent(batch_df, batch_id):
    """
    Idempotent sink: uses MERGE (upsert) to prevent duplicates
    even if the same batch is written multiple times.
    """
    batch_df.createOrReplaceTempView("batch_data")

    batch_df.sparkSession.sql("""
        MERGE INTO target_table AS target
        USING batch_data AS source
        ON target.event_id = source.event_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

# Use foreachBatch for exactly-once writes
query = stream_df.writeStream \
    .foreachBatch(write_to_sink_idempotent) \
    .option("checkpointLocation", "s3://checkpoints/sink/") \
    .start()
```

```javascript
async function writeToSinkIdempotent(events, db) {
  /**
   * Idempotent sink: uses upsert (ON CONFLICT) to prevent
   * duplicates even if the same batch is written multiple times.
   */
  const query = `
    INSERT INTO target_table (event_id, customer_id, amount, event_time)
    VALUES ($1, $2, $3, $4)
    ON CONFLICT (event_id) DO UPDATE SET
      customer_id = EXCLUDED.customer_id,
      amount = EXCLUDED.amount,
      event_time = EXCLUDED.event_time
  `;

  // Process in a transaction for atomicity
  const client = await db.connect();
  try {
    await client.query("BEGIN");
    for (const event of events) {
      await client.query(query, [
        event.event_id,
        event.customer_id,
        event.amount,
        event.event_time,
      ]);
    }
    await client.query("COMMIT");
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  } finally {
    client.release();
  }
}
```

Stream Processing Architecture Patterns
Pattern 1: Event Sourcing
Store every state change as an immutable event. The current state is derived by replaying all events.
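The replay step can be sketched as a left fold over the event log. The event and state shapes here are illustrative, chosen to match the order example below.

```python
from functools import reduce

def apply(state, event):
    """Apply one event to the order's state; events themselves are never mutated."""
    kind = event["type"]
    if kind == "OrderCreated":
        return {"id": event["id"], "amount": event["amount"],
                "items": [], "status": "created"}
    if kind == "OrderItemAdded":
        return {**state, "items": state["items"] + [event["item"]]}
    if kind == "OrderShipped":
        return {**state, "status": "shipped", "tracking": event["tracking"]}
    if kind == "OrderDelivered":
        return {**state, "status": "delivered"}
    return state  # unknown event types are ignored

log = [
    {"type": "OrderCreated", "id": 1, "amount": 50},
    {"type": "OrderItemAdded", "id": 1, "item": "Widget"},
    {"type": "OrderItemAdded", "id": 1, "item": "Gadget"},
    {"type": "OrderShipped", "id": 1, "tracking": "ABC123"},
    {"type": "OrderDelivered", "id": 1},
]

# Current state is derived, never stored as the source of truth.
current = reduce(apply, log, None)
```

Because the log is append-only, the same fold can rebuild state at any historical point by replaying a prefix of the events.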
```
Events (append-only log):
┌──────────────────────────────────────────┐
│ OrderCreated(id=1, amount=50)            │
│ OrderItemAdded(id=1, item="Widget")      │
│ OrderItemAdded(id=1, item="Gadget")      │
│ OrderShipped(id=1, tracking="ABC123")    │
│ OrderDelivered(id=1)                     │
└──────────────────────────────────────────┘
                    │
                    ▼
     Replay events to derive current state
┌──────────────────────────────────────────┐
│ Order: id=1, amount=50, items=2,         │
│ status=delivered, tracking=ABC123        │
└──────────────────────────────────────────┘
```

Pattern 2: CQRS (Command Query Responsibility Segregation)
Separate the write model (commands) from the read model (queries), connected by a stream of events.
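A minimal sketch of the pattern, with hypothetical command and event shapes: the write model validates commands and appends events; the read model is a projection built by consuming those events.

```python
# Write model: validates commands and appends events to a shared log.
event_log = []

def handle_command(cmd):
    """Reject invalid commands; valid ones become immutable events."""
    if cmd["type"] == "CreateOrder":
        if cmd["amount"] <= 0:
            raise ValueError("amount must be positive")
        event_log.append({"type": "OrderCreated",
                          "id": cmd["id"], "amount": cmd["amount"]})

# Read model: a denormalized view projected from the event stream.
read_model = {}

def project(event):
    if event["type"] == "OrderCreated":
        read_model[event["id"]] = {"amount": event["amount"],
                                   "status": "created"}

handle_command({"type": "CreateOrder", "id": 1, "amount": 50})
for event in event_log:  # in production, consumed from a stream
    project(event)
print(read_model[1])  # {'amount': 50, 'status': 'created'}
```

The key property is that the read model can be rebuilt from scratch, or reshaped entirely, by replaying the event stream, without touching the write path.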
```
┌──────────┐   Commands   ┌──────────┐    Events    ┌──────────┐
│  Client  │─────────────▶│  Write   │─────────────▶│   Read   │
│ (Write)  │              │  Model   │   (Stream)   │  Model   │
└──────────┘              └──────────┘              └──────────┘
                                                         ▲
┌──────────┐    Queries                                  │
│  Client  │─────────────────────────────────────────────┘
│  (Read)  │
└──────────┘
```

Quick Reference: Choosing a Stream Processor
| Factor | Spark Streaming | Flink | Kafka Streams |
|---|---|---|---|
| Latency | Seconds | Milliseconds | Milliseconds |
| Deployment | Cluster (YARN, K8s) | Cluster (YARN, K8s) | Library (embedded) |
| State | Limited | Rich, queryable | Embedded, backed by Kafka |
| SQL support | Excellent | Good | KSQL (separate) |
| Best for | Spark shops, batch + stream | Complex event processing | Kafka-native microservices |