PySpark, de la zero Lecția 34 / 60

Scrieri partitionate: layout de directoare, predicate pushdown si cand sa o faci

Coloane de partitie in stil Hive pe disc, cum le foloseste Spark la read time pentru a sari peste fisiere si capcana cardinalitatii de evitat.

Până acum în modul am vorbit despre partiții ca despre un lucru in-memory: bucăți de rânduri pe care Spark le dă task-urilor. Astăzi e despre celălalt partitioning, cel care trăiește pe disc și supraviețuiește între job-uri: scrieri partiționate.

Scrierile partiționate sunt cea mai mare accelerare „gratuită” dintr-un data lake. Un dataset partiționat corect transformă „citește 200 GB și filtrează la 2 GB” în „citește 2 GB”; Spark sare literalmente peste rest la nivelul sistemului de fișiere, înainte ca vreo dată să fie citită în memorie. Făcut bine, interogările care durau cinci minute se termină în secunde.

Făcut prost, ajungi cu zeci de mii de fișiere micuțe în zeci de mii de directoare, iar overhead-ul de listare singur îți face jobul mai lent decât fără niciun partitioning. Linia dintre cele două rezultate e o singură decizie: după ce coloană să partiționezi.

Ce face partitionBy pe disc

Când scrii un DataFrame cu partitionBy, Spark organizează output-ul într-un arbore de directoare unde fiecare coloană de partiție devine un nivel de director:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PartitionedWrites")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [
        (1, "2024-03-15", 59.0,  "IT"),
        (2, "2024-03-15", 29.0,  "IT"),
        (3, "2024-04-02", 149.0, "NL"),
        (4, "2024-04-02", 89.5,  "NL"),
        (5, "2025-01-08", 12.0,  "DE"),
    ],
    "order_id INT, dt STRING, total DOUBLE, country STRING",
).withColumn("dt", F.to_date("dt")) \
 .withColumn("year",  F.year("dt")) \
 .withColumn("month", F.month("dt"))

(orders.write
 .mode("overwrite")
 .partitionBy("year", "month")
 .parquet("/tmp/orders"))

Pe disc ai acum:

/tmp/orders/
├── _SUCCESS
├── year=2024/
│   ├── month=3/
│   │   └── part-00000-...-c000.snappy.parquet
│   └── month=4/
│       └── part-00000-...-c000.snappy.parquet
└── year=2025/
    └── month=1/
        └── part-00000-...-c000.snappy.parquet

Două lucruri de observat. Primul, coloanele de partiție (year, month) sunt în numele directoarelor, nu în fișierele Parquet însele. Spark le reconstruiește din cale la read time, economisește spațiu de stocare și sunt practic gratuite de citit. Al doilea, denumirea directoarelor e column=value, numită partitioning în stil Hive. Aproape orice motor de interogare din lumea datelor citește layout-ul ăsta: Spark, Hive, Presto/Trino, Athena, BigQuery via tabele externe, DuckDB, Polars. E un standard de facto.

Predicate pushdown: răsplata

Acum citește dataset-ul înapoi cu un filtru pe o coloană de partiție:

read = spark.read.parquet("/tmp/orders")
filtered = read.where((F.col("year") == 2024) & (F.col("month") == 3))
filtered.show()
filtered.explain()
# == Physical Plan ==
# *(1) ColumnarToRow
# +- FileScan parquet [order_id#..., dt#..., total#..., country#..., year#..., month#...]
#       Batched: true,
#       DataFilters: [],
#       Format: Parquet,
#       Location: InMemoryFileIndex[file:/tmp/orders],
#       PartitionFilters: [isnotnull(year#...), isnotnull(month#...), (year#...= 2024), (month#...= 3)],
#       PushedFilters: [],
#       ReadSchema: ...

Linia care contează e PartitionFilters. Spark a recunoscut că year și month sunt coloane de partiție, a aplicat filtrul la nivelul listării de directoare și a citit doar directorul potrivit. Celelalte directoare nu sunt deschise, nu sunt listate dincolo de prefix, nu sunt atinse. Asta e partition pruning și e tot rostul.

Încearcă un filtru care nu e pe partiție:

read.where(F.col("country") == "IT").explain()
# PartitionFilters: []
# PushedFilters: [IsNotNull(country), EqualTo(country,IT)]

country e o coloană obișnuită din interiorul fișierelor Parquet, nu un director. Filtrul e tot împins jos la cititorul Parquet (PushedFilters); Parquet însuși poate sări peste row groups ale căror statistici zic „acest row group nu are rânduri IT”, dar fiecare fișier e cel puțin deschis. Comparativ cu sărirea peste fișier complet, asta e un câștig mult mai mic.

Ierarhia:

  1. PartitionFilters: sari peste fișiere la nivel de director. Cel mai bun.
  2. PushedFilters: deschide fișierul, dar sari peste row groups în interior. Bun.
  3. Fără filter pushdown: citește tot fișierul, filtrează în Spark. Cel mai lent.

Partitioning îți aduce primul nivel pentru coloanele după care partiționezi. Alege acele coloane cu cap.

Capcana cardinalității

Cea mai comună greșeală pe care lumea o face cu partitionBy e să partiționeze după o coloană cu cardinalitate mare.

Imaginează-ți tabela de orders de mai sus, dar decizi să partiționezi după order_id:

# Don't do this!
orders.write.mode("overwrite").partitionBy("order_id").parquet("/tmp/orders-bad")

Cinci comenzi, cinci directoare. Cu un milion de comenzi ai avea un milion de directoare. Majoritatea conțin un singur fișier cu câteva rânduri. Dezavantajele:

  • Listarea e lentă. Când Spark deschide dataset-ul, trebuie să enumere fiecare director ca să-și construiască indexul de fișiere. Un milion de directoare înseamnă un milion de apeluri LIST pe S3 (sau un milion de readdir pe disc local). Listarea singură poate dura mai mult decât citirea datelor.
  • Fiecare fișier e minuscul. Parquet are overhead fix per fișier: magic bytes, footer, metadata pe coloană. Un fișier Parquet de 2 KB e 80% overhead. Pierzi toate beneficiile coloanei.
  • Join-urile și broadcast-urile devin mai rele. Spark estimează dimensiunea tabelei din numărul de fișiere și dimensiunea per fișier. Estimări proaste duc la planuri de join proaste.
  • Cloud storage taxează per cerere. A lista și a deschide un milion de fișiere micuțe pe S3 nu e doar lent, e scump.

O coloană bună de partitionare are fiecare valoare de partiție conținând sute de MB până la câțiva GB de date. Pentru un dataset tipic e-commerce:

ColoanăCardinalitateColoană bună de partiție?
year~5–20Da
year, month~60–240Da
year, month, day~1000–7000Poate, depinde de volum
country~200Da dacă traficul e echilibrat; riscant dacă e concentrat
user_idmilioaneNu
order_idmilioaneNu
transaction_idmilioane–miliardeCategoric nu

Avertismentul „depinde de volum” e real. partitionBy("year", "month", "day") e excelent dacă ai sute de MB pe zi. E un dezastru dacă ai zece comenzi pe zi: ajungi cu 1.000 de zile × 10 rânduri × fișier minuscul. Fă matematica înainte să te angajezi.

O regulă empirică utilă: țintește între 100 MB și 1 GB per fișier de partiție. Sub atât plătești overhead; peste atât pierzi paralelism. Combină asta cu regula bullet-point „dacă n-ai scrie niciodată o clauză WHERE pe coloana asta, nu partiționa după ea” și vei evita majoritatea greșelilor.

Combinarea partitionBy cu pruning la nivel de coloană

partitionBy și pruning-ul intern de coloane al lui Parquet sunt complementare, nu alternative. Partiționezi după coloanele care apar în WHERE în majoritatea interogărilor, iar Parquet se ocupă de rest.

O tabelă de fapte tipică pentru analytics într-o companie reală:

(fact_orders.write
 .mode("overwrite")
 .partitionBy("year", "month")        # date-based, low cardinality, frequent filter
 .parquet("/data/warehouse/fact_orders"))

O interogare tipică:

(spark.read.parquet("/data/warehouse/fact_orders")
 .where((F.col("year") == 2024) & (F.col("month") == 3) & (F.col("country") == "IT"))
 .agg(F.sum("total")))

Ce se întâmplă:

  1. Partition pruning elimină fiecare director cu excepția year=2024/month=3/. De la poate 200 de partiții la 1.
  2. Proiecția de coloană a Parquet citește doar coloanele total și country din acele fișiere (nu toate cele 30 de coloane ale tabelei de fapte).
  3. Predicate pushdown al Parquet folosește statistici min/max pe row groups ca să sară peste row groups în regiunile country != IT din fișier.
  4. Spark evaluează filtrul rezidual și agregă.

Pruning-ul ăla stratificat e ce face ca o tabelă de fapte de 200 GB să răspundă la „venitul Italiei în martie 2024” în 200 ms.

Moduri de overwrite și scrieri partiționate

Un detaliu mic dar important. Cu partitionBy, sensul modurilor de write e subtil:

df.write.mode("overwrite").partitionBy("year").parquet("/data/orders")

Implicit, asta face praf întreg directorul /data/orders și scrie doar partițiile prezente în df. Dacă df conține doar rânduri year=2024, tocmai ai șters year=2023 și mai vechi. Lumea învață lecția asta la modul dur exact o dată.

Soluția e modul de overwrite dinamic al partițiilor:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write.mode("overwrite").partitionBy("year").parquet("/data/orders")

Acum Spark suprascrie doar partițiile care apar în df. year=2024 e înlocuit; year=2023 e lăsat în pace. Dacă faci scrieri incrementale după dată, majoritatea pipeline-urilor o fac, setează configul ăsta în jobul tău și nu te mai gândi niciodată la el.

Când să nu partiționezi

Uneori răspunsul e fără partitionBy deloc. Trei cazuri:

  1. Dataset-uri mici. Sub câțiva GB, partition pruning nu-ți economisește nimic semnificativ. Un singur fișier Parquet sau câteva sunt în regulă.
  2. Dataset-uri pe care le citești mereu integral. Agregările zilnice care citesc oricum fiecare rând: partitioning adaugă overhead de metadata fără să economisească citiri.
  3. Chei cu cardinalitate mare pentru join-uri. Partiționarea după user_id e greșită; dar vrei totuși să organizezi datele după user_id ca join-urile să nu facă shuffle. Unealta potrivită pentru asta e bucketing, care e lecția următoare.

Ultimul bullet e introducerea. Partitioning e pentru coloane cu cardinalitate mică ce apar în clauze WHERE. Bucketing e pentru coloane cu cardinalitate mare ce apar în clauze JOIN ON. Unelte diferite, utilizări complementare, adesea suprapuse pe aceeași tabelă.

Rulează asta pe propria mașină

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PartitionedWrites")
         .master("local[*]")
         .getOrCreate())

orders = spark.range(0, 100_000).select(
    F.col("id").alias("order_id"),
    F.date_add(F.lit("2023-01-01"), (F.col("id") % 730).cast("int")).alias("dt"),
    (F.rand() * 100).alias("total"),
)
orders = (orders
          .withColumn("year",  F.year("dt"))
          .withColumn("month", F.month("dt")))

# Good: low-cardinality partition columns
(orders.write
 .mode("overwrite")
 .partitionBy("year", "month")
 .parquet("/tmp/orders-good"))

# Read back and confirm partition pruning
read = spark.read.parquet("/tmp/orders-good")
read.where((F.col("year") == 2024) & (F.col("month") == 6)).explain()
# Look for PartitionFilters: [...year = 2024, month = 6...]

# Look at the directory tree
import os
for path, dirs, files in os.walk("/tmp/orders-good"):
    for f in files:
        if f.endswith(".parquet"):
            print(os.path.join(path, f))

# Bad: high-cardinality partition column (run on a small slice only!)
small = orders.limit(1000)
(small.write
 .mode("overwrite")
 .partitionBy("order_id")
 .parquet("/tmp/orders-bad"))

# Count the directories
print(sum(1 for _ in os.walk("/tmp/orders-bad")))   # ~1000

Uită-te la linia PartitionFilters pentru cazul bun. Apoi uită-te la numărul de fișiere pentru cazul rău, sunt o mie de directoare cu un singur fișier minuscul fiecare. Imaginează-ți să faci asta cu un milion de rânduri.

Lecția următoare e despre partitioning sub capotă: cum decide Spark numărul de partiții și cum să le dimensionezi. După aceea, lecția 36 acoperă bucketing, răspunsul la „vreau partitioning, dar cheia mea are cardinalitate mare”.


Referințe: Documentația Apache Spark SQL data sources (https://spark.apache.org/docs/latest/sql-data-sources.html) și articole de blog Databricks despre best practices pentru partitioning. Consultat 2026-05-01.

Caută