Deci query-ul tău de streaming stateful calculează lucruri. Unde merg acele lucruri? Și cum te asiguri că ajung acolo exact o dată, chiar și când Spark face retry pe un batch după un crash?
Lecția asta e despre partea de output: cele trei output modes, sink-urile aduse de Spark și pattern-ul foreachBatch care e răspunsul la „dar eu vreau să scriu în X” pentru orice X care nu e pe lista built-in.
Cele trei output modes, recapitulate
Fiecare query de streaming are un outputMode: ce se scrie la fiecare batch.
append: doar rândurile noi de la ultimul batch. Modul cel mai comun. Funcționează pentru:
- Query-uri non-agregante (filtre, proiecții, stream-stream joins). Fiecare rând de input se mapează la zero sau mai multe rânduri de output care sunt emise o dată.
- Agregări mărginite de un watermark. Rândul ferestrei e emis când watermark-ul o închide și niciodată după.
Ce nu poate face: să emită agregări care nu au fost încă finalizate. Dacă business-ul tău vrea să vadă „running total updates pe măsură ce se întâmplă”, append nu îți va da asta; running total-ul apare doar când fereastra se închide, posibil ore după ce evenimentele au sosit.
update: rândurile care s-au schimbat de la ultimul batch. Funcționează pentru agregări și îți dă vizibilitate running; fiecare batch emite ferestrele care au primit date noi, cu totalurile lor curente. Dezavantajul: aceeași cheie poate fi emisă de multe ori cu valori diferite, deci sink-ul trebuie să fie upsert-capable. Scrierea într-un sink append-only (Parquet brut, Kafka fără strategie de cheie) pierde semantica: ai vedea cinci rânduri pentru aceeași fereastră cu cinci count-uri diferite.
complete: tot tabelul de rezultat după fiecare batch. Valid doar pentru agregări. Practic doar pentru rezultate mici. Spark literalmente re-emite setul complet de group keys la fiecare batch. Dacă ai 10 chei distincte și un dashboard care suprascrie un singur fișier, complete mode e fine. Dacă ai 10 milioane de chei, vei avea o zi proastă.
Modul pe care îl alegi e constrâns de ce face query-ul tău și ce poate face sink-ul tău. Nu primești mereu o alegere liberă.
Sink-urile built-in
Out of the box, Spark aduce:
File sink: format("parquet"), format("orc"), format("json"), format("csv"). Scrie fișiere append-only într-un director, partiționate dacă specifici partitionBy. Doar append mode. Bun pentru landing zones, lakes, raw archives.
(query.writeStream
.format("parquet")
.option("path", "s3://lake/views/")
.option("checkpointLocation", "s3://ck/views/")
.partitionBy("dt")
.outputMode("append")
.start())
Kafka sink: format("kafka"). Scrie fiecare rând de output ca un Kafka record. Așteaptă coloanele key, value, topic, opțional headers. Livrare at-least-once; implementezi exactly-once prin idempotent consumers downstream.
(query.selectExpr("CAST(user_id AS STRING) AS key",
"to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("topic", "page-aggs")
.option("checkpointLocation", "...")
.start())
Console sink: format("console"). Tipărește la driver stdout. Append, update sau complete. Strict pentru dezvoltare; driver-ul va da OOM dacă încerci să tipărești la consolă un workload real.
.format("console").outputMode("update").option("truncate", False)
Memory sink: format("memory"). Scrie într-un tabel in-driver numit pe care îl poți interoga cu Spark SQL obișnuit. Util pentru teste și notebook-uri.
.format("memory").queryName("session_results").outputMode("append")
# in another cell:
spark.sql("SELECT * FROM session_results").show()
foreachBatch: escape hatch-ul. Primește propria secțiune.
Asta e lista oficială. Observă ce nu e pe ea: Postgres, MySQL, Snowflake, BigQuery, Mongo, Elasticsearch, Cassandra, Redis. Aproape orice destinație reală dincolo de un fișier sau Kafka este un job foreachBatch.
Pattern-ul foreachBatch
foreachBatch îți dă micro-batch-ul ca un DataFrame obișnuit și un batch ID, și te lasă să faci ce vrei cu ambele:
def write_batch(batch_df, batch_id):
# batch_df is a regular DataFrame
# batch_id is a monotonically increasing long
batch_df.write.format("jdbc").options(...).mode("append").save()
(query.writeStream
.foreachBatch(write_batch)
.option("checkpointLocation", "...")
.start())
În interiorul funcției, ai API-ul complet de batch. Scrieri multi-target, transformări complexe, apeluri către sisteme externe, logică condițională, totul. Engine-ul de streaming se ocupă de triggering, ordonare și replay-ul batch-urilor.
Două lucruri de interiorizat:
batch_dfeste un DataFrame obișnuit. Nu e un stream. Poți apela.writepe el, sau.collect()dacă e mic, sau.cache()dacă ai nevoie de el de două ori. Are o mărime definită; acest batch s-a terminat când funcția ta returnează.- Batch-urile pot face replay. Dacă funcția ta eșuează sau driver-ul crashează la mijlocul batch-ului, Spark va apela
write_batchdin nou cu acelașibatch_idși același conținut. Codul tău trebuie să gestioneze asta.
Acel al doilea punct e unde intră idempotența.
Regula sink-ului idempotent
Contractul este: același batch_id plus același batch_df ar trebui să producă mereu aceeași stare finală în sink, indiferent de câte ori rulează funcția ta.
Dacă sink-ul tău suportă upsert (insert or update by primary key), idempotența e mai mult sau mai puțin gratuită; re-rularea cu aceleași date se suprascrie pe ea însăși. Dacă sink-ul tău suportă doar append, ai nevoie să dedupliciezi după batch_id, de obicei cu o cheie primară pe partea de target ca (batch_id, source_offset).
Urmează două exemple concrete.
Pattern 1: foreachBatch într-un Delta merge (upsert)
Delta Lake suportă MERGE: upsert atomic pe predicat. Acesta este standardul de aur pentru output de agregare exactly-once:
from delta.tables import DeltaTable
def upsert_to_delta(batch_df, batch_id):
target = DeltaTable.forPath(spark, "s3://lake/page_aggs/")
(target.alias("t")
.merge(
batch_df.alias("s"),
"t.window_start = s.window_start AND t.page = s.page",
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())
(windowed.writeStream
.foreachBatch(upsert_to_delta)
.outputMode("update")
.option("checkpointLocation", "s3://ck/page_aggs/")
.start())
Dacă batch 17 eșuează la jumătate, Spark îl reia. MERGE rulează din nou cu același batch_df. Rândurile care au fost deja merge-uite sunt match-uite și actualizate la aceleași valori; rândurile noi sunt inserate. Starea finală e identică în ambele cazuri.
Pereche asta cu outputMode("update") ca fiecare batch să poarte doar ferestrele care s-au schimbat. Combinate, primești un Delta table care reflectă mereu cea mai recentă agregare per fereastră, actualizat continuu, cu semantică exactly-once dintr-un stream strict at-least-once.
Pattern 2: foreachBatch într-un upsert Postgres
Postgres nu are integrare streaming-native, dar are INSERT ... ON CONFLICT, ceea ce e suficient:
def upsert_to_postgres(batch_df, batch_id):
rows = batch_df.collect() # safe only if batch is small
if not rows:
return
sql = """
INSERT INTO page_aggs (window_start, page, views, batch_id)
VALUES (%s, %s, %s, %s)
ON CONFLICT (window_start, page) DO UPDATE
SET views = EXCLUDED.views,
batch_id = EXCLUDED.batch_id
WHERE page_aggs.batch_id < EXCLUDED.batch_id
"""
import psycopg2
conn = psycopg2.connect(...)
try:
with conn, conn.cursor() as cur:
cur.executemany(sql, [
(r.window_start, r.page, r.views, batch_id)
for r in rows
])
finally:
conn.close()
Câteva detalii care contează:
ON CONFLICT (window_start, page) DO UPDATEeste clauza de upsert. Cere un index unic sau PK pe coloanele de conflict.- Garda
WHERE page_aggs.batch_id < EXCLUDED.batch_idface upsert-ul monoton. Dacă un batch face replay în afara ordinii sau ai late stragglers dintr-un batch mai vechi, nu suprascrii o stare mai nouă cu date mai vechi. .collect()materializează batch-ul pe driver. Sigur doar dacă batch-urile sunt mici (câteva mii de rânduri). Pentru batch-uri mai mari, foloseștebatch_df.foreachPartitionși conectează o dată per partiție, nu o dată per batch.- Conexiunea ar trebui să fie deschisă în interiorul funcției, nu să fie închisă peste de pe driver. Conexiunile nu sunt picklable și, chiar dacă ar fi, ai refolosi o singură conexiune între batch-uri în moduri imprevizibile.
Pentru batch-uri foarte mari, un pattern mai robust pune într-un tabel temporar per batch și face merge:
def upsert_to_postgres_staged(batch_df, batch_id):
staging = f"page_aggs_stage_{batch_id}"
(batch_df.write
.format("jdbc")
.option("dbtable", staging)
.mode("overwrite")
.save())
# Then run a single MERGE/UPSERT statement from staging into target,
# transactionally, then drop the staging table.
Spark scrie tabelul de staging în paralel; o singură tranzacție face merge-ul atomic; cleanup la final. Idempotent fiindcă scrierea tabelului de staging cu mode("overwrite") este ea însăși idempotentă pe batch_id, iar merge-ul la fel.
Semantică de livrare, recapitulată
Punând modelul de streaming cap-coadă:
- At-least-once este automat. Checkpointing-ul lui Spark garantează că fiecare eveniment de input este procesat, iar fiecare batch de output este livrat, cel puțin o dată. Crash-uri, restart-uri și replay-uri păstrează asta.
- Exactly-once cere cooperare din partea sink-ului. Sink-ul trebuie să fie idempotent sub replay-ul aceluiași
batch_id. Sink-urile append-only au nevoie de o cheie de dedup; sink-urile upsert-capable au nevoie de semantică PK. - Checkpoint-ul este sursa de adevăr. Înregistrează ce offset-uri Kafka au fost consumate, ce state s-a ținut și ce batch-uri au fost commitate. Pierzi-l și îți pierzi garanția.
O greșeală comună e să livrezi un pipeline de streaming care scrie Parquet simplu și să-i spui „exactly-once” fiindcă nimic nu a crashat în timpul testelor. Apoi se întâmplă o pană reală, jobul repornește, ultimul batch face replay și ai rânduri duplicate în lake. Idempotența nu e gratuită; trebuie să construiești pentru ea.
Alegerea modului + sink-ului pentru un caz de utilizare
Câteva combinații comune și ce implică:
- Streaming filter/transform → file sink.
outputMode("append"). Fișierele sunt append-only; nicio problemă de idempotență fiindcă fișierele fiecărui batch sunt commitate atomic cu offset-ul. - Agregare windowed → console pentru dev, Delta merge pentru prod.
outputMode("update")cuforeachBatch. Running totals vizibile imediat; corecte sub replay. - Sessionization (
flatMapGroupsWithState) → Kafka.outputMode("append"). Sesiunile emit când se închid; consumatorii downstream gestionează idempotența cu cheia session_id. - Dashboard real-time peste Postgres.
outputMode("update")+foreachBatchupsert. Dashboard-ul citește snapshot-ul curent; upsert-ul îl ține consistent.
Dacă te trezești vrând complete mode în producție, fă un pas înapoi. Aproape întotdeauna e greșit la scară. Mișcarea corectă e, de obicei, update mode cu un sink upsert, plus un job de snapshot periodic dacă downstream-ul chiar are nevoie de o imagine completă.
Unde ne lasă asta
Acum ai, cap-coadă, modelul de streaming: surse (lecția 50), execuție micro-batch (lecția 51), event time și watermarks (lecția 52), state și operatori stateful (lecția 53) și acum output modes plus sink-uri idempotente. Asta e suficient ca să proiectezi și să trimiți în producție un pipeline de streaming.
Ce încă nu ai e vocabularul de war stories pentru când lucrurile o iau razna: query-uri sub back-pressure, state care explodează, watermark stalls, micro-batch-uri lente, contenție de resurse la nivel de cluster. Acela e Modulul 10: debugging în producție. Începem cu citirea unui tab Streaming UI așa cum un doctor citește un EKG.
Referințe: Apache Spark Structured Streaming Programming Guide, secțiunile despre output modes, output sinks și foreachBatch (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes); documentația Delta Lake MERGE. Consultat 2026-05-01.