If there’s one part of Structured Streaming that trips up almost everyone the first time, it’s this: there are two clocks, and they disagree. Until you internalize that, your aggregations will produce numbers that look right at first glance and are quietly, subtly wrong.
This lesson is about those two clocks, and the mechanism Spark gives you to reconcile them: the watermark.
The two times
Every event flowing through your pipeline has at least two timestamps associated with it.
Event time is when the thing actually happened. The user clicked “buy” at 10:03:47. The IoT sensor recorded a temperature reading at 10:03:47. That moment is an immutable fact about the world.
Processing time is when Spark sees the row. The click traveled through a CDN, sat in a Kafka partition for a few seconds, got picked up by a micro-batch trigger — Spark might process it at 10:03:51, or 10:04:30, or, if Kafka was lagging, 10:18:00.
In a perfect synchronous world, these two would be equal. In real systems, they rarely are. Mobile clients buffer events when offline and flush them an hour later. Network blips cause retries. A producer process restarts and replays its commit log. A batch job ingests yesterday’s CSV file today. All of these produce events with old event times arriving at fresh processing times.
Now imagine you want to compute “page views per 10-minute window.” The naive thing is:
```python
from pyspark.sql.functions import window, count

# Buckets rows by when Spark ingested them, not by when they happened.
views_by_processing_time = (events
    .groupBy(window("processing_ts", "10 minutes"), "page")
    .agg(count("*").alias("views")))
```
This is easy and feels right and is wrong. You’re not measuring “views in the 10:00-10:10 window of the world.” You’re measuring “rows Spark happened to ingest between 10:00 and 10:10.” Those are different sets, and the gap between them is where your reporting bugs come from.
What you almost always want is event-time windowing:
```python
# Buckets rows by the time recorded in the event itself.
views_by_event_time = (events
    .groupBy(window("event_time", "10 minutes"), "page")
    .agg(count("*").alias("views")))
```
window("event_time", "10 minutes") tells Spark: bucket each row into a 10-minute window based on its event_time column. A row with event_time = 10:03:47 goes into the [10:00, 10:10) window regardless of when Spark processes it. A row with event_time = 09:58:12 that happens to arrive at processing time 10:05 goes into the [09:50, 10:00) window — not the current one.
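To make the bucketing rule concrete, here is a plain-Python sketch (no Spark needed) of how a 10-minute tumbling window is assigned, assuming windows are aligned to the Unix epoch as Spark’s window() aligns them by default. The helper names tumbling_window and count_per_window are hypothetical; the two events use the timestamps from the paragraph above.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
TEN_MINUTES = timedelta(minutes=10)


def tumbling_window(ts, size=TEN_MINUTES):
    """Return the [start, end) tumbling window containing ts (epoch-aligned)."""
    start = ts - (ts - EPOCH) % size
    return start, start + size


def count_per_window(events, ts_field):
    """Count events per window, bucketing on the given timestamp field."""
    return Counter(tumbling_window(e[ts_field])[0] for e in events)


day = datetime(2024, 1, 1)
events = [
    # Arrives promptly.
    {"event_time": day.replace(hour=10, minute=3, second=47),
     "processing_ts": day.replace(hour=10, minute=3, second=51)},
    # Happened at 09:58:12 but only reached Spark at 10:05.
    {"event_time": day.replace(hour=9, minute=58, second=12),
     "processing_ts": day.replace(hour=10, minute=5)},
]

by_processing_time = count_per_window(events, "processing_ts")
by_event_time = count_per_window(events, "event_time")
```

Bucketing on processing_ts puts both rows in [10:00, 10:10); bucketing on event_time sends the straggler back to [09:50, 10:00), which is exactly the difference between the two queries above.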
That’s the model. Now the problem.
The unbounded-state problem
If Spark accepts late events forever, the state for past windows can never be released. Every 10-minute window has its running count sitting in memory (or RocksDB, or wherever your state store lives), waiting for the possibility that one more straggler from 09:50 might still show up. After a week of running, you have 1,008 ten-minute windows, each holding a running count for every page that appeared in it. After a month, you have 4,320. Memory grows without bound, the state store gets slower, and eventually the streaming job dies.
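The arithmetic behind those numbers, as a quick Python sketch. The 30-day month and the figure of 1,000 active pages per window are assumptions chosen for illustration; the point is that, with no eviction, retained state grows linearly with the job’s uptime.

```python
WINDOW_MINUTES = 10
WINDOWS_PER_DAY = 24 * 60 // WINDOW_MINUTES  # 144 ten-minute windows per day


def retained_windows(days):
    """Windows whose state is still held after `days` of uptime, with no eviction."""
    return days * WINDOWS_PER_DAY


def retained_state_rows(days, keys_per_window):
    """One state row per (window, page) pair; nothing is ever dropped."""
    return retained_windows(days) * keys_per_window


week = retained_windows(7)    # 1,008 windows
month = retained_windows(30)  # 4,320 windows
# Hypothetical: 1,000 distinct pages seen in each window.
month_rows = retained_state_rows(30, keys_per_window=1_000)  # 4,320,000 rows
```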
You need a way to tell Spark: “after this point, stop waiting. Close the window, emit the final result, throw away the state.”
That’s what a watermark is.
What a watermark actually is
A watermark is a guarantee from you to Spark that no more events with event_time earlier than some threshold X will arrive. Once the watermark reaches a window’s end, Spark treats that window as closed: it finalizes the aggregation and discards the window’s state.
Spark recomputes the watermark at the end of every micro-batch. The basic policy is:
watermark = max(event_time observed so far) − lateness_budget
You configure the lateness budget. That’s the knob. A 5-minute budget says: “Spark, if you’ve seen an event at 10:14, assume nothing earlier than 10:09 will still show up.”
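A toy model of that policy in plain Python shows the two properties that matter: the watermark trails the largest event time seen by the budget, and an older event never pulls it back. WatermarkTracker is a hypothetical helper, not a Spark class; Spark keeps the equivalent bookkeeping internally and updates it once per micro-batch rather than once per event.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Toy model of: watermark = max(event_time observed so far) - lateness_budget."""

    def __init__(self, lateness_budget):
        self.lateness_budget = lateness_budget
        self.max_event_time = None

    def observe(self, event_time):
        # Only a newer event time can move the maximum (and so the watermark) forward.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None  # nothing observed yet
        return self.max_event_time - self.lateness_budget


tracker = WatermarkTracker(timedelta(minutes=5))
tracker.observe(datetime(2024, 1, 1, 10, 14))
after_10_14 = tracker.watermark                 # 10:09
tracker.observe(datetime(2024, 1, 1, 10, 0))    # a straggler
after_straggler = tracker.watermark             # still 10:09
```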
The API:
```python
from pyspark.sql.functions import window, count

aggregated = (events
    # Track event_time and allow rows up to 5 minutes behind the max seen.
    .withWatermark("event_time", "5 minutes")
    .groupBy(window("event_time", "10 minutes"), "page")
    .agg(count("*").alias("views")))
```
withWatermark("event_time", "5 minutes") does two things at once:
- Tells Spark which column carries event time (so it knows what to track).
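- Sets the lateness budget: the watermark trails the largest event time seen so far by 5 minutes, and events whose event_time falls behind the watermark are considered too late. Spark guarantees to count events that are less late than the budget; rows that arrive after their window has been closed are dropped silently.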
That second part is critical and easy to miss. A watermark trades correctness for boundedness. You’re explicitly saying “I’d rather drop 0.1% of late stragglers than let my state grow forever.” The size of that 0.1% is your choice; tune the lateness budget based on how late your real events arrive.
Worked example: tracing through concrete events
Let’s run actual events through this and watch what happens. Setup:
- Window: 10 minutes, tumbling, so the windows are [10:00, 10:10), [10:10, 10:20), and so on.
- Watermark lateness: 5 minutes.
- Events arrive in this processing-time order, with each event’s event_time in the third column:
| # | processing_ts | event_time | window |
|---|---|---|---|
| 1 | 10:00:30 | 10:00 | [10:00,10:10) |
| 2 | 10:09:10 | 10:09 | [10:00,10:10) |
| 3 | 10:11:05 | 10:11 | [10:10,10:20) |
| 4 | 10:11:40 | 10:08 LATE | [10:00,10:10) |
| 5 | 10:15:20 | 10:15 | [10:10,10:20) |
| 6 | 10:18:05 | 10:06 TOO LATE | [10:00,10:10), closed: dropped |
Walk through it, treating each event as its own micro-batch (Spark recomputes the watermark at the end of each micro-batch and applies it to the next):
Event 1 (event_time 10:00). First event seen. Max event time so far = 10:00. Watermark = 10:00 − 5min = 09:55. The [10:00,10:10) window has running count = 1.
Event 2 (event_time 10:09). Same window. Max event time = 10:09. Watermark = 10:09 − 5min = 10:04. Window count is now 2. Watermark is at 10:04, still inside the [10:00,10:10) window, so the window stays open.
Event 3 (event_time 10:11). Lands in the next window. Max event time = 10:11, watermark = 10:06. Still inside [10:00,10:10), so that window remains open. New window [10:10,10:20) count = 1.
Event 4 (event_time 10:08). Late! Its event_time is 10:08, but its processing time is 10:11:40, after the window’s nominal end. Is it within the watermark budget? The current watermark is 10:06, and 10:08 > 10:06, so yes. More to the point, its window [10:00,10:10) ends at 10:10, which the watermark hasn’t reached, so the window is still open and the event is accepted. Window [10:00,10:10) count goes to 3. This is the watermark earning its keep: late events get included as long as they’re not too late.
Event 5 (event_time 10:15). Max event time = 10:15, watermark = 10:10. The watermark has now reached the end of [10:00,10:10), so Spark closes that window, emits the final count of 3, and drops its state. Window [10:10,10:20) count = 2. The new watermark also means future events from before 10:10 will be rejected.
Event 6 (event_time 10:06). The watermark is still 10:10, 10:06 < 10:10, and its window [10:00,10:10) is already closed, so this event is dropped silently. It’s too late. The window is gone, and Spark won’t reopen it.
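The trace can be replayed with a small plain-Python model. It is a simplification, not Spark’s implementation: each event is treated as its own micro-batch, a row is counted only if its window’s end is still after the watermark left by the previous batch, and a window is finalized (its count emitted once, as append mode would) as soon as the watermark reaches its end. The names replay, window_start, and at are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts, size):
    """Start of the epoch-aligned tumbling window containing ts."""
    return ts - (ts - EPOCH) % size


def replay(event_times, size=timedelta(minutes=10), budget=timedelta(minutes=5)):
    """Replay event times in arrival order; one event per micro-batch."""
    open_windows = {}   # window start -> running count (the "state store")
    emitted = {}        # window start -> final count, written once
    dropped = []
    max_event_time = None
    watermark = None    # no watermark until the first batch completes
    for et in event_times:
        start = window_start(et, size)
        if watermark is not None and start + size <= watermark:
            dropped.append(et)  # its window is already closed
        else:
            open_windows[start] = open_windows.get(start, 0) + 1
        # End of the micro-batch: advance the watermark (never backwards).
        if max_event_time is None or et > max_event_time:
            max_event_time = et
        watermark = max_event_time - budget
        # Finalize and evict every window whose end the watermark has reached.
        for s in sorted(open_windows):
            if s + size <= watermark:
                emitted[s] = open_windows.pop(s)
    return emitted, open_windows, dropped


def at(hhmm):
    hour, minute = map(int, hhmm.split(":"))
    return datetime(2024, 1, 1, hour, minute)


emitted, still_open, dropped = replay(
    [at(t) for t in ["10:00", "10:09", "10:11", "10:08", "10:15", "10:06"]])
```

Running it gives a final count of 3 for [10:00, 10:10), leaves [10:10, 10:20) open with a count of 2, and drops the 10:06 event. Changing the fifth event time to 10:14 stops the watermark at 10:09, one minute short of the window’s end, so in this model [10:00, 10:10) stays open and the 10:06 straggler is still counted: a window closes only once the watermark reaches its end.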
A few things to take away from that trace:
- The watermark moves only forward. Even if event 6 had been event time 10:00:00, it wouldn’t pull the watermark back.
- Late events within the budget update the still-open window — you don’t get a separate “late update” event; the running aggregation just absorbs them.
- The window closes when the watermark reaches its end. The output (in append mode) for [10:00,10:10) is emitted only after event 5 is processed, even though the wall clock is well past 10:10 by then.
- Events whose window has already closed are dropped. There’s no error and no warning in your data, just a counter in the metrics.
That last point is why monitoring matters.
Watermark monitoring
Every micro-batch’s progress report (query.lastProgress in PySpark) includes an eventTime section with two values you should learn to read:
- max: the largest event time observed in this batch.
- watermark: the current watermark value.
Plot them over time. They should march upward together, with a roughly constant gap (your lateness budget). If the watermark stops moving, your stream is stuck (no new max event times, usually a stalled source). If the watermark falls further and further behind the wall clock, your input is arriving increasingly late.
Also watch numRowsDroppedByWatermark, reported for each stateful operator in the same progress report. A nonzero value means you’re dropping late events; nonzero values batch after batch mean your lateness budget is too small for your real arrival pattern.
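A plain-Python sketch for pulling those numbers out of progress reports, assuming the progress JSON layout Spark uses: an eventTime map whose max and watermark values are ISO-8601 strings, and a stateOperators list whose entries carry numRowsDroppedByWatermark. In PySpark 3.x, query.lastProgress returns this layout as a dict; if your version returns a progress object instead, convert it to a dict first. The helper names are hypothetical.

```python
from datetime import datetime


def parse_progress_ts(value):
    """Parse progress timestamps such as '2024-01-01T10:15:00.000Z' (UTC)."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")


def watermark_report(progress):
    """Summarize event-time max, watermark, and late-row drops for one batch."""
    event_time = progress.get("eventTime", {})
    report = {
        "batchId": progress.get("batchId"),
        # Sum across stateful operators; the field is absent on older Spark versions.
        "dropped": sum(op.get("numRowsDroppedByWatermark", 0)
                       for op in progress.get("stateOperators", [])),
    }
    if "max" in event_time:
        report["max"] = parse_progress_ts(event_time["max"])
    if "watermark" in event_time:
        report["watermark"] = parse_progress_ts(event_time["watermark"])
    return report


def watermark_stalled(reports):
    """True if the watermark did not move across all of the given reports."""
    marks = [r["watermark"] for r in reports if "watermark" in r]
    return len(marks) >= 2 and len(set(marks)) == 1
```

One way to use it: run watermark_report over query.recentProgress and alert when watermark_stalled holds over a long stretch of batches, or when dropped stays above zero.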
Choosing the lateness budget
There’s no formula. You measure your data. Compute, on a representative sample:
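```python
from pyspark.sql.functions import unix_timestamp

# events here is a batch (non-streaming) sample with both timestamp columns;
# summary() and show() can't run directly on a streaming DataFrame.
lateness = events.select(
    # Seconds between when the event happened and when Spark saw it.
    (unix_timestamp("processing_ts") - unix_timestamp("event_time")).alias("lag_seconds")
)
lateness.summary("min", "50%", "95%", "99%", "max").show()
```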
The 99th percentile is usually a sensible budget — accept 1% loss in exchange for bounded state. If you literally cannot tolerate dropping events, you don’t want streaming aggregation at all; you want an append-only fact table written to a lake, with a separate batch job that recomputes recent days each night.
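Once you have the lag numbers, turning the chosen percentile into a withWatermark string is simple arithmetic. A plain-Python sketch, assuming the lags have been collected into a list of seconds (for instance from a small sample of the query above); it uses the exact nearest-rank percentile, whereas Spark’s summary() computes approximate percentiles, and rounds up to whole minutes. The function names are hypothetical.

```python
import math


def nearest_rank_percentile(values, pct):
    """Exact nearest-rank percentile (pct in 0-100) of a non-empty list."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))  # 1-based rank
    return ordered[rank - 1]


def suggest_lateness_budget(lag_seconds, pct=99):
    """Round the chosen lag percentile up to whole minutes for withWatermark()."""
    lag = nearest_rank_percentile(lag_seconds, pct)
    minutes = max(1, math.ceil(lag / 60))  # at least one minute of slack
    return f"{minutes} minute" if minutes == 1 else f"{minutes} minutes"
```

The result is meant to be passed straight through, e.g. events.withWatermark("event_time", suggest_lateness_budget(lags)).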
A working query end-to-end
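```python
# Requires the Kafka connector (the spark-sql-kafka-0-10 package matching
# your Spark and Scala versions) on the classpath.
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, col, from_json
from pyspark.sql.types import StructType, StringType, TimestampType

spark = SparkSession.builder.appName("EventTimeDemo").getOrCreate()

# Expected JSON payload per Kafka message: {"page": ..., "event_time": ...}
schema = StructType().add("page", StringType()).add("event_time", TimestampType())

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "pageviews")
       .load())

# Kafka delivers the payload as binary in the "value" column; parse it into columns.
events = (raw
          .select(from_json(col("value").cast("string"), schema).alias("e"))
          .select("e.*"))

# Event-time windows with a 5-minute lateness budget.
windowed = (events
            .withWatermark("event_time", "5 minutes")
            .groupBy(window("event_time", "10 minutes"), "page")
            .agg(count("*").alias("views")))

# Append mode: each window is written once, after the watermark closes it.
query = (windowed.writeStream
         .outputMode("append")
         .format("console")
         .option("checkpointLocation", "/tmp/ck/views")
         .start())

query.awaitTermination()  # keep the driver alive while the stream runs
```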
outputMode("append") here is the natural pair with watermarks: each window is emitted exactly once, after the watermark passes its end. We’ll spend the next two lessons unpacking output modes more carefully.
What we haven’t talked about yet is what’s happening behind the watermark — where Spark stores the running counts, how it survives a job restart, and what other stateful patterns exist beyond simple aggregation. That’s lesson 53: the state store, sessionization, and mapGroupsWithState.
References: Apache Spark Structured Streaming Programming Guide, sections on event time and watermarking (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking). Retrieved 2026-05-01.