The shuffle is Spark’s most expensive operation. When you join two big DataFrames, every row from each side has to be hashed by the join key and sent over the network to the executor responsible for that hash bucket. Tens of gigabytes flying between machines, written and read from local disk in between. A join that takes ten minutes might be spending eight of those on shuffle.
A broadcast join skips that whole circus. If one side of the join is small enough — really small, fits-in-memory small — Spark sends the entire small DataFrame to every executor in the cluster. Each executor already has its slice of the big DataFrame. Now every executor can join its slice against the small side completely locally. No shuffle. No network for the big side. The big side never moves.
This is the single biggest performance optimization in Spark joins, and it happens automatically — most of the time. This lesson is about when it happens automatically, how to force it when Spark misses it, and when not to force it because you’ll OOM the driver.
The picture
The standard join (sort-merge or shuffle hash) looks like this:
big_df (300 GB) small_df (50 MB)
| |
[shuffle by key] [shuffle by key]
| |
\________ ___________/
\/
[executors do local joins]
Two shuffles. 300 GB of network traffic plus 50 MB of network traffic, mostly the former.
A broadcast join looks like this:
small_df (50 MB)
|
[collect to driver]
|
[send to every executor]
|
big_df (300 GB) ---------------+
| |
+---- local join on each executor's slice
The big side never shuffles. The small side travels exactly once per executor. If you have a 50 MB lookup table and a 300 GB fact table, broadcast is somewhere between 10x and 100x faster.
When Spark picks broadcast automatically
Spark has a config called spark.sql.autoBroadcastJoinThreshold. Default: 10485760 (10 MB). Any DataFrame whose estimated size is below this threshold is eligible to be broadcast.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
# '10485760'
10 MB is conservative. Most production clusters bump this to 100 MB, 200 MB, sometimes 1 GB, depending on driver and executor memory:
# 200 MB threshold — typical for a cluster with 16 GB executors
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 200 * 1024 * 1024)
The trick is the word estimated. Spark estimates the size of a DataFrame from its statistics if they exist (you have to run ANALYZE TABLE for those), or from the file size if it’s reading a file directly, or from the input size if it’s the result of an upstream operation. It does not run the query to find out. This is the source of most “why didn’t Spark broadcast this?” surprises.
A 5 GB Parquet file that you filter down to 50 MB is, in Spark’s eyes, still a 5 GB DataFrame for join planning purposes. The filter happens at runtime; the planner makes its choice ahead of time. So the heavy filter doesn’t trigger automatic broadcast even though the actual joined data would fit easily.
The fix is the broadcast hint.
The broadcast hint
from pyspark.sql.functions import broadcast
big_facts.join(broadcast(small_dim), on="dim_id", how="inner")
broadcast(small_dim) tells Spark: “trust me, this side is small, broadcast it.” Spark obliges, no questions asked, even if its own size estimate says otherwise.
This is the single most useful PySpark function for join performance. If you’ve identified a hot join and one side is genuinely small at runtime — even if it doesn’t look small statically — drop a broadcast() around it and watch the job time fall.
A small concrete example. I’ll set up a fact table of orders and a dim table of countries, run the join two ways, and look at the plan.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
spark = (SparkSession.builder
.appName("BroadcastJoins")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
# Pretend this is huge — millions of rows
orders = spark.createDataFrame(
[(i, f"IT" if i % 3 == 0 else "NL" if i % 3 == 1 else "DE", i * 1.5)
for i in range(10_000)],
"order_id INT, country STRING, total DOUBLE",
)
# Tiny lookup table — three rows
countries = spark.createDataFrame(
[("IT", "Italy", 22.0),
("NL", "Netherlands", 21.0),
("DE", "Germany", 19.0)],
"country STRING, country_name STRING, vat_rate DOUBLE",
)
# Without explicit hint — at this size Spark picks broadcast on its own
orders.join(countries, on="country").explain()
The plan output (trimmed to the relevant bits) will read something like:
== Physical Plan ==
*(2) BroadcastHashJoin [country#5], [country#15], Inner, BuildRight
:- *(2) Filter isnotnull(country#5)
: +- *(2) Scan ExistingRDD[order_id#4, country#5, total#6]
+- BroadcastExchange HashedRelationBroadcastMode([country#15])
+- *(1) Filter isnotnull(country#15)
+- *(1) Scan ExistingRDD[country#15, country_name#16, vat_rate#17]
The two words to look for: BroadcastHashJoin and BroadcastExchange. If you see those, the small side is being broadcast. If you see SortMergeJoin instead, the small side is being shuffled.
Now disable the auto-threshold and re-run:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
orders.join(countries, on="country").explain()
# == Physical Plan ==
# *(5) SortMergeJoin [country#5], [country#15], Inner
# :- *(2) Sort [country#5 ASC NULLS FIRST], false, 0
# : +- Exchange hashpartitioning(country#5, 8) ...
# ...
Same join, different plan. Both shuffle because broadcast is off. Tens of seconds vs sub-second on real data.
Force the broadcast back even with the threshold off:
orders.join(broadcast(countries), on="country").explain()
# Back to BroadcastHashJoin.
The hint wins over the threshold. The hint is the override.
The opposite hint: forcing a sort-merge
There are cases where Spark wants to broadcast and you’d rather it didn’t — usually because Spark’s size estimate is too low and broadcasting will OOM. The hint is merge:
df1.join(df2.hint("merge"), on="key").explain()
# Forces SortMergeJoin
Or shuffle_hash if you specifically want the shuffle hash join strategy:
df1.join(df2.hint("shuffle_hash"), on="key")
You’ll reach for these less often than broadcast, but when Spark gets the strategy wrong, knowing all three hints exist matters.
How to disable broadcast globally
Sometimes you’re debugging a flaky job and want to take broadcast off the table:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
-1 means “never auto-broadcast.” Explicit broadcast() hints still work. This is useful for testing whether the automatic broadcast is causing issues, but you should not leave it set to -1 in production — you’d be giving up the single best join optimization Spark has.
The failure mode: driver OOM
The thing that makes broadcast joins fast — sending the whole small side to every executor — is also the thing that makes them dangerous. The mechanism is:
- Spark calls
.collect()on the small DataFrame, pulling all of its rows back to the driver. - The driver materializes the whole thing in memory.
- The driver ships it out to every executor over the network.
If the “small” side is actually 4 GB and your driver has 2 GB of memory, step 2 OOMs. The driver dies. The job dies. You see a stack trace ending in something like OutOfMemoryError: Java heap space or Total size of serialized results of N tasks (X GB) is bigger than spark.driver.maxResultSize.
The fix is either:
- Don’t broadcast that DataFrame. Drop the hint, let Spark sort-merge.
- Increase driver memory (
spark.driver.memory) andspark.driver.maxResultSize. - Filter or aggregate the small side down before broadcasting.
Rule of thumb: never broadcast something larger than ~1 GB even on a beefy driver. The 100-200 MB range is the comfortable zone. Anything bigger and you’re flirting with OOM and also losing the broadcast advantage — at that size the network cost of replicating to N executors starts to rival the cost of just shuffling.
Two practical patterns
Pattern 1: filter, then broadcast. If you know the small side will be tiny after a filter, materialize it and hint:
active_users = (users
.where(F.col("status") == "active")
.select("user_id", "name", "tier"))
events.join(broadcast(active_users), on="user_id", how="left")
If active_users is 5% of users and the original was 200 MB, you’re broadcasting 10 MB. Easy win.
Pattern 2: broadcast a dim, sort-merge a fact. The “star schema” pattern. Big fact table, small dim tables. Broadcast every dim:
(facts
.join(broadcast(dim_country), on="country_id")
.join(broadcast(dim_product), on="product_id")
.join(broadcast(dim_date), on="date_id"))
Each dim avoids a shuffle. Three joins, zero shuffles on the fact side. This is the bread-and-butter optimization in any analytics workload.
What broadcast can’t fix
Broadcast joins solve the “one side is small” problem. They don’t solve the “one key is huge” problem. If your big DataFrame has 100 million rows for country = 'US' and 1 million rows for everything else combined, no amount of broadcasting helps — the fact side still gets processed locally on each executor, and one executor still ends up with all the US rows. That’s data skew. That’s lesson 28.
Reading .explain() like an inspector
Once you start hunting performance issues, .explain() becomes a daily tool. The default output is text only; .explain(True) gives you the parsed, analyzed, optimized, and physical plans all at once. For broadcast questions, the physical plan is what matters. The keywords to grep for:
BroadcastHashJoin— fast path, broadcast worked.BroadcastNestedLoopJoin— broadcast worked but the join condition isn’t an equality, so Spark is doing a per-row check. Slow if the broadcast side is non-trivial.SortMergeJoin— both sides shuffled and sorted. The default for big-vs-big joins.ShuffleHashJoin— both sides shuffled, the smaller side hashed in memory. Spark picks this less often by default; you can force it withdf.hint("shuffle_hash").BroadcastExchange— the small side being collected and broadcast. Always paired withBroadcastHashJoin(orBroadcastNestedLoopJoin).
If you ran your job, expected a broadcast, and .explain() shows SortMergeJoin, the most likely culprit is the size estimate — the planner thinks the small side is bigger than the threshold. Either bump the threshold, fix the statistics, or add the explicit broadcast() hint.
A small AQE note
Spark 3.x’s Adaptive Query Execution can convert a SortMergeJoin into a BroadcastHashJoin at runtime if the actual size of one side turns out to be small enough after the initial stages run. This is a partial answer to the “size estimate was wrong” problem — Spark gets a second look once the data has materialized. AQE is on by default in Spark 3.2+, but the broadcast threshold for AQE conversion is a separate setting (spark.sql.adaptive.autoBroadcastJoinThreshold). Lesson 59 covers AQE in full; for now, just know that “Spark didn’t broadcast at plan time” doesn’t necessarily mean “Spark won’t broadcast at all.”
Catalyst, Spark’s query optimizer, makes most of these decisions for you when it has good statistics. Lesson 41 covers Catalyst end-to-end and explains how to read its plans. For now, broadcast hint when you know the small side is small, mind the threshold, and don’t broadcast a 4 GB DataFrame off a 2 GB driver.
References: Apache Spark SQL documentation on join hints and autoBroadcastJoinThreshold; Databricks engineering blog posts on broadcast joins and join-strategy selection. Retrieved 2026-05-01.