We’ve now seen both partitioning concepts: in-memory partitions controlled by repartition and coalesce, and on-disk partitioning via partitionBy. Both are great when the column you care about has low cardinality — year, month, country. Both fall over when the column is high-cardinality — user_id, account_id, transaction_id.
But high-cardinality columns are exactly the ones you join on. If your fact table has 500 million orders and you join it to a dimension table on user_id ten times a day, every one of those joins triggers a shuffle of half a billion rows. Even with healthy partitions and no skew, you’re paying tens of seconds of shuffle work per query.
There’s a third tool, and it’s the one almost nobody uses: bucketing. This lesson is about why bucketing exists, what it does, when it’s worth the operational cost, and the footgun that catches everyone the first time.
The idea
When you bucket a table, you tell Spark at write time to hash-partition each input partition into a fixed number of buckets, and to record the bucket spec — the column, the bucket count, the hash function — in the table metadata.
(orders
    .write
    .bucketBy(64, "user_id")
    .sortBy("user_id")  # optional but recommended
    .saveAsTable("warehouse.orders_bucketed"))
Two things changed compared to a normal partitionBy write:
- The output files are named by bucket. Each task that writes data produces up to 64 files, one per bucket it holds rows for, with the bucket id encoded as a suffix in the file name: part-00000-..._00000.c000.snappy.parquet, ..._00001, …, ..._00063. Every row whose hash(user_id) % 64 == 17 lands in bucket 17, regardless of which input partition it came from.
- You used saveAsTable, not save. This is non-negotiable for bucketing. The bucket spec lives in the metastore (Hive metastore, AWS Glue, Databricks Unity Catalog, whatever you’ve configured). Spark looks it up at read time. Without a metastore-backed table, the layout is just a pile of Parquet files on disk and Spark has no way to know what they mean.
That second point is the main reason bucketing is underused: a lot of teams write Parquet to plain S3 paths and don’t have a metastore. We’ll come back to that.
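To make the "same key, same bucket" rule concrete, here is a minimal pure-Python sketch. It does not use Spark: zlib.crc32 stands in for Spark's own Murmur3-based hash, and the input partitions and the bucket_for helper are made up for illustration.

```python
import zlib

NUM_BUCKETS = 64


def bucket_for(user_id: int, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a key to a bucket id.

    crc32 is only a stand-in here; Spark uses its own Murmur3-based hash.
    """
    digest = zlib.crc32(str(user_id).encode("utf-8"))
    return digest % num_buckets


# Two hypothetical input partitions holding overlapping user_ids.
input_partitions = [
    [101, 202, 303, 101],
    [303, 404, 101],
]

# Each "write task" routes its own rows to buckets independently.
buckets = {}
for task_id, rows in enumerate(input_partitions):
    for user_id in rows:
        buckets.setdefault(bucket_for(user_id), []).append((task_id, user_id))

# Collect the set of buckets each user_id was written to.
buckets_per_key = {}
for bucket_id, entries in buckets.items():
    for _, user_id in entries:
        buckets_per_key.setdefault(user_id, set()).add(bucket_id)

# Every user_id maps to exactly one bucket, whichever task wrote it.
print(all(len(ids) == 1 for ids in buckets_per_key.values()))
```

Because the bucket id depends only on the key and the bucket count, two tasks that never talk to each other still agree on where user 101 belongs.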
The bucket join optimization
The whole reason bucketing exists is the join optimization. Imagine two tables, both bucketed on user_id with 64 buckets:
(orders.write
    .bucketBy(64, "user_id")
    .sortBy("user_id")
    .saveAsTable("warehouse.orders_bucketed"))

(users.write
    .bucketBy(64, "user_id")
    .sortBy("user_id")
    .saveAsTable("warehouse.users_bucketed"))
Now you join them:
joined = (spark.table("warehouse.orders_bucketed")
    .join(spark.table("warehouse.users_bucketed"), "user_id"))
joined.explain()
# == Physical Plan ==
# *(3) Project [...]
# +- *(3) SortMergeJoin [user_id#...], [user_id#...], Inner
#    :- *(1) Sort [user_id#... ASC NULLS FIRST], false, 0
#    :  +- *(1) Filter isnotnull(user_id#...)
#    :     +- *(1) ColumnarToRow
#    :        +- FileScan parquet warehouse.orders_bucketed[...] Bucketed: true, ...
#    +- *(2) Sort [user_id#... ASC NULLS FIRST], false, 0
#       +- *(2) Filter isnotnull(user_id#...)
#          +- *(2) ColumnarToRow
#             +- FileScan parquet warehouse.users_bucketed[...] Bucketed: true, ...
Notice what’s not in that plan: an Exchange node before the join. The shuffle has been eliminated. Each task reads bucket i from orders_bucketed and bucket i from users_bucketed — same hash function, same bucket count, so the matching keys are guaranteed to be in the same bucket. The sort-merge join runs locally per bucket. No network transfer of half a billion rows.
For comparison, the same join on un-bucketed tables would have an Exchange hashpartitioning(user_id, 200) on each side before the sort-merge. That’s the shuffle that bucketing eliminates. On a real fact-to-fact join over hundreds of millions of rows, the difference is minutes vs seconds.
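The reason a per-bucket join is safe can be sketched without Spark. The toy below buckets two small lists with the same stand-in hash (zlib.crc32, not Spark's hash) and the same bucket count, joins bucket i with bucket i only, and compares the result with a join over the full data. The rows and helper names are invented for the example.

```python
import zlib
from collections import defaultdict


def bucket_for(key, num_buckets):
    # Stand-in hash; Spark uses its own Murmur3-based function.
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def bucketize(rows, num_buckets):
    """Group (key, value) rows by the bucket of their key."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_for(row[0], num_buckets)].append(row)
    return buckets


def join_rows(left, right):
    """Naive inner join on the first field of each row."""
    return [(l[0], l[1], r[1]) for l in left for r in right if l[0] == r[0]]


orders = [(1, "order-a"), (2, "order-b"), (1, "order-c"), (3, "order-d")]
users = [(1, "ann"), (2, "bob"), (4, "dee")]

NUM_BUCKETS = 4
order_buckets = bucketize(orders, NUM_BUCKETS)
user_buckets = bucketize(users, NUM_BUCKETS)

# Join bucket i with bucket i only; no row ever crosses buckets.
per_bucket = []
for b in range(NUM_BUCKETS):
    per_bucket.extend(join_rows(order_buckets[b], user_buckets[b]))

global_join = join_rows(orders, users)
print(sorted(per_bucket) == sorted(global_join))
```

If the two sides used different bucket counts or different hash functions, matching keys could land in different bucket numbers and the per-bucket join would silently drop rows, which is why Spark falls back to a shuffle in that case.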
When the optimization actually fires
Three conditions must be true for Spark to skip the shuffle:
- Both sides bucketed on the same columns. Bucket on user_id for one table and (user_id, date) for the other? No deal.
- Both sides have the same bucket count. 64 vs 64 works. 64 vs 128 doesn’t by default (Spark 3.1+ can coalesce the larger side when one count is a multiple of the other, but only if spark.sql.bucketing.coalesceBucketsInJoin.enabled is turned on; don’t rely on it).
- Both sides use the same hash function. Within Spark this is automatic; if you’re sharing data with Hive or another system that hashes differently, the optimization breaks silently.
If any of those is wrong, Spark falls back to a regular shuffle as if you hadn’t bucketed at all. You don’t get an error, you just don’t get the speedup. Always check the physical plan after a bucketed join to confirm the Exchange is gone.
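The three conditions can be written down as a small pre-flight check. This is a simplified sketch, not Spark's planner logic: BucketSpec and its hash_family label are hypothetical names for this example, and the function models only the rules listed above.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class BucketSpec:
    """Hypothetical description of how a table was bucketed."""
    columns: Tuple[str, ...]
    num_buckets: int
    hash_family: str = "spark"  # illustrative label, not a real Spark field


def shuffle_free_join_possible(left: BucketSpec, right: BucketSpec) -> bool:
    """True only when all three conditions from the list above hold."""
    return (
        left.columns == right.columns
        and left.num_buckets == right.num_buckets
        and left.hash_family == right.hash_family
    )


orders_spec = BucketSpec(("user_id",), 64)
users_spec = BucketSpec(("user_id",), 64)
events_spec = BucketSpec(("user_id", "date"), 64)
wide_spec = BucketSpec(("user_id",), 128)
hive_spec = BucketSpec(("user_id",), 64, hash_family="hive")

print(shuffle_free_join_possible(orders_spec, users_spec))   # True
print(shuffle_free_join_possible(orders_spec, events_spec))  # False
print(shuffle_free_join_possible(orders_spec, wide_spec))    # False
print(shuffle_free_join_possible(orders_spec, hive_spec))    # False
```

A check like this is only a reminder of the rules; the physical plan remains the source of truth for whether the shuffle was actually removed.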
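There’s also the sortBy recommendation. Bucketing alone hash-partitions the rows; adding .sortBy("user_id") writes them sorted within each bucket file. Sort-merge joins (the default for big-to-big joins) need both sides sorted, so if Spark can rely on the input files already being sorted, it can skip the sort step too. That generally requires each bucket to be a single file, and the plan above still shows a Sort on each side, so treat the eliminated shuffle as the dependable saving and check the plan to see whether the sort was dropped as well. Without sortBy, you save the shuffle but always pay for sorting at read time. Sort when you bucket anyway: same column, little extra effort at write time.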
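To see why pre-sorted input matters, here is a minimal merge join over two lists that are already sorted by key. It is a teaching sketch of the merge step only, not Spark's SortMergeJoin implementation, and the sample rows are invented.

```python
def merge_join(left, right):
    """Inner join of two lists of (key, value) pairs, both sorted by key.

    Walks each input once with two cursors; no sorting or hashing needed.
    """
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of rows on the right that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == lk:
                for k in range(j, j_end):
                    out.append((lk, left[i][1], right[k][1]))
                i += 1
            j = j_end
    return out


# One bucket from each table, already sorted by user_id.
orders_bucket = [(1, "order-a"), (1, "order-c"), (2, "order-b"), (5, "order-e")]
users_bucket = [(1, "ann"), (2, "bob"), (3, "cy")]

joined = merge_join(orders_bucket, users_bucket)
print(joined)
# [(1, 'order-a', 'ann'), (1, 'order-c', 'ann'), (2, 'order-b', 'bob')]
```

The merge only works because both inputs arrive in key order. If either side were unsorted, a sort would have to run first, which is the step that sorted bucket files are meant to make unnecessary.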
Why bucketing is underused
Bucketing has been in Spark since 2.0 and most teams have never used it. There are real reasons:
You need a metastore. bucketBy only works with saveAsTable. If your data lake is “Parquet files in S3, no Hive metastore, just point Spark at the path” — and a lot of them are — bucketing isn’t available without infrastructure work.
It used to be Hive-only. Until Spark 3.0, Spark could read bucketed tables that Hive wrote, but the file-system data source didn’t preserve the bucket-aware optimization on its own. The default Spark layout wasn’t optimized for bucket joins. This is fixed in 3.x but the reputation lingered for years.
Schemas change. Bucket counts can’t. This is the operational footgun. Once you’ve written a bucketed table with 64 buckets, that’s what it has. Want to change to 128? Full rewrite. Want to add a column to the bucket spec? Full rewrite. The bucket count is part of the physical layout, not metadata you can alter in place. For a 50 TB fact table, “full rewrite” is a multi-hour, multi-TB-of-S3-cost decision.
It interacts oddly with other features. Dynamic partition overwrite + bucketing has known issues. Some Delta Lake versions don’t support bucket joins (Iceberg has its own bucket transform). Streaming workloads with bucketed sinks are painful. Every “but” makes the value proposition murkier.
Teams default to broadcast joins. When one side of a join is small enough to broadcast, you don’t need bucketing: broadcasting also avoids the shuffle, with no write-time layout to maintain. Lesson 27 covers when broadcast applies. Bucketing is for the both-sides-are-big case, which is rarer than people think.
The result: bucketing sits in a niche where it’s the right answer perhaps 5% of the time, and most teams never get there.
When it is the right answer
The classic use case is a fact table joined repeatedly to one or two big dimensions on a high-cardinality key. Examples:
- Fraud detection. A 2-billion-row transactions table joined to a 200-million-row accounts table on account_id, run every 15 minutes for streaming fraud scoring.
- Recommendation systems. A user-events table joined to user-profile features on user_id, run daily for training and hourly for serving.
- Web analytics. A page-views table joined to a sessions table on session_id, run dozens of times across different downstream queries.
The pattern: same join key, big on both sides, runs many times per day. Pay the bucketing cost once on write, save the shuffle cost on every subsequent read. Even at one save vs ten reads, you’re ahead.
If you’re joining once a week, just shuffle. If one side is small, broadcast. If your join keys differ across queries, you can’t bucket usefully (you’d need to bucket once per join column, which is rarely worth it).
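The rules of thumb in this section can be collected into a tiny decision helper. This is a rough sketch: the 10 MB figure is Spark's default spark.sql.autoBroadcastJoinThreshold, while the function name, its parameters, and the min_joins_per_day cutoff are assumptions chosen for illustration, not recommendations from Spark.

```python
# Spark's default for spark.sql.autoBroadcastJoinThreshold is 10 MB.
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def suggest_join_strategy(smaller_side_bytes: int,
                          joins_per_day: float,
                          stable_join_key: bool,
                          min_joins_per_day: float = 1.0) -> str:
    """Return 'broadcast', 'bucket', or 'shuffle'.

    min_joins_per_day is a hypothetical cutoff for "runs often enough
    to repay the write-time cost"; tune it for your own workload.
    """
    if smaller_side_bytes <= BROADCAST_THRESHOLD_BYTES:
        return "broadcast"   # one side is small: no bucketing needed
    if not stable_join_key:
        return "shuffle"     # join keys differ across queries
    if joins_per_day < min_joins_per_day:
        return "shuffle"     # too rare to justify maintaining buckets
    return "bucket"          # big on both sides, same key, runs often


GB = 1024 ** 3
print(suggest_join_strategy(5 * 1024 * 1024, 96, True))   # broadcast
print(suggest_join_strategy(40 * GB, 96, True))           # bucket
print(suggest_join_strategy(40 * GB, 1 / 7, True))        # shuffle
print(suggest_join_strategy(40 * GB, 96, False))          # shuffle
```

The ordering of the checks mirrors the text: rule out broadcast first, then ask whether the join key is stable, and only then weigh how often the join runs.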
A worked example
Build two tables that we’ll join repeatedly. We’ll use a small dataset for the demo:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
    .appName("BucketingDemo")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "/tmp/warehouse")
    # Demo tables are tiny; disable auto-broadcast so Spark plans a sort-merge join
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .enableHiveSupport()  # persistent metastore for saveAsTable + bucketing
    .getOrCreate())

# Fact table: 1M orders
orders = spark.range(0, 1_000_000).select(
    F.col("id").alias("order_id"),
    (F.col("id") % 100_000).alias("user_id"),
    (F.rand() * 100).alias("total"),
)

# Dim table: 100K users
users = spark.range(0, 100_000).select(
    F.col("id").alias("user_id"),
    F.concat(F.lit("user_"), F.col("id")).alias("name"),
    F.lit("IT").alias("country"),
)

spark.sql("CREATE DATABASE IF NOT EXISTS demo")

# Bucket both tables on user_id, 16 buckets, sorted within bucket
(orders.write
    .mode("overwrite")
    .bucketBy(16, "user_id")
    .sortBy("user_id")
    .saveAsTable("demo.orders_bucketed"))

(users.write
    .mode("overwrite")
    .bucketBy(16, "user_id")
    .sortBy("user_id")
    .saveAsTable("demo.users_bucketed"))
Now compare the join plans, bucketed vs unbucketed:
# Bucketed join: no Exchange
b = spark.table("demo.orders_bucketed").join(
    spark.table("demo.users_bucketed"), "user_id")
b.explain()

# Unbucketed equivalent: has Exchange
u = orders.join(users, "user_id")
u.explain()
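In the bucketed plan you’ll see Bucketed: true on each FileScan and no Exchange between the scans and the SortMergeJoin. In the unbucketed plan there’s an Exchange hashpartitioning(user_id, 200) on each side. The bucketed version is doing the same join without the shuffle’s network and disk I/O. If either plan shows a BroadcastHashJoin instead, the demo tables are small enough for Spark to broadcast one side; set spark.sql.autoBroadcastJoinThreshold to -1 to force the sort-merge path and rerun.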
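Reading plans by eye gets tedious if you check them often, so the two signals can be turned into string checks. This sketch works on plan text you have captured yourself; the helper names are hypothetical, and the two sample plans are abbreviated, hand-written approximations rather than real Spark output.

```python
def has_shuffle_before_join(plan_text: str) -> bool:
    """True if any plan line contains a hash-partitioning Exchange."""
    return any("Exchange hashpartitioning" in line
               for line in plan_text.splitlines())


def scans_are_bucketed(plan_text: str) -> bool:
    """True if the plan has FileScan lines and all report Bucketed: true."""
    scans = [line for line in plan_text.splitlines() if "FileScan" in line]
    return bool(scans) and all("Bucketed: true" in line for line in scans)


# Abbreviated, hand-written plan text for illustration only.
BUCKETED_PLAN = """\
*(3) SortMergeJoin [user_id#1L], [user_id#5L], Inner
:- *(1) Sort [user_id#1L ASC NULLS FIRST], false, 0
:  +- FileScan parquet demo.orders_bucketed[...] Batched: true, Bucketed: true
+- *(2) Sort [user_id#5L ASC NULLS FIRST], false, 0
   +- FileScan parquet demo.users_bucketed[...] Batched: true, Bucketed: true
"""

SHUFFLED_PLAN = """\
*(5) SortMergeJoin [user_id#1L], [user_id#5L], Inner
:- *(2) Sort [user_id#1L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(user_id#1L, 200)
:     +- *(1) Range (0, 1000000, step=1, splits=8)
+- *(4) Sort [user_id#5L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(user_id#5L, 200)
      +- *(3) Range (0, 100000, step=1, splits=8)
"""

print(has_shuffle_before_join(BUCKETED_PLAN), scans_are_bucketed(BUCKETED_PLAN))
print(has_shuffle_before_join(SHUFFLED_PLAN), scans_are_bucketed(SHUFFLED_PLAN))
```

Matching on "Exchange hashpartitioning" rather than the bare word "Exchange" avoids false alarms from broadcast exchanges, which are a different mechanism.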
Some Spark versions need an explicit config to enable the optimization:
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")
Both default to true on recent versions, but if you don’t see the speedup, check these first.
Layering with partitionBy
Bucketing and partitioning aren’t mutually exclusive. You can do both, and for a table that’s queried by date and joined on user, both is the right answer:
(orders.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .bucketBy(64, "user_id")
    .sortBy("user_id")
    .saveAsTable("warehouse.orders"))
Disk layout becomes year=2024/month=3/part-00000-..._00017.c000.snappy.parquet, with the bucket id (17 here) encoded in the file name. A query that filters on year and month gets partition pruning. A query that joins on user_id gets the bucket-join optimization. Queries that do both, which covers most of the interesting ones, get both.
The cost: each write can produce up to (num_partitions × num_buckets) files per writing task, which can be a lot. For 24 months × 64 buckets, that’s 1,536 files from a single task even for a small dataset, and more when many tasks each hold rows for every bucket. Tune the bucket count down for small datasets, up for huge ones.
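The file-count arithmetic is worth running before you pick a bucket count. The helper below is a back-of-the-envelope upper bound, assuming the worst case where every writing task holds rows for every partition value and every bucket; the function name and the task counts are illustrative.

```python
def max_output_files(num_partition_values: int,
                     num_buckets: int,
                     num_write_tasks: int = 1) -> int:
    """Upper bound on files written in one job.

    Assumes the worst case: every task has rows for every
    (partition value, bucket) combination.
    """
    return num_partition_values * num_buckets * num_write_tasks


# 24 months x 64 buckets, written by a single task.
print(max_output_files(24, 64))        # 1536

# Same layout written by 200 tasks that each see every combination.
print(max_output_files(24, 64, 200))   # 307200

# Dropping to 16 buckets cuts the single-task bound to a quarter.
print(max_output_files(24, 16))        # 384
```

The task multiplier is the part that surprises people: the bucket count you choose is multiplied by however many tasks write rows into each bucket, so the real file count depends on how the data is distributed at write time.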
Run this on your own machine
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
    .appName("BucketingDemo")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "/tmp/warehouse")
    # Tables are tiny; disable auto-broadcast so Spark plans a sort-merge join
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .enableHiveSupport()
    .getOrCreate())

spark.sql("CREATE DATABASE IF NOT EXISTS demo")

orders = spark.range(0, 100_000).select(
    F.col("id").alias("order_id"),
    (F.col("id") % 10_000).alias("user_id"),
    (F.rand() * 100).alias("total"),
)

users = spark.range(0, 10_000).select(
    F.col("id").alias("user_id"),
    F.concat(F.lit("user_"), F.col("id")).alias("name"),
)

(orders.write
    .mode("overwrite")
    .bucketBy(8, "user_id")
    .sortBy("user_id")
    .saveAsTable("demo.orders_b"))

(users.write
    .mode("overwrite")
    .bucketBy(8, "user_id")
    .sortBy("user_id")
    .saveAsTable("demo.users_b"))

# Bucketed join: confirm no Exchange in the plan
spark.table("demo.orders_b").join(spark.table("demo.users_b"), "user_id").explain()

# For comparison: DataFrame join with shuffle
orders.join(users, "user_id").explain()

# Listing files shows the bucket layout
for path, _, files in os.walk("/tmp/warehouse/demo.db/orders_b"):
    for f in files:
        if f.endswith(".parquet"):
            print(os.path.join(path, f))
Compare the two .explain() outputs. The bucketed one says Bucketed: true and has no Exchange. The unbucketed one has the shuffle. That’s the whole point of the chapter, in two physical plans.
That closes Module 6. You now have a complete mental model for partitioning across both axes — in-memory and on-disk — plus the join-time optimization that bucketing buys. Module 7 starts next lesson and goes one level deeper: Spark SQL and Catalyst, the optimizer that turns your DataFrame code into the physical plans we’ve been reading. Once you understand how Catalyst rewrites your code, you can predict (and influence) most of these decisions before you press run.
References: Apache Spark SQL data sources documentation (https://spark.apache.org/docs/latest/sql-data-sources.html) and Databricks engineering posts on bucketing and Delta Lake. Retrieved 2026-05-01.