Aproape orice program PySpark e construit din două verbe: alege niște coloane, păstrează niște rânduri. Tot restul, joins, agregări, ferestre, UDF-uri, e decor pe deasupra. Așa că, înainte să ajungem la oricare dintre acele decoruri, ar trebui să fim foarte confortabili cu cele două verbe de bază.
În PySpark astea sunt select (alege coloane) și filter/where (păstrează rânduri). Nu sunt exotice; dacă ai scris SQL, modelul mental se mapează curat. Partea interesantă e cum te referi la o coloană, fiindcă PySpark îți dă patru moduri diferite de a o face și doar trei dintre ele supraviețuiesc contactului cu datele din lumea reală.
select: proiectarea coloanelor
select ia oricâte referințe la coloane și returnează un nou DataFrame doar cu acele coloane:
df.select("order_id", "amount").show(5)
Poți trimite string-uri, obiecte coloană sau expresii. Se amestecă liber:
from pyspark.sql.functions import col, upper
df.select(
"order_id",
upper(col("country")).alias("country_code"),
(col("amount") * 1.22).alias("amount_with_vat"),
).show(5)
Schema de output e exact coloanele cerute, în ordinea cerută. Fără surprize.
select e o transformare, nu o acțiune; nu rulează nimic până nu o declanșezi (show, collect, write). Tot ce construiește e un nod de plan leneș care zice „când mă va executa cineva în sfârșit, proiectează aceste coloane”.
where și filter: păstrarea rândurilor
where și filter sunt aliasuri. Aceeași metodă, două nume. Ambele iau o expresie de coloană booleană și păstrează doar rândurile în care expresia e adevărată:
df.filter(col("amount") > 100).show(5)
df.where(col("amount") > 100).show(5) # identical
Alege unul și ține-te de el. Eu înclin spre where fiindcă se citește ca SQL. Unele echipe preferă filter fiindcă se citește ca Python. Niciuna nu e greșită; consistența într-un singur codebase contează mai mult decât alegerea în sine.
Poți trimite și un fragment de string SQL dacă vrei, ceea ce poate fi util când dai copy-paste dintr-un editor SQL:
df.where("amount > 100 AND country = 'IT'").show(5)
Asta funcționează, trece prin același optimizator Catalyst și produce același plan ca forma cu expresie de coloană. Folosește-o când e clar mai lizibilă; altfel rămâi la expresiile de coloană, fiindcă sunt verificate la momentul parse-ului și au autocomplete în IDE.
Patru moduri de a referi o coloană
Aici PySpark devine interesant. Să zicem că DataFrame-ul tău are o coloană numită amount. Te poți referi la ea ca:
"amount" # string simplu
df.amount # acces ca atribut
df["amount"] # acces ca element
col("amount") # apel de functie (din pyspark.sql.functions)
Toate patru funcționează în select. Nu sunt echivalente peste tot. Iată cheat sheet-ul:
1. String simplu "amount": funcționează în select, groupBy, orderBy, drop, dropDuplicates. Nu funcționează acolo unde Spark așteaptă un obiect Column; de exemplu, nu poți scrie df.where("amount" > 100), fiindcă Python evaluează "amount" > 100 ca o comparație string-vs-int înainte ca Spark să vadă vreodată asta. E o eroare Python, nu Spark.
2. Acces ca atribut df.amount: merge de cele mai multe ori, eșuează în trei situații. (a) Numele coloanei are un spațiu sau punctuație: df.amount paid e o eroare de sintaxă, df["amount paid"] merge. (b) Numele coloanei umbrește o metodă a DataFrame-ului: df.count nu referă coloana count, ci metoda count(). Confuz și tăcut. (c) Numele coloanei are casing ciudat sau începe cu o cifră.
3. Acces ca element df["amount"]: merge mereu, pe orice nume de coloană, oricât de blestemat ar fi. Ăsta e cel la care recurg când nu sunt sigur. E puțin mai zgomotos decât accesul ca atribut, dar nu mă surprinde niciodată.
4. Funcția col(): merge mereu și, în mod unic, merge fără un DataFrame în scope. Asta contează în două contexte:
from pyspark.sql.functions import col, when, sum as _sum
# Inside agg(): there's no df.x to write, you must use col()
df.groupBy("country").agg(_sum(col("amount")).alias("total"))
# When building reusable expressions in a function
def positive_amount():
return col("amount") > 0
df.where(positive_amount())
Vei folosi col() constant, odată ce începi să scrii funcții care operează pe coloane în mod generic. Obișnuiește-te cu el.
Recomandarea, dacă vrei una: folosește col() în expresii, df["x"] când trebuie să fii fără echivoc despre care DataFrame e vorba și string-uri simple doar în argumente de tip listă-de-coloane precum select și groupBy. Evită df.x în cod partajat, fiindcă modurile lui de eșec sunt tăcute.
Operatorii booleeni: capcana de precedență
Expresiile de coloană din PySpark suprascriu operatorii bitwise din Python (&, |, ~) pentru logică booleană. Nu sunt la fel ca and, or, not. E cea mai comună greșeală a începătorilor în PySpark.
# WRONG — Python's `and` doesn't know what a Column is
df.where(col("amount") > 100 and col("country") == "IT")
# Raises: ValueError: Cannot convert column into bool
# RIGHT
df.where((col("amount") > 100) & (col("country") == "IT"))
Și ai absolut nevoie de paranteze în jurul fiecărei comparații. De ce? Fiindcă & și | au precedență mai mare decât >, ==, < etc. În gramatica Python, & se leagă mai strâns decât operatorii de comparație. Fără paranteze, Python parsează expresia în ordinea greșită și primești o eroare derutantă sau, mai rău, un filtru tăcut greșit.
# WRONG — parsed as col("amount") > (100 & col("country")) == "IT"
df.where(col("amount") > 100 & col("country") == "IT")
# RIGHT
df.where((col("amount") > 100) & (col("country") == "IT"))
Antrenează-ți degetele să tasteze parantezele automat. Fiecare comparație într-un filtru compus se învelește în paranteze. Fără excepții.
~ e negația:
df.where(~(col("country") == "NL"))
# same as
df.where(col("country") != "NL")
Ambele forme merg. Forma ~ se compune frumos când negi o expresie mai lungă, ca ~col("email").isNull().
selectExpr: când SQL e mai scurt
Există o metodă-soră, selectExpr, care primește fragmente de string SQL în locul expresiilor Column. E pur și simplu select cu un parser SQL în față:
df.selectExpr(
"order_id",
"amount * 1.22 AS amount_with_vat",
"CASE WHEN amount > 100 THEN 'big' ELSE 'small' END AS bucket",
)
Când o expresie e scurtă și clar de formă SQL, selectExpr e mai compact decât lanțul echivalent col(...).cast(...). Când expresia e construită programatic, forma cu expresie Column e mai bună fiindcă o poți compune. Alege per situație; ambele compilează la același plan.
Potrivirea de string-uri
Patru pattern-uri, în ordine de la cel mai ieftin la cel mai general:
df.where(col("email").startswith("anne")) # prefix
df.where(col("email").endswith("@gmail.com")) # suffix
df.where(col("email").contains("anne")) # substring
df.where(col("name").like("anne %")) # SQL LIKE — % and _ wildcards
df.where(col("name").rlike("^[Aa]nne")) # regex (Java syntax)
startswith și endswith sunt cele mai ieftine, fiindcă pot folosi uneori partition pruning sau skipping bazat pe sortare. contains e o căutare de substring. like urmează regulile de wildcard din SQL: % potrivește orice secvență, _ potrivește un caracter. rlike e regex complet, aroma Java a regex-urilor, care e 95% la fel cu re din Python, dar nu identică. Atenție la \\ vs \ la escape, când pui un regex într-un string Python.
Filtre pentru null
isNull() și isNotNull() sunt metode pe o coloană. Aceleași reguli de logică trivalentă din SQL se aplică: o comparație care implică null returnează null, nu true sau false, deci where(col("x") == None) nu face ce crezi.
df.where(col("email").isNull()).count() # rows with no email
df.where(col("email").isNotNull()).count() # rows with email
# WRONG — None comparison; never matches
df.where(col("email") == None)
Aceeași capcană, alt limbaj. SQL are = NULL care returnează null. PySpark are == None care returnează null. Aceeași soluție: folosește metodele dedicate pentru null.
Mai există și eqNullSafe (uneori scris ca operatorul <=> în Spark SQL), care tratează null ca o valoare comparabilă: null eqNullSafe null e true, null eqNullSafe 1 e false. Util pentru condiții de join în care vrei ca null-urile să se potrivească cu null-urile, rar în practică, dar singura dată când == nu face ce vrei și există o alternativă curată.
Înlănțuirea filtrelor
Poți înlănțui filtre cât vrei. Fiecare e propriul lui nod de plan, dar Catalyst le va prăbuși într-un singur filtru fizic la momentul execuției:
df.filter(col("country") == "IT") \
.filter(col("amount") > 100) \
.filter(col("ts") >= "2026-01-01") \
.show()
Forma înlănțuită și forma compusă ((country == "IT") & (amount > 100) & (ts >= "...")) produc planuri fizice identice. Alege-o pe cea care se citește mai bine. Eu tind să înlănțui când fiecare filtru e o „regulă” separată cu propriul înțeles („clienți italieni”, „comenzi cu valoare mare”, „acest trimestru”) și să folosesc forma compusă când e o singură condiție logică cu mai multe părți.
Punând totul cap la cap
Exemplu realist: dintr-un DataFrame orders, ia ID-ul comenzii, ID-ul clientului, suma și codul de țară, pentru comenzi din Italia, Spania sau Franța din primul trimestru, unde suma e peste 50 și emailul nu e null.
from pyspark.sql.functions import col
result = (
orders
.where(col("country").isin("IT", "ES", "FR"))
.where(col("amount") > 50)
.where(col("email").isNotNull())
.where((col("ts") >= "2026-01-01") & (col("ts") < "2026-04-01"))
.select(
col("order_id"),
col("customer_id"),
col("amount").alias("amount_eur"),
col("country"),
)
)
result.show(20)
isin e echivalentul IN-ului din SQL. Ia oricâte valori literale. Nu încerca să trimiți o listă Python cu unpacking * decât dacă chiar ai nevoie; isin("IT", "ES", "FR") e ok, isin(*countries_list) merge și el dacă countries_list e dinamic.
Observă că ordinea operațiilor nu contează pentru corectitudine: Catalyst va reordona filtrele în timpul planificării ca să le pună pe cele mai ieftine și mai selective primele, și să le împingă în jos spre sursa de date acolo unde e posibil (vom ajunge la predicate pushdown într-o lecție viitoare). Scrie-ți filtrele în ordinea cea mai clară de citit. Optimizatorului nu-i pasă.
Încă o notă practică: select și filter sunt amândouă transformări înguste, ceea ce înseamnă că nu fac shuffle de date între executori. Sunt ieftine. Le poți împrăștia liber prin pipeline-ul tău, iar optimizatorul le va prăbuși pe cele adiacente într-un singur pas fizic. Asta e opusul operațiilor ca groupBy și join, la care vom ajunge în următoarele câteva lecții; alea fac shuffle, iar locul în care le pui contează mult.
Rulează asta pe propria mașină
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("select-filter-demo").getOrCreate()
data = [
(1, 42, 59.00, "IT", "anne@example.com", "2026-01-03 10:32:00"),
(2, 42, 29.00, "IT", None, "2026-01-04 14:22:00"),
(3, 17, 149.00, "DE", "bob@example.com", "2026-02-05 09:15:00"),
(4, 17, 199.00, "ES", "bob@example.com", "2026-02-22 11:40:00"),
(5, 8, 42.42, "RO", None, "2026-03-28 08:12:00"),
(6, 99, 12.00, "IT", "claire@example.com", "2026-04-15 16:00:00"),
]
schema = "order_id INT, customer_id INT, amount DOUBLE, country STRING, email STRING, ts TIMESTAMP"
orders = spark.createDataFrame(data, schema)
# Q1: simple filter, simple select
orders.where(col("country") == "IT").select("order_id", "amount").show()
# Q2: compound filter — note the parens around each comparison
orders.where(
(col("amount") > 50) & (col("country").isin("IT", "ES", "FR"))
).show()
# Q3: null-safe filter
orders.where(col("email").isNotNull()).select("order_id", "email").show()
# Q4: the four column-reference styles, all equivalent here
orders.select("amount").show(2)
orders.select(orders.amount).show(2)
orders.select(orders["amount"]).show(2)
orders.select(col("amount")).show(2)
# Q5: a string-pattern filter
orders.where(col("email").rlike("^[ab]")).show()
# Q6: deliberate trap — try removing the parens around the comparisons
# orders.where(col("amount") > 50 & col("country") == "IT") # FAILS
orders.where((col("amount") > 50) & (col("country") == "IT")).show()
Rulează fiecare interogare. Încearcă să scoți parantezele din Q6. Citește eroarea. Acea eroare îți va salva o oră data viitoare când o vei vedea.
Lecția următoare: adăugarea de coloane cu withColumn, helper-ul lit și capcana de înlănțuire care transformă un pipeline de 50 de pași într-un plan de execuție încâlcit.
Referință: Apache Spark Python API (https://spark.apache.org/docs/latest/api/python/), consultat 2026-05-01.