PySpark, dalle fondamenta Lezione 8 / 60

La tua prima SparkSession

Il punto di ingresso di ogni job PySpark. Cos'è una SparkSession, le configurazioni che contano, e cosa significa davvero `local[*]`.

Ogni programma PySpark, ogni cella di notebook, ogni job batch, ogni pipeline di streaming, inizia con le stesse tre righe.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyJob").getOrCreate()

Quell’oggetto spark è la porta verso tutto ciò che Spark sa fare. DataFrame, SQL, streaming, ML, il catalogo, la configurazione: tutto pende dalla SparkSession. Se capisci cosa quelle tre righe configurano davvero, hai capito le fondamenta di ogni job Spark che scriverai mai.

Questa lezione smonta quell’incantesimo, pezzo per pezzo.

Cos’è davvero una SparkSession

Prima di Spark 2.0, l’API era fatta di tre oggetti separati: SparkContext (connessione al cluster, RDD), SQLContext (DataFrame e SQL) e HiveContext (SQL in stile Hive). Per fare qualunque cosa di non banale ti servivano tutti e tre, e il pattern per costruirli era doloroso.

Spark 2.0 li ha unificati in SparkSession. Internamente, la SparkSession possiede ancora un SparkContext (puoi raggiungerlo tramite spark.sparkContext), ma quasi mai hai bisogno di toccarlo direttamente.

La SparkSession è un singleton per JVM. getOrCreate() restituisce quella esistente se esiste, oppure ne costruisce una nuova se non c’è. In un notebook questo è importante: se esegui la cella due volte, ottieni la stessa sessione entrambe le volte, con la configurazione della prima chiamata. Le configurazioni impostate in chiamate successive a builder vengono silenziosamente ignorate. Vedremo come affrontare la cosa.

L’incantesimo standard, espanso

Ecco il pattern del builder completo con ogni parametro che ci interessa:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("RuneholdSalesETL")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.driver.memory", "4g")
         .getOrCreate())

Tre cose da sapere sul builder:

  1. È una fluent API: ogni .appName(), .master(), .config() restituisce il builder stesso, quindi li concateni. Le parentesi che avvolgono ti permettono di andare a capo senza brutti backslash.
  2. L’ordine non conta. appName prima di master è identico a master prima di appName.
  3. getOrCreate() costruisce davvero la sessione e la restituisce. Finché non lo chiami, hai un builder, non una sessione.

Adesso vediamo ogni pezzo.

appName, dare un nome al job

.appName("RuneholdSalesETL")

Questo nome compare:

  • Nella barra del titolo della Spark UI (localhost:4040).
  • Nel cluster manager (YARN ResourceManager, nome del pod Kubernetes, web UI del master Spark) quando fai submit a un cluster vero.
  • Nelle righe di log e nei log degli eventi.
  • Nelle liste di job di Databricks/EMR.

Scegli un nome che sia specifico. MyApp, Test, Spark sono inutili quando stai fissando cinque job concorrenti in produzione. Usa qualcosa tipo runehold-orders-daily-etl o customer-churn-feature-build. Non costa niente e ti salva la prossima volta che qualcosa va storto alle 3 di notte.

master, dove vivono gli executor

.master("local[*]")

L’URL del master dice a Spark dove eseguire i processi executor. Questa è la singola scelta di configurazione più importante.

I valori che userai davvero:

URLSignificato
localUn thread, nella JVM del driver. Utile per smoke test minuscoli; nessun parallelismo.
local[N]N thread, dove N è un intero. local[4] = quattro thread executor.
local[*]Un thread per core CPU della macchina. Il migliore per lo sviluppo.
local[*, 3]Come sopra, ma riprova i task falliti fino a 3 volte.
spark://host:7077Master di un cluster Spark standalone.
yarnCluster Hadoop YARN: la config viene presa da HADOOP_CONF_DIR.
k8s://https://api.cluster.local:6443Cluster Kubernetes.
mesos://host:5050Mesos. (Deprecato; non vedrai nuovi deployment.)

In tutte le modalità local, gli executor girano dentro la stessa JVM del driver. Non c’è rete. Niente serializzazione attraverso il cavo. È un solo processo che finge di essere un cluster. Ottimo per lo sviluppo: avvio rapido, debug facile, niente cluster necessario, ma significa che i tuoi risultati “funziona sul mio portatile” non sempre si traducono su un cluster vero, dove gli shuffle attraversano la rete e la serializzazione conta.

Per tutto in questo corso, local[*] è quello che vuoi.

Una nota sul non impostare master nel codice se farai deploy su un cluster. Quando fai spark-submit a YARN o Kubernetes, il cluster manager passa --master yarn (o quello che è) sulla riga di comando. Se anche il tuo codice chiama .master("local[*]"), vince il valore hardcoded e il tuo job “in produzione” gira localmente dentro la macchina gateway, che è male. Il pattern che la maggior parte dei team usa:

# Non fissare master nel codice di produzione
spark = (SparkSession.builder
         .appName("RuneholdSalesETL")
         # Niente .master(), prendilo da spark-submit / SPARK_MASTER env
         .getOrCreate())

Per lo sviluppo locale, fai override sulla riga di comando: spark-submit --master local[*] my_job.py. O imposta la variabile d’ambiente MASTER. Torneremo su questo nel Modulo 7.

Le config che contano nel primo giorno

.config(key, value) imposta una qualunque delle centinaia di proprietà spark.*. C’è una lista completa su spark.apache.org/docs/latest/configuration.html. La maggior parte non le toccherai mai. Una manciata le imposterai in ogni job.

spark.sql.shuffle.partitions

Default: 200. Il numero di partizioni che Spark usa per le operazioni di shuffle: join, aggregazioni, distinct, qualunque cosa debba ridistribuire i dati tra gli executor.

200 è il valore che gli ingegneri Databricks hanno scelto anni fa perché funziona “abbastanza bene” su un tipico cluster di produzione da 50 nodi. Sul tuo portatile con 8 core, 200 partizioni di un DataFrame da 10MB significano che ogni partizione ha 50KB e Spark passa più tempo a schedulare task che a fare lavoro. Il tuo df.groupBy(...).count() ci mette 30 secondi per nessun buon motivo.

Regola del pollice per dev locale: imposta al numero di core, o 2x il numero di core.

.config("spark.sql.shuffle.partitions", "8")

In produzione, il valore giusto è grossomodo dati_totali_in_GB * 2 o core * 3, a seconda della forma del job. Vedremo il tuning nel Modulo 9.

spark.sql.adaptive.enabled

Default: true in Spark 3.2+. Abilita l’Adaptive Query Execution (AQE), che permette a Spark di ripianificare una query a runtime basandosi sulle dimensioni reali dei dati dallo stage precedente. AQE coalescerà le partizioni di shuffle minuscole, passerà da sort-merge a broadcast join quando un lato si rivela piccolo, e gestirà lo skew automaticamente.

È nel 99% dei casi performance gratis. Lascialo acceso. Impostalo esplicitamente per le versioni più vecchie di Spark e per chiarezza:

.config("spark.sql.adaptive.enabled", "true")

spark.driver.memory

Default: 1g. La dimensione dell’heap della JVM del driver. Il driver tiene il piano della query, il catalogo dello schema, le broadcast variable, e i risultati di .collect()/.toPandas().

Se mai vedi java.lang.OutOfMemoryError: Java heap space e non stavi facendo un collect() enorme, probabilmente il driver è troppo piccolo. Su un portatile dev con 16GB di RAM, dagli 4g:

.config("spark.driver.memory", "4g")

Questa config va impostata prima che la JVM si avvii, il che significa prima che la SparkSession sia costruita. Se provi spark.conf.set("spark.driver.memory", "4g") dopo getOrCreate(), viene silenziosamente ignorato: la JVM gira già con 1GB.

spark.executor.memory

Default: 1g. La dimensione dell’heap di ogni JVM executor. In modalità local[*] è per lo più cosmetico perché driver ed executor condividono una JVM. In modalità cluster vera è la manopola di tuning più importante che hai.

Per dev locale puoi lasciarlo perdere o impostarlo esplicitamente:

.config("spark.executor.memory", "2g")

Altre config che vedrai in giro

  • spark.serializer: di solito impostato su org.apache.spark.serializer.KryoSerializer per le performance. Il serializer Java è il default ed è lento.
  • spark.sql.session.timeZone: di default è il timezone della JVM. Imposta esplicitamente a UTC per evitare bug silenziosi di conversione di timezone.
  • spark.sql.legacy.timeParserPolicy: CORRECTED è il parser moderno, LEGACY è il comportamento di Spark 2.x. Scegli uno e documenta la scelta.

Leggere e cambiare le config a runtime

Una volta che la sessione è su, puoi leggere qualunque config:

print(spark.conf.get("spark.sql.shuffle.partitions"))
# 8

print(spark.conf.get("spark.app.name"))
# RuneholdSalesETL

Puoi cambiare le config a runtime (qualunque cosa non richieda un riavvio della JVM):

spark.conf.set("spark.sql.shuffle.partitions", 16)

Ma non le config a livello JVM come spark.driver.memory. Quelle vanno impostate al momento del builder.

Per elencare tutto:

for k, v in spark.sparkContext.getConf().getAll():
    print(f"{k} = {v}")

Vedrai una cinquantina di impostazioni, la maggior parte delle quali Spark ha riempito con i default.

La Spark UI

Il singolo strumento di debug più utile in Spark.

print(spark.sparkContext.uiWebUrl)
# http://192.168.1.10:4040

Mentre la SparkSession è viva, punta il tuo browser a quell’URL. Ottieni:

  • Jobs: ogni action che hai eseguito, con stage e task, runtime, e un DAG.
  • Stages: scomposizione a livello di task, dove cercare lo skew.
  • Storage: cosa hai messo in cache e quanta memoria sta usando.
  • SQL / DataFrame: il piano della query, sia logico che fisico, con i tempi degli stage.
  • Environment: ogni config effettivamente in vigore.
  • Executors: memoria, GC, conteggio task per executor.

Se la porta 4040 è occupata (ad esempio un’altra SparkSession è già in esecuzione), Spark si lega a 4041. Poi 4042. La porta effettiva è in spark.sparkContext.uiWebUrl.

La UI muore nel momento in cui chiami spark.stop(). Per tenere la storia in giro dopo che un job finisce, configura lo Spark History Server, ma quello è un argomento del Modulo 8.

Un template completo annotato

Ecco quello che incollo davvero in cima a ogni script e notebook:

from pyspark.sql import SparkSession

# Costruisce la sessione. L'ordine delle chiamate .config() non conta; è getOrCreate() che fa partire la JVM.
spark = (SparkSession.builder
         # Identificazione del job, visibile in Spark UI e cluster manager
         .appName("runehold-orders-daily-etl")

         # Dove girano gli executor. local[*] = uno per core. Non impostare in prod.
         .master("local[*]")

         # Parallelismo dello shuffle. Il default 200 è sbagliato per i portatili.
         .config("spark.sql.shuffle.partitions", "8")

         # Heap del driver. Imposta PRIMA dell'avvio della JVM. 4g sono abbondanti per dev.
         .config("spark.driver.memory", "4g")

         # AQE: performance gratis su Spark 3.2+
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.adaptive.coalescePartitions.enabled", "true")

         # Evita bug silenziosi di timezone
         .config("spark.sql.session.timeZone", "UTC")

         # Serializzazione più veloce
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

         .getOrCreate())

# Silenzia i log rumorosi di startup
spark.sparkContext.setLogLevel("WARN")

# Sanity check
print(f"Spark version: {spark.version}")
print(f"Spark UI:      {spark.sparkContext.uiWebUrl}")
print(f"App ID:        {spark.sparkContext.applicationId}")

# DataFrame smoke veloce
df = spark.range(0, 1_000_000).selectExpr("id", "id % 7 AS modulo")
df.groupBy("modulo").count().orderBy("modulo").show()

spark.stop()

Eseguilo. Dovresti vedere la versione di Spark, un URL della UI, un application ID, e una tabella di 7 righe con i conteggi. L’intero script ci mette circa 5 secondi su un portatile moderno.

La trappola della “sessione già esistente”

In un notebook, se esegui una cella che costruisce una SparkSession con local[4], poi la cambi in local[*] e riesegui, la seconda chiamata arriva a getOrCreate(), trova la sessione esistente, e la restituisce immutata. La tua nuova impostazione di master viene ignorata. Lo stesso per ogni chiamata .config().

Due vie d’uscita:

# Opzione 1: ferma prima la sessione esistente
spark.stop()
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Opzione 2: pulisci la sessione attiva e ricostruisci
SparkSession.builder.appName("X").getOrCreate().stop()
SparkSession._instantiatedSession = None
spark = SparkSession.builder.master("local[*]").getOrCreate()

La maggior parte delle volte, riavviare il kernel Python è più semplice e meno soggetto a errori.

Cosa abbiamo adesso

Sai costruire una SparkSession da zero, impostare le configurazioni che contano davvero, rileggerle, trovare la UI, ed evitare la trappola del singleton. Ogni lezione da qui in avanti parte dall’assunzione che tu abbia una variabile spark con cui lavorare.

Prossima lezione: mettere davvero dati dentro Spark. CSV, JSON, Parquet: tre formati con tre comportamenti di default diversi, e il tradeoff dello schema-on-read che decide se il tuo job ci mette 10 secondi o 10 minuti.

Cerca