PySpark, de la zero Lecția 25 / 60

Ce este de fapt un shuffle, in termeni fizici

Operatia de retea din inima calculului distribuit, ce se intampla in timpul ei si de ce se teme toata lumea de ea.

Aruncăm cu cuvântul „shuffle” de câteva lecții fără să-l deschidem vreodată. Astăzi îl deschidem. Până la finalul acestei postări ar trebui să poți să-ți imaginezi, aproape cadru cu cadru, ce se întâmplă între executors când Spark execută un groupBy. Acea imagine mentală e ce separă inginerii care pot ajusta un job Spark de inginerii care lipesc același apel .repartition(200) în fiecare notebook și speră.

Conceptul la nivel înalt mai întâi: un shuffle e redistribuirea datelor în cluster astfel încât înregistrările cu aceeași cheie să ajungă în aceeași partiție. Orice depinde de „toate rândurile pentru cheia X într-un singur loc”, group, join, distinct, sort, necesită unul. Este cel mai scump lucru pe care Spark îl face în mod obișnuit. Modulele 5 și 6 ale acestui curs sunt în esență o privire lungă și răbdătoare asupra modului de a face mai puține și de a le face pe cele pe care nu le poți evita cât mai ieftine cu putință.

Ce declanșează un shuffle

Memorează această listă. De fiecare dată când scrii cod care include una dintre acestea, imaginează-ți date zburând prin rețea:

  • groupBy(...).agg(...), grupare după cheie; rândurile cu aceeași cheie trebuie să se co-localizeze.
  • df.distinct(), are nevoie să dedupliceze între toate partițiile.
  • df.dropDuplicates([...]), la fel.
  • df.join(other, on=...), ambele părți trebuie să fie partiționate după join key (cu excepția cazului în care una e broadcast, vezi lecția 28).
  • df.orderBy(...) / df.sort(...), ordonarea globală are nevoie de un shuffle range-partitioned.
  • df.repartition(...), prin definiție, asta e treaba lui.
  • df.repartitionByRange(...), la fel, cu un range partitioner.

Operații care nu declanșează un shuffle (acestea sunt narrow):

  • select, filter, withColumn, drop, cast, na.fill, withColumnRenamed simplu.
  • union (concatenează partiții; fără redistribuire).
  • coalesce(n) când reduce numărul de partiții (combină local, fără shuffle, diferit de repartition).

coalesce vs repartition e propria lecție (35), dar one-liner-ul: coalesce e narrow și ieftin; repartition e wide și un shuffle real.

Secvența fizică

Hai să parcurgem ce se întâmplă efectiv pentru un groupBy("status").sum("total") peste un dataset de 100GB pe un cluster de 10 executors, fiecare executor cu 4 core-uri. Presupune spark.sql.shuffle.partitions = 200.

Stage N: map side

Fiecare partiție de input are un task care rulează pe ea. Presupune 200 de partiții de input, deci 200 de map tasks. Fiecare task:

  1. Citește partiția lui de input. De oriunde, S3, HDFS, disc local, un DataFrame cached upstream.
  2. Aplică agregarea parțială. Optimizatorul Spark pre-agregă per partiție, așa că în loc să scrie fiecare rând, map task-ul scrie o sumă parțială per valoare status pe care a văzut-o. (Pentru un groupBy cu câteva chei distincte, asta e o reducere masivă. Pentru distinct sau o grupare cu cardinalitate mare, mai puțin.)
  3. Partiționează output-ul după destinație. Pentru fiecare înregistrare de output, calculează hash(status) mod 200 ca să afle cărei partiții downstream îi aparține. Pune înregistrările în 200 de buckets de destinație.
  4. Scrie buckets pe discul local. Fiecare map task produce un fișier de shuffle cu 200 de secțiuni (sau 200 de fișiere mici, în funcție de implementarea de shuffle). Datele sunt serializate cu orice serializator e configurat (Kryo sau Java) și pot fi comprimate (spark.shuffle.compress = true by default, LZ4).

În acest punct toate cele 200 de map tasks au terminat. Există acum 200 × 200 = 40.000 de „blocks” logice care stau pe discurile locale ale celor 10 executors. Fiecare block e datele destinate unei partiții downstream dintr-o partiție upstream.

Map side a făcut două lucruri scumpe: serializare în bytes și o scriere pe disc local.

Stage N+1: reduce side

200 reduce tasks pornesc (una per partiție de output). Fiecare reduce task e responsabilă pentru unul dintre cele 200 buckets de destinație. Ca să-și facă treaba, trebuie să aducă bucket-ul ei de la fiecare map task, ceea ce înseamnă să facă o cerere de rețea către fiecare executor care deține output map (potențial toate cele 10).

Fiecare reduce task:

  1. Aduce blocks-urile ei. Cereri de rețea către fiecare executor care are date pentru ea. Bytes curg prin sârmă. BlockManager-ul Spark gestionează asta; shuffle service-ul (când e activat) ține output-ul de map disponibil chiar dacă executors mor.
  2. Deserializează blocks-urile incoming. Bytes înapoi în obiecte JVM.
  3. Le combină. Toate sumele parțiale pentru cheile alocate acestei partiții sunt combinate în agregate finale.
  4. Trece datele combinate la operatorul următor (scriere în Parquet, în exemplul nostru).

Reduce side a făcut: un fan-in network fetch de la fiecare nod, deserializare și un merge.

Costul, în cifre

În agregat pe cluster, acel singur shuffle al unui dataset de 100GB mută ~100GB prin rețea. În funcție de cardinalitate și agregarea parțială, uneori mult mai puțin (un groupBy cu 5 chei distincte se prăbușește dramatic); uneori practic dataset-ul complet (un distinct pe o coloană cu cardinalitate mare sau un join pe rânduri brute).

În plus: aceleași ~100GB lovesc discul local pe map side și sunt citite înapoi de pe discul local pe reduce side. Compresia taie volumul prin sârmă și pe disc dar adaugă CPU.

Costul de wallclock pe un cluster tipic: shuffle-ul e de obicei 60-90% din timpul de wallclock al unui job care are unul. Citirea sursei din S3 e și ea scumpă, dar shuffle-ul e comparabil sau mai rău, și spre deosebire de citirea sursei nu poate fi paralelizat la dispariție, e bottleneck-ed de bandwidth-ul de rețea și de cel mai lent fetch.

De aceea oamenilor le e frică de shuffles. Nu pentru că sunt misterioase, ci pentru că sunt genuin cel mai lent lucru într-un job Spark tipic.

O notă despre shuffle service

Clusterele de producție rulează de obicei un external shuffle service, un daemon de lungă durată pe fiecare nod de worker, separat de JVM-urile de executor. Singura lui treabă e să servească shuffle blocks oricui întreabă. De ce să te deranjezi? Pentru că executors vin și pleacă (autoscaling, dynamic allocation, spot eviction) și nu vrei ca un executor dispărut să-și ia output-ul de map cu el. Shuffle service-ul decuplează „cine a scris acest block” de „cine poate servi acest block”. Dacă executor-ul care a produs output-ul de shuffle moare, datele sunt încă pe nod, iar shuffle service-ul le distribuie oricui are nevoie.

Când shuffle service-ul e oprit, pierderea unui executor în mijlocul unui shuffle înseamnă re-rularea stage-ului upstream pentru a regenera output-ul de map pierdut. Scump. Pornește serviciul în producție. Cele mai multe platforme Spark gestionate (Databricks, EMR, Dataproc) îl activează by default; deployment-urile bare-metal trebuie să seteze spark.shuffle.service.enabled = true și să configureze daemon-ul.

Cum vezi asta în Spark UI

Deschide tab-ul Stages (lecția 22 a acoperit asta). Două coloane sunt amprenta shuffle-ului:

  • Shuffle Write, bytes scriși pe discul local de tasks-urile acestui stage (map side).
  • Shuffle Read, bytes aduși prin rețea de tasks-urile acestui stage (reduce side).

Un stage care produce un shuffle are un Shuffle Write non-zero. Un stage care consumă unul are un Shuffle Read non-zero. Vin în perechi peste granițele dintre stages.

Apasă pe un stage și uită-te la task summary. Coloana Shuffle Read Time îți spune cât a așteptat fiecare task ca blocks-urile lui să sosească. Dacă acela e chunk-ul dominant din durata task-ului, bottleneck-ul e viteza rețelei sau a discului remote, nu codul tău. Dacă duratele tasks-urilor sunt sălbatic skewed, mediană 4s, max 5 minute, o cheie are mult mai multe rânduri decât celelalte și un reducer face aproape toată munca singur. Acela e data skew, și e un subiect întreg (lecția 30).

De ce „evită shuffles” e un semi-adevăr

Vei auzi „evită shuffles” ca sfat constant. E în mare parte corect, dar poate fi înșelător. Iată versiunea mai onestă:

  • Nu poți evita shuffles complet. Orice agregare peste chei, orice join (cu excepția broadcast), orice sort global, acestea necesită redistribuire. Nu există rescriere inteligentă care să te scoată din fizică.
  • Ce poți face e să le micșorezi. Reduci cât de multe date trebuie să mute shuffle-ul.

Mișcări concrete de micșorare, fiecare propria lecție viitoare:

  1. Filtrează devreme. Împinge clauzele tale WHERE upstream ca să ajungă mai puține date la shuffle. (Catalyst optimizer-ul Spark face mult din asta pentru tine, dar doar când poate dovedi că e safe.)
  2. Proiectează narrow. Nu târî 50 de coloane printr-un shuffle dacă ai nevoie doar de 4. Fă select devreme.
  3. Broadcast tabele mici. O tabelă de dimensiune mică join-uită cu o tabelă de fapt mare nu are nevoie de un shuffle; poate fi replicată în întregime pe fiecare executor. Lecția 28.
  4. Partiționează deștept upstream. Dacă input-ul tău e deja partiționat după coloana după care urmează să grupezi, Spark poate sări shuffle-ul complet. Lecția 36.
  5. Ajustează spark.sql.shuffle.partitions. Default-ul 200 e greșit pentru aproape fiecare workload, prea mare pentru date mici, prea mic pentru date mari. Lecția 32.
  6. Atenție la skew. O cheie hot poate distruge un shuffle care altfel ar fi fost bine. Lecția 30 acoperă salting-ul.

Despre ce e restul cursului

E momentul să fiu onest cu tine: modulele 5 și 6 ale acestui curs există din cauza shuffles-urilor. Joins, partitioning, AQE, broadcast hints, salting, repartition vs coalesce, bucketing, fiecare dintre aceste subiecte e, la rădăcină, un mod de a modela sau evita shuffles. Dacă înțelegi bine lecția de azi, fiecare optimizare ulterioară se va simți ca un pas logic și nu ca un sac de trucuri.

Modelul mental cu care să pleci:

Unitatea de bază a durerii lui Spark este shuffle-ul. Fiecare transformare wide declanșează unul. Fiecare shuffle e map side serialize + scriere pe disc local, apoi fetch prin rețea + deserializare + merge pe reduce side. Costul e aproximativ proporțional cu bytes mutați (după agregare parțială și compresie). Tab-ul Stages îți spune cât de mare e fiecare.

Rulează asta pe propria mașină

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand

spark = (
    SparkSession.builder
        .appName("shuffle-demo")
        .config("spark.sql.shuffle.partitions", "16")
        .getOrCreate()
)

# 5M rows across 32 partitions, 1000 distinct keys
df = (
    spark.range(0, 5_000_000, numPartitions=32)
        .withColumn("k", (col("id") % 1000).cast("int"))
        .withColumn("v", rand() * 1000)
)

# Force materialization so we measure the shuffle, not the source read
df = df.cache()
df.count()

# 1) A groupBy: triggers ONE shuffle
df.groupBy("k").sum("v").count()

# 2) A distinct: also a shuffle
df.select("k").distinct().count()

# 3) A self-join on k: shuffle on BOTH sides
df.alias("a").join(df.alias("b"), "k").count()

input("Press Enter to exit (UI stays alive)... ")
spark.stop()

Deschide tab-ul Stages al UI-ului. Pentru fiecare dintre cele trei acțiuni, găsește stages-urile unde Shuffle Write și Shuffle Read sunt non-zero. Notează bytes shuffled. Compară self-join-ul (cel mai greu, două shuffles mari în zbor) cu groupBy-ul simplu (un shuffle, cu agregarea parțială prăbușindu-l la aproape nimic). Acea diferență, între o agregare care se comprimă bine și un join care nu, e intuiția care conduce cea mai mare parte a muncii de performanță Spark.

Acela e shuffle-ul. Modulul 4 e acum gata; lecția următoare deschide modulul 5 cu transformări narrow vs wide ca principiu de design, odată ce înțelegi shuffles-urile fizic, poți începe să proiectezi pipeline-uri în jurul lor în loc să doar reacționezi la ele.

Caută