L’ultima lezione abbiamo svolazzato con la mano e detto “Spark tiene un running count per ogni finestra in qualche stato da qualche parte”. Questa lezione è quel qualche parte: dove vive lo stato tra i micro-batch, come sopravvive ai restart, quali sono i pattern stateful standard, e quando hai bisogno di scappare dall’API di alto livello e scriverti una macchina a stati a mano.
Lo state store
Structured Streaming gira come una sequenza di micro-batch. Ogni batch processa qualche nuova riga di input, aggiorna un’aggregazione o un join, e produce un po’ di output. Tra i batch, i risultati parziali (i running count, le righe di join in attesa, i buffer per chiave) devono vivere da qualche parte.
Quel da qualche parte è lo state store. Concettualmente è un key-value store per partizione e per operatore. Per ogni operatore stateful nella tua query (un’aggregazione windowed, uno stream-stream join, una deduplicazione), Spark partiziona le chiavi tra gli executor e dà a ogni partizione la sua fetta di stato.
Con Spark vengono spedite due implementazioni:
- HDFSBackedStateStoreProvider: il default nelle versioni più vecchie. Lo stato vive in una hash map in memoria per partizione, con il changelog scritto su storage HDFS-compatibile a ogni commit di batch. Veloce per stati piccoli. Crolla pesantemente quando lo stato diventa grande: la mappa in memoria deve stare nell’heap dell’executor, e una partizione con milioni di chiavi causa lunghe pause GC.
- RocksDBStateStoreProvider: aggiunto in Spark 3.2 e la scelta consigliata nella 3.x. Lo stato vive in un’istanza RocksDB embedded sul disco locale, con il changelog caricato su storage remoto al commit. Scambia un po’ di latenza per accesso con la capacità di tenere ordini di grandezza più stato senza far andare in OOM l’executor.
Passa a RocksDB con un solo config:
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)
Per qualunque cosa al di là di un job giocattolo, fai così. Lo stato heap-backed è una pistola puntata al piede in produzione.
Lo state store è anche ciò che rende lo streaming fault-tolerant. A ogni commit di batch, il changelog viene caricato nella tua checkpoint location. Se il driver muore e il job riparte, Spark rigioca il changelog e ricostruisce lo stato, poi riprende dall’ultimo offset committato. Perdi la directory di checkpoint e perdi il tuo stato: trattala come un database, fai backup, non metterla su storage effimero.
Le operazioni stateful standard
Tre pattern coprono la maggior parte dei bisogni.
Aggregazione windowed
Il pane quotidiano, coperto la lezione scorsa. Ogni tupla (window, group-key) ha un buffer di aggregazione corrente nello state store; il watermark dice a Spark quando rilasciarlo.
from pyspark.sql.functions import window, count, sum
agg = (events
.withWatermark("event_time", "5 minutes")
.groupBy(window("event_time", "10 minutes"), "user_id")
.agg(count("*").alias("events"),
sum("amount").alias("total_amount")))
La dimensione dello stato scala come (numero di user_id distinti attivi nella finestra del watermark) × (numero di finestre non chiuse). Il watermark è ciò che la limita.
Puoi anche fare aggregazioni non-windowed:
totals = events.groupBy("user_id").agg(sum("amount").alias("lifetime_total"))
Funziona, ma lo stato cresce per sempre: ogni user_id mai visto resta nello stato. Fallo solo se lo spazio delle chiavi è genuinamente limitato (un piccolo enum, una lista nota di clienti) o se hai una storia di TTL via mapGroupsWithState.
Stream-stream join
Joinare due stream è concettualmente folle: quando una riga dallo stream A arriva, la riga corrispondente dallo stream B potrebbe non essere ancora arrivata. Quindi Spark bufferizza ogni lato, in attesa dei match.
from pyspark.sql.functions import expr
clicks_w = clicks.withWatermark("click_time", "10 minutes")
impressions_w = impressions.withWatermark("imp_time", "30 minutes")
joined = clicks_w.join(
impressions_w,
expr("""
click_id = imp_id
AND click_time >= imp_time
AND click_time <= imp_time + interval 1 hour
"""),
)
Perché gli inner stream-stream join abbiano stato limitato, ti servono due cose:
- Un watermark su ogni lato, così le righe vecchie possono essere fatte scadere.
- Un vincolo temporale nella condizione di join che limiti quanto distanti possono essere le righe matchate sull’event time. Sopra è
click_time <= imp_time + 1 hour.
Senza queste, lo stato su ciascun lato cresce per sempre. Spark ti lascerà scrivere il join, ma non sarà una buona idea.
Anche gli outer join (left_outer, right_outer) funzionano, con l’avvertenza che emettere una riga “no match” deve aspettare che il watermark superi il limite temporale. Quindi aspettati che l’emissione sia in ritardo della dimensione della tua finestra temporale.
Deduplicazione
Gli stream hanno duplicati. I producer fanno retry, Kafka consegna at-least-once, i client mobile sparano due volte. La dropDuplicates streaming tiene un piccolo stato per chiave per poter rilevare le ripetizioni:
deduped = (events
.withWatermark("event_time", "1 hour")
.dropDuplicates(["event_id"]))
Ogni event_id vive nello stato finché il watermark non passa il suo event_time + lateness. Con un watermark di 1 ora, prendi i duplicati dentro una finestra di 1 ora. La dimensione dello stato è proporzionale a (event rate) × (intervallo del watermark) × (dimensione id). A 1.000 eventi/sec con watermark orario sono 3,6 milioni di id nello stato: bene per RocksDB, doloroso per lo store heap-backed.
La deduplicazione senza watermark è anche legale ed è anche un modo per far andare in OOM il tuo job dopo qualche giorno. Accoppiala sempre con un watermark, a meno che tu sappia davvero, davvero che il tuo spazio delle chiavi è limitato.
Quando le op standard non bastano: mapGroupsWithState
L’API di alto livello copre forse l’80% dei bisogni di streaming. L’altro 20% ha bisogno di una vera macchina a stati per chiave: il tuo codice che possiede lo stato, decide quando aggiornare, decide quando emettere, decide quando andare in timeout.
Quello è mapGroupsWithState (e il fratello flatMapGroupsWithState). Scrivi una funzione che gira una volta per coppia (chiave, batch). Riceve i nuovi eventi per quella chiave, lo stato corrente, e un timeout handle. Restituisce il nuovo stato e (opzionalmente) qualche riga di output.
Il caso classico è la sessionizzazione: raggruppare eventi in sessioni dove una sessione finisce dopo N minuti di inattività. Le finestre con watermark non possono fare questo: le finestre sono di dimensione fissa, le sessioni sono di lunghezza variabile e dipendono dal comportamento dell’utente.
Ecco un sessionizer che emette un record di sessione quando un utente è stato inattivo per 30 minuti:
from pyspark.sql.functions import col
from pyspark.sql.streaming import GroupState, GroupStateTimeout
from typing import Iterator
from dataclasses import dataclass
@dataclass
class SessionState:
start: int # epoch seconds
last_seen: int
event_count: int
@dataclass
class SessionRow:
user_id: str
start: int
end: int
event_count: int
def update_session(
user_id: str,
events: Iterator,
state: GroupState,
) -> Iterator[SessionRow]:
if state.hasTimedOut:
# 30 minuti di inattività → emetti e pulisci
s = SessionState(**state.get)
yield SessionRow(user_id, s.start, s.last_seen, s.event_count)
state.remove()
return
if state.exists:
s = SessionState(**state.get)
else:
s = None
for ev in events:
ts = ev.event_time_epoch
if s is None:
s = SessionState(start=ts, last_seen=ts, event_count=1)
else:
s.last_seen = max(s.last_seen, ts)
s.event_count += 1
if s is not None:
state.update(s.__dict__)
# Schedula un timeout 30 minuti dopo last_seen
state.setTimeoutTimestamp(s.last_seen * 1000 + 30 * 60 * 1000)
return iter([]) # niente da emettere fino al timeout
sessions = (events
.groupByKey(lambda r: r.user_id)
.flatMapGroupsWithState(
outputMode="append",
timeoutConf=GroupStateTimeout.EventTimeTimeout,
)(update_session))
Un paio di cose su cui vale la pena fermarsi:
GroupStateè l’handle di stato per chiave.state.get,state.update(...),state.remove(),state.exists,state.hasTimedOut. Quella è l’API.setTimeoutTimestampschedula una sveglia. Quando l’event time supera quel timestamp, Spark chiamaupdate_sessiondi nuovo senza nuovi eventi ma constate.hasTimedOut == True, dandoti la possibilità di emettere e pulire.- Due modalità di timeout:
ProcessingTimeTimeout(orologio da parete) edEventTimeTimeout(guidato dal watermark). Usa i timeout in event time ogni volta che l’event time conta, cioè quasi sempre. - L’output mode influenza cosa puoi restituire.
appendti chiede di non emettere mai due volte la stessa riga;updateti permette di ri-emettere una chiave man mano che cambia.
Il trade-off: mapGroupsWithState ti dà controllo totale. Ti dà anche responsabilità totale per la dimensione dello stato: non c’è eviction automatica oltre a quello che fa la tua logica di timeout. Dimentica di chiamare state.remove() da qualche parte e il tuo job perde stato finché non muore.
Monitoraggio della dimensione dello stato
Ogni batch nell’UI di Streaming riporta metriche di stato:
stateOperators[*].numRowsTotal— righe attualmente nello stato per quell’operatore.stateOperators[*].memoryUsedBytes— memoria heap o RocksDB.stateOperators[*].numRowsDroppedByWatermark— righe in ritardo ignorate.
Plotta le righe di stato totali nel tempo. Un job sano o si stabilizza (stato limitato) o cresce linearmente con il traffico (e va bene se hai dimensionato per quello). Un job che cresce in modo superlineare o che non si stabilizza mai sta perdendo stato: di solito un watermark mancante, una cardinalità di chiavi che non avevi previsto, o un mapGroupsWithState che dimentica di pulire.
Tieni d’occhio anche il tempo di compaction di RocksDB nelle metriche. Se la compaction inizia a dominare la latenza del batch, il tuo stato è più grande di quello che il tuo SSD locale vuole gestire e dovresti o shardare per partizione più aggressivamente, scartare stato meno utile, o spostare un po’ del lavoro su un job batch.
Checklist pratica
Prima di mettere in produzione una query streaming stateful:
- Watermark su ogni aggregazione, lato di join, e dedup. Sì, ognuno. Le eccezioni sono i casi a spazio di chiavi limitato, e sono più rare di quanto pensi.
- Scegli RocksDB a meno che il tuo stato non sia genuinamente piccolissimo. L’heap-backed è per i prototipi.
- Checkpoint su storage durevole e replicato. S3, GCS, Azure Blob, HDFS. Non disco locale, non
/tmp. - Testa il restart, incluso un kill forzato. Tira su il job contro il checkpoint esistente e verifica che lo stato sia recuperato correttamente.
- Plotta dimensione dello stato, watermark e righe scartate fin dal primo giorno. Sono il tuo sistema di allerta precoce.
Prossima lezione: dove va a finire davvero l’output di tutto questo macchinario stateful. Output mode (append, update, complete), i sink integrati, l’escape hatch foreachBatch, e il pattern di upsert che ti dà scritture exactly-once quando il resto della pipeline è at-least-once.
Riferimenti: Apache Spark Structured Streaming Programming Guide, sezioni su operazioni stateful, operazioni stateful arbitrarie, e stream-stream join (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html); documentazione di configurazione del state store RocksDB. Recuperato il 2026-05-01.