Dacă groupBy e prima unealtă de putere pe care o învață fiecare analist, window functions sunt a doua, iar majoritatea oamenilor se opresc undeva pe la jumătatea învățării lor, la jumătatea drumului prin row_number(), niciodată suficient de încrezători cât să apeleze la ele când ar fi răspunsul cel mai curat. Lecția asta e programul de reabilitare pentru asta.
O window function calculează o valoare pentru fiecare rând, uitându-se la o fereastră de rânduri înrudite din jurul lui, fără să le colapseze. Acel ultim detaliu este toată ideea. groupBy("country").agg(F.sum("total")) îți dă un rând per țară. O window function cu aceeași partiție îți dă același total, lângă fiecare rând original. Rândurile de detaliu supraviețuiesc.
Odată ce internalizezi asta, trei pattern-uri ies la iveală: ranking (care rând este cel mai recent, top trei, mediana), comparare cu un vecin (azi vs ieri, această achiziție vs cea precedentă) și totaluri cumulative (venit cumulativ, medii mobile). Trei pattern-uri, un singur operator, se aplică în 80% din interogările analitice.
Window spec
O window function are nevoie de un window spec: ce este fereastra, pentru orice rând dat. Spec-ul are două bucăți:
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy("user_id").orderBy("event_time")
partitionBy spune: rândurile se împart în grupuri după user_id, iar fereastra nu trece niciodată peste grupuri. Gândește-te la el ca la un GROUP BY pentru fereastră: fiecare utilizator primește propria sa fereastră privată. orderBy spune: în interiorul unei partiții, rândurile sunt ordonate după event_time, deci concepte precum „rândul precedent” și „totalul cumulativ până aici” au un sens.
Apoi aplici o funcție cu .over(w):
df.withColumn("rn", F.row_number().over(w))
Primești o coloană nouă cu rezultatul funcției calculat per rând, în raport cu fereastra definită de w. DataFrame-ul are încă același număr de rânduri: nimic nu se colapsează.
Două lucruri de reținut:
partitionByeste coloana echivalentă cu shuffle-ul. Spark trebuie să aducă toate rândurile pentru un anumit utilizator pe același executor ca să poată calcula fereastra. Aceasta este o transformare wide. Alege o coloană de partiție care scalează: nu facepartitionBype ceva cu două valori distincte într-un tabel de un miliard de rânduri; una dintre acele două partiții devine o cheie fierbinte.- Fără
partitionBy, întregul dataset este o singură fereastră. Uneori asta e ce vrei (un total cumulativ global). Mai des e un bug. Rulează.explain()și vei vedea unWindowExecfără chei de partiție: a rulat pe un singur executor și o vei simți.
Catalogul window functions
Există o mică grădină zoologică de funcții concepute pentru a fi folosite peste o fereastră. Memorează această listă o singură dată, iar restul e compoziție.
Ranking și numerotare.
F.row_number()- 1, 2, 3, … per partiție, fără egalități. Două rânduri cu valori de sortare identice primesc tot numere secvențiale.F.rank()- 1, 2, 2, 4, … egalitățile primesc același rang, iar următorul rang sare. Ranking standard „olimpic”.F.dense_rank()- 1, 2, 2, 3, … egalitățile primesc același rang, fără sărituri.F.percent_rank()- rang relativ în [0, 1], util pentru interogări de tip percentilă.F.ntile(n)- împarte fiecare partiție înngrupuri de dimensiuni aproximativ egale.ntile(4)peste venit dă cuartile.
Vecini.
F.lag(col, offset)- valoarea luicolcuoffsetrânduri înainte în partiție.F.lead(col, offset)- valoarea luicolcuoffsetrânduri după.- Ambele acceptă o valoare implicită opțională pentru cazul când rândul de offset nu există.
Agregate. Fiecare agregat (F.sum, F.avg, F.count, F.max, F.min, F.collect_list, …) devine o window function când e apelat prin .over(...). Sum peste o fereastră este un total cumulativ; avg peste un frame de șapte rânduri este o medie mobilă; max peste o fereastră nemărginită este maximul curent.
First și last.
F.first(col)șiF.last(col)peste o fereastră - valoarea luicolla primul/ultimul rând al (frame-ului) ferestrei. Util pentru „session start time” sau „latest status”.
Frame specs
Fiecare window function are un frame implicit: care rânduri ale partiției alimentează funcția pentru rândul curent. Funcțiile de ranking au un frame fix. Agregatele și first/last respectă unul explicit:
running = Window.partitionBy("user_id").orderBy("event_time") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
moving = Window.partitionBy("user_id").orderBy("event_time") \
.rowsBetween(-3, 3) # fereastra centrata pe 7 randuri
trailing = Window.partitionBy("user_id").orderBy("event_time") \
.rowsBetween(-6, 0) # ultimele 7 randuri inclusiv acesta
Cele două capete sunt Window.unboundedPreceding, întregi (numere de rânduri), Window.currentRow și Window.unboundedFollowing. Există și rangeBetween dacă vrei limitele exprimate în termenii valorii coloanei de ordonare (de exemplu „ultimele 7 zile” în loc de „ultimele 7 rânduri”), cu costul unei manipulări puțin mai atente.
Frame-ul implicit:
- Pentru funcții de ranking: irelevant.
- Pentru agregate cu
orderBy:unboundedPrecedinglacurrentRow, adică un total cumulativ. Asta prinde oamenii pe picior greșit. Dacă faciF.sum("total").over(Window.partitionBy("user_id").orderBy("event_time"))primești o sumă cumulativă, nu totalul partiției. Ca să primești totalul partiției, omiteorderBysau setează frame-ul launboundedPreceding, unboundedFollowing.
# Total cumulativ per utilizator
running_w = Window.partitionBy("user_id").orderBy("event_time")
df.withColumn("running", F.sum("total").over(running_w))
# Total per utilizator, atasat fiecarui rand
total_w = Window.partitionBy("user_id")
df.withColumn("user_total", F.sum("total").over(total_w))
Acea diferență (orderBy sau fără orderBy) îi mușcă pe toți o dată. Merită memorată.
Pattern 1: cea mai recentă înregistrare per cheie
Probabil cel mai cerut pattern din analitică: „dă-mi cel mai recent rând per utilizator.” Self-join-urile sunt răspunsul greșit; window functions sunt cel corect.
events = spark.createDataFrame(
[
(1, "u1", "login", "2024-03-15 09:00"),
(2, "u1", "view", "2024-03-15 09:30"),
(3, "u1", "logout", "2024-03-15 10:00"),
(4, "u2", "login", "2024-03-15 11:00"),
(5, "u2", "view", "2024-03-15 11:15"),
],
"event_id INT, user_id STRING, action STRING, ts STRING",
)
w = Window.partitionBy("user_id").orderBy(F.col("ts").desc())
latest = (events
.withColumn("rn", F.row_number().over(w))
.filter(F.col("rn") == 1)
.drop("rn"))
latest.show()
# u1 -> logout @ 10:00
# u2 -> view @ 11:15
row_number() mai degrabă decât rank() fiindcă vrem exact un rând per utilizator chiar dacă două evenimente împart același timestamp. Alege un tiebreaker (event_id desc) dacă asta contează pentru datele tale:
w = Window.partitionBy("user_id").orderBy(F.col("ts").desc(), F.col("event_id").desc())
Același pattern funcționează pentru „top N per grup”: înlocuiește == 1 cu <= N.
Pattern 2: acest rând vs rândul precedent
lag și lead strălucesc atunci când trebuie să calculezi delte, pauze sau tranziții.
prices = spark.createDataFrame(
[
("AAPL", "2024-03-15", 170.0),
("AAPL", "2024-03-16", 172.5),
("AAPL", "2024-03-17", 168.0),
("MSFT", "2024-03-15", 410.0),
("MSFT", "2024-03-16", 415.0),
("MSFT", "2024-03-17", 420.0),
],
"ticker STRING, dt STRING, close DOUBLE",
)
w = Window.partitionBy("ticker").orderBy("dt")
with_change = prices.withColumn("prev_close", F.lag("close", 1).over(w)) \
.withColumn("daily_return",
(F.col("close") - F.col("prev_close")) / F.col("prev_close"))
with_change.show()
Primul rând per ticker are prev_close = null și daily_return = null, ceea ce e corect. Dacă ai nevoie de o valoare implicită, F.lag("close", 1, 0.0) o va completa.
Variații: lag cu offset = 7 pentru comparații săptămână cu săptămână, lead pentru etichete „ce se întâmplă în continuare” în analiza de evenimente, lag combinat cu o verificare de semn pentru a găsi schimbări de direcție.
Pattern 3: agregate cumulative și mobile
Venitul cumulativ este o singură window function:
sales = spark.createDataFrame(
[
("IT", "2024-03-15", 100.0),
("IT", "2024-03-16", 50.0),
("IT", "2024-03-17", 75.0),
("NL", "2024-03-15", 200.0),
("NL", "2024-03-16", 80.0),
],
"country STRING, dt STRING, revenue DOUBLE",
)
w = Window.partitionBy("country").orderBy("dt")
cumulative = sales.withColumn("running_revenue", F.sum("revenue").over(w))
cumulative.show()
# IT 03-15 100 -> 100
# IT 03-16 50 -> 150
# IT 03-17 75 -> 225
# NL 03-15 200 -> 200
# NL 03-16 80 -> 280
O medie mobilă pe 7 zile e la o singură schimbare de frame distanță:
w7 = Window.partitionBy("country").orderBy("dt").rowsBetween(-6, 0)
sales.withColumn("ma7", F.avg("revenue").over(w7)).show()
Pentru primele șase rânduri ale fiecărei partiții, fereastra e mai scurtă de șapte (nu poate citi rânduri care nu există), așa că media e peste mai puține rânduri. De obicei acesta e comportamentul corect; dacă vrei strict „doar când sunt 7 disponibile”, urmărește cu un filtru pe o fereastră de count.
Note despre performanță
O window function cu partitionBy necesită un shuffle: Spark trebuie să adune fiecare rând pentru o anumită cheie de partiție pe același executor înainte să poată calcula fereastra. Modelul mental e același ca la groupBy: coloana de partiție = coloana de shuffle, iar cheile de partiție skewed produc job-uri de window skewed. Trucul cu salting din lecția 29 se aplică dacă ai un utilizator cu 50 de milioane de evenimente.
O window function fără partitionBy e și mai rea: fiecare rând merge pe un singur executor. Folosește-o doar pe dataset-uri mici sau pentru calcule cu adevărat globale (și chiar și atunci, gândește-te dacă ai într-adevăr nevoie de viziunea globală).
Mai multe ferestre în același select sunt în regulă. Spark va reutiliza un singur shuffle când cheile de partiție se potrivesc între ferestre, chiar dacă ordinea sau frame-ul diferă:
w1 = Window.partitionBy("user_id").orderBy("ts")
w2 = Window.partitionBy("user_id").orderBy("ts").rowsBetween(-3, 3)
df.select(
"*",
F.row_number().over(w1).alias("rn"),
F.avg("amount").over(w2).alias("ma"),
)
Un singur shuffle, două ferestre. Adaugă o a treia cu partitionBy("country") și primești un al doilea shuffle: Catalyst nu poate reutiliza schimbul cu cheia user.
Dacă ai parcurs capitolul despre window functions din SQL Server în altă parte din aceste note, semantica SQL e identică; API-ul Window din PySpark e doar versiunea de builder a sintaxei clauzei OVER (...). Polenizarea încrucișată între cele două e bună pentru fixarea ambelor.
Încă o notă despre performanță: window functions și groupBy nu sunt substitute. Dacă ai nevoie doar de un agregat per grup și nu îți pasă de păstrarea rândurilor de detaliu, folosește groupBy: e mai ieftin. Optimizatorul poate aplica agregare parțială pe fiecare partiție de input înainte de shuffle, trimițând rezumate mici per partiție prin rețea în loc de seturi întregi de rânduri. O window function trebuie să transmită fiecare rând la executorul său de destinație fiindcă funcția ar putea fi row_number sau lag, unde agregarea parțială nu are sens. Catalyst joacă în siguranță și face shuffle la rândurile întregi. Deci: groupBy când poți, window când ai nevoie de output-ul per rând.
Rulează asta pe propriul tău calculator
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("WindowsDemo")
.master("local[*]")
.getOrCreate())
events = spark.createDataFrame(
[
(1, "u1", "login", "2024-03-15 09:00", 0.0),
(2, "u1", "buy", "2024-03-15 09:30", 25.0),
(3, "u1", "buy", "2024-03-15 10:00", 40.0),
(4, "u1", "logout", "2024-03-15 10:30", 0.0),
(5, "u2", "login", "2024-03-15 11:00", 0.0),
(6, "u2", "buy", "2024-03-15 11:15", 60.0),
(7, "u2", "buy", "2024-03-15 12:00", 30.0),
],
"event_id INT, user_id STRING, action STRING, ts STRING, amount DOUBLE",
)
w = Window.partitionBy("user_id").orderBy("ts")
enriched = (events
.withColumn("rn", F.row_number().over(w))
.withColumn("rk", F.rank().over(w))
.withColumn("prev_amt", F.lag("amount", 1).over(w))
.withColumn("next_act", F.lead("action", 1).over(w))
.withColumn("running", F.sum("amount").over(w))
.withColumn("user_total",
F.sum("amount").over(Window.partitionBy("user_id"))))
enriched.show(truncate=False)
# Cel mai recent per utilizator
w_desc = Window.partitionBy("user_id").orderBy(F.col("ts").desc())
events.withColumn("rn", F.row_number().over(w_desc)) \
.filter(F.col("rn") == 1) \
.drop("rn") \
.show()
Citește output-ul coloană cu coloană. rn e dens, rk se potrivește cu el fiindcă nu sunt egalități. prev_amt e null pe primul rând per utilizator. running crește; user_total e același pentru fiecare rând dintr-o partiție. Acel singur output încadrează întregul model mental.
Lecția următoare schimbă de la operații la nivel de rând la transformări de formă: pivot pentru a face datele wide, unpivot pentru a le face long și trucul care a trăit în interiorul lui selectExpr("stack(...)") în anii dinainte ca Spark 3.4 să adauge melt.
Referințe: documentația window function din Apache Spark (https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html) și API-ul pyspark.sql.Window. Consultat 2026-05-01.