select alege coloane, filter păstrează rânduri. A treia operație de zi cu zi e derivarea unei noi coloane din cele existente: o taxă pe deasupra unui preț, un flag bazat pe un prag, o versiune normalizată a unui string. Unealta principală a PySpark pentru asta e withColumn și, ca majoritatea metodelor PySpark, e simplă la suprafață și are o muchie ascuțită bine cunoscută dedesubt.
Acoperim întâi suprafața, withColumn, lit, logica condițională cu when().otherwise(), cast-urile de tip, și apoi muchia ascuțită: ce se întâmplă când apelezi withColumn de 50 de ori într-un loop, de ce e o capcană de performanță cunoscută și ce să folosești în loc.
withColumn: adaugă sau înlocuiește o singură coloană
withColumn(name, expr) returnează un nou DataFrame cu coloana numită adăugată sau înlocuită. Dacă numele există deja, coloana existentă e suprascrisă; dacă nu, e adăugată la sfârșit.
from pyspark.sql.functions import col
df2 = orders.withColumn("amount_with_vat", col("amount") * 1.22)
Asta-i toată API-ul. Un nume, o expresie Column, un nou DataFrame. Originalul orders e neschimbat: DataFrame-urile sunt imutabile, fiecare transformare returnează unul nou.
Expresia din partea dreaptă poate fi orice se evaluează la o Column: aritmetică, apeluri de funcții, when().otherwise(), chiar și valori literale:
from pyspark.sql.functions import col, upper, length
df3 = (
orders
.withColumn("amount_with_vat", col("amount") * 1.22)
.withColumn("country_upper", upper(col("country")))
.withColumn("name_length", length(col("customer_name")))
)
Ca să înlocuiești o coloană existentă, folosește același nume. Caz comun: corectarea tipului unei coloane deduse ca string.
df4 = orders.withColumn("amount", col("amount").cast("double"))
Asta nu adaugă o coloană numită amount lângă cea veche, ci suprascrie amount-ul existent cu versiunea cast-uită. Poziția în schema rămâne aceeași.
lit: transformarea valorilor Python în expresii Column
Când amesteci o valoare Python cu o Column într-o expresie, PySpark de obicei își dă seama. col("amount") * 1.22 merge fiindcă clasa Column din PySpark suprascrie * și știe cum să gestioneze literalul numeric 1.22.
Dar uneori această auto-promovare nu e suficientă, de obicei când apelezi o funcție care așteaptă explicit o Column sau când valoarea Python însăși e ambiguă (un None, un string de dată). Pentru acele cazuri, învelești literalul explicit cu lit:
from pyspark.sql.functions import col, lit, when
df5 = orders.withColumn(
"discount",
when(col("amount") > 100, lit(10.0)).otherwise(lit(0.0))
)
lit(10.0) produce o expresie Column care se evaluează la constanta 10.0 pentru fiecare rând. when().otherwise() are nevoie de argumente Column pe ambele părți; fără lit, ai trimite float-uri Python brute la o funcție care așteaptă Columns, iar în funcție de versiune, poate sau nu să auto-promoveze.
Regulă empirică: înăuntrul when().otherwise(), array(), struct() și a majorității apelurilor de funcții, învelește literalele în lit(). În afara acelora, când faci aritmetică simplă ca col("x") + 1, nu ai nevoie de lit. PySpark se descurcă.
lit(None) e modalitatea standard de a adăuga o coloană literală null:
df6 = orders.withColumn("not_yet_processed_at", lit(None).cast("timestamp"))
Observă cast-ul: lit(None) în sine e tipat ca null/void, ceea ce poate deruta operațiile din aval. Cast-uiește mereu lit(None) la tipul pe care îl vrei.
when().otherwise(): coloana condițională
Echivalentul PySpark al CASE WHEN-ului din SQL. Se citește de sus în jos: prima ramură care se potrivește câștigă, cade prin la .otherwise(...) dacă nimic nu se potrivește.
from pyspark.sql.functions import when, col, lit
df7 = orders.withColumn(
"size_bucket",
when(col("amount") < 50, lit("small"))
.when(col("amount") < 200, lit("medium"))
.otherwise(lit("large"))
)
Înlănțuiește câte apeluri .when(...) ai nevoie. Dacă sari peste .otherwise(...), rândurile nepotrivite primesc null pentru acea coloană, ceea ce uneori e ce vrei și deseori e un bug care își așteaptă rândul. Eu includ mereu .otherwise(...), chiar dacă e doar lit(None).cast("string"); explicitatea merită.
Poți folosi orice expresie Column ca predicat, nu doar egalități simple:
df8 = orders.withColumn(
"fraud_risk",
when(
(col("amount") > 1000) & col("email").isNull(),
lit("high")
)
.when(col("country").isin("XX", "YY"), lit("medium"))
.otherwise(lit("low"))
)
Operatorii booleeni urmează aceleași reguli ca în where: &, |, ~, cu paranteze în jurul fiecărei comparații. (Lecția 14 acoperă asta; dacă (col(...) > 100) & (col(...) == "IT") începe să-ți pară un tic, e ticul potrivit de dezvoltat.)
Cast-uri de tip inline
col("x").cast(type) returnează o Column care se evaluează la valoarea cast-uită. O vei combina constant cu withColumn când cureți date:
df9 = (
raw
.withColumn("amount", col("amount").cast("double"))
.withColumn("customer_id", col("customer_id").cast("int"))
.withColumn("ts", col("ts").cast("timestamp"))
)
Argumentul cast e fie un nume de tip DDL ca string ("double", "int", "timestamp", "string"), fie un obiect DataType. Ambele merg; forma string e mai scurtă și se citește mai bine.
Dacă o valoare nu poate fi cast-uită (un string non-numeric care merge la double, un timestamp neparsabil), rezultatul e null, tăcut, fără eroare. Dacă vrei să detectezi cast-uri stricate, numără null-urile înainte și după:
nulls_before = raw.filter(col("amount").isNull()).count()
casted = raw.withColumn("amount", col("amount").cast("double"))
nulls_after = casted.filter(col("amount").isNull()).count()
print(f"Cast lost {nulls_after - nulls_before} rows to nulls")
Capcana înlănțuirii
Iată muchia ascuțită. withColumn e convenabil, iar conveniența duce la cod ca ăsta:
# 50 columns from a config
new_df = df
for col_spec in feature_specs:
new_df = new_df.withColumn(col_spec.name, build_expr(col_spec))
Pare nevinovat. Fiecare apel withColumn returnează un nou DataFrame, ceea ce și fac, nu calculează nimic eager, doar adaugă încă un nod la planul logic. Nu se întâmplă nicio muncă fizică.
Cu excepția: fiecare apel withColumn adaugă un nod Project la plan. După 50 de apeluri, ai 50 de noduri Project imbricate. Catalyst (optimizatorul de interogări al Spark, îl vom acoperi în lecția 41) încearcă să le prăbușească și de cele mai multe ori reușește. Dar „de cele mai multe ori” ascunde cazurile în care nu reușește, și când nu reușește, primești:
- Timp de analiză a planului care crește non-liniar cu numărul de apeluri
withColumn. Pe pipeline-uri cu sute de coloane derivate, doar timpul de plan poate dura minute înainte ca vreo dată să fie atinsă. - Erori stack-overflow în planificator cu lanțuri foarte adânci (asta e documentată în istoricul JIRA al Spark; pragul s-a mutat între versiuni, dar pattern-ul e real).
- Reguli de optimizator care nu se declanșează fiindcă fac pattern matching pe forme pe care lanțul adânc nu le expune.
Documentația oficială Spark semnalează asta explicit și recomandă un select cu toate expresiile deodată când ai multe coloane de adăugat sau modificat.
Folosește select pentru multe coloane deodată
Soluția e să faci toată munca cu coloane într-un singur select:
from pyspark.sql.functions import col
# Build the list of expressions: keep existing columns, add new ones
projection = [
col("*"), # keep everything
(col("amount") * 1.22).alias("amount_with_vat"),
upper(col("country")).alias("country_upper"),
when(col("amount") > 100, lit(10.0)).otherwise(lit(0.0)).alias("discount"),
]
result = orders.select(*projection)
Sau, dacă vrei control complet asupra schemei de output (înlocuind coloane în loc să le adaugi doar):
result = orders.select(
"order_id",
"customer_id",
col("amount").cast("double").alias("amount"),
upper(col("country")).alias("country"),
(col("amount") * 1.22).alias("amount_with_vat"),
when(col("amount") > 100, lit(10.0)).otherwise(lit(0.0)).alias("discount"),
)
Un nod de plan. O proiecție. Catalyst primește o listă plată de expresii de optimizat, nu un lanț adânc de 50 pe care trebuie să-l aplatizeze întâi. Timpul de plan rămâne rapid, optimizatorul se comportă consistent.
Același pattern într-un loop, generând o listă:
exprs = [col("*")]
for spec in feature_specs:
exprs.append(build_expr(spec).alias(spec.name))
result = df.select(*exprs)
Un singur select cu len(feature_specs) + 1 expresii, indiferent câte spec-uri sunt.
Când înlănțuirea withColumn e ok
Nu-ți zic să nu folosești niciodată withColumn. Pentru o mână de coloane, să zicem până la zece, înlănțuirea e perfect lizibilă și nu există nicio problemă de performanță. Capcana e specific când:
- Adaugi multe coloane (prag aproximativ: peste 20 sau cam așa).
- Coloanele noi sunt adăugate într-un loop condus de configurare sau metadate.
- Timpul de analiză a planului apare ca o problemă (îl vei vedea în tabul „SQL” al Spark UI ca o întârziere lungă înainte ca vreo etapă să pornească).
Pentru transformări ocazionale, scrise de mână, withColumn e unealta potrivită. Pentru feature engineering programatic cu sute de coloane, construiește o listă de select.
Compararea planurilor cu explain
Poți vedea singur diferența:
# 20 chained withColumn calls
chained = df
for i in range(20):
chained = chained.withColumn(f"f{i}", col("amount") * (i + 1))
chained.explain(extended=True)
# The Analyzed and Optimized plans show 20 nested Project nodes,
# which Catalyst (usually) flattens to one in the Physical plan.
# Same thing as a single select
exprs = [col("*")] + [(col("amount") * (i + 1)).alias(f"f{i}") for i in range(20)]
flat = df.select(*exprs)
flat.explain(extended=True)
# One Project node from the start.
Pentru 20 de coloane aplatizarea reușește și planurile fizice sunt identice. Crește la 200 și vei începe să vezi diferențe în timpul de analiză, chiar dacă planul fizic final tot la fel arată. (Vom intra în citirea output-ului explain cum trebuie în lecția 41 despre optimizatorul Catalyst.)
Rulează asta pe propria mașină
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, upper, length
spark = SparkSession.builder.appName("withcolumn-demo").getOrCreate()
data = [
(1, "anne", 59.00, "it", "anne@example.com"),
(2, "bob", 149.00, "de", "bob@example.com"),
(3, "claire", 12.00, "es", None),
(4, "diego", 999.00, "it", "diego@example.com"),
(5, "elena", 29.00, "fr", "elena@example.com"),
]
schema = "order_id INT, name STRING, amount DOUBLE, country STRING, email STRING"
orders = spark.createDataFrame(data, schema)
# Q1: simple withColumn — add a derived column
orders.withColumn("amount_with_vat", col("amount") * 1.22).show()
# Q2: replace a column (uppercase the country)
orders.withColumn("country", upper(col("country"))).show()
# Q3: when().otherwise() with lit
orders.withColumn(
"size",
when(col("amount") < 50, lit("small"))
.when(col("amount") < 200, lit("medium"))
.otherwise(lit("large"))
).show()
# Q4: lit(None) typed correctly
orders.withColumn("processed_at", lit(None).cast("timestamp")).printSchema()
# Q5: many columns the wrong way (chained)
chained = orders
for i in range(15):
chained = chained.withColumn(f"f{i}", col("amount") * (i + 1))
print("=== chained ===")
chained.explain(False) # look for the nested Projects in the analyzed plan
# Q6: many columns the right way (single select)
exprs = [col("*")] + [(col("amount") * (i + 1)).alias(f"f{i}") for i in range(15)]
flat = orders.select(*exprs)
print("=== flat ===")
flat.explain(False)
# The physical plans should match. The analyzed plans differ in depth.
Rulează ambele apeluri explain. Planul fizic din partea de jos e ce rulează efectiv; pentru numere moderate de coloane va fi același. Planul analizat de deasupra arată diferența structurală. Acea diferență e ce mușcă la scală.
Dacă ești curios cât de rău poate ajunge, împinge numărul de iterații la 500 și măsoară timpul lui explain în sine. Pe un laptop, versiunea înlănțuită durează secunde; versiunea plată rămâne sub-secundă. Nu ai procesat încă nicio dată; ăsta e overhead pur de timp de plan. Într-un job care rulează de sute de ori pe zi, acel overhead se adună la bani reali în minute de cluster.
O notă despre withColumns (plural)
Versiunile recente de Spark au adăugat withColumns (plural), care primește un dicționar {name: expr} și le adaugă pe toate deodată:
df.withColumns({
"amount_with_vat": col("amount") * 1.22,
"country_upper": upper(col("country")),
})
Asta e în esență un wrapper convenabil în jurul pattern-ului cu un singur select de mai sus. Produce un singur nod Project, nu mai multe. Dacă versiunea ta de Spark îl suportă (3.3+), e o alternativă mai plăcut de citit decât construirea manuală a unei liste select pentru cazul „adaugă multe coloane”. Capcana înlănțuirii nu se aplică, fiindcă nu există un lanț, e un singur apel.
Lecția următoare: agregări și groupBy, numărare, însumare, mediere și surprizele care vin cu agg față de metodele scurte.
Referință: Apache Spark Python API (https://spark.apache.org/docs/latest/api/python/), consultat 2026-05-01.