PySpark, de la zero Lecția 53 / 60

Operatii stateful: agregari, sesiuni si state store

Unde tine Spark Streaming state-ul intre micro-batch-uri, pattern-urile stateful standard si cand sa cobori la mapGroupsWithState.

Lecția trecută am dat din mână și am spus „Spark ține un running count pentru fiecare fereastră într-un state undeva.” Lecția asta e despre acel undeva: unde trăiește state-ul între micro-batch-uri, cum supraviețuiește restart-urilor, care sunt pattern-urile stateful standard și când trebuie să scapi de API-ul de nivel înalt și să scrii singur o state machine.

State store-ul

Structured Streaming rulează ca o secvență de micro-batch-uri. Fiecare batch procesează niște rânduri noi de input, actualizează o agregare sau un join și produce niște output. Între batch-uri, rezultatele parțiale (running counts, rândurile pending de join, buffer-urile per-key) trebuie să trăiască undeva.

Acel undeva este state store-ul. Conceptual, este un key-value store per-partiție, per-operator. Pentru fiecare operator stateful din query-ul tău (o agregare windowed, un stream-stream join, un dedup), Spark partiționează cheile pe executori și dă fiecărei partiții propria felie de state.

Două implementări vin la pachet cu Spark:

  • HDFSBackedStateStoreProvider: default-ul în versiunile mai vechi. State-ul trăiește într-un hash map in-memory per partiție, cu changelog-ul scris către storage compatibil HDFS la fiecare batch commit. Rapid pentru state mic. Cade urât când state-ul devine mare; map-ul in-memory trebuie să încapă în heap-ul executorului, iar o partiție cu milioane de chei cauzează pauze GC lungi.
  • RocksDBStateStoreProvider: adăugat în Spark 3.2 și alegerea recomandată în 3.x. State-ul trăiește într-o instanță RocksDB embedded pe disk-ul local, cu changelog-ul încărcat către storage remote la commit. Schimbă puțină latență per-acces pe abilitatea de a ține ordine de mărime mai mult state fără să dea OOM la executor.

Schimbi la RocksDB cu un singur config:

spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)

Pentru orice peste un toy job, fă asta. Heap-backed state este un footgun în producție.

State store-ul este și ce face streaming-ul fault-tolerant. La fiecare batch commit, changelog-ul e încărcat la checkpoint location-ul tău. Dacă driver-ul moare și jobul repornește, Spark redă changelog-ul și reconstruiește state-ul, apoi reia de la ultimul offset commitat. Pierzi directorul de checkpoint și îți pierzi state-ul; tratează-l ca pe o bază de date, fă-i backup, nu-l pune pe storage efemer.

Operațiile stateful standard

Trei pattern-uri acoperă majoritatea nevoilor.

Agregare windowed

Pâinea cu unt, acoperită lecția trecută. Fiecare tuplă (window, group-key) are un buffer de agregare running în state store; watermark-ul îi spune lui Spark când să-l elibereze.

from pyspark.sql.functions import window, count, sum

agg = (events
    .withWatermark("event_time", "5 minutes")
    .groupBy(window("event_time", "10 minutes"), "user_id")
    .agg(count("*").alias("events"),
         sum("amount").alias("total_amount")))

Mărimea state-ului scalează cu (numărul de user_ids distincte active în fereastra watermark-ului) × (numărul de ferestre nu încă închise). Watermark-ul îl mărginește.

Poți face și agregări non-windowed:

totals = events.groupBy("user_id").agg(sum("amount").alias("lifetime_total"))

Asta funcționează, dar state-ul crește la nesfârșit: fiecare user_id văzut vreodată rămâne în state. Fă asta doar dacă spațiul de chei este genuin mărginit (un enum mic, o listă cunoscută de clienți) sau dacă ai un TTL story prin mapGroupsWithState.

Stream-stream joins

Join-uirea a două stream-uri este conceptual sălbatică: când un rând de la stream A sosește, rândul corespunzător de la stream B s-ar putea să nu fi ajuns încă. Așa că Spark buffer-uiește fiecare parte, așteptând match-uri.

from pyspark.sql.functions import expr

clicks_w = clicks.withWatermark("click_time", "10 minutes")
impressions_w = impressions.withWatermark("imp_time", "30 minutes")

joined = clicks_w.join(
    impressions_w,
    expr("""
        click_id = imp_id
        AND click_time >= imp_time
        AND click_time <= imp_time + interval 1 hour
    """),
)

Pentru ca inner stream-stream joins să aibă state mărginit, ai nevoie de două lucruri:

  1. Un watermark pe fiecare parte, ca rândurile vechi să poată fi expirate.
  2. O constrângere de timp în condiția de join care mărginește cât de departe pot fi rândurile match-uite în event time. Mai sus, este click_time <= imp_time + 1 hour.

Fără acelea, state-ul pe oricare parte crește la nesfârșit. Spark te va lăsa să scrii join-ul; doar nu va fi o idee bună.

Outer joins (left_outer, right_outer) funcționează și ele, cu reținerea că emiterea unui rând „no match” trebuie să aștepte ca watermark-ul să avanseze peste limita de timp. Așa că așteaptă-te ca emiterea să întârzie cu mărimea ferestrei tale de timp.

Deduplicare

Stream-urile au duplicate. Producătorii fac retry, Kafka livrează at-least-once, clienții mobili trag de două ori. dropDuplicates în streaming ține un state mic per cheie ca să poată detecta repetările:

deduped = (events
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id"]))

Fiecare event_id trăiește în state până când watermark-ul îi depășește event_time + lateness. Cu un watermark de 1 oră, prinzi duplicatele într-o fereastră de 1 oră. Mărimea state-ului e proporțională cu (rata evenimentelor) × (intervalul watermark-ului) × (mărimea id-ului). La 1.000 evenimente/sec cu watermark orar, asta înseamnă 3,6 milioane de id-uri în state; bine pentru RocksDB, dureros pentru store-ul heap-backed.

Deduplicarea fără watermark este și ea legală și e tot un mod de a-ți da OOM la job după câteva zile. Mereu pereche-o cu un watermark, decât dacă chiar, chiar știi că spațiul tău de chei e mărginit.

Când operațiile standard nu sunt suficiente: mapGroupsWithState

API-ul de nivel înalt acoperă poate 80% din nevoile de streaming. Restul de 20% au nevoie de o state machine reală per cheie: codul tău care deține state-ul, decide când să-l actualizeze, decide când să emită, decide când să facă timeout.

Asta e mapGroupsWithState (și sora ei flatMapGroupsWithState). Scrii o funcție care rulează o dată per pereche (key, batch). Primește evenimentele noi pentru acea cheie, state-ul curent și un timeout handle. Returnează state-ul nou și (opțional) niște rânduri de output.

Cazul de utilizare clasic e sessionization: gruparea evenimentelor în sesiuni unde o sesiune se termină după N minute de inactivitate. Ferestrele cu watermark nu pot face asta; ferestrele sunt de mărime fixă, sesiunile sunt de lungime variabilă și depind de comportamentul utilizatorului.

Iată un sessionizer care emite un session record odată ce un utilizator a fost inactiv 30 de minute:

from pyspark.sql.functions import col
from pyspark.sql.streaming import GroupState, GroupStateTimeout
from typing import Iterator
from dataclasses import dataclass

@dataclass
class SessionState:
    start: int       # epoch seconds
    last_seen: int
    event_count: int

@dataclass
class SessionRow:
    user_id: str
    start: int
    end: int
    event_count: int

def update_session(
    user_id: str,
    events: Iterator,
    state: GroupState,
) -> Iterator[SessionRow]:
    if state.hasTimedOut:
        # 30 minutes of inactivity → emit and clear
        s = SessionState(**state.get)
        yield SessionRow(user_id, s.start, s.last_seen, s.event_count)
        state.remove()
        return

    if state.exists:
        s = SessionState(**state.get)
    else:
        s = None

    for ev in events:
        ts = ev.event_time_epoch
        if s is None:
            s = SessionState(start=ts, last_seen=ts, event_count=1)
        else:
            s.last_seen = max(s.last_seen, ts)
            s.event_count += 1

    if s is not None:
        state.update(s.__dict__)
        # Schedule a timeout 30 minutes after last_seen
        state.setTimeoutTimestamp(s.last_seen * 1000 + 30 * 60 * 1000)

    return iter([])  # nothing to emit until timeout

sessions = (events
    .groupByKey(lambda r: r.user_id)
    .flatMapGroupsWithState(
        outputMode="append",
        timeoutConf=GroupStateTimeout.EventTimeTimeout,
    )(update_session))

Câteva lucruri pentru care merită să te oprești:

  • GroupState este handle-ul de state per cheie. state.get, state.update(...), state.remove(), state.exists, state.hasTimedOut. Acela e API-ul.
  • setTimeoutTimestamp programează un wakeup. Când event time depășește acel timestamp, Spark apelează update_session din nou fără evenimente noi, dar cu state.hasTimedOut == True, dându-ți o șansă să emiți și să curăți.
  • Două moduri de timeout: ProcessingTimeTimeout (wall clock) și EventTimeTimeout (driven de watermark). Folosește event-time timeouts oricând event time contează, ceea ce e aproape întotdeauna.
  • Output mode-ul afectează ce poți returna. append cere să nu emiți niciodată același rând de două ori; update îți permite să re-emiți o cheie pe măsură ce se schimbă.

Compromisul: mapGroupsWithState îți dă control total. Îți dă și responsabilitate totală pentru mărimea state-ului; nu există evicție automată dincolo de ce face logica ta de timeout. Uiți să apelezi state.remove() undeva și jobul tău scurge state până moare.

Monitorizarea mărimii state-ului

Fiecare batch din UI-ul de Streaming raportează metrici de state:

  • stateOperators[*].numRowsTotal: rânduri curente în state pentru acel operator.
  • stateOperators[*].memoryUsedBytes: memoria heap sau RocksDB.
  • stateOperators[*].numRowsDroppedByWatermark: late rows ignorate.

Plotează rândurile totale de state în timp. Un job sănătos fie se stabilizează (state mărginit), fie crește liniar cu traficul (și asta e bine dacă l-ai dimensionat pentru asta). Un job care crește superliniar sau care nu plafonează niciodată scurge; de obicei un watermark lipsă, o cardinalitate de cheie pe care n-ai anticipat-o sau un mapGroupsWithState care uită să curețe.

Urmărește și timpul de RocksDB compaction în metrici. Dacă compaction-ul începe să domine latența batch-ului, state-ul tău e mai mare decât vrea SSD-ul tău local să gestioneze și ar trebui fie să faci shard pe partiție mai agresiv, să arunci state mai puțin util, fie să muți o parte din muncă într-un job batch.

Checklist practic

Înainte să trimiți în producție un query de streaming stateful:

  • Watermark pe fiecare agregare, parte de join și dedup. Da, fiecare. Excepțiile sunt cazurile cu spațiu de chei mărginit și sunt mai rare decât crezi.
  • Alege RocksDB decât dacă state-ul tău e genuin minuscul. Heap-backed e pentru prototipuri.
  • Checkpoint către storage durabil, replicat. S3, GCS, Azure Blob, HDFS. Nu disk local, nu /tmp.
  • Testează restart, inclusiv un kill forțat. Ridică jobul împotriva checkpoint-ului existent și verifică dacă state-ul e recuperat corect.
  • Plotează mărimea state-ului, watermark-ul și rândurile aruncate din ziua unu. Acestea sunt sistemul tău de avertizare timpurie.

Lecția următoare: unde merge efectiv output-ul tuturor acestor mașinării stateful. Output modes (append, update, complete), sink-urile built-in, escape hatch-ul foreachBatch și pattern-ul de upsert care îți dă scrieri exactly-once când restul pipeline-ului e at-least-once.


Referințe: Apache Spark Structured Streaming Programming Guide, secțiunile despre stateful operations, arbitrary stateful operations și stream-stream joins (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html); documentația de configurare RocksDB state store. Consultat 2026-05-01.

Caută