Di tutte le configuration con cui viene Spark, esattamente una è responsabile di più job lenti di tutte le altre messe insieme: spark.sql.shuffle.partitions. È settata a 200 di default. Era un default sensato nel 2014 quando il tipico cluster Spark erano quattro portatili in un co-working. Su un cluster moderno che gira volumi di dati moderni, è quasi sempre sbagliato, a volte di un fattore 10, a volte di un fattore 100.
Questa lezione è su cosa fa questa knob, perché il default non si adatta al tuo job, e la matematica per scegliere un valore che invece si adatti.
Cosa controlla
spark.sql.shuffle.partitions controlla il numero di partition in output dopo ogni shuffle. Group-by, join, distinct, window function, repartition, qualunque operazione che inneschi uno shuffle finisce per scrivere esattamente questo numero di partition come input dello stage successivo.
spark.conf.get("spark.sql.shuffle.partitions")
# '200'
df.groupBy("country").count() # output has 200 partitions
df.join(other, "id") # output has 200 partitions
df.dropDuplicates() # output has 200 partitions
Ogni shuffle. Ogni volta. Indipendentemente dalla dimensione dell’input, dalla dimensione dell’output, dalla forma del cluster, o da qualunque altra cosa del job. Il default è solo il letterale intero 200, congelato nel tempo.
Vale la pena fermarsi sulla dimensione di questo martello. Lo shuffle è la cosa più costosa che Spark fa. Il conteggio di partition dopo lo shuffle determina il parallelismo di ogni stage a valle. Settare questo numero sbagliato è la differenza tra un job da 3 minuti e uno da 3 ore, e in produzione, tra un job che ci sta nella sua finestra e uno che fa squillare il telefono di qualcuno alle 4 di notte.
Perché 200 è sbagliato su un cluster moderno
Due scenari. Scegli quello che ti somiglia di più.
Scenario 1: un cluster vero, dati modesti
Immagina 50 executor con 4 core ciascuno, 200 core totali. Dopo uno shuffle, hai 200 partition di output. Spark genera 200 task. Il cluster fa girare tutti i 200 simultaneamente, e… tutto funziona.
Più o meno. Adesso supponiamo che i dati post-shuffle siano 100 GB in totale. Con 200 partition, ogni task processa 500 MB. Un task che processa 500 MB su una JVM con memoria ragionevole: di solito va bene, finisce in un minuto, niente spill.
Fin qui il default funziona. Questo è il caso per cui il default di Spark è stato tunato, ed è il motivo per cui la maggior parte dei workload da tutorial non ne espone mai la debolezza.
Scenario 2: stesso cluster, dati più grandi
Stessi 200 core. Ma ora i dati post-shuffle sono 10 TB invece di 100 GB. Con 200 partition, ogni task processa 50 GB. Un singolo task che cerca di sortare, hash-joinare o aggregare 50 GB di righe su un core:
- Va in spill su disco ripetutamente perché il task non riesce a tenere 50 GB di stato in memoria.
- Le scritture di spill sono I/O su disco lente. Adesso il task è I/O bound.
- La fase di shuffle write produce 50 GB di output per un task, anche questo lento.
- Se ci si aggiunge dello skew, un task ne prende 100 GB e gira per ore.
Il job che “dovrebbe” finire in 20 minuti gira per 4 ore. Le dashboard di CPU mostrano scarsa utilizzazione perché la maggior parte del tempo se ne va in spill e read da disco. Ogni aumento di core non fa differenza perché ci sono solo 200 task.
La cura sono più partition. Non 2x in più, 50x in più. Con spark.sql.shuffle.partitions = 10000 e gli stessi 10 TB, ogni task processa 1 GB. I task finiscono in un paio di minuti ciascuno, tutti i 200 core stanno occupati su molte ondate, il wall clock totale crolla drasticamente.
L’altra faccia della medaglia esiste pure. Se i tuoi dati post-shuffle sono 1 GB in totale e tieni 200 partition, ogni task fa 5 MB di lavoro, finisce in 50 ms, e l’overhead per task (scheduling, classloading, bookkeeping di shuffle) domina. Meglio fare coalesce a 16 partition e lasciare che ogni task si mastichi 64 MB.
In breve: 200 è giusto per una specifica scala di dati e cluster. Tutto al di fuori di quella scala, il numero è sbagliato.
La regola di tuning
Una regola empirica che funziona, usata ampiamente in pratica e raccomandata da Databricks:
Punta a 100-200 MB per shuffle partition.
Questo dimensiona il working set di un task in modo che ci stia comodamente nella memoria dell’executor, finisca in un wall clock ragionevole e non sprechi tempo in overhead per task. La matematica:
shuffle_partitions ~ total_shuffle_data_GB * 1024 / target_MB_per_partition
Se stai shufflando 500 GB di dati e punti a 128 MB per partition:
shuffle_partitions ~ 500 * 1024 / 128 ~ 4000
Arrotonda a un multiplo pulito del tuo conteggio di core se vuoi: 4000 va bene, 4096 è leggermente più carino se hai 256 core perché ogni task atterra in modo uniforme tra le ondate. Non agonizzare sul numero esatto; un valore corretto come ordine di grandezza batte il default di chilometri.
La parte difficile è conoscere la dimensione dei dati post-shuffle. Tre modi per stimare:
Dall’input. Se stai leggendo 500 GB di input e lo shuffle è all’incirca della stessa dimensione (un join che non filtra molto), assumi 500 GB.
Da una piccola run. Lancia il job su un sample, guarda la Spark UI, clicca dentro lo stage di shuffle, leggi il totale “Shuffle Write”. Moltiplica per 1 / sample_fraction.
Dall’esperienza. Dopo qualche job avrai un’idea di quali query esplodono post-shuffle e quali si restringono. Un groupBy("user_id").agg(...) tipicamente restringe i dati 100-1000x. Un self-join tipicamente li espande 10-100x. Una window function su tutta la tabella conserva il conteggio di righe esattamente.
Settarlo
Tre posti, in ordine crescente di scope:
Per job, nel codice:
spark.conf.set("spark.sql.shuffle.partitions", "4000")
Questo è ciò che vuoi di solito. Tunalo per il job specifico. Job diversi hanno dimensioni di shuffle estremamente diverse, e un singolo numero globale non può essere giusto per tutti.
All’avvio della session:
spark = (SparkSession.builder
.appName("BigJoin")
.config("spark.sql.shuffle.partitions", "4000")
.getOrCreate())
Stesso effetto, settato dichiarativamente in cima.
Nel default del cluster (spark-defaults.conf):
spark.sql.shuffle.partitions 4000
Setta il pavimento per tutto ciò che gira sul cluster. Utile quando il cluster ha una dimensione di workload tipica e vuoi sovrascrivere il default 200 una volta per tutte. Non sostituisce comunque il tuning per job per gli outlier.
Puoi anche rileggere il valore corrente per confermare:
print(spark.conf.get("spark.sql.shuffle.partitions"))
Utile nelle prime righe di un job per tracciabilità: se un te del futuro apre i log e si chiede quali fossero le shuffle partition, sarai contento di averlo loggato.
Tre esempi pratici veloci
Workload piccolo, ETL giornaliero su 5 GB di dati.
Post-shuffle: ~5 GB. Target: 100 MB per partition. Matematica: 5 * 1024 / 100 ~ 50.
Setta spark.sql.shuffle.partitions = 64. Il default 200 ti darebbe task da 25 MB ciascuno, va bene, ma più overhead del necessario. 64 è abbastanza parallelismo per qualunque cluster ragionevole e tiene i task a sani 80 MB.
Workload medio, aggregazione oraria su 200 GB.
Post-shuffle (dopo group-by, spesso si restringe): assumi 50 GB. Target: 128 MB. Matematica: 50 * 1024 / 128 ~ 400.
Setta spark.sql.shuffle.partitions = 400. Il default 200 darebbe 250 MB per task, borderline, forse OK, ma vicino al territorio di spill. 400 è comodo.
Workload grande, join tra due tabelle da 5 TB.
Post-shuffle: 5 TB su ciascun lato, full sort-merge join. Target: 200 MB. Matematica: 5000 * 1024 / 200 ~ 25600.
Setta spark.sql.shuffle.partitions = 25000. Il default 200 ti darebbe 25 GB per task, catastrofe garantita. Questa è la classe di workload in cui lasciare il default in posizione trasforma un job trattabile in un’incident.
Altre knob nello stesso vicinato
Un paio di setting correlati che vedrai citati insieme:
spark.default.parallelism: l’equivalente per l’API RDD. Non impatta sui DataFrame. Per lo più storico.spark.sql.files.maxPartitionBytes: controlla la dimensione delle partition in lettura (default 128 MB). Concetto diverso, stage diverso. La lezione 33 lo copre.spark.sql.adaptive.coalescePartitions.enabled: parte di AQE, vedi sotto.
spark.sql.shuffle.partitions è quello che conta per gli stage di shuffle, che è la maggior parte di dove va il tempo.
Il miglioramento di AQE
In Spark 3.x, l’Adaptive Query Execution (AQE) può aggiustare dinamicamente le shuffle partition a runtime. Con:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
…Spark guarda la dimensione effettiva di ogni shuffle output e fa coalesce delle partition piccole insieme post-shuffle, in modo da non ritrovarti con migliaia di task minuscoli se i dati si rivelano più piccoli del previsto. Puoi settare spark.sql.shuffle.partitions alto (diciamo 5000) e lasciare che AQE lo collassi al numero giusto per i dati effettivi. AQE gestisce anche lo skew (lezione 29) e cambia le strategie di join al volo quando le statistiche indicano che dovrebbe.
Questo riduce significativamente il costo di settare spark.sql.shuffle.partitions sbagliato sul lato alto. Se i tuoi dati si rivelano più piccoli del previsto, AQE lo sistema in silenzio. Ma non sistema il lato basso: se setti 200 e i tuoi dati sono 10 TB, AQE non ha niente su cui fare coalesce; le partition sono già troppo poche. Scegliere un valore di partenza ragionevole conta comunque.
La lezione 59 va a fondo su AQE: le quattro ottimizzazioni che fa, quando ognuna scatta, cosa loggare per confermare che ha funzionato, e dove sono i suoi limiti. Per ora: attivalo, setta un valore di partenza ragionevole per le shuffle partition, e lascia che lui aggiusti.
Cosa fare davvero
Una checklist breve per qualunque nuovo job Spark:
- Stima la dimensione dei dati post-shuffle. Anche un ordine di grandezza grezzo basta.
- Calcola il conteggio di partition.
data_GB * 1024 / 128 MB, arrotonda a qualcosa di pulito. - Setta
spark.sql.shuffle.partitionsin cima al job. Non affidarti al default. Non affidarti a una config a livello di cluster che non vedi. - Attiva AQE. Sia
adaptive.enabledsiaadaptive.coalescePartitions.enabled. Rete di sicurezza gratuita. - Guarda la Spark UI alla prima run. Se lo stage di shuffle ha task che girano 4 minuti ciascuno, dimezza il conteggio di partition la prossima volta. Se girano 4 secondi ciascuno, raddoppialo. Itera finché i task sono nell’ordine delle decine di secondi e il cluster è occupato.
Le prossime due lezioni vanno dentro repartition e coalesce, gli operatori espliciti per cambiare i conteggi di partition a metà job, e la differenza tra i due, che è anch’essa una fonte comune di lentezza silenziosa.
La singola vittoria più grossa che puoi ottenere dal Modulo 6, però, è proprio questa: smettila di lasciare che spark.sql.shuffle.partitions = 200 sia la risposta per ogni job. Settalo deliberatamente. Un te del futuro risparmierà ore, e il tuo conto del cluster scenderà notevolmente.
Riferimenti: documentazione Apache Spark sulla configuration SQL e Adaptive Query Execution; guida al tuning di Databricks sulle shuffle partition e il dimensionamento delle partition. Recuperati il 2026-05-01.