Buttiamo lì la parola “shuffle” da qualche lezione senza mai aprirla davvero. Oggi la apriamo. Alla fine di questo post dovresti riuscire a immaginarti, quasi fotogramma per fotogramma, cosa succede tra gli executor quando Spark fa un groupBy. Quel quadro mentale è ciò che separa gli ingegneri che sanno fare tuning di un job Spark dagli ingegneri che incollano la stessa .repartition(200) in ogni notebook e sperano.
Prima il concetto alto: uno shuffle è la ridistribuzione dei dati nel cluster perché i record con la stessa chiave finiscano nella stessa partizione. Qualunque cosa dipenda da “tutte le righe per la chiave X nello stesso posto” (group, join, distinct, sort) ne richiede uno. È la cosa più costosa che Spark fa di routine. I moduli 5 e 6 di questo corso sono in sostanza una lunga e paziente analisi di come farne meno e di come rendere quelli che non puoi evitare il più economici possibile.
Cosa innesca uno shuffle
Memorizza questa lista. Ogni volta che scrivi codice che include una di queste, immaginati i dati che volano sulla rete:
groupBy(...).agg(...): raggruppa per chiave; le righe con la stessa chiave devono co-locarsi.df.distinct(): deve deduplicare attraverso tutte le partizioni.df.dropDuplicates([...]): idem.df.join(other, on=...): entrambi i lati devono essere partizionati per la chiave di join (a meno che un lato non sia in broadcast, vedi lezione 28).df.orderBy(...)/df.sort(...): l’ordinamento globale richiede uno shuffle range-partitioned.df.repartition(...): per definizione, è il suo lavoro.df.repartitionByRange(...): idem, con un range partitioner.
Operazioni che non innescano uno shuffle (queste sono narrow):
select,filter,withColumn,drop,cast,na.fill, semplicewithColumnRenamed.union(concatena partizioni; nessuna ridistribuzione).coalesce(n)quando si riduce il numero di partizioni (fonde in locale, niente shuffle, diverso darepartition).
coalesce vs repartition è una lezione a sé (la 35), ma in una riga: coalesce è narrow ed economico; repartition è wide ed è uno shuffle vero.
La sequenza fisica
Vediamo cosa succede davvero per un groupBy("status").sum("total") su un dataset da 100GB su un cluster con 10 executor, ognuno con 4 core. Supponi spark.sql.shuffle.partitions = 200.
Stage N: il map side
Ogni partizione di input ha un task che ci gira sopra. Supponi 200 partizioni di input, quindi 200 map task. Ogni task:
- Legge la propria partizione di input. Da dovunque: S3, HDFS, disco locale, un DataFrame cachato a monte.
- Applica l’aggregazione parziale. L’optimizer di Spark pre-aggrega per partizione, quindi invece di scrivere fuori ogni riga, il map task scrive una somma parziale per ogni valore di
statusche ha visto. (Per ungroupBycon poche chiavi distinte, è una riduzione massiccia. Per undistincto un group ad alta cardinalità, meno.) - Partiziona l’output per destinazione. Per ogni record di output, calcola
hash(status) mod 200per capire a quale partizione downstream appartiene. Bufferizza i record in 200 bucket di destinazione. - Scrive i bucket su disco locale. Ogni map task produce un file di shuffle con 200 sezioni (o 200 file piccoli, a seconda dell’implementazione di shuffle). I dati sono serializzati col serializer configurato (Kryo o Java) e possono essere compressi (
spark.shuffle.compress = truedi default, LZ4).
A questo punto tutti i 200 map task hanno finito. Ci sono 200 x 200 = 40.000 “blocchi” logici sui dischi locali dei 10 executor. Ogni blocco è il dato destinato a una partizione downstream da una partizione upstream.
Il map side ha fatto due cose costose: serializzazione in byte, e una scrittura su disco locale.
Stage N+1: il reduce side
Partono 200 reduce task (uno per partizione di output). Ogni reduce task è responsabile di uno dei 200 bucket di destinazione. Per fare il proprio lavoro, deve fetchare il proprio bucket da ogni map task, il che significa fare una richiesta di rete a ogni executor che tiene output di mappa (potenzialmente tutti e 10).
Ogni reduce task:
- Fetcha i propri blocchi. Richieste di rete a ogni executor che ha dati per lui. I byte scorrono sul filo. Il BlockManager di Spark gestisce questo; lo shuffle service (quando attivo) tiene disponibile l’output di mappa anche se gli executor muoiono.
- Deserializza i blocchi in arrivo. Byte di nuovo in oggetti JVM.
- Li fonde. Tutte le somme parziali per le chiavi assegnate a questa partizione vengono combinate in aggregati finali.
- Passa i dati fusi all’operatore successivo (scrittura su Parquet, nel nostro esempio).
Il reduce side ha fatto: un fetch di rete fan-in da ogni nodo, deserializzazione e una fusione.
Il costo, in numeri
Aggregato sul cluster, quello shuffle di un dataset da 100GB sposta circa 100GB sulla rete. A seconda della cardinalità e dell’aggregazione parziale, a volte molto meno (un groupBy con 5 chiavi distinte collassa drammaticamente); a volte praticamente l’intero dataset (un distinct su una colonna ad alta cardinalità, o un join su righe grezze).
In più: gli stessi circa 100GB colpiscono il disco locale sul map side e vengono riletti dal disco locale sul reduce side. La compressione taglia il volume su filo e disco ma aggiunge CPU.
Il costo wallclock su un cluster tipico: lo shuffle è di solito il 60-90% del tempo wallclock di un job che ne ha uno. Anche leggere la sorgente da S3 è costoso, ma lo shuffle è paragonabile o peggio, e a differenza della lettura della sorgente non lo si può parallelizzare via: è limitato dalla banda di rete e dal fetch più lento.
Per questo la gente teme gli shuffle. Non perché siano misteriosi, ma perché sono genuinamente la cosa più lenta in un job Spark tipico.
Una nota sullo shuffle service
I cluster di produzione di solito fanno girare uno shuffle service esterno: un demone long-lived su ogni nodo worker, separato dalle JVM degli executor. Il suo unico lavoro è servire i blocchi di shuffle a chiunque li chieda. Perché? Perché gli executor vanno e vengono (autoscaling, dynamic allocation, eviction di spot instance), e non vuoi che un executor sparito si porti via il proprio output di mappa. Lo shuffle service disaccoppia “chi ha scritto questo blocco” da “chi può servire questo blocco”. Se l’executor che ha prodotto l’output di shuffle muore, i dati sono ancora sul nodo, e lo shuffle service li passa a chiunque ne abbia bisogno.
Quando lo shuffle service è spento, perdere un executor a metà shuffle significa rieseguire lo stage upstream per rigenerare l’output di mappa perso. Costoso. Accendi il service in produzione. La maggior parte delle piattaforme Spark gestite (Databricks, EMR, Dataproc) lo abilita di default; i deploy bare-metal devono impostare spark.shuffle.service.enabled = true e configurare il demone.
Come vederlo nella Spark UI
Apri la tab Stages (la lezione 22 l’ha coperta). Due colonne sono l’impronta digitale dello shuffle:
- Shuffle Write: byte scritti su disco locale dai task di questo stage (map side).
- Shuffle Read: byte fetchati via rete dai task di questo stage (reduce side).
Uno stage che produce uno shuffle ha uno Shuffle Write non nullo. Uno stage che ne consuma uno ha uno Shuffle Read non nullo. Vengono in coppia attraverso i confini di stage.
Entra in uno stage e guarda il task summary. La colonna Shuffle Read Time ti dice quanto tempo ha speso ogni task ad aspettare l’arrivo dei propri blocchi. Se quello è il pezzo dominante della durata del task, il collo di bottiglia è la rete o la velocità del disco remoto, non il tuo codice. Se le durate dei task sono selvaggiamente skewed (mediana 4s, max 5 minuti), una chiave ha molte più righe delle altre, e un reducer sta facendo quasi tutto il lavoro da solo. Quello è data skew, ed è un argomento intero (lezione 30).
Perché “evita gli shuffle” è una mezza verità
Sentirai “evita gli shuffle” come consiglio in continuazione. È perlopiù giusto, ma può fuorviare. Ecco la versione più onesta:
- Non puoi evitare gli shuffle del tutto. Qualunque aggregazione tra chiavi, qualunque join (tranne broadcast), qualunque sort globale: queste richiedono ridistribuzione. Non c’è una riscrittura furba che ti tiri fuori dalla fisica.
- Quel che puoi fare è restringerli. Ridurre quanti dati lo shuffle deve spostare.
Mosse concrete per restringere, ognuna una futura lezione a sé:
- Filtra presto. Pusha le tue clausole
WHEREa monte così meno dati colpiscono lo shuffle. (L’optimizer Catalyst di Spark ne fa molto per te, ma solo quando può dimostrare che è sicuro.) - Proietta stretto. Non trascinare 50 colonne attraverso uno shuffle se te ne servono 4. Fai
selectpresto. - Broadcasta le tabelle piccole. Una piccola dimension table joinata a una fact table grande non ha bisogno di uno shuffle; può essere replicata interamente su ogni executor. Lezione 28.
- Partiziona in modo intelligente a monte. Se il tuo input è già partizionato per la colonna su cui raggrupperai, Spark può saltare del tutto lo shuffle. Lezione 36.
- Tuna
spark.sql.shuffle.partitions. Il default 200 è sbagliato per quasi ogni workload: troppo alto per dati piccoli, troppo basso per dati grandi. Lezione 32. - Occhio allo skew. Una sola chiave calda può rovinare uno shuffle che altrimenti sarebbe a posto. La lezione 30 copre il salting.
Di cosa parla il resto del corso
Questo è il momento di essere onesti: i moduli 5 e 6 di questo corso esistono per via degli shuffle. Join, partizionamento, AQE, broadcast hint, salting, repartition vs coalesce, bucketing: ognuno di questi argomenti è, alla radice, un modo per modellare o evitare gli shuffle. Se capisci bene la lezione di oggi, ogni ottimizzazione successiva sembrerà un passo logico invece che un sacco di trucchi.
Il modello mentale da portare via:
L’unità di dolore base di Spark è lo shuffle. Ogni wide transformation ne innesca uno. Ogni shuffle è map side serialize + scrittura su disco locale, poi fetch via rete + deserialize + merge sul reduce side. Il costo è all’incirca proporzionale ai byte spostati (dopo aggregazione parziale e compressione). La tab Stages ti dice quanto è grande ognuno.
Provalo sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand
spark = (
SparkSession.builder
.appName("shuffle-demo")
.config("spark.sql.shuffle.partitions", "16")
.getOrCreate()
)
# 5M di righe su 32 partizioni, 1000 chiavi distinte
df = (
spark.range(0, 5_000_000, numPartitions=32)
.withColumn("k", (col("id") % 1000).cast("int"))
.withColumn("v", rand() * 1000)
)
# Forza la materializzazione così misuriamo lo shuffle, non la lettura della sorgente
df = df.cache()
df.count()
# 1) Un groupBy: innesca UNO shuffle
df.groupBy("k").sum("v").count()
# 2) Un distinct: anche questo uno shuffle
df.select("k").distinct().count()
# 3) Un self-join su k: shuffle su ENTRAMBI i lati
df.alias("a").join(df.alias("b"), "k").count()
input("Premi Invio per uscire (la UI resta viva)... ")
spark.stop()
Apri la tab Stages della UI. Per ognuna delle tre action, trova gli stage in cui Shuffle Write e Shuffle Read sono non nulli. Annota i byte shufflati. Confronta il self-join (il più pesante, due grossi shuffle in volo) col semplice groupBy (uno shuffle, con l’aggregazione parziale che lo collassa quasi a niente). Quella differenza, tra un’aggregazione che si comprime bene e un join che no, è l’intuizione che guida la maggior parte del lavoro di performance su Spark.
Quello è lo shuffle. Il modulo 4 ora è chiuso; la prossima lezione apre il modulo 5 con narrow vs wide transformation come principio di design: una volta che capisci gli shuffle fisicamente, puoi iniziare a progettare le pipeline intorno a loro invece di limitarti a reagire.