Ci sono grosso modo cinquanta proprietà di configurazione di Spark legate alla memoria, e le vedrai tutte da qualche parte in qualche risposta su Stack Overflow. Te ne servono quattro. Le altre quarantasei sono o auto-derivate dalle quattro, o rilevanti una volta l’anno per uno specifico failure mode molto particolare, o semplicemente legacy.
Questa è la lezione in cui tagliamo via il rumore. Quali region di memoria ha davvero Spark, cosa controllano i quattro config, che aspetto hanno i quattro tipi di OOM, cosa significa spill e quando va bene, e la regola di sizing che trasforma “non ho idea di quanto grandi fare gli executor” in un punto di partenza da cui iterare.
Cosa vive dentro un executor
Un executor è un processo JVM. Ognuno fa girare spark.executor.cores task in parallelo come thread dentro quella JVM. La memoria dentro la JVM è divisa in region.
Reserved memory: 300 MB fissi, per gli oggetti interni di Spark. Non si tuna.
User memory: il 25% di (heap dell’executor meno reserved) di default. È per gli oggetti che il tuo codice crea fuori dalle strutture gestite di Spark: closure di UDF, broadcast var che accumuli a mano, qualsiasi cosa metti in una normale collection Python o Scala. Se una UDF Python costruisce un dict locale gigante, quel dict vive qui.
Spark unified memory pool: il restante 75%. Dentro questo pool competono due cose:
- Execution memory: sort buffer, hash table per aggregation, hash table per join, buffer di shuffle. Tutto ciò che serve a Spark mentre fa girare gli operatori.
- Storage memory: DataFrame cachati (il
.cache()che hai messo su quel DataFrame).
La parte “unified” di “unified memory” è che execution e storage condividono il pool dinamicamente. Lo storage può prendere a prestito dall’execution e viceversa. C’è un soft floor per ognuno: di default lo storage ha garantito il 50% del pool, e l’execution può sfrattare lo storage sopra quel floor quando le serve spazio. Eviction significa che le partition cachate vengono droppate. La execution memory non è evictable: se all’execution serve più spazio e lo storage non ha nulla da cedere, Spark spilla su disco.
Con questo abbiamo coperto l’heap della JVM. Fuori dall’heap c’è un’altra region.
Off-heap memory (“overhead”): stack della JVM, librerie native (pensa a Pandas, Arrow, buffer off-heap di Tungsten), buffer di rete dello shuffle, processi Python worker, overhead del container. Non fa parte di spark.executor.memory; è un budget separato controllato da spark.executor.memoryOverhead.
Memoria totale del container = spark.executor.memory piu’ spark.executor.memoryOverhead. Quello è il numero che Kubernetes o YARN riserva davvero per il tuo executor.
I quattro config
Questi sono quelli che toccherai. Tutto il resto, ignoralo finché non hai una ragione specifica.
spark.executor.memory è l’allocazione on-heap per executor. Il numero grosso. La JVM riceve -Xmx settato a questo valore.
spark.executor.memoryOverhead è l’allocazione off-heap per executor. Il default è max(384 MB, 0.10 per spark.executor.memory). Il default funziona per la maggior parte dei workload solo-JVM. Alzalo quando usi PySpark in modo intenso (i Python worker vivono qui), Pandas UDF pesanti (i buffer Arrow vivono qui), o quando vedi errori “container killed by YARN/K8s”: quelli sono quasi sempre esaurimento di overhead, non di heap.
spark.driver.memory è l’heap per il processo driver. Il driver coordina il cluster e tiene i metadata dei tuoi job, ma tiene anche tutto ciò che fai .collect(), le tabelle broadcastate, e la cache per qualsiasi computazione cachata lato driver. Default 1 GB, va bene per la maggior parte dei job. Alzalo se broadcasti tabelle grandi o collecti risultati significativi.
spark.driver.maxResultSize è il cap sui byte che il driver accetterà da .collect() e azioni simili. Default 1 GB. Se il tuo job muore con Total size of serialized results of N tasks (X GB) is bigger than spark.driver.maxResultSize, hai provato a collectare più del cap. O alzi il cap o, più spesso, non collecti quei dati: scrivili invece.
Tutto qui. Ci sono anche spark.memory.fraction (la frazione del pool unified) e spark.memory.storageFraction (la frazione del soft-floor dello storage), ma i default 0,6 e 0,5 sono ben tarati e dovresti lasciarli stare a meno di non avere un problema specifico diagnosticato.
Un tipico punto di partenza in produzione:
spark.executor.memory=12g
spark.executor.memoryOverhead=2g
spark.executor.cores=4
spark.driver.memory=4g
spark.driver.maxResultSize=2g
È un container da 14 GB per executor con 4 core. Da li’ itera in base a cosa ti dice la UI.
Spill: il rallentamento silenzioso
Quando l’execution memory finisce, ad esempio un sort o un’aggregation produce più dati intermedi di quanti ne stiano, Spark non fa OOM. Spilla. Lo stato in memoria corrente viene serializzato e scritto su disco locale, il buffer in memoria viene liberato, e l’operatore continua. Quando l’operatore finisce, i pezzi spillati vengono mergiati di nuovo dal disco.
Nel tab Stages, nella tabella dei task, vedi due colonne:
- Spill (Memory): dimensione non compressa di ciò che è stato spinto fuori.
- Spill (Disk): byte compressi effettivamente scritti su disco locale.
Lo spill non è un errore. È Spark che degrada con grazia. Ma è lento: l’I/O su disco è ordini di grandezza più lento della RAM, e un job che spilla pesantemente gira a una frazione della sua velocità in memoria. Un po’ di spill sotto pressione va bene. Spill su ogni task, gigabyte per task, ogni stage, quello è un job che chiede pietà per più memoria.
Cause comuni di spill:
- Partition troppo grande. Una partition da 10 GB spillerà su qualsiasi executor ragionevole. Ripartiziona per fare partition più piccole (target: circa 128 MB - 1 GB per partition per la maggior parte dei workload).
- Sort o aggregation su troppe chiavi. Un
groupBysu una colonna ad alta cardinalità costruisce una hash table enorme. - Troppi core per executor rispetto all’heap. Ogni task ha bisogno di working memory; con N core e X heap, ogni task riceve grosso modo X/N. Troppi core = ogni task affamato.
Ordine di fix: prima sistema la dimensione delle partition (economico, solo codice), poi alza la executor memory, poi cala i core per executor.
I quattro tipi di OOM
Quando la memoria finisce e Spark non riesce a uscirne spillando, hai un OOM. Ci sono quattro failure mode distinti; il fix dipende da quale hai.
1. OOM JVM dell’executor durante l’esecuzione
L’heap dell’executor si riempie oltre quanto lo spill possa recuperare. La JVM muore con:
java.lang.OutOfMemoryError: Java heap space
oppure
java.lang.OutOfMemoryError: GC overhead limit exceeded
Il processo executor muore, il cluster manager lo riavvia, i task falliti riprovano su un altro executor. Se i dati sono davvero troppo grandi, anche il retry fallisce, e lo stage fallisce dopo spark.task.maxFailures (default 4) tentativi.
Fix, in ordine:
- Partition più piccole (
.repartition(N)con N più grande). - Heap dell’executor più grande (
spark.executor.memory). - Meno core per executor (meno competizione per l’heap).
- Se è un join, controlla se dovresti broadcastare il lato piccolo (le hash table enormi mangiano heap in fretta).
- Se è un
collect()o untoPandas()che lo causa sul driver, smettila.
2. Container killed da YARN/K8s
L’executor ha superato il suo limite di memoria totale (heap piu’ overhead). Il cluster manager ammazza il container. L’errore nel log del driver:
Container killed by YARN for exceeding memory limits.
14.5 GB of 14 GB physical memory used.
Consider boosting spark.executor.memoryOverhead.
Oppure su Kubernetes:
Container exited with exit code 137 (OOMKilled)
Questo non è un OOM di heap. L’heap probabilmente stava bene. L’overhead off-heap (Python worker, buffer Arrow, librerie native, network buffer) è straripato. Il fix è alzare spark.executor.memoryOverhead, non spark.executor.memory. La gente sbaglia di continuo qui e alza l’heap per ore chiedendosi perché non cambia niente.
Se stai facendo girare PySpark con UDF pesanti o Pandas UDF, aspettati di aver bisogno di 3-4 GB di overhead, non i 384 MB di default.
3. OOM del driver
Il driver finisce l’heap. Lo stack trace punta a codice driver-side, spesso:
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.Dataset.collect(...)
Cause:
.collect()su un DataFrame troppo grande. Anche un.toPandas()su una tabella da 1 miliardo di righe ammazzerà il driver.- Broadcastare una tabella più grande di quanto pensavi. Il driver la collecta prima di broadcastare.
- Un query plan enorme (migliaia di operatori, ad esempio SQL generato dinamicamente): Catalyst stesso usa memoria.
- Caching con dati
LocalRelation(DataFrame piccoli costruiti concreateDataFrameda una grossa lista Python).
Fix: non collectare. Scrivi su un sink invece. Se davvero ti serve un piccolo risultato in locale, prima .limit(N). Se devi broadcastare una tabella grande, probabilmente non dovresti: non è più una tabella piccola.
4. Spill eccessivo (tecnicamente non un OOM)
Il job non fallisce. Ci mette solo 3 ore invece di 20 minuti. Il tab Stages mostra gigabyte di Spill (Disk) per task su più stage. Il GC time è il 30%+ del task time.
Questo non è un OOM ma è il failure mode adiacente all’OOM che frega la gente: la pipeline “funziona” quindi nessuno se ne accorge, finché la review dei costi non mostra che il cluster costa il doppio di quanto dovrebbe. Trattalo come un OOM. Stessi fix: partition più piccole, più memoria, meno core per executor.
Come si fa davvero la diagnosi di un OOM
Quando un executor muore, il log dell’executor sul cluster ha le ultime parole della JVM. Nella Spark UI, clicca l’executor morto nel tab Executors, poi “stderr”, e cerca OutOfMemoryError o Container killed. La prima riga dello stack trace ti dice se è stato un OOM di heap (nomi di operatori visibili: Sort, HashAggregate, ShuffleExternalSorter) o un kill da overhead (messaggio kernel di OOM, niente stack).
Conta anche il pattern dei fallimenti.
- Un task per stage fallisce ripetutamente: skew. Una chiave ha troppi dati. Salting, gestione skew di AQE, o una strategia di partitioning diversa.
- Tutti i task in uno stage falliscono: il workload è troppo grande per gli executor attuali. Resize.
- Task casuali falliscono attraverso vari stage: di solito pressione di overhead. Alza l’overhead.
La regola di sizing
Ecco un punto di partenza che funziona per la maggior parte dei workload batch. Non è ottimale per nessun job specifico: è il punto da cui partire prima di tunare.
executor heap = (working memory per task) per (cores per executor) per 1.3 (headroom)
overhead = max(384 MB, 0.10 per heap, 2 GB se PySpark con UDF)
container = heap + overhead
Per un job con circa 1,5 GB di working memory per task (un join + aggregation moderato) e 4 core per executor:
heap = 1.5 per 4 per 1.3 ~= 8 GB
overhead = max(800 MB, 2 GB se PySpark) = 2 GB
container = 10 GB
Poi dimensiona il numero di executor in base al totale dei core di cluster che vuoi e ai core per executor. Se vuoi 64 core di parallelismo con executor da 4 core, fanno 16 executor.
Pochi executor grossi vs molti piccoli
Questa è l’eterna domanda di sizing in Spark. Due estremi.
Executor grossi (16 core, 64 GB). Pro: i dati cachati sono locali a molti task (niente re-fetch), lo shuffle ha meno connessioni (meno per meno = molto meno), il broadcast costa meno per task. Contro: un singolo OOM ammazza il lavoro di 16 task, le pause GC sono più lunghe, la gestione dell’heap della JVM diventa più difficile sopra i 32 GB circa (limite dei compressed oops).
Executor piccoli (2 core, 8 GB). Pro: blast radius più piccolo, GC più facile, più parallelismo per nodo (più container). Contro: più connessioni di shuffle (ogni executor si connette a ogni altro per lo shuffle, quadratico nel numero di executor), più replicazione dei dati broadcastati, peggiore cache locality.
La via di mezzo che funziona per la maggior parte dei job batch in produzione: 4-8 core per executor, 8-16 GB di heap. Sopra gli 8 core inizi a perdere efficienza di parallelismo per contesa nella JVM; sopra i 32 GB di heap circa sei in territorio in cui il GC tuning inizia a contare e dovresti considerare G1 GC (-XX:+UseG1GC).
Per PySpark in particolare, sbilanciati verso il lato piccolo: i Python worker aggiungono pressione di memoria e gli executor molto larghi significano molti processi Python per JVM, cosa che frammenta la overhead memory.
Una checklist di debug
Quando un OOM atterra nella tua inbox:
- Quale OOM era? Leggi il log dell’executor (o del driver se era un OOM di driver). Heap, container kill, o driver?
- Quale stage? UI, Stages, cerca i task falliti. Lo stage ti dice quale operazione stava girando.
- C’è un task molto più grande degli altri? Skew. Sistema al livello dei dati.
- Quanto sono grandi le partition? UI, Stages, input size diviso task count. Se le partition sono oltre 2 GB, ripartiziona.
- Il GC time è oltre il 10% del task time? Pressione di memoria. Alza l’heap o cala i core.
- L’overhead è alto? PySpark o Pandas UDF? Alza
memoryOverhead. - Qualcuno ha aggiunto un
.collect()? Controlla i recent code change.
Quella checklist risolve la grande maggioranza degli incidenti di memoria. Il resto sono casi strani (leak di librerie native, bug dell’allocator off-heap, roba del genere) e a quel punto sei in una conversazione di tipo diverso, di solito col team di piattaforma.
Abbiamo fatto la UI, i plan, e la memoria. La prossima lezione apre la seconda metà del modulo 10 con skew, broadcast tuning, e i config di partitioning che trasformano un job lento in uno veloce.