În SQL pe o singură mașină, ORDER BY e ieftin și nu te gândești la el. Dai click pe „sort by date desc” pe un tabel de un milion de rânduri și SQL Server își face treaba sub o secundă. În Spark, sortarea unui DataFrame pe un cluster e genul de operație care transformă un job de 30 de secunde într-unul de 15 minute dacă nu ești atent.
Lecția asta e despre de ce un sort global e scump într-un engine distribuit, despre portița sortWithinPartitions pentru când chiar nu ai nevoie de o ordine globală și despre singurul truc al optimizer-ului care face ca orderBy(...).limit(N) să fie perfect ok, deși orderBy(...).collect() e brutal.
orderBy și sort sunt aceeași funcție
Întâi partea ușoară: orderBy și sort sunt aliasuri. Alege unul și rămâi la el. Restul lecției folosește orderBy fiindcă se potrivește cu cuvântul cheie SQL.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, asc
spark = (SparkSession.builder
.appName("SortingAtScale")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
orders = spark.createDataFrame(
[
(1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
(1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
(1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
(1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
],
"OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)
# Ascending by default
orders.orderBy("Total").show()
# Descending — three equivalent forms
orders.orderBy(col("Total").desc()).show()
orders.orderBy(desc("Total")).show()
orders.sort(F.col("Total").desc()).show() # alias proof
Sortările multi-cheie funcționează cum te aștepți: prioritate de la stânga la dreapta:
# By country, then biggest order first within country
orders.orderBy("Country", col("Total").desc()).show()
# +-------+----------+------+-------+
# |OrderId|CustomerId| Total|Country|
# +-------+----------+------+-------+
# | 1005| 3|199.00| DE|
# | 1003| 2|149.00| IT|
# | 1004| 2| 89.50| IT|
# | 1008| 2| 75.00| IT|
# | 1001| 1| 59.00| NL|
# | 1002| 1| 29.00| NL|
# | 1007| 1| 12.00| NL|
# | 1006| 4| 42.42| RO|
# +-------+----------+------+-------+
Pentru NULL-uri, Spark are default-ul NULLs-first la ascending și NULLs-last la descending. Suprascrie explicit cu col("x").asc_nulls_last() sau .desc_nulls_first() când îți pasă.
Ce înseamnă cu adevărat „sort global” într-un cluster
Iată partea care e invizibilă până faci .explain() pe un query: într-un engine pe o singură mașină, sortarea unui array înseamnă „compară elemente, schimbă, repetă”. În Spark, datele tale nu sunt un singur array, sunt împrăștiate pe N partiții pe M executori, iar un sort global înseamnă partiția 0 conține toate cele mai mici elemente, partiția 1 următoarele cele mai mici și așa mai departe, cu fiecare partiție sortată intern.
E o garanție puternică. Mai puternică decât doar „sortează în fiecare partiție”. Și ca s-o atingi, sunt necesare două faze:
- Range partitioning. Spark nu poate doar să facă hash-partition (asta amestecă cheile mari și mici). Trebuie să-și dea seama de granițe: care e cutoff-ul între partiția 0 și partiția 1? O face prin sampling pe date, construind o histogramă aproximativă și calculând tăieturile de range. Pasul de sample e el însuși muncă. Apoi fiecare rând e mutat prin shuffle peste rețea către partiția lui țintă.
- Local sort. Odată ce fiecare partiție ține range-ul corect de valori, Spark sortează în interiorul fiecărei partiții. Partea asta e rapidă: doar sortează un chunk din memorie.
Partea scumpă e pasul 1: un shuffle bazat pe sample. Fiecare rând traversează rețeaua. Uite:
orders.orderBy("Total").explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Sort [Total#3 ASC NULLS FIRST], true, 0
# +- Exchange rangepartitioning(Total#3 ASC NULLS FIRST, 8), ...
# +- Scan ExistingRDD ...
Exchange rangepartitioning e shuffle-ul. Aceea e linia scumpă. De fiecare dată când vezi orderBy într-un query Spark, linia aia va fi în plan, iar acela e costul pe care-l plătești.
orderBy(...).limit(N) e ok
Iată salvarea. Optimizer-ul Catalyst recunoaște pattern-ul orderBy(...).limit(N) și îl rescrie într-o operație top-K:
orders.orderBy(col("Total").desc()).limit(3).explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- TakeOrderedAndProject(limit=3, orderBy=[Total#3 DESC NULLS LAST], ...)
# +- Scan ExistingRDD ...
TakeOrderedAndProject e dramatic mai ieftin decât un sort complet. Fiecare partiție își păstrează doar top 3 local, apoi le trimite (3 x N partiții) către driver, care face un merge final. Fără range partitioning, fără shuffle global. Liniar în date, sub o secundă pe tabele de mai mulți GB.
Așa că orderBy(...).limit(100) e ok. Pattern-ul „dă-mi cele mai mari 100 de comenzi” e unul dintre acele lucruri care arată înfricoșător dar nu sunt.
Ce e brutal: orderBy(...).collect() sau, mai rău, orderBy(...).write.parquet(...). Nu există limit, deci optimizer-ul nu poate aplica trucul. Rulează shuffle-ul complet de range-partition. Pe un dataset mic nu vei observa. Pe 500GB vei observa o oră întreagă. Dacă nu ai nevoie de un output sortat global, nu cere unul.
sortWithinPartitions: portița
Uneori chiar n-ai nevoie de o ordine globală: ai nevoie doar ca conținutul fiecărei partiții să fie sortat. Cazul clasic e scrierea de output partiționat unde fiecare fișier de output ar trebui să fie sortat intern (bun pentru engine-uri de query care citesc fișierul ulterior, bun pentru compresie, bun pentru delta encoding):
# Write Parquet partitioned by country, with each file sorted by date inside
(orders
.sortWithinPartitions("OrderDate")
.write
.partitionBy("Country")
.parquet("./out/orders"))
sortWithinPartitions nu face shuffle. Sortează fiecare partiție existentă pe loc. Rezultatul: fiecare partiție e ordonată intern, dar partiția 0 ar putea conține valori mai mari decât cele din partiția 1. Nu există ordine globală. Pentru majoritatea output-urilor de ETL e exact ce vrei și de 10x mai ieftin decât orderBy.
Compară plan-urile:
orders.orderBy("Total").explain()
# Includes: Exchange rangepartitioning(...) ← network shuffle
orders.sortWithinPartitions("Total").explain()
# == Physical Plan ==
# *(1) Sort [Total#3 ASC NULLS FIRST], false, 0 ← false = local sort, no shuffle
# +- Scan ExistingRDD ...
false de după cheia de sort în Sort [..., false, 0] e indicatorul: e un sort parțial (per-partiție), nu unul global. Nicio linie Exchange deasupra. Niciun cost de rețea.
Când să folosești care:
- Trebuie să afișezi „top 10 după X” →
orderBy(col("x").desc()).limit(10). Optimizer-ul se ocupă. - Scrii fișiere unde consumatorii citesc în ordine →
sortWithinPartitions(...). Fără shuffle. - Scrii fișiere unde întregul output trebuie să fie sortat global (rar) →
orderBy(...). Mănâncă costul. - Sortare înainte de window function sau join → de obicei inutilă, operatorul își face propriul sort.
- „Vreau doar să fie sortat fiindcă arată drăguț la print” →
.show()n-are nevoie de sortare; dacă afișezi un sample mic, doar.show().
O greșeală frecventă: sortare înainte de agregare
# Pointless and expensive
(orders
.orderBy("Country")
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.show())
orderBy nu face nimic pentru rezultat: groupBy va face shuffle și va rearanja totul oricum. Ai adăugat un shuffle complet de range-partition fără motiv. Dacă vrei output-ul sortat, sortează după agregare:
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(col("revenue").desc())
.show())
Optimizer-ul prinde uneori asta și scoate sort-ul redundant, dar nu te baza pe el. Fii intenționat unde în pipeline merge sort-ul.
Sortare după o expresie derivată
Nu trebuie să sortezi după o coloană literală: orice expresie merge:
# Sort by length of country code, then alphabetically
orders.orderBy(F.length("Country").desc(), "Country").show()
# Sort by computed revenue, descending
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(F.col("revenue").desc())
.show())
# Sort by month-of-year extracted from a date
orders.orderBy(F.month(F.to_date("OrderDate"))).show()
Expresia e calculată înainte de sort. Spark n-o materializează ca o coloană permanentă decât dacă o pui întâi cu withColumn: e folosită doar pentru comparație. Util când cheia de sort e „coloana asta transformată cumva” și nu vrei ca transformarea să se scurgă în schema ta de output.
O capcană obișnuită: sortarea după o expresie care folosește o funcție non-deterministă (F.rand(), F.current_timestamp()) îți dă ordini diferite la rulări diferite. De obicei nu asta vrei. Dacă chiar vrei ordine randomizată, să zicem pentru sampling, fixează seed-ul: F.rand(seed=42).
Stabilitatea sortării
Un sort stabil păstrează ordinea relativă a rândurilor cu chei egale. sorted din Python e stabil; orderBy din Spark nu e garantat. Două rânduri cu aceeași valoare Country pot ieși în orice ordine, iar rerularea aceluiași query poate produce ordini diferite.
Asta contează când paginezi. „Pagina 1 arată rândurile A B C D E. Click next. Pagina 2 arată D E F G H.” D și E au apărut de două ori fiindcă sort-ul a fost la egalitate pe Country, iar engine-ul a ales o ordine internă diferită la al doilea apel. Soluția e aceeași ca în SQL: include mereu un tiebreaker, ideal cheia primară.
# Brittle: ties on Country leave order undefined
orders.orderBy("Country").show()
# Reproducible: ties broken by OrderId
orders.orderBy("Country", "OrderId").show()
Fă-ți asta reflex. Fiecare orderBy din cod de producție ar trebui să se termine cu o coloană unică per rând.
repartitionByRange, vărul invizibil al sortării
Dacă apelezi df.repartitionByRange(8, "Total"), Spark face pasul de range-partitioning dintr-un sort global fără pasul de sort per-partiție. Rezultatul: fiecare partiție ține un interval contiguu de valori Total, dar rândurile dinăuntrul unei partiții nu sunt ordonate. Combină-l cu sortWithinPartitions și ai reconstruit manual ce face orderBy:
# Equivalent to orderBy("Total"), in two explicit steps
manual_sort = (orders
.repartitionByRange(8, "Total")
.sortWithinPartitions("Total"))
Când te-ai obosi? Aproape niciodată. Optimizer-ul deja se ocupă corect de orderBy. Motivul pentru care merită să știi de repartitionByRange e când scrii Parquet range-partiționat sau tabele bucketed și vrei control explicit asupra modului cum sunt așezate datele în fișiere. Pentru sortare zilnică, rămâi la orderBy.
asc_nulls_last, desc_nulls_first și prietenii
NULL-urile au nevoie de o poziție. Default-urile Spark: NULL-urile sortează primele la ascending, ultimele la descending. Aia e interpretarea standard SQL și de cele mai multe ori asta vrei. Când nu:
df.orderBy(col("MaybeNull").asc_nulls_last())
df.orderBy(col("MaybeNull").desc_nulls_first())
Cele patru combinații acoperă fiecare caz rezonabil. Fii explicit când contează corectitudinea; default-urile sunt ok pentru muncă exploratorie.
Rulează asta pe propria mașină
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc
spark = (SparkSession.builder
.appName("SortingAtScale")
.master("local[*]")
.getOrCreate())
orders = spark.createDataFrame(
[
(1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
(1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
(1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
(1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
],
"OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)
# 1. Plain orderBy — note the Exchange rangepartitioning in the plan
orders.orderBy(col("Total").desc()).explain()
orders.orderBy(col("Total").desc()).show()
# 2. orderBy + limit — top-K, no global shuffle
orders.orderBy(col("Total").desc()).limit(3).explain()
orders.orderBy(col("Total").desc()).limit(3).show()
# 3. sortWithinPartitions — no Exchange in the plan
orders.sortWithinPartitions("Total").explain()
# 4. Multi-key with tiebreaker
orders.orderBy("Country", col("Total").desc(), "OrderId").show()
# 5. The pointless-pre-aggregation anti-pattern
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(col("revenue").desc())
.show())
Rulează fiecare. Obiceiul crucial: tastează .explain() înainte de .show() pe orice implică sortare. Caută cuvântul Exchange. Dacă e acolo, plătești pentru un shuffle. Decide dacă chiar ai nevoie.
Lecția următoare: operatorii uzuali de cleanup, redenumire, drop, cast pe coloane. Jumătate din orice ETL real e remodelarea unui nume de coloană sau repararea unui string-care-ar-trebui-să-fie-int. O facem ca lumea. Apoi în lecția 25 deschidem capota lui shuffle însuși și explicăm exact ce zboară prin rețea.