PySpark, dalle fondamenta Lezione 17 / 60

Ordinamento alla scala: orderBy, sort e il costo del sort globale

Come funziona l'ordinamento in un motore distribuito, perché un sort globale è costoso, e la scappatoia sortWithinPartitions.

In SQL su singola macchina, ORDER BY è economico e non ci pensi. Clicca “ordina per data desc” su una tabella da un milione di righe e SQL Server fa la sua cosa in meno di un secondo. In Spark, ordinare un DataFrame attraverso un cluster è il tipo di operazione che trasforma un job da 30 secondi in uno da 15 minuti se non stai attento.

Questa lezione tratta del perché un sort globale è costoso in un motore distribuito, della scappatoia sortWithinPartitions per quando non ti serve davvero un ordine globale, e dell’unico trucco dell’optimizer che rende orderBy(...).limit(N) perfettamente accettabile anche se orderBy(...).collect() è brutale.

orderBy e sort sono la stessa funzione

Prima la parte facile: orderBy e sort sono alias. Scegline uno e tienitelo. Il resto della lezione usa orderBy perché coincide con la keyword SQL.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, asc

spark = (SparkSession.builder
         .appName("SortingAtScale")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())

orders = spark.createDataFrame(
    [
        (1001, 1, 59.00,  "NL"), (1002, 1, 29.00,  "NL"),
        (1003, 2, 149.00, "IT"), (1004, 2, 89.50,  "IT"),
        (1005, 3, 199.00, "DE"), (1006, 4, 42.42,  "RO"),
        (1007, 1, 12.00,  "NL"), (1008, 2, 75.00,  "IT"),
    ],
    "OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)

# Ascending by default
orders.orderBy("Total").show()

# Descending -- three equivalent forms
orders.orderBy(col("Total").desc()).show()
orders.orderBy(desc("Total")).show()
orders.sort(F.col("Total").desc()).show()   # alias proof

I sort multi-chiave funzionano come ti aspetteresti, priorità da sinistra a destra:

# By country, then biggest order first within country
orders.orderBy("Country", col("Total").desc()).show()
# +-------+----------+------+-------+
# |OrderId|CustomerId| Total|Country|
# +-------+----------+------+-------+
# |   1005|         3|199.00|     DE|
# |   1003|         2|149.00|     IT|
# |   1004|         2| 89.50|     IT|
# |   1008|         2| 75.00|     IT|
# |   1001|         1| 59.00|     NL|
# |   1002|         1| 29.00|     NL|
# |   1007|         1| 12.00|     NL|
# |   1006|         4| 42.42|     RO|
# +-------+----------+------+-------+

Per i NULL, Spark di default li mette per primi sull’ascending e per ultimi sul descending. Forza esplicitamente con col("x").asc_nulls_last() o .desc_nulls_first() quando ti interessa.

Cosa significa davvero “global sort” in un cluster

Ecco la parte invisibile finché non fai .explain() di una query: in un motore single-machine, ordinare un array significa “confronta elementi, scambia, ripeti”. In Spark i tuoi dati non sono un singolo array: sono distribuiti su N partizioni su M executor, e un sort globale significa la partizione 0 contiene tutti gli elementi più piccoli, la partizione 1 i successivi più piccoli, e così via, con ogni partizione internamente ordinata.

È una garanzia forte. Più forte del semplice “ordinato dentro ogni partizione”. E ottenerla richiede due fasi:

  1. Range partitioning. Spark non può semplicemente fare hash-partition (quello mescola chiavi grandi e piccole). Deve capire i confini: qual è il taglio tra la partizione 0 e la partizione 1? Lo fa campionando i dati, costruendo un istogramma approssimativo e calcolando i tagli di range. Lo step di campionamento è lavoro in sé. Poi ogni riga viene shuffled attraverso la rete fino alla sua partizione di destinazione.
  2. Local sort. Una volta che ogni partizione contiene il range giusto di valori, Spark ordina dentro ogni partizione. Questa parte è veloce: è solo ordinare un chunk in memoria.

La parte costosa è lo step 1: uno shuffle basato su campione. Ogni riga attraversa la rete. Guarda:

orders.orderBy("Total").explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Sort [Total#3 ASC NULLS FIRST], true, 0
#    +- Exchange rangepartitioning(Total#3 ASC NULLS FIRST, 8), ...
#       +- Scan ExistingRDD ...

Exchange rangepartitioning è lo shuffle. Quella è la riga costosa. Ogni volta che vedi orderBy in una query Spark, quella riga sarà nel piano, ed è il costo che stai pagando.

orderBy(...).limit(N) va bene

Ecco la salvezza. L’optimizer Catalyst riconosce il pattern orderBy(...).limit(N) e lo riscrive in un’operazione top-K:

orders.orderBy(col("Total").desc()).limit(3).explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- TakeOrderedAndProject(limit=3, orderBy=[Total#3 DESC NULLS LAST], ...)
#    +- Scan ExistingRDD ...

TakeOrderedAndProject è drasticamente più economico di un sort completo. Ogni partizione tiene solo la sua top 3 in locale, poi spedisce quelle (3 x N partizioni) al driver, che fa un merge finale. Niente range partitioning, niente shuffle globale. Lineare nei dati, sotto il secondo su tabelle multi-GB.

Quindi orderBy(...).limit(100) va bene. Il pattern “dammi i 100 ordini più grandi” è una di quelle cose che sembra spaventosa ma non lo è.

Quello che è brutale: orderBy(...).collect(), o peggio, orderBy(...).write.parquet(...). Non c’è limit, quindi l’optimizer non può applicare il trucco. Lo shuffle di range-partition completo gira. Su un dataset piccolo non te ne accorgi. Su 500GB te ne accorgi per un’ora. Se non ti serve un output ordinato globalmente, non chiederlo.

sortWithinPartitions: la scappatoia

A volte non ti serve davvero un ordine globale: ti serve solo che il contenuto di ogni partizione sia ordinato. Il caso classico è scrivere output partizionato dove ogni file di output dovrebbe essere ordinato internamente (utile per i query engine che leggono il file dopo, utile per la compressione, utile per il delta encoding):

# Write Parquet partitioned by country, with each file sorted by date inside
(orders
 .sortWithinPartitions("OrderDate")
 .write
 .partitionBy("Country")
 .parquet("./out/orders"))

sortWithinPartitions non fa alcuno shuffle. Ordina ogni partizione esistente sul posto. Il risultato: ogni partizione è internamente ordinata, ma la partizione 0 potrebbe contenere valori più grandi di quelli della partizione 1. Non c’è alcun ordinamento globale. Per la maggior parte degli output ETL è esattamente quello che vuoi e 10 volte più economico di orderBy.

Confronta i piani:

orders.orderBy("Total").explain()
# Includes: Exchange rangepartitioning(...)  <- network shuffle

orders.sortWithinPartitions("Total").explain()
# == Physical Plan ==
# *(1) Sort [Total#3 ASC NULLS FIRST], false, 0   <- false = local sort, no shuffle
#    +- Scan ExistingRDD ...

Il false dopo la chiave di sort in Sort [..., false, 0] è la spia: è un sort parziale (per partizione), non globale. Niente riga Exchange sopra. Nessun costo di rete.

Quando usare cosa:

  • Mostrare “i top 10 per X” -> orderBy(col("x").desc()).limit(10). L’optimizer se ne occupa.
  • Scrivere file dove i consumer leggono in ordine -> sortWithinPartitions(...). Niente shuffle.
  • Scrivere file dove l’intero output deve essere globalmente ordinato (raro) -> orderBy(...). Mangia il costo.
  • Ordinare prima di una window function o di una join -> di solito non necessario, l’operatore fa il suo sort.
  • “Lo voglio ordinato solo perché fa carino quando lo stampo” -> .show() non ha bisogno di ordinamento; se stai mostrando un piccolo sample, basta .show().

Un errore comune: ordinare prima di aggregare

# Pointless and expensive
(orders
 .orderBy("Country")
 .groupBy("Country")
 .agg(F.sum("Total").alias("revenue"))
 .show())

L’orderBy non fa niente per il risultato: groupBy farà shuffle e riarrangerà tutto comunque. Hai aggiunto uno shuffle di range-partition completo per niente. Se vuoi l’output ordinato, ordina dopo aver aggregato:

(orders
 .groupBy("Country")
 .agg(F.sum("Total").alias("revenue"))
 .orderBy(col("revenue").desc())
 .show())

L’optimizer a volte se ne accorge e rimuove il sort ridondante, ma non farci affidamento. Sii intenzionale su dove nella pipeline va il sort.

Ordinare per un’espressione derivata

Non devi ordinare per una colonna letterale: qualsiasi espressione funziona:

# Sort by length of country code, then alphabetically
orders.orderBy(F.length("Country").desc(), "Country").show()

# Sort by computed revenue, descending
(orders
 .groupBy("Country")
 .agg(F.sum("Total").alias("revenue"))
 .orderBy(F.col("revenue").desc())
 .show())

# Sort by month-of-year extracted from a date
orders.orderBy(F.month(F.to_date("OrderDate"))).show()

L’espressione viene calcolata prima del sort. Spark non la materializza come colonna permanente a meno che tu non faccia prima withColumn: viene usata solo per il confronto. Utile quando la chiave di sort è “questa colonna trasformata in qualche modo” e non vuoi che la trasformazione si infili nello schema di output.

Una trappola comune: ordinare per un’espressione che usa una funzione non deterministica (F.rand(), F.current_timestamp()) ti dà ordini diversi su run diverse. Di solito non è quello che vuoi. Se vuoi davvero un ordine random (per il sampling, diciamo), fissa il seed: F.rand(seed=42).

Stabilità del sort

Un sort stabile preserva l’ordine relativo delle righe con chiavi uguali. La sorted di Python è stabile; l’orderBy di Spark non è garantito che lo sia. Due righe con lo stesso valore di Country possono uscire in qualsiasi ordine, e rieseguire la stessa query può produrre ordini diversi.

Conta quando paginate. “Pagina 1 mostra le righe A B C D E. Click next. Pagina 2 mostra D E F G H.” D ed E sono apparse due volte perché il sort ha avuto un pareggio su Country e il motore ha scelto un ordine interno diverso alla seconda chiamata. La fix è la stessa di SQL: includi sempre un tiebreaker, idealmente la chiave primaria.

# Brittle: ties on Country leave order undefined
orders.orderBy("Country").show()

# Reproducible: ties broken by OrderId
orders.orderBy("Country", "OrderId").show()

Falla diventare un riflesso. Ogni orderBy in codice di produzione dovrebbe terminare con una colonna che è unica per riga.

repartitionByRange: la cugina invisibile dell’ordinamento

Se chiami df.repartitionByRange(8, "Total"), Spark fa lo step di range-partitioning di un sort globale senza lo step di sort per partizione. Risultato: ogni partizione contiene un range contiguo di valori Total, ma le righe dentro una partizione non sono ordinate. Combinala con sortWithinPartitions e hai ricostruito manualmente ciò che fa orderBy:

# Equivalent to orderBy("Total"), in two explicit steps
manual_sort = (orders
    .repartitionByRange(8, "Total")
    .sortWithinPartitions("Total"))

Quando ti scomoderesti? Quasi mai. L’optimizer già gestisce orderBy correttamente. Il motivo per conoscere repartitionByRange è quando scrivi Parquet range-partizionato o tabelle bucketed e vuoi controllo esplicito su come i dati sono disposti tra i file. Per il sort di tutti i giorni, attieniti a orderBy.

asc_nulls_last, desc_nulls_first e amici

I NULL hanno bisogno di una posizione. I default di Spark: NULL prima sull’ascending, ultimi sul descending. È l’interpretazione standard di SQL e la maggior parte delle volte è quello che vuoi. Quando non lo è:

df.orderBy(col("MaybeNull").asc_nulls_last())
df.orderBy(col("MaybeNull").desc_nulls_first())

Le quattro combinazioni coprono ogni caso ragionevole. Sii esplicito quando la correttezza conta; i default vanno bene per il lavoro esplorativo.

Esegui questo sulla tua macchina

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc

spark = (SparkSession.builder
         .appName("SortingAtScale")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [
        (1001, 1, 59.00,  "NL"), (1002, 1, 29.00,  "NL"),
        (1003, 2, 149.00, "IT"), (1004, 2, 89.50,  "IT"),
        (1005, 3, 199.00, "DE"), (1006, 4, 42.42,  "RO"),
        (1007, 1, 12.00,  "NL"), (1008, 2, 75.00,  "IT"),
    ],
    "OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)

# 1. Plain orderBy -- note the Exchange rangepartitioning in the plan
orders.orderBy(col("Total").desc()).explain()
orders.orderBy(col("Total").desc()).show()

# 2. orderBy + limit -- top-K, no global shuffle
orders.orderBy(col("Total").desc()).limit(3).explain()
orders.orderBy(col("Total").desc()).limit(3).show()

# 3. sortWithinPartitions -- no Exchange in the plan
orders.sortWithinPartitions("Total").explain()

# 4. Multi-key with tiebreaker
orders.orderBy("Country", col("Total").desc(), "OrderId").show()

# 5. The pointless-pre-aggregation anti-pattern
(orders
 .groupBy("Country")
 .agg(F.sum("Total").alias("revenue"))
 .orderBy(col("revenue").desc())
 .show())

Eseguili una a una. L’abitudine cruciale: digita .explain() prima di .show() su qualsiasi cosa che coinvolga ordinamento. Cerca la parola Exchange. Se c’è, stai pagando uno shuffle. Decidi se davvero ti serve.

Prossima lezione: gli operatori di pulizia di tutti i giorni. Rinominare, droppare, fare cast delle colonne. Metà di qualsiasi ETL reale è rimettere a posto un nome di colonna o sistemare una stringa-che-dovrebbe-essere-int. Lo faremo per bene. Poi nella lezione 25 apriamo il cofano sullo shuffle in sé e spieghiamo esattamente cosa sta volando attraverso la rete.

Cerca