Le ultime lezioni le abbiamo passate a costruire DataFrame, trasformarli e scriverli. Ogni volta che concatenavi una .select() o una .filter(), Spark in realtà non faceva nulla: si limitava a ricordare. E ogni volta che alla fine chiamavi .count() o .write.parquet(), qualcosa si metteva in moto. Quel qualcosa è il DAG: un grafo aciclico diretto di operatori che Spark costruisce in memoria e poi esegue a blocchi.
La lezione di oggi è su come leggere quel grafo. Una volta che riesci a guardare la tab Stages della Spark UI e individuare al volo quale operazione è quella lenta, fare debug di Spark smette di essere mistico. Diventa la stessa cosa che leggere un piano d’esecuzione di SQL Server: trovi la freccia grossa, sistemi la freccia grossa.
Cosa significa davvero “DAG”
DAG = directed acyclic graph, grafo aciclico diretto. Tre parole, tutte sostanza:
- Graph (grafo): nodi (operatori) collegati da archi (flusso dei dati).
- Directed (diretto): gli archi hanno una direzione; i dati scorrono in un senso solo.
- Acyclic (aciclico): nessun ciclo; un nodo non si rialimenta mai da solo.
Ogni job PySpark è un DAG. Quando scrivi:
df = spark.read.csv("orders.csv", header=True)
filtered = df.filter(col("country") == "IT")
agg = filtered.groupBy("status").sum("total")
agg.write.parquet("out/")
Spark non costruisce quattro cose separate. Ne costruisce una sola: Read CSV -> Filter -> GroupBy -> Sum -> Write Parquet. Pigramente. Niente è ancora stato eseguito. Il driver conosce soltanto la forma del lavoro.
L’action, .write.parquet(), è ciò che fa partire l’esecuzione. In quel momento Spark fa passare il grafo attraverso il Catalyst optimizer, applica regole (predicate pushdown, column pruning, constant folding), produce un physical plan, e poi lo divide in stage.
Cos’è uno stage
Uno stage è un pezzo contiguo del DAG che Spark riesce a eseguire da capo a fondo senza spostare dati tra executor. Finché ogni operazione è narrow, cioè ogni partizione di output dipende da una sola partizione di input, il lavoro resta locale. Filter, select, withColumn, cast, mappature semplici: tutte narrow. Spark le esegue in un’unica passata per partizione, niente rete, niente shuffle su disco tra operatori.
Una wide transformation spezza la catena. Wide significa che una partizione di output dipende da molte partizioni di input: groupBy, join, distinct, orderBy, repartition. Per risolvere “tutte le righe con la stessa chiave finiscono nello stesso posto”, Spark deve ridistribuire i dati tra gli executor. Quella ridistribuzione è il confine tra due stage.
La regola è semplice e vale la pena memorizzarla:
Uno stage = un blocco di lavoro narrow. I confini tra stage = shuffle.
Se il tuo DAG ha tre shuffle, hai quattro stage. Se ha zero shuffle (una pipeline ETL pulita che filtra e scrive), hai uno stage. Questo è il modello mentale che ti farà risparmiare ore.
Task: l’unità di esecuzione
Ogni stage viene suddiviso in task, uno per partizione. Uno stage che opera su 200 partizioni diventa 200 task. Lo scheduler di Spark distribuisce i task ai core degli executor; ogni core esegue un task alla volta. Se hai 10 executor con 4 core ciascuno = 40 core totali, i tuoi 200 task girano in circa 5 ondate da 40.
Questo è tutto il modello d’esecuzione. Gli stage girano in sequenza (uno stage non può partire finché i suoi parent non hanno finito, perché gli serve l’output shufflato). I task all’interno di uno stage girano in parallelo fino al limite di core del cluster.
Un esempio concreto
Considera questo job, che è più o meno il canonico “primo workload Spark serio” che scrivono tutti:
from pyspark.sql.functions import col, sum as _sum
orders = (
spark.read
.option("header", True)
.option("inferSchema", True)
.csv("s3://runehold/orders/2026-04/")
)
result = (
orders
.filter(col("country") == "IT")
.groupBy("status")
.agg(_sum("total").alias("total_revenue"))
)
result.write.mode("overwrite").parquet("s3://runehold/reports/it-revenue/")
Quanti stage costruisce Spark? Vediamoli.
- Read CSV + filter: entrambi narrow. Stesso stage. Chiamiamolo Stage 0.
- groupBy(“status”).agg(…): wide. Innesca uno shuffle. Confine di stage.
- Dopo lo shuffle, Spark fa l’aggregazione finale per gruppo, poi scrive Parquet. Entrambi narrow. Stage 1.
Quindi questo job ha due stage. C’è una sottigliezza: Spark fa un’aggregazione parziale prima dello shuffle (un HashAggregate in modalità partial) in modo che sulla rete viaggino solo le somme parziali per partizione, non ogni riga. Questa è una delle ottimizzazioni che rende groupBy meno costoso di quanto la gente teme. Torneremo sul tema nella lezione 27 quando parleremo di combiner.
Nella Spark UI, questo si traduce in:
- Stage 0: task che leggono i file di input, filtrano, aggregano parzialmente. Output: somme parziali scritte su disco locale, partizionate per hash di
status. - Stage 1: task che recuperano le proprie shuffle partition assegnate, completano l’aggregazione, scrivono Parquet.
Se il tuo input erano 200 file CSV, lo Stage 0 ha 200 task. Il numero di task dello Stage 1 dipende da spark.sql.shuffle.partitions (default 200, sì, lo stesso default che tutti si dimenticano di tunare nei job piccoli).
Leggere la tab Stages della Spark UI
Apri http://localhost:4040 (o l’URL della UI del driver, qualunque sia) e clicca su Stages. Vedi una tabella per job. Le colonne che contano:
- Stage Id: identificatore, ordinato per esecuzione.
- Description: la migliore ipotesi di Spark su cosa fa lo stage (spesso mostra la riga del tuo codice che l’ha innescato).
- Submitted / Duration: quando è partito lo stage, quanto è durato.
- Tasks (Succeeded/Total): quanti task sono girati. Numero grande = parallelismo alto (di solito un bene). Numero pari a
1= collo di bottiglia seriale (male). - Input: byte letti da fonte esterna (S3, HDFS, FS locale).
- Output: byte scritti.
- Shuffle Read: byte che questo stage ha tirato dagli stage precedenti via rete.
- Shuffle Write: byte che questo stage ha scritto su disco locale perché lo stage successivo li consumi.
- GC Time: tempo speso nel garbage collection della JVM. Se è oltre il 10% circa della durata, hai un problema di memoria.
Clicca su uno stage per scendere nel dettaglio. La pagina di dettaglio ha il task summary: min, 25esimo percentile, mediana, 75esimo percentile, max per durata, GC time, dimensione input, shuffle read/write e così via.
Qui trovi lo skew. Confronta il task time mediano col task time massimo:
- Mediana: 2 secondi. Max: 3 secondi. Sano.
- Mediana: 2 secondi. Max: 90 secondi. Un task ha 45 volte il lavoro del task tipico. Quello è skew. Una chiave ha molte più righe delle altre, e un task sta trascinando l’intero stage. La lezione 30 copre cosa farci (salting, broadcast join, AQE).
Il link DAG Visualization in alto nella pagina di dettaglio dello stage disegna il grafo degli operatori solo per quello stage. Utile per verificare “sì, il filtro è stato pushato giù, la projection è narrow, l’aggregazione parziale è avvenuta dove me l’aspettavo”.
Pipelining all’interno di uno stage
All’interno di un singolo stage, Spark non materializza i risultati intermedi tra operatori. Se il tuo stage è Read CSV -> Filter -> Project -> Write, ogni riga (o batch vettorizzato) attraversa tutti gli operatori in sequenza prima che inizi la successiva. Gli operatori vengono fusi insieme, spesso in bytecode generato: la whole-stage code generation di Tungsten.
Per questo le narrow transformation sono così economiche. Il costo di uno stage è dominato dalla lettura dell’input e dalla scrittura dell’output. Aggiungere un withColumn in mezzo è, all’incirca, gratis. L’eccezione è qualunque cosa rompa la pipeline (per esempio una Python UDF che Spark non riesce a inlinare). La lezione 41 copre le performance delle UDF.
Quando uno stage spilla
Se il working set di uno stage non sta nella memoria dell’executor, Spark spilla i dati intermedi su disco locale. La Spark UI lo mostra nel dettaglio per task sotto Shuffle Spill (Memory) e Shuffle Spill (Disk).
Uno spill piccolo va bene: Spark è pensato per gestirlo con grazia. Uno spill grosso è una bandiera rossa enorme: lo stage sta girando 5-20 volte più lento del dovuto perché invece di lavorare in RAM sta pompando dati attraverso il disco.
Cause comuni:
- Chiave skewed in un groupBy o join (un task riceve troppo).
spark.sql.shuffle.partitionsimpostato troppo basso (ogni partizione è enorme).- Memoria executor troppo piccola per il workload.
Andremo a fondo sul tuning della memoria nella lezione 57. Per ora, quando vedi “Shuffle Spill (Disk): 12.3 GB” nella UI, la mossa è: aumentare le shuffle partition, aumentare la memoria executor, o cercare lo skew.
La relazione tra job, stage e task
Un veloce ripasso di gerarchia, perché la Spark UI usa tutti e tre i termini:
- Job: cosa innesca un’action. Ogni
.count(),.write(),.collect()produce un job. (A volte di più: ogni tanto l’optimizer fa partire un piccolo job per calcolare prima delle statistiche.) - Stage: un blocco di lavoro narrow all’interno di un job. Un job ha 1 o più stage, separati da shuffle.
- Task: uno stage che gira su una partizione. Uno stage ha N task dove N = numero di partizioni.
Quindi: job -> stage -> task. La Spark UI ha tab per tutti e tre. La maggior parte della tua vita di debug la passerai nella tab Stages, scendendo ogni tanto in un task specifico.
Come AQE riscrive il DAG a runtime
Un’ultima sfumatura che vale la pena menzionare presto. Spark moderno (3.0+) ha l’Adaptive Query Execution (spark.sql.adaptive.enabled = true, default dalla 3.2). Quando AQE è attivo, il DAG non è del tutto fissato al momento della submission. Dopo ogni shuffle, Spark guarda le dimensioni reali dei dati shufflati e può:
- Coalescere molte shuffle partition piccole in poche più grandi (evita 200 nano-task su un risultato piccolo).
- Trasformare un sort-merge join in un broadcast join se la build side si è rivelata piccola.
- Spezzare partizioni skewed in sotto-partizioni che più task possono masticare in parallelo.
Quindi lo stage che vedi nella UI può avere una forma leggermente diversa da quella che predirresti dal solo codice. La Spark UI mostra utilmente i piani riscritti da AQE con un marker. Torneremo su AQE in dettaglio nella lezione 33; lo segnalo adesso così non ti confondi quando la UI mostra meno task di quanti spark.sql.shuffle.partitions farebbe pensare.
Provalo sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, rand
spark = (
SparkSession.builder
.appName("dag-demo")
.config("spark.sql.shuffle.partitions", "16")
.getOrCreate()
)
# Sintetizza 1M di righe su 200 partizioni
df = (
spark.range(0, 1_000_000, numPartitions=200)
.withColumn("country", (col("id") % 5).cast("string"))
.withColumn("status", (col("id") % 3).cast("string"))
.withColumn("total", rand() * 1000)
)
# Un job a due stage: filter (narrow) + groupBy (wide)
result = (
df.filter(col("country") == "1")
.groupBy("status")
.agg(_sum("total").alias("revenue"))
)
result.show()
# Ora vai su http://localhost:4040, clicca Jobs, clicca il job appena girato,
# entra in ogni stage, guarda il task summary. Trova lo shuffle write
# nello stage 0 e il corrispondente shuffle read nello stage 1.
input("Premi Invio per uscire (mantiene la UI viva)... ")
spark.stop()
L’input() tiene viva la sessione Spark in modo che tu possa girare per la UI prima che il driver si spenga. Una volta spento, la UI sparisce (a meno che tu non abbia configurato l’History Server, che è un argomento per la lezione 60).
Due stage, uno shuffle, un’aggregazione ovvia. Il DAG non banale più semplice, e quello che dovresti riuscire a leggere nel sonno prima di andare oltre.
La prossima lezione parla di caching e persistence: quando dici a Spark “ricordati di questo DataFrame, lo userò di nuovo”. Copriremo i sette storage level e i pattern in cui il caching paga davvero, prima che la lezione 24 si giri e spieghi quando il caching è invece una cattiva idea.