Ți se dau cheile unui cluster Spark pe care nu l-ai mai văzut. „Spune-ne ce e stricat până la ora 17.”
Lecția asta e checklist-ul de 30 de minute pe care îl rulez. Fiecare pas, în ordine, cu ce să cauți și ce să faci dacă găsești. E lecția finală a cursului fiindcă leagă tot ce am acoperit, partiționare, shuffle-uri, join-uri, memorie, AQE, streaming, tab-ul SQL, tab-ul executors, și împachetează totul într-o singură tură ordonată. O poți printa. O poți lipi în trusa ta. Ziua în care un client nou îți dă un cluster Spark, scoți asta și ești productiv în jumătate de oră.
Mulțumesc că ai rămas prin 60 de lecții. Hai să terminăm cu o tură scriptată prin tot toolkit-ul.
Pasul 1, prezentarea generală a clusterului (2 minute)
Deschide Spark UI (sau UI-ul workspace-ului dacă ești pe Databricks/EMR/Dataproc). Găsește aplicația care rulează. Sus-dreapta îți dă versiunea. Tab-ul Environment îți dă restul.
spark.version # 3.5.1, 3.4.2, etc.
spark.sparkContext.master # yarn, k8s://..., local[*], spark://...
spark.sparkContext.defaultParallelism # total cores
sc = spark.sparkContext
print(sc.statusTracker().getExecutorInfos())
Ce cauți:
- Versiunea Spark, ideal 3.2+, pentru AQE pornit by default. 3.0 / 3.1 înseamnă audit manual de config.
- Cluster manager, YARN, Kubernetes, Standalone sau vendor-managed. Tooling operațional diferit.
- Numărul și dimensiunea executorilor, câți, câtă memorie fiecare, câte core-uri fiecare. Compară cu capacitatea anunțată a clusterului.
- Dimensiunea driverului, driverele subdimensionate crapă la rezultate
collect()mari.
Pasul 2, tab-ul Executors (3 minute)
Ăsta e sp_Blitz-ul tău pentru Spark. Apasă pe Executors.
Executor ID | Address | Status | Cores | Memory Used | Task Time | GC Time | Failed | Active Tasks
driver | 10.0.1.4 | Active | 0 | 0 B / 4 GB | 0 ms | 0 ms | 0 | 0
0 | 10.0.2.10 | Active | 4 | 8.2 GB / 12 GB | 2.1 h | 18 min | 0 | 4
1 | 10.0.2.11 | DEAD | 4 | - | 1.4 h | 22 min | 17 | 0
2 | 10.0.2.12 | Active | 4 | 11.8 GB / 12 GB | 2.4 h | 1.1 h | 12 | 4
Ce cauți:
- Executori morți. Dacă vezi unii, uită-te la log-uri (link-ul
stderr), OOM, nod pierdut, sau driverul i-a omorât pe idle timeout. Morți recurenți = job-ul tău e instabil. - GC time ca fracțiune din task time. Executor 2 de mai sus are GC la ~45% din task time. E catastrofal; lecția 57 (tunarea memoriei) e fix-ul.
- Numărul de task-uri eșuate. Diferit de zero peste mai mulți executori = problemă reală (probabil skew sau OOM); concentrat pe un singur executor = un nod bolnav.
- Utilizarea memoriei aproape de limită. Executorii la 11.8 / 12 GB sunt o alocare distanță de spill sau OOM.
Notează cine arată rău și mergi mai departe.
Pasul 3, job-uri care rulează și query-uri blocate (2 minute)
Apasă pe Jobs. Sortează după Duration. Orice rulează > 1 oră merită o întrebare.
# Intr-un notebook:
[(j.jobId, j.name, j.status, j.numTasks, j.numActiveTasks)
for j in spark.sparkContext.statusTracker().getActiveJobIds()]
Ce cauți:
- Job-uri long-running care ar trebui să fie rapide (un agregat zilnic care a rulat 6 ore în loc de 30 de minute, candidat pentru kill).
- Job-uri blocate cu un singur task activ în timp ce restul s-au terminat, coada clasică de skew (lecția 28).
- Stări blocate pe partea de driver, multe stage-uri terminate dar job-ul nu se finalizează; de obicei un
collect()sautoPandas()care trage prea mult.
Ca să omori un job: Spark UI are un link (kill) lângă job-urile care rulează dacă ești admin. Sau spark.sparkContext.cancelJobGroup(...) dacă codul tău a setat un job group.
Pasul 4, statistici cumulative ale job-urilor și cele mai lente 10 (3 minute)
Apasă pe Jobs din nou, derulează la lista Completed Jobs. Sortează după Duration descrescător. Uită-te la primele 10.
Ce au în comun?
- Toate scriu în același tabel? → layout-ul de storage al acelui tabel ar putea fi gâtul de sticlă (fișiere mici, compactare lipsă, partiționare proastă).
- Toate fac join cu aceeași dimensiune? → dim-ul ăla ar putea avea nevoie de broadcast (lecția 27) sau o cheie mai bună.
- Toate citesc din aceeași sursă? → verifică statisticile și partiționarea sursei ăleia.
- Toate rulează la aceeași oră din zi? → contention pe resurse cu alt job.
Recunoașterea pattern-urilor e mai rapidă decât tunarea fiecăruia.
Pasul 5, utilizarea disk-ului pe workeri (3 minute)
Director-ele locale ale Spark (spark.local.dir, default /tmp) țin fișiere de shuffle, fișiere de spill, fișiere de broadcast și block-uri cache-uite. Se umplu.
# Pe fiecare worker (via SSH sau kubectl exec):
df -h /tmp
du -sh /tmp/spark-* 2>/dev/null | sort -h | tail
Pe platforme manageriate, verifică metrica de disk al workerului în UI-ul platformei. Pe Databricks, ăsta e panoul de storage al clusterului. Pe EMR, metricile de disk Ganglia sau CloudWatch.
Ce cauți:
/tmpmai mult de 80% plin → shuffle-ul și spill-ul vor eșua. Adaugă disk sau restartează clusterul.- Director-e vechi
spark-*-shuffledin job-uri eșuate trecute care nu au fost curățate. Se curăță la shutdown grațios dar persistă după crash-uri. Sigur de șters după ce confirmi că nu rulează nicio aplicație. - Fișiere cache care nu au fost curățate (director-e
blockmgr-*), aceeași poveste.
Pasul 6, top query-uri costisitoare în tab-ul SQL (3 minute)
Apasă pe SQL / DataFrame. Sortează după Duration. Top 10 query-uri sunt unde se duce timpul clusterului tău.
Pentru fiecare dintre cele mai rele:
- Apasă în el. Uită-te la plan.
- Găsește operatorul cel mai rău (timpul raportat cel mai mare).
- Aplică checklist-ul lecției 58: skew? Strategie de join proastă? Shuffle masiv? Filtru care nu se împinge?
Vinovați frecvenți pe un cluster proaspăt:
- Un
BroadcastNestedLoopJoinundeva, condiție de join lipsă. - Un
SortMergeJoinîntre un tabel de 50 GB și un lookup table de 5 MB fiindcă cineva a dezactivatautoBroadcastJoinThresholdcu ani în urmă și a uitat. - Un
groupBycu 50 de partiții de 8 GB fiecare fiindcă AQE e oprit. - UDF-uri Python (
BatchEvalPython) făcând muncă ce ar putea fi o funcție SQL (lecția 40).
Astea sunt query-urile de înregistrat ca follow-up, nu neapărat de reparat astăzi.
Pasul 7, eșecuri recente și clasa de eroare (2 minute)
Pentru clusterele Databricks: tab-ul Event Log și Failed Jobs. Pentru YARN: yarn application -list -appStates FAILED și log-urile aplicației. Pentru Kubernetes: kubectl get pods și pod-urile de executor care au ieșit non-zero.
Grupează eșecurile recente după clasa de eroare:
- Driver OOM,
java.lang.OutOfMemoryError: Java heap spacepe driver. De obicei uncollect(),toPandas(), sau broadcast uriaș. Lecția 57. - Executor OOM, aceeași excepție pe un executor. Skew (lecția 28), cache prost (lecția 23), sau memorie subdimensionată.
- Lost executor,
ExecutorLostFailureurmat de retry de stage. De obicei OOM-killed de OS / container manager. MăreștememoryOverhead. - Shuffle fetch failed,
FetchFailedException. Un executor a murit în timp ce altul citea de la el. Simptom al oricăruia dintre cele de mai sus. - Task failed N times,
spark.task.maxFailures(default 4) a fost atins. Bug real sau skew persistent.
Defalcarea îți spune clasa dominantă, care îți spune ce fix să prioritizezi.
Pasul 8, query-uri de streaming și checkpoint-uri (3 minute)
Dacă cineva rulează Structured Streaming pe acest cluster:
for q in spark.streams.active:
last = q.lastProgress
print(q.name, q.status, last.get("inputRowsPerSecond"), last.get("processedRowsPerSecond"),
last.get("batchDuration"), last.get("triggerExecution"))
Ce cauți:
- Query-uri unde input rate > processed rate susținut → rămâne în urmă, backlog nelimitat.
- Query-uri cu
batchDuration> intervalul de trigger → nu poate ține pasul; cluster mai mare sau workload mai mic. - Query-uri unde
numInputRowse zero ore întregi → upstream-ul e mort, query-ul e idle.
Apoi verifică storage-ul de checkpoint:
# S3 sau DBFS sau orice e checkpoint root
aws s3 ls s3://my-checkpoints/ --recursive --summarize | tail -3
Ce cauți:
- Director-e de checkpoint ale query-urilor moarte pe care nu le-a curățat nimeni, irosesc storage.
- Arbori de checkpoint singulari în GB fiindcă politica de retenție nu prune-uiește state-ul. Query-urile stateful (lecția 53) fără watermark-uri acumulează la nesfârșit.
Pasul 9, job-uri programate și eșecuri recente (2 minute)
Toți cluster manager-ii au view-uri pentru job-urile programate.
- Databricks: tab-ul Workflows → Jobs → sortează după statusul ultimei rulări.
- EMR: status Step; rulări Airflow DAG dacă orchestrezi astfel.
- Dataproc: template-uri Workflow și rulări recente de job-uri.
- Plain Airflow / Dagster / etc.: instanțe de task-uri eșuate în orchestrator.
Ce cauți:
- Rulări eșuate în ultimele 7 zile. Pattern-uri: același job eșuând zilnic, aceeași oră a zilei, aceeași eroare?
- Job-uri care nu au mai rulat de luni. Programare dezactivată? Ar trebui șterse?
- Rulări care au reușit dar au durat de 4 ori mai mult decât de obicei → regresii silențioase, înregistrează ca follow-up.
Pasul 10, audit de cache (2 minute)
Apasă tab-ul Storage.
RDD Name | Storage Level | Cached Partitions | Memory | Disk
df_users (id 12) | MEMORY_AND_DISK | 200 / 200 | 18 GB | 0
df_huge_facts (id 14) | MEMORY_ONLY | 80 / 1200 | 12 GB | 0
df_unused_2024 (id 5) | MEMORY_AND_DISK | 200 / 200 | 6 GB | 0
Ce cauți:
- Dataset-uri cache-uite mai mari decât încape în memorie (al doilea rând de mai sus, doar 80 din 1200 de partiții cache-uite). Cache-ul nu face nimic; fie mărește memoria, fie nu mai cache-ui.
- Dataset-uri cache-uite care n-au mai fost atinse de ore întregi, leak-uite dintr-un notebook care e încă atașat. Unpersist-uiește-le.
- DataFrame-uri cache-uite multiple foarte similare, cineva a cache-uit aceleași date de trei ori sub nume diferite de variabile.
spark.sparkContext._jsc.getPersistentRDDs() listează tot ce e cache-uit în acest moment, dacă vrei să scriptezi auditul.
Pasul 11, mentenanță Delta / Iceberg pe tabele (2 minute)
Dacă clusterul citește/scrie tabele Delta sau Iceberg, verifică dacă rulează mentenanță:
DESCRIBE HISTORY my.table LIMIT 10; -- ultimele operatii
DESCRIBE DETAIL my.table; -- numFiles, sizeInBytes
Ce cauți:
numFiles> 10.000 pe tabele sub câțiva TB → problemă de small files, are nevoie deOPTIMIZE.- Niciun
OPTIMIZEîn istoric de luni de zile → programează compactare săptămânală. - Niciun
VACUUMnici el → fișiere versionate vechi acumulându-se în object storage; costă bani. - Echivalente Iceberg:
expire_snapshotsșirewrite_data_files, tot pe schedule.
Fără astea, tabelele tale se degradează în liniște.
Pasul 12, sanitate la configurarea memoriei (2 minute)
sc.getConf().getAll()
Sau pur și simplu tab-ul Environment în UI, derulează la Spark Properties.
Ce cauți:
spark.executor.memoryșispark.executor.memoryOverhead, overhead-ul ar trebui să fie aproximativ 10% din memoria executorului sau 1 GB, oricare e mai mare. Mai strâns de atât și YARN/K8s va omorî container-ele (lecția 57).spark.driver.memory, dacă cineva apeleazăcollect()pe acest cluster, asta contează. Default-ul de 1 GB e prea mic pentru orice serios.spark.executor.cores, de obicei 4-5. Mai mult înseamnă mai multă contention pentru JVM, mai puțin înseamnă mai multe JVM-uri (overhead).spark.sql.shuffle.partitions, dacă e 200 (default-ul) și procesezi terabyți, ai o problemă. AQE ajută dar nu repară totul.
Dacă container-ele sunt omorâte (verifică lista de executori la pasul 2 plus evenimente YARN/K8s), memoryOverhead e primul lucru de mărit.
Pasul 13, drift de configurare (2 minute)
Același tab Environment. Filtrează după spark.sql.* și spark.shuffle.*. Scanează după non-default-uri.
Ce cauți:
spark.sql.adaptive.enabled = false→ pornește-l înapoi (lecția 59) decât dacă cineva are un motiv documentat.spark.sql.autoBroadcastJoinThreshold = -1→ broadcast dezactivat. Aproape niciodată o idee bună pe un workload real.spark.sql.shuffle.partitions = 2000→ setat sus o dată pentru un job ad-hoc, niciodată resetat. Acum afectează fiecare job pe cluster.- Serializeri custom, codec-uri, evict policies, fiecare non-default are nevoie de un comentariu în docs-ul echipei tale care să explice de ce. Dacă nu știe nimeni de ce, suspect.
Documentează fiecare non-default cu justificarea sa. Orice nu poți justifica, revert.
Pasul 14, scrie raportul (3 minute)
Ia notițele și transformă-le într-un raport scurt. Oglindește template-ul SQL Server, ține-l scurt, prioritizat, acționabil.
Customer X Spark Cluster Health Check, 2026-06-18
- Overall: Galben. Două findings P1.
- Critical:
- Query-ul de streaming
clickstream-aggsrămâne în urmă cu 12 ore; input rate de 4 ori mai mare decât processed rate. Fie cluster mai mare, fie scade workload-ul. Decide astăzi. - Cluster-ul rulează Spark 3.0 → AQE oprit by default, mai multe job-uri ar beneficia imediat. Planifică un upgrade sau backport-ează configurile AQE.
- Query-ul de streaming
- High:
- OOM-uri de executor zilnic pe job-ul
daily-rollup. Semnal: skew pecountry_code. Salt sau AQE. - 4 din 12 executori arată GC time > 30% din task time. Mărește memoria executorului sau micșorează dataset-urile cache-uite.
- Niciodată rulat
OPTIMIZEpeevents_delta; 47.000 de fișiere mici pe un tabel de 800 GB. Programează OPTIMIZE săptămânal.
- OOM-uri de executor zilnic pe job-ul
- Medium:
BroadcastNestedLoopJoinîn top-ul query-urilor costisitoare (weekly_finance). Condiție de join lipsă; query-ul durează 40 de minute per rulare.spark.sql.shuffle.partitions = 4000setat pentru un job ad-hoc în martie, niciodată reverted; afectează negativ job-urile mici.df_unused_2024cache-uit (6 GB, MEMORY_AND_DISK) atașat la un notebook idle. Unpersist.
- Low:
- 12 log-uri de executori morți acumulate în
/tmppe workeri. Adaugă un cron de cleanup. - Job-ul
etl-archiven-a mai rulat cu succes din ianuarie. Dezactivează sau repară.
- 12 log-uri de executori morți acumulate în
- Follow-up:
- Audit la toate configurile Spark față de default-uri; documentează cele păstrate.
- Migrează clusterul la Spark 3.5 LTS.
- Setează schedule pentru Delta
OPTIMIZE+VACUUMpeste toate tabelele de producție. - Adaugă dashboard-uri Prometheus / DataDog la nivel de cluster pentru GC executor, volum shuffle, rata de eșec a task-urilor.
Asta e livrabilul tău. Scurt, acționabil, prioritizat. Trimite-l prin email cui ți-a dat cheile clusterului.
Pasul 15, ce să faci după acest curs
Acum cunoști echivalentul a câțiva ani de experiență Spark on-the-job, comprimată în 60 de lecții. Ce să citești și să urmărești în continuare:
- Documentația Spark,
Performance Tuningși secțiunea AQE. Sursa de adevăr pentru fiecare config pe care o vei atinge. - Gitbook-ul lui Jacek Laskowski, The Internals of Apache Spark și The Internals of Spark SQL. Gratuit, profund tehnic, locul unde mergi când trebuie să știi ce face efectiv un operator.
- Cărțile și prezentările lui Holden Karau, High Performance Spark și Learning Spark. Orientate spre producție, scrise de cineva care face debugging la clustere Spark reale de ani de zile.
- Blogul Databricks, părtinitor spre platforma lor dar plin de deep dives despre Catalyst, AQE, Delta, Photon. Citește cu filtrul de bias pornit.
- Apache Spark JIRA, când lovești un bug ciudat, caută-l. E o șansă de 50/50 ca fix-ul să fie în următoarea versiune minor.
- Practică pe un workload real, un sandbox cu TPC-DS sau datele tale. Rulează health check-ul, repară findings, repetă lunar. Așa se formează memoria musculară.
Felicitări
Ai trecut prin 60 de lecții de PySpark. Acum cunoști:
- Fundamentele, ce e Spark, arhitectura, ierarhia RDD/DataFrame/Dataset (Modulele 1-2).
- API-ul DataFrame, citire, scriere, transformare, agregare, window, pivot, UDF (Modulele 3-4).
- Mecanica execuției, lazy evaluation, narrow vs wide, DAG-ul, caching, shuffles, joins, broadcast, skew, salting (Modulele 5-6).
- Partitioning, bucketing, file layout, on-disk vs in-memory (Modulul 7).
- Optimizatorul, Catalyst, Tungsten, tab-ul SQL, formate de fișiere și JDBC și cloud storage (Modulul 8).
- Streaming, surse, watermark-uri, operații stateful, output modes, sinks (Modulul 9).
- Producție, debugging la job-uri lente, AQE, health check-ul de cluster (Modulul 10).
Dacă acest curs și-a făcut treaba, ești gata să deschizi un cluster Spark pe care nu l-ai mai văzut, să-i diagnostichezi problemele în 30 de minute, să repari problemele de top și să nu intri în panică când pager-ul de on-call sună la 3 dimineața. Ăsta e standardul. Asta înseamnă „să cunoști PySpark” la acest nivel.
Vei tot învăța lucruri noi în fiecare săptămână, asta e meseria. Dar acum ai cadrul. Următorul blog post sau release note se potrivește într-o structură care are sens, în loc să fie un zid de termeni nefamiliari.
Du-te și ai grijă de pipeline-urile tale. Ele depind de tine.
Mulțumesc pentru lectură. Dacă ai observat erori, ai sugestii sau vrei să te cerți despre dacă repartition() sau coalesce() e alegerea corectă, salută.