PySpark, dalle fondamenta Lezione 55 / 60

La Spark UI: lo strumento più importante che imparerai

Un giro guidato di ogni tab (Jobs, Stages, Tasks, SQL, Storage, Executors) e cosa ti dice ognuno quando qualcosa va storto.

Se di tutto questo corso impari un solo strumento operativo, fai che sia la Spark UI. Ogni domanda di performance (“perché questo job è lento”, “perché è andato in OOM”, “la mia cache funziona”, “il filtro va in pushdown”, “un task sta facendo tutto il lavoro”) ha la sua risposta nella UI. I log ti dicono cosa è successo. La UI ti dice perché è stato lento, e questa è la domanda che paga le bollette.

Il Modulo 10 è la metà produzione del corso. Lo passeremo su debugging, tuning e la cassetta degli attrezzi a cui un Spark engineer va a prendere alle 2 di notte. Questa lezione è il pezzo centrale: la UI, ogni tab, quali colonne leggere per prime, che aspetto hanno i pattern.

Dove trovarla

Per una SparkSession locale, la UI si lega a localhost:4040 per default. Se la porta 4040 è occupata (perché hai un altro driver in esecuzione), Spark incrementa: 4041, 4042, e così via. Il driver logga l’URL effettivo allo startup. Puoi anche chiederlo direttamente alla session:

spark = SparkSession.builder.master("local[*]").getOrCreate()
print(spark.sparkContext.uiWebUrl)
# http://192.168.1.10:4040

Sui cluster veri, il cluster manager ospita (o linka) la UI:

  • Databricks: ogni pagina cluster ha un link “Spark UI”. Mentre il cluster gira, la UI è viva; per i cluster terminati Databricks tiene un rendering statico dell’event-log.
  • EMR: la UI YARN ResourceManager sul nodo master elenca ogni applicazione; clicca l’applicazione, clicca “ApplicationMaster” per la UI live, o “History” una volta finita.
  • Kubernetes / Spark Operator: il pod del driver espone la 4040 internamente; fai kubectl port-forward verso il tuo laptop.
  • YARN puro: come EMR, il ResourceManager linka.
  • Spark History Server: per le applicazioni terminate, un server separato legge gli event log (la spark.eventLog.dir su cui hai scritto) e ri-renderizza la UI on demand.

La UI è una vista read-only dello stato interno del driver Spark. Non puoi rompere nulla cliccandoci attorno. Clicca tutto.

I tab in ordine di utilità

Spark espone un set fisso di tab. Li attraverso nell’ordine in cui li uso davvero in produzione, non da sinistra a destra.

1. Jobs: la vista d’insieme

La pagina di atterraggio. Una riga per job, dove un job è tutto quello che viene innescato da una singola action (.show(), .count(), .write(), .collect()). Colonne:

  • Job ID: sequenziale.
  • Description: il nome dell’action, o quello che imposti con spark.sparkContext.setJobDescription("...") (fallo; è gratis e rende la UI leggibile).
  • Submitted: timestamp wall-clock.
  • Duration: durata wall-clock.
  • Stages: Succeeded/Total: quanti stage ha avuto questo job.
  • Tasks (for all stages): Succeeded/Total: numero totale di task e quanti sono finiti.

Usa il tab Jobs per rispondere a “quale action è lenta”. Ordina per Duration. Il job lento è quello su cui scendere.

Se vedi una description tipo “count at NativeMethodAccessorImpl” senza altro contesto, hai un .count() da qualche parte di cui non ti eri accorto stesse innescando, spesso una print di debug lasciata in produzione.

2. Stages: la sostanza

Clicca un job e ottieni i suoi stage. È qui che passerai la maggior parte del tempo.

Uno stage è una sequenza di operazioni che Spark può eseguire senza uno shuffle. Ogni shuffle è un confine di stage. Un tipico job di join ha tre stage: scansiona e pre-shuffla il lato sinistro, scansiona e pre-shuffla il lato destro, fai il join.

Colonne per-stage da leggere per prime:

  • Duration: quanto è durato lo stage.
  • Tasks: Succeeded/Total: il numero di task ti dice il numero di partition.
  • Input: byte letti dalla sorgente.
  • Output: byte scritti.
  • Shuffle Read / Shuffle Write: byte shufflati. Shuffle grossi sono shuffle costosi.

Clicca uno stage e ottieni la Task table. Questa è la singola schermata più utile di tutta la UI.

Per ogni task vedi Status, Duration, GC Time, Shuffle Read Size, Shuffle Read Records, Spill (Memory), Spill (Disk) ed Errors. Ordina per Duration discendente. Adesso guarda:

Max contro median. Tocca “Summary Metrics” in alto: Spark ti dà min / 25% / median / 75% / max per ogni colonna numerica. Se la durata massima del task è 10 volte la mediana, hai uno skew. Se l’input massimo è 10 volte la mediana, hai partition di input skewed. Se lo shuffle read massimo è 10 volte la mediana, hai una hot key in un join o groupBy. Lo skew è il singolo problema di produzione più comune; la task table è dove lo individui.

Le colonne di spill. Spill (Memory) è quanti dati non compressi Spark ha dovuto spingere fuori dalla memoria di esecuzione. Spill (Disk) è quanti dati compressi sono finiti su disco locale a causa di quella spinta. Qualsiasi spill significa che hai esaurito la memoria di esecuzione e hai dovuto ripiegare su disco. Un poco è normale sotto pressione; molto è il motivo per cui il tuo job è lento. Parleremo di come sistemarlo nella lezione 57.

GC time. Tempo speso nella garbage collection JVM per task. Se il GC è più del ~10% della durata del task, sei sotto pressione di memoria anche se non c’è spill. Aumenta la memoria dell’executor o riduci i dati per task.

La visualizzazione DAG in cima alla pagina dello stage è anche utile: disegna gli operatori dentro lo stage e le frecce tra loro. Utile per capire cosa sta facendo lo stage, meno utile per il debugging di performance rispetto alla task table.

3. SQL / DataFrame: il tab del piano di query

Questo è il tab più cliccato in produzione per i workload DataFrame e SQL. Ogni query che lanci compare come una riga col testo SQL originale o una description generata, più un link “Execution ID”.

Clicca l’Execution ID e ottieni il grafo degli operatori: ogni nodo del physical plan come un riquadro, ogni riquadro annotato con conteggi di righe, tempo, righe in output e byte. Le annotazioni vengono dalla run effettiva, non da stime: questa è la vista post-mortem di cosa Spark ha fatto.

Cosa cercare:

  • Operatori Exchange inattesi. Ogni Exchange è uno shuffle. Due Exchange dove te ne aspettavi uno significa che hai accidentalmente causato un re-shuffle (spesso ripartizionando due volte o rompendo un co-partitioning).
  • Il riquadro dell’algoritmo di join. BroadcastHashJoin è veloce. SortMergeJoin è il default sicuro. BroadcastNestedLoopJoin è un code smell; di solito significa che l’optimizer non è riuscito a scegliere una strategia per via di una condizione di join non-equi.
  • Conteggi di righe sui filtri. Se il tuo filtro dice “1B righe in ingresso, 1B righe in uscita”, il filtro non sta filtrando, di solito per via di un type mismatch.
  • Annotazioni dell’Adaptive Query Execution. Con AQE attivo, il tab SQL è l’unico posto in cui vedere cosa è davvero girato, perché il piano è cambiato a runtime. Cerca i riquadri AQEShuffleRead: è AQE che coalesce o splitta partition in modo adattivo.

Il tab SQL è anche dove scoprirai che quello che df.explain() ha stampato prima della run non è quello che è stato eseguito davvero. Ci scaveremo nella lezione 56.

4. Storage: la cache che dice la verità

Il tab Storage elenca ogni DataFrame in cache o persistito, con la sua dimensione effettiva sul cluster, lo storage level (memory only, memory and disk, ecc.), la fraction cached e la distribuzione per executor.

Cosa leggere per primo:

  • Fraction cached. Se non è 100%, non hai abbastanza memoria e Spark ha evitto delle partition. La cache è parzialmente fredda e la prossima lettura sarà un ricompute parziale.
  • Size in Memory. Questo è ciò che è effettivamente residente, che è spesso molto più grande della dimensione Parquet su disco per via del formato in memoria.
  • RDD names. Se il tuo codice chiama .cache() su più DataFrame, dagli nomi umani con df.createOrReplaceTempView("name") prima oppure setta df.rdd.name = "..." così li distingui nella UI.

Lo Storage è come confermi che .cache() ha effettivamente fatto quello che ti aspettavi. Se il tab è vuoto dopo la tua action, la tua chiamata di cache non si è materializzata: il caching è lazy, devi innescare un’action che tocchi il DataFrame in cache perché si popoli.

5. Executors: salute per executor

Una riga per executor (più una riga per il driver). Colonne:

  • Address: host:port.
  • Status: Active o Dead.
  • RDD Blocks / Storage Memory: quanti dati in cache tiene questo executor.
  • Disk Used: dati di shuffle e spill su disco locale.
  • Cores: core assegnati a questo executor.
  • Active / Failed / Complete Tasks: throughput dei task.
  • Task Time (GC Time): tempo totale dei task e la frazione di GC.
  • Input / Shuffle Read / Shuffle Write: byte processati.

La grossa: GC Time come frazione del Task Time. Se un executor sta spendendo il 30% del tempo in GC, sta affogando di oggetti e la JVM è il collo di bottiglia. O aumenti la memoria, o riduci i core per executor, o sistemi il workload. La pressione di GC è un killer silenzioso: il tuo job gira, ma a metà velocità, e niente logga un errore.

L’altra cosa da controllare è Dead executors. Se gli executor continuano a morire e a essere rimpiazzati, qualcosa non va, di solito kill OOM dal cluster manager. Clicca l’executor morto, guarda il log dell’executor, trova il motivo del kill. La lezione 57 è la lezione post-mortem sull’OOM.

6. Streaming / Structured Streaming

Presente solo se hai query di streaming. Statistiche per query: input rate, processing rate, batch duration, dimensione dello state operator. I pattern da osservare sono il processing rate che resta indietro rispetto all’input rate (non riesci a stare al passo), la batch duration che cresce nel tempo (lo stato sta crescendo senza limite, problema di watermark), e la dimensione dello state operator che tocca decine di GB (stai tenendo troppo stato, è ora di aggiungere un watermark o ripensare le chiavi).

Lo streaming l’abbiamo coperto nelle lezioni 49-54; questo tab è dove diagnostichi la versione di produzione.

7. Environment

Ogni config Spark, proprietà JVM, voce di classpath e config Hadoop in vigore per questo driver. Utile quando stai debuggando “perché in produzione si comporta diverso”: metà delle volte la risposta è una config che il platform team ha messo e che non sapevi. Cerca spark.sql.adaptive, spark.serializer, spark.executor.memory per iniziare.

L’albero di decisione “dove guardo per primo”

Sei stato chiamato di urgenza. La pipeline è lenta. Attraversa la UI così:

  1. Tab Jobs. Ordina per durata. Il job lento è quello su cui cliccare.
  2. Clicca il job lento, vai su Stages. Ordina gli stage per durata. Lo stage lento è quello su cui cliccare.
  3. Clicca lo stage lento, vai su Task table. Clicca “Summary Metrics”.
    • Durata massima molto maggiore della mediana? Skew. Sistema con salting o gestione skew di AQE.
    • Numeri di spill grandi? Pressione di memoria. Aumenta la memoria dell’executor o riduci la dimensione delle partition.
    • GC time alto? Anche pressione di memoria, spesso più core per executor di quelli che l’heap regge.
    • Tutto bilanciato e semplicemente lento? Sei CPU-bound, servono più core o una query più intelligente.
  4. Verifica nel tab SQL. Trova la query, guarda il grafo degli operatori, controlla la strategia di join e i conteggi di righe a ogni filtro. Spesso individui “il filtro che non ha fatto niente” o “il broadcast che non è avvenuto”.
  5. Tab Executors. Ce n’è qualcuno morto? Un singolo executor sta facendo tutto il lavoro? Il GC time sembra in salute?

Questo è il 90% del debugging Spark. La UI ti dice quale di quelle quattro cose è sbagliata. Leggere la UI con scioltezza è la differenza tra un Spark engineer e qualcuno che fa girare Spark.

Prossima lezione: .explain() e come leggere il piano prima di lanciarlo, così la UI conferma quello che ti aspettavi invece di essere una sorpresa.

Cerca