Spark UI îți spune ce s-a întâmplat după fapt. .explain() îți spune ce urmează să se întâmple înainte să rulezi un singur task. Amândouă contează, UI-ul confirmă realitatea, .explain() îți permite să o prezici. Un inginer Spark senior citește execution plans la fel cum un DBA citește EXPLAIN ANALYZE din Postgres: cu o paranoia ușoară și cu ochiul pe operatorii care nu-și au locul.
Am atins planurile încă din lecția 41 când am acoperit Catalyst. Lecția asta e versiunea de producție: citirea fiecărei linii, știi ce operatori contează și înțelegi diferența dintre ce printează .explain() și ce rulează efectiv sub Adaptive Query Execution.
Ce printează .explain
Implicit, .explain() arată doar execution plan-ul fizic, cel pe care Spark îl va executa efectiv.
df.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[country#42], functions=[sum(amount#15)])
+- Exchange hashpartitioning(country#42, 200), ENSURE_REQUIREMENTS, [plan_id=...]
+- *(1) HashAggregate(keys=[country#42], functions=[partial_sum(amount#15)])
+- *(1) Project [amount#15, country#42]
+- *(1) BroadcastHashJoin [user_id#14], [user_id#41], Inner, BuildRight
:- *(1) Filter (isnotnull(amount#15) AND (amount#15 > 40.0))
: +- *(1) Scan parquet orders[order_id#13,user_id#14,amount#15]
: PushedFilters: [IsNotNull(amount), GreaterThan(amount,40.0)]
+- BroadcastExchange HashedRelationBroadcastMode(...)
+- *(1) Filter isnotnull(user_id#41)
+- *(1) Scan parquet users[user_id#41,name#42,country#43]
.explain(True) (sau .explain(extended=True)) arată toate cele patru faze: parsed, analyzed, optimized și physical. Asta vrei când debughezi un plan neașteptat, fiindcă să vezi călătoria de optimizare îți spune deseori ce rescriere a sărit sau a aplicat Catalyst contrar așteptărilor tale.
df.explain(True)
Mai există și .explain("formatted"), mult mai ușor de citit pe planuri lungi, numerotează operatorii și printează un bloc separat de „details”. Și .explain("cost"), care adaugă numere de rânduri și statistici de mărime din modelul de cost al optimizatorului. Vom reveni la astea.
Direcția de citire
Planurile sunt printate de sus în jos, dar se execută de jos în sus. Frunzele sunt input-urile (file scans, exchanges care primesc date de shuffle), rădăcina e acțiunea care a declanșat planul.
Când citesc un plan, citesc frunzele primele. „Ce scanez, ce coloane, ce filtre s-au push-down-uit?” Apoi urmăresc în sus: „ce face Spark cu datele astea pe drumul spre acțiune?”
Caracterele :- și +- desenează arborele. Copiii unui operator sunt indentați sub el.
Operatori care merită memorați
Nu trebuie să știi fiecare operator. Trebuie să-i știi pe ăștia.
Scan parquet (sau Scan orc, Scan json etc.): citirea fișierului. Linia de sub el listează PushedFilters și PartitionFilters. Coloanele pe care Spark le citește efectiv sunt listate între paranteze, ăla e column pruning în acțiune.
Project: selecția de coloane sau evaluarea de expresii. Ar trebui să fie ieftin.
Filter: un predicat la nivel de rând. Dacă vezi un Filter chiar deasupra unui Scan parquet și același predicat e și în PushedFilters, ambele fac muncă, sursa și-a dat tot interesul, Spark curăță ce nu s-a putut împinge. Dacă predicatul nu e în PushedFilters, nu s-a putut împinge (deseori din cauza unui UDF sau a unei expresii complexe) și Spark filtrează după ce citește fiecare rând.
HashAggregate: o agregare hash-based (group by). Apare deseori de două ori într-un plan în jurul unui Exchange: o agregare parțială înainte de shuffle, o agregare finală după. Acel tipar de agregare în doi pași e una dintre cele mai importante optimizări ale Spark, e motivul pentru care groupBy().count() nu face shuffle la fiecare rând de input.
Sort: un sort. Scump dacă nu e împins în partea cerută a unui SortMergeJoin.
Exchange hashpartitioning(...): shuffle-ul. Fiecare shuffle e un Exchange. E cel mai scump operator din planul tău, punct. Numără-le. Fiecare e un round-trip de rețea + o scriere pe disc + o citire de pe disc. Exchange-urile neașteptate sunt bug-ul de performanță numărul unu.
BroadcastExchange: face broadcast la un dataset mic spre fiecare executor. Ieftin dacă datasetul e cu adevărat mic; driverul îl colectează și îl trimite. De obicei apare sub un BroadcastHashJoin.
BroadcastHashJoin: joinul ieftin. O parte e broadcast-ată, cealaltă e hash-probed local. Spark îl alege când o parte e sub spark.sql.autoBroadcastJoinThreshold (10 MB implicit).
SortMergeJoin: joinul default sigur. Ambele părți shuffled la aceeași partiționare, sortate, apoi merged. Două Exchange-uri sub el.
ShuffleHashJoin: ambele părți shuffled, partea mai mică construită într-o tabelă hash. Mai rar; AQE îl preferă uneori când mărimile sunt la mijloc.
BroadcastNestedLoopJoin: antitiparul. O(n × m). Spark cade înapoi la el când nu poate alege o strategie hash, de obicei fiindcă condiția de join e un predicat non-equi (<, BETWEEN, un apel de funcție) și trebuie să compare fiecare rând stâng cu fiecare rând drept. Dacă vezi asta într-un plan peste date de mărime reală, jobul nu se va termina. Rescrie condiția să fie un equi-join cu un range filter peste el.
WholeStageCodegen: nu chiar un operator, mai degrabă un wrapper. Marcajele *(1), *(2) din fața operatorilor îți spun că acei operatori sunt fuzionați într-o singură funcție generată la runtime. Generarea de bytecode din Tungsten. Dacă vezi *(1) pe Filter, Project, HashAggregate: Spark a scris un singur loop mare pentru toți trei care rulează fără virtual dispatch. Granița dintre code-gen stages e de obicei un shuffle sau un operator care nu suportă codegen (unele UDF-uri, anumite operații Python). Doi operatori cu același *(N) sunt fuzionați; N diferit, nu.
Liniile PushedFilters / PartitionFilters
Sub un Scan parquet vei vedea ceva de genul:
PushedFilters: [IsNotNull(amount), GreaterThan(amount,40.0)]
PartitionFilters: [isnotnull(dt#17), (dt#17 = 2026-05-01)]
PushedFilters sunt împinse la cititorul de format al fișierului. Statisticile din footer-ul Parquet îi permit să sară row groups al căror min/max exclude predicatul. Asta e data skipping la nivel de coloană, filtrul tău pe amount > 40 îi permite Parquet să sară chunks întregi de fișier fără să le decodeze.
PartitionFilters elimină directoare întregi înainte ca vreun fișier să fie deschis. Dacă datele tale sunt partiționate după dt, și filtrezi pe dt = '2026-05-01', acel partition filter înseamnă că Spark listează exact un director și ignoră restul. PartitionFilters sunt cele mai ieftine filtre din Spark, se întâmplă la planning time.
Ce vrei să vezi într-un plan sănătos: fiecare filtru selectiv pe care l-ai scris apare în PushedFilters sau PartitionFilters. Dacă un filtru apare doar ca un operator Filter deasupra scanului, fără nimic în PushedFilters, citești tot fișierul și arunci rândurile. Cauze comune:
- Nepotrivire de tip (
dt = '2026-05-01'peste o coloanăDATE, are nevoie de un cast). - Un UDF în interiorul filtrului, UDF-urile sunt opace pentru optimizator.
- O expresie pe care sursa nu o poate reprezenta (regex peste text, matematică complexă pe date).
Repară filtrul, urmărește-l cum aterizează în PushedFilters, urmărește input bytes cum scad în UI. Cauză și efect.
Un exemplu lucrat
Un join mic pe două tabele. Îl vom explica și vom parcurge fazele.
from pyspark.sql import functions as F
orders = spark.read.parquet("/data/orders/") # ~5 GB, partitioned by dt
users = spark.read.parquet("/data/users/") # ~30 MB
q = (orders
.filter(F.col("dt") == "2026-05-01")
.filter(F.col("amount") > 40)
.join(users, "user_id")
.groupBy("country")
.agg(F.sum("amount").alias("total")))
q.explain(True)
.explain(True) printează patru blocuri. Răsfoiește-le în această ordine:
Parsed Logical Plan: AST-ul literal al codului tău. Referințele de coloană sunt nerezolvate ('amount, cu apostroful însemnând nerezolvat). Util doar când vrei să confirmi că Spark a parsat query-ul așa cum l-ai scris. Sari peste în 99% din cazuri.
Analyzed Logical Plan: referințele de coloană rezolvate la coloane concrete cu ID-uri (amount#15), tipuri atașate. Dacă ai o greșeală de tipar într-un nume de coloană, aici dă eroare. Dacă vezi ID-uri de coloane pe care nu le recunoști, alea sunt aliasuri sau coloane auto-generate de la join.
Optimized Logical Plan: Catalyst a rulat toate rescrierile bazate pe reguli. Aici vezi ce a făcut efectiv optimizatorul cu query-ul tău. Transformări comune de urmărit:
- Cele două filtre fuzionate într-unul:
Filter ((dt = '2026-05-01') AND (amount > 40)). - Coloanele tăiate: rămân doar
user_id,amount,country, joinul foloseșteuser_id, proiecția are nevoie deamountșicountry, restul au plecat. - Predicatele împinse sub joinuri: filtrele pe
ordersse întâmplă înainte de join, nu după.
Dacă ai scris un filtru și nu apare în planul optimizat, optimizatorul l-a aruncat ca redundant sau adevărat. Dacă ai scris o coloană pe care n-ai folosit-o, projection pruning a eliminat-o. Citirea planului optimizat îți spune ce a păstrat Catalyst.
Physical Plan: planul executabil. Parcurge-l de la frunză la rădăcină:
- Scan parquet orders cu
PartitionFilters: [(dt = '2026-05-01')]șiPushedFilters: [IsNotNull(amount), GreaterThan(amount, 40.0)]. Ambele filtre au aterizat la sursă. Bun. - Filter deasupra: Spark verifică din nou predicatul împins. Ieftin.
- Scan parquet users: mic, ar trebui să fie broadcast-at.
- BroadcastExchange: driverul colectează users, face broadcast.
- BroadcastHashJoin: orders sondează tabela hash broadcast. Excelent, fără shuffle pe partea mare.
- Project: păstrează doar
amount,country. - HashAggregate (partial): sumă parțială în interiorul fiecărui task.
- Exchange hashpartitioning(country, 200): shuffle după country.
- HashAggregate (final): sumă finală per country.
Un singur Exchange în total. Tabela mare nu a fost niciodată shuffled, broadcast-ul ne-a salvat. Ăla e planul pe care îl vrei.
Dacă în schimb ai văzut două Exchange-uri și un SortMergeJoin, broadcast-ul nu s-a declanșat, de obicei fiindcă statisticile părții mici au făcut-o să pară mai mare decât spark.sql.autoBroadcastJoinThreshold. Îl poți forța cu F.broadcast(users) sau poți crește pragul.
.explain nu e mereu adevărul: AQE
Iată realitatea de producție: cu Adaptive Query Execution pornit (care e default-ul în Spark 3.2+), execution plan-ul fizic poate fi rescris la runtime pe baza statisticilor reale de shuffle. AQE poate:
- Coalesce partiții post-shuffle mici (mai puține task-uri pentru date triviale).
- Schimbă un SortMergeJoin într-un BroadcastHashJoin dacă mărimea post-shuffle e suficient de mică pentru broadcast.
- Sparge partițiile cu skew în sub-partiții mai mici.
Ce printează .explain() e execution plan-ul fizic pre-AQE, planul așa cum l-a produs optimizatorul static. Planul care rulează efectiv e în tab-ul SQL al UI-ului, unde graful de operatori reflectă planul rescris de AQE cu cutii AQEShuffleRead și orice schimbări de strategie de join.
Implicația: .explain() e pentru înțelegerea intenției și prezicerea cazului cel mai rău. Tab-ul SQL al UI-ului e pentru confirmarea a ceea ce s-a întâmplat. Dacă diferă, AQE a rescris ceva, de obicei spre bine.
Celelalte moduri
.explain("formatted") e ce folosesc când planul are mai mult de ~30 de linii. Output-ul arată așa:
== Physical Plan ==
* HashAggregate (8)
+- Exchange (7)
+- * HashAggregate (6)
+- * Project (5)
+- * BroadcastHashJoin (4)
...
(1) Scan parquet orders
Output: [order_id#13, user_id#14, amount#15]
PushedFilters: [...]
...
(2) Filter
Input: [...]
Condition: (amount#15 > 40.0)
...
Arborele e sus cu noduri numerotate; detaliile pentru fiecare nod numerotat sunt mai jos. Mult mai ușor de citit decât forma indentată pentru planuri mari.
.explain("cost") adaugă numerele de rânduri și estimările de mărime ale optimizatorului lângă fiecare operator. Citește-l cu scepticism, vin din statisticile tabelei, iar statisticile se învechesc; sunt utile ca semnal de prim ordin, nu ca evanghelie.
Ce să faci cu asta
Când un query e lent, înainte să tunezi orice:
- Rulează
.explain(True). Parcurge planul optimizat și planul fizic. - Numără Exchange-urile. Fiecare e un shuffle. Sunt mai multe decât te așteptai?
- Verifică algoritmii de join. Vreun
BroadcastNestedLoopJoin? VreunSortMergeJoinunde voiai un broadcast? - Verifică
PushedFiltersșiPartitionFilterspe fiecare scan. Au făcut filtrele tale push-down? - Rulează query-ul. Deschide tab-ul SQL în UI. Compară planul rescris de AQE cu ce a printat
.explain. Notează ce a schimbat AQE.
Ritualul ăsta de cinci minute prinde mai multe probleme de performanță decât oricâtă tunare a lui spark.sql.shuffle.partitions. Planul e adevărul. Tunează planul, nu config-urile.
Lecția următoare: când nimic din astea nu te salvează și executor-ul moare, memory tuning și OOM postmortem.