Ci sono due forme che un dataset tabellare può assumere, e quasi ogni workflow analitico passa dall’una all’altra almeno una volta. Il formato long ha una riga per osservazione: (user_id, metric_name, metric_value). Il formato wide ha una riga per entità, con ciascuna metrica nella sua colonna: (user_id, sessions, orders, revenue). Il long è ottimo per storage, aggregazione e pipeline ML; il wide è ottimo per reporting, export e occhi umani.
I verbi che spostano tra le due forme sono pivot (long verso wide) e unpivot (wide verso long, recentemente chiamato melt in PySpark). Entrambi sono essenziali, entrambi hanno specifici gotcha PySpark che vale la pena conoscere, ed entrambi possono produrre regressioni di performance spettacolari se ci ricorri con leggerezza. Camminiamoci attraverso in entrambe le direzioni con gli idiom giusti e quelli sbagliati da evitare.
Pivot: da long a wide
Il pivot classico parte da una tabella long e produce una wide. Vendite per country e quarter:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("PivotDemo")
.master("local[*]")
.getOrCreate())
long_df = spark.createDataFrame(
[
("IT", "Q1", 100.0),
("IT", "Q2", 150.0),
("IT", "Q3", 120.0),
("IT", "Q4", 200.0),
("NL", "Q1", 80.0),
("NL", "Q2", 90.0),
("NL", "Q3", 110.0),
("NL", "Q4", 130.0),
("DE", "Q1", 50.0),
("DE", "Q2", 70.0),
],
"country STRING, quarter STRING, revenue DOUBLE",
)
wide_df = (long_df
.groupBy("country")
.pivot("quarter")
.agg(F.sum("revenue")))
wide_df.show()
# +-------+----+----+----+----+
# |country| Q1| Q2| Q3| Q4|
# +-------+----+----+----+----+
# | IT|100 |150 |120 |200 |
# | NL| 80 | 90 |110 |130 |
# | DE| 50 | 70 |null|null|
# +-------+----+----+----+----+
Tre pezzi:
groupBy("country")— la colonna (o le colonne) che restano righe nell’output.pivot("quarter")— la colonna i cui valori distinti diventano nomi di colonna.agg(F.sum("revenue"))— cosa mettere in ogni cella dove la chiave di riga e il valore di colonna coincidono. Puoi passare qualsiasi aggregato; pivot è fondamentalmente ungroupBy+ spread.
DE ha null per Q3 e Q4 perché quelle righe non esistevano nell’input. E’ il comportamento giusto: un’osservazione mancante, non zero. Se vuoi gli zeri, .fillna(0, subset=["Q3", "Q4"]) dopo il pivot.
Sono ammesse aggregazioni multiple:
(long_df
.groupBy("country")
.pivot("quarter")
.agg(F.sum("revenue").alias("rev"), F.count("*").alias("n"))
.show())
# Columns become: Q1_rev, Q1_n, Q2_rev, Q2_n, ...
La trappola di performance del pivot
Ecco la cosa che nessuno ti dice nel primo tutorial: pivot("quarter") senza una lista di valori fa partire un job aggiuntivo. Spark deve sapere quali colonne produrre, quindi fa partire una scansione completa dei dati sorgente per raccogliere i valori distinti della colonna di pivot, prima di poter pianificare il pivot vero e proprio.
Per un DataFrame piccolo questo è invisibile. Per un dataset da 200 GB partizionato su mille executor, quella “discovery scan” è una spesa reale, a volte più costosa del pivot stesso. E peggio, gira eagerly anche se tutto il resto della tua pipeline è lazy.
La fix è dare a Spark la lista:
quarters = ["Q1", "Q2", "Q3", "Q4"]
(long_df
.groupBy("country")
.pivot("quarter", quarters)
.agg(F.sum("revenue"))
.show())
Adesso Spark sa esattamente quali colonne produrre. Niente discovery scan, niente job aggiuntivo. Se i dati contengono un valore non nella tua lista, viene silenziosamente scartato dall’output, che di solito è quello che vuoi per il reporting (non vuoi che una riga di test errante aggiunga una colonna null_quarter al tuo report mensile).
Prendi l’abitudine di passare sempre la lista quando la conosci. Nomi di quarter, codici valuta, codici country, status enum: questi sono di solito noti in anticipo. Risparmia il lavoro a Spark.
Pivot che esplodono
Un pivot è una wide transformation, e più largo è il risultato, più memoria serve a ogni executor per tenere una singola riga. Fare pivot su una colonna con 50 valori distinti va bene. Fare pivot su una colonna con 50.000 valori distinti produce un DataFrame in cui ogni riga ha 50.000 colonne, e quello non è davvero un DataFrame: è una sparse matrix con manie di grandezza.
Se la tua colonna di pivot ha cardinalità alta, quasi sicuramente vuoi una forma diversa: formato long con una riga per coppia (entity, metric), eventualmente partizionato per metric. Ricorri al pivot quando il numero di colonne risultanti è nelle decine o nelle basse centinaia, non nelle migliaia.
Una guardia utile: controlla prima la cardinalità.
n = long_df.select("quarter").distinct().count()
if n > 200:
raise ValueError(f"Pivot column has {n} distinct values, refusing to pivot")
Questo intercetta il caso in cui qualcuno punta il pivot sulla colonna sbagliata alle 3 di mattina di un sabato.
Unpivot: da wide a long
La direzione inversa, da wide a long, è più comune di quanto la gente creda. Salta fuori ogni volta che hai ricevuto un CSV da un vendor, un export Excel dalla finance o una tabella di warehouse denormalizzata in cui ogni metrica si è presa la sua colonna. Per aggregare, fare join o alimentare un modello, di solito la vuoi long.
In Spark 3.4 e successivi c’è un operatore di prima classe: melt.
wide_df = spark.createDataFrame(
[
("IT", 100.0, 150.0, 120.0, 200.0),
("NL", 80.0, 90.0, 110.0, 130.0),
("DE", 50.0, 70.0, None, None),
],
"country STRING, Q1 DOUBLE, Q2 DOUBLE, Q3 DOUBLE, Q4 DOUBLE",
)
# Spark 3.4+
long_again = wide_df.melt(
ids=["country"],
values=["Q1", "Q2", "Q3", "Q4"],
variableColumnName="quarter",
valueColumnName="revenue",
)
long_again.show()
# +-------+-------+-------+
# |country|quarter|revenue|
# +-------+-------+-------+
# | IT| Q1| 100.0|
# | IT| Q2| 150.0|
# | IT| Q3| 120.0|
# | IT| Q4| 200.0|
# | NL| Q1| 80.0|
# ...
ids sono le colonne da tenere così come sono (una per riga, ripetuta per ciascun valore di unpivot). values sono le colonne da unpivot. variableColumnName e valueColumnName danno il nome alle due nuove colonne che l’unpivot produce. Fatto.
Il trucco pre-3.4
Se sei su Spark 3.3 o più vecchio (ancora comune nelle piattaforme managed che restano indietro di una release o due), melt non esiste. L’idiom degli ultimi anni è stato stack dentro un selectExpr:
# Spark < 3.4
long_again = wide_df.selectExpr(
"country",
"stack(4, 'Q1', Q1, 'Q2', Q2, 'Q3', Q3, 'Q4', Q4) AS (quarter, revenue)"
)
long_again.show()
stack(N, ...) prende il numero di gruppi, poi N coppie di (label, value). Ogni coppia diventa una riga. L’AS (quarter, revenue) dà il nome alle due colonne di output. E’ brutto rispetto a melt ma funziona su ogni versione di Spark, ed è ancora utile quando ti serve pieno controllo sulle label (ad esempio label diverse dai nomi di colonna).
Per una versione programmatica in cui la lista di colonne arriva da config:
metric_cols = ["Q1", "Q2", "Q3", "Q4"]
stack_expr = f"stack({len(metric_cols)}, " + \
", ".join(f"'{c}', `{c}`" for c in metric_cols) + \
") AS (quarter, revenue)"
long_again = wide_df.selectExpr("country", stack_expr)
Quota con backtick i riferimenti di colonna nel caso i nomi contengano caratteri speciali. Quota con apice singolo le label. Azzecca le virgole tra di esse.
E’ anche un caso ragionevole per ricadere sulla sintassi SQL: il selectExpr essenzialmente è SQL. Vale la regola della lezione 37, “usa SQL quando il codice è fondamentalmente SQL”.
Andata e ritorno: pivot poi unpivot
Per dimostrare che sono inversi, fai andata e ritorno sui dati e conferma:
wide = long_df.groupBy("country").pivot("quarter", ["Q1","Q2","Q3","Q4"]).agg(F.sum("revenue"))
# Spark 3.4+
roundtrip = wide.melt(
ids=["country"],
values=["Q1", "Q2", "Q3", "Q4"],
variableColumnName="quarter",
valueColumnName="revenue",
).filter(F.col("revenue").isNotNull())
# Compare counts
print(long_df.count(), roundtrip.count())
# Same modulo the rows that didn't exist in long (DE Q3, Q4)
Il filter(F.col("revenue").isNotNull()) rimuove le celle che il pivot ha riempito con null per via delle righe di input mancanti. Senza, l’andata e ritorno è “quasi giusto”: ogni entità ha una riga per ogni metrica, incluse quelle che non sono mai esistite nella sorgente. A volte è quello che vuoi; a volte no. Sii deliberato.
Scegliere una forma per lo storage
Il formato long è quasi sempre la forma giusta per lo storage:
- Le nuove metriche diventano nuove righe, non nuove colonne. L’evoluzione dello schema smette di essere un progetto.
- Le aggregazioni tra metriche funzionano in modo naturale:
groupBy("metric_name").agg(F.avg("metric_value"))è una sola query. - La compressione dei file è più uniforme: le tabelle wide sparse sprecano molto spazio nei marker di null in Parquet.
- I feature store ML e molti tool per le serie temporali si aspettano input long.
Il formato wide è la forma giusta per gli output: un report, una query di dashboard, un CSV per un destinatario non tecnico. Fai pivot al confine della tua pipeline, non in mezzo.
Se ti ritrovi a fare pivot e riaggregare e unpivot nello stesso job, è un odore. Resta long fino all’ultimissimo step, poi fai pivot una volta sola in uscita.
Una euristica pratica: se un consumer a valle è umano, propendi per il wide. Se è un altro job Spark, codice di training ML o un metrics database, propendi per il long. I due consumer hanno preferenze ergonomiche diverse e i dati dovrebbero combaciare con il consumer.
C’è anche un terzo formato che vale la pena menzionare: una colonna struct o map. Invece di pivottare quattro quarter in quattro colonne, puoi raccoglierli in una singola colonna MapType con F.map_from_entries(F.collect_list(F.struct("quarter", "revenue"))). Le letture restano strette, lo schema non esplode quando appare un nuovo quarter, e qualunque analista che voglia un layout wide può fare select("country", "metrics.Q1", "metrics.Q2") per estrarre chiavi specifiche. E’ una via di mezzo che evita il peggio di entrambe le forme; non sempre è la scelta giusta, ma vale la pena tenerla nella cassetta degli attrezzi.
Esegui questo sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("ReshapeDemo")
.master("local[*]")
.getOrCreate())
long_df = spark.createDataFrame(
[
("IT", "Q1", 100.0), ("IT", "Q2", 150.0),
("IT", "Q3", 120.0), ("IT", "Q4", 200.0),
("NL", "Q1", 80.0), ("NL", "Q2", 90.0),
("NL", "Q3", 110.0), ("NL", "Q4", 130.0),
("DE", "Q1", 50.0), ("DE", "Q2", 70.0),
],
"country STRING, quarter STRING, revenue DOUBLE",
)
# Pivot with explicit values — no discovery scan
wide = (long_df
.groupBy("country")
.pivot("quarter", ["Q1", "Q2", "Q3", "Q4"])
.agg(F.sum("revenue"))
.fillna(0))
wide.show()
# Unpivot, modern (Spark 3.4+)
try:
long_again = wide.melt(
ids=["country"],
values=["Q1", "Q2", "Q3", "Q4"],
variableColumnName="quarter",
valueColumnName="revenue",
)
long_again.orderBy("country", "quarter").show()
except AttributeError:
print("melt not available in this Spark version, falling back to stack")
# Unpivot, pre-3.4 fallback
metric_cols = ["Q1", "Q2", "Q3", "Q4"]
stack_expr = (
f"stack({len(metric_cols)}, " +
", ".join(f"'{c}', `{c}`" for c in metric_cols) +
") AS (quarter, revenue)"
)
long_stack = wide.selectExpr("country", stack_expr)
long_stack.orderBy("country", "quarter").show()
# Cardinality guard before pivot
n_distinct = long_df.select("quarter").distinct().count()
print(f"pivot column cardinality: {n_distinct}")
Esegui tutti e tre. I tre DataFrame long alla fine sono equivalenti (modulo gli zeri dove hai usato fillna). La stampa della cardinalità è il tuo sanity check “è sicuro fare pivot”: prendine l’abitudine.
Cosi’ chiudiamo il toolkit della forma dei dati. Le prime tre lezioni del modulo 7 ti hanno dato l’intera superficie SQL (ultima lezione), il windowing dentro quella superficie (la lezione precedente) e il rimodellamento tra i due layout. La prossima lezione ci porta in territorio dove Catalyst non può più aiutarti: le user-defined function, la via di fuga quando i primitivi SQL e DataFrame non hanno quello che ti serve, e il costo di performance che arriva quando attraversi il confine JVM-Python su ogni riga.
Riferimenti: documentazione di Apache Spark per il pivot dei DataFrame e melt (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.melt.html) e reference della funzione stack. Recuperati il 2026-05-01.