Lecția trecută am dat din mână și am spus „Spark ține un running count pentru fiecare fereastră într-un state undeva.” Lecția asta e despre acel undeva: unde trăiește state-ul între micro-batch-uri, cum supraviețuiește restart-urilor, care sunt pattern-urile stateful standard și când trebuie să scapi de API-ul de nivel înalt și să scrii singur o state machine.
State store-ul
Structured Streaming rulează ca o secvență de micro-batch-uri. Fiecare batch procesează niște rânduri noi de input, actualizează o agregare sau un join și produce niște output. Între batch-uri, rezultatele parțiale (running counts, rândurile pending de join, buffer-urile per-key) trebuie să trăiască undeva.
Acel undeva este state store-ul. Conceptual, este un key-value store per-partiție, per-operator. Pentru fiecare operator stateful din query-ul tău (o agregare windowed, un stream-stream join, un dedup), Spark partiționează cheile pe executori și dă fiecărei partiții propria felie de state.
Două implementări vin la pachet cu Spark:
- HDFSBackedStateStoreProvider: default-ul în versiunile mai vechi. State-ul trăiește într-un hash map in-memory per partiție, cu changelog-ul scris către storage compatibil HDFS la fiecare batch commit. Rapid pentru state mic. Cade urât când state-ul devine mare; map-ul in-memory trebuie să încapă în heap-ul executorului, iar o partiție cu milioane de chei cauzează pauze GC lungi.
- RocksDBStateStoreProvider: adăugat în Spark 3.2 și alegerea recomandată în 3.x. State-ul trăiește într-o instanță RocksDB embedded pe disk-ul local, cu changelog-ul încărcat către storage remote la commit. Schimbă puțină latență per-acces pe abilitatea de a ține ordine de mărime mai mult state fără să dea OOM la executor.
Schimbi la RocksDB cu un singur config:
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)
Pentru orice peste un toy job, fă asta. Heap-backed state este un footgun în producție.
State store-ul este și ce face streaming-ul fault-tolerant. La fiecare batch commit, changelog-ul e încărcat la checkpoint location-ul tău. Dacă driver-ul moare și jobul repornește, Spark redă changelog-ul și reconstruiește state-ul, apoi reia de la ultimul offset commitat. Pierzi directorul de checkpoint și îți pierzi state-ul; tratează-l ca pe o bază de date, fă-i backup, nu-l pune pe storage efemer.
Operațiile stateful standard
Trei pattern-uri acoperă majoritatea nevoilor.
Agregare windowed
Pâinea cu unt, acoperită lecția trecută. Fiecare tuplă (window, group-key) are un buffer de agregare running în state store; watermark-ul îi spune lui Spark când să-l elibereze.
from pyspark.sql.functions import window, count, sum
agg = (events
.withWatermark("event_time", "5 minutes")
.groupBy(window("event_time", "10 minutes"), "user_id")
.agg(count("*").alias("events"),
sum("amount").alias("total_amount")))
Mărimea state-ului scalează cu (numărul de user_ids distincte active în fereastra watermark-ului) × (numărul de ferestre nu încă închise). Watermark-ul îl mărginește.
Poți face și agregări non-windowed:
totals = events.groupBy("user_id").agg(sum("amount").alias("lifetime_total"))
Asta funcționează, dar state-ul crește la nesfârșit: fiecare user_id văzut vreodată rămâne în state. Fă asta doar dacă spațiul de chei este genuin mărginit (un enum mic, o listă cunoscută de clienți) sau dacă ai un TTL story prin mapGroupsWithState.
Stream-stream joins
Join-uirea a două stream-uri este conceptual sălbatică: când un rând de la stream A sosește, rândul corespunzător de la stream B s-ar putea să nu fi ajuns încă. Așa că Spark buffer-uiește fiecare parte, așteptând match-uri.
from pyspark.sql.functions import expr
clicks_w = clicks.withWatermark("click_time", "10 minutes")
impressions_w = impressions.withWatermark("imp_time", "30 minutes")
joined = clicks_w.join(
impressions_w,
expr("""
click_id = imp_id
AND click_time >= imp_time
AND click_time <= imp_time + interval 1 hour
"""),
)
Pentru ca inner stream-stream joins să aibă state mărginit, ai nevoie de două lucruri:
- Un watermark pe fiecare parte, ca rândurile vechi să poată fi expirate.
- O constrângere de timp în condiția de join care mărginește cât de departe pot fi rândurile match-uite în event time. Mai sus, este
click_time <= imp_time + 1 hour.
Fără acelea, state-ul pe oricare parte crește la nesfârșit. Spark te va lăsa să scrii join-ul; doar nu va fi o idee bună.
Outer joins (left_outer, right_outer) funcționează și ele, cu reținerea că emiterea unui rând „no match” trebuie să aștepte ca watermark-ul să avanseze peste limita de timp. Așa că așteaptă-te ca emiterea să întârzie cu mărimea ferestrei tale de timp.
Deduplicare
Stream-urile au duplicate. Producătorii fac retry, Kafka livrează at-least-once, clienții mobili trag de două ori. dropDuplicates în streaming ține un state mic per cheie ca să poată detecta repetările:
deduped = (events
.withWatermark("event_time", "1 hour")
.dropDuplicates(["event_id"]))
Fiecare event_id trăiește în state până când watermark-ul îi depășește event_time + lateness. Cu un watermark de 1 oră, prinzi duplicatele într-o fereastră de 1 oră. Mărimea state-ului e proporțională cu (rata evenimentelor) × (intervalul watermark-ului) × (mărimea id-ului). La 1.000 evenimente/sec cu watermark orar, asta înseamnă 3,6 milioane de id-uri în state; bine pentru RocksDB, dureros pentru store-ul heap-backed.
Deduplicarea fără watermark este și ea legală și e tot un mod de a-ți da OOM la job după câteva zile. Mereu pereche-o cu un watermark, decât dacă chiar, chiar știi că spațiul tău de chei e mărginit.
Când operațiile standard nu sunt suficiente: mapGroupsWithState
API-ul de nivel înalt acoperă poate 80% din nevoile de streaming. Restul de 20% au nevoie de o state machine reală per cheie: codul tău care deține state-ul, decide când să-l actualizeze, decide când să emită, decide când să facă timeout.
Asta e mapGroupsWithState (și sora ei flatMapGroupsWithState). Scrii o funcție care rulează o dată per pereche (key, batch). Primește evenimentele noi pentru acea cheie, state-ul curent și un timeout handle. Returnează state-ul nou și (opțional) niște rânduri de output.
Cazul de utilizare clasic e sessionization: gruparea evenimentelor în sesiuni unde o sesiune se termină după N minute de inactivitate. Ferestrele cu watermark nu pot face asta; ferestrele sunt de mărime fixă, sesiunile sunt de lungime variabilă și depind de comportamentul utilizatorului.
Iată un sessionizer care emite un session record odată ce un utilizator a fost inactiv 30 de minute:
from pyspark.sql.functions import col
from pyspark.sql.streaming import GroupState, GroupStateTimeout
from typing import Iterator
from dataclasses import dataclass
@dataclass
class SessionState:
start: int # epoch seconds
last_seen: int
event_count: int
@dataclass
class SessionRow:
user_id: str
start: int
end: int
event_count: int
def update_session(
user_id: str,
events: Iterator,
state: GroupState,
) -> Iterator[SessionRow]:
if state.hasTimedOut:
# 30 minutes of inactivity → emit and clear
s = SessionState(**state.get)
yield SessionRow(user_id, s.start, s.last_seen, s.event_count)
state.remove()
return
if state.exists:
s = SessionState(**state.get)
else:
s = None
for ev in events:
ts = ev.event_time_epoch
if s is None:
s = SessionState(start=ts, last_seen=ts, event_count=1)
else:
s.last_seen = max(s.last_seen, ts)
s.event_count += 1
if s is not None:
state.update(s.__dict__)
# Schedule a timeout 30 minutes after last_seen
state.setTimeoutTimestamp(s.last_seen * 1000 + 30 * 60 * 1000)
return iter([]) # nothing to emit until timeout
sessions = (events
.groupByKey(lambda r: r.user_id)
.flatMapGroupsWithState(
outputMode="append",
timeoutConf=GroupStateTimeout.EventTimeTimeout,
)(update_session))
Câteva lucruri pentru care merită să te oprești:
GroupStateeste handle-ul de state per cheie.state.get,state.update(...),state.remove(),state.exists,state.hasTimedOut. Acela e API-ul.setTimeoutTimestampprogramează un wakeup. Când event time depășește acel timestamp, Spark apeleazăupdate_sessiondin nou fără evenimente noi, dar custate.hasTimedOut == True, dându-ți o șansă să emiți și să curăți.- Două moduri de timeout:
ProcessingTimeTimeout(wall clock) șiEventTimeTimeout(driven de watermark). Folosește event-time timeouts oricând event time contează, ceea ce e aproape întotdeauna. - Output mode-ul afectează ce poți returna.
appendcere să nu emiți niciodată același rând de două ori;updateîți permite să re-emiți o cheie pe măsură ce se schimbă.
Compromisul: mapGroupsWithState îți dă control total. Îți dă și responsabilitate totală pentru mărimea state-ului; nu există evicție automată dincolo de ce face logica ta de timeout. Uiți să apelezi state.remove() undeva și jobul tău scurge state până moare.
Monitorizarea mărimii state-ului
Fiecare batch din UI-ul de Streaming raportează metrici de state:
stateOperators[*].numRowsTotal: rânduri curente în state pentru acel operator.stateOperators[*].memoryUsedBytes: memoria heap sau RocksDB.stateOperators[*].numRowsDroppedByWatermark: late rows ignorate.
Plotează rândurile totale de state în timp. Un job sănătos fie se stabilizează (state mărginit), fie crește liniar cu traficul (și asta e bine dacă l-ai dimensionat pentru asta). Un job care crește superliniar sau care nu plafonează niciodată scurge; de obicei un watermark lipsă, o cardinalitate de cheie pe care n-ai anticipat-o sau un mapGroupsWithState care uită să curețe.
Urmărește și timpul de RocksDB compaction în metrici. Dacă compaction-ul începe să domine latența batch-ului, state-ul tău e mai mare decât vrea SSD-ul tău local să gestioneze și ar trebui fie să faci shard pe partiție mai agresiv, să arunci state mai puțin util, fie să muți o parte din muncă într-un job batch.
Checklist practic
Înainte să trimiți în producție un query de streaming stateful:
- Watermark pe fiecare agregare, parte de join și dedup. Da, fiecare. Excepțiile sunt cazurile cu spațiu de chei mărginit și sunt mai rare decât crezi.
- Alege RocksDB decât dacă state-ul tău e genuin minuscul. Heap-backed e pentru prototipuri.
- Checkpoint către storage durabil, replicat. S3, GCS, Azure Blob, HDFS. Nu disk local, nu
/tmp. - Testează restart, inclusiv un kill forțat. Ridică jobul împotriva checkpoint-ului existent și verifică dacă state-ul e recuperat corect.
- Plotează mărimea state-ului, watermark-ul și rândurile aruncate din ziua unu. Acestea sunt sistemul tău de avertizare timpurie.
Lecția următoare: unde merge efectiv output-ul tuturor acestor mașinării stateful. Output modes (append, update, complete), sink-urile built-in, escape hatch-ul foreachBatch și pattern-ul de upsert care îți dă scrieri exactly-once când restul pipeline-ului e at-least-once.
Referințe: Apache Spark Structured Streaming Programming Guide, secțiunile despre stateful operations, arbitrary stateful operations și stream-stream joins (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html); documentația de configurare RocksDB state store. Consultat 2026-05-01.