PySpark, de la zero Lecția 50 / 60

Bazele Structured Streaming: readStream, writeStream, trigger-e

Punctele de intrare pentru streaming, semantica trigger-elor si checkpoint-ul de care depinde totul.

Lecția trecută a fost conceptuală: stream-urile sunt tabele infinite, batch și streaming sunt un continuum, micro-batch e modelul, DStreams au murit. Lecția asta e mecanica. Până la final vei fi scris un job streaming complet și vei ști ce face fiecare linie.

Sunt exact patru lucruri pe care trebuie să le înțelegi:

  1. readStream: cum intră datele.
  2. writeStream: cum ies datele.
  3. Trigger-e: când se declanșează fiecare micro-batch.
  4. Checkpoint-uri: cum își amintește Spark ce a procesat deja.

Atât. Restul sunt variații.

readStream: partea de input

Punctul de intrare pentru streaming este spark.readStream. Aceeași formă ca spark.read, dar returnează un DataFrame streaming în loc de unul finit:

events = (spark.readStream
            .format("csv")
            .schema("user_id STRING, action STRING, ts TIMESTAMP")
            .option("header", "true")
            .load("/data/incoming/"))

DataFrame-ul events arată normal, același printSchema, aceleași referințe la coloane, dar events.isStreaming returnează True și nu poți face .show() sau .count() pe el. DataFrame-urile streaming nu au un „snapshot curent” pe care să-l inspectezi; au sens doar când sunt conectate la un sink.

Spark vine cu mai multe surse streaming încorporate:

  • file: monitorizează un director pentru fișiere noi. Formatul poate fi csv, json, parquet, orc sau text. Spark scanează directorul la fiecare trigger; orice fișier care nu era acolo data trecută e citit. Asta e cea mai ușoară sursă pentru testare fiindcă o controlezi din shell, cp file.json /data/incoming/ și urmărești jobul cum o ridică.
  • kafka: te abonezi la unul sau mai multe topicuri Kafka. Sursa dominantă în producție. Lecția 51 îi e dedicată.
  • socket: citește text delimitat de newline dintr-un socket TCP. Strict pentru exemple jucărie și demo-uri; fără toleranță la erori, fără offset-uri, niciodată în producție. Dar e la îndemână pentru tutoriale fiindcă poți face nc -lk 9999 într-un terminal și să tastezi evenimente manual.
  • rate și rate-micro-batch: surse sintetice care generează rânduri (timestamp, value) la o rată configurabilă. Utile pentru benchmarking, smoke test-uri și învățare. Sunt documentate, dar rar folosite în job-uri reale.

Terții adaugă mai multe: Delta Lake adaugă delta, Iceberg adaugă iceberg, AWS Kinesis are propriul conector etc. Pattern-ul e mereu același, format("...") plus opțiuni.

Câteva opțiuni de file source care merită știute:

events = (spark.readStream
            .format("json")
            .schema(my_schema)
            .option("maxFilesPerTrigger", 10)
            .option("latestFirst", "false")
            .option("cleanSource", "archive")
            .option("sourceArchiveDir", "/data/archived/")
            .load("/data/incoming/"))

maxFilesPerTrigger limitează câte fișiere noi sunt procesate într-un micro-batch, important dacă recuperezi un backlog și nu vrei ca un singur micro-batch uriaș să facă OOM. latestFirst inversează ordinea fișierelor dacă vrei cele mai noi primele. cleanSource îi spune lui Spark ce să facă cu fișierele pe care le-a procesat (archive, delete sau off), util pentru a păstra directorul sursă ordonat fără să scrii propriul script de cleanup.

Schema contează. Trebuie să furnizezi o schemă pentru sursele de fișiere streaming în mod implicit. Spark nu o va deduce fiindcă asta ar necesita citirea datelor la fiecare trigger, iar Spark vrea ca schema să fie stabilă pe toată durata interogării. Dacă chiar vrei deducție, setează spark.sql.streaming.schemaInference la true, dar e o capcană: odată ce interogarea pornește, schema e fixată.

Transformări: același API pe care îl știi deja

Odată ce ai un DataFrame streaming, îl transformi cu aceiași operatori ca un DataFrame batch:

from pyspark.sql.functions import col, upper

high_value = (events
                .filter(col("action") == "purchase")
                .withColumn("user_id_upper", upper("user_id"))
                .select("user_id_upper", "ts", "amount"))

filter, select, withColumn, groupBy, agg, join, union: toate funcționează, cu o singură regulă consistentă: operațiile care trebuie să se uite la întregul set de date nu funcționează direct pe stream-uri. Nu poți face .sort() global pe un stream fiindcă nu există un „sfârșit” la care să sortezi. Nu poți face .limit(100) fiindcă sensul „primele 100” depinde de ordine. Nu poți calcula o percentilă fiindcă necesită întregul set de date.

Ce poți face:

  • Transformări fără stare: orice per-rând (select, filter, withColumn, cast, explode, UDF-uri care nu referențiază alte rânduri). Astea sunt ușoare.
  • Agregări cu stare: groupBy + count, sum, avg etc. Spark menține starea peste micro-batch-uri. Poți ține contoare curente de stream-uri nemărginite indefinit (cu watermarks ca să mărginească memoria, lecția 52).
  • Agregări pe ferestre: groupBy(window("ts", "5 minutes")) pentru ferestre tumbling, window("ts", "5 minutes", "1 minute") pentru sliding. Fiecare fereastră e propriul ei grup; rezultatele se emit când fereastra se închide.
  • Join-uri stream-static: alăturarea unui stream cu un DataFrame batch regulat (un tabel de dimensiuni, un lookup, o configurare). Mereu suportat.
  • Join-uri stream-stream: alăturarea a două stream-uri cu watermarks și limite de timp. Suportat, dar cu rezerve; lecția 55.

Orice ai calcula peste o listă infinită, în mare, are o versiune în formă streaming. Orice necesită „toate datele chiar acum” nu se traduce.

writeStream: partea de output

Partea de output reflectă input-ul. Configurezi un sink și pornești interogarea:

query = (high_value.writeStream
            .format("parquet")
            .outputMode("append")
            .option("path", "/data/output/")
            .option("checkpointLocation", "/data/checkpoints/high_value/")
            .trigger(processingTime="30 seconds")
            .start())

Trei lucruri se întâmplă aici. Lasă-mă să le despachetez în ordine.

Modurile de output

outputMode îi spune lui Spark ce să scrie la fiecare micro-batch. Trei opțiuni:

  • append: emite doar rândurile care sunt noi de la ultimul micro-batch și care nu se vor mai schimba. Implicit pentru interogări fără stare. Necesar pentru sink-uri care nu suportă actualizări (Parquet, Avro, fișiere simple). Pentru agregări, modul append necesită un watermark astfel încât Spark să știe când o agregare e „gata” și poate fi emisă (lecția 52, din nou, apare peste tot).
  • complete: emite întregul tabel de rezultat curent la fiecare micro-batch. Valid doar pentru agregări (orice altceva are un rezultat nemărginit). Util pentru dashboard-uri sau debugging cu console unde vrei să vezi totalul curent.
  • update: emite doar rândurile care s-au schimbat în acest micro-batch. Util pentru sink-uri care suportă upsert: Delta Lake, Cassandra, JDBC cu logică de merge. Mai eficient decât complete, mai flexibil decât append.

Lecția 53 e analiza profundă a modurilor de output, când fiecare e legal, când fiecare e ce vrei și cum interacționează watermarks. Pentru moment: folosește append pentru sink-uri append-only, complete pentru agregări mici către un sink console sau memory, update pentru sink-uri capabile de upsert.

Trigger-ele

trigger controlează când rulează fiecare micro-batch. Patru variante:

.trigger(processingTime="10 seconds")   # la fiecare 10 secunde
.trigger(availableNow=True)             # proceseaza tot ce e disponibil, apoi opreste
.trigger(continuous="100 ms")           # mod experimental cu latenta mica
# .trigger() omis                       # implicit: cat de repede posibil

Trigger-ul implicit (fără apel .trigger(), sau omis) îi spune lui Spark să pornească următorul micro-batch imediat ce cel anterior se termină. Dacă procesarea ta durează 2 secunde per batch, primești un batch la fiecare 2 secunde. Dacă durează 200 ms, primești cinci pe secundă. Acesta e implicitul corect pentru majoritatea job-urilor de producție: Spark merge cât de repede poate.

processingTime fixează intervalul. Dacă setezi "10 seconds", Spark rulează un micro-batch la fiecare 10 secunde indiferent de timpul de procesare. Dacă un batch durează 4 secunde, Spark așteaptă 6 secunde înainte de următorul. Dacă un batch durează 14 secunde (mai mult decât trigger-ul), următorul pornește imediat și primești un avertisment în log-uri despre rămânerea în urmă. Folosește asta când vrei batching predictibil, cu frecvență mică, să zicem rezumate orare scrise într-un tabel.

availableNow=True este trigger-ul „streaming în stil batch”. Spark citește toate datele disponibile curent în sursă, le procesează (posibil pe mai multe micro-batch-uri interne dacă sunt multe), le scrie și se oprește. Interogarea streaming se termină curat. Asta e enorm de util pentru job-uri pe care vrei să le rulezi pe un program (Airflow, cron) dar scrise cu API-ul streaming, să zicem un job orar care citește din Kafka, procesează ce e acolo și iese. Înlocuiește vechiul mod Trigger.Once (acum depreciat în favoarea availableNow, care gestionează backlog-urile mai bine).

continuous e modul experimental cu latență de milisecundă pe care l-am menționat în lecția 49. Operații restricționate, caz de utilizare de nișă, în mare ignoră-l.

Sink-urile

format și opțiunile asociate aleg destinația:

  • parquet, orc, json, csv: sink-uri de fișiere. Doar mod append. Specifică path și checkpointLocation. Fiecare micro-batch scrie fișiere noi în path.
  • kafka: scrie într-un topic Kafka. Util pentru pipeline-uri stream → stream. Specifică topicul și bootstrap servers.
  • console: afișează la stdout. Doar pentru debugging. Are opțiuni pentru numRows și truncate ca .show().
  • memory: scrie într-un tabel in-memory accesibil prin nume cu spark.sql("SELECT * FROM my_query"). Debugging și explorare în notebook. Nu folosi în producție; tabelul crește nemărginit în memoria driver-ului.
  • foreach și foreachBatch: portițe de scăpare pentru sink-uri custom. foreachBatch îți dă micro-batch-ul ca un DataFrame regulat și te lasă să faci orice (scrie la JDBC, apelează un API HTTP, rulează un merge custom în Delta etc.). Unealta de zi cu zi când niciun sink încorporat nu se potrivește.

Sink-uri terțe: delta, iceberg, JDBC, Kinesis etc. Același pattern.

Checkpoint-ul

Acum partea care e obligatorie și pe care începătorii o uită de fiecare dată.

.option("checkpointLocation", "/data/checkpoints/my_job/")

Locația checkpoint-ului este un director (local, HDFS, S3, oriunde durabil) unde Spark scrie:

  1. Offset-uri consumate: pentru fiecare sursă, care e ultima poziție pe care am procesat-o? Pentru Kafka, offset-ul per partiție. Pentru fișiere, ce fișiere au fost ingerate. Pentru socket-uri, ei bine, nu poți face checkpoint pe socket-uri, ceea ce e un motiv pentru care nu sunt pentru producție.
  2. Stare de agregare: pentru interogări cu stare, snapshot-uri ale contoarelor/sumelor/ferestrelor curente astfel încât să supraviețuiască unui restart.
  3. Commit logs: marcaje care spun „micro-batch N s-a finalizat cu succes și a fost scris în sink.”
  4. Metadate: ID-ul interogării, schema, configurarea sursă/sink. Folosit pentru a detecta nepotriviri la timp de restart.

Fără un checkpoint, fiecare restart al interogării tale streaming pornește de la zero. Pentru o sursă de fișier, asta înseamnă recitirea fiecărui fișier din director. Pentru Kafka, înseamnă recitirea de la startingOffsets configurat. Pentru o agregare, înseamnă recalculul tuturor totalurilor curente de la zero. Niciuna nu e de obicei ce vrei.

Cu un checkpoint, restart-ul continuă de unde s-a terminat ultimul micro-batch reușit. Aceleași offset-uri, aceeași stare de agregare. Interogarea continuă fără probleme.

Câteva reguli:

  • O singură locație de checkpoint per interogare. Două interogări care împart un checkpoint se vor corupe reciproc.
  • Checkpoint-ul e legat de planul de interogare. Dacă schimbi semnificativ operațiile (adaugi o agregare cu stare, schimbi un join), Spark va refuza să reia din vechiul checkpoint și va trebui să pornești de la zero. Schimbările cosmetice (path de output diferit, numRows diferit pentru console) sunt de obicei OK.
  • Nu-l pune pe disc local într-un deployment cluster. Dacă executorul 0 moare și unul nou pornește pe alt nod, fișierele de checkpoint au dispărut. Folosește HDFS, S3, ABFS, GCS, orice durabil.
  • Sink-urile console și memory tehnic funcționează fără un checkpoint pentru debugging, dar Spark va avertiza. În producție, setează-l mereu.

Uitarea checkpointLocation este cea mai comună greșeală Structured Streaming pe care am văzut-o. Interogarea pornește, rulează, arată sănătoasă. Apoi cineva o repornește dintr-un motiv total nelegat și datele apar în output a doua oară. Fiindcă nu a fost niciun checkpoint, sursa a fost recitită.

Un exemplu complet

Punând totul la un loc, citește fișiere JSON dintr-un director, filtrează achizițiile peste 100, scrie Parquet:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("PurchaseFilter").getOrCreate()

schema = "user_id STRING, action STRING, amount DOUBLE, ts TIMESTAMP"

events = (spark.readStream
            .format("json")
            .schema(schema)
            .option("maxFilesPerTrigger", 5)
            .load("/data/incoming/"))

high_value = (events
                .filter(col("action") == "purchase")
                .filter(col("amount") > 100)
                .select("user_id", "amount", "ts"))

query = (high_value.writeStream
            .format("parquet")
            .outputMode("append")
            .option("path", "/data/high_value_purchases/")
            .option("checkpointLocation", "/data/checkpoints/high_value/")
            .trigger(processingTime="30 seconds")
            .start())

query.awaitTermination()

Rulează-l. Aruncă un fișier JSON în /data/incoming/. Așteaptă până la 30 de secunde. Uită-te în /data/high_value_purchases/ după Parquet nou. Uită-te în /data/checkpoints/high_value/ și vei vedea directoare numite offsets, commits, sources, metadata, înregistrarea durabilă a ce a fost procesat.

Omoară procesul cu Ctrl-C. Aruncă alt fișier JSON. Repornește același script. Interogarea se reia, ridică doar fișierul nou (fiindcă offset-ul vechiului e înregistrat) și scrie Parquet-ul nou. Asta e checkpoint-ul care își câștigă pâinea.

Inspectarea unei interogări care rulează

writeStream.start() returnează un handle StreamingQuery. Câteva metode utile:

query.id                # ID stabil al interogarii peste restart-uri (din checkpoint)
query.runId             # ID pentru aceasta rulare specifica; se schimba la restart
query.status            # Stare curenta: "PROCESSING", "WAITING" etc.
query.lastProgress      # Metrici detaliate pentru ultimul micro-batch
query.recentProgress    # Ultimele 100 de micro-batch-uri
query.exception()       # Daca interogarea a esuat, exceptia
query.stop()            # Oprire eleganta
query.awaitTermination(timeout=60)   # Blocheaza, optional cu timeout

lastProgress e cel la care mă uit constant în timpul dezvoltării. E un dict cu rata de input, rata de procesare, durata batch-ului, lag specific sursei (lag de offset Kafka, fișiere în urmă) și dimensiunea stării per-operator. Vom petrece mai mult timp pe monitorizare în lecția 57; pentru moment, doar știe că query.lastProgress e cel mai bun prieten al tău când ceva pare în neregulă.

Ce să iei din asta

Trei lucruri, în ordinea priorității:

  1. API-ul streaming este readStream + transformări + writeStream. Transformările din mijloc sunt același API DataFrame pe care l-ai învățat în patruzeci și opt de lecții. Nu te complica.
  2. Alege un trigger care se potrivește bugetului tău de latență. Implicit (continuu rapid) pentru latență mică, processingTime pentru programare predictibilă, availableNow pentru job-uri batch-pe-streaming.
  3. Setează mereu checkpointLocation. Mereu. Chiar și în dev. Chiar și pentru console. Costul e o linie; beneficiul e că totul restul funcționează corect.

Lecția următoare, sursa pe care o vei folosi efectiv în producție: Kafka. Offset-uri, deserializare, compromisurile dintre at-least-once și exactly-once.


Referințe: Apache Spark Structured Streaming Programming Guide (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), în special secțiunile despre surse de input, sink-uri de output și trigger-e. Consultat 2026-05-01.

Caută