PySpark, dalle fondamenta Lezione 42 / 60

Tungsten: code generation e layout di memoria colonnare

Come Spark fonde le operazioni in codice compilato, il formato colonnare off-heap, e perché DataFrame Spark è veloce.

Catalyst riscrive la tua query nel miglior piano che riesce a tirare fuori. Quella è metà della storia. L’altra metà è come Spark esegue quel piano, e la risposta è cambiata drasticamente nel 2015 quando è atterrato Project Tungsten. RDD Spark e DataFrame Spark sono tecnicamente lo stesso motore, ma in pratica si comportano come prodotti diversi, e Tungsten è la maggior parte del motivo. Questa lezione parla di cosa Tungsten fa davvero, perché il codice DataFrame batte spesso il codice RDD di 5-10 volte, e del piccolo insieme di marker di piano che dovresti imparare a riconoscere.

Cosa Tungsten si proponeva di sistemare

Entro il 2015, Spark aveva un problema che molti motori in-memory avevano: era bottlenecked sulla JVM. Le CPU erano diventate drasticamente più veloci, la banda della RAM era migliorata, ma il codice basato su JVM non ne traeva vantaggio. I colli di bottiglia erano specifici:

Overhead di memoria. Un valore Long in una riga Object[] JVM occupa 16 byte per il Long boxato più 8 byte per il puntatore, più padding (chiamiamoli 32 byte) per memorizzare 8 byte di dati reali. Una riga di dieci long occupava 320 byte quando i dati erano 80. Per un sistema che dovrebbe essere in-memory, questo era catastrofico.

Garbage collection. L’esecuzione RDD di Spark allocava milioni di piccoli oggetti Java per task. Le pause GC che ne risultavano dominavano il tempo dell’executor su workload grandi. Fare il tuning della GC della JVM era una parte permanente del far girare Spark in produzione, e non ha mai davvero funzionato bene.

Pattern di accesso non cache-friendly. Gli oggetti Java vivono dovunque l’allocator dell’heap li abbia messi, il che significa che iterare su una “riga” è una sequenza di pointer chase, ognuno dei quali è probabilmente un cache miss L2 o L3. Le CPU passavano la maggior parte del tempo ad aspettare la memoria.

Overhead di chiamate a funzione virtuale tra operatori. Ogni operatore nel physical plan era un oggetto separato con un metodo iterator.next(). Per produrre una riga del risultato finale, il motore chiamava next() sull’operatore in cima, che chiamava next() sul figlio, e così via giù per l’albero. Ogni chiamata era una virtual dispatch che il JIT non poteva sempre inline. Per un milione di righe attraverso uno stage di cinque operatori, sono cinque milioni di chiamate virtuali.

Tungsten ha attaccato tutti e quattro. Il risultato è il motore che hai usato.

Pezzo 1: memoria off-heap gestita e UnsafeRow

Tungsten ha introdotto un formato di riga binario chiamato UnsafeRow. Invece di Object[], una riga è disposta come un header a dimensione fissa (null bitmap, offset dei campi) seguito da valori binari packed strettamente. Campi a larghezza fissa come int e double vanno inline; campi a lunghezza variabile come le stringhe memorizzano i loro dati in una coda contigua. Una riga di dieci long occupa 88 byte, gli 80 byte di dati più un piccolo header, invece di 320.

Questo formato vive in memoria che Spark gestisce da sé, spesso off-heap (configurata via spark.memory.offHeap.enabled e spark.memory.offHeap.size), allocata in grosse lastre che Spark suddivide. La garbage collection non la tocca. Niente boxing: una colonna di int è genuinamente quattro byte per valore.

Non interagisci con UnsafeRow direttamente in Python. Ne vedi gli effetti: pressione di memoria drasticamente più bassa per executor, niente mal di testa di tuning GC, e la capacità di tenere centinaia di milioni di righe in memoria su una singola macchina senza sgretolarsi.

Pezzo 2: computazione cache-conscious

Una volta che i dati sono in questa forma binaria densa, puoi disporli per la cache della CPU. Tungsten piazza le righe di una partition in regioni di memoria contigue. Iterare una partition diventa una camminata sequenziale su un buffer, cosa che il prefetcher della CPU adora. Confrontalo con RDD Spark, dove iterare una partition voleva dire camminare una lista di oggetti Java sparsi per l’heap.

Per le letture colonnari (Parquet, ORC) la vittoria è anche più grande. Invece di materializzare le righe immediatamente, Spark legge chunk di colonna in batch colonnari gestiti da Tungsten: tipicamente 4.096 valori da una singola colonna, packed insieme. Operazioni che toccano una colonna alla volta (filtri, proiezioni, aggregazioni) leggono in un loop stretto su un solo buffer, restano in cache L2, e lasciano che le unità SIMD della CPU facciano davvero lavoro. Un vectorized Parquet reader può essere 5-10 volte più veloce di un reader riga-per-riga sullo stesso file.

Vedi questo nei piani come nodi ColumnarToRow: marcano il confine dove Spark passa dal processing colonnare a batch (usato negli scan e in alcuni operatori) al processing una-riga-alla-volta (usato in altri). Lo Spark moderno fa una quantità sempre maggiore di lavoro in pure columnar mode e ColumnarToRow compare più tardi nel piano di quanto facesse prima.

Pezzo 3: whole-stage code generation

Questa è la headliner. Invece di eseguire il physical plan come un albero di oggetti operator, Tungsten genera bytecode Java a runtime che fonde gli operatori adiacenti in un singolo loop stretto, e poi lo JIT-compila.

Uno stage che fa scan, filter, project e aggregate nel modello vecchio è quattro operatori con quattro iterator e quattro chiamate next() virtuali per riga. Whole-stage codegen lo riscrive in qualcosa tipo:

while (input.hasNext()) {
    UnsafeRow row = input.next();
    // Filtro inlined
    if (row.getDouble(2) <= 40.0) continue;
    // Proiezione inlined
    long userId = row.getLong(1);
    double amount = row.getDouble(2);
    // Aggregazione parziale inlined
    aggBuffer.update(...);
}

Un metodo, un loop, niente virtual dispatch, JIT-friendly. Il JIT può estrarre i null check, vettorizzare delle parti, e fare inline aggressivamente. Il risultato è spesso entro 2-3 volte da C stretto scritto a mano, su Java.

La firma visibile nel physical plan è l’asterisco:

*(2) HashAggregate(keys=[country#X], functions=[partial_sum(amount#Y)])
+- *(2) Project [country#X, amount#Y]
   +- *(2) Filter (amount#Y > 40)
      +- *(2) Scan ExistingRDD[user_id#A, amount#Y, country#X]

Ogni operatore con *(2) fa parte del codice generato dello stage 2. Il numero è l’id dello stage di codegen. Quando vedi un sotto-albero contiguo di operatori *(N) con lo stesso N, quello è un loop fuso.

Quando l’asterisco manca, il codegen non è entrato in gioco. Le ragioni più comuni:

  • Una UDF Python normale nel piano. Il nodo BatchEvalPython e tutto quello che lo circonda non possono essere fusi col codegen: Spark deve spezzare lo stage per spedire le righe al Python worker. La lezione 40 diceva che le UDF erano costose; questa è la ragione più profonda del perché.
  • Operatori che non supportano il codegen. Una manciata di operatori fisici (alcune varianti di join, alcuni cammini di aggregazione, alcune funzioni window) non hanno implementazioni di codegen. Spark torna alla vecchia iterazione stile volcano.
  • Il codice generato ha superato il limite di dimensione del bytecode JVM. La JVM impone un cap di 64KB di bytecode su un singolo metodo. Per schemi molto larghi (centinaia di colonne) o espressioni generate molto grandi, il codegen di Spark può sforare. Quando succede, Spark cattura l’errore e ricade sul cammino interpretato. Lo vedrai nei log dell’executor come un warning su JaninoRuntimeException o “method too large”. Se lo vedi ripetutamente, splittare la tua query in stage più piccoli o ridurre il numero di colonne è il fix.

C’è una manopola di debug:

spark.conf.set("spark.sql.codegen.wholeStage", "false")

Questo disabilita whole-stage codegen per la session. Quasi nessuno dovrebbe mai impostarlo. L’unico caso in cui è utile è il debug di un sospetto bug del codegen: spegnilo, vedi se il tuo comportamento strano sparisce, apri una bug report a Spark. Nell’operazione normale dovrebbe essere acceso, e nelle versioni recenti di Spark è acceso di default.

Vectorized reader

Strettamente legato a Tungsten c’è il vectorized columnar reader per Parquet e ORC. Invece di decodificare una riga alla volta, il reader tira su un batch di valori da una colonna, applica la decodifica (RLE, dictionary) sull’intero batch, e passa un buffer colonnare al prossimo operatore. I filtri che combaciano con la forma colonnare girano anche loro in modo vettoriale.

Le config rilevanti:

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")  # default true
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")      # default true

Entrambe sono di default true. La ragione di sapere che esistono è il debug: se vedi un nodo Scan parquet che alimenta un ColumnarToRow molto presto nel piano, la lettura vettoriale è accesa. Se vedi normali letture riga-per-riga, qualcosa è disabilitato, di solito a causa di un tipo non supportato (alcuni complex nested type) o un override di config.

Un benchmark concreto

Prendi una semplice aggregazione e confronta RDD vs DataFrame sugli stessi dati. I numeri sono illustrativi, non benchmark che dovresti citare, ma la forma è quello che ogni team ha visto la prima volta che ci ha provato:

import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("TungstenDemo").master("local[*]").getOrCreate()

# 50M righe, due colonne
df = spark.range(0, 50_000_000).select(
    (F.col("id") % 1000).alias("group"),
    (F.rand() * 100).alias("value"),
)

# Materialize once so the cost is in the aggregation, not the generation
df.write.mode("overwrite").parquet("/tmp/tungsten_demo")
df = spark.read.parquet("/tmp/tungsten_demo")

# Cammino DataFrame: codegen, lettura vettoriale, batch colonnari
t0 = time.time()
df.groupBy("group").agg(F.sum("value")).collect()
print("DataFrame:", time.time() - t0, "seconds")

# Cammino RDD: niente codegen, niente benefici di UnsafeRow, oggetti Python
t0 = time.time()
(df.rdd
   .map(lambda r: (r["group"], r["value"]))
   .reduceByKey(lambda a, b: a + b)
   .collect())
print("RDD:", time.time() - t0, "seconds")

Sul mio portatile la versione DataFrame gira in circa 3 secondi e quella RDD in circa 30. Quasi tutta la differenza è Tungsten: codegen che fonde filter/project/partial-agg, il vectorized Parquet reader, i batch colonnari, l’assenza di overhead da oggetti Python nella JVM dell’executor. Il query plan racconta la stessa storia:

df.groupBy("group").agg(F.sum("value")).explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- HashAggregate(keys=[group#X], functions=[sum(value#Y)])
#    +- Exchange hashpartitioning(group#X, 200), ENSURE_REQUIREMENTS
#       +- *(1) HashAggregate(keys=[group#X], functions=[partial_sum(value#Y)])
#          +- *(1) ColumnarToRow
#             +- FileScan parquet [group#X, value#Y]
#                Batched: true,
#                PushedFilters: [],
#                ReadSchema: struct<group:bigint,value:double>

Il *(1) copre tutto il lavoro pre-shuffle: scan, columnar-to-row, aggregazione parziale, tutto fuso. Batched: true conferma la lettura vettoriale. Lo shuffle è l’unica cosa che non è in codegen, e questo perché attraversare gli executor non è fondamentalmente un problema di code-fusion per riga.

La versione RDD non ha niente di tutto questo. Ogni riga attraversa in Python (perché la lambda è Python) e torna indietro, la riduzione parziale gira in Python, e l’optimizer non vede niente che possa riscrivere. È una pura pipe di operazioni, niente fusione, niente batching, niente codegen.

Ecco perché quasi ogni domanda “dovrei usare gli RDD?” ha la stessa risposta: no. La performance del motore DataFrame viene da Tungsten e Catalyst che lavorano insieme, e perdi entrambi nel momento in cui scendi all’RDD.

Cosa ricordare

Tungsten è tre cose (memoria off-heap gestita con il formato binario UnsafeRow, layout colonnari cache-conscious, e whole-stage codegen) e spiegano la maggior parte del perché DataFrame Spark è veloce. Leggi i query plan per i prefissi *(N) per confermare che il codegen sta entrando in gioco; gli asterischi mancanti sono il segnale di allarme rumoroso che qualcosa (di solito una UDF) sta rompendo la fusione. Le letture vettoriali Parquet/ORC sono la droga di passaggio: il column pruning e il predicate pushdown che ottieni da Catalyst alimentano direttamente le letture colonnari batched, che alimentano direttamente l’esecuzione fusa dal codegen. L’intero stack è progettato per tenere dati densi che fluiscono attraverso loop stretti su CPU moderne.

Questo chiude il modulo 7. Ora sai leggere un Spark plan, prevedere cosa farà l’optimizer col tuo codice, e riconoscere quando i superpoteri del motore stanno o non stanno entrando in gioco. Il modulo 8 inizia la prossima lezione e gira lo sguardo verso l’esterno: le sorgenti dati. Parquet, ORC, Avro, JDBC, l’Hive metastore, Delta Lake, Iceberg: cosa sono, come differiscono, e quale si adatta al tuo workload.


Riferimenti: “Project Tungsten: Bringing Spark Closer to Bare Metal” (https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html) e la guida Apache Spark SQL al performance tuning (https://spark.apache.org/docs/latest/sql-performance-tuning.html). Consultati il 2026-05-01.

Cerca