PySpark, de la zero Lecția 32 / 60

spark.sql.shuffle.partitions = 200 si de ce e aproape mereu gresit

Cel mai consecvent default din Spark, de ce nu se potriveste cu cluster-ul tau si cum sa-l tunezi pentru job-ul curent.

Dintre toate setările de configurare cu care vine Spark, exact una e responsabilă pentru mai multe job-uri lente decât toate celelalte la un loc: spark.sql.shuffle.partitions. E setat la 200 ca default. Era un default sensibil în 2014, când cluster-ul Spark tipic era patru laptop-uri într-un co-working space. Pe un cluster modern care rulează volume moderne de date, e aproape mereu greșit, uneori cu un factor de 10, alteori cu un factor de 100.

Lecția asta e despre ce face acest buton, de ce default-ul nu se potrivește cu job-ul tău și matematica pentru a alege o valoare care se potrivește.

Ce controlează

spark.sql.shuffle.partitions controlează numărul de partiții de output după fiecare shuffle. Group-by, join, distinct, window function, repartition, orice operație care declanșează un shuffle ajunge să scrie exact atâtea partiții în input-ul stage-ului următor.

spark.conf.get("spark.sql.shuffle.partitions")
# '200'

df.groupBy("country").count()  # output has 200 partitions
df.join(other, "id")           # output has 200 partitions
df.dropDuplicates()            # output has 200 partitions

Fiecare shuffle. De fiecare dată. Indiferent de mărimea input-ului, mărimea output-ului, forma cluster-ului sau orice altceva legat de job. Default-ul e doar literalul întreg 200, înghețat în timp.

Merită să te oprești puțin asupra mărimii acestui ciocan. Shuffle-ul e cel mai costisitor lucru pe care îl face Spark. Numărul de partiții după shuffle determină paralelismul fiecărui stage downstream. Setarea greșită a acestui număr e diferența dintre un job de 3 minute și un job de 3 ore, iar în producție, dintre un job care încape în fereastra lui și unul care îți sună pe cineva la 4 dimineața.

De ce 200 e greșit pe un cluster modern

Două scenarii. Alege-l pe cel care seamănă mai mult cu al tău.

Scenariul 1: un cluster real, date modeste

Imaginează-ți 50 de executor-i cu câte 4 core-uri fiecare, 200 de core-uri în total. După un shuffle, ai 200 de partiții de output. Spark generează 200 de task-uri. Cluster-ul le rulează pe toate 200 simultan și… totul funcționează.

Cam așa. Acum să presupunem că datele post-shuffle sunt 100 GB în total. Cu 200 de partiții, fiecare task procesează 500 MB. Un task care procesează 500 MB pe un JVM cu memorie rezonabilă: de obicei bine, termină într-un minut, fără spill.

Până aici default-ul funcționează. Ăsta e cazul pentru care a fost tunat default-ul Spark și e motivul pentru care majoritatea workload-urilor de tutorial nu-i expun niciodată slăbiciunea.

Scenariul 2: același cluster, date mai mari

Aceleași 200 de core-uri. Dar acum datele post-shuffle sunt 10 TB în loc de 100 GB. Cu 200 de partiții, fiecare task procesează 50 GB. Un singur task care încearcă să sorteze, să facă hash-join sau să agrege 50 GB de rânduri pe un core:

  • Spilluiește la disc repetat fiindcă task-ul nu poate ține 50 GB de stare în memorie.
  • Scrierile de spill sunt I/O lent pe disc. Acum task-ul e I/O bound.
  • Faza de scriere a shuffle-ului produce 50 GB de output pentru un singur task, ceea ce e și el lent.
  • Dacă apare și skew peste asta, un task primește 100 GB și rulează ore.

Job-ul care „ar trebui” să termine în 20 de minute rulează 4 ore. Dashboard-urile de CPU arată utilizare scăzută fiindcă majoritatea timpului e petrecut spilluind și citind de pe disc. Orice creștere de core-uri nu face nicio diferență fiindcă sunt doar 200 de task-uri.

Leacul e mai multe partiții. Nu de 2x mai multe, de 50x mai multe. Cu spark.sql.shuffle.partitions = 10000 și aceleași 10 TB, fiecare task procesează 1 GB. Task-urile termină în câteva minute fiecare, toate 200 de core-uri rămân ocupate prin multe valuri, wall clock-ul total scade dramatic.

Există și reversul medaliei. Dacă datele tale post-shuffle sunt 1 GB în total și păstrezi 200 de partiții, fiecare task face 5 MB de muncă, termină în 50 ms și overhead-ul per task (programare, classloading, bookkeeping de shuffle) domină. Mai bine să faci coalesce la 16 partiții și să lași fiecare task să mestece prin 64 MB.

Pe scurt: 200 e potrivit pentru o anumită scară specifică de date și cluster. Orice dincolo de scara aia, numărul e greșit.

Regula de tuning

O regulă de degetar care merge, folosită larg în practică și recomandată de Databricks:

Țintește 100-200 MB per partiție de shuffle.

Asta dimensionează working set-ul unui task ca să încapă confortabil în memoria executor-ului, să termine într-un wall clock rezonabil și să nu piardă timp pe overhead per task. Matematica:

shuffle_partitions ≈ total_shuffle_data_GB × 1024 / target_MB_per_partition

Dacă shuffluiești 500 GB de date și țintești 128 MB per partiție:

shuffle_partitions ≈ 500 × 1024 / 128 ≈ 4000

Rotunjește la un multiplu curat al numărului de core-uri dacă vrei: 4000 merge bine, 4096 e puțin mai drăguț dacă ai 256 de core-uri fiindcă fiecare task aterizează uniform peste valuri. Nu te chinui cu numărul exact; o valoare corectă ca ordin de mărime bate default-ul cu mile.

Partea grea e să cunoști mărimea datelor post-shuffle. Trei moduri de a estima:

Din input. Dacă citești 500 GB de input și shuffle-ul e cam de aceeași mărime (un join care nu filtrează mult), presupune 500 GB.

Dintr-o rulare mică. Rulează job-ul pe un sample, uită-te la Spark UI, intră în stage-ul de shuffle, citește totalul „Shuffle Write”. Înmulțește cu 1 / sample_fraction.

Din experiență. După câteva job-uri vei avea un simț pentru care interogări explodează post-shuffle și care se micșorează. Un groupBy("user_id").agg(...) de obicei micșorează datele de 100-1000x. Un self-join de obicei le crește de 10-100x. O window function peste tot tabelul păstrează numărul de rânduri exact.

Setarea lui

Trei locuri, în ordine crescătoare a domeniului:

Per job, în cod:

spark.conf.set("spark.sql.shuffle.partitions", "4000")

Asta e ce vrei de obicei. Tunezi pentru job-ul specific. Job-uri diferite au mărimi de shuffle radical diferite și un singur număr global nu poate fi corect pentru toate.

La startul sesiunii:

spark = (SparkSession.builder
         .appName("BigJoin")
         .config("spark.sql.shuffle.partitions", "4000")
         .getOrCreate())

Același efect, setat declarativ în vârf.

În default-ul cluster-ului (spark-defaults.conf):

spark.sql.shuffle.partitions  4000

Setează podeaua pentru tot ce rulează pe cluster. Util când cluster-ul are o mărime tipică de workload și vrei să suprascrii default-ul de 200 o dată. Tot nu înlocuiește tuning-ul per job pentru excepții.

Poți și să citești valoarea curentă înapoi ca să confirmi:

print(spark.conf.get("spark.sql.shuffle.partitions"))

Util în primele linii ale unui job pentru trasabilitate, dacă un viitor tu deschide log-urile și se întreabă care erau partițiile de shuffle, vei fi mulțumit că ai logat asta.

Trei exemple lucrate rapid

Workload mic, ETL zilnic pe 5 GB de date.

Post-shuffle: ~5 GB. Țintă: 100 MB per partiție. Matematică: 5 × 1024 / 100 ≈ 50.

Setează spark.sql.shuffle.partitions = 64. Default-ul de 200 ți-ar da task-uri de 25 MB fiecare, bine, dar mai mult overhead decât e nevoie. 64 e suficient paralelism pentru orice cluster rezonabil și ține task-urile la 80 MB sănătoși.

Workload mediu, agregare orară pe 200 GB.

Post-shuffle (după group-by, deseori se micșorează): presupune 50 GB. Țintă: 128 MB. Matematică: 50 × 1024 / 128 ≈ 400.

Setează spark.sql.shuffle.partitions = 400. Default-ul de 200 ți-ar da 250 MB per task, la limită, posibil OK, dar tăind aproape de teritoriul de spill. 400 e confortabil.

Workload mare, join între două tabele de 5 TB.

Post-shuffle: 5 TB pe fiecare parte, sort-merge join complet. Țintă: 200 MB. Matematică: 5000 × 1024 / 200 ≈ 25600.

Setează spark.sql.shuffle.partitions = 25000. Default-ul de 200 ți-ar da 25 GB per task, catastrofă garantată. Asta e clasa de workload în care lăsarea default-ului în loc transformă un job tractabil într-un outage.

Alte butoane din vecinătate

Câteva setări înrudite pe care le vei vedea referite împreună:

  • spark.default.parallelism, echivalentul din RDD-API. Nu afectează DataFrame-urile. Mai mult istoric.
  • spark.sql.files.maxPartitionBytes, controlează mărimea partițiilor la citire (default 128 MB). Concept diferit, stage diferit. Lecția 33 acoperă asta.
  • spark.sql.adaptive.coalescePartitions.enabled, parte din AQE, vezi mai jos.

spark.sql.shuffle.partitions e cel care contează pentru stage-urile de shuffle, care sunt majoritatea locurilor unde se petrece timpul.

Upgrade-ul AQE

În Spark 3.x, Adaptive Query Execution (AQE) poate ajusta dinamic partițiile de shuffle în runtime. Cu:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

…Spark se uită la mărimea efectivă a fiecărui output de shuffle și coalesce-ză partițiile mici împreună post-shuffle, ca să nu ajungi cu mii de task-uri minuscule dacă datele tale se dovedesc mai mici decât te așteptai. Poți seta spark.sql.shuffle.partitions mare (să zicem 5000) și să lași AQE să-l prăbușească la numărul potrivit pentru datele efective. AQE se ocupă și de skew (lecția 29) și schimbă strategiile de join din mers când statisticile indică că ar trebui.

Asta reduce semnificativ costul de a greși spark.sql.shuffle.partitions în partea înaltă. Dacă datele tale se dovedesc mai mici decât te așteptai, AQE le repară pe tăcute. Dar nu repară partea joasă, dacă setezi 200 și datele tale sunt 10 TB, AQE n-are ce să coalesce-ze; partițiile sunt deja prea puține. Alegerea unei valori de start rezonabile încă contează.

Lecția 59 intră în adânc pe AQE: cele patru optimizări pe care le face, când se activează fiecare, ce de logat ca să confirmi că a funcționat și unde sunt limitele lui. Deocamdată: pornește-l, setează o valoare de start rezonabilă pentru partițiile de shuffle și lasă-l să ajusteze.

Ce să faci de fapt

O listă scurtă pentru orice job nou de Spark:

  1. Estimează mărimea datelor post-shuffle. Chiar și un ordin de mărime aproximativ e suficient.
  2. Calculează numărul de partiții. data_GB × 1024 / 128 MB, rotunjește la ceva curat.
  3. Setează spark.sql.shuffle.partitions în vârful job-ului. Nu te baza pe default. Nu te baza pe un config la nivel de cluster pe care nu-l vezi.
  4. Activează AQE. Atât adaptive.enabled cât și adaptive.coalescePartitions.enabled. Plasă de siguranță gratuită.
  5. Urmărește Spark UI la prima rulare. Dacă stage-ul de shuffle are task-uri care rulează 4 minute fiecare, înjumătățește numărul de partiții data viitoare. Dacă rulează 4 secunde fiecare, dublează-l. Iterează până când task-urile sunt în zeci de secunde și cluster-ul e ocupat.

Următoarele două lecții intră în repartition și coalesce, operatorii expliciți pentru schimbarea numărului de partiții la mijlocul job-ului, și diferența dintre cei doi, care e și ea o sursă comună de încetinire silențioasă.

Cea mai mare victorie pe care o poți obține din modulul 6, totuși, e doar asta: nu mai lăsa spark.sql.shuffle.partitions = 200 să fie răspunsul pentru fiecare job. Setează-l deliberat. Viitorul tu va salva ore, iar factura cluster-ului va scădea vizibil.


Referințe: documentația Apache Spark despre configurarea SQL și Adaptive Query Execution; ghidul de tuning Databricks despre partițiile de shuffle și dimensionarea partițiilor. Consultat 2026-05-01.

Caută