Un stage Spark e atât de rapid cât cel mai lent task al lui. Propoziția aia e toată lecția. Citește-o din nou, internalizeaz-o, iar restul sunt în mare parte consecințe.
Într-un stage Spark sănătos, fiecare task procesează aproximativ aceeași cantitate de date. 200 de partiții, fiecare cu vreo 50 MB de rânduri, fiecare terminând în 12 secunde. Întregul stage se termină în vreo 14 secunde, pentru că task-urile rulează în paralel iar cel mai lent abia e mai lent decât mediana.
Într-un stage cu skew, 199 de task-uri se termină în 12 secunde și un task, cel ghinionist, rulează 30 de minute. Întregul stage durează 30 de minute. Adăugarea de mai multe core-uri nu ajută. Adăugarea de mai multă memorie nu ajută. Bottleneck-ul este un singur task, pe un singur executor, care procesează o partiție obscen de mare. Asta e data skew.
Lecția asta este despre recunoașterea skew-ului, găsirea lui în Spark UI și înțelegerea de ce e o problemă în primul rând. Soluția, salting, primește lecția proprie data viitoare.
Cum apare skew-ul
Spark partiționează datele după un shuffle prin hash-uirea cheii de join sau de group-by:
partition_for(row) = hash(row.key) % num_partitions
Dacă cheile tale sunt distribuite uniform, rândurile se împrăștie aproximativ uniform în partiții. Dacă nu sunt, iar în datele reale aproape niciodată nu sunt, unele partiții primesc mult mai multe rânduri decât altele.
Trei scenarii frecvente din cod de producție:
Power users. Un tabel de evenimente cu cheia user_id. Utilizatorul de top are 100 de milioane de evenimente. Utilizatorul median are 12. După un shuffle de groupBy("user_id"), fiecare eveniment de la utilizatorul de top aterizează în aceeași partiție. Partiția aia are 100 MB, restul au 1 KB.
Concentrare geografică. Tranzacții cu cheia country. 60% din trafic e din SUA, 30% dintr-o coadă lungă, 10% din „toate celelalte”. După gruparea pe country, partiția SUA e de 6 ori mai mare decât toate celelalte la un loc.
Chei null sau goale. O coloană în care majoritatea rândurilor au o valoare reală dar 30% au null sau "". Toate null-urile fac hash în același bucket și se adună. Asta e cea mai frecventă și cea mai insidioasă pentru că nimeni nu o planifică: doar ridică din umeri și merge mai departe, iar job-ul e misterios de lent.
Pattern-ul e identic în toate trei: cheia de join sau de group-by este dezechilibrată, shuffle-ul păstrează acel dezechilibru, iar un task ajunge să facă mult mai multă muncă decât restul.
De ce „tot stage-ul așteaptă”
Stage-urile Spark au o barieră. Stage-ul următor nu poate porni până când fiecare task din stage-ul curent se termină și își scrie output-ul de shuffle pe disc. Dacă 199 din cele 200 de task-uri ale tale se termină în 12 secunde și unul rulează 30 de minute, stage-ul următor stă degeaba 29 de minute și 48 de secunde, așteptând acel un singur task.
Poți vedea asta în vizualizarea timeline a stage-ului din Spark UI ca o bară lungă și subțire: aproape totul se termină devreme și mai există un singur task întins în prelungire la final. Vom vedea cum arată mai jos.
Mai înseamnă și că: munca totală în cluster nu este modul corect de a gândi un job cu skew. Cluster-ul ca întreg ar putea fi utilizat doar 5% în 29 din acele 30 de minute. Un core e blocat, restul se plictisesc. Dashboard-urile de CPU mint despre asta.
Un set de date clar cu skew
Hai să fabricăm puțin skew ca să-l putem privi. Voi construi un DataFrame de 1,1 milioane de rânduri de evenimente în care un utilizator (user_id = 1) cumulează 1 milion de rânduri iar celelalte 100.000 de rânduri sunt răspândite pe 999 de utilizatori.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("TheSkewProblem")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate())
# 1 million rows for user 1
big_user = spark.range(0, 1_000_000).select(
F.lit(1).alias("user_id"),
F.col("id").alias("event_id"),
)
# 100k rows distributed across user_ids 2..1000
other_users = spark.range(0, 100_000).select(
((F.col("id") % 999) + 2).alias("user_id"),
F.col("id").alias("event_id"),
)
events = big_user.unionByName(other_users)
events.count() # 1,100,000
Acum rulează diagnosticul: group by user_id și count, sortat descrescător după count:
(events
.groupBy("user_id")
.count()
.orderBy(F.desc("count"))
.show(10))
# +-------+-------+
# |user_id| count|
# +-------+-------+
# | 1|1000000|
# | 2| 101|
# | 3| 101|
# | 4| 101|
# | 5| 101|
# | 6| 101|
# | 7| 101|
# | 8| 101|
# | 9| 101|
# | 10| 101|
# +-------+-------+
Utilizatorul 1 are 1.000.000 de rânduri. Fiecare alt utilizator are aproximativ 100. Asta e un raport de 10.000x. Orice are cheia user_id va trimite toate rândurile utilizatorului 1 într-o singură partiție.
Cum arată în Spark UI
Rulează orice operație care face shuffle pe user_id:
result = events.groupBy("user_id").agg(F.count("*").alias("n"))
result.write.mode("overwrite").parquet("/tmp/skew-demo")
Deschide Spark UI (http://localhost:4040 când rulezi local), intră în stage-ul care face shuffle-ul și uită-te la tabelul Tasks de jos. Coloanele de urmărit:
- Duration: timp de ceas per task
- Shuffle Read Size / Records: cât date a tras fiecare task din stage-ul anterior
Într-un stage sănătos, aceste coloane vor fi grupate strâns în jurul unei mediane. Într-un stage cu skew vei vedea ceva de genul:
Min: Median: Max:
50 ms 80 ms 45 s <- duration
2 KB 4 KB 40 MB <- shuffle read size
Când durata maximă e de peste 5x mediana, ai skew. Când e de 100x mediana, ai skew sever. Spark UI evidențiază asta cu un rând „summary” care arată min/percentila 25/median/percentila 75/max: aruncă o privire și răspunsul e chiar acolo.
Celălalt indiciu este vizualizarea stage timeline. Stage-urile sănătoase arată ca o cărămidă bine strânsă de bare orizontale care se termină toate cam în același timp. Stage-urile cu skew au o bară lungă care iese în afară, uneori întinsă de 10x dincolo de restul. Odată ce ai văzut un timeline cu skew, le recunoști dintr-o privire.
În producție: simptomul de coadă lungă
Nu vei avea mereu Spark UI la îndemână: uneori depanezi doar din log-uri. Semnătura skew-ului acolo este simptomul de coadă lungă:
[INFO] Stage 14: 195/200 tasks finished in 28s
[INFO] Stage 14: 198/200 tasks finished in 32s
[INFO] Stage 14: 199/200 tasks finished in 35s
[INFO] Stage 14: 199/200 tasks finished in 2m 14s
[INFO] Stage 14: 199/200 tasks finished in 5m 31s
[INFO] Stage 14: 199/200 tasks finished in 12m 09s
[INFO] Stage 14: 200/200 tasks finished in 24m 58s
195 de task-uri se termină în 28 de secunde. Al 200-lea se termină 25 de minute mai târziu. Acel decalaj este un task care procesează o partiție prea grasă. Timpul total de ceas al job-ului e dominat de acel un singur task, chiar dacă majoritatea muncii s-a făcut în prima jumătate de minut.
De ce nimic altceva nu ajută
Când dezvoltatorii văd asta prima dată, instinctul e să arunce resurse în problemă. Niciuna dintre mișcările evidente nu funcționează:
- Mai mulți executori? Nu. Bottleneck-ul e un singur task care rulează pe un singur executor. Executorii în plus stau degeaba.
- Mai multe core-uri per executor? Nu. Un singur task folosește un singur core. Multi-core ajută doar dacă există mai multe task-uri de rulat.
- Mai multă memorie? Uneori, dacă task-ul cu skew se prelingea pe disc, mai multă memorie îl accelerează. Dar task-ul tot rulează singur și tot ești blocat de el.
- Mai multe partiții de shuffle? Doar dacă skew-ul e ușor. Creșterea lui
spark.sql.shuffle.partitionsde la 200 la 2000 răspândește cheile mai ușoare pe mai multe partiții, dar fiecare rând cu cheia grea tot face hash în aceeași partiție unică. - Repartition? Un simplu
repartition(2000)e uniform și nu va ajuta: doar va shuffle-ui același rezultat dezechilibrat într-un alt număr uniform de partiții, iar cheia grea tot va ateriza într-un singur loc.
Lucrul care funcționează e să schimbi forma cheii înseși, astfel încât cheia grea să fie împărțită pe mai multe partiții. Asta e salting. Asta e lecția 29.
Soluții, pe scurt
O hartă completă a remediilor pentru skew, ordonate aproximativ după ușurință:
-
Broadcast join, când o parte e mică. Lecția 27 a acoperit asta. Dacă tabelul cu cheia grea e alăturat unui lookup mic, broadcast lookup-ul iar join-ul devine local: fără shuffle, fără skew.
-
Filtrează vinovatul evident. Dacă 30% din rândurile tale au
user_id = nullși nu îți pasă de null-uri în join, filtrează-le înainte de join. Câștig gratis. -
Salting. Adaugă un sufix aleator la cheile grele ca să facă hash în mai multe partiții, fă join-ul, apoi colapsează înapoi. Funcționează pentru cazul ambele-părți-sunt-mari. Acoperire completă în lecția 29.
-
Tratarea skew-ului prin AQE. Spark 3.x vine cu Adaptive Query Execution, care poate detecta skew la runtime și împărți partițiile grele automat. Activat cu
spark.sql.adaptive.enabled = trueșispark.sql.adaptive.skewJoin.enabled = true. Nu e magic: ajută doar pentru sort-merge joins, intră în acțiune doar peste un threshold configurabil și funcționează doar pe join-ul în sine (nu pe group-by-uri arbitrare). Dar pe Spark 3.4+ rezolvă mult skew fără modificări de cod. Lecția 59 intră adânc în AQE. -
Pre-agregă înainte de join. Dacă cheia grea e grea pentru că are multe duplicate pe care oricum o să le agregi, fă agregarea mai întâi. Un
.groupBy("user_id").agg(...)înaintea join-ului micșorează numărul de rânduri ale utilizatorului 1 de la un milion la unu.
Ordinea operațiilor într-o sesiune reală de depanare e de obicei: (1) confirmă că e skew cu interogarea de diagnostic, (2) verifică dacă AQE e pornit și are un threshold potrivit pentru datele tale, (3) dacă o parte e un lookup mic, fă-i broadcast, (4) dacă nu, filtrează cheile null sau gunoi, (5) dacă nu, salting.
Ce urmează
Acum știi ce e skew-ul, de ce e o problemă la nivel de stage și nu la nivel de cluster și cum să-l identifici din UI sau din log-uri. Lecția 29 parcurge salting de la cap la coadă cu cod, inclusiv capcana în care salting prost implementat înrăutățește lucrurile. După aceea, lecția 30 încheie modulul de joins-and-shuffles legând totul împreună: cum să citești planul fizic al unui join și să prezici timpul de execuție înainte să apeși go.
Interogarea de diagnostic pentru skew, df.groupBy(key).count().orderBy(F.desc("count")).show(20), merită memorată. Rulează-o pe orice DataFrame pe care urmează să faci ceva cu cheie. Dacă cheia de top e de peste ~10x mediana, planifică pentru skew înainte să depanezi job-ul lent.
Un diagnostic mai strâns pentru producție
Simplu groupBy().count() funcționează pe seturi mici de date. Pe date reale de producție, un group-by complet doar ca să verifici skew-ul e el însuși un shuffle scump. O abordare mai rapidă e să faci sample mai întâi:
sample = events.sample(fraction=0.01, seed=42)
(sample
.groupBy("user_id")
.count()
.orderBy(F.desc("count"))
.show(20))
Un sample de 1% e de obicei suficient pentru a identifica cheile grele și sare peste shuffle-ul complet. Dacă utilizatorul 1 domină în sample, domină și în setul complet.
Un alt one-liner util e raportul de skew, cheia de top versus mediana:
counts = events.groupBy("user_id").count()
top = counts.agg(F.max("count")).first()[0]
median = counts.approxQuantile("count", [0.5], 0.01)[0]
print(f"top={top:,} median={median:,} ratio={top/median:.1f}x")
Un raport peste 100 e skew sever. Peste 10 merită gândit. Sub 5 e în regulă.
Ambele diagnostice sunt suficient de ieftine ca să fie aruncate într-un pipeline ca verificare de bun-simț înainte de orice shuffle scump. Tu-din-viitor îl vei mulțumi pe tu-din-prezent prima dată când alternativa e să depanezi un stage de 4 ore din log-uri.
Referințe: documentația Apache Spark despre comportamentul de shuffle și Adaptive Query Execution; articole de pe blogul de inginerie Databricks despre identificarea și remedierea data skew-ului. Consultat 2026-05-01.