Am petrecut ultimele patruzeci și opt de lecții pe ceea ce Spark numește procesare batch: ai niște date, stau într-un fișier sau într-un tabel de bază de date, rulezi un job, jobul se termină, primești un output. Datele erau acolo înainte să începi; datele sunt acolo după ce termini. Gata.
Modulul 9 e despre cealaltă jumătate a poveștii. Datele n-au sosit încă toate. Încă sosesc. Vor continua să sosească mâine, săptămâna viitoare, anul viitor, până când cineva oprește sistemul. Treaba ta e să calculezi peste ele pe măsură ce apar, nu într-o trecere de o singură dată care se termină, ci într-un proces care rulează continuu și emite rezultate pe măsură ce devin disponibile.
Acesta este streaming. Și înainte să atingem orice cod, vreau să încetinesc și să fixez ce înseamnă efectiv cuvântul ăsta în Spark, fiindcă e unul dintre acei termeni unde marketingul a luat-o înaintea inginerie și ingineria s-a schimbat de două ori în ultimul deceniu.
Date mărginite vs nemărginite
Cel mai curat mod de a gândi nu e „batch” vs „streaming”, ci forma setului de date.
Datele mărginite sunt un set fix de date cu o dimensiune cunoscută. Comenzile de ieri. Tranzacțiile din ultimul trimestru. Conținutul unui fișier Parquet pe S3. Starea curentă a unui tabel Postgres în momentul în care îl citești. Există un MIN, un MAX, un număr de rânduri. Poți calcula peste tot setul. Poți să termini.
Datele nemărginite sunt un flux care nu se termină. Evenimente Kafka care vin de la un server web. Linii de log de aplicație scrise în fiecare secundă. Citiri de senzori IoT, click stream-uri, evenimente CDC dintr-o bază de date OLTP, ping-uri GPS de la o flotă. Nu există MAX fiindcă datele sunt încă în producție. Rândurile pe care le vezi azi sunt o mică felie din ceea ce va exista în cele din urmă. Poți calcula peste ce a sosit până acum, dar nu poți termina niciodată.
Ambele tipuri de date sunt reale, ambele sunt comune și Spark poate procesa ambele. Distincția contează fiindcă presupunerile pe care le poți face sunt foarte diferite. Cu date mărginite, poți sorta tot. Poți calcula agregări exacte. Poți itera de două ori. Cu date nemărginite, niciuna dintre astea nu funcționează, cel puțin nu în același fel. Nu poți sorta o listă infinită. Nu poți calcula „media tuturor rândurilor” fiindcă vin mai multe rânduri. Trebuie să redefinești ce înseamnă „rezultat”.
Streaming, în Spark, este API-ul pentru calcul peste date nemărginite.
Batch și streaming sunt un continuum
Iată chestia pe care documentația n-o face mereu evidentă: nu există o linie curată între batch și streaming. Sunt două capete ale unui spectru.
Un ETL nocturn care procesează ultimele 24 de ore de date e „batch”, dar doar fiindcă cineva a decis că o dată pe zi e suficient de des. Dacă ai rula același job în fiecare oră, apoi în fiecare minut, apoi la fiecare 10 secunde, ai face streaming micro-batch fără să schimbi logica. La limită, fiecare-rând-pe-măsură-ce-sosește, ai face streaming record-cu-record. Diferența este latența, nu operația.
Aceasta este intuiția centrală din spatele API-ului modern de streaming din Spark. În loc să aibă un set de operatori pentru batch și un set total diferit pentru streaming, Spark le-a unificat: aceleași select, filter, groupBy, join pe care le-ai scris timp de patruzeci și cinci de lecții funcționează și pe DataFrame-uri streaming. Înveți API-ul o dată, îl reutilizezi.
Modelul mental care face să se prindă piesa este tabela infinită.
Tabela infinită
Imaginează-ți un tabel care nu e niciodată terminat de scris. De fiecare dată când un eveniment nou ajunge la sursă, un rând nou e adăugat la sfârșit. Tabelul crește monoton. Nu vezi niciodată un rând să dispară; vezi doar unele noi adăugate.
Așa modelează Spark Structured Streaming un stream: ca un tabel logic care crește în timp. Topicul Kafka cu trei mesaje noi în secunda asta? Acelea sunt trei rânduri noi în tabel. Directorul de fișiere de log unde au apărut două fișiere noi? Acele fișiere noi contribuie cu rânduri noi. Tabelul este conceptual infinit, dar la orice moment dat are un număr specific de rânduri, cele care au sosit până atunci.
Când scrii o interogare streaming, descrii o transformare pe acest tabel infinit. Treaba lui Spark e să țină acea transformare la zi pe măsură ce sosesc rânduri noi. Dacă interogarea ta e SELECT count(*) FROM events WHERE country = 'IT', Spark menține un contor curent și emite o valoare actualizată de fiecare dată când tabelul de input crește. Dacă interogarea ta e SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id, Spark menține un contor curent per utilizator și emite delta-urile.
Interogarea pe care o scrii arată ca SQL batch. Execuția pe dedesubt este incrementală: în loc să recalculeze de la zero la fiecare trigger, Spark află ce e nou și actualizează rezultatul.
Modelul micro-batch
Cum rulează Spark efectiv asta? În mod implicit, trișează un pic. Nu procesează evenimentele unul câte unul, ci le procesează în micro-batch-uri.
Un micro-batch e exact ce sună: un batch mic. La fiecare interval de trigger (implicit: cât de repede posibil, de obicei la fiecare câteva sute de milisecunde), Spark face următoarele:
- Întreabă fiecare sursă ce date noi au apărut de la ultimul micro-batch (fișiere noi în director, offset-uri noi în Kafka, rânduri noi în sursa rate).
- Citește acele date noi într-un DataFrame regulat.
- Rulează interogarea ta împotriva lor, posibil combinate cu starea din micro-batch-uri anterioare (pentru lucruri precum agregări curente).
- Scrie output-ul în sink.
- Înregistrează offset-urile consumate în checkpoint astfel încât următorul micro-batch să știe de unde să continue.
Rezultatul: latențe end-to-end de aproximativ 100 ms până la câteva secunde, în funcție de cât de grea e munca și cât de mari sunt micro-batch-urile. Nu e streaming „real” sub-milisecundă, dar e suficient de rapid pentru aproape orice caz de utilizare analitic (monitorizare de fraudă, actualizări de dashboard, ETL într-un warehouse, detecție de anomalii la scări de timp umane).
Procesarea micro-batch moștenește toate punctele forte ale Spark batch: optimizatorul Catalyst planifică fiecare micro-batch, Tungsten generează cod pentru el, toleranța la erori vine din mecanismul existent de retry batch și poți folosi fiecare operator pe care îl știi deja. Prețul este latența: fiecare micro-batch are overhead, deci nu poți, realist, să împingi intervalele sub ~100 ms.
Modul continuous: cealaltă cale experimentală
Pentru workload-uri care au nevoie reală de latență single-row, sub-milisecundă, adiacent high-frequency trading, alerte real-time pe evenimente individuale, servire de feature-uri cu latență mică, Spark oferă și modul de procesare continuous, declanșat cu .trigger(continuous="100 ms").
În modul continuous, Spark nu rulează micro-batch-uri. Task-uri de lungă durată stau pe executoare și procesează înregistrări pe măsură ce sosesc, una câte una. Latența end-to-end scade în intervalul milisecundelor.
Capcana: modul continuous suportă un set mult mai mic de operații. La momentul Spark 4.x încă suportă doar operații map-like (select, filter, cast, withColumn) și o mână de surse/sink-uri. Fără agregări, fără join-uri, fără windowing. A fost marcat experimental ani la rând și suprafața suportată abia a crescut. Pentru 99% din job-uri, micro-batch e ce vrei; modul continuous e o unealtă pentru o nișă specifică unde latența micro-batch chiar nu e suficientă.
O să petrec restul acestui modul pe calea micro-batch, fiindcă asta e ce e Spark streaming în producție.
O scurtă istorie: DStreams și de ce au dispărut
Dacă citești cărți, postări de blog sau răspunsuri Stack Overflow mai vechi, vei vedea referințe la DStreams, prescurtare pentru „discretized streams”. DStreams a fost primul API de streaming din Spark, introdus în Spark 1.x.
DStreams erau bazate pe RDD. Un DStream era conceptual o secvență de RDD-uri, una per interval de micro-batch, și o transformai cu operații în stil RDD (map, flatMap, reduceByKey etc.). A funcționat, a rulat în producție în multe companii și a învățat echipa multe, dar avea probleme:
- Era separat de DataFrame-uri și optimizatorul Catalyst. Nu puteai reutiliza logica ta DataFrame batch; trebuia să o rescrii în operații RDD.
- Era bazat pe timp doar pe processing time, nu pe event time. Calculul de ferestre peste când evenimentele s-au întâmplat efectiv (mai degrabă decât când Spark le-a procesat) era dureros.
- Povestea toleranței la erori era per-RDD. Evenimentele întârziate sau în afara ordinii erau dificile.
- API-ul era de nivel mai jos decât API-ul DataFrame, deci două baze de cod paralele au crescut: batch în DataFrame-uri, streaming în DStreams. Dureros pentru o echipă de întreținut.
Structured Streaming, introdus în Spark 2.x, înlocuiește DStreams cu API-ul în formă de tabel pe care l-am descris mai sus. Folosește DataFrame-uri, rulează prin Catalyst și Tungsten și suportă procesarea event-time corect (lecția 52, watermarks).
DStreams a fost depreciat din Spark 3.4 și modulul pyspark.streaming, inclusiv vechile clase StreamingContext și DStream, a fost eliminat în Spark 4. Dacă găsești un tutorial care importă from pyspark.streaming import StreamingContext, închide tab-ul. Codul ăla nu va rula pe un Spark curent și nu există motiv să scrii cod nou în acel stil.
Pentru restul acestui modul, „streaming” înseamnă Structured Streaming. Capitolul DStreams e închis.
O degustare: un job de streaming în cinci linii
Ca să ancorăm abstracția, iată un job de streaming complet. Urmărește un director, citește orice fișier CSV nou care apare și afișează un contor de rânduri pe secundă în consolă:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, window
spark = SparkSession.builder.appName("HelloStream").getOrCreate()
events = (spark.readStream
.schema("user_id STRING, action STRING")
.csv("/tmp/incoming/"))
counts = (events
.withColumn("ts", current_timestamp())
.groupBy(window("ts", "1 second"))
.count())
query = (counts.writeStream
.outputMode("complete")
.format("console")
.start())
query.awaitTermination()
Cinci lucruri de observat:
spark.readStreamîn loc despark.read. Asta e toată diferența de API pe partea de input. Restul, schemă, format, opțiuni, arată identic cu o citire batch.- Rezultatul
readStreameste un DataFrame. Mai exact,events.isStreamingreturneazăTrue, dar e încă un DataFrame regulat în ce privește transformările tale. ApelurilewithColumn,groupBy,windowsunt exact API-ul batch. writeStreamîn loc dewrite. Aceeași diferență pe partea de output.outputMode("complete")spune „emit întregul tabel de rezultat curent la fiecare micro-batch”, potrivit pentru o agregare. Vom parcurge cele trei moduri de output (append, complete, update) în lecția 50.awaitTermination()ține procesul în viață în timp ce interogarea streaming rulează. Fără el, scriptul tău se închide și Spark oprește interogarea.
Aruncă un CSV în /tmp/incoming/, urmărește consola care afișează contoare, aruncă altul, urmărește contoarele care se actualizează. Faci streaming.
Încotro merge acest modul
Următoarele trei lecții acoperă fundațiile:
- Lecția 50:
readStream,writeStream, tipurile de trigger și checkpoint-ul atât de important. Mecanica efectivă a rulării unui job streaming. - Lecția 51: sursa Kafka, care e ce citesc 90% din job-urile streaming de producție. Offset-uri, deserializare, semantica exactly-once, capcanele.
- Lecția 52: event time și watermarks. Piesa care face ca „la ce oră s-a întâmplat efectiv evenimentul ăsta?” să funcționeze corect chiar și când evenimentele sosesc în afara ordinii.
După aceea vom acoperi modurile de output (53), operațiile cu stare (54) și join-urile stream-stream (55) înainte de a încheia modulul cu partea de operațiuni: deployment de producție, monitorizare, modurile de eșec (56-58).
Pentru moment, concluzia: streaming nu e o lume magică separată. E același API DataFrame pe care îl știi deja, aplicat datelor care n-au terminat de sosit, rulate incremental pe micro-batch-uri, cu un checkpoint care urmărește progresul. Odată ce imaginea aia e în capul tău, restul e mecanică.
Referințe: Apache Spark Structured Streaming Programming Guide (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) și notele de migrare Spark 4.0 despre eliminarea pyspark.streaming (https://spark.apache.org/docs/latest/streaming/). Consultat 2026-05-01.