Există două forme pe care un dataset tabular le poate lua, iar aproape fiecare flux analitic comută între ele cel puțin o dată. Formatul long are un rând per observație: (user_id, metric_name, metric_value). Formatul wide are un rând per entitate, cu fiecare metric ca propria coloană: (user_id, sessions, orders, revenue). Long e excelent pentru stocare, agregare și pipeline-uri ML; wide e excelent pentru raportare, export-uri și ochii umani.
Verbele care fac trecerea între ele sunt pivot (long la wide) și unpivot (wide la long, recent numit melt în PySpark). Ambele sunt esențiale, ambele au gotcha-uri specifice PySpark care merită cunoscute, iar ambele pot produce regresii spectaculoase de performanță dacă apelezi la ele neglijent. Hai să parcurgem fiecare direcție cu idiomurile potrivite și cele greșite, de evitat.
Pivot: long la wide
Pivot-ul clasic pornește de la un tabel long și produce unul wide. Vânzările pe țară și trimestru:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("PivotDemo")
.master("local[*]")
.getOrCreate())
long_df = spark.createDataFrame(
[
("IT", "Q1", 100.0),
("IT", "Q2", 150.0),
("IT", "Q3", 120.0),
("IT", "Q4", 200.0),
("NL", "Q1", 80.0),
("NL", "Q2", 90.0),
("NL", "Q3", 110.0),
("NL", "Q4", 130.0),
("DE", "Q1", 50.0),
("DE", "Q2", 70.0),
],
"country STRING, quarter STRING, revenue DOUBLE",
)
wide_df = (long_df
.groupBy("country")
.pivot("quarter")
.agg(F.sum("revenue")))
wide_df.show()
# +-------+----+----+----+----+
# |country| Q1| Q2| Q3| Q4|
# +-------+----+----+----+----+
# | IT|100 |150 |120 |200 |
# | NL| 80 | 90 |110 |130 |
# | DE| 50 | 70 |null|null|
# +-------+----+----+----+----+
Trei bucăți:
groupBy("country")- coloana(ele) care rămân rânduri în output.pivot("quarter")- coloana ale cărei valori distincte devin nume de coloane.agg(F.sum("revenue"))- ce să pui în fiecare celulă unde cheia rândului și valoarea coloanei coincid. Poți pasa orice agregat; pivot e fundamental ungroupByplus împrăștiere.
DE are null-uri pentru Q3 și Q4 fiindcă acele rânduri nu existau în input. Acela e comportamentul corect: o observație lipsă, nu zero. Dacă vrei zerouri, .fillna(0, subset=["Q3", "Q4"]) după pivot.
Sunt permise mai multe agregări:
(long_df
.groupBy("country")
.pivot("quarter")
.agg(F.sum("revenue").alias("rev"), F.count("*").alias("n"))
.show())
# Coloanele devin: Q1_rev, Q1_n, Q2_rev, Q2_n, ...
Capcana de performanță a pivotului
Iată chestia pe care nimeni nu o menționează în primul tutorial: pivot("quarter") fără o listă de valori rulează un job suplimentar. Spark trebuie să știe ce coloane să producă, așa că pornește un scan complet al datelor sursă pentru a colecta valorile distincte ale coloanei pivot, înainte să poată planifica pivotul efectiv.
Pentru un DataFrame mic asta e invizibil. Pentru un dataset de 200 GB partiționat peste o mie de executori, acel „scan de descoperire” e o cheltuială reală - uneori mai costisitor decât pivotul în sine. Și mai rău, rulează eager chiar dacă tot restul pipeline-ului tău e lazy.
Soluția e să-i dai lui Spark lista:
quarters = ["Q1", "Q2", "Q3", "Q4"]
(long_df
.groupBy("country")
.pivot("quarter", quarters)
.agg(F.sum("revenue"))
.show())
Acum Spark știe exact ce coloane să producă. Fără scan de descoperire, fără job suplimentar. Dacă datele conțin o valoare care nu e în lista ta, e eliminată tăcut din output, ceea ce de obicei e ce vrei pentru raportare (nu vrei ca un rând rătăcit de test să adauge o coloană null_quarter raportului tău lunar).
Intră în obiceiul de a pasa mereu lista când o cunoști. Numele de trimestre, codurile de monedă, codurile de țară, enum-urile de status: acestea sunt de obicei cunoscute din start. Scutește-l pe Spark de muncă.
Pivot-uri care explodează
Un pivot e o transformare wide, iar cu cât rezultatul e mai larg, cu atât mai multă memorie are nevoie fiecare executor pentru a ține un singur rând. Pivot pe o coloană cu 50 de valori distincte e în regulă. Pivot pe o coloană cu 50 de mii de valori distincte produce un DataFrame în care fiecare rând are 50 de mii de coloane, iar acela nu e cu adevărat un DataFrame: e o matrice rară cu iluzii de grandoare.
Dacă coloana ta de pivot are cardinalitate înaltă, aproape sigur vrei o altă formă: format long cu un rând per pereche (entity, metric), posibil partiționat după metric. Apelează la pivot când numărul de coloane rezultate e în zecile sau sute joase, nu mii.
O gardă utilă: verifică mai întâi cardinalitatea.
n = long_df.select("quarter").distinct().count()
if n > 200:
raise ValueError(f"Pivot column has {n} distinct values, refusing to pivot")
Asta prinde cazul în care cineva îndreaptă pivot către coloana greșită la 3 AM într-o sâmbătă.
Unpivot: wide la long
Direcția inversă (wide la long) e mai comună decât realizează lumea. Apare ori de câte ori ai primit un CSV de la un furnizor, un export Excel de la finanțe sau un tabel de warehouse denormalizat în care fiecare metric și-a primit propria coloană. Pentru a agrega, face join sau alimenta un model, de obicei îl vrei long.
În Spark 3.4 și mai nou, există un operator de primă clasă: melt.
wide_df = spark.createDataFrame(
[
("IT", 100.0, 150.0, 120.0, 200.0),
("NL", 80.0, 90.0, 110.0, 130.0),
("DE", 50.0, 70.0, None, None),
],
"country STRING, Q1 DOUBLE, Q2 DOUBLE, Q3 DOUBLE, Q4 DOUBLE",
)
# Spark 3.4+
long_again = wide_df.melt(
ids=["country"],
values=["Q1", "Q2", "Q3", "Q4"],
variableColumnName="quarter",
valueColumnName="revenue",
)
long_again.show()
# +-------+-------+-------+
# |country|quarter|revenue|
# +-------+-------+-------+
# | IT| Q1| 100.0|
# | IT| Q2| 150.0|
# | IT| Q3| 120.0|
# | IT| Q4| 200.0|
# | NL| Q1| 80.0|
# ...
ids sunt coloanele de păstrat as-is (una per rând, repetată pentru fiecare valoare unpivotată). values sunt coloanele de unpivotat. variableColumnName și valueColumnName numesc cele două coloane noi pe care le produce unpivot-ul. Gata.
Trucul de dinainte de 3.4
Dacă ești pe Spark 3.3 sau mai vechi (încă comun în platforme gestionate care rămân în urmă cu un release sau două), melt nu există. Idiomul din ultimii câțiva ani a fost stack în interiorul unui selectExpr:
# Spark < 3.4
long_again = wide_df.selectExpr(
"country",
"stack(4, 'Q1', Q1, 'Q2', Q2, 'Q3', Q3, 'Q4', Q4) AS (quarter, revenue)"
)
long_again.show()
stack(N, ...) ia numărul de grupuri, apoi N perechi de (label, value). Fiecare pereche devine un rând. AS (quarter, revenue) numește cele două coloane de output. E urât în comparație cu melt, dar funcționează pe orice versiune Spark și e încă util când ai nevoie de control complet asupra etichetelor (etichete diferite față de numele coloanelor, de exemplu).
Pentru o versiune programatică în care lista de coloane vine din configurare:
metric_cols = ["Q1", "Q2", "Q3", "Q4"]
stack_expr = f"stack({len(metric_cols)}, " + \
", ".join(f"'{c}', `{c}`" for c in metric_cols) + \
") AS (quarter, revenue)"
long_again = wide_df.selectExpr("country", stack_expr)
Pune referințele de coloane între backtick-uri în caz că numele conțin caractere speciale. Pune etichetele între ghilimele simple. Așază virgula dintre ele corect.
Acesta e și un caz rezonabil pentru a recurge la sintaxa SQL: selectExpr e oricum SQL în esență. Regula din lecția 37 „folosește SQL când codul e în mare parte SQL” se aplică.
Round-trip: pivot apoi unpivot
Pentru a demonstra că sunt inverse, fă round-trip pe date și confirmă:
wide = long_df.groupBy("country").pivot("quarter", ["Q1","Q2","Q3","Q4"]).agg(F.sum("revenue"))
# Spark 3.4+
roundtrip = wide.melt(
ids=["country"],
values=["Q1", "Q2", "Q3", "Q4"],
variableColumnName="quarter",
valueColumnName="revenue",
).filter(F.col("revenue").isNotNull())
# Compara count-urile
print(long_df.count(), roundtrip.count())
# La fel modulo randurile care nu existau in long (DE Q3, Q4)
filter(F.col("revenue").isNotNull()) îndepărtează celulele pe care pivot-ul le-a umplut cu null din cauza rândurilor de input lipsă. Fără el, round-trip-ul e „aproape corect”: fiecare entitate are un rând pentru fiecare metric, inclusiv pentru cele care nu au existat niciodată în sursă. Uneori asta e ce vrei; alteori nu. Fii deliberat.
Alegerea unei forme pentru stocare
Formatul long e aproape întotdeauna forma corectă pentru stocare:
- Metricile noi devin rânduri noi, nu coloane noi. Evoluția schemei nu mai e un proiect.
- Agregările între metrici funcționează natural:
groupBy("metric_name").agg(F.avg("metric_value"))e o singură interogare. - Compresia de fișiere e mai uniformă: tabelele wide rare risipesc mult spațiu pe markeri null în Parquet.
- Feature store-urile ML și multe unelte de time-series așteaptă input long.
Formatul wide e forma corectă pentru output-uri: un raport, o interogare de dashboard, un CSV pentru un destinatar non-tehnic. Pivot la limita pipeline-ului tău, nu la mijloc.
Dacă te trezești pivotând și re-agregând și unpivotând în același job, acela e un miros. Stai long până la ultimul pas, apoi pivot o singură dată la ieșire.
O euristică practică: dacă un consumator din aval e uman, înclină spre wide. Dacă e un alt job Spark, cod de antrenare ML sau o bază de date de metrici, înclină spre long. Cei doi consumatori au preferințe ergonomice diferite, iar datele ar trebui să se potrivească consumatorului.
Mai există și un al treilea format care merită menționat: o coloană struct sau map. În loc să pivotezi patru trimestre în patru coloane, le poți colecta într-o singură coloană MapType cu F.map_from_entries(F.collect_list(F.struct("quarter", "revenue"))). Citirile rămân înguste, schema nu explodează când apare un trimestru nou, iar orice analist care are nevoie de un layout wide poate face select("country", "metrics.Q1", "metrics.Q2") pentru a extrage chei specifice. Este o cale de mijloc care evită ce e mai rău din ambele forme; nu întotdeauna alegerea corectă, dar de păstrat în trusa ta.
Rulează asta pe propriul tău calculator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("ReshapeDemo")
.master("local[*]")
.getOrCreate())
long_df = spark.createDataFrame(
[
("IT", "Q1", 100.0), ("IT", "Q2", 150.0),
("IT", "Q3", 120.0), ("IT", "Q4", 200.0),
("NL", "Q1", 80.0), ("NL", "Q2", 90.0),
("NL", "Q3", 110.0), ("NL", "Q4", 130.0),
("DE", "Q1", 50.0), ("DE", "Q2", 70.0),
],
"country STRING, quarter STRING, revenue DOUBLE",
)
# Pivot cu valori explicite - fara scan de descoperire
wide = (long_df
.groupBy("country")
.pivot("quarter", ["Q1", "Q2", "Q3", "Q4"])
.agg(F.sum("revenue"))
.fillna(0))
wide.show()
# Unpivot, modern (Spark 3.4+)
try:
long_again = wide.melt(
ids=["country"],
values=["Q1", "Q2", "Q3", "Q4"],
variableColumnName="quarter",
valueColumnName="revenue",
)
long_again.orderBy("country", "quarter").show()
except AttributeError:
print("melt not available in this Spark version, falling back to stack")
# Unpivot, fallback pre-3.4
metric_cols = ["Q1", "Q2", "Q3", "Q4"]
stack_expr = (
f"stack({len(metric_cols)}, " +
", ".join(f"'{c}', `{c}`" for c in metric_cols) +
") AS (quarter, revenue)"
)
long_stack = wide.selectExpr("country", stack_expr)
long_stack.orderBy("country", "quarter").show()
# Garda de cardinalitate inainte de pivot
n_distinct = long_df.select("quarter").distinct().count()
print(f"pivot column cardinality: {n_distinct}")
Rulează toate trei. Cele trei DataFrames long de la final sunt echivalente (modulo zerourile unde ai folosit fillna). Print-ul de cardinalitate e verificarea ta de bun simț „e sigur să fac pivot”: intră în obicei.
Asta încheie trusa de unelte pentru forma datelor. Primele trei lecții ale Modulului 7 ți-au dat suprafața SQL completă (lecția trecută), windowing în interiorul acelei suprafețe (lecția dinainte) și reformarea între cele două layout-uri. Lecția următoare ne duce într-un teritoriu unde Catalyst nu te mai poate ajuta: user-defined functions, scăparea de urgență când primitivele SQL și DataFrame nu au ce-ți trebuie, și costul de performanță care vine cu trecerea graniței JVM-Python pe fiecare rând.
Referințe: documentația Apache Spark DataFrame pivot și melt (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.melt.html) și referința funcției stack. Consultat 2026-05-01.