PySpark, de la zero Lecția 33 / 60

repartition vs coalesce: doua moduri de a schimba numarul de partitii

Cand sa folosesti pe care, costul fiecareia si capcana de a-ti serializa accidental jobul intr-un singur task.

Cele două lecții anterioare au pregătit terenul: o partiție este o bucată de rânduri atribuită unui task, iar Spark este foarte preocupat de câte ai și de cât de uniform sunt împachetate. Lecția asta e despre cei doi operatori care îți permit să schimbi asta intenționat: repartition și coalesce.

Par interschimbabili. Nu sunt. Unul e un baros care declanșează un shuffle complet și îți dă exact ce ai cerut. Celălalt e o șurubelniță care combină partiții pe loc, ieftin, dar cu o muchie tăioasă care îți poate serializa în liniște întregul pipeline upstream într-un singur task. Lumea apucă coalesce(1) ca „să scrie un singur fișier” și se miră de ce jobul care se termina în două minute durează acum patruzeci.

Lecția asta explică ambii operatori, când folosești pe care și capcana care îi prinde pe toți la prima întâlnire.

repartition(N): shuffle complet

df.repartition(N) reconstruiește DataFrame-ul în exact N partiții, distribuite uniform. Face asta cu un shuffle complet: fiecare rând trece prin rețea (sau pe disc local în mod single-machine), e hash-uit într-un bucket țintă și aterizează într-o partiție nouă.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("RepartitionVsCoalesce")
         .master("local[*]")
         .getOrCreate())

events = spark.range(0, 1_000_000).select(
    F.col("id").alias("event_id"),
    (F.col("id") % 1000).alias("user_id"),
    F.lit("data").alias("payload"),
)

print(events.rdd.getNumPartitions())   # likely 8 on a local[*] machine
big = events.repartition(50)
print(big.rdd.getNumPartitions())      # 50

Două lucruri de observat. Întâi, repartition funcționează în ambele direcții: poți merge de la 8 la 50 sau de la 200 până la 10. Al doilea, partițiile rezultate sunt uniforme: cam același număr de rânduri în fiecare, indiferent cât de dezechilibrat era inputul. Uniformitatea aceea e exact motivul pentru care repartition e scump: ca s-o obții, fiecare rând trebuie re-rutat printr-un shuffle.

Uită-te la plan:

big.explain()
# == Physical Plan ==
# Exchange RoundRobinPartitioning(50), REPARTITION_BY_NUM, [plan_id=...]
# +- Range (0, 1000000, ...)

Exchange e cuvântul Spark pentru „aici se face shuffle”. RoundRobinPartitioning(50) înseamnă că rândurile sunt împărțite în 50 de bucket-uri în mod round-robin: fără cheie, doar uniform. Asta e implicit pentru repartition(N).

repartition(N, *keys): hash-partition pe o coloană

Există o supraîncărcare care ia coloane:

hashed = events.repartition(50, "user_id")
hashed.explain()
# == Physical Plan ==
# Exchange hashpartitioning(user_id#3L, 50), REPARTITION_BY_NUM, ...
# +- Range ...

Acum Spark hash-uiește user_id din fiecare rând, face mod 50 și folosește asta drept număr de partiție. Aceeași muncă totală ca repartition(50), tot un shuffle complet, dar acum rândurile sunt grupate: fiecare event pentru user_id = 42 ajunge în aceeași partiție.

E util chiar înainte de un join sau de o funcție window pe cheia aia. Spark ar face oricum shuffle ca să colocheze cheile potrivite; dacă o faci tu explicit întâi, controlezi numărul de partiții și poți face cache rezultatului ca să-l refolosești între interogări. Lecția 36 va arăta cum aceeași idee, persistată pe disc, devine bucketing.

O capcană: hash-partitioning după o cheie cu skew reproduce skew-ul exact. Dacă user_id = 1 are un milion de rânduri și utilizatorul median are o sută, repartition(50, "user_id") va hash-ui fiecare rând al utilizatorului 1 într-o singură partiție. Diagnosticul de skew din lecția 28 se aplică aici, repartition(N, key) nu e o soluție pentru skew, e un control pentru numărul de partiții și distribuția cheilor.

repartitionByRange(N, *keys): output ordonat

Folosit mai rar, util când vrei output aproximativ sortat:

ranged = events.repartitionByRange(50, "event_id")

Spark eșantionează datele, alege 50 de margini de interval pe event_id și rutează rândurile în orice bucket de interval în care cad. Rezultatul: partiții individual sortabile și global aproximativ ordonate. Asta folosește df.sort() în culise când sortezi peste tot DataFrame-ul. Folosește-l când vrei să scrii fișiere Parquet în care fiecare fișier acoperă un interval contiguu, util pentru partition pruning când layer-ul de stocare nu suportă partitionBy pe coloane continue.

coalesce(N): micșorarea ieftină

df.coalesce(N) e cealaltă unealtă. Nu face shuffle. În schimb, fuzionează partiții existente pe loc:

shrunk = events.coalesce(2)
print(shrunk.rdd.getNumPartitions())    # 2
shrunk.explain()
# == Physical Plan ==
# Coalesce 2
# +- Range (0, 1000000, ...)

Niciun Exchange în plan. Spark decide pur și simplu că, în loc să ruleze 8 task-uri fiecare pe partiția lui, va rula 2 task-uri unde fiecare task citește din 4 dintre partițiile originale în secvență. Datele nu s-au mișcat; granițele au.

E mult mai ieftin decât repartition. E și mai slab:

  1. coalesce poate doar reduce numărul de partiții. Cerând coalesce(50) pe un DataFrame cu 8 partiții îți dă tot 8 partiții, fără eroare. Ca să crești, ai nevoie de repartition.
  2. Partițiile rezultate pot fi neuniforme. Dacă cele 8 partiții de input erau dezechilibrate, coalesce(2) produce 2 partiții ale căror dimensiuni sunt suma celor 4 originale care au fost fuzionate împreună. Nu se face nicio reechilibrare.
  3. Și capcana cea mare, problema upstream.

Capcana upstream a lui coalesce(1)

Asta e cursa. df.coalesce(1) nu doar pune rezultatul final într-o singură partiție, ci împinge constrângerea aceea în sus prin DAG, făcând adesea întreg pipeline-ul upstream single-threaded.

Consideră:

result = (spark.read.parquet("/data/big-input")     # 200 partitions
          .filter(F.col("country") == "IT")
          .withColumn("year", F.year("dt"))
          .groupBy("year").count()
          .coalesce(1)                              # because we want 1 file
          .write.parquet("/data/output"))

Intuiția zice: citește 200 de partiții în paralel, filtrează, grupează, apoi fuzionează rezultatul într-un singur fișier la final. Ce se întâmplă de fapt: Spark vede coalesce(1) și propagă paralelismul înapoi cât de departe poate fără să traverseze o graniță de shuffle. Filtrul, withColumn și citirea rulează toate cu paralelism 1. Două sute de partiții de input sunt citite serial de un singur task. Jobul care dura două minute durează acum patruzeci.

Motivul: coalesce e o transformare îngustă (lecția 21). Spark nu inserează un shuffle ca să-l satisfacă; în schimb absoarbe noul număr de partiții în stadiul anterior. Dacă faci coalesce(1), stadiul anterior rulează acum cu un singur task. Dacă există un shuffle mai sus în upstream, ca groupBy din exemplu, propagarea înapoi se oprește acolo, fiindcă shuffle-ul e o graniță de stadiu. Așa că în cazul ăsta groupBy și ce e înainte ar putea totuși să paralelizeze, în funcție de cum planifică Spark stadiile, dar munca de după groupBy e single-threaded.

Soluția e să folosești repartition(1) în loc de coalesce(1) ori de câte ori costul unui shuffle final e mai mic decât costul de a rula upstream serial:

result = (spark.read.parquet("/data/big-input")
          .filter(F.col("country") == "IT")
          .withColumn("year", F.year("dt"))
          .groupBy("year").count()
          .repartition(1)              # full shuffle, but upstream stays parallel
          .write.parquet("/data/output"))

Acum citirea, filtrul și groupBy folosesc cât paralelism vrea Spark, iar shuffle-ul final într-o singură partiție e un cost unic pe un rezultat agregat minuscul. Wall clock total: înapoi la două minute.

Regula empirică: coalesce(N) e sigur când N e aproape de numărul existent de partiții și N e suficient de mare ca stadiul upstream să fie încă mulțumit cu acel nivel de paralelism. coalesce(1) aproape niciodată nu e ce vrei, decât dacă upstream-ul e deja ieftin.

Exemplu rezolvat: problema fișierelor mici

Un motiv comun pentru a schimba numărul de partiții e problema fișierelor mici la write. Imaginează-ți un job al cărui stadiu final are 5.000 de partiții din cauza unui shuffle upstream. Dacă scrii asta ca Parquet, primești 5.000 de fișiere micuțe: oribil pentru cititorii downstream, lent de listat și cu multe costuri PUT pe S3.

Vrei poate 50 de fișiere de dimensiune rezonabilă. Două opțiuni:

# Option A: coalesce — cheap, but watch upstream
final.coalesce(50).write.parquet("/data/out")

# Option B: repartition — more expensive, but uniform and parallelism-safe
final.repartition(50).write.parquet("/data/out")

Opțiunea A sare peste shuffle. Task-urile fuzionează 100 de partiții de input fiecare (5.000 / 50). Stadiul anterior rulează cu 50 de task-uri în loc de 5.000, de obicei tot bine, uneori e o problemă dacă upstream-ul era deja CPU-bound și 50 nu sunt destule core-uri de paralelism.

Opțiunea B face un shuffle complet. Mai scump, dar stadiul upstream rulează cu cele 5.000 de partiții originale de paralelism, iar partițiile de output sunt uniforme.

Pentru „micșorează de la N la N/100” răspunsul e de obicei coalesce. Pentru „micșorează de la N la un singur fișier sau două” răspunsul e aproape întotdeauna repartition. Pentru „micșorează de la N la N/10 dar mă tem că stadiul upstream e lent”, măsoară ambele. AQE din Spark 3.x poate, de asemenea, fuziona automat partiții de shuffle, ceea ce elimină adesea nevoia de a face asta manual. Lecția 59 acoperă AQE.

Referință rapidă

Vrei astaFoloseșteDeclanșează shuffle?
Exact N partiții uniformerepartition(N)Da
N partiții hash-uite după o cheierepartition(N, "key")Da
N partiții aproximativ sortaterepartitionByRange(N, "key")Da
Reducere ieftină a numărului de partițiicoalesce(N)Nu
Un singur fișier de output, upstream marerepartition(1)Da
Un singur fișier de output, upstream miccoalesce(1)Nu

Rândul cu repartition(1) e cel pe care majoritatea oamenilor îl greșesc la primul contact.

Rulează asta pe propria mașină

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("RepartitionDemo")
         .master("local[*]")
         .getOrCreate())

events = spark.range(0, 1_000_000).select(
    F.col("id").alias("event_id"),
    (F.col("id") % 1000).alias("user_id"),
)
print("default:", events.rdd.getNumPartitions())

# repartition up
up = events.repartition(50)
print("after repartition(50):", up.rdd.getNumPartitions())
up.explain()

# repartition by key
hashed = events.repartition(20, "user_id")
print("after repartition(20, user_id):", hashed.rdd.getNumPartitions())
hashed.explain()

# coalesce down — no shuffle
down = events.coalesce(2)
print("after coalesce(2):", down.rdd.getNumPartitions())
down.explain()

# coalesce up doesn't actually go up
no_op = events.coalesce(50)
print("after coalesce(50):", no_op.rdd.getNumPartitions())  # still 8

# Simulate the small-files write
(events.repartition(10)
 .write.mode("overwrite").parquet("/tmp/repartition-out"))

Rulează fiecare .explain() și caută linia Exchange. repartition are mereu una. coalesce niciodată. Asta e diferența, într-un cuvânt.

Lecția următoare: scrieri partiționate, partitionBy pe disc, layout-ul de directoare pe care îl produce și cum îl folosește Spark ca să sară peste fișiere la read time. Plus capcana cardinalității care face ca partitionBy("user_id") să fie mai rău decât niciun fel de partitioning.


Referințe: Documentația Apache Spark SQL (https://spark.apache.org/docs/latest/sql-data-sources.html) și articole de inginerie Databricks despre tuning-ul partițiilor. Consultat 2026-05-01.

Caută