PySpark, dalle fondamenta Lezione 37 / 60

PySpark SQL: quando SQL batte la sintassi DataFrame

Registrare temp view, chiamare spark.sql() e i casi in cui la stringa SQL è davvero più pulita della catena DataFrame.

Il modulo 7 parte qui, e il tema è l’optimizer: Catalyst. Prima di aprire il cofano, c’è un pezzo dell’API che la maggior parte dei corsi di PySpark sorvola perché sembra una strada secondaria: l’interfaccia SQL. Ogni DataFrame può essere trasformato in una tabella SQL con una sola riga, interrogato con una stringa, e riconvertito in DataFrame. Non è un motore separato, non è più lento, non è legacy. E’ la stessa pipeline Catalyst che entra da una porta diversa.

Il motivo per cui conta in questo modulo è che, una volta che sai scrivere la stessa query in due modi (DataFrame e SQL), inizi a notare quando ognuno è lo strumento giusto. E quella decisione si riflette direttamente su quanto saranno leggibili le tue pipeline tra sei mesi, quando qualcuno (probabilmente tu) le sta debuggando alle 23.

Due porte, un solo motore

Prendi un DataFrame qualsiasi. Registralo come vista temporanea. Interrogalo con SQL.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PySparkSQL")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [
        (1, "IT", 59.0,  "2024-03-15"),
        (2, "IT", 29.0,  "2024-03-15"),
        (3, "NL", 149.0, "2024-04-02"),
        (4, "NL", 89.5,  "2024-04-02"),
        (5, "DE", 12.0,  "2025-01-08"),
        (6, "DE", 240.0, "2025-01-09"),
    ],
    "order_id INT, country STRING, total DOUBLE, dt STRING",
)

orders.createOrReplaceTempView("orders")

by_country_sql = spark.sql("""
    SELECT country, SUM(total) AS revenue
    FROM orders
    GROUP BY country
    ORDER BY revenue DESC
""")

by_country_df = (orders
                 .groupBy("country")
                 .agg(F.sum("total").alias("revenue"))
                 .orderBy(F.col("revenue").desc()))

Sia by_country_sql che by_country_df sono DataFrame. Passano per lo stesso parser Catalyst, le stesse riscritture del logical plan, lo stesso physical planner, la stessa code generation di Tungsten. Esegui .explain() su ognuno e vedrai piani identici, perché sono piani identici. La stringa SQL è soltanto un front end alternativo.

Questo è il fatto più importante della lezione: non c’è alcuna differenza di performance tra le due API. Nessuna. Smettila di scegliere l’una o l’altra “per le performance”. Scegli in base a cosa si legge meglio.

Se vuoi vedere l’equivalenza con i tuoi occhi, esegui .explain(mode="extended") su entrambe. Le sezioni == Parsed Logical Plan == differiranno leggermente (la stringa SQL parte come una UnresolvedRelation non risolta, la chiamata DataFrame parte già risolta), ma quando arrivi a == Optimized Logical Plan == e == Physical Plan == i due output sono identici fin nei numeri di ID degli operatori (che sono cosmetica del session counter). Non è una coincidenza; è l’architettura. I metodi builder del DataFrame costruiscono un albero di logical plan, e così fa anche il parser SQL. Da lì in poi, ogni regola di ottimizzazione, ogni stima di costo, ogni passo di code generation avviene su una rappresentazione condivisa.

Quando SQL è chiaramente meglio

Ci sono alcuni casi in cui la stringa SQL sarà, ogni volta, più piacevole della catena DataFrame equivalente.

Query multi-CTE. Spark SQL supporta la sintassi CTE completa: WITH a AS (...), b AS (...), c AS (...) SELECT ..., e se la tua trasformazione è naturalmente fatta di tre o quattro stadi nominati, è esattamente così che SQL la descrive.

spark.sql("""
    WITH daily_revenue AS (
        SELECT dt, country, SUM(total) AS revenue
        FROM orders
        GROUP BY dt, country
    ),
    country_totals AS (
        SELECT country, SUM(revenue) AS country_revenue
        FROM daily_revenue
        GROUP BY country
    ),
    flagged AS (
        SELECT d.*, c.country_revenue,
               d.revenue / c.country_revenue AS share
        FROM daily_revenue d
        JOIN country_totals c USING (country)
    )
    SELECT * FROM flagged WHERE share > 0.1
""").show()

La versione DataFrame è una catena di tre variabili intermedie, ciascuna seguita da .alias("daily_revenue") e poi rijoinata. Funziona, ma ogni lettore deve ricostruirsi mentalmente lo scoping di un blocco WITH.

Window function con più frame. La sintassi SQL OVER (PARTITION BY ... ORDER BY ... ROWS BETWEEN ...) è, francamente, quello per cui le window function sono state progettate. Le tratteremo come si deve nella prossima lezione, ma una query che usa tre window diverse nella stessa SELECT è significativamente più ordinata in SQL che nell’API DataFrame, dove devi dichiarare ogni Window spec separatamente e poi usarla in .over(...).

Codice che è già SQL. Quando la trasformazione è “questa query contro il warehouse, poi salva”, e non c’è nessun control flow Python in mezzo, scriverla come catena DataFrame è soltanto tradurre SQL in Python e chiedere al prossimo lettore di tradurla indietro. Saltati quel passaggio.

Quando i DataFrame sono chiaramente meglio

L’altra faccia della medaglia: se ti ritrovi a costruire una stringa SQL con f-string e chiamate a .join(), stai lavorando contro la corrente.

Generazione programmatica. Una lista di colonne da aggregare, un dict di rinomine, una pipeline guidata da config: queste cose sono di casa in Python e miserabili in una stringa SQL.

metrics = ["revenue", "qty", "discount", "tax"]
agg_exprs = [F.sum(c).alias(f"total_{c}") for c in metrics]

(orders
 .groupBy("country")
 .agg(*agg_exprs)
 .show())

Provare ad assemblare quella roba come "SELECT country, " + ", ".join(...) + " FROM ..." funziona ma invita bug di injection e si legge come un template engine.

Integrazione stretta con il control flow. Se il prossimo passo dipende da un controllo su un count, o stai ciclando su una lista di percorsi sorgente, o devi diramare in base allo schema, sei in Python. Resta in Python.

df = spark.read.parquet(path)

if "country" in df.columns:
    df = df.withColumn("country", F.upper("country"))

df.groupBy("country").count().show()

Puoi esprimere logica condizionale in SQL, ma le cuciture si vedono.

Riferimenti a colonne type-safe. F.col("revenue") + F.col("tax") sopravvive a un rename dell’IDE e ai linter. La stringa "revenue + tax" no. Su una codebase che vive a lungo, lo stile di riferimento per colonna ripaga.

Espressioni di colonna riutilizzabili. Un predicato complesso che viene usato in tre query è una sola variabile in DataFrame land:

is_active_paying = (F.col("status") == "active") & (F.col("plan") != "free")

active_orders   = orders.filter(is_active_paying)
active_users    = users.filter(is_active_paying)
active_sessions = sessions.filter(is_active_paying)

L’equivalente SQL è il predicato copincollato in tre clausole WHERE, con tutto il rischio di drift che ciò implica. Catalyst non ti salva dal digitare la stessa cosa tre volte leggermente sbagliata.

Mescolare i due

Non devi scegliere una volta per tutte. Le pipeline più ergonomiche che ho scritto prendono un DataFrame, lo registrano, eseguono un pezzo di SQL perché quella parte è davvero più pulita, e restituiscono il risultato alla sintassi DataFrame per i pezzi che Python fa meglio:

raw = spark.read.parquet("s3://bucket/orders/")
raw.createOrReplaceTempView("raw_orders")

cleaned = spark.sql("""
    WITH dedup AS (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY ingested_at DESC) AS rn
        FROM raw_orders
    )
    SELECT order_id, country, total, dt
    FROM dedup
    WHERE rn = 1
""")

# Back to DataFrame for programmatic enrichment
for col in ["country", "dt"]:
    cleaned = cleaned.withColumn(col, F.trim(col))

cleaned.write.mode("overwrite").parquet("s3://bucket/orders_cleaned/")

Usa quello che esprime il passo nel modo più chiaro.

Ispezionare cosa è registrato

spark.catalog è la tua maniglia su quali view e tabelle Spark conosce in questo momento.

spark.catalog.listTables()
# [Table(name='orders', database=None, description=None, tableType='TEMPORARY', ...)]

spark.catalog.listColumns("orders")
# [Column(name='order_id', description=None, dataType='int', nullable=True, ...), ...]

spark.catalog.dropTempView("orders")

Utile nei notebook quando hai registrato tre view e non ti ricordi più qual è qual è, e utile negli script come sanity check prima che parta una query.

Temp view, global temp view e managed table

Tre livelli di “permanente”. Scegli quello che corrisponde alla durata di vita che vuoi davvero.

Temp view. df.createOrReplaceTempView("orders"). Vive per la SparkSession. Quando la sessione finisce, la view sparisce. Ha scope limitato alla sessione che l’ha registrata: un’altra SparkSession nella stessa applicazione non la vede. E’ il default e la risposta giusta nel 95% dei casi.

Global temp view. df.createGlobalTempView("orders") (oppure createOrReplaceGlobalTempView). Vive per l’intera applicazione Spark: ogni SparkSession nella JVM la può vedere. Vive nel database speciale global_temp, quindi la interroghi come SELECT * FROM global_temp.orders. Utile quando hai più sessioni che condividono un’applicazione Spark e devi passarti una view tra loro. Quasi nessuno incontra questo caso.

Managed table. df.write.saveAsTable("warehouse.orders"). Persistita nel metastore (Hive metastore, Glue, Unity Catalog) e i file sottostanti scritti nella directory del warehouse. Sopravvive ai restart. Sopravvive ai cluster. E’ quello che vuoi per le tabelle che interrogherai domani, la prossima settimana, da un altro job, da un altro team. Il modulo 6 ha usato questo per il bucketing: stesso meccanismo.

# Session-scoped — gone when the session ends
df.createOrReplaceTempView("session_local")

# App-scoped — survives across sessions in this app
df.createGlobalTempView("app_wide")
spark.sql("SELECT * FROM global_temp.app_wide").show()

# Persisted to the metastore — survives the cluster
df.write.mode("overwrite").saveAsTable("analytics.orders")

Il modello mentale: temp view = variabile locale, global temp view = variabile a livello di modulo, managed table = file su disco.

Un piccolo esempio guidato dall’inizio alla fine

Facciamo il pattern multi-CTE sul DataFrame orders di prima e confermiamo che entrambe le API producono lo stesso piano.

orders.createOrReplaceTempView("orders")

# SQL form
sql_result = spark.sql("""
    WITH country_revenue AS (
        SELECT country, SUM(total) AS revenue
        FROM orders
        GROUP BY country
    )
    SELECT country, revenue
    FROM country_revenue
    WHERE revenue > 50
    ORDER BY revenue DESC
""")

# DataFrame form
df_result = (orders
             .groupBy("country")
             .agg(F.sum("total").alias("revenue"))
             .filter(F.col("revenue") > 50)
             .orderBy(F.col("revenue").desc()))

sql_result.explain(mode="formatted")
df_result.explain(mode="formatted")

I due output di explain differiscono solo nei numeri di ID cosmetici. Stesso physical plan, stesso numero di shuffle, stesso wall-clock time. La scelta tra i due è puramente una questione di chi leggerà questo codice tra tre mesi.

Esegui questo sulla tua macchina

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PySparkSQLDemo")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [(i, ["IT", "NL", "DE"][i % 3], float(i * 7 % 200), f"2024-{(i % 12) + 1:02d}-01")
     for i in range(1, 50)],
    "order_id INT, country STRING, total DOUBLE, dt STRING",
)

orders.createOrReplaceTempView("orders")

# Multi-CTE in SQL
spark.sql("""
    WITH monthly AS (
        SELECT country, SUBSTR(dt, 1, 7) AS month, SUM(total) AS revenue
        FROM orders
        GROUP BY country, SUBSTR(dt, 1, 7)
    ),
    ranked AS (
        SELECT *, RANK() OVER (PARTITION BY month ORDER BY revenue DESC) AS rk
        FROM monthly
    )
    SELECT * FROM ranked WHERE rk = 1 ORDER BY month
""").show()

# Same logic, partly DataFrame, partly SQL
df = (orders
      .withColumn("month", F.substring("dt", 1, 7))
      .groupBy("country", "month")
      .agg(F.sum("total").alias("revenue")))

df.createOrReplaceTempView("monthly")
spark.sql("""
    SELECT * FROM (
        SELECT *, RANK() OVER (PARTITION BY month ORDER BY revenue DESC) AS rk
        FROM monthly
    ) WHERE rk = 1
""").show()

# Inspect the catalog
for t in spark.catalog.listTables():
    print(t.name, t.tableType)

Esegui entrambi. Scorri entrambi. Quello che trovi più facile da leggere domani mattina è quello giusto per il tuo team.

Quel RANK() OVER (PARTITION BY ... ORDER BY ...) che hai appena scritto è una window function, l’argomento della prossima lezione. Le window function sono lo strumento secondo per utilità nella cassetta degli attrezzi SQL dopo GROUP BY, e una volta che le hai, smetti di ricorrere ai self-join per calcolare pattern del tipo “riga N rispetto a riga N-1”.


Riferimenti: Apache Spark SQL programming guide (https://spark.apache.org/docs/latest/sql-programming-guide.html) e la reference dell’API Spark Catalog. Recuperati il 2026-05-01.

Cerca