PySpark, de la zero Lecția 58 / 60

Debugging la job-uri Spark lente: checklist-ul de 30 de minute

Bucla sistematica de a-ti da seama ce nu e in regula cu un job lent: citesti UI-ul, gasesti stage-ul lent, te uiti la skew, GC, volum de shuffle, in ordinea asta.

Un job care se termina înainte în 12 minute acum durează 90. Managerul tău vrea să știe de ce înainte de standup. Ai un singur tab cu Spark UI deschis și o senzație vagă.

Ăsta e momentul în care toată lumea vrea să înceapă să tuneze. Sar direct la spark.sql.shuffle.partitions, măresc memoria executorilor, presară apeluri repartition(), fac cache la întâmplare. Nimic nu funcționează fiindcă nimic nu a fost diagnosticat. Ard o oră și job-ul tot lent rămâne.

Disciplina este: niciodată nu tunezi orbește, întotdeauna citești UI-ul mai întâi. Mai jos e bucla de 30 de minute pe care o rulez pe orice job lent. Șase pași, în ordine, de fiecare dată. La majoritatea job-urilor vei găsi răspunsul la pasul 2 sau 3. Restul pașilor sunt acolo pentru cazurile mai grele.

Pasul 1, deschide UI-ul și găsește stage-ul lent (3 minute)

Job-urile lente sunt de obicei lente din cauza unui singur stage, nu a tuturor.

Deschide Spark UI (history server dacă job-ul s-a terminat deja, live UI dacă încă rulează). Apasă pe Jobs. Rularea lentă e cea cu bara orizontală lungă.

Apasă în ea. Acum ești pe tab-ul Stages pentru job-ul respectiv. Sortează după Duration descrescător. Rândul de sus e vinovatul.

Stage Id | Description                | Duration | Tasks  | Input    | Shuffle Read
   18    | aggregate at App.scala:142 | 47 min   | 200    | 1.2 GB   | 89 GB
   12    | scan parquet ...           | 4.1 min  | 1200   | 380 GB   | 0
   ...

Stage 18 e locul în care s-au dus cele 47 de minute. Restul e zgomot. De acum încolo, fiecare pas e despre stage 18.

O notă despre citirea duratelor: bara stage-ului e wall-clock de la început până la sfârșit. În interior ai multe task-uri. Timpul wall-clock poate fi lung fiindcă task-urile sunt lente, sau fiindcă stage-ul are mai multe valuri de task-uri, sau fiindcă un singur task e mult mai lent decât celelalte. Încă nu poți spune. Mergi mai departe.

Pasul 2, distribuția duratei task-urilor (5 minute)

Apasă în stage-ul lent. Derulează la Summary Metrics for Completed Tasks. Tabelul ăsta e aur. Îți dă min / 25 / median / 75 / max pentru fiecare metrică pe task: durată, GC time, input size, shuffle read, shuffle write, peak memory.

Coloana la care te uiți prima e Duration.

Distribuție sănătoasă:

Min: 4s  | 25th: 6s | Median: 7s | 75th: 8s | Max: 12s

Task-urile se termină într-o bandă strânsă. Stage-ul doar își face treaba. Treci la pasul 3.

Distribuție skewed:

Min: 0.4s | 25th: 1.1s | Median: 1.4s | 75th: 2.3s | Max: 41 min

Maximul ăla e tot stage-ul. Un singur task face 99% din muncă. Ai skew. Asta e cea mai frecventă cauză a „job-ul s-a făcut lent” pe un workload real.

Calea de fix sunt lecțiile 28 și 29: identifică ce cheie e fierbinte (groupBy și count, caută outlier-ii uriași), apoi fie o filtrezi, fie faci broadcast la partea mică, fie sărezi cheia de join. AQE rezolvă multe cazuri automat (lecția 59), așa că verifică întâi dacă AQE e activat. Dacă nu este, pornirea lui e jumătate de schimbare de config.

Pasul 3, fracțiunea timpului de GC (3 minute)

În același tabel Summary Metrics, uită-te la GC Time. Sănătos înseamnă timp de GC sub 10% din durata task-ului. Dacă vezi GC time la 30% din durată pe task-ul median, executorii tăi se chinuie cu garbage collector-ul și de aceea munca se târăște.

Poți vedea asta și în tab-ul Executors: o coloană Task Time (GC Time) arată secundele totale petrecute și secundele totale în GC, per executor. Dacă GC e mai mult de ~20% din task time, JVM-ul flămânzește.

Cauze posibile:

  • Un DataFrame cache-uit prea mare pentru memoria executorului. Verifică tab-ul Storage. Dacă ai cache-uit 80 GB pe executori cu 12 GB utilizabili fiecare, eviction-urile sunt constante. Treci la MEMORY_AND_DISK_SER, sau nu cache-ui, sau cache-uiește o proiecție mai mică.
  • Un shuffle care trage prea multe rânduri într-un singur task (revizitează pasul 2, ar putea fi skew care cauzează presiune pe GC).
  • Memorie configurată prea strâns. Lecția 57 acoperă spark.executor.memory, memoryOverhead și împărțirea on-heap vs off-heap.

Tunarea memoriei orbește, fără să verifici GC time, e cea mai frecventă risipă a unei după-amieze de vineri. Verifică numărul mai întâi.

Pasul 4, shuffle read și shuffle write (5 minute)

Uită-te la coloanele Shuffle Read și Shuffle Write la nivel de stage. Compară-le cu output-ul tău.

Dacă stage-ul scoate 200 MB și face shuffle la 89 GB, muți de 450 de ori mai multe date decât ai nevoie. Fix-ul e aproape întotdeauna să filtrezi și să proiectezi mai devreme, înainte de shuffle, nu după. Optimizatorul Spark face o parte din asta pentru tine, dar nu poate împinge filtrele prin UDF-uri, prin string-uri selectExpr pe care nu le înțelege, sau prin cod utilizator care se ramifică imperativ pe date.

Câștiguri rapide:

  • Mută apelurile .filter() înainte de join-uri.
  • Înlocuiește select("*") cu coloanele de care ai nevoie efectiv.
  • Aruncă field-urile struct nested pe care nu le folosești (select("id", "amount", "ts") în loc să cari un record cu 200 de field-uri prin cinci stage-uri).
  • Dacă două stage-uri fac shuffle pe aceeași cheie, vezi dacă poți face .repartition(key) o dată și să refolosești prin cache, sau folosește bucketed tables (lecția 36).

Verifică și cum scalează volumul de shuffle cu input-ul. Un input de 4 GB care produce 100 GB de shuffle implică explozie de rânduri, de obicei un join many-to-many. Lecțiile 25-30 acoperă mecanica join-urilor în detaliu.

Pasul 5, citește planul SQL (5 minute)

Apasă tab-ul SQL / DataFrame în UI. Găsește query-ul pentru job-ul tău lent (timestamp-urile și duratele ajută la potrivire). Apasă în el. Ai planul fizic, cu metrici per operator.

La ce să te uiți, în ordinea „asta e prost”:

  • BroadcastNestedLoopJoin. Aproape întotdeauna greșit. Înseamnă că Spark nu a găsit o condiție de join și face produs cartezian. Verifică join-ul tău. Ai uitat clauza on=? Faci join pe o funcție precum df1.id == upper(df2.id) care nu s-a tradus? Operatorul ăsta e o alarmă cu cinci sirene pe orice input non-minuscul.
  • SortMergeJoin unde te așteptai la broadcast. Partea mică nu a fost suficient de mică (pragul implicit de broadcast e 10 MB, ridică-l cu spark.sql.autoBroadcastJoinThreshold). Sau estimarea dimensiunii e greșită fiindcă a venit dintr-un DataFrame pentru care Spark nu avea statistici. Forțează cu broadcast(df) (lecția 27).
  • Operatori Filter care stau deasupra unui scan în loc să fie împinși în el. Pentru surse Parquet/ORC, predicate pushdown ar trebui să le împăturească înăuntru. Dacă nu sunt împinse, filtrul tău e pe o coloană pe care sursa nu o poate filtra (rezultatul unui UDF, un path de struct field, o expresie).
  • Subexpresii repetate, același filtru sau aceeași proiecție apărând în mai multe ramuri ale planului. Un cache() al DataFrame-ului partajat se poate plăti rapid singur.

Planul îți spune ce face Spark de fapt, nu ce sugerează codul tău Python. Diverg.

Pasul 6, dimensiunea input-ului și spill (4 minute)

Încă două numere, ambele pe tab-ul Stages.

Input size, ar trebui să se potrivească cu ce te aștepți. Dacă crezi că scanezi un tabel partiționat de 4 GB iar stage-ul arată Input: 380 GB, partition pruning a eșuat. Cauză frecventă: un filtru pe o coloană care nu e coloana de partiționare, sau o expresie de filtru care învinge pruning-ul (partition_dt = string_col unde string_col e un varchar nepartiționat pe care Spark nu poate dovedi că e constant).

Spill (Memory) și Spill (Disk), coloanele astea apar când task-urile au rămas fără spațiu in-memory și au trebuit să verse buffer-ele de sort/agregare. Spill diferit de zero înseamnă că un task a încercat să țină mai mult decât încăpea. Două căi: mărește memoria executorului (lecția 57) sau partiționează mai mic (mai multe partiții = mai puțin per task = mai puțin spill).

Dacă vezi spill mare pe disk, job-ul tău e gâtuit pe throughput-ul SSD-ului local. În cloud, ăla e disk-ul efemer al executorului. Anumite forme (sort masiv + memorie mică) vor face spill întotdeauna, job-ul tău e ce este și întrebarea e doar dacă spill-ul e acceptabil.

Punând totul cap la cap: checklist-ul printabil

Salvează asta undeva. Lipește-l pe monitor. Următoarea dată când cineva spune „job-ul e lent”:

  1. Deschide Spark UI. Găsește stage-ul cel mai lent după durată.
  2. Uită-te la distribuția duratei task-urilor. Max vs median. Diferență mare = skew → lecțiile 28-29.
  3. Uită-te la GC time. >20% din task time = presiune pe memorie → lecția 57 sau repară cache-ul.
  4. Uită-te la shuffle read/write. Sute de GB shuffle-uite pentru un output mic = filtrează mai devreme → lecțiile 25-30.
  5. Citește planul SQL. BroadcastNestedLoopJoin, broadcast lipsă, filter peste scan → repară query-ul.
  6. Uită-te la dimensiunea input-ului și la spill. Dimensiune scan nepotrivită = pruning a eșuat; spill diferit de zero = memorie sau partiții greșite.

Disciplina e ordinea. Skew e mai frecvent decât probleme de memorie. Probleme de memorie sunt mai frecvente decât probleme de plan. Probleme de plan sunt mai frecvente decât probleme de scan. Vei rezolva 80% din job-urile lente la pasul 2.

Ce nu va prinde checklist-ul ăsta

  • Probleme de cluster din afara job-ului. Un vecin gălăgios pe infra partajată, un nod degradat, o partiție de rețea. UI-ul îți arată o vedere Spark-like asupra lumii; dacă VM-ul de bază e bolnav, executorii vor părea lenți fără o cauză evidentă. Verifică log-urile executorilor și metricile platformei.
  • Cod care e pur și simplu lent. Un UDF Python care face un regex peste o coloană de 200 GB va fi lent indiferent cum tunezi Spark. UI-ul îți va spune că petrece tot timpul în MapPartitions lângă un Python row, dar fix-ul e „nu mai folosi un UDF Python” (lecția 40), nu „tunează memoria executorului”.
  • Regresii de calitate a datelor. Ieri job-ul a rulat pe 4 GB. Astăzi rulează pe 40 GB fiindcă upstream a început să scrie de două ori. Job-ul nu e lent, input-ul e mai mare. Întotdeauna verifică dimensiunea input-ului mai întâi când „job-ul s-a făcut lent peste noapte”.

Promisiunea celor 30 de minute

Toată bucla de mai sus durează 30 de minute primele dăți când o faci. Apoi 15. Apoi 5, fiindcă vei învăța să arunci o privire la tab-urile potrivite și să sari peste părțile sănătoase. Până faci debugging la douăzeci de job-uri lente, vei avea o intuiție pentru unde să te uiți întâi pe baza simptomelor.

Dar întoarce-te mereu la disciplină. Ziua în care începi să tunezi înainte să citești UI-ul e ziua în care vei petrece trei ore reparând lucrul greșit.

Lecția următoare: Adaptive Query Execution. Funcționalitatea Spark care rezolvă multe dintre problemele de mai sus automat, când poate. Vom vedea când poate, când nu poate, și cum să citești planurile pe care le rescrie la runtime.

Caută