PySpark, from the ground up: Lesson 56 / 60

Reading execution plans: .explain(True), parsed to physical

How to read every line of .explain() output, the operators that matter, and the optimizer steps that produce them.

The Spark UI tells you what happened after the fact. .explain() tells you what’s going to happen before you run a single task. Both matter — the UI confirms reality, .explain() lets you predict it. A senior Spark engineer reads plans the same way a DBA reads EXPLAIN ANALYZE from Postgres: with mild paranoia and an eye for the operators that don’t belong.

We touched plans back in lesson 41 when we covered Catalyst. This lesson is the production-grade version: reading every line, knowing which operators matter, and understanding the gap between what .explain() prints and what actually runs under Adaptive Query Execution.

What .explain prints

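By default, .explain() shows only the physical plan, the one Spark will actually execute. The example below is lightly simplified and shows the non-adaptive form. Real output prints the scan as FileScan parquet, with a ColumnarToRow above it and the filters on the scan’s own line. With AQE on (the default since Spark 3.2), the tree printed before execution is wrapped in an AdaptiveSparkPlan isFinalPlan=false node and doesn’t carry the *(N) codegen markers yet; those appear in the final plan.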

df.explain()
== Physical Plan ==
*(3) HashAggregate(keys=[country#43], functions=[sum(amount#15)])
+- Exchange hashpartitioning(country#43, 200), ENSURE_REQUIREMENTS, [plan_id=...]
   +- *(2) HashAggregate(keys=[country#43], functions=[partial_sum(amount#15)])
      +- *(2) Project [amount#15, country#43]
         +- *(2) BroadcastHashJoin [user_id#14], [user_id#41], Inner, BuildRight
            :- *(2) Filter ((isnotnull(amount#15) AND (amount#15 > 40.0)) AND isnotnull(user_id#14))
            :  +- *(2) Scan parquet orders[user_id#14,amount#15]
            :        PushedFilters: [IsNotNull(amount), GreaterThan(amount,40.0), IsNotNull(user_id)]
            +- BroadcastExchange HashedRelationBroadcastMode(...)
               +- *(1) Filter isnotnull(user_id#41)
                  +- *(1) Scan parquet users[user_id#41,country#43]

.explain(True) (or .explain(extended=True), or .explain(mode="extended")) shows all four phases: parsed, analyzed, optimized, and physical. That’s what you want when you’re debugging an unexpected plan, because comparing the phases shows you which rewrites Catalyst applied, or skipped, that you didn’t expect.

df.explain(True)

There’s also .explain("formatted"), which is much easier to read on long plans — it numbers the operators and prints a separate “details” block. And .explain("cost"), which adds row count and size stats from the optimizer’s cost model. We’ll come back to those.
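If you want to diff plans between runs or grep them, it helps to capture the text instead of letting it scroll by. Here is a minimal sketch. It assumes PySpark 3.x, where DataFrame.explain() writes its output with Python’s print(), so contextlib.redirect_stdout can catch it. capture_explain is a hypothetical helper, not a PySpark API.

```python
import io
from contextlib import redirect_stdout


def capture_explain(df, mode="extended"):
    """Return what df.explain(mode=...) prints, as a string.

    Hypothetical helper. Assumes explain() writes through Python's print(),
    as PySpark 3.x does. Spark 3.x accepts the modes "simple", "extended",
    "codegen", "cost" and "formatted".
    """
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(mode=mode)
    return buf.getvalue()
```

Given an existing DataFrame q, `{m: capture_explain(q, m) for m in ("simple", "extended", "formatted", "cost")}` collects every view of the same query as strings you can save next to the job.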

Reading direction

Plans are printed top-down but they execute bottom-up. The leaves are the inputs: file scans, or, within a single stage, the exchange delivering shuffle data. The root is the final operator, and its output is what the triggering action consumes.

When I read a plan, I read the leaves first. “What am I scanning, what columns, what filters got pushed down?” Then I trace upward: “what does Spark do to this data on its way to the action?”

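The :- and +- characters draw the tree. +- marks an operator’s last (or only) child. :- marks a child that has more siblings below it, and a : in the left margin carries that branch down past the first child’s subtree. For a join, the :- child is the left side and the +- child is the right side. Children are indented under their parent.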

Operators worth memorizing

You don’t need to know every operator. You need to know these.

Scan parquet (printed as FileScan parquet in the default output; Scan orc, Scan json, and so on for other formats): the file read. Its details list PushedFilters and PartitionFilters. The columns in brackets are the ones Spark actually reads: that’s column pruning at work.

Project — column selection or expression evaluation. Should be cheap.

Filter: a row-level predicate. A Filter right above a Scan parquet, with the same predicate also in PushedFilters, is normal, not wasted work. Parquet uses the pushed filter to skip whole row groups, but a row group that survives can still hold non-matching rows, so Spark re-applies the predicate row by row. If the predicate isn’t in PushedFilters at all, it couldn’t be pushed (often because of a UDF or a complex expression). In that case Spark reads every row and filters afterwards.

HashAggregate — a hash-based aggregation (group by). Often appears twice in a plan around an Exchange: a partial aggregate before the shuffle, a final aggregate after. That two-step aggregation pattern is one of Spark’s most important optimizations — it’s why groupBy().count() doesn’t shuffle every input row.
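The two-step pattern is easy to see in miniature. This is a plain-Python simulation of what partial_sum, the Exchange, and the final sum do for a groupBy(key) sum, not Spark’s implementation, and the rows are made-up sample data. It also counts how many rows would cross the shuffle with and without the partial step.

```python
from collections import defaultdict


def partial_then_final(partitions):
    """Simulate partial_sum -> shuffle -> sum for groupBy(key).agg(sum(value)).

    partitions: one list of (key, value) rows per input task.
    Returns (totals, rows_shuffled_with_partial, rows_shuffled_without_partial).
    """
    partials = []
    for rows in partitions:
        acc = defaultdict(float)
        for key, value in rows:
            acc[key] += value  # partial aggregate, local to one task
        partials.append(acc)

    totals = defaultdict(float)
    for acc in partials:  # only these partial rows cross the shuffle
        for key, value in acc.items():
            totals[key] += value  # final aggregate after the shuffle

    with_partial = sum(len(acc) for acc in partials)
    without_partial = sum(len(rows) for rows in partitions)
    return dict(totals), with_partial, without_partial


# Two input tasks, made-up (country, amount) rows.
tasks = [[("US", 50.0), ("US", 70.0), ("DE", 45.0)],
         [("US", 60.0), ("DE", 41.0), ("DE", 99.0)]]
print(partial_then_final(tasks))  # ({'US': 180.0, 'DE': 185.0}, 4, 6)
```

The gap between the two counts grows with the number of rows per key in each task. That is why the partial step matters most on large inputs with few distinct keys.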

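Sort: a sort. You’ll see one under each side of a SortMergeJoin (the join needs sorted input) and one above an Exchange rangepartitioning for a global orderBy. Both are expensive on large inputs, so a Sort you didn’t expect deserves a second look.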

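Exchange hashpartitioning(...): the shuffle. Every shuffle is an Exchange. A global sort shows up as Exchange rangepartitioning, and BroadcastExchange, below, is not a shuffle. This is the most expensive operator in your plan, full stop. Count them. Each one is a disk write on the map side, a network transfer, and a read on the reduce side. Unexpected Exchanges are the number one performance bug.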

BroadcastExchange — broadcasts a small dataset to every executor. Cheap if the dataset is genuinely small; the driver collects it and ships it. Usually appears under a BroadcastHashJoin.

BroadcastHashJoin: the cheap join. One side is broadcast, the other is hash-probed locally. Spark picks this when one side’s estimated size is at or below spark.sql.autoBroadcastJoinThreshold (10 MB by default).

SortMergeJoin: the safe default join. Both sides are shuffled to the same partitioning, sorted, then merged. There are usually two Exchanges below it, one per side, each with a Sort above it.

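ShuffledHashJoin (shuffle hash join): both sides are shuffled, and the smaller side is built into a hash table per partition. It is less common. The static planner only picks it when spark.sql.join.preferSortMergeJoin is false (it defaults to true) or a SHUFFLE_HASH hint asks for it. AQE can also convert a SortMergeJoin into one when every post-shuffle partition fits under spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold, which is 0 by default, so that conversion is off.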

BroadcastNestedLoopJoin: the antipattern. O(n × m). Spark falls back to it when it can’t use a hash or sort-merge strategy. That usually happens because the join condition is a non-equi predicate (<, BETWEEN, a function call), so Spark has to compare every left row with every right row. Its non-broadcast cousin, CartesianProduct, shows up for inner joins when neither side is small enough to broadcast. If you see either over real-sized data, don’t expect the job to finish. Rewrite the condition to be an equi-join with a range filter on top.
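The equi-join-plus-range-filter rewrite is usually done by bucketing (binning) the range key. Here’s a plain-Python sketch of the idea on made-up data: events with a timestamp, intervals [start, end), and a hypothetical bucket width of 10. The bucketed version only compares an event with the intervals that share its bucket, yet returns the same pairs as the all-pairs comparison.

```python
from collections import defaultdict

WIDTH = 10  # hypothetical bucket width, in the same units as the timestamps


def naive_range_join(events, intervals):
    """Every event against every interval: O(n x m), what BNLJ does."""
    return sorted((e_id, i_id) for e_id, ts in events
                  for i_id, start, end in intervals if start <= ts < end)


def bucketed_range_join(events, intervals, width=WIDTH):
    """Equi-join on a bucket id, then apply the range predicate as a filter."""
    buckets = defaultdict(list)
    for i_id, start, end in intervals:
        # an interval [start, end) is copied into every bucket it overlaps
        for b in range(start // width, (end - 1) // width + 1):
            buckets[b].append((i_id, start, end))
    pairs = []
    for e_id, ts in events:
        for i_id, start, end in buckets.get(ts // width, []):  # equi-join on bucket
            if start <= ts < end:  # the original range condition, now a filter
                pairs.append((e_id, i_id))
    return sorted(pairs)


events = [("e1", 3), ("e2", 15), ("e3", 27), ("e4", 40)]
intervals = [("a", 0, 12), ("b", 10, 30), ("c", 35, 36)]
print(bucketed_range_join(events, intervals))  # [('e1', 'a'), ('e2', 'b'), ('e3', 'b')]
```

In PySpark, the same shape is typically built in three steps. Explode each interval into its bucket ids (F.sequence plus F.explode), join on the bucket column, then filter on the original range condition. Pick the width so each interval spans only a few buckets.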

WholeStageCodegen: not really an operator, more of a wrapper. The *(1), *(2) markers in front of operators are codegen stage IDs, and operators with the same ID are fused into a single generated function at runtime. This is Tungsten’s whole-stage code generation: Spark emits Java source for the fused operators and compiles it to bytecode. If Filter, Project and HashAggregate all carry *(1), Spark wrote one loop for all three that runs without virtual function calls between them. The boundary between codegen stages is usually a shuffle, a broadcast, or an operator that doesn’t support codegen (some UDFs, Python UDF evaluation). Same *(N), fused; different N, not.
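A quick way to see the fusion boundaries is to group operators by their *(N) marker. This sketch assumes the default text layout of a plan that has been through codegen, meaning a non-adaptive plan or an AQE final plan. codegen_stages is a hypothetical helper.

```python
import re
from collections import defaultdict

# Optional tree characters, then "*(N) OperatorName".
_STAGE = re.compile(r"^[\s:+-]*\*\((\d+)\) ([A-Za-z]+)")


def codegen_stages(plan_text):
    """Map each codegen stage id to the operators fused into it, in printed order.

    Operators without a *(N) marker (Exchange, BroadcastExchange, ...) belong
    to no stage; they are the boundaries between stages.
    """
    stages = defaultdict(list)
    for line in plan_text.splitlines():
        m = _STAGE.match(line)
        if m:
            stages[int(m.group(1))].append(m.group(2))
    return dict(stages)
```

Operators that land in no stage, like Exchange and BroadcastExchange, are the seams between generated functions.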

The PushedFilters / PartitionFilters lines

Below a Scan parquet you’ll see something like:

PushedFilters: [IsNotNull(amount), GreaterThan(amount,40.0)]
PartitionFilters: [isnotnull(dt#17), (dt#17 = 2026-05-01)]

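PushedFilters are pushed down to the file format reader. Parquet’s footer keeps min/max statistics per column for every row group, so the reader can skip any row group whose statistics rule the predicate out. This is row-group-level data skipping driven by column statistics. For amount > 40, every row group whose max(amount) is 40 or less is skipped without being decoded. How much that saves depends on layout: if amount values are spread evenly across every row group, little gets skipped.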
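Here is a toy model of that skipping decision, using made-up (min, max) statistics for amount in four row groups. It simulates the idea and is not Parquet’s reader code. A row group is read only if its statistics leave room for a match.

```python
def row_groups_to_read(stats, lower_bound):
    """Indexes of row groups that may contain rows with value > lower_bound.

    stats holds one (min, max) pair per row group. If max <= lower_bound,
    no row in that group can match, so the reader skips it without decoding.
    """
    return [i for i, (_min, _max) in enumerate(stats) if _max > lower_bound]


# amount statistics per row group (illustrative values)
amount_stats = [(1.0, 20.0), (5.0, 45.0), (41.0, 300.0), (0.5, 40.0)]
print(row_groups_to_read(amount_stats, 40.0))  # [1, 2]
```

Row group 1 (min 5, max 45) still gets read even though many of its rows may be 40 or below. That is why a Filter stays above the scan.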

PartitionFilters prune entire directories before any file is opened. Suppose your data is partitioned by dt and you filter on dt = '2026-05-01'. That partition filter means Spark reads files from exactly one directory and ignores the rest. PartitionFilters are the cheapest filters in Spark because they’re applied at planning time.
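The same idea one level up, sketched in plain Python: with Hive-style directories (dt=YYYY-MM-DD), an equality filter on the partition column selects directories by name alone, so files in the other directories are never opened. The paths are hypothetical, and prune_partitions is an illustration, not Spark’s file index.

```python
def prune_partitions(paths, column, value):
    """Keep only partition directories whose path contains the segment column=value."""
    wanted = f"{column}={value}"
    return [p for p in paths if wanted in p.rstrip("/").split("/")]


paths = [
    "/data/orders/dt=2026-04-30",
    "/data/orders/dt=2026-05-01",
    "/data/orders/dt=2026-05-02",
]
print(prune_partitions(paths, "dt", "2026-05-01"))  # ['/data/orders/dt=2026-05-01']
```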

What you want to see in a healthy plan: every selective filter you wrote shows up in PushedFilters or PartitionFilters. If a filter only shows up as a Filter operator above the scan, with nothing in PushedFilters, you’re reading the whole file and discarding rows. Common causes:

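  • Type mismatch. Comparing a column against a literal of a different type can wrap the column in a cast (a string column compared to a number, for example), and a predicate on cast(column) usually can’t be pushed. Compare against a literal of the column’s own type.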
  • A UDF inside the filter — UDFs are opaque to the optimizer.
  • An expression the source can’t represent (regex against text, complex date math).

Fix the filter, watch it land in PushedFilters, watch the input bytes drop in the UI. Cause and effect.
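The same check can be scripted against captured plan text. This sketch assumes the PushedFilters: [...] / PartitionFilters: [...] notation shown above. unpushed_columns is a hypothetical helper. It ignores IsNotNull entries, which get pushed for almost any filtered column, so it reports columns that have no selective filter at the source.

```python
import re


def split_top_level(s):
    """Split 'A(x,y), B(z)' on commas that are not inside parentheses."""
    parts, depth, cur = [], 0, ""
    for ch in s:
        depth += (ch == "(") - (ch == ")")  # track nesting level
        if ch == "," and depth == 0:
            parts.append(cur.strip())
            cur = ""
        else:
            cur += ch
    if cur.strip():
        parts.append(cur.strip())
    return parts


def scan_filters(plan_text, key):
    """Entries listed under key ("PushedFilters" or "PartitionFilters")."""
    pattern = rf"{key}: \[(.*?)\](?=,|[ \t]*$)"
    found = []
    for m in re.finditer(pattern, plan_text, re.MULTILINE):
        found.extend(split_top_level(m.group(1)))
    return found


def unpushed_columns(plan_text, columns):
    """Filtered columns with no selective (non-IsNotNull) filter at the source."""
    selective = [f for key in ("PushedFilters", "PartitionFilters")
                 for f in scan_filters(plan_text, key)
                 if not f.lower().startswith("isnotnull(")]
    text = " ".join(selective)
    return [c for c in columns if not re.search(rf"\b{re.escape(c)}\b", text)]


scan_line = "PushedFilters: [IsNotNull(amount), GreaterThan(amount,40.0), IsNotNull(status)]"
print(unpushed_columns(scan_line, ["amount", "status"]))  # ['status']
```

Here status only got IsNotNull pushed. That is the shape you’d expect if the real predicate on it were a UDF.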

A worked example

A small two-table join. We’ll explain it and walk through the phases.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

orders = spark.read.parquet("/data/orders/")     # ~5 GB, partitioned by dt
users  = spark.read.parquet("/data/users/")      # ~30 MB

q = (orders
     .filter(F.col("dt") == "2026-05-01")
     .filter(F.col("amount") > 40)
     .join(users, "user_id")
     .groupBy("country")
     .agg(F.sum("amount").alias("total")))

q.explain(True)

.explain(True) prints four blocks. Skim them in this order:

Parsed Logical Plan — the literal AST of your code. Column references are unresolved ('amount, with the apostrophe meaning unresolved). Useful only when you want to confirm Spark parsed your query the way you wrote it. Skip in 99% of cases.

Analyzed Logical Plan — column references resolved to concrete columns with IDs (amount#15), types attached. If you have a typo in a column name, this is where it errors out. If you see column IDs you don’t recognize, those are aliases or auto-generated columns from the join.

Optimized Logical Plan — Catalyst has run all the rule-based rewrites. This is where you see what the optimizer actually did to your query. Common transformations to look for:

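  • The two filters merged into one: Filter ((dt = '2026-05-01') AND (amount > 40)), along with isnotnull(...) checks that Catalyst inferred from those predicates and from the join key.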
  • Columns pruned: only user_id, amount, country remain — the join uses user_id, the projection needs amount and country, the rest are gone.
  • Predicates pushed below joins: filters on orders happen before the join, not after.

If you wrote a filter and it doesn’t appear in the optimized plan, the optimizer removed it as redundant or always true. If you selected a column that nothing downstream uses, column pruning removed it. Reading the optimized plan tells you what Catalyst kept.

Physical Plan — the executable plan. Walk it leaf-to-root:

  1. Scan parquet orders with PartitionFilters: [(dt = '2026-05-01')] and PushedFilters: [IsNotNull(amount), GreaterThan(amount, 40.0)]. Both filters landed in the source. Good.
  2. Filter above it — Spark double-checks the pushed predicate. Cheap.
  3. Scan parquet users — small, should be broadcast.
  4. BroadcastExchange — driver collects users, broadcasts.
  5. BroadcastHashJoin — orders probes the broadcast hash table. Excellent — no shuffle on the big side.
  6. Project — keep only amount, country.
  7. HashAggregate (partial) — partial sum within each task.
  8. Exchange hashpartitioning(country, 200) — shuffle by country.
  9. HashAggregate (final) — final sum per country.

One shuffle Exchange total; the BroadcastExchange ships the small table but doesn’t shuffle. The big table never shuffled, because the broadcast saved us. That’s the plan you want.

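If instead you saw two extra Exchanges under a SortMergeJoin, the broadcast didn’t trigger. That usually means the small side’s estimated size came out above spark.sql.autoBroadcastJoinThreshold. It’s a real possibility here: users is ~30 MB on disk, three times the 10 MB default. Unless the estimate after column pruning lands under the threshold, the static planner won’t broadcast it (AQE may still convert the join at runtime once it sees the actual shuffle size). You can force it with F.broadcast(users) or raise the threshold.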

.explain isn’t always the truth: AQE

Here’s the production reality: with Adaptive Query Execution on (which is the default in Spark 3.2+), the physical plan can be rewritten at runtime based on actual shuffle statistics. AQE can:

  • Coalesce small post-shuffle partitions (fewer tasks for trivial data).
  • Switch a SortMergeJoin to a BroadcastHashJoin if the post-shuffle size is small enough to broadcast.
  • Split skewed partitions into smaller subpartitions.

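Before the query runs, what .explain() prints is the initial plan: the physical plan as the static optimizer produced it, wrapped in AdaptiveSparkPlan isFinalPlan=false. The plan that actually runs is in the SQL tab of the UI, where the operator graph reflects the AQE-rewritten plan with AQEShuffleRead boxes and any join-strategy changes. Once an action has executed that same DataFrame (collect(), for example), calling .explain() again can show the final plan too, marked isFinalPlan=true.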

The implication: .explain() is for understanding intent and predicting the worst case. The UI’s SQL tab is for confirming what happened. If they differ, AQE rewrote something — usually for the better.
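To see what AQE changed without clicking through the UI, you can compare the join operators in the final and initial sections of a finished adaptive plan. This sketch assumes the Spark 3.x layout, where a completed plan prints AdaptiveSparkPlan isFinalPlan=true followed by == Final Plan == and == Initial Plan == sections. aqe_plans and join_operators are hypothetical helpers.

```python
JOIN_OPERATORS = ("BroadcastHashJoin", "SortMergeJoin", "ShuffledHashJoin",
                  "BroadcastNestedLoopJoin", "CartesianProduct")


def aqe_plans(physical_plan_text):
    """Return (status, final_section, initial_section) for a physical plan."""
    if "AdaptiveSparkPlan" not in physical_plan_text:
        return "not adaptive", None, None
    if "isFinalPlan=true" not in physical_plan_text:
        # Not executed yet: all you have is the static planner's initial plan.
        return "initial only", None, physical_plan_text
    final_part, _, initial_part = physical_plan_text.partition("== Initial Plan ==")
    final_part = final_part.split("== Final Plan ==", 1)[-1]
    return "final", final_part, initial_part


def join_operators(plan_text):
    """Join operator names in printed order."""
    return [name for line in plan_text.splitlines()
            for name in JOIN_OPERATORS if name in line]
```

Suppose join_operators on the initial section shows SortMergeJoin while the final section shows BroadcastHashJoin. Then AQE switched the strategy at runtime.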

The other modes

.explain("formatted") is what I use when the plan is more than ~30 lines. Output looks like:

== Physical Plan ==
* HashAggregate (10)
+- Exchange (9)
   +- * HashAggregate (8)
      +- * Project (7)
         +- * BroadcastHashJoin (6)
            ...

(1) Scan parquet orders
Output: [user_id#14, amount#15]
PushedFilters: [...]
...

(2) Filter
Input: [...]
Condition: (amount#15 > 40.0)
...

The tree is at the top with numbered nodes; the details for each numbered node are below. Much easier to read than the indented form for big plans.

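.explain("cost") prints the optimized logical plan with a Statistics(sizeInBytes=..., rowCount=...) annotation on each node, then the physical plan. rowCount usually only appears once table statistics have been computed (ANALYZE TABLE ... COMPUTE STATISTICS); without them, sizeInBytes is mostly derived from file sizes. Read it with skepticism: statistics drift, so treat the numbers as a first-order signal, not gospel.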
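To compare those estimates against the broadcast threshold quickly, you can pull them out of the text. This sketch assumes Spark 3.x formatting: sizes print with binary units (30.0 MiB) and row counts in short scientific notation (1.20E+8), inside Statistics(sizeInBytes=..., rowCount=...). size_estimates and broadcastable are hypothetical helpers. 10 * 1024 * 1024 bytes is the default spark.sql.autoBroadcastJoinThreshold.

```python
import re

_UNITS = {"B": 1, "KiB": 2**10, "MiB": 2**20, "GiB": 2**30,
          "TiB": 2**40, "PiB": 2**50, "EiB": 2**60}
_STATS = re.compile(r"Statistics\(sizeInBytes=([\d.]+) (B|KiB|MiB|GiB|TiB|PiB|EiB)"
                    r"(?:, rowCount=([\d.]+(?:E[+-]?\d+)?))?")
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10 MB, Spark's default


def size_estimates(cost_text):
    """Return [(node_text, size_in_bytes, row_count_or_None)] from cost-mode output."""
    out = []
    for line in cost_text.splitlines():
        m = _STATS.search(line)
        if m:
            node = line[:m.start()].strip(" :+-,")  # drop tree characters and trailing ", "
            size = float(m.group(1)) * _UNITS[m.group(2)]
            rows = float(m.group(3)) if m.group(3) else None
            out.append((node, size, rows))
    return out


def broadcastable(cost_text, threshold=DEFAULT_BROADCAST_THRESHOLD):
    """Nodes whose estimated size is at or under the broadcast threshold."""
    return [node for node, size, _ in size_estimates(cost_text) if size <= threshold]
```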

What to do with this

When a query is slow, before you tune anything:

  1. Run .explain(True). Walk the optimized plan and the physical plan.
  2. Count the Exchanges. Each one is a shuffle. Are there more than you expected?
  3. Check the join algorithms. Any BroadcastNestedLoopJoin? Any SortMergeJoin where you wanted a broadcast?
  4. Check PushedFilters and PartitionFilters on each scan. Did your filters push down?
  5. Run the query. Open the SQL tab in the UI. Compare the AQE-rewritten plan to what .explain printed. Note what AQE changed.

That five-minute ritual catches more performance issues than any amount of “let me tune spark.sql.shuffle.partitions.” The plan is the truth. Tune the plan, not the configs.
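The first four checks are mechanical enough to script against a captured physical plan. This sketch assumes default-mode text from a non-adaptive plan or a single section of an adaptive one. The formatted mode repeats operator names in its details, and a finished AQE plan prints both final and initial sections, so either would double-count. plan_checklist is a hypothetical helper and only a first pass, not a substitute for reading the plan.

```python
import re

_RED_FLAG_JOINS = {"BroadcastNestedLoopJoin", "CartesianProduct"}


def plan_checklist(physical_plan_text):
    """Count shuffles, list joins, and flag scans with nothing pushed down."""
    lines = physical_plan_text.splitlines()
    # A shuffle is a bare "Exchange"; BroadcastExchange and ReusedExchange don't count.
    shuffles = [ln for ln in lines if re.search(r"(?<![A-Za-z])Exchange\b", ln)]
    joins = re.findall(r"\b(\w+Join|CartesianProduct)\b", physical_plan_text)
    pushed = re.findall(r"PushedFilters: \[(.*?)\](?=,|[ \t]*$)",
                        physical_plan_text, re.MULTILINE)
    return {
        "shuffles": len(shuffles),
        "joins": joins,
        "red_flags": [j for j in joins if j in _RED_FLAG_JOINS],
        "scans_without_pushed_filters": sum(1 for p in pushed if not p.strip()),
    }
```

Start reading by hand wherever the report points: anything in red_flags, a shuffle count higher than you can account for, or a scan with an empty PushedFilters list.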

Next lesson: when none of this saves you and the executor dies — memory tuning and the OOM postmortem.
