Finora in questo modulo abbiamo parlato delle partizioni come di una cosa in memoria: blocchi di righe che Spark passa ai task. Oggi parliamo dell’altro partitioning, quello che vive su disco e sopravvive tra un job e l’altro: le scritture partizionate.
Le scritture partizionate sono lo speedup “gratuito” più grosso in un data lake. Un dataset partizionato correttamente trasforma “leggi 200 GB e filtra a 2 GB” in “leggi 2 GB”: Spark salta letteralmente il resto a livello di file system, prima che qualunque dato venga letto in memoria. Fatto bene, query che prima impiegavano cinque minuti finiscono in secondi.
Fatto male, ti ritrovi con decine di migliaia di file minuscoli in decine di migliaia di directory e il solo overhead di listing rende il job più lento di nessun partitioning. La linea tra i due esiti è una sola decisione: per quale colonna partizionare.
Cosa fa partitionBy su disco
Quando scrivi un DataFrame con partitionBy, Spark organizza l’output in un albero di directory in cui ogni colonna di partizione diventa un livello di directory:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("PartitionedWrites")
.master("local[*]")
.getOrCreate())
orders = spark.createDataFrame(
[
(1, "2024-03-15", 59.0, "IT"),
(2, "2024-03-15", 29.0, "IT"),
(3, "2024-04-02", 149.0, "NL"),
(4, "2024-04-02", 89.5, "NL"),
(5, "2025-01-08", 12.0, "DE"),
],
"order_id INT, dt STRING, total DOUBLE, country STRING",
).withColumn("dt", F.to_date("dt")) \
.withColumn("year", F.year("dt")) \
.withColumn("month", F.month("dt"))
(orders.write
.mode("overwrite")
.partitionBy("year", "month")
.parquet("/tmp/orders"))
Su disco adesso hai:
/tmp/orders/
├── _SUCCESS
├── year=2024/
│ ├── month=3/
│ │ └── part-00000-...-c000.snappy.parquet
│ └── month=4/
│ └── part-00000-...-c000.snappy.parquet
└── year=2025/
└── month=1/
└── part-00000-...-c000.snappy.parquet
Due cose da notare. Primo, le colonne di partizione (year, month) sono nei nomi delle directory, non nei file Parquet stessi. Spark le ricostruisce dal path al momento della lettura: risparmia spazio e di fatto sono gratis da leggere. Secondo, la convenzione di naming delle directory è colonna=valore, chiamata partitioning in stile Hive. Quasi tutti i query engine del mondo dei dati leggono questo layout: Spark, Hive, Presto/Trino, Athena, BigQuery via tabelle esterne, DuckDB, Polars. È uno standard de facto.
Predicate pushdown: il guadagno
Adesso rileggi il dataset con un filtro su una colonna di partizione:
read = spark.read.parquet("/tmp/orders")
filtered = read.where((F.col("year") == 2024) & (F.col("month") == 3))
filtered.show()
filtered.explain()
# == Physical Plan ==
# *(1) ColumnarToRow
# +- FileScan parquet [order_id#..., dt#..., total#..., country#..., year#..., month#...]
# Batched: true,
# DataFilters: [],
# Format: Parquet,
# Location: InMemoryFileIndex[file:/tmp/orders],
# PartitionFilters: [isnotnull(year#...), isnotnull(month#...), (year#...= 2024), (month#...= 3)],
# PushedFilters: [],
# ReadSchema: ...
La riga che conta è PartitionFilters. Spark ha riconosciuto che year e month sono colonne di partizione, ha applicato il filtro a livello di listing delle directory, e ha letto solo la directory corrispondente. Le altre directory non vengono aperte, non vengono listate oltre il prefisso, non vengono toccate. Questo è il partition pruning ed è tutto il punto.
Prova un filtro non-partizione:
read.where(F.col("country") == "IT").explain()
# PartitionFilters: []
# PushedFilters: [IsNotNull(country), EqualTo(country,IT)]
country è una colonna normale dentro i file Parquet, non una directory. Il filtro viene comunque pushato giù al lettore Parquet (PushedFilters): Parquet stesso può saltare row group le cui statistiche dicono “questo row group non ha righe IT”, ma ogni file viene almeno aperto. Rispetto a saltare il file del tutto, è un guadagno molto più piccolo.
La gerarchia:
PartitionFilters: saltare i file a livello di directory. Il meglio.PushedFilters: aprire il file, ma saltare i row group dentro. Buono.- Nessun pushdown del filtro: leggere tutto il file, filtrare in Spark. Il più lento.
Il partitioning ti regala il primo per le colonne su cui partizioni. Sceglile sagge.
La trappola della cardinalità
L’errore più comune che la gente fa con partitionBy è partizionare per una colonna ad alta cardinalità.
Immagina la tabella degli ordini di prima, ma decidi di partizionare per order_id:
# Don't do this!
orders.write.mode("overwrite").partitionBy("order_id").parquet("/tmp/orders-bad")
Cinque ordini, cinque directory. Con un milione di ordini avresti un milione di directory. La maggior parte contiene un singolo file con poche righe. Gli svantaggi:
- Il listing è lento. Quando Spark apre il dataset, deve enumerare ogni directory per costruire il suo file index. Un milione di directory significa un milione di chiamate
LISTsu S3 (o un milione direaddirsu disco locale). Il solo listing può impiegare più tempo della lettura dei dati. - Ogni file è minuscolo. Parquet ha un overhead fisso per file: magic byte, footer, metadati di colonna. Un file Parquet da 2 KB è 80% overhead. Perdi tutti i benefici del formato colonnare.
- Join e broadcast peggiorano. Spark stima la dimensione della tabella dal numero di file e dalla dimensione per file. Stime cattive portano a piani di join cattivi.
- Lo storage cloud fa pagare per richiesta. Listare e aprire un milione di file minuscoli su S3 non è solo lento, è costoso.
Una buona colonna di partizione ha ogni valore di partizione che contiene da centinaia di MB a qualche GB di dati. Per un tipico dataset e-commerce:
| Colonna | Cardinalità | Buona colonna di partizione? |
|---|---|---|
year | ~5-20 | Sì |
year, month | ~60-240 | Sì |
year, month, day | ~1000-7000 | Forse, dipende dal volume |
country | ~200 | Sì se il traffico è bilanciato; rischioso se concentrato |
user_id | milioni | No |
order_id | milioni | No |
transaction_id | milioni-miliardi | Assolutamente no |
L’avvertenza “dipende dal volume” è reale. partitionBy("year", "month", "day") è ottimo se hai centinaia di MB al giorno. È un disastro se hai dieci ordini al giorno: ti ritrovi con 1.000 giorni × 10 righe × file minuscolo. Fa’ i conti prima di impegnarti.
Una regola empirica utile: mira tra 100 MB e 1 GB per file di partizione. Sotto stai pagando overhead; sopra stai perdendo parallelismo. Combinala con la regola “se non scriveresti mai una WHERE su questa colonna, non partizionarci sopra” ed eviterai la maggior parte degli errori.
Combinare partitionBy con il pruning a livello di colonna
partitionBy e il pruning interno di Parquet sulle colonne sono complementari, non alternative. Partizioni per le colonne che compaiono nel WHERE per la maggior parte delle query, e Parquet gestisce il resto.
Una tipica fact table di analytics in un’azienda reale:
(fact_orders.write
.mode("overwrite")
.partitionBy("year", "month") # date-based, low cardinality, frequent filter
.parquet("/data/warehouse/fact_orders"))
Una query tipica:
(spark.read.parquet("/data/warehouse/fact_orders")
.where((F.col("year") == 2024) & (F.col("month") == 3) & (F.col("country") == "IT"))
.agg(F.sum("total")))
Cosa succede:
- Il partition pruning scarta ogni directory eccetto
year=2024/month=3/. Da forse 200 partizioni a 1. - La proiezione di colonna di Parquet legge solo le colonne
totalecountryda quei file (non tutte le 30 colonne della fact table). - Il predicate pushdown di Parquet usa le statistiche min/max dei row group per saltare row group nelle regioni
country != ITdentro il file. - Spark valuta il filtro residuo e aggrega.
Quel pruning a strati è ciò che fa rispondere a una fact table di warehouse da 200 GB la query “fatturato Italia marzo 2024” in 200 ms.
Modalità di overwrite e scritture partizionate
Un dettaglio piccolo ma importante. Con partitionBy, il significato delle modalità di scrittura è sottile:
df.write.mode("overwrite").partitionBy("year").parquet("/data/orders")
Per default questo distrugge l’intera directory /data/orders e scrive solo le partizioni presenti in df. Se df contiene solo righe year=2024, hai appena cancellato year=2023 e precedenti. La gente impara questa lezione nel modo duro esattamente una volta.
La soluzione è la modalità di overwrite dinamica delle partizioni:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write.mode("overwrite").partitionBy("year").parquet("/data/orders")
Adesso Spark sovrascrive solo le partizioni che compaiono in df. year=2024 viene rimpiazzato; year=2023 viene lasciato in pace. Se fai scritture incrementali per data, come la maggior parte delle pipeline, imposta questa config nel tuo job e non pensarci più.
Quando non partizionare
A volte la risposta è non fare nessun partitionBy. Tre casi:
- Dataset piccoli. Sotto qualche GB, il partition pruning non ti fa risparmiare niente di significativo. Un singolo file Parquet o pochi vanno bene.
- Dataset che leggi sempre per intero. Aggregazioni giornaliere che leggono ogni riga comunque: il partitioning aggiunge overhead di metadati senza far risparmiare letture.
- Chiavi ad alta cardinalità per join. Partizionare per
user_idè sbagliato; ma vuoi comunque organizzare i dati peruser_idperché i join non facciano shuffle. Lo strumento giusto per quello è il bucketing, che è la prossima lezione.
Quell’ultimo punto è il pretesto per il prossimo argomento. Il partitioning è per colonne a bassa cardinalità che compaiono nelle clausole WHERE. Il bucketing è per colonne ad alta cardinalità che compaiono nelle clausole JOIN ON. Strumenti diversi, usi complementari, spesso a strati sulla stessa tabella.
Falla girare sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("PartitionedWrites")
.master("local[*]")
.getOrCreate())
orders = spark.range(0, 100_000).select(
F.col("id").alias("order_id"),
F.date_add(F.lit("2023-01-01"), (F.col("id") % 730).cast("int")).alias("dt"),
(F.rand() * 100).alias("total"),
)
orders = (orders
.withColumn("year", F.year("dt"))
.withColumn("month", F.month("dt")))
# Good: low-cardinality partition columns
(orders.write
.mode("overwrite")
.partitionBy("year", "month")
.parquet("/tmp/orders-good"))
# Read back and confirm partition pruning
read = spark.read.parquet("/tmp/orders-good")
read.where((F.col("year") == 2024) & (F.col("month") == 6)).explain()
# Look for PartitionFilters: [...year = 2024, month = 6...]
# Look at the directory tree
import os
for path, dirs, files in os.walk("/tmp/orders-good"):
for f in files:
if f.endswith(".parquet"):
print(os.path.join(path, f))
# Bad: high-cardinality partition column (run on a small slice only!)
small = orders.limit(1000)
(small.write
.mode("overwrite")
.partitionBy("order_id")
.parquet("/tmp/orders-bad"))
# Count the directories
print(sum(1 for _ in os.walk("/tmp/orders-bad"))) # ~1000
Guarda la riga PartitionFilters per il caso buono. Poi guarda il numero di file per il caso cattivo: sono mille directory con un file minuscolo ciascuna. Immagina di farlo con un milione di righe.
La prossima lezione è il partitioning sotto il cofano: come Spark decide il numero di partizioni e come dimensionarle. Dopo, la lezione 36 copre il bucketing, la risposta a “voglio il partitioning, ma la mia chiave è ad alta cardinalità”.
Riferimenti: documentazione di Apache Spark SQL sulle data source (https://spark.apache.org/docs/latest/sql-data-sources.html) e post del blog di Databricks sulle best practice di partitioning. Recuperato il 2026-05-01.