Se ti ricordi una sola cosa di tutto questo corso, falla diventare il contenuto di questa lezione.
Ogni transformation in Spark cade in una di due categorie. Le transformation narrow girano alla velocità dell’hardware attraverso il cluster senza coordinazione. Le transformation wide richiedono che tutti gli executor si fermino, scambino dati sulla rete e riprendano: un’operazione chiamata shuffle che costa ordini di grandezza in più di tutto ciò che le sta intorno.
La linea tra narrow e wide è il singolo confine più importante in Spark. Ogni domanda di performance su Spark (perché è lento, come lo rendo più veloce, perché il mio cluster è caduto, perché un executor sta facendo tutto il lavoro) torna a questa distinzione. Il resto del corso è in gran parte sull’evitare, minimizzare o ottimizzare le transformation wide.
Un rapido recap delle partition
Prima che la definizione abbia senso, il modello delle partition. Un DataFrame Spark è diviso in molte partition, tipicamente da 100 a qualche migliaio, distribuite sugli executor del cluster. Ogni partition vive su una macchina ed è processata da un task. Finché il lavoro di ogni task è indipendente dagli altri, il cluster è contento e il tuo job gira alla velocità del task più lento. Le cose smettono di essere contente quando un task sulla macchina A ha bisogno di sapere qualcosa che si trova sulla macchina B. È allora che i dati devono spostarsi.
La definizione
Una transformation narrow è quella in cui ogni partition di output dipende da esattamente una partition di input. Ogni task può girare indipendentemente sulla sua macchina, guardando solo la sua fetta di dati, e produrre la sua fetta di output. Nessuna coordinazione. Nessuna rete.
Una transformation wide è quella in cui ogni partition di output dipende da dati di più partition di input. Il cluster deve ridistribuire le righe in modo che le righe che devono finire nella stessa partition di output finiscano sulla stessa macchina. Questa ridistribuzione è lo shuffle: ogni executor scrive le sue righe in un file di disco locale partizionato per destinazione, poi ogni altro executor rilegge le righe destinate a lui. Movimento di dati all-to-all.
Concreto:
- Un
filter(col("Country") == "IT")guarda una riga alla volta. Ogni task legge la sua partition, tiene le righe IT, scarta il resto, scrive la sua fetta di output. Le altre partition non contano. Narrow. - Un
groupBy("Country").agg(F.sum("Total"))ha bisogno che ogni riga IT finisca nello stesso posto per essere sommata. Alcune righe IT sono sulla macchina A, altre sulla B, altre sulla C. Devono incontrarsi. Shuffle. Wide.
Il catalogo
Transformation narrow (no shuffle, economiche):
select,selectExpr(quando non c’è DISTINCT o aggregazione nell’espressione)where/filterwithColumn,withColumnRenamed,dropunion,unionByName- aggiunge righe; nessuna riga deve spostarsina.fill,na.drop,na.replacesample(senza replacement, nella maggior parte delle modalità)map,mapPartitions- le operazioni stile RDD riga per riga, incluse per completezza
Transformation wide (shuffle richiesto, costose):
groupBy(*cols).agg(...)- raggruppare per chiave richiede che le righe con la stessa chiave si trovino insiemedistinct(),dropDuplicates(*cols)- lo stesso: la deduplica ha bisogno delle righe con lo stesso valore insiemejoin(other, on, how)- entrambi i lati devono avere le chiavi corrispondenti collocate insieme, eccetto per i broadcast join (lezione 25)orderBy/sort- il sort globale richiede range-partitioning attraverso il clusterrepartition(n),repartition(n, col),repartitionByRange(...)- queste esistono letteralmente per fare shuffleintersect,exceptAll,subtract- operazioni insiemistiche sull’intero DataFrame- Window function con una clausola
partitionBy(...)- hanno bisogno delle righe nella stessa finestra insieme
L’eccezione onorevole:
coalesce(n)riduce il numero di partition combinando quelle adiacenti. Non fa shuffle; è narrow. Il trade-off è che puoi solo scendere (non puoi passare da 4 partition a 200 concoalesce), e le partition di output possono diventare sbilanciate. Usacoalesceper scendere da 200 a 10 prima di scrivere un piccolo file. Usarepartitionquando hai bisogno di bilanciamento o di salire.
L’immagine
Diagramma ASCII. Quattro partition di input, in cima, processate da quattro task. Due flussi: narrow (filter), e wide (groupBy).
Narrow transformation: filter(col("Country") == "IT")
Input: [P0] [P1] [P2] [P3]
| | | |
v v v v
task0 task1 task2 task3 (parallel, independent)
| | | |
v v v v
Output: [P0'] [P1'] [P2'] [P3']
Each task reads exactly one partition, writes one partition. No data
crosses machines. Wall-clock time = time of slowest task.
Wide transformation: groupBy("Country").agg(...)
Input: [P0] [P1] [P2] [P3]
| | | |
v v v v
task0 task1 task2 task3 (partial aggregate)
| | | |
+----+----+----+----+----+----+
| | |
v v v === SHUFFLE ===
IT NL DE (network + disk)
| | |
v v v
task_IT task_NL task_DE (final aggregate)
| | |
v v v
Output: [Pa] [Pb] [Pc]
Guarda quella sezione centrale. Nel caso narrow, quattro task fanno ognuno la propria cosa in parallelo e abbiamo finito. Nel caso wide, quattro task fanno lavoro parziale, poi ogni task deve mandare le sue righe alla destinazione giusta (righe IT in un posto, righe NL in un altro, righe DE in un terzo) sulla rete. Poi un secondo round di task fa l’aggregazione finale.
Questo è lo shuffle. È la cosa lenta e costosa nel calcolo distribuito. Ed è la cosa a cui ogni discussione di performance in Spark torna alla fine.
Perché gli shuffle sono costosi
Tre costi, tutti reali. Disk write: ogni executor, prima di mandare fuori le sue righe, le scrive su disco locale partizionate per destinazione, in modo che il lato ricevente possa tirarle on demand e i task falliti possano essere ritentati senza rieseguire il lavoro a monte. Network transfer: le righe si muovono attraverso la rete del cluster; in uno shuffle a 200 partition su un cluster a 10 nodi, ogni nodo manda a ogni altro nodo simultaneamente, e la banda diventa il collo di bottiglia. Disk read: l’executor ricevente legge i file di shuffle in memoria per fare il suo lavoro.
Una pipeline di sole transformation narrow gira alla velocità di CPU più storage. Uno shuffle aggiunge un round-trip di disco-rete-disco per l’intero dataset. Cinque shuffle lo fanno cinque volte sequenzialmente, perché ogni shuffle è una barriera: il prossimo stage non può iniziare finché lo shuffle del precedente non è finito. In numeri pratici: uno stage solo narrow su un DataFrame da 10 GB potrebbe girare in 30 secondi; aggiungi un groupBy e la stessa pipeline spesso impiega 2-3 minuti.
Come individuare uno shuffle in .explain()
Spark espone lo shuffle direttamente nel physical plan. Cerca l’operatore Exchange:
df = spark.createDataFrame(
[(i, "IT" if i % 3 == 0 else "NL", float(i)) for i in range(1000)],
"OrderId INT, Country STRING, Total DOUBLE",
)
# Narrow only — no Exchange
(df.where(col("Country") == "IT")
.withColumn("WithVat", col("Total") * 1.22)).explain()
# *(1) Project [..., (Total * 1.22) AS WithVat]
# +- *(1) Filter (Country = IT)
# +- *(1) Scan ExistingRDD[...]
# Add a groupBy — Exchange appears
(df.where(col("Country") == "IT")
.groupBy("Country")
.agg(F.sum("Total").alias("TotalIT"))).explain()
# *(2) HashAggregate(keys=[Country], functions=[sum(Total)])
# +- Exchange hashpartitioning(Country, 200), ENSURE_REQUIREMENTS, ...
# +- *(1) HashAggregate(keys=[Country], functions=[partial_sum(Total)])
# +- *(1) Filter (Country = IT)
# +- *(1) Scan ExistingRDD[...]
Eccolo lì l’Exchange hashpartitioning(Country, 200). I *(1) e *(2) sono ID di stage: Spark divide il lavoro al confine dello shuffle. Lo stage 1 legge, filtra e fa l’aggregazione parziale per partition. Lo stage 2 prende l’output shuffolato, finalizza l’aggregazione e restituisce il risultato.
Lo schema da interiorizzare: ogni Exchange è uno shuffle, un confine di stage, un pezzo di costo di rete/disco.
Stage e DAG
Definizione veloce: uno stage è un pezzo di lavoro che può girare come una pipeline di transformation narrow senza shuffle nel mezzo. Ogni Exchange è il confine tra due stage. Una pipeline con due shuffle ha tre stage; una pipeline senza shuffle è un solo stage. Ogni stage deve finire prima che inizi il prossimo, perché il suo input è l’output dello shuffle del precedente. Regola empirica: conta le transformation wide e hai grosso modo contato gli stage. La storia completa del DAG arriva nella prossima lezione.
Le regole di performance che ne escono
Una volta interiorizzato narrow contro wide, ogni raccomandazione di performance Spark ha senso:
Filtra e proietta prima dei passi wide. Un where e un select sono narrow e gratis; farli presto rimpicciolisce i dati che entrano nello shuffle successivo. L’optimizer spesso te li spinge giù, ma scriverli nel posto giusto rende l’intento ovvio.
Evita di ordinare se non hai davvero bisogno di output ordinato. orderBy è un sort globale attraverso il cluster, una delle cose più costose che puoi fare. Se hai bisogno solo dei top 10, df.orderBy("col").limit(10) permette a Spark di fare un’ottimizzazione TopK, ma è comunque più economico evitare il sort del tutto quando puoi.
Usa i broadcast join per piccolo-su-grande. Se un lato di un join ci sta nella memoria del driver (sotto qualche centinaio di MB), Spark può fare il broadcast a ogni executor ed evitare lo shuffle sul lato grande del tutto. La lezione 25 copre questo; può trasformare un join da 10 minuti in uno da 30 secondi.
Riusa i risultati intermedi. Se hai già pagato il costo di uno shuffle una volta, cacha il risultato prima di fare diverse aggregazioni sopra. La lezione 23 copre la strategia di caching.
Non fare repartition se non hai un motivo. repartition() è essa stessa una transformation wide. Chiamarla per ragioni cosmetiche significa pagare uno shuffle per nessun beneficio. Buoni motivi: skew severa, pre-partizionamento per molte aggregazioni su una chiave, o controllo del numero di file di output prima di una scrittura.
Usa coalesce per ridurre le partition prima della scrittura. Se il tuo DataFrame ha 200 partition e stai scrivendo un piccolo risultato, coalesce(8) dà 8 file di output senza shuffle.
Un avvertimento sullo skew: il modello narrow contro wide assume che le partition di output dello shuffle siano grosso modo bilanciate. Nella vita reale spesso non lo sono. Se l’80% dei tuoi ordini viene dall’Italia e fai groupBy("Country"), il task di reduce per IT prende l’80% dei dati, gira 50 volte più lentamente del resto e domina il wall-clock time. Le colonne categoriali con un valore dominante sono i soliti sospetti. La diagnosi e le soluzioni vengono più avanti nel corso.
Mettiamo insieme
Una pipeline realistica. Marca ogni riga come narrow (N), wide (W) o action (A):
df = (spark.read.parquet("./orders") # ___
.where(col("OrderDate") >= "2026-01-01") # ___
.withColumn("Total", # ___
col("Total").cast("double"))
.join(customers, "CustomerId", "left") # ___
.groupBy("Country", "CustomerId") # ___
.agg(F.sum("Total").alias("LTV"))
.orderBy(F.desc("LTV")) # ___
.limit(100) # ___
.write.parquet("./out/top100")) # ___
Read, filter (N), withColumn (N), join (W), groupBy.agg (W), orderBy (W), limit (N), write (A). Tre transformation wide, quattro stage, una action. Il costo è dominato dai tre shuffle (il join, il group, il sort), non dai passi narrow.
Se questa pipeline fosse lenta, la soluzione non è nelle righe narrow. È: si potrebbe fare il broadcast di customers? Si potrebbe raggruppare prima su una cardinalità più piccola? Le leve sono sempre le stesse: rendere gli shuffle più piccoli, meno o evitabili.
Esegui questo sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
spark = (SparkSession.builder.appName("NarrowWide").master("local[*]").getOrCreate())
df = spark.createDataFrame(
[(i, "IT" if i % 3 == 0 else "NL", float(i)) for i in range(1000)],
"OrderId INT, Country STRING, Total DOUBLE",
)
# 1. Pure narrow chain — no Exchange in the plan
narrow = (df
.where(col("Country") == "IT")
.withColumn("WithVat", col("Total") * 1.22)
.withColumn("Region", F.lit("EU")))
narrow.explain()
# 2. Single wide step — one Exchange, two stages
single_wide = narrow.groupBy("Country").agg(F.sum("WithVat").alias("Total"))
single_wide.explain()
# 3. Multiple wide steps — multiple Exchanges, more stages
multi_wide = (narrow
.join(df.select("OrderId", "Total").alias("d2"), "OrderId")
.groupBy("Country").agg(F.sum("WithVat").alias("Total"))
.orderBy(F.desc("Total")))
multi_wide.explain()
# 4. coalesce vs repartition: count Exchange operators
coalesced = narrow.coalesce(2)
coalesced.explain() # no Exchange — just partition combining
repartitioned = narrow.repartition(8)
repartitioned.explain() # Exchange present — full shuffle
# 5. Run the wide pipeline and check the Spark UI for stage count
multi_wide.write.mode("overwrite").parquet("./tmp/wide_demo")
# Open http://localhost:4040 — you should see one job with three stages
# (one per Exchange plus the final write).
Fissa quei piani finché l’operatore Exchange non diventi la cosa che i tuoi occhi cercano per primi. È il muscolo da costruire. Da qui in avanti, “questa pipeline sarà veloce?” è per lo più la stessa domanda di “quanti Exchange ha il suo piano, e quanto sono grandi i dati che passano per ognuno?”.
Prossima lezione: il DAG. Come Spark trasforma un piano logico in stage, cosa ti compra il lineage graph (fault tolerance gratis), e come leggere la visualizzazione del DAG nella Spark UI per qualunque job. La lezione 23 è il caching: il playbook su quando, cosa e come cachare in modo che il tuo lavoro iterativo non paghi la tassa dello shuffle due volte.