Lo shuffle è l’operazione più costosa di Spark. Quando fai il join di due DataFrame grossi, ogni riga da ciascun lato deve essere passata per hash sulla chiave di join e mandata sulla rete all’executor responsabile di quel bucket di hash. Decine di gigabyte che volano tra macchine, scritte e lette dal disco locale nel mezzo. Un join che ci mette dieci minuti potrebbe spenderne otto in shuffle.
Un broadcast join salta tutto questo circo. Se un lato del join è abbastanza piccolo, davvero piccolo, “ci sta in memoria” piccolo, Spark manda l’intero DataFrame piccolo a ogni executor del cluster. Ogni executor ha già la sua fetta del DataFrame grosso. Adesso ogni executor può fare il join della sua fetta contro il lato piccolo completamente in locale. Niente shuffle. Niente rete per il lato grosso. Il lato grosso non si muove mai.
Questa è la singola ottimizzazione più importante per le performance dei join in Spark, e succede automaticamente, la maggior parte del tempo. Questa lezione parla di quando succede automaticamente, come forzarlo quando Spark se lo perde, e quando non forzarlo perché fai OOM al driver.
Il quadro
Il join standard (sort-merge o shuffle hash) fa così:
big_df (300 GB) small_df (50 MB)
| |
[shuffle by key] [shuffle by key]
| |
\________ ___________/
\/
[executors do local joins]
Due shuffle. 300 GB di traffico di rete più 50 MB di traffico di rete, in massima parte il primo.
Un broadcast join fa così:
small_df (50 MB)
|
[collect to driver]
|
[send to every executor]
|
big_df (300 GB) ---------------+
| |
+---- local join on each executor's slice
Il lato grosso non fa mai shuffle. Il lato piccolo viaggia esattamente una volta per executor. Se hai una tabella di lookup da 50 MB e una tabella di fatti da 300 GB, il broadcast è tra le 10 e le 100 volte più veloce.
Quando Spark sceglie il broadcast automaticamente
Spark ha una config che si chiama spark.sql.autoBroadcastJoinThreshold. Default: 10485760 (10 MB). Qualunque DataFrame la cui dimensione stimata è sotto questa soglia è candidabile per il broadcast.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
# '10485760'
10 MB è conservativo. La maggior parte dei cluster di produzione lo alza a 100 MB, 200 MB, a volte 1 GB, a seconda della memoria di driver e executor:
# 200 MB threshold — typical for a cluster with 16 GB executors
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 200 * 1024 * 1024)
Il trucco è la parola stimata. Spark stima la dimensione di un DataFrame dalle sue statistiche se esistono (devi aver fatto ANALYZE TABLE per quelle), o dalla dimensione del file se sta leggendo direttamente un file, o dalla dimensione dell’input se è il risultato di un’operazione a monte. Non esegue la query per scoprirlo. Questa è la fonte della maggior parte delle sorprese del tipo “ma perché Spark non ha fatto broadcast di questo?”.
Un file Parquet da 5 GB che filtri fino a 50 MB è, agli occhi di Spark, ancora un DataFrame da 5 GB ai fini della pianificazione del join. Il filtro avviene a runtime; il planner fa la sua scelta in anticipo. Quindi il filtro pesante non scatena il broadcast automatico anche se i dati effettivamente joinati starebbero dentro tranquillamente.
Il rimedio è l’hint di broadcast.
L’hint di broadcast
from pyspark.sql.functions import broadcast
big_facts.join(broadcast(small_dim), on="dim_id", how="inner")
broadcast(small_dim) dice a Spark: “fidati, questo lato è piccolo, fanne broadcast”. Spark obbedisce, senza fare domande, anche se la sua stima di dimensione dice il contrario.
Questa è la singola funzione più utile di PySpark per le performance dei join. Se hai identificato un join caldo e un lato è genuinamente piccolo a runtime, anche se non sembra piccolo staticamente, butta un broadcast() attorno e guarda il tempo del job calare.
Un piccolo esempio concreto. Imposto una tabella di fatti di ordini e una tabella dimensione di paesi, eseguo il join in due modi e guardo il piano.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
spark = (SparkSession.builder
.appName("BroadcastJoins")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
# Pretend this is huge — millions of rows
orders = spark.createDataFrame(
[(i, f"IT" if i % 3 == 0 else "NL" if i % 3 == 1 else "DE", i * 1.5)
for i in range(10_000)],
"order_id INT, country STRING, total DOUBLE",
)
# Tiny lookup table — three rows
countries = spark.createDataFrame(
[("IT", "Italy", 22.0),
("NL", "Netherlands", 21.0),
("DE", "Germany", 19.0)],
"country STRING, country_name STRING, vat_rate DOUBLE",
)
# Without explicit hint — at this size Spark picks broadcast on its own
orders.join(countries, on="country").explain()
L’output del piano (potato alle parti rilevanti) suonerà più o meno così:
== Physical Plan ==
*(2) BroadcastHashJoin [country#5], [country#15], Inner, BuildRight
:- *(2) Filter isnotnull(country#5)
: +- *(2) Scan ExistingRDD[order_id#4, country#5, total#6]
+- BroadcastExchange HashedRelationBroadcastMode([country#15])
+- *(1) Filter isnotnull(country#15)
+- *(1) Scan ExistingRDD[country#15, country_name#16, vat_rate#17]
Le due parole da cercare: BroadcastHashJoin e BroadcastExchange. Se le vedi, del lato piccolo si sta facendo broadcast. Se vedi SortMergeJoin, del lato piccolo si sta facendo shuffle.
Adesso disabilita la soglia automatica e rilancia:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
orders.join(countries, on="country").explain()
# == Physical Plan ==
# *(5) SortMergeJoin [country#5], [country#15], Inner
# :- *(2) Sort [country#5 ASC NULLS FIRST], false, 0
# : +- Exchange hashpartitioning(country#5, 8) ...
# ...
Stesso join, piano diverso. Entrambi fanno shuffle perché il broadcast è spento. Decine di secondi contro tempi sotto il secondo su dati veri.
Riforziamo il broadcast anche con la soglia spenta:
orders.join(broadcast(countries), on="country").explain()
# Back to BroadcastHashJoin.
L’hint vince sulla soglia. L’hint è l’override.
L’hint opposto: forzare un sort-merge
Ci sono casi in cui Spark vuole fare broadcast e tu preferiresti che non lo facesse: di solito perché la stima di dimensione di Spark è troppo bassa e il broadcast farà OOM. L’hint è merge:
df1.join(df2.hint("merge"), on="key").explain()
# Forces SortMergeJoin
Oppure shuffle_hash se vuoi specificamente la strategia shuffle hash join:
df1.join(df2.hint("shuffle_hash"), on="key")
Li userai meno spesso di broadcast, ma quando Spark sbaglia strategia, sapere che esistono tutti e tre gli hint conta.
Come disabilitare il broadcast a livello globale
A volte stai facendo debug di un job ballerino e vuoi togliere il broadcast dal tavolo:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
-1 vuol dire “non fare mai broadcast automatico”. Gli hint broadcast() espliciti continuano a funzionare. Questo è utile per testare se il broadcast automatico sta causando problemi, ma non dovresti lasciarlo a -1 in produzione: rinunceresti alla singola migliore ottimizzazione di join che Spark abbia.
Il modo di fallire: OOM al driver
La cosa che rende veloci i broadcast join, mandare l’intero lato piccolo a ogni executor, è anche la cosa che li rende pericolosi. Il meccanismo è:
- Spark chiama
.collect()sul DataFrame piccolo, tirandosi indietro al driver tutte le sue righe. - Il driver materializza tutta la roba in memoria.
- Il driver la spedisce a ogni executor sulla rete.
Se il lato “piccolo” è in realtà 4 GB e il tuo driver ha 2 GB di memoria, lo step 2 fa OOM. Il driver muore. Il job muore. Vedi uno stack trace che finisce con qualcosa tipo OutOfMemoryError: Java heap space o Total size of serialized results of N tasks (X GB) is bigger than spark.driver.maxResultSize.
Il rimedio è uno tra:
- Non fare broadcast di quel DataFrame. Togli l’hint, lascia che Spark faccia sort-merge.
- Aumenta la memoria del driver (
spark.driver.memory) espark.driver.maxResultSize. - Filtra o aggrega il lato piccolo prima di farne broadcast.
Regola empirica: non fare mai broadcast di qualcosa più grande di ~1 GB anche su un driver bello grosso. La fascia 100-200 MB è la zona comoda. Qualunque cosa più grande e stai flirtando con l’OOM e perdi anche il vantaggio del broadcast: a quella dimensione il costo di rete della replica su N executor inizia a competere con il costo di fare semplicemente shuffle.
Due pattern pratici
Pattern 1: filtra, poi fai broadcast. Se sai che il lato piccolo sarà minuscolo dopo un filtro, materializzalo e metti l’hint:
active_users = (users
.where(F.col("status") == "active")
.select("user_id", "name", "tier"))
events.join(broadcast(active_users), on="user_id", how="left")
Se active_users è il 5% di users e l’originale era 200 MB, stai facendo broadcast di 10 MB. Vittoria facile.
Pattern 2: broadcast di una dim, sort-merge di un fact. Il pattern “star schema”. Tabella di fatti grossa, tabelle dimensione piccole. Fai broadcast di ogni dim:
(facts
.join(broadcast(dim_country), on="country_id")
.join(broadcast(dim_product), on="product_id")
.join(broadcast(dim_date), on="date_id"))
Ogni dim evita uno shuffle. Tre join, zero shuffle sul lato fact. Questa è l’ottimizzazione pane e burro in qualunque carico di lavoro analitico.
Cosa il broadcast non può aggiustare
I broadcast join risolvono il problema “un lato è piccolo”. Non risolvono il problema “una chiave è enorme”. Se il tuo DataFrame grosso ha 100 milioni di righe per country = 'US' e 1 milione di righe per tutto il resto messo insieme, nessuna quantità di broadcast aiuta: il lato fact viene comunque processato in locale su ogni executor, e un executor finisce comunque con tutte le righe US. Quello è lo skew dei dati. Quella è la lezione 28.
Leggere .explain() come un ispettore
Una volta che inizi a cacciare problemi di performance, .explain() diventa uno strumento di tutti i giorni. L’output di default è solo testo; .explain(True) ti dà i piani parsed, analyzed, optimized e fisico tutti insieme. Per le domande sul broadcast, conta il piano fisico. Le parole chiave da cercare:
BroadcastHashJoin: via veloce, broadcast riuscito.BroadcastNestedLoopJoin: il broadcast è riuscito ma la condizione di join non è un’uguaglianza, quindi Spark sta facendo un check riga per riga. Lento se il lato di broadcast non è banale.SortMergeJoin: entrambi i lati hanno fatto shuffle e sort. Il default per i join grosso-contro-grosso.ShuffleHashJoin: entrambi i lati hanno fatto shuffle, il lato più piccolo è messo in hash in memoria. Spark lo sceglie meno spesso di default; puoi forzarlo condf.hint("shuffle_hash").BroadcastExchange: il lato piccolo che viene raccolto e fatto broadcast. Sempre accoppiato aBroadcastHashJoin(oBroadcastNestedLoopJoin).
Se hai eseguito il tuo job, ti aspettavi un broadcast, e .explain() mostra SortMergeJoin, l’indiziato più probabile è la stima di dimensione: il planner pensa che il lato piccolo sia più grande della soglia. O alzi la soglia, o sistemi le statistiche, o aggiungi l’hint broadcast() esplicito.
Una piccola nota su AQE
L’Adaptive Query Execution di Spark 3.x può convertire un SortMergeJoin in un BroadcastHashJoin a runtime se la dimensione effettiva di un lato risulta abbastanza piccola dopo l’esecuzione degli stage iniziali. Questa è una risposta parziale al problema “la stima di dimensione era sbagliata”: Spark ha una seconda occhiata una volta che i dati si sono materializzati. AQE è attivo di default in Spark 3.2+, ma la soglia di broadcast per la conversione AQE è un’impostazione separata (spark.sql.adaptive.autoBroadcastJoinThreshold). La lezione 59 copre AQE per intero; per ora, sappi soltanto che “Spark non ha fatto broadcast in fase di piano” non vuol dire necessariamente “Spark non farà broadcast affatto”.
Catalyst, l’optimizer di query di Spark, prende la maggior parte di queste decisioni per te quando ha buone statistiche. La lezione 41 copre Catalyst da capo a fondo e spiega come leggere i suoi piani. Per ora, hint di broadcast quando sai che il lato piccolo è piccolo, occhio alla soglia, e non fare broadcast di un DataFrame da 4 GB su un driver da 2 GB.
Riferimenti: documentazione Apache Spark SQL sugli hint di join e su autoBroadcastJoinThreshold; post sul blog di engineering di Databricks sui broadcast join e sulla scelta della strategia di join. Recuperato il 2026-05-01.