PySpark, from the ground up Lesson 53 / 60

Stateful operations: aggregations, sessions, and the state store

Where Spark Streaming keeps the state between micro-batches, the standard stateful patterns, and when to drop down to mapGroupsWithState.

Last lesson we waved a hand and said “Spark keeps a running count for each window in some state somewhere.” This lesson is that somewhere: where state lives between micro-batches, how it survives restarts, what the standard stateful patterns are, and when you need to escape the high-level API and write a state machine yourself.

The state store

Structured Streaming runs as a sequence of micro-batches. Each batch processes some new input rows, updates an aggregation or a join, and produces some output. Between batches, the partial results — the running counts, the pending join rows, the per-key buffers — have to live somewhere.

That somewhere is the state store. Conceptually it’s a per-partition, per-operator key-value store. For each stateful operator in your query (a windowed aggregation, a stream-stream join, a deduplication), Spark partitions the keys across executors and gives each partition its own slice of state.
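For intuition only, here is a toy model of one operator's state in plain Python. It has a fixed number of independent key-value slices, and a hash routes every key to exactly one slice. It is not Spark's implementation. ToyStateStore and running_count are hypothetical names, and zlib.crc32 stands in for Spark's own hash partitioning. num_partitions plays the role of spark.sql.shuffle.partitions, which sets the number of state partitions and can't change once a query has a checkpoint.

```python
import zlib
from typing import Any, Dict, List


class ToyStateStore:
    """Toy model of one stateful operator's state: independent key-value slices."""

    def __init__(self, num_partitions: int) -> None:
        self.num_partitions = num_partitions
        # One dict per partition: key -> state value
        self.partitions: List[Dict[str, Any]] = [{} for _ in range(num_partitions)]

    def partition_for(self, key: str) -> int:
        # Deterministic hash, so a key always lands in the same slice
        return zlib.crc32(key.encode("utf-8")) % self.num_partitions

    def get(self, key: str, default: Any = None) -> Any:
        return self.partitions[self.partition_for(key)].get(key, default)

    def put(self, key: str, value: Any) -> None:
        self.partitions[self.partition_for(key)][key] = value


def running_count(store: ToyStateStore, batch: List[str]) -> None:
    """Apply one micro-batch of a per-key running count."""
    for key in batch:
        store.put(key, store.get(key, 0) + 1)
```

After two micro-batches, ["a", "b", "a"] and then ["a"], the store holds "a" at 3 and "b" at 1. Each key lives in exactly one slice.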

Two implementations ship with Spark:

  • HDFSBackedStateStoreProvider: the default provider. State lives in an in-memory hash map per partition, and delta files (plus periodic snapshots) are written to the checkpoint location on HDFS-compatible storage at each batch commit. Fast for small state. Falls over hard when state gets big: the in-memory map has to fit in executor heap, and a partition with millions of keys causes long GC pauses.
  • RocksDBStateStoreProvider: added in Spark 3.2 and the recommended choice in 3.x. State lives in an embedded RocksDB instance on the executor’s local disk. Each commit uploads the new state files to the checkpoint location, or, with changelog checkpointing enabled in Spark 3.4+, a changelog. Trades a little per-access latency for the ability to hold orders of magnitude more state without OOMing the executor.

Switch to RocksDB with one config:

spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)

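For anything beyond a toy job, do this. Heap-backed state is a footgun in production. Set the provider before the query’s first run. The provider is recorded in the checkpoint, and a restart keeps using the recorded one rather than the current session setting.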

The state store is also what makes streaming fault-tolerant. On every batch commit, the state changes are written to your checkpoint location. If the driver dies and the job restarts, Spark loads the most recent state snapshot and replays the changes committed after it to reconstruct the state. It then resumes from the last committed batch. Lose the checkpoint directory and you lose your state. Treat it like a database: back it up, and don’t put it on ephemeral storage.
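The recovery idea fits in a few lines of plain Python. This is a model, not Spark's checkpoint format. The snapshot and changelog dicts, and the recover and apply_changes helpers, are hypothetical. The logic is the part that carries over: start from the newest snapshot at or before the last committed batch, then replay every later batch's changes in order. A missing batch in the changelog raises a KeyError, which is roughly what a damaged checkpoint looks like.

```python
from typing import Dict, List, Optional, Tuple

# One change: ("put", key, value) or ("delete", key, None)
Change = Tuple[str, str, Optional[int]]


def apply_changes(state: Dict[str, int], changes: List[Change]) -> None:
    for op, key, value in changes:
        if op == "put":
            state[key] = value
        elif op == "delete":
            state.pop(key, None)
        else:
            raise ValueError(f"unknown op {op!r}")


def recover(
    snapshots: Dict[int, Dict[str, int]],
    changelog: Dict[int, List[Change]],
    last_committed: int,
) -> Dict[str, int]:
    """Rebuild state as of batch `last_committed` (batches numbered from 0)."""
    usable = [v for v in snapshots if v <= last_committed]
    base = max(usable) if usable else -1
    state = dict(snapshots[base]) if usable else {}
    # Replay each committed batch after the snapshot, oldest first
    for version in range(base + 1, last_committed + 1):
        apply_changes(state, changelog[version])
    return state
```

Replaying from a snapshot and replaying from scratch must produce the same state. That property is what makes snapshots a pure optimization.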

The standard stateful operations

Three patterns cover most needs.

Windowed aggregation

The bread and butter, covered last lesson. Each (window, group-key) tuple has a running aggregation buffer in the state store; the watermark tells Spark when to release it.

from pyspark.sql.functions import window, count, sum

agg = (events
    .withWatermark("event_time", "5 minutes")
    .groupBy(window("event_time", "10 minutes"), "user_id")
    .agg(count("*").alias("events"),
         sum("amount").alias("total_amount")))

State size scales as (number of distinct user_ids active in the watermark window) × (number of unclosed windows). The watermark is what bounds it.
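To see why the watermark bounds windowed state, here is a plain-Python model of which tumbling windows a single user still holds. The model makes a few assumptions. Times are plain minutes. The watermark is simply the max event time minus the delay, whereas Spark actually advances it between micro-batches. A window is dropped once the watermark reaches its end. tumbling_window and open_windows are hypothetical helpers.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # [start, end), in minutes for readability


def tumbling_window(t: int, size: int) -> Window:
    """The tumbling window that event time t falls into."""
    start = t - (t % size)
    return (start, start + size)


def open_windows(event_times: List[int], size: int, delay: int) -> List[Window]:
    """Windows one user still holds in state after these events (simplified model)."""
    watermark = max(event_times) - delay
    windows = {tumbling_window(t, size) for t in event_times}
    # A window is evicted once the watermark reaches its end
    return sorted(w for w in windows if w[1] > watermark)
```

With the lesson's 10-minute windows and 5-minute delay, at most two windows per user are open at once. That is the (number of unclosed windows) factor above.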

You can also do non-windowed aggregations:

totals = events.groupBy("user_id").agg(sum("amount").alias("lifetime_total"))

This works, but state grows forever — every user_id ever seen stays in state. Only do this if the key space is genuinely bounded (a small enum, a known list of customers) or if you have a TTL story via mapGroupsWithState.

Stream-stream joins

Joining two streams is conceptually wild: when a row from stream A arrives, the matching row from stream B may not have arrived yet. So Spark buffers each side, waiting for matches.

from pyspark.sql.functions import expr

clicks_w = clicks.withWatermark("click_time", "10 minutes")
impressions_w = impressions.withWatermark("imp_time", "30 minutes")

joined = clicks_w.join(
    impressions_w,
    expr("""
        click_id = imp_id
        AND click_time >= imp_time
        AND click_time <= imp_time + interval 1 hour
    """),
)

For inner stream-stream joins to have bounded state, you need two things:

  1. A watermark on each side, so old rows can be expired.
  2. A time constraint in the join condition that bounds how far apart the matched rows can be in event time. Above, click_time must fall between imp_time and imp_time + 1 hour.

Without those, state on either side grows forever. Spark will let you write the join — it just won’t be a good idea.
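Here is a simplified model of the cleanup reasoning for the join above, in plain Python. It assumes times in plain minutes and a single watermark for both sides. (Spark's default policy for multiple watermarks uses the minimum of the input watermarks as the global one.) It also treats rows behind the watermark as never arriving. impression_evictable, click_evictable, and prune are hypothetical helpers; Spark derives equivalent thresholds from the join condition itself.

```python
from typing import List, Optional, Tuple


def impression_evictable(imp_time: int, watermark: int, max_gap: Optional[int] = 60) -> bool:
    """An impression matches clicks with imp_time <= click_time <= imp_time + max_gap.

    Clicks behind the watermark never arrive, so once imp_time + max_gap is
    below the watermark nothing can match it. With no upper bound, never.
    """
    if max_gap is None:
        return False
    return imp_time + max_gap < watermark


def click_evictable(click_time: int, watermark: int) -> bool:
    """A click only matches impressions with imp_time <= click_time.

    Impressions still to come have imp_time >= watermark, so a click older
    than the watermark can never match again.
    """
    return click_time < watermark


def prune(
    impressions: List[int], clicks: List[int], watermark: int, max_gap: Optional[int] = 60
) -> Tuple[List[int], List[int]]:
    """Keep only the buffered rows that could still find a match."""
    kept_imps = [t for t in impressions if not impression_evictable(t, watermark, max_gap)]
    kept_clicks = [t for t in clicks if not click_evictable(t, watermark)]
    return kept_imps, kept_clicks
```

Passing max_gap=None removes the upper time bound. Impressions then never become evictable, which is the grows-forever case.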

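Outer joins (left_outer, right_outer) work too, under stricter rules. For them, a watermark on the side that can come back null, plus the time constraint, is mandatory rather than advisable. Spark rejects an outer stream-stream join without them. Keep a watermark on the other side as well, or that side’s state never gets cleaned up. Emitting a “no match” row also has to wait until the watermark passes the time bound, so expect those rows to lag by roughly the time window plus the watermark delay.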

Deduplication

Streams have duplicates. Producers retry, Kafka delivers at-least-once, mobile clients double-fire. The streaming dropDuplicates keeps a small state per key so it can detect repeats:

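deduped = (events
    .withWatermark("event_time", "1 hour")
    # Key on the event-time column too; with event_id alone the state never expires
    .dropDuplicates(["event_id", "event_time"]))

Each (event_id, event_time) pair lives in state until the watermark passes its event_time. With a 1-hour watermark, you therefore catch duplicates that arrive up to about an hour behind the newest event seen. Leaving event_time out of the key doesn’t bound anything. Spark has no way to tell when an event_id-only entry is safe to drop, so it keeps every one forever. The flip side is that a retry carrying a different event_time counts as a new row. On Spark 3.5+, dropDuplicatesWithinWatermark(["event_id"]) deduplicates on event_id alone and still expires state through the watermark.

State size is roughly (event rate) × (watermark interval) entries, times the per-entry size in bytes. At 1,000 events/sec with an hourly watermark, that’s 3.6 million ids in state: fine for RocksDB, painful for the heap-backed store.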

Deduplication without a watermark is also legal and also a way to OOM your job after a few days. Always pair with a watermark unless you really, truly know your key space is bounded.
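Keying on (event_id, event_time) is what lets the watermark expire dedup state. A plain-Python model of that version makes both the eviction and its blind spot visible. dedup_stream is a hypothetical helper, and times are epoch seconds. Treating a row exactly at the watermark as late is this sketch's choice; Spark's boundary handling may differ. Each batch sees the watermark from the end of the previous batch, mirroring how Spark advances it between micro-batches.

```python
from typing import Iterable, List, Optional, Set, Tuple

Event = Tuple[str, int]  # (event_id, event_time in epoch seconds)


def dedup_stream(
    batches: Iterable[List[Event]], delay: int
) -> Tuple[List[List[Event]], Set[Event]]:
    """Model of watermarked dedup keyed on (event_id, event_time).

    Returns the rows emitted per batch and the state left at the end.
    """
    seen: Set[Event] = set()
    max_time: Optional[int] = None
    watermark: Optional[int] = None
    outputs: List[List[Event]] = []
    for batch in batches:
        emitted: List[Event] = []
        for ev in batch:
            if watermark is not None and ev[1] <= watermark:
                continue  # late row: dropped, never checked for duplicates
            if ev in seen:
                continue  # duplicate of a row still held in state
            seen.add(ev)
            emitted.append(ev)
        outputs.append(emitted)
        if batch:
            batch_max = max(t for _, t in batch)
            max_time = batch_max if max_time is None else max(max_time, batch_max)
            watermark = max_time - delay
            # Evict keys the watermark has passed: any repeat of them would be late
            seen = {ev for ev in seen if ev[1] > watermark}
    return outputs, seen
```

With a one-hour delay, ("a", 1000) in the third batch is behind the watermark and dropped as late. In the fourth batch, ("c", 5001) gets through even though ("c", 5000) is still in state, because the timestamps differ. Spark 3.5+ offers dropDuplicatesWithinWatermark for that case.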

When the standard ops aren’t enough: mapGroupsWithState

The high-level API covers maybe 80% of streaming needs. The other 20% need an actual per-key state machine — your own code that owns the state, decides when to update, decides when to emit, decides when to time out.

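In Scala and Java, that’s mapGroupsWithState (and its sibling flatMapGroupsWithState). PySpark doesn’t expose either one. Its counterpart, since Spark 3.4, is applyInPandasWithState on a grouped DataFrame, which is built on the same GroupState model. You write a function that runs once per (key, batch) pair. It receives the key, the new rows for that key (as an iterator of pandas DataFrames), and a GroupState handle that carries the current state and the timeout controls. It updates the state through that handle and yields zero or more output DataFrames.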

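The classic use case is sessionization: group events into sessions where a session ends after N minutes of inactivity. Tumbling and sliding windows can’t do this, because they’re fixed-size while sessions are variable-length and depend on user behavior. Spark 3.2+ does ship a built-in session_window function for the plain gap-based case. Hand-written state earns its keep once sessions need rules session_window can’t express, such as ending on an explicit logout event or capping a session’s length.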
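It helps to pin down the semantics with a batch reference model before looking at a streaming version. This plain-Python sketch (sessionize is a hypothetical helper) splits one user's event times into sessions. It assumes that a gap of 30 minutes or more starts a new session, and it reports each session's end as its last event. That is one convention among several.

```python
from typing import List, Tuple

Session = Tuple[int, int, int]  # (start, end, event_count), epoch seconds


def sessionize(timestamps: List[int], gap: int = 30 * 60) -> List[Session]:
    """Split one user's event times into sessions separated by >= `gap` seconds idle."""
    sessions: List[Session] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] < gap:
            # Within the gap of the previous event: extend the current session
            start, _, count = sessions[-1]
            sessions[-1] = (start, ts, count + 1)
        else:
            # First event, or idle for at least `gap`: open a new session
            sessions.append((ts, ts, 1))
    return sessions
```

A reference like this is handy in tests: feed the same events through the streaming job and compare the sessions it emits.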

Here’s a sessionizer that emits a session record once a user has been idle for 30 minutes:

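from typing import Any, Iterator, Tuple

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

# PySpark has no mapGroupsWithState/flatMapGroupsWithState; its counterpart is
# applyInPandasWithState (Spark 3.4+). Assumes `events` has a timestamp column
# event_time and a long column event_time_epoch (epoch seconds).

SESSION_GAP_MS = 30 * 60 * 1000  # 30 minutes of inactivity closes a session

def update_session(
    key: Tuple[Any, ...],
    pdfs: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    (user_id,) = key  # the grouping key arrives as a tuple

    if state.hasTimedOut:
        # Watermark passed last_seen + 30 minutes: emit the session and clear it
        start, last_seen, event_count = state.get  # state values come back as a tuple
        state.remove()
        yield pd.DataFrame({
            "user_id": [user_id],
            "session_start": [start],
            "session_end": [last_seen],
            "event_count": [event_count],
        })
        return

    if state.exists:
        start, last_seen, event_count = state.get
    else:
        start, last_seen, event_count = None, None, 0

    for pdf in pdfs:
        if pdf.empty:
            continue
        ts_min = int(pdf["event_time_epoch"].min())
        ts_max = int(pdf["event_time_epoch"].max())
        start = ts_min if start is None else min(start, ts_min)
        last_seen = ts_max if last_seen is None else max(last_seen, ts_max)
        event_count += len(pdf)

    if start is not None:
        state.update((start, last_seen, event_count))
        # Timeouts are reset on every call, so schedule a fresh one each time
        state.setTimeoutTimestamp(last_seen * 1000 + SESSION_GAP_MS)
    # Nothing else to emit until the timeout fires

sessions = (events
    .withWatermark("event_time", "10 minutes")  # event-time timeouts need a watermark; delay is illustrative
    .groupBy("user_id")
    .applyInPandasWithState(
        update_session,
        outputStructType="user_id string, session_start long, session_end long, event_count long",
        stateStructType="start long, last_seen long, event_count long",
        outputMode="append",
        timeoutConf=GroupStateTimeout.EventTimeTimeout,
    ))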

A few things worth pausing on:

  • GroupState is the per-key state handle: state.exists, state.get, state.update(...), state.remove(), state.hasTimedOut. In PySpark, get returns a tuple and update takes one, both shaped by stateStructType. That’s the API.
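  • setTimeoutTimestamp schedules a wakeup. Once the watermark moves past that timestamp, Spark calls update_session again with no new events and state.hasTimedOut == True, giving you a chance to emit and clean up. The timeout is reset every time the function is called, so set it again on each call that should keep the key alive.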
  • Three timeout modes: NoTimeout, ProcessingTimeTimeout (wall clock, set with setTimeoutDuration), and EventTimeTimeout (driven by the watermark, so the input needs a withWatermark). Use event-time timeouts whenever event time matters, which is almost always.
  • The output mode affects what you can return. append requires you to never emit the same row twice; update lets you re-emit a key as it changes.

The trade-off: mapGroupsWithState gives you total control. It also gives you total responsibility for state size — there’s no automatic eviction beyond what your timeout logic does. Forget to call state.remove() somewhere and your job leaks state until it dies.
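You can exercise a stateful function like this without a cluster by driving it from a tiny fake. Everything in this sketch is hypothetical plain Python, not PySpark. FakeGroupState imitates only the GroupState members used here, update_session is a pandas-free copy of the sessionizer's logic, and run is a toy driver. The driver encodes these rules, simplified: rows older than the watermark are filtered out, and a timeout fires once the watermark passes its timestamp. Timeouts are reset before every call, and the watermark advances at the end of each micro-batch. An empty batch stands in for a no-data batch.

```python
from typing import Dict, Iterator, List, Optional, Tuple

GAP_MS = 30 * 60 * 1000  # 30 minutes of inactivity closes a session


class FakeGroupState:
    """Hypothetical stand-in for the few GroupState members used below."""

    def __init__(self) -> None:
        self._value: Optional[tuple] = None
        self.timeout_ms: Optional[int] = None
        self.hasTimedOut = False

    @property
    def exists(self) -> bool:
        return self._value is not None

    @property
    def get(self) -> tuple:
        if self._value is None:
            raise ValueError("state not defined")
        return self._value

    def update(self, value: tuple) -> None:
        self._value = tuple(value)

    def remove(self) -> None:
        self._value = None

    def setTimeoutTimestamp(self, timestamp_ms: int) -> None:
        self.timeout_ms = timestamp_ms


def update_session(key: str, times: List[int], state: FakeGroupState) -> Iterator[tuple]:
    """Sessionizer state machine without pandas; times are epoch seconds."""
    if state.hasTimedOut:
        start, last_seen, n = state.get
        state.remove()
        yield (key, start, last_seen, n)
        return
    start, last_seen, n = state.get if state.exists else (None, None, 0)
    for ts in times:
        start = ts if start is None else min(start, ts)
        last_seen = ts if last_seen is None else max(last_seen, ts)
        n += 1
    if start is not None:
        state.update((start, last_seen, n))
        state.setTimeoutTimestamp(last_seen * 1000 + GAP_MS)


def run(batches: List[List[Tuple[str, int]]], delay_ms: int):
    """Drive update_session over micro-batches of (key, epoch_seconds) rows."""
    states: Dict[str, FakeGroupState] = {}
    max_ms: Optional[int] = None
    watermark_ms: Optional[int] = None
    emitted: List[tuple] = []
    for batch in batches:
        by_key: Dict[str, List[int]] = {}
        for key, ts in batch:
            if watermark_ms is not None and ts * 1000 < watermark_ms:
                continue  # older than the watermark: filtered out
            by_key.setdefault(key, []).append(ts)
        for key, times in by_key.items():
            st = states.setdefault(key, FakeGroupState())
            st.timeout_ms = None  # timeouts are reset on every call
            emitted.extend(update_session(key, times, st))
        for key, st in states.items():
            expired = (st.timeout_ms is not None and watermark_ms is not None
                       and st.timeout_ms < watermark_ms)
            if key not in by_key and expired:
                st.timeout_ms, st.hasTimedOut = None, True
                emitted.extend(update_session(key, [], st))
                st.hasTimedOut = False
        # Keys whose state was removed leave the store
        states = {k: s for k, s in states.items() if s.exists}
        if batch:
            batch_ms = max(ts for _, ts in batch) * 1000
            max_ms = batch_ms if max_ms is None else max(max_ms, batch_ms)
            watermark_ms = max_ms - delay_ms
    return emitted, states
```

Running it with a 10-minute delay over batches [("u1", 0), ("u1", 600)], then [("u2", 3100)], then an empty batch works as follows. The second batch pushes the watermark past u1's timeout of 600 s plus 30 minutes, and the empty third batch fires that timeout and emits ("u1", 0, 600, 2). Delete the state.remove() call and u1 lingers in states forever, a miniature of the leak described above.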

State size monitoring

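Every batch’s progress report (StreamingQuery.lastProgress, StreamingQueryListener progress events, and the Structured Streaming tab in the Spark UI) includes state metrics: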

  • stateOperators[*].numRowsTotal — rows currently in state for that operator.
  • stateOperators[*].memoryUsedBytes — heap or RocksDB memory.
  • stateOperators[*].numRowsDroppedByWatermark — late rows ignored.

Plot total state rows over time. A healthy job either stabilizes (bounded state) or grows linearly with traffic (and that’s fine if you’ve sized for it). A job that grows superlinearly or never plateaus is leaking — usually a missing watermark, a key cardinality you didn’t anticipate, or a mapGroupsWithState that forgets to clean up.
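Here is a small helper for pulling those numbers out of a progress report so they can be plotted. It assumes the dict shape that StreamingQuery.lastProgress returns in PySpark 3.x: a parsed version of the progress JSON, or None before the first batch completes. state_summary is a hypothetical name, and summing across operators is a choice made here to get one line per query.

```python
from typing import Any, Dict, List, Optional


def state_summary(progress: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Collapse one progress report's state metrics into a single row for plotting."""
    progress = progress or {}  # lastProgress is None before the first batch finishes
    ops: List[Dict[str, Any]] = progress.get("stateOperators", [])
    return {
        "batchId": progress.get("batchId"),
        "watermark": (progress.get("eventTime") or {}).get("watermark"),
        "rowsTotal": sum(op.get("numRowsTotal", 0) for op in ops),
        "memoryUsedBytes": sum(op.get("memoryUsedBytes", 0) for op in ops),
        "rowsDroppedByWatermark": sum(op.get("numRowsDroppedByWatermark", 0) for op in ops),
    }
```

Polling state_summary(query.lastProgress) on a timer and keeping one row per batchId (repeated polls of the same batch can be dropped) gives you the rows-over-time curve.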

Also watch RocksDB compaction time in the metrics. If compaction starts dominating batch latency, your state is bigger than your local SSD wants to handle and you should either shard by partition more aggressively, drop less-useful state, or move some of the work to a batch job.

Practical checklist

Before shipping a stateful streaming query:

  • Watermark on every aggregation, join side, and dedup. Yes, every one. The exceptions are bounded-key-space cases, and they’re rarer than you think.
  • Pick RocksDB unless your state is genuinely tiny. Heap-backed is for prototypes.
  • Checkpoint to durable, replicated storage. S3, GCS, Azure Blob, HDFS. Not local disk, not /tmp.
  • Test restart, including a forced kill. Bring the job up against the existing checkpoint and verify state is recovered correctly.
  • Plot state size, watermark, and dropped rows from day one. These are your early-warning system.

Next lesson: where the output of all this stateful machinery actually goes. Output modes (append, update, complete), the built-in sinks, the foreachBatch escape hatch, and the upsert pattern that gives you exactly-once writes when the rest of the pipeline is at-least-once.


References: Apache Spark Structured Streaming Programming Guide, sections on stateful operations, arbitrary stateful operations, and stream-stream joins (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html); RocksDB state store configuration documentation. Retrieved 2026-05-01.
