PySpark, dalle fondamenta Lezione 41 / 60

Catalyst: il cervello dietro ogni DataFrame

Come Spark trasforma il tuo codice in un query plan, le quattro fasi di ottimizzazione, e come leggere .explain(True).

Ogni operazione su DataFrame che hai scritto in questo corso è passata attraverso Catalyst. Ogni .filter, .join, .groupBy, ogni espressione su colonna, ogni stringa Spark SQL: Catalyst le ha viste tutte, ne ha riscritte la maggior parte, e ha deciso come davvero eseguirle. Il motivo per cui DataFrame Spark è così tanto più veloce di RDD Spark è perlopiù Catalyst (con Tungsten che fa il resto, e quella è la prossima lezione).

Fino a ora ho fatto la mano vagante con “Spark capisce il piano giusto”. È ora di guardare come. Una volta che sai leggere un query plan, puoi prevedere le performance dal codice, debuggare le sorprese, e scrivere Spark con intenzione invece che con la speranza.

Cos’è Catalyst

Catalyst è il query optimizer di Spark, scritto in Scala e distribuito come parte di spark-catalyst. È il livello tra “il tuo codice DataFrame” e “i task che girano sugli executor”. Ogni volta che chiami una action (.show(), .count(), .write()) Catalyst prende le operazioni che hai descritto, le trasforma attraverso quattro fasi, e passa il risultato al motore di esecuzione di Spark.

Le fasi:

  1. Parsed logical plan: il tuo codice, sintatticamente valido, ancora senza controlli semantici.
  2. Analyzed logical plan: i riferimenti alle colonne risolti contro lo schema; tipi controllati; tutto è concreto.
  3. Optimized logical plan: rewriting basato su regole: predicate pushdown, column pruning, constant folding, projection collapse, join reordering, decine di altre.
  4. Physical plan: scelte sulla strategia di esecuzione vera: quale algoritmo di join, quale implementazione di scan, quale strategia di exchange.

Nello Spark moderno c’è anche un livello a runtime sopra la fase 4 (Adaptive Query Execution) che aggiusta il physical plan a metà query in base alle statistiche di shuffle osservate. Ci arriviamo alla fine.

L’intera pipeline gira ogni singola volta che triggeri una action, e il costo è trascurabile rispetto all’esecuzione vera della query.

Una piccola query e i suoi piani

Prendiamo qualcosa di concreto. Due tabelle, un filtro, un’aggregazione, un join.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("CatalystDemo").master("local[*]").getOrCreate()

orders = spark.createDataFrame(
    [(1, 100, 50.0, "2024-01-01"),
     (2, 100, 75.0, "2024-01-02"),
     (3, 200, 30.0, "2024-01-03")],
    ["order_id", "user_id", "amount", "dt"]
)

users = spark.createDataFrame(
    [(100, "alice", "IT"),
     (200, "bob", "DE"),
     (300, "carol", "FR")],
    ["user_id", "name", "country"]
)

q = (orders
     .filter(F.col("amount") > 40)
     .join(users, "user_id")
     .groupBy("country")
     .agg(F.sum("amount").alias("total"))
     .filter(F.col("country") == "IT"))

q.explain(True)

.explain(True) stampa tutte e quattro le fasi. Camminiamo attraverso quello che ognuna mostra.

Fase 1: parsed logical plan

== Parsed Logical Plan ==
'Filter ('country = IT)
+- Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
   +- Join Inner, (user_id#A = user_id#B)
      :- Filter (amount#Y > 40)
      :  +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
      +- LogicalRDD [user_id#B, name#U, country#X], false

Questo è il tuo codice, riflesso fedelmente come un albero. Nota gli apostrofi ('Filter, 'country): segnano riferimenti non risolti. Al parse time Spark non sa ancora se country esista o di che tipo sia. È solo un nome. Le foglie LogicalRDD sono dei placeholder per i DataFrame in input.

Questa fase riguarda la validità sintattica: hai usato operazioni che esistono, con la giusta arity? Un typo in un nome di metodo fallisce qui. Un typo in un nome di colonna no: quello è il lavoro della fase successiva.

Fase 2: analyzed logical plan

== Analyzed Logical Plan ==
country: string, total: double
Filter (country#X = IT)
+- Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
   +- Project [user_id#A, order_id#W, amount#Y, dt#V, name#U, country#X]
      +- Join Inner, (user_id#A = user_id#B)
         :- Filter (amount#Y > 40)
         :  +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
         +- LogicalRDD [user_id#B, name#U, country#X], false

Gli apostrofi sono spariti. Ogni colonna ha un exprId univoco (i numeri #X, #Y), che è come Catalyst traccia l’identità attraverso i rewrite successivi. Lo schema di output è stampato in cima.

L’analyzer è anche dove scopri di aver commesso degli errori. Riferire una colonna che non esiste, provare a comparare una stringa con una struct, fare alias verso un nome che esiste già nello scope: tutti questi errori escono da questa fase, con il messaggio “cannot resolve foo given input columns …”. Quando la gente si lamenta che “gli errori di Spark sono illeggibili”, di solito intendono errori dell’analyzer, e il trucco è leggere il piano per vedere dove la risoluzione si è effettivamente rotta.

Fase 3: optimized logical plan

Qui Catalyst si guadagna lo stipendio:

== Optimized Logical Plan ==
Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
+- Project [amount#Y, country#X]
   +- Join Inner, (user_id#A = user_id#B)
      :- Project [user_id#A, amount#Y]
      :  +- Filter ((isnotnull(amount#Y) AND (amount#Y > 40)) AND isnotnull(user_id#A))
      :     +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
      +- Project [user_id#B, country#X]
         +- Filter ((country#X = IT) AND isnotnull(country#X) AND isnotnull(user_id#B))
            +- LogicalRDD [user_id#B, name#U, country#X], false

Confrontalo con l’analyzed plan. Diverse cose sono cambiate:

  • Il Filter (country = IT) esterno del codice originale è stato pushato attraverso l’aggregazione e il join, fino allo scan di users. L’optimizer ha notato che country viene solo da users e che filtrarla prima del join è logicamente equivalente e fisicamente molto più economico.
  • Sopra ogni scan ora vedi nodi Project che selezionano solo le colonne di cui il resto del piano ha bisogno. order_id, dt, name: mai referenziate dopo lo scan, vengono droppate immediatamente. Questo è il column pruning.
  • Filtri isnotnull sono stati aggiunti implicitamente. Gli inner join su chiavi null non matchano mai, quindi Catalyst inserisce il filtro null per ridurre l’input del join. Vittoria gratuita.
  • L’ordine del filter(amount > 40) originale e del join è stato preservato qui, ma in piani più grandi Catalyst riordinerà filtri e proiezioni per spingere più lavoro possibile verso le foglie.

La lista di regole dell’optimizer è lunga: PushDownPredicates, ColumnPruning, ConstantFolding, ReorderJoin, EliminateOuterJoin, CollapseProject, decine di altre. Non hai bisogno di memorizzarle. Hai bisogno di riconoscerne gli effetti nell’optimized plan e di fidarti che l’optimizer stia facendo scelte sensate.

Il pezzo cost-based (CBO), introdotto in Spark 2.2, entra in gioco quando hai raccolto statistiche su una tabella (ANALYZE TABLE ... COMPUTE STATISTICS). Con le stats, Catalyst può stimare i conteggi di righe a ogni nodo e scegliere il join order migliore, decidere quando un lato è abbastanza piccolo da fare broadcast, scegliere il più economico fra due piani equivalenti. Senza stats torna a delle euristiche. La maggior parte dei team in produzione non lancia mai ANALYZE, il che significa che la maggior parte delle decisioni dell’optimizer è euristica. Vale la pena saperlo: il CBO migliora notevolmente quando gli dai delle stats.

Fase 4: physical plan

== Physical Plan ==
*(5) HashAggregate(keys=[country#X], functions=[sum(amount#Y)])
+- Exchange hashpartitioning(country#X, 200), ENSURE_REQUIREMENTS
   +- *(4) HashAggregate(keys=[country#X], functions=[partial_sum(amount#Y)])
      +- *(4) Project [amount#Y, country#X]
         +- *(4) BroadcastHashJoin [user_id#A], [user_id#B], Inner, BuildRight
            :- *(4) Project [user_id#A, amount#Y]
            :  +- *(4) Filter ((isnotnull(amount#Y) AND (amount#Y > 40)) AND isnotnull(user_id#A))
            :     +- *(4) Scan ExistingRDD[order_id#W, user_id#A, amount#Y, dt#V]
            +- BroadcastExchange HashedRelationBroadcastMode(...), [plan_id=...]
               +- *(2) Project [user_id#B, country#X]
                  +- *(2) Filter ((country#X = IT) AND isnotnull(country#X) AND isnotnull(user_id#B))
                     +- *(2) Scan ExistingRDD[user_id#B, name#U, country#X]

Ogni operatore logico è stato mappato a un’implementazione fisica. Tre cose da leggere qui:

I nomi degli operatori ti dicono l’algoritmo. BroadcastHashJoin perché users era piccolo dopo il filtro: Catalyst ha scelto di farne broadcast invece che shuffling. Se entrambi i lati fossero stati grandi avresti visto SortMergeJoin, con due nodi Exchange hashpartitioning(...) che lo precedono. HashAggregate è un GROUP BY basato su hash; per query ordinate o di rollup a volte vedi invece SortAggregate. BroadcastNestedLoopJoin è l’antipattern: appare quando hai scritto un join che Catalyst non riesce a esprimere in altro modo (un join non equi-join su big data, tipicamente), e quasi sempre significa che dovresti riscrivere.

I nodi Exchange sono i marker dello shuffle. Ovunque vedi Exchange, Spark scriverà file di shuffle e li rileggerà. L’HashAggregate qui ha un Exchange hashpartitioning(country, 200) tra le aggregazioni parziale e finale: il pattern standard di aggregazione a due fasi. Contare i nodi Exchange è il modo più rapido di valutare quanto shuffling farà una query.

I prefissi *(N) indicano whole-stage code generation. Quella è roba di Tungsten: prossima lezione.

PushedFilters appare sui file scan. Quando leggi Parquet o un altro formato colonnare, vedrai qualcosa come:

+- FileScan parquet warehouse.orders[user_id#A, amount#Y]
   PushedFilters: [IsNotNull(amount), GreaterThan(amount,40)],
   ReadSchema: struct<user_id:int,amount:double>

Quelli sono predicati che sono stati pushati fino dentro il reader del formato file. Parquet li valuta mentre legge e salta interi row group che non possono matchare. Combinato con il column pruning (ReadSchema mostra solo le colonne che usi davvero), questo è il motivo per cui Parquet è così tanto più veloce di CSV: non solo compressione, ma la capacità dell’optimizer di leggere meno dati.

Leggere i piani in pratica

.explain(True) mostra tutte e quattro le fasi ed è quello che vuoi quando sta succedendo qualcosa di sorprendente. .explain() (senza argomento) mostra solo il physical plan, che è la maggior parte di quello che ti serve giorno per giorno. C’è anche .explain("formatted"), che renderizza il physical plan come una lista numerata con i dettagli scorporati sotto: più gentile per gli occhi su piani grandi, più difficile da scansionare velocemente.

Un workflow che tende a essere utile quando si debugga una query lenta:

  1. Lancia .explain() e conta i nodi Exchange. Ognuno è uno shuffle. Tre o meno è di solito ok; sei è sospetto; dieci significa che qualcosa non va.
  2. Guarda gli algoritmi di join. Ogni BroadcastNestedLoopJoin è una bandiera rossa.
  3. Controlla gli scan delle foglie. I filtri che hai scritto compaiono effettivamente in PushedFilters? Il ReadSchema è il set minimo di colonne che ti servono?
  4. Se qualcosa sembra sbagliato, passa a .explain(True) e cammina l’optimized logical plan per vedere perché il rewrite è avvenuto o non è avvenuto.

Questo è anche il momento giusto per menzionare l’Adaptive Query Execution. AQE (attivo di default dallo Spark 3.2) aggiunge una quinta fase che gira durante l’esecuzione: dopo ogni shuffle, Spark guarda le dimensioni reali delle partition e può coalescere dinamicamente le partition piccole, cambiare un sort-merge join in un broadcast join se un lato risulta più piccolo del previsto, o splittare partition skewed. AQE compare nel piano come AdaptiveSparkPlan alla radice e nodi AQEShuffleRead dopo gli exchange. Se il tuo physical plan sembra strano perché è cambiato a runtime, è per questo.

Punti di estensione

Puoi inserire le tue regole in Catalyst: Spark Session Extensions ti permette di registrare regole optimizer, strategie planner, estensioni parser e regole di analisi all’avvio della session. Iceberg, Delta Lake e Hudi usano tutti questo per iniettare i propri rewrite per cose come il predicate pushdown nei transaction log. Scrivere le proprie è raro e avanzato; perlopiù incontrerai questa superficie come utente quando una libreria data-source ti dice di aggiungere una config tipo spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension. Ora sai cosa sta facendo.

Cosa ricordare

Catalyst riscrive il tuo codice attraverso quattro fasi, e .explain(True) è la tua finestra su tutte. Gli optimized plan ti dicono cosa Spark ha pensato della tua query; i physical plan ti dicono cosa farà. Leggili abbastanza spesso da rendere familiari i nomi degli operatori. Una volta che sai leggere un piano, il resto del lavoro sulle performance di Spark (join, partitioning, caching, tutto il modulo 6) si incastra come scelte che compaiono direttamente nel piano.

Prossima lezione: il livello sotto Catalyst. Tungsten: code generation, formato di memoria binario, e perché DataFrame Spark è così tanto più veloce dell’RDD che ha rimpiazzato.


Riferimenti: “Deep Dive into Spark SQL’s Catalyst Optimizer” (https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html) e la guida Apache Spark SQL al performance tuning (https://spark.apache.org/docs/latest/sql-performance-tuning.html). Consultati il 2026-05-01.

Cerca