If you only remember one thing from this whole course, make it the contents of this lesson.
Every transformation in Spark falls into one of two categories. Narrow transformations run at hardware speed across the cluster with no coordination. Wide transformations require all the executors to stop, exchange data over the network, and resume — an operation called a shuffle that costs orders of magnitude more than everything around it.
The line between narrow and wide is the single most important boundary in Spark. Every performance question about Spark — why is this slow, how do I make it faster, why did my cluster fall over, why is one executor doing all the work — comes back to this distinction. The rest of the course is largely about avoiding, minimizing, or optimizing wide transformations.
A quick recap of partitions
The definition only makes sense once the partition model is clear. A Spark DataFrame is split into many partitions (typically 100 to a few thousand) distributed across the cluster’s executors. Each partition lives on one machine and is processed by one task. As long as each task’s work is independent of the others, the cluster is happy and your job runs at the speed of the slowest task. Things stop being happy when a task on machine A needs to know something that’s on machine B. That’s when data has to move.
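To make the partition model concrete, here is a toy sketch in plain Python, with no Spark involved. The helper names split_into_partitions and run_tasks are made up for illustration, and the per-task durations are invented numbers, not measurements.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
Partition = List[Row]


def split_into_partitions(rows: List[Row], num_partitions: int) -> List[Partition]:
    """Deal rows round-robin into lists that stand in for Spark partitions."""
    partitions: List[Partition] = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def run_tasks(partitions: List[Partition], task: Callable[[Partition], object]) -> List[object]:
    """Run one task per partition; each task sees only its own partition."""
    return [task(partition) for partition in partitions]


rows = [{"OrderId": i, "Country": "IT" if i % 3 == 0 else "NL"} for i in range(12)]
partitions = split_into_partitions(rows, 4)

# Counting rows needs nothing from the other partitions, so the tasks are independent.
rows_per_task = run_tasks(partitions, len)
print(rows_per_task)  # [3, 3, 3, 3]

# Hypothetical per-task durations in seconds: the job is done when the slowest task is.
task_seconds = [2.0, 2.5, 9.0, 2.1]
job_seconds = max(task_seconds)
print(job_seconds)  # 9.0
```

The point of the last two lines is only that parallel, independent tasks finish when the slowest one does; one straggler sets the wall-clock time.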
The definition
A narrow transformation is one where each output partition depends on exactly one input partition. Each task can run independently on its own machine, looking only at its own slice of data, and produce its slice of the output. No coordination. No network.
A wide transformation is one where each output partition depends on data from multiple input partitions. The cluster has to redistribute rows so that the rows that need to be in the same output partition end up on the same machine. This redistribution is the shuffle: every executor writes its rows to a local disk file partitioned by the destination, then every other executor reads back the rows destined for it. All-to-all data movement.
Concrete:
- A filter(col("Country") == "IT") looks at one row at a time. Each task reads its partition, keeps the IT rows, drops the rest, writes its slice of the output. The other partitions don’t matter. Narrow.
- A groupBy("Country").agg(F.sum("Total")) needs every IT row to end up in the same place to be summed together. Some IT rows are on machine A, some on machine B, some on machine C. They have to meet. Shuffle. Wide.
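The two cases above can be imitated in plain Python to show the dependency pattern. This is a simulation, not Spark: the functions narrow_filter, owner and wide_group_sum are hypothetical names, and crc32 is only a deterministic stand-in for Spark's own hash partitioner.

```python
import zlib
from collections import defaultdict

# Four toy partitions; IT and NL rows are scattered across all of them.
rows = [("IT" if i % 3 == 0 else "NL", float(i)) for i in range(12)]
partitions = [rows[p::4] for p in range(4)]


def narrow_filter(parts, country):
    """Output partition i is built from input partition i only."""
    return [[row for row in part if row[0] == country] for part in parts]


def owner(key, num_out):
    """Deterministic stand-in for a hash partitioner (crc32, not Spark's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_out


def wide_group_sum(parts, num_out):
    """Route rows to the partition that owns their key, then sum per key."""
    shuffled = [[] for _ in range(num_out)]
    sources = [set() for _ in range(num_out)]  # which inputs fed each output
    for src, part in enumerate(parts):
        for key, total in part:
            dst = owner(key, num_out)
            shuffled[dst].append((key, total))
            sources[dst].add(src)
    sums = []
    for part in shuffled:
        acc = defaultdict(float)
        for key, total in part:
            acc[key] += total
        sums.append(dict(acc))
    return sums, sources


filtered = narrow_filter(partitions, "IT")
sums, sources = wide_group_sum(partitions, num_out=2)
print([len(p) for p in filtered])  # [1, 1, 1, 1]
print(sources[owner("IT", 2)])     # {0, 1, 2, 3}
```

The filter output at index i never looks beyond input i, while the partition that ends up owning IT has pulled rows from all four inputs. That one-to-one versus many-to-one dependency is the whole distinction.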
The catalog
Narrow transformations (no shuffle, cheap):
- select, selectExpr (when there’s no DISTINCT or aggregation in the expression)
- where / filter
- withColumn, withColumnRenamed, drop
- union, unionByName: append rows; no row needs to move
- na.fill, na.drop, na.replace
- sample (without replacement, in most modes)
- map, mapPartitions: the RDD-style row-by-row operations, included for completeness
Wide transformations (shuffle required, expensive):
- groupBy(*cols).agg(...): group by key requires same-key rows to colocate
- distinct(), dropDuplicates(*cols): same; deduplication needs same-value rows together
- join(other, on, how): both sides must have matching keys colocated, except for broadcast joins (lesson 25)
- orderBy / sort: global sort requires range-partitioning across the cluster
- repartition(n), repartition(n, col), repartitionByRange(...): these literally exist to shuffle
- intersect, exceptAll, subtract: set operations across the whole DataFrame
- Window functions with a partitionBy(...) clause: need rows in the same window together
The honorable exception:
coalesce(n) reduces the number of partitions by combining adjacent ones. It does not shuffle; it’s narrow. The trade-off is that you can only go down (you can’t go from 4 partitions to 200 with coalesce), and the output partitions can become unbalanced. Use coalesce to drop from 200 to 10 before writing a small file. Use repartition when you need balance or need to go up.
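A rough way to see the trade-off is to model both operations on lists of rows. This is a plain-Python sketch under simplifying assumptions: the two functions below are toy re-implementations that borrow Spark's names, the merge rule (contiguous index ranges) is a simplification of how Spark picks partitions to combine, and round-robin dealing stands in for the shuffle.

```python
def coalesce(partitions, n):
    """Merge neighbouring partitions into n groups without re-routing single rows."""
    if n >= len(partitions):
        # Like Spark's coalesce, asking for more partitions changes nothing.
        return [list(p) for p in partitions]
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged


def repartition(partitions, n):
    """Deal every row round-robin into n new partitions: the shuffle-like path."""
    out = [[] for _ in range(n)]
    position = 0
    for part in partitions:
        for row in part:
            out[position % n].append(row)
            position += 1
    return out


# Unbalanced input: partition sizes 90, 5, 3, 2.
sizes = [90, 5, 3, 2]
partitions = [list(range(size)) for size in sizes]

print([len(p) for p in coalesce(partitions, 2)])     # [95, 5]
print([len(p) for p in repartition(partitions, 2)])  # [50, 50]
print(len(coalesce(partitions, 200)))                # 4
print(len(repartition(partitions, 8)))               # 8
```

The coalesced output inherits the imbalance of its inputs because whole partitions are glued together; the repartitioned output is even because every row is individually re-routed, which is exactly the work a shuffle does.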
The picture
ASCII diagram. Four input partitions, top, processed by four tasks. Two flows: narrow (filter), and wide (groupBy).
Narrow transformation: filter(col("Country") == "IT")
Input:     [P0]      [P1]      [P2]      [P3]
            |         |         |         |
            v         v         v         v
          task0     task1     task2     task3   (parallel, independent)
            |         |         |         |
            v         v         v         v
Output:   [P0']     [P1']     [P2']     [P3']
Each task reads exactly one partition, writes one partition. No data
crosses machines. Wall-clock time = time of slowest task.
Wide transformation: groupBy("Country").agg(...)
Input:     [P0]      [P1]      [P2]      [P3]
            |         |         |         |
            v         v         v         v
          task0     task1     task2     task3   (partial aggregate)
            |         |         |         |
            +----+----+----+----+----+----+
                 |         |         |
                 v         v         v          === SHUFFLE ===
                IT        NL        DE          (network + disk)
                 |         |         |
                 v         v         v
              task_IT   task_NL   task_DE       (final aggregate)
                 |         |         |
                 v         v         v
Output:         [Pa]      [Pb]      [Pc]
Look at that middle section. In the narrow case, four tasks each do their thing in parallel and we’re done. In the wide case, four tasks do partial work, then every task has to send its rows to the right destination — IT rows to one place, NL rows to another, DE rows to a third — over the network. Then a second round of tasks does the final aggregation.
This is the shuffle. It’s the slow expensive thing in distributed computing. And it’s the thing every performance discussion in Spark eventually comes back to.
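The "partial aggregate, shuffle, final aggregate" flow in the diagram can be sketched in plain Python to show why the first round matters. The function names partial_sums and final_sums are illustrative, and counting records is a crude stand-in for bytes on the wire.

```python
from collections import defaultdict

COUNTRIES = ["IT", "NL", "DE"]

# Four partitions of 30 (country, total) rows each.
partitions = [
    [(COUNTRIES[i % 3], float(i)) for i in range(p * 30, (p + 1) * 30)]
    for p in range(4)
]


def partial_sums(partition):
    """Map side: collapse a partition to one running sum per country."""
    acc = defaultdict(float)
    for country, total in partition:
        acc[country] += total
    return dict(acc)


def final_sums(partials):
    """Reduce side: merge the per-partition sums that arrived via the shuffle."""
    acc = defaultdict(float)
    for partial in partials:
        for country, subtotal in partial.items():
            acc[country] += subtotal
    return dict(acc)


partials = [partial_sums(p) for p in partitions]

# Without the partial step every row would cross the shuffle.
records_without_partial = sum(len(p) for p in partitions)
# With it, each partition sends one record per country it contains.
records_with_partial = sum(len(p) for p in partials)

result = final_sums(partials)
print(records_without_partial, records_with_partial)  # 120 12
```

The answer is the same either way; what changes is how much data has to travel. This only works for aggregations that can be combined piecewise, such as sums and counts.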
Why shuffles are expensive
Three costs, all real. Disk write: each executor, before sending its rows out, writes them to local disk partitioned by destination, so the receiving side can pull on demand and failed tasks can be retried without re-running upstream work. Network transfer: the rows move across the cluster network — in a 200-partition shuffle on a 10-node cluster, every node sends to every other node simultaneously, and bandwidth becomes the bottleneck. Disk read: the receiving executor reads the shuffle files into memory to do its work.
A pipeline of pure narrow transformations runs at the speed of CPU plus storage. One shuffle adds a round-trip of disk-network-disk for the whole dataset. Five shuffles does that five times sequentially, because each shuffle is a barrier — the next stage can’t start until the previous one’s shuffle has finished. In practical numbers: a narrow-only stage on a 10 GB DataFrame might run in 30 seconds; add a groupBy and the same pipeline often takes 2-3 minutes.
How to spot a shuffle in .explain()
Spark exposes the shuffle directly in the physical plan. Look for the Exchange operator:
df = spark.createDataFrame(
    [(i, "IT" if i % 3 == 0 else "NL", float(i)) for i in range(1000)],
    "OrderId INT, Country STRING, Total DOUBLE",
)
# Narrow only: no Exchange
(df.where(col("Country") == "IT")
   .withColumn("WithVat", col("Total") * 1.22)).explain()
# *(1) Project [..., (Total * 1.22) AS WithVat]
# +- *(1) Filter (Country = IT)
#    +- *(1) Scan ExistingRDD[...]
# Add a groupBy: Exchange appears
(df.where(col("Country") == "IT")
   .groupBy("Country")
   .agg(F.sum("Total").alias("TotalIT"))).explain()
# *(2) HashAggregate(keys=[Country], functions=[sum(Total)])
# +- Exchange hashpartitioning(Country, 200), ENSURE_REQUIREMENTS, ...
#    +- *(1) HashAggregate(keys=[Country], functions=[partial_sum(Total)])
#       +- *(1) Filter (Country = IT)
#          +- *(1) Scan ExistingRDD[...]
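There’s the Exchange hashpartitioning(Country, 200). The *(1) and *(2) prefixes are whole-stage code generation IDs: operators that share a number are compiled together into one generated function. Generated code cannot span an Exchange, so in this plan the numbering lines up with the two stages Spark creates at the shuffle boundary. The first stage reads, filters, and partial-aggregates per partition. The second takes the shuffled output, finalizes the aggregation, and returns the result.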
The pattern to internalize: every Exchange is one shuffle, one stage boundary, one chunk of network/disk cost.
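If you want to turn "look for Exchange" into a habit, a tiny text counter helps. The sketch below works on plan text only, using the two plans shown above as string literals; count_exchanges is a hypothetical helper, not a Spark API, and the regular expression is an assumption about how operators are printed (it deliberately skips names like BroadcastExchange, which do not represent a shuffle).

```python
import re

# "Exchange" as a whole operator name; skips BroadcastExchange and ReusedExchange.
SHUFFLE_EXCHANGE = re.compile(r"(?<![A-Za-z])Exchange\b")


def count_exchanges(plan_text: str) -> int:
    """Count plan lines whose operator is a shuffle Exchange."""
    return sum(1 for line in plan_text.splitlines() if SHUFFLE_EXCHANGE.search(line))


NARROW_PLAN = """\
*(1) Project [..., (Total * 1.22) AS WithVat]
+- *(1) Filter (Country = IT)
   +- *(1) Scan ExistingRDD[...]
"""

WIDE_PLAN = """\
*(2) HashAggregate(keys=[Country], functions=[sum(Total)])
+- Exchange hashpartitioning(Country, 200), ENSURE_REQUIREMENTS, ...
   +- *(1) HashAggregate(keys=[Country], functions=[partial_sum(Total)])
      +- *(1) Filter (Country = IT)
         +- *(1) Scan ExistingRDD[...]
"""

print(count_exchanges(NARROW_PLAN), count_exchanges(WIDE_PLAN))  # 0 1
```

To feed it a real plan, one option is to capture what explain() prints using contextlib.redirect_stdout with an io.StringIO buffer. That should work when explain() prints from the Python side, but treat it as something to verify on your Spark version.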
Stages and the DAG
Quick definition: a stage is a chunk of work that can run as a pipeline of narrow transformations with no shuffle in the middle. Each Exchange is the boundary between two stages. A pipeline with two shuffles has three stages; a pipeline with no shuffles is one stage. Each stage has to finish before the next begins, because its input is the shuffle output of the previous one. Rule of thumb: count the shuffles, add one, and you’ve roughly counted the stages. The full DAG story comes next lesson.
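The rule of thumb can be written down as a few lines of plain Python. This is a deliberately naive model: WIDE_OPS is just this lesson's catalog reduced to method names, split_into_stages is a hypothetical helper, and real plans deviate from it (a join may shuffle two inputs, and Spark may skip a shuffle when the data is already partitioned the right way).

```python
# Wide operations from this lesson's catalog (names only, a simplification).
WIDE_OPS = {
    "groupBy", "distinct", "dropDuplicates", "join", "orderBy", "sort",
    "repartition", "repartitionByRange", "intersect", "exceptAll", "subtract",
}


def split_into_stages(ops):
    """Cut a list of operation names into stages at every wide operation."""
    stages = [[]]
    for op in ops:
        if op in WIDE_OPS:
            # Shuffle boundary: the work after the Exchange starts a new stage.
            stages.append([])
        stages[-1].append(op)
    return stages


pipeline = ["where", "withColumn", "groupBy", "withColumn", "orderBy"]
stages = split_into_stages(pipeline)
print(len(stages))  # 3: two shuffles, three stages
for number, stage in enumerate(stages):
    print(number, stage)
```

Placing each wide operation at the start of the new stage is a modelling choice; in practice part of its work (such as the partial aggregate) runs before the shuffle and part after.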
The performance rules that fall out
Once you internalize narrow vs wide, every Spark performance recommendation makes sense:
Filter and project before wide steps. A where and a select are narrow and free; doing them early shrinks the data that goes into the next shuffle. The optimizer often pushes them down for you, but writing them in the right place makes intent obvious.
Avoid sorting unless you actually need sorted output. orderBy is global sort across the cluster — one of the most expensive things you can do. If you only need the top 10, df.orderBy("col").limit(10) lets Spark do a TopK optimization, but it’s still cheaper to avoid the sort entirely when you can.
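Use broadcast joins for small-on-large. If one side of a join is small enough to fit comfortably in memory on the driver and on every executor (a few hundred MB as a rough ceiling), Spark can broadcast it to every executor and avoid the shuffle on the big side entirely. Spark does this on its own only below spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB; above that you have to ask for it. Lesson 25 covers this; it can turn a 10-minute join into a 30-second one.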
Reuse intermediate results. If you’ve paid the cost of a shuffle once, cache the result before doing several aggregations on top of it. Lesson 23 covers caching strategy.
Don’t repartition unless you have a reason. repartition() is itself a wide transformation. Calling it for cosmetic reasons is paying for a shuffle to no benefit. Good reasons: severe skew, pre-partitioning for many aggregations on a key, or controlling output file count before a write.
Use coalesce to reduce partitions before writing. If your DataFrame has 200 partitions and you’re writing a small result, coalesce(8) gives 8 output files with no shuffle.
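Of the rules above, the broadcast join is the one whose mechanics are easiest to show in miniature. The following plain-Python sketch assumes an inner join and unique keys in the small table; broadcast_join and the sample data are hypothetical, and a dict lookup stands in for the hash table each executor would hold.

```python
def broadcast_join(large_partitions, small_rows, key):
    """Join each large partition against a full copy of the small table."""
    # Built once; in a cluster every executor would receive its own copy.
    lookup = {row[key]: row for row in small_rows}
    joined_partitions = []
    for part in large_partitions:
        joined = []
        for row in part:
            match = lookup.get(row[key])
            if match is not None:  # inner join: unmatched rows are dropped
                joined.append({**row, **match})
        joined_partitions.append(joined)
    return joined_partitions


orders = [{"OrderId": i, "CustomerId": i % 3} for i in range(8)]
order_partitions = [orders[p::4] for p in range(4)]
customers = [
    {"CustomerId": 0, "Country": "IT"},
    {"CustomerId": 1, "Country": "NL"},
]  # CustomerId 2 is intentionally missing

result = broadcast_join(order_partitions, customers, "CustomerId")
print([len(p) for p in result])  # [2, 1, 1, 2]
```

Output partition i is computed from order partition i plus the broadcast copy, so no order row changes partition. In the terms of this lesson, the join has become narrow on the large side.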
A warning about skew: the narrow-vs-wide model assumes shuffle output partitions are roughly balanced. In real life they often aren’t. If 80% of your orders are from Italy and a shuffle is keyed on Country, the task that owns IT receives about 80% of the rows, runs many times longer than the rest, and dominates wall-clock time. Aggregations such as sum soften this because each task pre-aggregates before the shuffle; joins and window functions keyed on the skewed column get the full imbalance. Categorical columns with one dominant value are the usual suspects. Diagnosis and fixes come later in the course.
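Skew is easy to reproduce without a cluster. The sketch below routes raw rows by key, as a join or window shuffle would, and reports how many rows land in each partition. It is plain Python with made-up proportions; shuffle_partition_sizes is a hypothetical helper and crc32 is a stand-in for Spark's hash function.

```python
import zlib
from collections import Counter


def shuffle_partition_sizes(keys, num_partitions):
    """Count how many rows each shuffle partition receives when routed by key."""
    sizes = Counter()
    for key in keys:
        sizes[zlib.crc32(key.encode("utf-8")) % num_partitions] += 1
    return [sizes.get(i, 0) for i in range(num_partitions)]


# 80% of the rows share one key, mirroring the Italy example.
keys = ["IT"] * 800 + ["NL"] * 100 + ["DE"] * 60 + ["FR"] * 40
sizes = shuffle_partition_sizes(keys, 8)
largest_share = max(sizes) / len(keys)

print(sizes)
print(f"largest partition holds {largest_share:.0%} of the rows")
```

However many partitions you ask for, all rows with the same key go to the same place, so the largest partition never drops below the dominant key's share. Adding partitions does not fix skew.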
Putting it together
A realistic pipeline. Mark each line as narrow (N), wide (W), or action (A):
# No assignment here: .write.parquet(...) returns None, so there is nothing to keep.
(spark.read.parquet("./orders")                   # ___
    .where(col("OrderDate") >= "2026-01-01")      # ___
    .withColumn("Total",                          # ___
                col("Total").cast("double"))
    .join(customers, "CustomerId", "left")        # ___
    .groupBy("Country", "CustomerId")             # ___
    .agg(F.sum("Total").alias("LTV"))
    .orderBy(F.desc("LTV"))                       # ___
    .limit(100)                                   # ___
    .write.parquet("./out/top100"))               # ___
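Read, filter (N), withColumn (N), join (W), groupBy.agg (W), orderBy (W), limit (N), write (A). Three wide transformations, so by the rule of thumb roughly four stages, and one action. The physical plan can differ in the details (a sort-merge join shuffles both of its inputs, for example), but the conclusion holds: the cost is dominated by the shuffles behind the join, the group, and the sort, not by the narrow steps.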
If this pipeline were slow, the fix isn’t in the narrow lines. It’s in: could customers be broadcast? Could we group on a smaller cardinality first? The levers are always the same — make shuffles smaller, fewer, or avoidable.
Run this on your own machine
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("NarrowWide")
         .master("local[*]")
         .getOrCreate())

df = spark.createDataFrame(
    [(i, "IT" if i % 3 == 0 else "NL", float(i)) for i in range(1000)],
    "OrderId INT, Country STRING, Total DOUBLE",
)

# 1. Pure narrow chain: no Exchange in the plan
narrow = (df
          .where(col("Country") == "IT")
          .withColumn("WithVat", col("Total") * 1.22)
          .withColumn("Region", F.lit("EU")))
narrow.explain()

# 2. Single wide step: one Exchange, two stages
single_wide = narrow.groupBy("Country").agg(F.sum("WithVat").alias("Total"))
single_wide.explain()

# 3. Multiple wide steps: multiple Exchanges, more stages
multi_wide = (narrow
              .join(df.select("OrderId", "Total").alias("d2"), "OrderId")
              .groupBy("Country").agg(F.sum("WithVat").alias("Total"))
              .orderBy(F.desc("Total")))
multi_wide.explain()

# 4. coalesce vs repartition: count Exchange operators
coalesced = narrow.coalesce(2)
coalesced.explain()       # no Exchange, just partition combining
repartitioned = narrow.repartition(8)
repartitioned.explain()   # Exchange present, full shuffle

# 5. Run the wide pipeline and check the Spark UI for stage count
multi_wide.write.mode("overwrite").parquet("./tmp/wide_demo")
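# Open http://localhost:4040 while the session is alive. Expect the work to be
# split into stages at the Exchanges; the exact job and stage counts depend on
# the Spark version and on whether adaptive query execution is enabled.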
Stare at those plans until the Exchange operator becomes the thing your eyes go to first. That’s the muscle to build. From here on, “is this pipeline going to be fast?” is mostly the same question as “how many Exchanges does its plan have, and how big is the data going through each one?”
Next lesson: the DAG. How Spark turns a logical plan into stages, what the lineage graph buys you (fault tolerance for free), and how to read the Spark UI’s DAG visualization for any job. Lesson 23 is caching — the playbook for when, what, and how to cache so your iterative work doesn’t pay the shuffle tax twice.