Pentru cea mai mare parte a istoriei Spark, optimizatorul a funcționat așa cum funcționează orice optimizator relațional: citește query-ul, se uită la statisticile tabelului, planifică totul în avans, rulează. Dacă statisticile erau greșite, să zicem că faci join cu un tabel ieșit dintr-un stage Spark anterior fără statistici reale, planul era greșit și plăteai costul.
Spark 3.0 a introdus Adaptive Query Execution (AQE) și a schimbat asta. AQE rescrie planul fizic la runtime, după ce stage-urile timpurii se termină și Spark are dimensiuni reale în loc de presupuneri. La Spark 3.2 era pornit by default. La 3.5 era load-bearing pentru jumătate din optimizările pe care le recomandă docs. Dacă ești pe un Spark recent și nu folosești AQE, lași 30-50% din optimizator pe masă.
Lecția asta e despre ce face AQE de fapt, configurile care îl controlează și unde tot nu te poate ajuta.
Cele trei lucruri pe care le face AQE
AQE e o colecție de optimizări la runtime, dar practic îți pasă de trei.
1. Dynamic partition coalescing
Asta e cea de care beneficiază imediat majoritatea job-urilor.
Fără AQE: setezi spark.sql.shuffle.partitions = 200. Spark face mereu shuffle în 200 de partiții. Dacă datele tale post-shuffle sunt 4 GB total, asta e 20 MB per partiție, ok. Dacă sunt 40 MB total, asta e 200 KB per partiție și ai 200 de task-uri de overhead procesând partiții aproape goale. Dacă sunt 400 GB, ai 200 de partiții de 2 GB fiecare, greu pentru un singur task.
Numărul 200 e o presupunere făcută înainte ca Spark să știe ceva despre date.
Cu AQE: după shuffle, Spark se uită la cât de mare e fiecare partiție de output efectiv și face coalescing la cele mici adiacente în unele mai mari. Presupunerea de 200 de partiții devine 8 partiții de 50 MB dacă asta cer datele. Mai puțin overhead, mai puține task-uri minuscule, job mai rapid.
Aproape niciodată nu mai trebuie să apelezi coalesce() manual după un groupBy. AQE se ocupă.
# Configurile care conduc coalescing-ul
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.coalescePartitions.minPartitionNum": "1",
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "64m",
advisoryPartitionSizeInBytes e ce vizează AQE după coalescing. 64 MB e default; multe echipe îl ridică la 128 MB sau 256 MB pentru job-uri batch ca să reducă numărul de task-uri și mai mult. Nu trece peste ce încape confortabil în working set-ul unui singur task.
2. Gestionarea join-urilor skewed
Al doilea lucru pe care îl face AQE e să detecteze partițiile skewed în join-uri la runtime și să le împartă.
Fără AQE: un SortMergeJoin pe o cheie unde 80% din rânduri au country = 'US' produce o partiție uriașă procesată de un singur task, coada clasică de skew. Stai și te uiți cum un task rulează 40 de minute în timp ce celelalte 199 s-au terminat în 10 secunde. Salting-ul (lecția 29) era fix-ul manual.
Cu AQE: Spark observă că o partiție shuffle-uită e mai mare decât skewedPartitionFactor × medianul (default 5x) și mai mare decât skewedPartitionThresholdInBytes (default 256 MB). Când ambele condiții sunt îndeplinite, Spark împarte partiția grasă în bucăți mai mici, replică rândurile potrivite din cealaltă parte și le rulează în paralel. Salting automat pentru cazurile ușoare, fără modificare de cod.
"spark.sql.adaptive.skewJoin.enabled": "true",
"spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
Dacă tot lovești skew cu astea pornite, scade factorul (încearcă 3) sau pragul (încearcă 64 MB), AQE va împărți mai agresiv. Compromisul e că împărțirea agresivă pe un dataset non-skewed adaugă overhead, deci nu o forța fără un câștig măsurat.
3. Comutarea strategiei de join
Al treilea e mai subtil dar plătește pe job-uri mari. Spark a planificat un SortMergeJoin fiindcă la momentul planificării ambele părți păreau prea mari pentru broadcast. Dar poate partea de build vine dintr-un filtru anterior care aruncă 99% din rânduri. La momentul planificării Spark a estimat 5 GB; la runtime e de fapt 30 MB. Cu AQE, Spark observă și comută join-ul la un BroadcastHashJoin, mult mai ieftin.
Asta e activat by default cu AQE. Configul e spark.sql.adaptive.localShuffleReader.enabled (true), care permite părții de broadcast să citească fișierele de shuffle local fără să re-shuffle-uiască cealaltă parte. spark.sql.autoBroadcastJoinThreshold se aplică în continuare, dimensiunile la runtime sub prag flip-ează join-ul.
Cum funcționează AQE de fapt sub capotă (pe scurt)
Merită înțeles o dată, fiindcă explică limitele.
Un plan Spark e un arbore de stage-uri separate de shuffle-uri. Fără AQE, Spark planifică tot arborele în avans și îl rulează. Cu AQE, Spark planifică până la prima graniță de shuffle, rulează aia, măsoară dimensiunile efective ale output-ului, apoi re-planifică restul arborelui folosind acele numere reale în loc de estimările optimizatorului. Face asta recursiv la fiecare graniță de shuffle.
Acea re-planificare e locul în care se întâmplă magia. Cu dimensiuni reale în mână, optimizatorul poate:
- Vedea că o parte „mare” e de fapt mică și să comute la broadcast.
- Vedea că o partiție e skewed și să o împartă.
- Vedea că datele post-shuffle sunt mici și să facă coalescing partițiilor.
Cele două consecințe ale acestui design:
- AQE ajută doar la granițele de shuffle. Fără shuffle, fără măsurătoare, fără re-plan. De aceea skew-ul pe partea sursă fără shuffle nu poate beneficia (mai multe mai jos).
- AQE adaugă un mic overhead de planificare per stage. Pentru query-uri foarte rapide pe input-uri minuscule, overhead-ul poate depăși beneficiul. Pentru orice altceva, e neglijabil față de economiile la runtime.
Poți vedea granița în planul SQL: fiecare shuffle e împachetat într-un ShuffleQueryStage, iar AQE se inserează între stage-uri.
Un before/after real
Un exemplu mic dar realist. Citește un fact table, filtrează la un singur client, fă join cu un dim table. Fără AQE filtrul nu schimbă strategia de join și ești tot pe teritoriul SortMergeJoin:
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "200")
orders = spark.read.parquet("s3://lake/orders/") # 80 GB
customers = spark.read.parquet("s3://lake/customers/") # 12 GB
orders.filter("customer_id = 'cust_42'") \
.join(customers, "customer_id") \
.groupBy("region") \
.sum("amount") \
.write.parquet("s3://out/cust42-by-region/")
Filtrul ăla coboară partea de orders la ~80 MB. Dar Spark a planificat join-ul când ambele părți erau mari, deci primești 200 de task-uri reducer pentru ~80 MB de date join-uite cu 12 GB. Wall clock: 14 minute.
Pornește AQE, același job:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Ce se schimbă:
- După shuffle-ul filtrului, AQE vede că partea de orders e 80 MB și comută planul ca să facă broadcast-hash-join orders-ul în customers (partea mică s-a schimbat, acum e orders-ul filtrat).
- Shuffle-ul groupBy de după join are 200 de partiții dar ~150 KB fiecare. AQE face coalescing la 4-8 partiții.
Wall clock: 90 de secunde. Același cod, aceleași date, o singură schimbare de config.
Nu vei vedea mereu 10x. Dar vei vedea adesea 2-3x pe workload-uri reale, iar AQE aproape niciodată nu face un job mai lent, în cel mai rău caz e neutru pe query-uri deja optime.
Citirea planurilor rescrise de AQE
Tab-ul SQL din UI arată planul de la runtime. Vei vedea operatori precum AdaptiveSparkPlan, CustomShuffleReader (coalesced sau local) și ShuffleQueryStage. Există un view „Initial Plan” și un view „Final Plan”; diff-ul dintre ele e exact ce a rescris AQE.
Câteva pattern-uri de recunoscut:
CustomShuffleReader coalesced→ s-a întâmplat coalescing, mai puține partiții decât planificat inițial.BroadcastHashJoinlângă unShuffleQueryStage→ AQE a comutat un SortMergeJoin la broadcast la runtime.- Citiri multiple
CustomShuffleReaderale aceluiași shuffle, cu adnotăriSkewed=true→ AQE a gestionat skew-ul.
Dacă nu vezi adnotările astea pe un query la care te așteptai să-l optimizeze AQE, verifică dacă AQE e cu adevărat pornit. spark.conf.get("spark.sql.adaptive.enabled") ar trebui să returneze "true". Unele platforme vin cu el dezactivat în default-ul de cluster, Databricks îl are pornit, vanilla open-source Spark < 3.2 îl are oprit.
Unde tot nu poate AQE să ajute
AQE e grozav. Nu e magie. Nu te poate ajuta cu:
Skew pe partea sursă, fără shuffle. Dacă cheia ta skewed e în fișierul de input (un fișier Parquet are 90% din rânduri pentru country='US' din cauza modului în care a fost scris) și query-ul tău nu face shuffle, AQE n-are la ce să reacționeze. Fix-ul e să faci repartition la sursă la citire, sau să repari scrierea upstream, sau să filtrezi cheia fierbinte și să o procesezi separat. Lecția 28 încă se aplică.
Broadcast joins. Odată ce un join e broadcast, nu e niciun shuffle de inspectat. Dacă partea de broadcast în sine are skew care explodează la join (un one-to-many unde o cheie matchează 80% din partea de build), AQE nu poate interveni. Ai nevoie de o strategie de join diferită, sau de salting-ul din lecția 29.
Skew pe coloane derivate. Dacă cheia ta de join e concat(country, city) iar skew-ul e în country, AQE se uită la dimensiunile partițiilor cheii concatenate. Tot funcționează în majoritatea cazurilor, dar dacă concat-ul distribuie skew-ul peste mai multe partiții uniform suficient cât să scape de skewedPartitionFactor, AQE nu va împărți. Va trebui să inspectezi manual.
Coalescing de partiții minuscule într-un stage memory-bound. AQE face coalescing partițiilor mici ca să reducă overhead-ul. Dacă nevoile de memorie per task ale stage-ului următor scalează cu dimensiunea partiției, o funcție window cu un partition spec uriaș, o agregare cu cardinalitate mare, coalescing-ul la partiții mai mari te poate împinge în teritoriul spill-ului. Urmărește spill-ul în stage-ul următor; dacă îl vezi după activarea AQE, scade înapoi advisoryPartitionSizeInBytes.
Streaming. AQE nu se aplică la query-urile Structured Streaming. Engine-ul de streaming planifică micro-batch-urile diferit și multe rescrieri AQE ar strica semantica incrementală. Pentru streaming, tunezi partițiile explicit.
Ce ar trebui să pornești, astăzi
Pentru job-uri batch pe Spark 3.2+, configul default sigur este:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
Majoritatea platformelor au asta pornit by default deja. Verifică cu spark.conf.get. Dacă ești pe un cluster self-managed care rulează Spark 3.0 sau 3.1, AQE era oprit by default, pornește-l, rulează-ți job-urile, urmărește-le cum se accelerează.
Cazurile rare în care ai dezactiva AQE: o pipeline foarte stabilă, hand-tuned, în care ai măsurat o regresie, sau un query în care overhead-ul de runtime al planner-ului AQE depășește beneficiul (query-uri foarte scurte pe input-uri mici). Astea sunt cazuri marginale. Răspunsul implicit e „lasă-l pornit”.
Legând înapoi la checklist-ul de debugging
Lecția 58 a fost bucla de diagnoză: găsește stage-ul lent, verifică skew, GC, volum de shuffle, plan. AQE e răspunsul la „am skew” și „partițiile mele de shuffle sunt greșite” pentru multe job-uri. Dacă urmezi checklist-ul și tot găsești skew, prima întrebare e dacă AQE e pornit. A doua e dacă prinde skew-ul (uită-te în planul SQL după adnotările skewedPartition). A treia, doar dacă ambele sunt adevărate, e dacă ai nevoie de salting manual peste.
În cele mai multe zile, AQE e suficient. În cele mai multe zile nu trebuie să te gândești la el. Asta e ideea.
Lecția următoare e capstone-ul cursului: un health check de 30 de minute pe un cluster Spark pe care nu l-ai mai văzut. Tura finală care leagă tot ce am acoperit. Adu cafea.