PySpark, dalle fondamenta Lezione 34 / 60

Scritture partizionate: layout di directory, predicate pushdown, e quando farle

Colonne di partizione in stile Hive su disco, come Spark le usa in lettura per saltare file, e la trappola della cardinalità da evitare.

Finora in questo modulo abbiamo parlato delle partizioni come di una cosa in memoria: blocchi di righe che Spark passa ai task. Oggi parliamo dell’altro partitioning, quello che vive su disco e sopravvive tra un job e l’altro: le scritture partizionate.

Le scritture partizionate sono lo speedup “gratuito” più grosso in un data lake. Un dataset partizionato correttamente trasforma “leggi 200 GB e filtra a 2 GB” in “leggi 2 GB”: Spark salta letteralmente il resto a livello di file system, prima che qualunque dato venga letto in memoria. Fatto bene, query che prima impiegavano cinque minuti finiscono in secondi.

Fatto male, ti ritrovi con decine di migliaia di file minuscoli in decine di migliaia di directory e il solo overhead di listing rende il job più lento di nessun partitioning. La linea tra i due esiti è una sola decisione: per quale colonna partizionare.

Cosa fa partitionBy su disco

Quando scrivi un DataFrame con partitionBy, Spark organizza l’output in un albero di directory in cui ogni colonna di partizione diventa un livello di directory:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PartitionedWrites")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [
        (1, "2024-03-15", 59.0,  "IT"),
        (2, "2024-03-15", 29.0,  "IT"),
        (3, "2024-04-02", 149.0, "NL"),
        (4, "2024-04-02", 89.5,  "NL"),
        (5, "2025-01-08", 12.0,  "DE"),
    ],
    "order_id INT, dt STRING, total DOUBLE, country STRING",
).withColumn("dt", F.to_date("dt")) \
 .withColumn("year",  F.year("dt")) \
 .withColumn("month", F.month("dt"))

(orders.write
 .mode("overwrite")
 .partitionBy("year", "month")
 .parquet("/tmp/orders"))

Su disco adesso hai:

/tmp/orders/
├── _SUCCESS
├── year=2024/
│   ├── month=3/
│   │   └── part-00000-...-c000.snappy.parquet
│   └── month=4/
│       └── part-00000-...-c000.snappy.parquet
└── year=2025/
    └── month=1/
        └── part-00000-...-c000.snappy.parquet

Due cose da notare. Primo, le colonne di partizione (year, month) sono nei nomi delle directory, non nei file Parquet stessi. Spark le ricostruisce dal path al momento della lettura: risparmia spazio e di fatto sono gratis da leggere. Secondo, la convenzione di naming delle directory è colonna=valore, chiamata partitioning in stile Hive. Quasi tutti i query engine del mondo dei dati leggono questo layout: Spark, Hive, Presto/Trino, Athena, BigQuery via tabelle esterne, DuckDB, Polars. È uno standard de facto.

Predicate pushdown: il guadagno

Adesso rileggi il dataset con un filtro su una colonna di partizione:

read = spark.read.parquet("/tmp/orders")
filtered = read.where((F.col("year") == 2024) & (F.col("month") == 3))
filtered.show()
filtered.explain()
# == Physical Plan ==
# *(1) ColumnarToRow
# +- FileScan parquet [order_id#..., dt#..., total#..., country#..., year#..., month#...]
#       Batched: true,
#       DataFilters: [],
#       Format: Parquet,
#       Location: InMemoryFileIndex[file:/tmp/orders],
#       PartitionFilters: [isnotnull(year#...), isnotnull(month#...), (year#...= 2024), (month#...= 3)],
#       PushedFilters: [],
#       ReadSchema: ...

La riga che conta è PartitionFilters. Spark ha riconosciuto che year e month sono colonne di partizione, ha applicato il filtro a livello di listing delle directory, e ha letto solo la directory corrispondente. Le altre directory non vengono aperte, non vengono listate oltre il prefisso, non vengono toccate. Questo è il partition pruning ed è tutto il punto.

Prova un filtro non-partizione:

read.where(F.col("country") == "IT").explain()
# PartitionFilters: []
# PushedFilters: [IsNotNull(country), EqualTo(country,IT)]

country è una colonna normale dentro i file Parquet, non una directory. Il filtro viene comunque pushato giù al lettore Parquet (PushedFilters): Parquet stesso può saltare row group le cui statistiche dicono “questo row group non ha righe IT”, ma ogni file viene almeno aperto. Rispetto a saltare il file del tutto, è un guadagno molto più piccolo.

La gerarchia:

  1. PartitionFilters: saltare i file a livello di directory. Il meglio.
  2. PushedFilters: aprire il file, ma saltare i row group dentro. Buono.
  3. Nessun pushdown del filtro: leggere tutto il file, filtrare in Spark. Il più lento.

Il partitioning ti regala il primo per le colonne su cui partizioni. Sceglile sagge.

La trappola della cardinalità

L’errore più comune che la gente fa con partitionBy è partizionare per una colonna ad alta cardinalità.

Immagina la tabella degli ordini di prima, ma decidi di partizionare per order_id:

# Don't do this!
orders.write.mode("overwrite").partitionBy("order_id").parquet("/tmp/orders-bad")

Cinque ordini, cinque directory. Con un milione di ordini avresti un milione di directory. La maggior parte contiene un singolo file con poche righe. Gli svantaggi:

  • Il listing è lento. Quando Spark apre il dataset, deve enumerare ogni directory per costruire il suo file index. Un milione di directory significa un milione di chiamate LIST su S3 (o un milione di readdir su disco locale). Il solo listing può impiegare più tempo della lettura dei dati.
  • Ogni file è minuscolo. Parquet ha un overhead fisso per file: magic byte, footer, metadati di colonna. Un file Parquet da 2 KB è 80% overhead. Perdi tutti i benefici del formato colonnare.
  • Join e broadcast peggiorano. Spark stima la dimensione della tabella dal numero di file e dalla dimensione per file. Stime cattive portano a piani di join cattivi.
  • Lo storage cloud fa pagare per richiesta. Listare e aprire un milione di file minuscoli su S3 non è solo lento, è costoso.

Una buona colonna di partizione ha ogni valore di partizione che contiene da centinaia di MB a qualche GB di dati. Per un tipico dataset e-commerce:

ColonnaCardinalitàBuona colonna di partizione?
year~5-20
year, month~60-240
year, month, day~1000-7000Forse, dipende dal volume
country~200Sì se il traffico è bilanciato; rischioso se concentrato
user_idmilioniNo
order_idmilioniNo
transaction_idmilioni-miliardiAssolutamente no

L’avvertenza “dipende dal volume” è reale. partitionBy("year", "month", "day") è ottimo se hai centinaia di MB al giorno. È un disastro se hai dieci ordini al giorno: ti ritrovi con 1.000 giorni × 10 righe × file minuscolo. Fa’ i conti prima di impegnarti.

Una regola empirica utile: mira tra 100 MB e 1 GB per file di partizione. Sotto stai pagando overhead; sopra stai perdendo parallelismo. Combinala con la regola “se non scriveresti mai una WHERE su questa colonna, non partizionarci sopra” ed eviterai la maggior parte degli errori.

Combinare partitionBy con il pruning a livello di colonna

partitionBy e il pruning interno di Parquet sulle colonne sono complementari, non alternative. Partizioni per le colonne che compaiono nel WHERE per la maggior parte delle query, e Parquet gestisce il resto.

Una tipica fact table di analytics in un’azienda reale:

(fact_orders.write
 .mode("overwrite")
 .partitionBy("year", "month")        # date-based, low cardinality, frequent filter
 .parquet("/data/warehouse/fact_orders"))

Una query tipica:

(spark.read.parquet("/data/warehouse/fact_orders")
 .where((F.col("year") == 2024) & (F.col("month") == 3) & (F.col("country") == "IT"))
 .agg(F.sum("total")))

Cosa succede:

  1. Il partition pruning scarta ogni directory eccetto year=2024/month=3/. Da forse 200 partizioni a 1.
  2. La proiezione di colonna di Parquet legge solo le colonne total e country da quei file (non tutte le 30 colonne della fact table).
  3. Il predicate pushdown di Parquet usa le statistiche min/max dei row group per saltare row group nelle regioni country != IT dentro il file.
  4. Spark valuta il filtro residuo e aggrega.

Quel pruning a strati è ciò che fa rispondere a una fact table di warehouse da 200 GB la query “fatturato Italia marzo 2024” in 200 ms.

Modalità di overwrite e scritture partizionate

Un dettaglio piccolo ma importante. Con partitionBy, il significato delle modalità di scrittura è sottile:

df.write.mode("overwrite").partitionBy("year").parquet("/data/orders")

Per default questo distrugge l’intera directory /data/orders e scrive solo le partizioni presenti in df. Se df contiene solo righe year=2024, hai appena cancellato year=2023 e precedenti. La gente impara questa lezione nel modo duro esattamente una volta.

La soluzione è la modalità di overwrite dinamica delle partizioni:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write.mode("overwrite").partitionBy("year").parquet("/data/orders")

Adesso Spark sovrascrive solo le partizioni che compaiono in df. year=2024 viene rimpiazzato; year=2023 viene lasciato in pace. Se fai scritture incrementali per data, come la maggior parte delle pipeline, imposta questa config nel tuo job e non pensarci più.

Quando non partizionare

A volte la risposta è non fare nessun partitionBy. Tre casi:

  1. Dataset piccoli. Sotto qualche GB, il partition pruning non ti fa risparmiare niente di significativo. Un singolo file Parquet o pochi vanno bene.
  2. Dataset che leggi sempre per intero. Aggregazioni giornaliere che leggono ogni riga comunque: il partitioning aggiunge overhead di metadati senza far risparmiare letture.
  3. Chiavi ad alta cardinalità per join. Partizionare per user_id è sbagliato; ma vuoi comunque organizzare i dati per user_id perché i join non facciano shuffle. Lo strumento giusto per quello è il bucketing, che è la prossima lezione.

Quell’ultimo punto è il pretesto per il prossimo argomento. Il partitioning è per colonne a bassa cardinalità che compaiono nelle clausole WHERE. Il bucketing è per colonne ad alta cardinalità che compaiono nelle clausole JOIN ON. Strumenti diversi, usi complementari, spesso a strati sulla stessa tabella.

Falla girare sulla tua macchina

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PartitionedWrites")
         .master("local[*]")
         .getOrCreate())

orders = spark.range(0, 100_000).select(
    F.col("id").alias("order_id"),
    F.date_add(F.lit("2023-01-01"), (F.col("id") % 730).cast("int")).alias("dt"),
    (F.rand() * 100).alias("total"),
)
orders = (orders
          .withColumn("year",  F.year("dt"))
          .withColumn("month", F.month("dt")))

# Good: low-cardinality partition columns
(orders.write
 .mode("overwrite")
 .partitionBy("year", "month")
 .parquet("/tmp/orders-good"))

# Read back and confirm partition pruning
read = spark.read.parquet("/tmp/orders-good")
read.where((F.col("year") == 2024) & (F.col("month") == 6)).explain()
# Look for PartitionFilters: [...year = 2024, month = 6...]

# Look at the directory tree
import os
for path, dirs, files in os.walk("/tmp/orders-good"):
    for f in files:
        if f.endswith(".parquet"):
            print(os.path.join(path, f))

# Bad: high-cardinality partition column (run on a small slice only!)
small = orders.limit(1000)
(small.write
 .mode("overwrite")
 .partitionBy("order_id")
 .parquet("/tmp/orders-bad"))

# Count the directories
print(sum(1 for _ in os.walk("/tmp/orders-bad")))   # ~1000

Guarda la riga PartitionFilters per il caso buono. Poi guarda il numero di file per il caso cattivo: sono mille directory con un file minuscolo ciascuna. Immagina di farlo con un milione di righe.

La prossima lezione è il partitioning sotto il cofano: come Spark decide il numero di partizioni e come dimensionarle. Dopo, la lezione 36 copre il bucketing, la risposta a “voglio il partitioning, ma la mia chiave è ad alta cardinalità”.


Riferimenti: documentazione di Apache Spark SQL sulle data source (https://spark.apache.org/docs/latest/sql-data-sources.html) e post del blog di Databricks sulle best practice di partitioning. Recuperato il 2026-05-01.

Cerca