PySpark, de la zero Lecția 54 / 60

Output modes si sink-uri idempotente: foreachBatch si pattern-ul upsert

Append vs update vs complete, sink-urile pe care le aduce Spark si escape hatch-ul foreachBatch pentru tot restul.

Deci query-ul tău de streaming stateful calculează lucruri. Unde merg acele lucruri? Și cum te asiguri că ajung acolo exact o dată, chiar și când Spark face retry pe un batch după un crash?

Lecția asta e despre partea de output: cele trei output modes, sink-urile aduse de Spark și pattern-ul foreachBatch care e răspunsul la „dar eu vreau să scriu în X” pentru orice X care nu e pe lista built-in.

Cele trei output modes, recapitulate

Fiecare query de streaming are un outputMode: ce se scrie la fiecare batch.

append: doar rândurile noi de la ultimul batch. Modul cel mai comun. Funcționează pentru:

  • Query-uri non-agregante (filtre, proiecții, stream-stream joins). Fiecare rând de input se mapează la zero sau mai multe rânduri de output care sunt emise o dată.
  • Agregări mărginite de un watermark. Rândul ferestrei e emis când watermark-ul o închide și niciodată după.

Ce nu poate face: să emită agregări care nu au fost încă finalizate. Dacă business-ul tău vrea să vadă „running total updates pe măsură ce se întâmplă”, append nu îți va da asta; running total-ul apare doar când fereastra se închide, posibil ore după ce evenimentele au sosit.

update: rândurile care s-au schimbat de la ultimul batch. Funcționează pentru agregări și îți dă vizibilitate running; fiecare batch emite ferestrele care au primit date noi, cu totalurile lor curente. Dezavantajul: aceeași cheie poate fi emisă de multe ori cu valori diferite, deci sink-ul trebuie să fie upsert-capable. Scrierea într-un sink append-only (Parquet brut, Kafka fără strategie de cheie) pierde semantica: ai vedea cinci rânduri pentru aceeași fereastră cu cinci count-uri diferite.

complete: tot tabelul de rezultat după fiecare batch. Valid doar pentru agregări. Practic doar pentru rezultate mici. Spark literalmente re-emite setul complet de group keys la fiecare batch. Dacă ai 10 chei distincte și un dashboard care suprascrie un singur fișier, complete mode e fine. Dacă ai 10 milioane de chei, vei avea o zi proastă.

Modul pe care îl alegi e constrâns de ce face query-ul tău și ce poate face sink-ul tău. Nu primești mereu o alegere liberă.

Sink-urile built-in

Out of the box, Spark aduce:

File sink: format("parquet"), format("orc"), format("json"), format("csv"). Scrie fișiere append-only într-un director, partiționate dacă specifici partitionBy. Doar append mode. Bun pentru landing zones, lakes, raw archives.

(query.writeStream
    .format("parquet")
    .option("path", "s3://lake/views/")
    .option("checkpointLocation", "s3://ck/views/")
    .partitionBy("dt")
    .outputMode("append")
    .start())

Kafka sink: format("kafka"). Scrie fiecare rând de output ca un Kafka record. Așteaptă coloanele key, value, topic, opțional headers. Livrare at-least-once; implementezi exactly-once prin idempotent consumers downstream.

(query.selectExpr("CAST(user_id AS STRING) AS key",
                  "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "page-aggs")
    .option("checkpointLocation", "...")
    .start())

Console sink: format("console"). Tipărește la driver stdout. Append, update sau complete. Strict pentru dezvoltare; driver-ul va da OOM dacă încerci să tipărești la consolă un workload real.

.format("console").outputMode("update").option("truncate", False)

Memory sink: format("memory"). Scrie într-un tabel in-driver numit pe care îl poți interoga cu Spark SQL obișnuit. Util pentru teste și notebook-uri.

.format("memory").queryName("session_results").outputMode("append")
# in another cell:
spark.sql("SELECT * FROM session_results").show()

foreachBatch: escape hatch-ul. Primește propria secțiune.

Asta e lista oficială. Observă ce nu e pe ea: Postgres, MySQL, Snowflake, BigQuery, Mongo, Elasticsearch, Cassandra, Redis. Aproape orice destinație reală dincolo de un fișier sau Kafka este un job foreachBatch.

Pattern-ul foreachBatch

foreachBatch îți dă micro-batch-ul ca un DataFrame obișnuit și un batch ID, și te lasă să faci ce vrei cu ambele:

def write_batch(batch_df, batch_id):
    # batch_df is a regular DataFrame
    # batch_id is a monotonically increasing long
    batch_df.write.format("jdbc").options(...).mode("append").save()

(query.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "...")
    .start())

În interiorul funcției, ai API-ul complet de batch. Scrieri multi-target, transformări complexe, apeluri către sisteme externe, logică condițională, totul. Engine-ul de streaming se ocupă de triggering, ordonare și replay-ul batch-urilor.

Două lucruri de interiorizat:

  1. batch_df este un DataFrame obișnuit. Nu e un stream. Poți apela .write pe el, sau .collect() dacă e mic, sau .cache() dacă ai nevoie de el de două ori. Are o mărime definită; acest batch s-a terminat când funcția ta returnează.
  2. Batch-urile pot face replay. Dacă funcția ta eșuează sau driver-ul crashează la mijlocul batch-ului, Spark va apela write_batch din nou cu același batch_id și același conținut. Codul tău trebuie să gestioneze asta.

Acel al doilea punct e unde intră idempotența.

Regula sink-ului idempotent

Contractul este: același batch_id plus același batch_df ar trebui să producă mereu aceeași stare finală în sink, indiferent de câte ori rulează funcția ta.

Dacă sink-ul tău suportă upsert (insert or update by primary key), idempotența e mai mult sau mai puțin gratuită; re-rularea cu aceleași date se suprascrie pe ea însăși. Dacă sink-ul tău suportă doar append, ai nevoie să dedupliciezi după batch_id, de obicei cu o cheie primară pe partea de target ca (batch_id, source_offset).

Urmează două exemple concrete.

Pattern 1: foreachBatch într-un Delta merge (upsert)

Delta Lake suportă MERGE: upsert atomic pe predicat. Acesta este standardul de aur pentru output de agregare exactly-once:

from delta.tables import DeltaTable

def upsert_to_delta(batch_df, batch_id):
    target = DeltaTable.forPath(spark, "s3://lake/page_aggs/")
    (target.alias("t")
        .merge(
            batch_df.alias("s"),
            "t.window_start = s.window_start AND t.page = s.page",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(windowed.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", "s3://ck/page_aggs/")
    .start())

Dacă batch 17 eșuează la jumătate, Spark îl reia. MERGE rulează din nou cu același batch_df. Rândurile care au fost deja merge-uite sunt match-uite și actualizate la aceleași valori; rândurile noi sunt inserate. Starea finală e identică în ambele cazuri.

Pereche asta cu outputMode("update") ca fiecare batch să poarte doar ferestrele care s-au schimbat. Combinate, primești un Delta table care reflectă mereu cea mai recentă agregare per fereastră, actualizat continuu, cu semantică exactly-once dintr-un stream strict at-least-once.

Pattern 2: foreachBatch într-un upsert Postgres

Postgres nu are integrare streaming-native, dar are INSERT ... ON CONFLICT, ceea ce e suficient:

def upsert_to_postgres(batch_df, batch_id):
    rows = batch_df.collect()  # safe only if batch is small
    if not rows:
        return

    sql = """
        INSERT INTO page_aggs (window_start, page, views, batch_id)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (window_start, page) DO UPDATE
        SET views = EXCLUDED.views,
            batch_id = EXCLUDED.batch_id
        WHERE page_aggs.batch_id < EXCLUDED.batch_id
    """

    import psycopg2
    conn = psycopg2.connect(...)
    try:
        with conn, conn.cursor() as cur:
            cur.executemany(sql, [
                (r.window_start, r.page, r.views, batch_id)
                for r in rows
            ])
    finally:
        conn.close()

Câteva detalii care contează:

  • ON CONFLICT (window_start, page) DO UPDATE este clauza de upsert. Cere un index unic sau PK pe coloanele de conflict.
  • Garda WHERE page_aggs.batch_id < EXCLUDED.batch_id face upsert-ul monoton. Dacă un batch face replay în afara ordinii sau ai late stragglers dintr-un batch mai vechi, nu suprascrii o stare mai nouă cu date mai vechi.
  • .collect() materializează batch-ul pe driver. Sigur doar dacă batch-urile sunt mici (câteva mii de rânduri). Pentru batch-uri mai mari, folosește batch_df.foreachPartition și conectează o dată per partiție, nu o dată per batch.
  • Conexiunea ar trebui să fie deschisă în interiorul funcției, nu să fie închisă peste de pe driver. Conexiunile nu sunt picklable și, chiar dacă ar fi, ai refolosi o singură conexiune între batch-uri în moduri imprevizibile.

Pentru batch-uri foarte mari, un pattern mai robust pune într-un tabel temporar per batch și face merge:

def upsert_to_postgres_staged(batch_df, batch_id):
    staging = f"page_aggs_stage_{batch_id}"
    (batch_df.write
        .format("jdbc")
        .option("dbtable", staging)
        .mode("overwrite")
        .save())
    # Then run a single MERGE/UPSERT statement from staging into target,
    # transactionally, then drop the staging table.

Spark scrie tabelul de staging în paralel; o singură tranzacție face merge-ul atomic; cleanup la final. Idempotent fiindcă scrierea tabelului de staging cu mode("overwrite") este ea însăși idempotentă pe batch_id, iar merge-ul la fel.

Semantică de livrare, recapitulată

Punând modelul de streaming cap-coadă:

  • At-least-once este automat. Checkpointing-ul lui Spark garantează că fiecare eveniment de input este procesat, iar fiecare batch de output este livrat, cel puțin o dată. Crash-uri, restart-uri și replay-uri păstrează asta.
  • Exactly-once cere cooperare din partea sink-ului. Sink-ul trebuie să fie idempotent sub replay-ul aceluiași batch_id. Sink-urile append-only au nevoie de o cheie de dedup; sink-urile upsert-capable au nevoie de semantică PK.
  • Checkpoint-ul este sursa de adevăr. Înregistrează ce offset-uri Kafka au fost consumate, ce state s-a ținut și ce batch-uri au fost commitate. Pierzi-l și îți pierzi garanția.

O greșeală comună e să livrezi un pipeline de streaming care scrie Parquet simplu și să-i spui „exactly-once” fiindcă nimic nu a crashat în timpul testelor. Apoi se întâmplă o pană reală, jobul repornește, ultimul batch face replay și ai rânduri duplicate în lake. Idempotența nu e gratuită; trebuie să construiești pentru ea.

Alegerea modului + sink-ului pentru un caz de utilizare

Câteva combinații comune și ce implică:

  • Streaming filter/transform → file sink. outputMode("append"). Fișierele sunt append-only; nicio problemă de idempotență fiindcă fișierele fiecărui batch sunt commitate atomic cu offset-ul.
  • Agregare windowed → console pentru dev, Delta merge pentru prod. outputMode("update") cu foreachBatch. Running totals vizibile imediat; corecte sub replay.
  • Sessionization (flatMapGroupsWithState) → Kafka. outputMode("append"). Sesiunile emit când se închid; consumatorii downstream gestionează idempotența cu cheia session_id.
  • Dashboard real-time peste Postgres. outputMode("update") + foreachBatch upsert. Dashboard-ul citește snapshot-ul curent; upsert-ul îl ține consistent.

Dacă te trezești vrând complete mode în producție, fă un pas înapoi. Aproape întotdeauna e greșit la scară. Mișcarea corectă e, de obicei, update mode cu un sink upsert, plus un job de snapshot periodic dacă downstream-ul chiar are nevoie de o imagine completă.

Unde ne lasă asta

Acum ai, cap-coadă, modelul de streaming: surse (lecția 50), execuție micro-batch (lecția 51), event time și watermarks (lecția 52), state și operatori stateful (lecția 53) și acum output modes plus sink-uri idempotente. Asta e suficient ca să proiectezi și să trimiți în producție un pipeline de streaming.

Ce încă nu ai e vocabularul de war stories pentru când lucrurile o iau razna: query-uri sub back-pressure, state care explodează, watermark stalls, micro-batch-uri lente, contenție de resurse la nivel de cluster. Acela e Modulul 10: debugging în producție. Începem cu citirea unui tab Streaming UI așa cum un doctor citește un EKG.


Referințe: Apache Spark Structured Streaming Programming Guide, secțiunile despre output modes, output sinks și foreachBatch (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes); documentația Delta Lake MERGE. Consultat 2026-05-01.

Caută