Stream Processing

Stream processing is the practice of processing data continuously as it arrives, rather than waiting to collect it into batches. In a world where users expect real-time dashboards, instant fraud detection, and sub-second recommendations, stream processing has become an essential capability in the modern data stack.


Why Stream Processing

Batch processing introduces inherent latency — data is only as fresh as the last batch run. For many use cases, this delay is unacceptable.

| Use Case | Required Latency | Why Batch Falls Short |
|---|---|---|
| Fraud detection | Milliseconds | Must block the transaction before it completes |
| Real-time dashboards | Seconds | Executives expect live business metrics |
| IoT monitoring | Seconds | Equipment failures must be detected immediately |
| Recommendation engines | Milliseconds | Recommendations are based on current session behavior |
| Ride-sharing pricing | Seconds | Surge pricing must reflect current demand |
| Log anomaly detection | Seconds | Security breaches need immediate alerting |
```
Batch Processing Timeline:

Events:        ──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──
Batch window:   ├──────── 1 hour ────────┤
                                         │
Results:                                 available here (1 hour stale)

Stream Processing Timeline:

Events:   ──●──●──●──●──●──●──●──●──●──●──●──●──●──●──●──
            │  │  │  │  │  │  │  │  │  │  │  │  │  │  │
Results:    ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼
            (each event processed within milliseconds)
```

Core Concepts

Events, Streams, and Tables

| Concept | Description | Example |
|---|---|---|
| Event | An immutable fact that something happened at a point in time | "User 42 clicked 'Add to Cart' at 14:32:05" |
| Stream | An unbounded, ordered sequence of events | All click events from the website |
| Table | A materialized view of a stream at a point in time | Current inventory count per product |
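
The stream-table relationship can be illustrated in a few lines of plain Python (the inventory events are hypothetical): the table is simply a fold over the stream.

```python
# The stream: an ordered sequence of immutable inventory facts
events = [
    {"product": "widget", "delta": +10},
    {"product": "widget", "delta": -3},
    {"product": "gadget", "delta": +5},
    {"product": "widget", "delta": -2},
]

# The table: current inventory per product, derived by replaying the stream
table = {}
for event in events:
    table[event["product"]] = table.get(event["product"], 0) + event["delta"]

print(table)  # {'widget': 5, 'gadget': 5}
```

Processing one more event updates the table incrementally; this is exactly what a streaming engine's materialized view does at scale.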

Event Time vs Processing Time

This distinction is critical for correct stream processing:

- Event time: when the event actually occurred (embedded in the data)
- Processing time: when the event arrives at the processing system

Example: a mobile app records a click at 14:32:05 (event time), but the phone is offline, so the event reaches the system at 14:45:00 (processing time): a delay of almost 13 minutes.

```
──────────────────────────────────────────────▶ time
    14:32:05                  14:45:00
  (event time)            (processing time)
        │                        │
        ▼                        ▼
  User clicked            System received
   the button                the event
```
| Aspect | Event Time | Processing Time |
|---|---|---|
| Definition | When the event happened | When the system processes it |
| Source | Embedded in the event payload | System clock at processing |
| Deterministic | Yes: same input always yields the same result | No: depends on system load |
| Late data | Must be handled explicitly | Not an issue (everything is "on time") |
| Use for | Correct business logic | System monitoring and throughput |
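
The offline-phone example above can be checked in plain Python (the dates are illustrative): the same click lands in different 5-minute windows depending on which notion of time is used.

```python
from datetime import datetime, timedelta

event_time = datetime(2024, 1, 1, 14, 32, 5)       # when the user clicked
processing_time = datetime(2024, 1, 1, 14, 45, 0)  # when the event arrived

lag = processing_time - event_time
assert lag == timedelta(minutes=12, seconds=55)    # almost 13 minutes of delay

def window_start(ts, minutes=5):
    """Start of the 5-minute tumbling window containing ts."""
    return ts.replace(minute=ts.minute - ts.minute % minutes,
                      second=0, microsecond=0)

# Event-time windowing gives the correct business answer (the 14:30 window);
# processing-time windowing would misfile the click in the 14:45 window.
assert window_start(event_time) == datetime(2024, 1, 1, 14, 30)
assert window_start(processing_time) == datetime(2024, 1, 1, 14, 45)
```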

Windowing

Since streams are infinite, we need a way to bound computations. Windows group events into finite chunks for aggregation.

Tumbling Windows

Fixed-size, non-overlapping windows. Every event belongs to exactly one window.

```
Events:    ──●─●──●───●──●─●──●───●──●─●──●───●──
Window 1:   ├────────┤
            0s       10s
Window 2:             ├────────┤
                      10s      20s
Window 3:                       ├────────┤
                                20s      30s

Each window: exactly 10 seconds
Overlap:     none
```

Use case: Compute the number of page views per minute.
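
Tumbling-window assignment is just integer division of the timestamp by the window size; a minimal plain-Python sketch (timestamps in seconds, data illustrative):

```python
from collections import Counter

def tumbling_window(ts, size=10.0):
    """Return the [start, end) window containing timestamp ts."""
    start = (ts // size) * size
    return (start, start + size)

assert tumbling_window(3.2) == (0.0, 10.0)
assert tumbling_window(10.0) == (10.0, 20.0)  # boundary goes to the next window

# Page views per minute: count events per 60-second tumbling window
views = [12.1, 13.5, 61.0, 62.2, 65.9]  # view timestamps in seconds
per_minute = Counter(tumbling_window(t, 60.0) for t in views)
print(per_minute)  # Counter({(60.0, 120.0): 3, (0.0, 60.0): 2})
```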

Sliding Windows

Fixed-size windows that advance by a slide interval, creating overlap. An event may belong to multiple windows.

```
Events:    ──●─●──●───●──●─●──●───●──●─●──●───●──
Window 1:   ├──────────────┤               (0s - 10s)
Window 2:          ├──────────────┤        (5s - 15s)
Window 3:                 ├──────────────┤ (10s - 20s)

Window size:    10 seconds
Slide interval: 5 seconds
Overlap:        5 seconds
```

Use case: Compute a 5-minute moving average of stock prices, updated every 30 seconds.
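
Because sliding windows overlap, each event maps to several windows. A plain-Python sketch of that assignment (size and slide in seconds, values illustrative):

```python
def sliding_windows(ts, size=10.0, slide=5.0):
    """All [start, start + size) windows that contain timestamp ts."""
    windows = []
    start = (ts // slide) * slide   # latest window start at or before ts
    while start > ts - size:        # walk back while the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

# With size=10 and slide=5, an event at t=12s falls into two windows:
assert sliding_windows(12.0) == [(5.0, 15.0), (10.0, 20.0)]
```

With size equal to slide this degenerates to tumbling windows (exactly one window per event).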

Session Windows

Variable-size windows defined by a gap duration. A session ends when no events arrive for the specified gap.

```
Events:      ──●─●──●─────────────────●──●─●───────────────●──
Session 1:    ├──────┤
                      (gap > threshold)
Session 2:                          ├──────┤
                                            (gap > threshold)
Session 3:                                               ├──┤

Gap timeout: e.g., 30 minutes of inactivity
```

Use case: Group user interactions into browsing sessions (a session ends after 30 minutes of inactivity).
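
Sessionizing a sorted list of timestamps is a single pass with a gap check; a minimal plain-Python sketch (gap in seconds, timestamps illustrative):

```python
def sessionize(timestamps, gap=30 * 60):
    """Group sorted timestamps into sessions split by > gap seconds of silence."""
    sessions = []
    current = [timestamps[0]]
    for ts in timestamps[1:]:
        if ts - current[-1] > gap:   # inactivity exceeded: close the session
            sessions.append(current)
            current = [ts]
        else:
            current.append(ts)
    sessions.append(current)
    return sessions

# Two bursts of activity separated by 80 minutes of silence -> two sessions
print(sessionize([0, 100, 200, 5000, 5100]))  # [[0, 100, 200], [5000, 5100]]
```

Note that, unlike tumbling or sliding windows, a session's boundaries are not known until the gap elapses, which is why streaming engines must hold session state open until the watermark passes the gap.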

Windowing Comparison

| Window Type | Size | Overlap | Windows per Event | Use Case |
|---|---|---|---|---|
| Tumbling | Fixed | None | Exactly one | Periodic aggregation |
| Sliding | Fixed | Yes | Multiple | Moving averages |
| Session | Variable | None | One, grouped by activity | User session analysis |
| Global | Infinite | N/A | One (all events share a single window) | Unbounded aggregation |

Apache Spark Structured Streaming

Spark Structured Streaming treats a live data stream as a continuously appending table. It uses the same DataFrame API as batch Spark, making it easy to transition from batch to streaming.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    window, col, count, from_json, sum as spark_sum, avg
)
from pyspark.sql.types import (
    StructType, StructField, StringType,
    DoubleType, TimestampType
)

spark = SparkSession.builder \
    .appName("StreamProcessing") \
    .getOrCreate()

# Read from Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the Kafka value (JSON)
schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", TimestampType()),
])

orders = raw_stream \
    .selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

# Tumbling window: revenue per 5-minute window,
# tolerating up to 10 minutes of late data
revenue_per_window = orders \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window(col("event_time"), "5 minutes")) \
    .agg(
        count("order_id").alias("order_count"),
        spark_sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
    )

# Write results to console (for development)
query = revenue_per_window.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="30 seconds") \
    .start()

# Write results to Delta Lake (for production)
production_query = revenue_per_window.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/revenue/") \
    .trigger(processingTime="1 minute") \
    .toTable("realtime_revenue")

query.awaitTermination()
```

Apache Flink

Apache Flink is a distributed stream processing framework designed for true event-time processing with low latency. Unlike Spark's micro-batch model, Flink processes events one at a time with native streaming.

| Feature | Spark Structured Streaming | Apache Flink |
|---|---|---|
| Processing model | Micro-batch (default) or continuous | True event-at-a-time |
| Latency | Seconds (micro-batch) | Milliseconds |
| Event time support | Yes (with watermarks) | First-class, deeply integrated |
| State management | Limited | Rich, queryable state |
| Exactly-once | Yes (with checkpointing) | Yes (with checkpointing) |
| API | DataFrame/SQL | DataStream + Table/SQL |
| Ecosystem | Huge (Spark ecosystem) | Growing |
| Best for | Teams already using Spark | Ultra-low latency, complex event processing |
```java
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.*;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.*;
import java.time.Duration;

public class OrderStreamProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing for exactly-once semantics
        env.enableCheckpointing(60000); // every 60 seconds

        // Read from Kafka
        KafkaSource<Order> source = KafkaSource.<Order>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("orders")
            .setGroupId("order-processor")
            .setValueOnlyDeserializer(new OrderDeserializer())
            .build();

        DataStream<Order> orders = env.fromSource(
            source,
            WatermarkStrategy
                .<Order>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                .withTimestampAssigner(
                    (order, timestamp) -> order.getEventTime()),
            "Kafka Orders");

        // Tumbling window: order count and revenue per 5 minutes
        DataStream<WindowResult> results = orders
            .keyBy(Order::getRegion)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(2))
            .aggregate(new OrderAggregator());

        // Session window: group orders by customer session
        DataStream<SessionResult> sessions = orders
            .keyBy(Order::getCustomerId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .process(new SessionAnalyzer());

        // Sinks (connection details elided)
        results.addSink(JdbcSink.sink(/* warehouse connection */));
        sessions.sinkTo(
            KafkaSink.<SessionResult>builder()/* downstream topic */.build());

        env.execute("Order Stream Processing");
    }
}
```

Watermarks

Watermarks solve the problem of late-arriving data in event-time processing. A watermark is a timestamp that tells the system: “I believe all events with a timestamp earlier than this have arrived.”

```
Event timeline (in arrival order):

Arrival:    T=0    T=5    T=10   T=15   T=20   T=25
             │      │      │      │      │      │
Events:     e1     e3     e5     e2*    e6     e4*
           (T=1)  (T=6)  (T=11)  (T=4)  (T=21) (T=8)

* e2 and e4 arrived out of order (after events with later timestamps)

Watermark with a 10-unit tolerance (watermark = max event time seen - 10):

When e2 arrives, the maximum event time seen so far is T=11,
so the watermark is 11 - 10 = T=1.
e2's event time (T=4) is ahead of the watermark → ACCEPTED

When e4 arrives, the maximum event time seen so far is T=21,
so the watermark is 21 - 10 = T=11.
e4's event time (T=8) is behind the watermark → LATE → dropped or side-output
```

Watermark Strategies

| Strategy | Description | Trade-off |
|---|---|---|
| Bounded out-of-orderness | Watermark = max event time - tolerance | Handles known delays; too small a tolerance drops events, too large increases latency |
| Monotonically increasing | Watermark = max event time (no tolerance) | Lowest latency, but any out-of-order event is late |
| Custom | Application-specific logic | Maximum flexibility, most complex to implement |
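
The bounded out-of-orderness strategy fits in a few lines of plain Python (a toy model, not any framework's API):

```python
class BoundedOutOfOrdernessWatermark:
    """Watermark = max event time seen - fixed tolerance."""

    def __init__(self, tolerance):
        self.tolerance = tolerance
        self.max_event_time = float("-inf")

    def observe(self, event_time):
        """Advance the watermark as events (possibly out of order) arrive."""
        self.max_event_time = max(self.max_event_time, event_time)
        return self.watermark()

    def watermark(self):
        return self.max_event_time - self.tolerance

    def is_late(self, event_time):
        return event_time < self.watermark()

# Replaying the timeline from the diagram above, with tolerance 10:
wm = BoundedOutOfOrdernessWatermark(tolerance=10)
for t in (1, 6, 11):
    wm.observe(t)
assert not wm.is_late(4)   # watermark is 11 - 10 = 1, so e2 (T=4) is on time
wm.observe(21)
assert wm.is_late(8)       # watermark is 21 - 10 = 11, so e4 (T=8) is late
```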

Exactly-Once Semantics

One of the most critical guarantees in stream processing is delivery semantics — how many times each event is processed.

| Guarantee | Description | Trade-off |
|---|---|---|
| At-most-once | Events may be lost, never duplicated | Fastest, lowest overhead; unacceptable for financial data |
| At-least-once | Events are never lost, may be duplicated | Requires downstream deduplication |
| Exactly-once | Events are processed precisely once | Most complex, some performance overhead |
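
To see why at-least-once needs downstream deduplication, here is a minimal Python sketch of an idempotent consumer (the `event_id` field and event shapes are illustrative):

```python
processed_ids = set()  # in production: a persistent store keyed by event id
total = 0

def handle(event):
    """Apply the event's effect at most once, even if it is redelivered."""
    global total
    if event["event_id"] in processed_ids:
        return  # duplicate redelivery: skip
    processed_ids.add(event["event_id"])
    total += event["amount"]

for e in [{"event_id": "a", "amount": 10},
          {"event_id": "b", "amount": 5},
          {"event_id": "a", "amount": 10}]:  # "a" redelivered after a retry
    handle(e)

print(total)  # 15, not 25: the duplicate had no effect
```

At-least-once delivery plus an idempotent consumer yields exactly-once *effects*, which is the combination most production pipelines actually rely on.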

How Exactly-Once Works

Exactly-once via checkpointing works as follows:

1. The processing engine periodically snapshots its state (offsets, aggregation state, window contents).
2. On failure, the engine restores the last checkpoint and replays events from that checkpoint's offset.
3. Combined with idempotent sinks, this ensures each event affects the output exactly once.

```
┌──────┐   Checkpoint    ┌──────┐   Checkpoint
│State │──────(C1)──────▶│State │──────(C2)──────▶ ✗ Failure
│  v1  │                 │  v2  │                  │
└──────┘                 └──────┘                  ▼
                                    Restore state from C2
                                    Replay from C2's offset
                                    Result: identical to a
                                    failure-free run
```
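
The recovery loop can be simulated in a few lines of plain Python (a toy model: the "state" is a running sum, a checkpoint is an `(offset, state)` pair, and a failure is injected at a chosen event):

```python
events = [3, 1, 4, 1, 5, 9, 2, 6]

def run(events, fail_at=None):
    """Process events with periodic checkpoints; optionally crash once."""
    state, checkpoint = 0, (0, 0)      # running sum; (offset, state) snapshot
    i = 0
    while i < len(events):
        if i == fail_at:
            fail_at = None             # fail only once
            i, state = checkpoint      # restore the snapshot and replay
            continue
        state += events[i]
        i += 1
        if i % 3 == 0:
            checkpoint = (i, state)    # snapshot every 3 events
    return state

# The crashed run converges to the same result as the failure-free run
assert run(events) == run(events, fail_at=5) == sum(events)
```

Note that the replay re-emits events processed after the checkpoint; in a real pipeline, an idempotent or transactional sink absorbs those duplicates.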

End-to-End Exactly-Once

True exactly-once requires coordination across the entire pipeline — source, processing engine, and sink.

ComponentRequirement
SourceMust be replayable (Kafka stores offsets; can replay from any position)
EngineMust checkpoint state consistently (Flink and Spark support this)
SinkMust be idempotent (upserts, deduplication keys) or support transactions
```python
def write_to_sink_idempotent(batch_df, batch_id):
    """
    Idempotent sink: uses MERGE (upsert) so that rewriting the same
    batch after a failure does not produce duplicates.
    """
    batch_df.createOrReplaceTempView("batch_data")
    batch_df.sparkSession.sql("""
        MERGE INTO target_table AS target
        USING batch_data AS source
        ON target.event_id = source.event_id
        WHEN MATCHED THEN
            UPDATE SET *
        WHEN NOT MATCHED THEN
            INSERT *
    """)

# Use foreachBatch for exactly-once writes
query = stream_df.writeStream \
    .foreachBatch(write_to_sink_idempotent) \
    .option("checkpointLocation", "s3://checkpoints/sink/") \
    .start()
```

Stream Processing Architecture Patterns

Pattern 1: Event Sourcing

Store every state change as an immutable event. The current state is derived by replaying all events.

```
Events (append-only log):
┌──────────────────────────────────────────┐
│ OrderCreated(id=1, amount=50)            │
│ OrderItemAdded(id=1, item="Widget")      │
│ OrderItemAdded(id=1, item="Gadget")      │
│ OrderShipped(id=1, tracking="ABC123")    │
│ OrderDelivered(id=1)                     │
└──────────────────────────────────────────┘
                     │
                     ▼  Replay events to derive current state
┌──────────────────────────────────────────┐
│ Order: id=1, amount=50, items=2,         │
│        status=delivered, tracking=ABC123 │
└──────────────────────────────────────────┘
```
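
Replaying the log above can be sketched in plain Python (the event names and fields follow the diagram; the `apply` reducer is illustrative):

```python
log = [
    ("OrderCreated",   {"id": 1, "amount": 50}),
    ("OrderItemAdded", {"id": 1, "item": "Widget"}),
    ("OrderItemAdded", {"id": 1, "item": "Gadget"}),
    ("OrderShipped",   {"id": 1, "tracking": "ABC123"}),
    ("OrderDelivered", {"id": 1}),
]

def apply(state, event):
    """Fold one event into the derived order state."""
    kind, data = event
    if kind == "OrderCreated":
        return {"id": data["id"], "amount": data["amount"],
                "items": [], "status": "created"}
    if kind == "OrderItemAdded":
        return {**state, "items": state["items"] + [data["item"]]}
    if kind == "OrderShipped":
        return {**state, "status": "shipped", "tracking": data["tracking"]}
    if kind == "OrderDelivered":
        return {**state, "status": "delivered"}
    return state  # unknown events are ignored

state = None
for e in log:
    state = apply(state, e)
print(state)  # status=delivered, 2 items, tracking=ABC123
```

Because the log is append-only and `apply` is deterministic, the same replay always yields the same state, and past states can be reconstructed by replaying a prefix of the log.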

Pattern 2: CQRS (Command Query Responsibility Segregation)

Separate the write model (commands) from the read model (queries), connected by a stream of events.

```
┌──────────┐   Commands   ┌──────────┐    Events    ┌──────────┐
│  Client  │─────────────▶│  Write   │─────────────▶│   Read   │
│ (Write)  │              │  Model   │   (Stream)   │  Model   │
└──────────┘              └──────────┘              └──────────┘
                                                         ▲
┌──────────┐   Queries                                   │
│  Client  │─────────────────────────────────────────────┘
│  (Read)  │
└──────────┘
```

Quick Reference: Choosing a Stream Processor

| Factor | Spark Streaming | Flink | Kafka Streams |
|---|---|---|---|
| Latency | Seconds | Milliseconds | Milliseconds |
| Deployment | Cluster (YARN, K8s) | Cluster (YARN, K8s) | Library (embedded) |
| State | Limited | Rich, queryable | Embedded, backed by Kafka |
| SQL support | Excellent | Good | KSQL (separate) |
| Best for | Spark shops, batch + stream | Complex event processing | Kafka-native microservices |

Next Steps