Am petrecut ultimele câteva lecții construind DataFrame-uri, transformându-le și scriindu-le. De fiecare dată când ai înlănțuit un .select() sau un .filter(), Spark nu a făcut nimic concret, doar a ținut minte. Și de fiecare dată când ai apelat în final .count() sau .write.parquet(), ceva s-a pus în mișcare. Acel ceva este DAG-ul: un graf orientat aciclic de operatori pe care Spark îl construiește în memorie și apoi îl execută în bucăți.
Lecția de azi e despre cititul acelui graf. Odată ce te poți uita la tab-ul Stages din Spark UI și poți identifica imediat care operație e cea lentă, debugging-ul în Spark încetează să mai fie mistic. Devine același lucru ca cititul unui plan de execuție SQL Server: găsești săgeata groasă, repari săgeata groasă.
Ce înseamnă de fapt „DAG”
DAG = directed acyclic graph (graf orientat aciclic). Trei cuvinte care fac muncă reală:
- Graph (graf), noduri (operatori) conectate prin muchii (flux de date).
- Directed (orientat), muchiile au o direcție; datele curg într-un singur sens.
- Acyclic (aciclic), fără bucle; un nod nu se reîntoarce niciodată în sine însuși.
Fiecare job PySpark e un DAG. Când scrii:
df = spark.read.csv("orders.csv", header=True)
filtered = df.filter(col("country") == "IT")
agg = filtered.groupBy("status").sum("total")
agg.write.parquet("out/")
Spark nu construiește patru lucruri separate. Construiește un singur graf: Read CSV → Filter → GroupBy → Sum → Write Parquet. Lazy. Nimic nu s-a executat încă. Driver-ul cunoaște doar forma muncii.
Acțiunea, .write.parquet(), este cea care declanșează execuția. În acel moment Spark trece graful prin Catalyst optimizer, aplică reguli (predicate pushdown, column pruning, constant folding), produce un plan fizic și apoi îl taie în stages.
Ce este un stage
Un stage este o porțiune contiguă din DAG pe care Spark o poate rula end-to-end fără să mute date între executors. Atâta timp cât fiecare operație este narrow, adică fiecare partiție de output depinde de exact o partiție de input, munca rămâne locală. Filter, select, withColumn, cast, map simplu: toate sunt narrow. Spark le poate rula pe toate într-o singură trecere per partiție, fără rețea, fără shuffle pe disc între operatori.
O transformare wide rupe lanțul. Wide înseamnă că o partiție de output depinde de mai multe partiții de input: groupBy, join, distinct, orderBy, repartition. Pentru a rezolva „toate rândurile cu aceeași cheie ajung în același loc”, Spark trebuie să redistribuie datele între executors. Acea redistribuire este granița dintre două stages.
Așa că regula e simplă și merită memorată:
Un stage = o porțiune de muncă narrow. Granițele dintre stages = shuffles.
Dacă DAG-ul tău are trei shuffles, ai patru stages. Dacă are zero shuffles (un pipeline ETL pur care doar filtrează și scrie), ai un singur stage. Acesta este modelul mental care îți va salva ore.
Tasks: unitatea de execuție
Fiecare stage e împărțit în tasks, una per partiție. Un stage care operează pe 200 de partiții devine 200 de tasks. Scheduler-ul Spark distribuie acele tasks către core-urile executor-ilor; fiecare core rulează un task la un moment dat. Dacă ai 10 executors cu 4 core-uri fiecare = 40 de core-uri în total, cele 200 de tasks rulează în aproximativ 5 valuri de câte 40.
Acesta e tot modelul de execuție. Stages rulează secvențial (un stage nu poate începe până când părinții lui nu termină, fiindcă are nevoie de output-ul lor după shuffle). Tasks în interiorul unui stage rulează în paralel până la capacitatea de core-uri a clusterului.
Un exemplu lucrat
Ia în considerare acest job, care e mai mult sau mai puțin „prima sarcină reală Spark” pe care o scrie toată lumea:
from pyspark.sql.functions import col, sum as _sum
orders = (
spark.read
.option("header", True)
.option("inferSchema", True)
.csv("s3://runehold/orders/2026-04/")
)
result = (
orders
.filter(col("country") == "IT")
.groupBy("status")
.agg(_sum("total").alias("total_revenue"))
)
result.write.mode("overwrite").parquet("s3://runehold/reports/it-revenue/")
Câte stages construiește Spark? Hai să le parcurgem.
- Read CSV + filter, ambele narrow. Același stage. Numește-l Stage 0.
- groupBy(“status”).agg(…), wide. Declanșează un shuffle. Granița dintre stages.
- După shuffle, Spark face agregarea finală per grup, apoi scrie Parquet. Ambele narrow. Stage 1.
Deci acest job are două stages. Există o subtilitate: Spark face o agregare parțială înainte de shuffle (numită HashAggregate cu mod partial), astfel încât doar sumele parțiale per partiție să fie trimise prin rețea, nu fiecare rând. Asta e una dintre optimizările care fac groupBy mai ieftin decât se tem oamenii. Vom reveni la asta în lecția 27 când vom discuta despre combiners.
În Spark UI, asta se traduce în:
- Stage 0: tasks care citesc fișierele de input, filtrează, fac agregarea parțială. Output: sume parțiale scrise pe discul local, partiționate după hash-ul lui
status. - Stage 1: tasks care își aduc partițiile de shuffle alocate, termină agregarea, scriu Parquet.
Dacă input-ul tău a fost 200 de fișiere CSV, Stage 0 are 200 de tasks. Numărul de tasks din Stage 1 depinde de spark.sql.shuffle.partitions (implicit 200, da, același default pe care toată lumea uită să-l ajusteze în job-uri mici).
Citirea tab-ului Stages din Spark UI
Deschide http://localhost:4040 (sau orice URL are UI-ul tău de driver) și apasă Stages. Vei vedea un tabel per job. Coloanele care contează:
- Stage Id, identificator, ordonat după execuție.
- Description, cea mai bună aproximare a lui Spark despre ce face stage-ul (adesea afișează linia din codul tău care l-a declanșat).
- Submitted / Duration, când a început stage-ul, cât a durat.
- Tasks (Succeeded/Total), câte tasks au rulat. Număr mare = paralelism ridicat (de obicei bun). Număr de
1= bottleneck serial (rău). - Input, bytes citiți dintr-o sursă externă (S3, HDFS, FS local).
- Output, bytes scriși.
- Shuffle Read, bytes pe care acest stage i-a tras din stages anterioare prin rețea.
- Shuffle Write, bytes pe care acest stage i-a scris pe discul local pentru ca următorul stage să-i consume.
- GC Time, timpul petrecut în garbage collection JVM. Dacă e mai mult de ~10% din durată, ai o problemă de memorie.
Apasă pe orice stage ca să intri în detalii. Pagina de detaliu are task summary: min, percentila 25, mediană, percentila 75, max pentru durată, GC time, input size, shuffle read/write etc.
Aici găsești skew. Compară timpul median al unui task cu timpul max:
- Mediană: 2 secunde. Max: 3 secunde. Sănătos.
- Mediană: 2 secunde. Max: 90 de secunde. Un task are de 45× munca task-ului tipic. Acela e skew. O cheie are mult mai multe rânduri decât celelalte și un task târăște tot stage-ul. Lecția 30 acoperă ce să faci cu asta (salting, broadcast joins, AQE).
Link-ul DAG Visualization din partea de sus a paginii de detaliu a stage-ului desenează graful operatorilor doar pentru acel stage. Util pentru a verifica „da, filtrul a fost împins în jos, proiecția e narrow, agregarea parțială s-a întâmplat unde mă așteptam”.
Pipelining în interiorul unui stage
În interiorul unui singur stage, Spark nu materializează rezultate intermediare între operatori. Dacă stage-ul tău e Read CSV → Filter → Project → Write, fiecare rând (sau batch vectorizat) curge prin toți operatorii în secvență înainte ca următorul să înceapă. Operatorii sunt fuzionați împreună, adesea în bytecode generat, whole-stage code generation al lui Tungsten.
De aceea transformările narrow sunt atât de ieftine. Costul unui stage e dominat de citirea input-ului și scrierea output-ului. Adăugarea unui withColumn la mijloc e, mai mult sau mai puțin, gratis. Excepția e orice rupe pipeline-ul (un UDF Python pe care Spark nu-l poate inline-ui, de exemplu). Lecția 41 acoperă performanța UDF-urilor.
Când un stage face spill
Dacă working set-ul unui stage nu încape în memoria executor-ului, Spark face spill datelor intermediare pe discul local. Spark UI arată asta în detaliul per task la Shuffle Spill (Memory) și Shuffle Spill (Disk).
Un spill mic e ok, Spark e proiectat să se descurce cu el grațios. Un spill mare e un steag roșu uriaș: stage-ul rulează de 5-20× mai lent decât ar trebui pentru că, în loc să lucreze în RAM, pompează date prin disc.
Cauze comune:
- Cheie cu skew într-un groupBy sau join (un task primește prea mult).
spark.sql.shuffle.partitionssetat prea jos (fiecare partiție e uriașă).- Memoria executor-ului prea mică pentru workload.
Vom intra în detaliu pe tuning-ul memoriei în lecția 57. Pentru moment, când vezi „Shuffle Spill (Disk): 12.3 GB” în UI, mișcarea ta este: crește numărul de shuffle partitions, crește memoria executor-ului sau caută skew.
Relația dintre jobs, stages și tasks
O scurtă reîmprospătare a ierarhiei pentru că Spark UI folosește toți cei trei termeni:
- Job, ce declanșează o acțiune. Fiecare
.count(),.write(),.collect()produce un job. (Uneori mai multe, optimizatorul declanșează ocazional un job mic ca să calculeze întâi statistici.) - Stage, o porțiune de muncă narrow în interiorul unui job. Un job are 1+ stages, separate prin shuffles.
- Task, un stage care rulează pe o partiție. Un stage are N tasks, unde N = numărul de partiții.
Deci: job → stages → tasks. Spark UI are tab-uri pentru toate trei. Cea mai mare parte a vieții tale de debugging va fi în tab-ul Stages, intrând ocazional într-un task specific.
Cum rescrie AQE DAG-ul la runtime
O ultimă răsucire care merită menționată devreme. Spark modern (3.0+) are Adaptive Query Execution (spark.sql.adaptive.enabled = true, default din 3.2). Când AQE e activat, DAG-ul nu e complet fix la momentul submit-ului. După fiecare shuffle, Spark se uită la dimensiunile reale ale datelor după shuffle și poate:
- Coalescea multe partiții mici de shuffle într-un număr mai mic de partiții mai mari (evită 200 de nano-task-uri pe un rezultat mic).
- Schimba un sort-merge join într-un broadcast join dacă build-side-ul s-a dovedit mic.
- Diviza partițiile cu skew în sub-partiții pe care mai multe tasks le pot mesteca în paralel.
Așa că stage-ul pe care îl vezi în UI poate avea o formă ușor diferită de cea pe care ai prezice-o doar din cod. Spark UI arată util planurile rescrise de AQE cu un marker. Vom reveni la AQE în detaliu în lecția 33; semnalez aici ca să nu te încurci când UI-ul arată mai puține tasks decât ar implica spark.sql.shuffle.partitions.
Rulează asta pe propria mașină
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, rand
spark = (
SparkSession.builder
.appName("dag-demo")
.config("spark.sql.shuffle.partitions", "16")
.getOrCreate()
)
# Synthesize 1M rows across 200 partitions
df = (
spark.range(0, 1_000_000, numPartitions=200)
.withColumn("country", (col("id") % 5).cast("string"))
.withColumn("status", (col("id") % 3).cast("string"))
.withColumn("total", rand() * 1000)
)
# A two-stage job: filter (narrow) + groupBy (wide)
result = (
df.filter(col("country") == "1")
.groupBy("status")
.agg(_sum("total").alias("revenue"))
)
result.show()
# Now go open http://localhost:4040, click Jobs, click the job that just ran,
# click into each stage, look at the task summary. Find the shuffle write
# in stage 0 and the matching shuffle read in stage 1.
input("Press Enter to exit (keeps the UI alive)... ")
spark.stop()
Apelul input() ține sesiunea Spark vie ca să poți cotrobăi prin UI înainte ca driver-ul să se închidă. Odată ce se închide, UI-ul dispare (cu excepția cazului în care ai configurat History Server-ul, care e un subiect pentru lecția 60).
Două stages, un shuffle, o agregare evidentă. Cel mai simplu DAG non-trivial și cel pe care ar trebui să fii capabil să-l citești cu ochii închiși înainte să mergi mai departe.
Lecția următoare e despre caching și persistence, atunci când îi spui lui Spark „ține minte acest DataFrame, îl voi folosi din nou”. Vom acoperi cele șapte storage levels și pattern-urile în care caching-ul chiar dă roade, înainte ca lecția 24 să se întoarcă și să explice când caching-ul e de fapt o idee proastă.