La lezione precedente ti ha lasciato a fissare una Spark UI in cui un task girava per trenta minuti mentre gli altri 199 finivano in dodici secondi. Il collo di bottiglia era una singola hot key: user_id = 1, oppure country = 'US', o qualunque altra cosa domini i tuoi dati. Buttarci sopra altro cluster non aiuta, perché il lavoro è su un task, su un core. Dobbiamo cambiare la forma della chiave.
Questo è il salting.
L’idea in una frase
Prendi la hot key, ci incolli sopra un piccolo numero casuale, e quella che era una partition diventa N partition. La chiave dominante viene spezzata su più task, lo stage finisce in circa 1/N del tempo, e hai chiuso.
L’inghippo, e c’è sempre un inghippo, è che l’altro lato del join ora va replicato, perché ogni variante salata della chiave deve trovare il suo match. Vedremo esattamente come funziona qui sotto, inclusa la versione più furba in cui sali solo le righe che ne hanno bisogno.
Lo schema in quattro passi
Il salting è meccanico una volta che l’hai visto. Quattro passi:
Passo 1: sul lato skewed (fact), aggiungi una colonna salt. Scegli un range di salt N, tipicamente 4, 8 o 16. Per ogni riga sul lato skewed, genera un intero casuale salt = floor(rand() * N). La chiave effettiva diventa (original_key, salt) invece del solo original_key. La hot key, che prima andava in hash su una sola partition, ora va in hash su un massimo di N partition.
Passo 2: sull’altro lato (dimension), replica ogni riga N volte. Per ogni riga della dimension table, emetti N copie, una per ogni possibile valore di salt 0, 1, ..., N-1. La chiave effettiva su questo lato è anch’essa (original_key, salt), ma copre ogni possibile salt per ogni chiave.
Passo 3: fai il join su (original_key, salt). Ogni riga fact trova la sua singola riga dim corrispondente. Il join ora si distribuisce su N partition per la hot key, in modo uniforme.
Passo 4: rimuovi la colonna salt dopo. Ha fatto il suo dovere durante lo shuffle. Le righe in output sono le stesse che avresti ottenuto da un join normale: non ci sono duplicati, perché ogni riga fact ha trovato il match con esattamente una delle N righe dim replicate.
Tutto qui. I passi 2 e 4 sono dove la maggior parte dei principianti inciampa. Replica il lato dim, ma fai il join sul salt, in modo che ogni riga fact corrisponda a esattamente una riga dim replicata. Non aggregare prima del salting a meno che non lo voglia davvero. Non dimenticarti di rimuovere il salt prima di contare le cose.
Esempio pratico
Facciamolo su un dataset di forma realistica. Una fact table di transazioni in cui il 60% delle righe ha country = 'US', più una piccola dimension table di metadata sui paesi.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("Salting")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.sql.adaptive.enabled", "false") # disable AQE so we see raw skew
.getOrCreate())
# Fact table: 1M rows, 60% are US, the rest spread across 9 other countries
us_rows = spark.range(0, 600_000).select(
F.lit("US").alias("country"),
F.col("id").alias("txn_id"),
(F.rand() * 1000).alias("amount"),
)
others = ["IT", "FR", "DE", "ES", "UK", "JP", "BR", "IN", "CA"]
other_rows = spark.range(0, 400_000).select(
F.element_at(F.array(*[F.lit(c) for c in others]),
(F.col("id") % 9 + 1).cast("int")).alias("country"),
F.col("id").alias("txn_id"),
(F.rand() * 1000).alias("amount"),
)
facts = us_rows.unionByName(other_rows)
# Dim table: one row per country
dim = spark.createDataFrame(
[(c, c + " full name", c + "-region") for c in ["US"] + others],
"country STRING, country_name STRING, region STRING",
)
Un join normale su country:
joined = facts.join(dim, on="country", how="inner")
joined.write.mode("overwrite").parquet("/tmp/skew-vanilla")
Se guardi la Spark UI, lo stage di shuffle mostra lo schema da manuale della lezione 28: 199 task veloci, un task lento sulla partition che contiene tutte le 600k righe US. Su un cluster vero con numeri più grandi, quel singolo task potrebbe girare per minuti mentre il resto finisce in secondi.
Adesso la versione salata. La variante furba sala solo US, la hot key, non ogni paese, perché salare chiavi non skewed moltiplica solo le righe dim senza alcun beneficio.
N = 8 # salt range
# Step 1: salt the fact side, but ONLY for the hot key
facts_salted = facts.withColumn(
"salt",
F.when(F.col("country") == "US",
(F.rand() * N).cast("int"))
.otherwise(F.lit(0))
)
# Step 2: replicate the dim side, but only for the hot key
us_dim = dim.filter(F.col("country") == "US")
others_dim = dim.filter(F.col("country") != "US")
# Build [0, 1, ..., N-1]
salts = spark.range(0, N).withColumnRenamed("id", "salt").withColumn(
"salt", F.col("salt").cast("int")
)
us_dim_salted = us_dim.crossJoin(salts) # N copies of the US row, one per salt
others_dim_salted = others_dim.withColumn("salt", F.lit(0))
dim_salted = us_dim_salted.unionByName(others_dim_salted)
# Step 3: join on (country, salt)
joined_salted = facts_salted.join(dim_salted, on=["country", "salt"], how="inner")
# Step 4: drop the salt column
joined_salted = joined_salted.drop("salt")
joined_salted.write.mode("overwrite").parquet("/tmp/skew-salted")
Le righe US sul lato fact ora si distribuiscono in modo uniforme su 8 bucket di salt. La riga US sul lato dim compare 8 volte, una per ogni valore di salt, quindi ogni riga fact trova il match con esattamente una riga dim. Gli altri paesi non vengono toccati: il salt è 0 su entrambi i lati, e si uniscono come prima.
Nella Spark UI, la versione salata sembra completamente diversa. Dove la versione normale aveva un task da 30 secondi e 199 task veloci, la versione salata ha 8 task che gestiscono ciascuno circa 75k righe US in circa 4 secondi, in parallelo con il resto. Il wall clock dello stage scende da “limitato dal task lento” a “limitato dal task mediano”: esattamente quello che vuoi.
Il costo
Il salting non è gratis. Tre cose da tenere a mente:
Le righe dim replicate sono righe vere. Se N = 8 e la hot key ha, diciamo, 50 righe sul lato dim (alcune chiavi le hanno, pensa alle hot composite key), hai trasformato 50 righe in 400. Quei dati extra vanno comunque shuffled. Per dim side piccoli come la country table qui sopra, il costo è errore di arrotondamento. Per dim side più grandi, una tabella di attributi per prodotto in cui il “hot SKU” ha 200 varianti di attributo, la moltiplicazione può sommarsi. Verifica.
Scegli N più piccolo possibile. Un errore comune è scegliere N = 100 perché “più diffusione è meglio”. Non lo è. La hot key sul lato fact aveva solo 600k righe; spalmarle su 8 partition dà ~75k per partition, che è parallelismo a sufficienza. Andare a 100 rende ogni partition minuscola, ma hai anche moltiplicato per 100 le righe hot del lato dim. Per la maggior parte dei workload, N tra 4 e 16 è il sweet spot. Parti da 8, misura, aggiusta.
Sala solo le hot key. L’esempio qui sopra è lo schema furbo: un WHEN ... OTHERWISE che sala US e lascia stare tutto il resto. La versione ingenua, salare ogni riga fact e replicare ogni riga dim, funziona, ma paga il costo di replica sull’intera dim table senza alcun beneficio. Se il tuo skew è concentrato su una o due chiavi note, isolale. Se non sai quali chiavi siano hot, lancia prima la diagnostica della lezione 28.
Quando NON salare
Tre casi in cui il salting è lo strumento sbagliato:
Il broadcast join funziona. Se il lato dim ci sta in memoria (lezione 27), broadcastalo e hai chiuso. Niente shuffle, niente skew, niente salt. L’approccio salato conta solo quando il lato dim è troppo grande per essere broadcastato e il lato fact è skewed.
AQE lo gestisce per te. L’Adaptive Query Execution di Spark 3.x (lezione 59) supporta lo skew-join. Con spark.sql.adaptive.enabled = true e spark.sql.adaptive.skewJoin.enabled = true, Spark rileva le partition skewed a runtime e le splitta automaticamente, senza modifiche al codice. AQE gestisce solo i sort-merge join e si attiva oltre una soglia di dimensione configurabile, quindi non sostituisce il salting in tutti i casi, ma su Spark 3.4+ risolve un sacco di skew prima ancora che tu sappia di averne. Verifica sempre se AQE è attivo prima di mettere mano al salt.
Lo “skew” è in realtà lieve. Se la chiave principale è 3x rispetto alla mediana, non hai un problema di skew, hai un workload leggermente sbilanciato. Aumentare spark.sql.shuffle.partitions (lezione 32) è una soluzione più economica. Il salting è per i casi in cui una chiave ha 100x il volume del resto, dove la matematica dice che nient’altro funzionerà.
Salting per il group-by, non solo per il join
Lo stesso trucco funziona per le aggregazioni. Se groupBy("user_id").sum("amount") ha un user hot, puoi:
- Aggiungere una colonna salt sull’input.
- Raggruppare per
(user_id, salt): aggregazione parziale, distribuita su N partition. - Raggruppare di nuovo per
user_idsui risultati parziali: piccolo, economico, niente skew.
salted = events.withColumn("salt", (F.rand() * 8).cast("int"))
partial = (salted.groupBy("user_id", "salt")
.agg(F.sum("amount").alias("partial_sum")))
final = (partial.groupBy("user_id")
.agg(F.sum("partial_sum").alias("total")))
L’aggregazione parziale fa il lavoro pesante in parallelo; l’aggregazione finale vede solo N righe per utente, che ci stanno comodamente in un singolo task. Stesso schema del combiner / reduce in MapReduce: distribuisci il lavoro, poi collassa.
Cosa arriva dopo
La lezione 30 chiude il modulo join-e-shuffle mostrando come leggere il piano fisico di un join e prevederne il runtime prima di premere invio: broadcast, sort-merge, shuffle hash, e come AQE riscrive il piano quando vede un problema. Dopo, il Modulo 6 (lezioni 31 e 32) zoomma fuori da “sistemare lo stage lento” verso “progettare le partition intenzionalmente fin dall’inizio”, perché la maggior parte dei problemi di skew sono problemi di partition travestiti.
Due cose vale la pena fissare in memoria da questa lezione:
- La ricetta del salting: sala il lato fact, replica il lato dim, fai il join, rimuovi il salt. Quattro passi. Non saltarne nessuno.
- Scegli un
Npiccolo, sala solo le hot key, e verifica se AQE lo sta già facendo per te prima di scrivere una riga di codice.
Il salting è il tipo di trucco che la prima volta sembra geniale e la decima sembra routine. Tieni lo snippet qui sopra da qualche parte; la prossima volta che uno stage si pianta su un task, saprai esattamente cosa fare.
Riferimenti: documentazione Apache Spark sulle strategie di join e Adaptive Query Execution; post del blog di engineering di Databricks sui pattern di mitigazione dello skew. Recuperati il 2026-05-01.