PySpark, from the ground up Lesson 34 / 60

Partitioned writes: directory layout, predicate pushdown, and when to do it

Hive-style partition columns on disk, how Spark uses them at read time to skip files, and the cardinality trap to avoid.

So far in this module we’ve talked about partitions as an in-memory thing — chunks of rows that Spark hands to tasks. Today is about the other partitioning, the one that lives on disk and survives between jobs: partitioned writes.

Partitioned writes are the single biggest “free” speedup in a data lake. A correctly partitioned dataset turns “read 200 GB and filter to 2 GB” into “read 2 GB” — Spark literally skips the rest at the file-system level, before any data is read into memory. Done well, queries that used to take five minutes finish in seconds.

Done badly, you get tens of thousands of tiny files in tens of thousands of directories and the listing overhead alone makes your job slower than no partitioning at all. The line between the two outcomes is one decision: which column to partition by.

What partitionBy does on disk

When you write a DataFrame with partitionBy, Spark organizes the output into a directory tree where each partition column becomes a directory level:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PartitionedWrites")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [
        (1, "2024-03-15", 59.0,  "IT"),
        (2, "2024-03-15", 29.0,  "IT"),
        (3, "2024-04-02", 149.0, "NL"),
        (4, "2024-04-02", 89.5,  "NL"),
        (5, "2025-01-08", 12.0,  "DE"),
    ],
    "order_id INT, dt STRING, total DOUBLE, country STRING",
).withColumn("dt", F.to_date("dt")) \
 .withColumn("year",  F.year("dt")) \
 .withColumn("month", F.month("dt"))

(orders.write
 .mode("overwrite")
 .partitionBy("year", "month")
 .parquet("/tmp/orders"))

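On disk you now have something like this (the exact part-file names, and how many part files land in each directory, depend on how many tasks wrote the data):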

/tmp/orders/
├── _SUCCESS
├── year=2024/
│   ├── month=3/
│   │   └── part-00000-...-c000.snappy.parquet
│   └── month=4/
│       └── part-00000-...-c000.snappy.parquet
└── year=2025/
    └── month=1/
        └── part-00000-...-c000.snappy.parquet

Two things to notice. First, the partition columns (year, month) live in the directory names, not inside the Parquet files. Spark reconstructs them from the path at read time, which saves storage and makes them essentially free to read. Second, this column=value directory naming is called Hive-style partitioning. Almost every query engine in the data world reads this layout: Spark, Hive, Presto/Trino, Athena, BigQuery via external tables, DuckDB, Polars. It’s a de facto standard.
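To make the convention concrete, here is a minimal pure-Python sketch (not Spark’s code) that builds a Hive-style relative directory from partition values and parses a file path back into those values. The function names are hypothetical, and the sketch skips two things Spark does: escaping special characters in partition values, and inferring column types on read (which is why values come back as strings here).

```python
def hive_partition_dir(pairs):
    """Build a relative directory such as 'year=2024/month=3' from (column, value) pairs."""
    return "/".join(f"{col}={val}" for col, val in pairs)


def parse_partition_values(path):
    """Recover partition values from the 'col=value' segments of a file path.

    Values come back as strings; Spark additionally infers types (integers, dates, ...).
    """
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            col, _, val = segment.partition("=")
            values[col] = val
    return values


rel_dir = hive_partition_dir([("year", 2024), ("month", 3)])
print(rel_dir)  # year=2024/month=3
print(parse_partition_values(f"/tmp/orders/{rel_dir}/part-00000.snappy.parquet"))
# {'year': '2024', 'month': '3'}
```

The values are recovered from the path alone, which is why the partition columns don’t need to be stored in the files.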

Predicate pushdown: the payoff

Now read the dataset back with a filter on a partition column:

read = spark.read.parquet("/tmp/orders")
filtered = read.where((F.col("year") == 2024) & (F.col("month") == 3))
filtered.show()
filtered.explain()
# == Physical Plan ==
# *(1) ColumnarToRow
# +- FileScan parquet [order_id#..., dt#..., total#..., country#..., year#..., month#...]
#       Batched: true,
#       DataFilters: [],
#       Format: Parquet,
#       Location: InMemoryFileIndex[file:/tmp/orders],
#       PartitionFilters: [isnotnull(year#...), isnotnull(month#...), (year#...= 2024), (month#...= 3)],
#       PushedFilters: [],
#       ReadSchema: ...

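The line that matters is PartitionFilters. Spark recognized that year and month are partition columns and evaluated the filter against the directory values in its file index, so only the files under year=2024/month=3/ are scheduled for reading. Files in the other directories are never opened and never read. (For a path-based read like this one, Spark still lists the directory tree once to build that index; keep that in mind for the cardinality discussion below.) This is partition pruning, and it’s the whole point.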
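The pruning decision can be modeled in plain Python: take the files the index knows about, derive each file’s partition values from its path, and keep only the files whose values satisfy the predicate. This is a simplified model of what the PartitionFilters line describes, not Spark’s implementation; the file paths and the `prune` helper are hypothetical.

```python
def partition_values(path):
    """Parse 'col=value' directory segments into a dict (values stay strings)."""
    return dict(seg.split("=", 1) for seg in path.split("/") if "=" in seg)


def prune(files, predicate):
    """Keep only files whose partition values satisfy the predicate.

    No file is opened: the decision uses the path alone, which is why
    pruned files cost no reads at all.
    """
    return [f for f in files if predicate(partition_values(f))]


# Hypothetical file index for the /tmp/orders example
files = [
    "/tmp/orders/year=2024/month=3/part-00000.parquet",
    "/tmp/orders/year=2024/month=4/part-00000.parquet",
    "/tmp/orders/year=2025/month=1/part-00000.parquet",
]

# Equivalent of where(year == 2024 AND month == 3), comparing values as ints
to_read = prune(files, lambda v: int(v["year"]) == 2024 and int(v["month"]) == 3)
print(to_read)  # ['/tmp/orders/year=2024/month=3/part-00000.parquet']
```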

Try a non-partition filter:

read.where(F.col("country") == "IT").explain()
# PartitionFilters: []
# PushedFilters: [IsNotNull(country), EqualTo(country,IT)]

country is a regular column inside the Parquet files, not a directory. The filter still gets pushed down to the Parquet reader (PushedFilters) — Parquet itself can skip row groups whose statistics say “this row group has no IT rows” — but every file is at least opened. Compared to skipping the file entirely, this is a much smaller win.
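Row-group skipping works off per-column min/max statistics stored in each Parquet file’s footer. A minimal sketch of the decision for `country == 'IT'`, using made-up statistics (the `RowGroupStats` type and its values are hypothetical, not Parquet’s API): a row group can be skipped only when 'IT' falls outside its [min, max] range.

```python
from dataclasses import dataclass


@dataclass
class RowGroupStats:
    """Hypothetical min/max statistics for the 'country' column of one row group."""
    min_country: str
    max_country: str


def may_contain(stats, value):
    """True if an equality match on value is possible given the min/max range."""
    return stats.min_country <= value <= stats.max_country


row_groups = [
    RowGroupStats("AT", "ES"),  # 'IT' sorts after 'ES': skip
    RowGroupStats("FR", "NL"),  # 'IT' sorts inside the range: must be read
    RowGroupStats("PL", "SE"),  # 'IT' sorts before 'PL': skip
]

to_scan = [i for i, rg in enumerate(row_groups) if may_contain(rg, "IT")]
print(to_scan)  # [1]
```

The statistics only help when similar values are clustered in the same row groups. If the data is unsorted and every row group spans, say, 'AT' to 'SE', no row group can be skipped and the file is read in full.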

The hierarchy:

  1. PartitionFilters — skip files at the directory level. Best.
  2. PushedFilters — open the file, but skip row groups inside. Good.
  3. No filter pushdown — read the whole file, filter in Spark. Slowest.

Partitioning gets you the first one for the columns you partition on. Choose those columns wisely.

The cardinality trap

The most common mistake people make with partitionBy is partitioning by a high-cardinality column.

Imagine the orders table from above, but you decide to partition by order_id:

# Don't do this!
orders.write.mode("overwrite").partitionBy("order_id").parquet("/tmp/orders-bad")

Five orders, five directories. With a million orders you’d have a million directories, and because order_id is unique, each one holds a single file containing a single row. The downsides:

  • Listing is slow. When Spark opens the dataset, it has to enumerate every directory to build its file index. A million directories means on the order of a million LIST requests on S3 (or a million directory reads on local disk). Listing alone can take longer than reading the data.
  • Each file is tiny. Parquet has fixed per-file overhead: magic bytes plus a footer with the schema and column metadata. In a file holding one row, that overhead is most of the file, and columnar features (compression, encoding, row-group skipping) have nothing to work with.
  • Scan planning gets worse. The driver has to track every file, and when Spark packs files into read tasks it charges each one a fixed open cost (spark.sql.files.openCostInBytes, 4 MB by default), so a million tiny files turn into far more tasks than the data volume warrants.
  • Cloud storage charges per request. Listing and opening a million tiny files on S3 isn’t just slow, it’s expensive.

A good partition column has each partition value holding hundreds of MB to a few GB of data. For a typical e-commerce dataset:

Column              Cardinality          Good partition column?
year                ~5–20                Yes
year, month         ~60–240              Yes
year, month, day    ~1,800–7,300         Maybe, depends on volume
country             ~200                 Yes if traffic is balanced; risky if concentrated
user_id             millions             No
order_id            millions             No
transaction_id      millions–billions    Absolutely not
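Cardinality is cheap to measure before you commit: the number of leaf directories partitionBy creates is the number of distinct value combinations of the partition columns. The sketch below counts them on a synthetic year with one order per hour. The data and the `directory_count` helper are hypothetical; on a real DataFrame the same question is `df.select("year", "month").distinct().count()`.

```python
from datetime import datetime, timedelta

# Hypothetical data: one order per hour for all of 2024 (a leap year)
start = datetime(2024, 1, 1)
orders = [{"order_id": i, "ts": start + timedelta(hours=i)} for i in range(366 * 24)]


def directory_count(rows, key):
    """Leaf directories partitionBy would create: distinct partition-key tuples."""
    return len({key(r) for r in rows})


candidates = {
    "year": lambda r: (r["ts"].year,),
    "year, month": lambda r: (r["ts"].year, r["ts"].month),
    "year, month, day": lambda r: (r["ts"].year, r["ts"].month, r["ts"].day),
    "order_id": lambda r: (r["order_id"],),
}

for name, key in candidates.items():
    n = directory_count(orders, key)
    print(f"{name}: {n} directories, ~{len(orders) // n} rows each")
```

On this data the counts come out to 1, 12, 366 and 8,784 directories; partitioning by order_id gives every row its own directory.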

The “depends on volume” caveat is real. partitionBy("year", "month", "day") is great if you have hundreds of MB per day. It’s a disaster if you have ten orders per day: you end up with thousands of daily directories, each holding one tiny file of about ten rows. Run the math before you commit.

A useful rule of thumb: target between 100 MB and 1 GB per partition file. Below that you’re paying per-file overhead; above that you’re losing parallelism. Combine that with a second rule, “if you’d never write a WHERE clause on this column, don’t partition by it,” and you’ll avoid most mistakes.
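“Run the math” can be a back-of-the-envelope division of volume by partition count. The sketch below assumes data arrives evenly every day, ignores leap years, and treats sizes as compressed on-disk MB; the daily volumes and the helper names are hypothetical, and the 100 MB to 1 GB band comes from the rule of thumb above.

```python
MIN_MB, MAX_MB = 100, 1024  # rule-of-thumb band: 100 MB to 1 GB per file

# Partitions created per year at each granularity (leap years ignored)
PARTITIONS_PER_YEAR = {"year": 1, "month": 12, "day": 365}


def per_partition_mb(mb_per_day, granularity):
    """Average size of one partition, assuming data arrives evenly every day."""
    return mb_per_day * 365 / PARTITIONS_PER_YEAR[granularity]


def verdict(size_mb):
    """Classify an average partition size against the per-file band."""
    if size_mb < MIN_MB:
        return "too small"
    if size_mb > MAX_MB:
        return "large: spread it over several files"
    return "ok"


# Hypothetical daily volumes: ~ten orders/day, a mid-size shop, a busy one
for mb_per_day in (0.01, 50, 500):
    for g in ("year", "month", "day"):
        size = per_partition_mb(mb_per_day, g)
        print(f"{mb_per_day} MB/day, by {g}: {size:,.2f} MB per partition -> {verdict(size)}")
```

At 50 MB/day, monthly partitions average about 1.5 GB (a couple of files each), while daily partitions of 50 MB fall below the band. At ten orders a day, even yearly partitions are tiny.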

Combining partitionBy with column-level pruning

partitionBy and Parquet’s internal column pruning are complementary, not alternatives. You partition by the columns that show up in WHERE for most queries, and Parquet handles the rest.

A typical analytics fact table at a real company:

# fact_orders: an existing DataFrame that already has year and month columns
(fact_orders.write
 .mode("overwrite")
 .partitionBy("year", "month")        # date-based, low cardinality, frequent filter
 .parquet("/data/warehouse/fact_orders"))

A typical query:

(spark.read.parquet("/data/warehouse/fact_orders")
 .where((F.col("year") == 2024) & (F.col("month") == 3) & (F.col("country") == "IT"))
 .agg(F.sum("total")))

What happens:

  1. Partition pruning drops every directory except year=2024/month=3/. From maybe 200 partitions down to 1.
  2. Parquet’s column projection reads only the total and country columns from those files (not all 30 columns of the fact table).
  3. Parquet’s predicate pushdown uses row-group min/max statistics on country to skip row groups whose range cannot contain IT.
  4. Spark evaluates the residual filter and aggregates.

That layered pruning is what lets a query for “March 2024 Italy revenue” against a 200 GB fact table read only a small slice of it.
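The four layers can be modeled in a few lines of plain Python. This is a toy model of the ideas, not how Spark or Parquet are implemented: the nested `dataset` structure, its values, and the `revenue` function are hypothetical. Row-group skipping is checked before any column data is touched, because the statistics live in the file footer.

```python
# Hypothetical toy dataset: (year, month) directory -> files -> row groups.
# Each row group keeps its columns separately, plus min/max stats for 'country'.
dataset = {
    ("2024", "3"): [
        [  # one file, two row groups, rows sorted by country
            {"country_stats": ("DE", "FR"),
             "columns": {"country": ["DE", "FR"], "total": [10.0, 20.0], "sku": ["a", "b"]}},
            {"country_stats": ("IT", "NL"),
             "columns": {"country": ["IT", "IT", "NL"], "total": [59.0, 29.0, 5.0], "sku": ["c", "d", "e"]}},
        ],
    ],
    ("2024", "4"): [
        [{"country_stats": ("IT", "IT"),
          "columns": {"country": ["IT"], "total": [999.0], "sku": ["f"]}}],
    ],
}


def revenue(data, year, month, country):
    touched = {"partitions": 0, "row_groups": 0}
    total = 0.0
    for (y, m), files in data.items():
        # Partition pruning: decided from the directory values alone
        if (y, m) != (year, month):
            continue
        touched["partitions"] += 1
        for row_groups in files:
            for rg in row_groups:
                lo, hi = rg["country_stats"]
                # Row-group skipping: footer stats rule this group out before data is read
                if not lo <= country <= hi:
                    continue
                touched["row_groups"] += 1
                # Column projection: only 'country' and 'total' are read; 'sku' is never touched
                cols = rg["columns"]
                # Residual filter and aggregation on the surviving rows
                total += sum(t for c, t in zip(cols["country"], cols["total"]) if c == country)
    return total, touched


print(revenue(dataset, "2024", "3", "IT"))
# (88.0, {'partitions': 1, 'row_groups': 1})
```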

Overwrite modes and partitioned writes

A small but important detail. With partitionBy, the meaning of write modes is subtle:

df.write.mode("overwrite").partitionBy("year").parquet("/data/orders")

By default (spark.sql.sources.partitionOverwriteMode is static), this nukes the entire /data/orders directory and writes only the partitions present in df. If df only contains year=2024 rows, you’ve just deleted year=2023 and earlier. People learn this lesson the hard way exactly once.

The fix is dynamic partition overwrite mode:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write.mode("overwrite").partitionBy("year").parquet("/data/orders")

Now Spark only overwrites the partitions that appear in df. year=2024 gets replaced; year=2023 is left alone. If you do incremental writes by date — most pipelines do — set this config in your job and never think about it again.
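The difference between the two modes fits in a few lines if a dict stands in for the partition directories. This is a pure-Python model of the semantics described above, not Spark code; the `overwrite` function is hypothetical and touches no files.

```python
def overwrite(existing, incoming, mode="static"):
    """Model a partitioned overwrite on a {partition_value: rows} mapping.

    static (no static partition values given, as in a plain DataFrame write):
        everything under the target path is replaced by the incoming data.
    dynamic: only partitions present in the incoming data are replaced.
    """
    if mode == "static":
        return dict(incoming)
    if mode == "dynamic":
        result = dict(existing)
        result.update(incoming)
        return result
    raise ValueError(f"unknown mode: {mode}")


table = {2022: ["old-2022"], 2023: ["old-2023"], 2024: ["old-2024"]}
new_data = {2024: ["new-2024"]}

print(overwrite(table, new_data, "static"))
# {2024: ['new-2024']}
print(overwrite(table, new_data, "dynamic"))
# {2022: ['old-2022'], 2023: ['old-2023'], 2024: ['new-2024']}
```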

When not to partition

Sometimes the answer is no partitionBy at all. Three cases:

  1. Small datasets. Below a few GB, partition pruning saves you nothing meaningful. A single Parquet file or a few of them is fine.
  2. Datasets you always read in full. Daily aggregations that read every row anyway — partitioning adds metadata overhead without saving any reads.
  3. High-cardinality keys for joins. Partitioning by user_id is wrong, but you may still want to organize the data by user_id so joins on it don’t shuffle. The right tool for that is bucketing, covered in lesson 36.

That last point is the lead-in. Partitioning is for low-cardinality columns that show up in WHERE clauses. Bucketing is for high-cardinality columns that show up in JOIN ON clauses. Different tools, complementary uses, often layered on the same table.

Run this on your own machine

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PartitionedWrites")
         .master("local[*]")
         .getOrCreate())

orders = spark.range(0, 100_000).select(
    F.col("id").alias("order_id"),
    F.date_add(F.lit("2023-01-01"), (F.col("id") % 730).cast("int")).alias("dt"),
    (F.rand() * 100).alias("total"),
)
orders = (orders
          .withColumn("year",  F.year("dt"))
          .withColumn("month", F.month("dt")))

# Good: low-cardinality partition columns
(orders.write
 .mode("overwrite")
 .partitionBy("year", "month")
 .parquet("/tmp/orders-good"))

# Read back and confirm partition pruning
read = spark.read.parquet("/tmp/orders-good")
read.where((F.col("year") == 2024) & (F.col("month") == 6)).explain()
# Look for PartitionFilters: [...year = 2024, month = 6...]

# Look at the directory tree
import os
for path, dirs, files in os.walk("/tmp/orders-good"):
    for f in files:
        if f.endswith(".parquet"):
            print(os.path.join(path, f))

# Bad: high-cardinality partition column (run on a small slice only!)
small = orders.limit(1000)
(small.write
 .mode("overwrite")
 .partitionBy("order_id")
 .parquet("/tmp/orders-bad"))

# Count the directories
print(sum(1 for _ in os.walk("/tmp/orders-bad")))   # ~1000: one directory per order_id, plus the root

Look at the PartitionFilters line for the good case. Then look at the file count for the bad case — that’s a thousand directories with one tiny file each. Imagine doing that with a million rows.

Next lesson is partitioning under the hood — how Spark decides on partition counts and how to size them. After that, lesson 36 covers bucketing, the answer to “I want partitioning, but my key is high-cardinality.”


References: Apache Spark SQL data sources documentation (https://spark.apache.org/docs/latest/sql-data-sources.html) and Databricks blog posts on partitioning best practices. Retrieved 2026-05-01.
