select prende le colonne, filter tiene le righe. La terza operazione di tutti i giorni è derivare una nuova colonna da quelle esistenti: una tassa sopra un prezzo, un flag basato su una soglia, una versione normalizzata di una stringa. Lo strumento principale di PySpark per questo è withColumn, e come la maggior parte dei metodi PySpark, è semplice in superficie e ha un noto bordo affilato sotto.
Tratteremo prima la superficie, withColumn, lit, logica condizionale con when().otherwise(), type cast, e poi il bordo affilato: cosa succede quando chiami withColumn 50 volte in un loop, perché è una trappola di performance nota, e cosa usare al suo posto.
withColumn: aggiungere o sostituire una singola colonna
withColumn(name, expr) restituisce un nuovo DataFrame con la colonna nominata aggiunta o sostituita. Se il nome esiste già, la colonna esistente viene sovrascritta; altrimenti, viene aggiunta in coda.
from pyspark.sql.functions import col
df2 = orders.withColumn("amount_with_vat", col("amount") * 1.22)
Questa è tutta l’API. Un nome, un’espressione Column, un nuovo DataFrame. L’orders originale è invariato: i DataFrame sono immutabili, ogni trasformazione restituisce uno nuovo.
L’espressione sul lato destro può essere qualsiasi cosa che valuti a una Column: aritmetica, chiamate di funzione, when().otherwise(), anche valori letterali:
from pyspark.sql.functions import col, upper, length
df3 = (
orders
.withColumn("amount_with_vat", col("amount") * 1.22)
.withColumn("country_upper", upper(col("country")))
.withColumn("name_length", length(col("customer_name")))
)
Per sostituire una colonna esistente, usa lo stesso nome. Caso comune: aggiustare il tipo di una colonna inferita come stringa.
df4 = orders.withColumn("amount", col("amount").cast("double"))
Questo non aggiunge una colonna chiamata amount accanto a quella vecchia: sovrascrive l’amount esistente con la versione castata. La posizione nello schema rimane la stessa.
lit: trasformare valori Python in espressioni Column
Quando mescoli un valore Python con una Column in un’espressione, PySpark di solito ci capisce. col("amount") * 1.22 funziona perché la classe Column di PySpark fa overload di * e sa come gestire il letterale numerico 1.22.
Ma a volte questa promozione automatica non basta, di solito quando stai chiamando una funzione che si aspetta esplicitamente una Column, o quando il valore Python è di per sé ambiguo (un None, una stringa data). Per quei casi, avvolgi il letterale esplicitamente con lit:
from pyspark.sql.functions import col, lit, when
df5 = orders.withColumn(
"discount",
when(col("amount") > 100, lit(10.0)).otherwise(lit(0.0))
)
lit(10.0) produce un’espressione Column che valuta alla costante 10.0 per ogni riga. when().otherwise() ha bisogno di argomenti Column su entrambi i lati; senza lit, staresti passando float Python grezzi a una funzione che si aspetta Column, e a seconda della versione potrebbe o meno fare la promozione automatica.
Regola pratica: dentro when().otherwise(), array(), struct(), e la maggior parte delle chiamate di funzione, avvolgi i letterali in lit(). Fuori da quelle, quando fai aritmetica semplice come col("x") + 1, non ti serve lit. PySpark ci capisce.
lit(None) è il modo standard di aggiungere una colonna null letterale:
df6 = orders.withColumn("not_yet_processed_at", lit(None).cast("timestamp"))
Nota il cast: lit(None) da solo è tipato come null/void, il che può confondere le operazioni a valle. Casta sempre lit(None) al tipo che vuoi.
when().otherwise(): la colonna condizionale
L’equivalente PySpark del CASE WHEN di SQL. Si legge dall’alto al basso: vince il primo branch che fa match, cade in .otherwise(...) se nessuno fa match.
from pyspark.sql.functions import when, col, lit
df7 = orders.withColumn(
"size_bucket",
when(col("amount") < 50, lit("small"))
.when(col("amount") < 200, lit("medium"))
.otherwise(lit("large"))
)
Concatena tutte le chiamate .when(...) che ti servono. Se salti .otherwise(...), le righe non matchate prendono null per quella colonna, il che a volte è quello che vuoi e spesso è un bug in attesa di accadere. Includo sempre .otherwise(...) anche se è solo lit(None).cast("string"); l’esplicitezza vale lo sforzo.
Puoi usare qualsiasi espressione Column come predicato, non solo l’uguaglianza semplice:
df8 = orders.withColumn(
"fraud_risk",
when(
(col("amount") > 1000) & col("email").isNull(),
lit("high")
)
.when(col("country").isin("XX", "YY"), lit("medium"))
.otherwise(lit("low"))
)
Gli operatori booleani seguono le stesse regole di where: &, |, ~, con parentesi attorno a ogni confronto. (La lezione 14 lo copre; se (col(...) > 100) & (col(...) == "IT") sta iniziando a sembrarti un tic, è il tic giusto da sviluppare.)
Type cast inline
col("x").cast(type) restituisce una Column che valuta al valore castato. Lo combinerai con withColumn di continuo quando ripulisci dati:
df9 = (
raw
.withColumn("amount", col("amount").cast("double"))
.withColumn("customer_id", col("customer_id").cast("int"))
.withColumn("ts", col("ts").cast("timestamp"))
)
L’argomento di cast è o una stringa con il nome del tipo DDL ("double", "int", "timestamp", "string") o un oggetto DataType. Entrambi funzionano; la forma stringa è più corta e si legge meglio.
Se un valore non può essere castato (una stringa non numerica che va a double, un timestamp non parsabile), il risultato è null, silenziosamente, senza errore. Se vuoi rilevare i cast falliti, conta i null prima e dopo:
nulls_before = raw.filter(col("amount").isNull()).count()
casted = raw.withColumn("amount", col("amount").cast("double"))
nulls_after = casted.filter(col("amount").isNull()).count()
print(f"Cast lost {nulls_after - nulls_before} rows to nulls")
La trappola della concatenazione
Ecco il bordo affilato. withColumn è comodo, e la comodità porta a scrivere codice come questo:
# 50 columns from a config
new_df = df
for col_spec in feature_specs:
new_df = new_df.withColumn(col_spec.name, build_expr(col_spec))
Sembra innocuo. Ogni chiamata withColumn restituisce un nuovo DataFrame, che è quello che fanno: non sta calcolando niente con eagerness, sta solo aggiungendo un altro nodo al piano logico. Nessun lavoro fisico avviene.
Tranne: ogni chiamata withColumn aggiunge un nodo Project al piano. Dopo 50 chiamate, hai 50 nodi Project annidati. Catalyst (l’ottimizzatore di query di Spark, lo copriremo nella lezione 41) prova a collassarli, e la maggior parte delle volte ci riesce. Ma “la maggior parte delle volte” nasconde casi in cui non ci riesce, e quando non ci riesce, ottieni:
- Tempo di analisi del piano che cresce in modo non lineare con il numero di chiamate
withColumn. Su pipeline con centinaia di colonne derivate, il solo plan-time può richiedere minuti prima che venga toccato un dato. - Errori di stack-overflow nel planner con catene molto profonde (questo è documentato nella storia JIRA di Spark; la soglia si è spostata tra le versioni, ma il pattern è reale).
- Regole dell’ottimizzatore che non scattano perché fanno pattern matching su forme che la catena profonda non espone.
La documentazione ufficiale di Spark lo fa notare esplicitamente e raccomanda un select con tutte le espressioni in una volta sola quando hai molte colonne da aggiungere o modificare.
Usa select per molte colonne in una volta
La soluzione è fare tutto il lavoro sulle colonne in un singolo select:
from pyspark.sql.functions import col
# Build the list of expressions: keep existing columns, add new ones
projection = [
col("*"), # keep everything
(col("amount") * 1.22).alias("amount_with_vat"),
upper(col("country")).alias("country_upper"),
when(col("amount") > 100, lit(10.0)).otherwise(lit(0.0)).alias("discount"),
]
result = orders.select(*projection)
O, se vuoi pieno controllo sullo schema di output (sostituire colonne piuttosto che solo aggiungerle in coda):
result = orders.select(
"order_id",
"customer_id",
col("amount").cast("double").alias("amount"),
upper(col("country")).alias("country"),
(col("amount") * 1.22).alias("amount_with_vat"),
when(col("amount") > 100, lit(10.0)).otherwise(lit(0.0)).alias("discount"),
)
Un nodo del piano. Una proiezione. Catalyst riceve una lista piatta di espressioni da ottimizzare, non una catena profonda 50 da appiattire prima. Il plan-time rimane veloce, l’ottimizzatore si comporta in modo coerente.
Lo stesso pattern in un loop, generando una lista:
exprs = [col("*")]
for spec in feature_specs:
exprs.append(build_expr(spec).alias(spec.name))
result = df.select(*exprs)
Un singolo select con len(feature_specs) + 1 espressioni, indipendentemente da quante spec ci sono.
Quando concatenare withColumn va bene
Non ti sto dicendo di non usare mai withColumn. Per una manciata di colonne, diciamo fino a dieci, concatenare è perfettamente leggibile e non c’è nessun problema di performance. La trappola è specificamente quando:
- Stai aggiungendo molte colonne (soglia indicativa: più di 20 o giù di lì).
- Le nuove colonne vengono aggiunte in un loop guidato da config o metadati.
- Il tempo di analisi del piano si sta presentando come un problema (lo vedrai nel tab “SQL” della Spark UI come un lungo ritardo prima che parta qualsiasi stage).
Per trasformazioni occasionali scritte a mano, withColumn è lo strumento giusto. Per il feature engineering programmatico con centinaia di colonne, costruisci una lista per select.
Confrontare i piani con explain
Puoi vederlo coi tuoi occhi:
# 20 chained withColumn calls
chained = df
for i in range(20):
chained = chained.withColumn(f"f{i}", col("amount") * (i + 1))
chained.explain(extended=True)
# The Analyzed and Optimized plans show 20 nested Project nodes,
# which Catalyst (usually) flattens to one in the Physical plan.
# Same thing as a single select
exprs = [col("*")] + [(col("amount") * (i + 1)).alias(f"f{i}") for i in range(20)]
flat = df.select(*exprs)
flat.explain(extended=True)
# One Project node from the start.
Per 20 colonne l’appiattimento riesce e i piani fisici sono identici. Aumenta a 200 e inizierai a vedere differenze nel tempo di analisi, anche se il piano fisico finale sembra ancora lo stesso. (Scaveremo nel leggere l’output di explain per bene nella lezione 41 sull’ottimizzatore Catalyst.)
Fai girare questo sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, upper, length
spark = SparkSession.builder.appName("withcolumn-demo").getOrCreate()
data = [
(1, "anne", 59.00, "it", "anne@example.com"),
(2, "bob", 149.00, "de", "bob@example.com"),
(3, "claire", 12.00, "es", None),
(4, "diego", 999.00, "it", "diego@example.com"),
(5, "elena", 29.00, "fr", "elena@example.com"),
]
schema = "order_id INT, name STRING, amount DOUBLE, country STRING, email STRING"
orders = spark.createDataFrame(data, schema)
# Q1: simple withColumn -- add a derived column
orders.withColumn("amount_with_vat", col("amount") * 1.22).show()
# Q2: replace a column (uppercase the country)
orders.withColumn("country", upper(col("country"))).show()
# Q3: when().otherwise() with lit
orders.withColumn(
"size",
when(col("amount") < 50, lit("small"))
.when(col("amount") < 200, lit("medium"))
.otherwise(lit("large"))
).show()
# Q4: lit(None) typed correctly
orders.withColumn("processed_at", lit(None).cast("timestamp")).printSchema()
# Q5: many columns the wrong way (chained)
chained = orders
for i in range(15):
chained = chained.withColumn(f"f{i}", col("amount") * (i + 1))
print("=== chained ===")
chained.explain(False) # look for the nested Projects in the analyzed plan
# Q6: many columns the right way (single select)
exprs = [col("*")] + [(col("amount") * (i + 1)).alias(f"f{i}") for i in range(15)]
flat = orders.select(*exprs)
print("=== flat ===")
flat.explain(False)
# The physical plans should match. The analyzed plans differ in depth.
Fai girare entrambe le chiamate explain. Il piano fisico in fondo è quello che davvero gira; per conteggi di colonne moderati sarà lo stesso. Il piano analizzato sopra mostra la differenza strutturale. Quella differenza è ciò che morde alla scala.
Se sei curioso di quanto possa diventare brutto, spingi il conteggio del loop a 500 e cronometra l’explain stesso. Su un portatile la versione concatenata ci mette secondi; la versione piatta resta sotto il secondo. Non hai nemmeno processato dei dati: è puro overhead di plan-time. In un job che gira centinaia di volte al giorno, quell’overhead si somma in soldi veri in minuti di cluster.
Una nota su withColumns (plurale)
Le versioni recenti di Spark hanno aggiunto withColumns (plurale), che prende un dict {name: expr} e le aggiunge tutte in una volta:
df.withColumns({
"amount_with_vat": col("amount") * 1.22,
"country_upper": upper(col("country")),
})
Questo è essenzialmente un wrapper di comodità attorno al pattern del singolo select di sopra. Produce un nodo Project, non molti. Se la tua versione di Spark lo supporta (3.3+), è un’alternativa più leggibile al costruire una lista per select a mano per il caso “aggiungi molte colonne”. La trappola della concatenazione non si applica perché non c’è catena: è una singola chiamata.
Prossima lezione: aggregazioni e groupBy, contare, sommare, fare media, e le sorprese che vengono con agg rispetto ai metodi shorthand.
Riferimento: Apache Spark Python API (https://spark.apache.org/docs/latest/api/python/), recuperato il 2026-05-01.