Fiecare program PySpark, fiecare celulă de notebook, fiecare batch job, fiecare pipeline de streaming, începe cu aceleași trei linii.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyJob").getOrCreate()
Acel obiect spark este ușa către tot ce poate face Spark. DataFrame-uri, SQL, streaming, ML, catalogul, configurația, toate atârnă de SparkSession. Dacă înțelegi ce configurează de fapt cele trei linii, ai înțeles fundamentul fiecărui job Spark pe care-l vei scrie vreodată.
Lecția asta desface incantația aceea, bucată cu bucată.
Ce este de fapt un SparkSession
Înainte de Spark 2.0, API-ul era format din trei obiecte separate: SparkContext (conexiunea la cluster, RDD-uri), SQLContext (DataFrame-uri și SQL) și HiveContext (SQL cu aromă Hive). Ca să faci ceva non-trivial aveai nevoie de toate trei, iar pattern-ul de a le construi era dureros.
Spark 2.0 le-a unificat în SparkSession. Intern, SparkSession deține în continuare un SparkContext (poți ajunge la el prin spark.sparkContext), dar aproape niciodată nu trebuie să-l atingi direct.
SparkSession este un singleton per JVM. getOrCreate() returnează cel existent dacă există sau construiește unul nou dacă nu. Într-un notebook asta contează: dacă rulezi celula de două ori, primești aceeași sesiune de ambele dăți, cu configurația din primul apel. Configurația setată la apelurile ulterioare către builder este ignorată în tăcere. Vom vedea cum să gestionăm asta.
Incantația standard, extinsă
Iată pattern-ul builder complet cu fiecare parametru care ne pasă:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("RuneholdSalesETL")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.driver.memory", "4g")
.getOrCreate())
Trei lucruri de știut despre builder:
- Este un API fluent: fiecare
.appName(),.master(),.config()returnează builder-ul din nou, deci le înlănțui. Parantezele de încadrare îți permit să spargi pe mai multe linii fără backslash-uri urâte. - Ordinea nu contează.
appNameînainte demastereste identic cumasterînainte deappName. getOrCreate()construiește efectiv sesiunea și o returnează. Până când nu-l apelezi, ai un builder, nu o sesiune.
Acum hai să trecem prin fiecare bucată.
appName: numirea job-ului
.appName("RuneholdSalesETL")
Numele ăsta apare:
- În bara de titlu Spark UI (
localhost:4040). - În cluster manager (YARN ResourceManager, numele pod-ului Kubernetes, Spark master web UI) când trimiți la un cluster real.
- În liniile de log și event log-uri.
- În listele de job-uri Databricks/EMR.
Alege un nume specific. MyApp, Test, Spark sunt inutile când te uiți la cinci job-uri concurente în producție. Folosește ceva precum runehold-orders-daily-etl sau customer-churn-feature-build. Nu costă nimic și te scapă data viitoare când ceva merge prost la 3 dimineața.
master: unde trăiesc executorii
.master("local[*]")
URL-ul master îi spune Spark unde să ruleze procesele executor. Asta e cea mai importantă alegere de configurație.
Valorile pe care le vei folosi efectiv:
| URL | Înțeles |
|---|---|
local | Un fir de execuție, în driver JVM. Util pentru smoke test-uri minuscule; fără paralelism deloc. |
local[N] | N fire, unde N este un număr întreg. local[4] = patru fire executor. |
local[*] | Un fir per nucleu CPU pe mașină. Cel mai bun pentru dezvoltare. |
local[*, 3] | La fel ca mai sus, dar reîncearcă task-urile eșuate de până la 3 ori. |
spark://host:7077 | Master de cluster Spark standalone. |
yarn | Cluster Hadoop YARN; configurația se ia din HADOOP_CONF_DIR. |
k8s://https://api.cluster.local:6443 | Cluster Kubernetes. |
mesos://host:5050 | Mesos. (Depreciat; nu vei vedea desfășurări noi.) |
În toate modurile local, executorii rulează în interiorul aceluiași JVM ca driver-ul. Nu există rețea. Fără serializare prin cablu. Este un singur proces care se preface a fi un cluster. Asta e excelent pentru dezvoltare: pornire rapidă, depanare ușoară, fără cluster necesar, dar înseamnă că rezultatele tale „merge pe laptopul meu” nu se transferă întotdeauna la un cluster real, unde shuffle-urile traversează rețeaua și serializarea contează.
Pentru tot ce ține de cursul ăsta, local[*] este ce vrei.
O notă despre nu seta master în cod dacă vei desfășura într-un cluster. Când faci spark-submit la YARN sau Kubernetes, cluster manager-ul transmite --master yarn (sau orice altceva) pe linia de comandă. Dacă codul tău apelează și .master("local[*]"), valoarea hardcodată câștigă, iar job-ul tău „de producție” rulează local în mașina gateway, ceea ce e rău. Pattern-ul pe care-l folosesc majoritatea echipelor:
# Nu hardcoda master in codul de productie
spark = (SparkSession.builder
.appName("RuneholdSalesETL")
# Fara .master() - se preia din spark-submit / SPARK_MASTER env
.getOrCreate())
Pentru dezvoltare locală, suprascrie pe linia de comandă: spark-submit --master local[*] my_job.py. Sau setează variabila de mediu MASTER. Vom reveni la asta în Modulul 7.
Configurațiile care contează în prima zi
.config(key, value) setează oricare dintre sutele de proprietăți spark.*. Există o listă completă la spark.apache.org/docs/latest/configuration.html. Pe majoritatea nu le vei atinge niciodată. O mână le vei seta în fiecare job.
spark.sql.shuffle.partitions
Implicit: 200. Numărul de partiții pe care Spark le folosește pentru operațiuni shuffle: join-uri, agregări, distinct-uri, orice trebuie să redistribuie date între executori.
200 este valoarea pe care inginerii Databricks au ales-o acum câțiva ani pentru că funcționează „ok” pe un cluster tipic de producție cu 50 de noduri. Pe laptopul tău cu 8 nuclee, 200 de partiții pentru un DataFrame de 10MB înseamnă că fiecare partiție are 50KB, iar Spark petrece mai mult timp programând task-uri decât făcând muncă. df.groupBy(...).count() durează 30 de secunde fără un motiv bun.
Regulă empirică pentru dev local: setează-l la numărul tău de nuclee sau la 2x numărul de nuclee.
.config("spark.sql.shuffle.partitions", "8")
În producție, valoarea corectă este aproximativ total_data_in_GB * 2 sau cores * 3, în funcție de forma job-ului. Vom acoperi tuning-ul în Modulul 9.
spark.sql.adaptive.enabled
Implicit: true începând cu Spark 3.2. Activează Adaptive Query Execution (AQE), care îi permite lui Spark să replanifice o interogare la runtime pe baza dimensiunilor reale de date din etapa anterioară. AQE va coalesce partiții shuffle minuscule, va comuta de la sort-merge la broadcast join-uri când o parte se dovedește a fi mică și va gestiona automat skew-ul.
Este, în 99% din cazuri, performanță gratuită. Las-o pornită. Setează-o explicit pentru versiuni Spark mai vechi și pentru claritate:
.config("spark.sql.adaptive.enabled", "true")
spark.driver.memory
Implicit: 1g. Dimensiunea heap-ului pentru driver JVM. Driver-ul ține planul de interogare, catalogul de scheme, variabilele broadcast și rezultatele lui .collect()/.toPandas().
Dacă vezi vreodată java.lang.OutOfMemoryError: Java heap space și nu făceai un collect() gigantic, driver-ul este probabil prea mic. Pe un laptop de dev cu 16GB RAM, dă-i 4g:
.config("spark.driver.memory", "4g")
Configurarea asta trebuie setată înainte de pornirea JVM, ceea ce înseamnă înainte de construirea SparkSession-ului. Dacă încerci spark.conf.set("spark.driver.memory", "4g") după getOrCreate(), este ignorată în tăcere: JVM-ul rulează deja cu 1GB.
spark.executor.memory
Implicit: 1g. Dimensiunea heap-ului pentru fiecare executor JVM. În modul local[*] asta este în mare parte cosmetic pentru că driver-ul și executorii împart un JVM. În modul cluster real este cel mai important buton de tuning pe care-l ai.
Pentru dev local poți să-l lași în pace sau să-l setezi explicit:
.config("spark.executor.memory", "2g")
Alte configurații pe care le vei vedea în sălbăticie
spark.serializer: de obicei setat laorg.apache.spark.serializer.KryoSerializerpentru performanță. Serializatorul Java este implicit și este lent.spark.sql.session.timeZone: implicit este timezone-ul JVM. Setează explicit laUTCca să eviți bug-uri silențioase de conversie de timezone.spark.sql.legacy.timeParserPolicy:CORRECTEDeste parserul modern,LEGACYeste comportamentul Spark 2.x. Alege unul și documentează alegerea.
Citirea și schimbarea configurațiilor la runtime
Odată ce sesiunea e pornită, poți citi orice configurație:
print(spark.conf.get("spark.sql.shuffle.partitions"))
# 8
print(spark.conf.get("spark.app.name"))
# RuneholdSalesETL
Poți schimba configurațiile de runtime (orice nu cere o repornire JVM):
spark.conf.set("spark.sql.shuffle.partitions", 16)
Dar nu și configurațiile de nivel JVM precum spark.driver.memory. Acelea trebuie setate la momentul builder-ului.
Ca să le listezi pe toate:
for k, v in spark.sparkContext.getConf().getAll():
print(f"{k} = {v}")
Vei vedea cincizeci și ceva de setări, majoritatea pe care Spark le-a completat cu valori implicite.
Spark UI
Cel mai util instrument de depanare din Spark.
print(spark.sparkContext.uiWebUrl)
# http://192.168.1.10:4040
Cât timp SparkSession este viu, îndreaptă browser-ul către acel URL. Primești:
- Jobs: fiecare acțiune pe care ai rulat-o, cu stages și task-uri, runtime și un DAG.
- Stages: defalcare la nivel de task, unde să cauți skew.
- Storage: ce ai pus în cache și câtă memorie folosește.
- SQL / DataFrame: planul de interogare, atât logic cât și fizic, cu timing-uri pe stage.
- Environment: fiecare configurație care este efectiv în vigoare.
- Executors: memorie, GC, numărul de task-uri per executor.
Dacă portul 4040 este ocupat (de exemplu un alt SparkSession rulează deja), Spark se leagă la 4041. Apoi 4042. Portul real este în spark.sparkContext.uiWebUrl.
UI-ul moare în momentul în care apelezi spark.stop(). Pentru a păstra istoricul după ce un job se termină, configurează Spark History Server, dar acela este un subiect din Modulul 8.
Un șablon complet adnotat
Iată ce lipesc efectiv în partea de sus a fiecărui script și notebook:
from pyspark.sql import SparkSession
# Construieste sesiunea. Ordinea apelurilor .config() nu conteaza; getOrCreate() declanseaza pornirea JVM.
spark = (SparkSession.builder
# Identificarea job-ului - vizibila in Spark UI si cluster manager
.appName("runehold-orders-daily-etl")
# Unde ruleaza executorii. local[*] = unul per nucleu. Nu seta in prod.
.master("local[*]")
# Paralelism la shuffle. Implicit 200 e gresit pentru laptop-uri.
.config("spark.sql.shuffle.partitions", "8")
# Heap-ul driver-ului. Seteaza INAINTE de pornirea JVM. 4g e suficient pentru dev.
.config("spark.driver.memory", "4g")
# AQE: performanta gratuita pe Spark 3.2+
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Evita bug-urile silentioase de timezone
.config("spark.sql.session.timeZone", "UTC")
# Serializare mai rapida
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate())
# Linistim log-urile zgomotoase de pornire
spark.sparkContext.setLogLevel("WARN")
# Verificare de sanatate
print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
print(f"App ID: {spark.sparkContext.applicationId}")
# DataFrame rapid de smoke test
df = spark.range(0, 1_000_000).selectExpr("id", "id % 7 AS modulo")
df.groupBy("modulo").count().orderBy("modulo").show()
spark.stop()
Rulează-l. Ar trebui să vezi versiunea Spark, un URL de UI, un application ID și un tabel de 7 rânduri cu numărători. Întregul script durează aproximativ 5 secunde pe un laptop modern.
Capcana „sesiunea există deja”
Într-un notebook, dacă rulezi o celulă care construiește un SparkSession cu local[4], apoi îl schimbi la local[*] și rulezi din nou, al doilea apel lovește getOrCreate(), găsește sesiunea existentă și o returnează nemodificată. Noua ta setare master este ignorată. La fel pentru orice apel .config().
Două căi de ieșire:
# Optiunea 1: opreste sesiunea existenta mai intai
spark.stop()
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Optiunea 2: curata sesiunea activa si reconstruieste
SparkSession.builder.appName("X").getOrCreate().stop()
SparkSession._instantiatedSession = None
spark = SparkSession.builder.master("local[*]").getOrCreate()
De cele mai multe ori, repornirea kernel-ului Python este mai simplă și mai puțin predispusă la erori.
Ce avem acum
Poți construi un SparkSession de la zero, seta configurațiile care contează cu adevărat, să le citești înapoi, să găsești UI-ul și să eviți capcana singleton-ului. Fiecare lecție de aici încolo începe cu presupunerea că ai o variabilă spark cu care să lucrezi.
Lecția următoare: punerea efectivă a datelor în Spark. CSV, JSON, Parquet, trei formate cu trei comportamente implicite diferite și compromisul schema-on-read care decide dacă job-ul tău durează 10 secunde sau 10 minute.