Jumătate din fiecare job Spark real se termină cu aceeași formă de întrebare: „pentru fiecare X, care e totalul / media / numărul de Y?”. Venitul pe țară. Comenzile pe client. Încercările de login pe oră. Click-through pe campanie. Răspunsul implică mereu groupBy și agg, iar odată ce știi pattern-ul vei scrie cincizeci dintr-astea pe săptămână fără să te gândești.
Lecția asta e catalogul. Vom acoperi funcțiile de agregare uzuale, idiomul single-pass care scanează datele tale o dată în loc de N ori și un detaliu care separă agregarea de tot ce am făcut până acum: e o transformare wide. Spark trebuie să facă shuffle. Punem aici fundamentul lui „de ce” și vom desfășura costul ca lumea în lecția 21.
Setup
Un DataFrame mic cu care ne putem juca:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("Aggregations101")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
orders = spark.createDataFrame(
[
(1001, 1, 59.00, "NL", "2026-03-05"),
(1002, 1, 29.00, "NL", "2026-03-18"),
(1003, 2, 149.00, "IT", "2026-02-15"),
(1004, 2, 89.50, "IT", "2026-03-22"),
(1005, 3, 199.00, "DE", "2026-03-10"),
(1006, 4, 42.42, "RO", "2026-03-28"),
(1007, 1, 12.00, "NL", "2026-03-30"),
(1008, 2, 75.00, "IT", "2026-03-31"),
],
"OrderId INT, CustomerId INT, Total DOUBLE, Country STRING, OrderDate STRING",
)
Opt rânduri sunt suficiente ca să demonstrăm tot; pattern-urile se scalează identic la opt miliarde.
groupBy întoarce un GroupedData, nu un DataFrame
Asta e capcana care îi prinde pe toți prima dată:
g = orders.groupBy("Country")
print(type(g))
# <class 'pyspark.sql.group.GroupedData'>
GroupedData e un obiect intermediar. E etapa „am decis cum grupez, dar nu ți-am zis încă ce să calculezi”. Nu poți chema .show() pe el, nu poți .write pe el, nu poți face nimic în formă de DataFrame. Trebuie să-l hrănești într-una din astea:
.count()— prescurtare, întoarce un DataFrame cu o singură coloană numităcount..sum("col"),.avg("col"),.min("col"),.max("col"),.mean("col")— scurtături pentru o singură agregare..agg(...)— toată puterea, mai multe agregări deodată cu redenumiri și expresii.
Scurtăturile sunt convenabile pentru căutări izolate:
orders.groupBy("Country").count().show()
# +-------+-----+
# |Country|count|
# +-------+-----+
# | NL| 3|
# | IT| 3|
# | DE| 1|
# | RO| 1|
# +-------+-----+
orders.groupBy("Country").sum("Total").show()
# +-------+----------+
# |Country|sum(Total)|
# +-------+----------+
# | NL| 100.0|
# | IT| 313.5|
# | DE| 199.0|
# | RO| 42.42|
# +-------+----------+
Pentru orice depășește un singur număr, lasă scurtăturile și folosește .agg(...).
Pattern-ul single-pass cu agg
Iată ce nu-ți spune nimeni: fiecare agregare separată e o scanare separată. Dacă scrii trei query-uri ca să calculezi trei KPI-uri, Spark citește datele de trei ori. Calculatoarele iubesc citirile secvențiale, dar nu le iubesc chiar atât.
Bagă tot într-un singur apel agg(...):
orders.groupBy("Country").agg(
F.count("*").alias("orders"),
F.sum("Total").alias("revenue"),
F.avg("Total").alias("aov"), # average order value
F.min("Total").alias("smallest_order"),
F.max("Total").alias("biggest_order"),
F.countDistinct("CustomerId").alias("unique_customers"),
).show()
# +-------+------+-------+-----+--------------+-------------+----------------+
# |Country|orders|revenue| aov|smallest_order|biggest_order|unique_customers|
# +-------+------+-------+-----+--------------+-------------+----------------+
# | NL| 3| 100.0|33.33| 12.0| 59.0| 1|
# | IT| 3| 313.5|104.5| 75.0| 149.0| 1|
# | DE| 1| 199.0|199.0| 199.0| 199.0| 1|
# | RO| 1| 42.42|42.42| 42.42| 42.42| 1|
# +-------+------+-------+-----+--------------+-------------+----------------+
O scanare, șase agregări, fiecare metrică pe care un om de business ar întreba-o. Asta e forma pe care vrei s-o scrii din reflex.
Apelurile .alias(...) nu sunt opționale ca spirit: fără ele primești nume de coloane precum sum(Total) și avg(Total), dureroase la referit downstream și urâte în dashboard-uri. Pune mereu alias.
Catalogul
Funcțiile de agregare pe care chiar le vei folosi, sortate după cât de des le tastez personal:
F.count("*") # row count, never null-skipping
F.count("col") # row count, NULLs excluded (lesson 13)
F.sum("col") # total
F.avg("col") / F.mean("col") # arithmetic mean (aliases for each other)
F.min("col") / F.max("col") # smallest / largest
F.countDistinct("col") # unique value count — exact, expensive
F.approx_count_distinct("col") # HyperLogLog estimate, much cheaper
F.stddev("col") / F.variance("col") # sample standard deviation / variance
F.stddev_pop("col") # population variant if that's what you need
F.collect_list("col") # gather all values (with duplicates) into an array
F.collect_set("col") # gather unique values into an array
F.first("col") / F.last("col") # first/last value in each group (order-dependent — be careful)
Două dintre ele merită o privire mai atentă.
approx_count_distinct e cea care sună înfricoșător și e de fapt un cadou. Numărarea exactă a valorilor distincte pe un miliard de rânduri înseamnă că Spark trebuie să țină evidența fiecărei valori văzute: scump la memorie și lent. Versiunea aproximativă folosește HyperLogLog și îți dă ~2% eroare relativă pentru ordine de mărime mai puțină muncă. Pentru dashboard-uri și KPI-uri de „active users” e aproape mereu ok:
orders.groupBy("Country").agg(
F.countDistinct("CustomerId").alias("exact_unique"),
F.approx_count_distinct("CustomerId").alias("approx_unique"),
).show()
collect_list și collect_set îți permit să strângi un grup într-un array, util când vrei un singur rând pe grup cu o listă a tuturor valorilor asociate:
orders.groupBy("CustomerId").agg(
F.count("*").alias("orders"),
F.collect_list("OrderId").alias("order_ids"),
F.collect_set("Country").alias("countries_ordered_from"),
).show(truncate=False)
# +----------+------+----------------+----------------------+
# |CustomerId|orders|order_ids |countries_ordered_from|
# +----------+------+----------------+----------------------+
# |1 |3 |[1001, 1002, 1007]|[NL] |
# |2 |3 |[1003, 1004, 1008]|[IT] |
# |3 |1 |[1005] |[DE] |
# |4 |1 |[1006] |[RO] |
# +----------+------+----------------+----------------------+
Atenție la collect_list pe grupuri mari: dacă un singur grup are milioane de rânduri, construiești un array cu un milion de elemente pe un singur executor. Aia e o bombă de memorie. Pentru analytics, preferă agregări care produc scalari.
De ce agregarea e o transformare wide
Până acum, fiecare transformare scrisă a fost narrow: fiecare partiție de output depinde de exact o partiție de input. select, filter, withColumn: toate narrow. Spark doar trece rândurile prin orice executori le țin deja.
groupBy rupe asta. Ca să calculezi suma pentru Country = 'NL', fiecare rând NL trebuie să ajungă pe același executor. Nu pornesc acolo: sunt împrăștiate prin partiții, oriunde s-au întâmplat datele să ajungă la citire. Spark trebuie să facă shuffle: hashuiește fiecare rând după Country, îl trimite prin rețea către executorul corespunzător, apoi agreghează.
Pasul ăsta de rețea e cel mai scump lucru pe care îl face Spark. De aia există regula de bun simț „filtrează înainte să agregi”: fiecare rând pe care îl scoți cu where e un rând care nu trebuie să călătorească prin rețea. De aia contează partiționarea bună a datelor la scriere. De aia un join de 100GB poate dura zece minute, în timp ce un filter de 100GB ia treizeci de secunde.
Vom dedica o lecție întreagă lui shuffle, lecția 25, cu plan-uri .explain() și UI-ul Spark. Pentru moment, înregistrează doar atât: groupBy e momentul în care job-ul tău părăsește teritoriul „streamăm asta direct” și intră în teritoriul „redistribuim universul”. Fii intenționat cu asta.
NULL-uri și count — capcana de memorat
Un punct subtil care prinde pe toată lumea cel puțin o dată. count("*") și count("col") nu sunt la fel:
count("*")numără rânduri. Mereu.count("col")numără rândurile undecolnu e NULL.
from pyspark.sql import Row
with_nulls = spark.createDataFrame([
Row(country="NL", customer=1),
Row(country="NL", customer=None),
Row(country="IT", customer=2),
Row(country="IT", customer=None),
])
with_nulls.groupBy("country").agg(
F.count("*").alias("rows"),
F.count("customer").alias("non_null_customers"),
).show()
# +-------+----+------------------+
# |country|rows|non_null_customers|
# +-------+----+------------------+
# | NL| 2| 1|
# | IT| 2| 1|
# +-------+----+------------------+
Ăsta e exact comportamentul SQL și e genuin util: count("col") joacă și rolul de verificare „câte rânduri au avut chiar o valoare aici?”. Doar ține minte diferența când calculezi procente: împărțind count("col") la count("*") îți dă o rată de fill, nu un row count.
Aceeași sărire peste NULL-uri se aplică la sum, avg, min, max: toate ignoră tăcut input-urile NULL. avg din [10, NULL, 30] e 20, nu 13.33. De obicei asta vrei. Când nu, înlocuiește NULL-urile cu zero (sau cu default-ul corect) folosind F.coalesce(col, F.lit(0)) înainte să agregi.
Subtotaluri în stil SQL: rollup și cube
Un ultim truc. Uneori vrei totaluri grupate și un grand total în același rezultat. SQL are WITH ROLLUP și WITH CUBE; PySpark le expune ca .rollup(...) și .cube(...).
orders.rollup("Country").agg(
F.sum("Total").alias("revenue"),
F.count("*").alias("orders"),
).orderBy("Country").show()
# +-------+-------+------+
# |Country|revenue|orders|
# +-------+-------+------+
# | NULL| 654.92| 8| ← grand total
# | DE| 199.00| 1|
# | IT| 313.50| 3|
# | NL| 100.00| 3|
# | RO| 42.42| 1|
# +-------+-------+------+
rollup("Country", "CustomerId") ți-ar da totaluri per țară per client, plus subtotaluri per țară, plus un grand total. cube îți dă fiecare combinație de subtotaluri (fiecare coloană x fiecare alta). Util pentru rapoarte tip pivot, unde finance vrea „pe țară, pe an, pe ambele și linia de jos” totul într-un singur query.
F.grouping("col") întoarce 1 dacă NULL-ul unui rând a venit din rollup/cube și 0 dacă e un NULL real: la îndemână când vrei să etichetezi rândul cu totaluri în loc să lași un blank.
Filtrarea agregărilor: agg plus where după
Spark nu are clauza SQL HAVING ca operator separat: nici nu ai nevoie. După agg, ai un DataFrame normal. Filtrează-l cu where ca pe orice altceva:
# "Countries with revenue above 100 EUR and at least 2 orders"
(orders
.groupBy("Country")
.agg(
F.sum("Total").alias("revenue"),
F.count("*").alias("orders"),
)
.where((F.col("revenue") > 100) & (F.col("orders") >= 2))
.show())
Asta se citește natural, iar optimizer-ul e suficient de deștept încât să împingă filtrul unde poate. Singurul lucru de ținut minte: filtrele care referențiază coloane agregate trebuie să vină după agg. Filtrele pe coloanele brute aparțin înainte: sunt mai ieftine fiindcă fiecare rând eliminat e un rând care nu călătorește prin shuffle.
# Pre-aggregation filter: cheap, narrow transformation
(orders
.where(F.col("OrderDate") >= "2026-03-01")
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.where(F.col("revenue") > 100) # post-aggregation filter
.show())
Ordinea contează pentru performanță. Filtrează rândurile întâi, apoi grupează, apoi filtrează grupurile. Aceeași logică ca WHERE vs HAVING din SQL.
O notă despre window functions
Uneori nu vrei să colapsezi grupurile, vrei să adnotezi rândurile cu numere la nivel de grup. „Pentru fiecare comandă, ce fracțiune din venitul total al țării ei reprezintă?” Aia e o window function: F.sum("Total").over(Window.partitionBy("Country")). Același cost de shuffle ca un groupBy, formă diferită la output: fiecare rând de input rămâne, primește o coloană nouă.
Vom acoperi window-urile în detaliu în lecția 38. Pentru moment, când vezi groupBy, gândește „colapsează”; când vezi over, gândește „adnotează”.
Rulează asta pe propria mașină
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("Aggregations101")
.master("local[*]")
.getOrCreate())
orders = spark.createDataFrame(
[
(1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
(1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
(1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
(1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
],
"OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)
# 1. Single-pass aggregation, the shape you want to memorize
orders.groupBy("Country").agg(
F.count("*").alias("orders"),
F.sum("Total").alias("revenue"),
F.avg("Total").alias("aov"),
F.min("Total").alias("smallest"),
F.max("Total").alias("biggest"),
F.countDistinct("CustomerId").alias("unique_customers"),
).orderBy(F.col("revenue").desc()).show()
# 2. Per-customer fold with collect_set
orders.groupBy("CustomerId").agg(
F.count("*").alias("orders"),
F.sum("Total").alias("ltv"),
F.collect_set("Country").alias("countries"),
).show(truncate=False)
# 3. Rollup with a grand total
orders.rollup("Country").agg(
F.sum("Total").alias("revenue"),
F.count("*").alias("orders"),
).orderBy(F.col("Country").asc_nulls_first()).show()
# 4. Approximate vs exact distinct
orders.agg(
F.countDistinct("CustomerId").alias("exact"),
F.approx_count_distinct("CustomerId").alias("approx"),
).show()
Rulează fiecare query. Observă cum query-ul 1 întoarce un rând per țară cu șase metrici, query-ul 2 întoarce un rând per client cu array-uri înăuntru, iar query-ul 3 are rândul în plus „pentru tot” sus de tot. Asta e toată gama expresivă a groupBy + agg.
Lecția următoare: sortarea la scară. orderBy, sort, costul unui sort global și trucul sortWithinPartitions care te salvează când chiar nu ai nevoie de o ordine globală.