PySpark, from the ground up Lesson 22 / 60

The DAG: how Spark organizes your job into stages

Visualizing your job as a directed acyclic graph, reading the Spark UI's stages tab, and the relationship between stages and shuffles.

We’ve spent the last several lessons building DataFrames, transforming them, and writing them out. Each time you chained a .select() or a .filter(), Spark didn’t actually do anything — it just remembered. And every time you finally called .count() or .write.parquet(), something kicked into motion. That something is the DAG: a directed acyclic graph of operators that Spark builds in memory and then executes in chunks.

Today’s lesson is about reading that graph. Once you can look at the Spark UI’s Stages tab and immediately spot which operation is the slow one, debugging Spark stops being mystical. It becomes the same thing as reading a SQL Server execution plan: you find the fat arrow, you fix the fat arrow.

What “DAG” actually means

DAG = directed acyclic graph. Three words doing real work:

  • Graph — nodes (operators) connected by edges (data flow).
  • Directed — edges have a direction; data flows one way.
  • Acyclic — no loops; a node never feeds back into itself.

Every PySpark job is a DAG. When you write:

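from pyspark.sql.functions import col

# Assumes an existing SparkSession named `spark`.
# inferSchema=True makes "total" numeric. Without it every CSV column is a string,
# and groupBy(...).sum("total") fails because sum() needs a numeric column.
df = spark.read.csv("orders.csv", header=True, inferSchema=True)
filtered = df.filter(col("country") == "IT")
agg = filtered.groupBy("status").sum("total")
agg.write.parquet("out/")

Spark doesn’t build four separate things. It builds one graph: Read CSV → Filter → GroupBy → Sum → Write Parquet. Lazily. Apart from the pass Spark makes over the file to work out its schema (the header and the inferred types), nothing has executed yet. The driver just knows the shape of the work.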
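To make “directed” and “acyclic” concrete, here is a toy model of that chain using Python’s standard-library graphlib. This is not Spark’s API, just a sketch: each operator maps to the set of operators it reads from. A topological order (an order in which every operator runs after its inputs) exists only because the graph has no cycles; wiring the output back into the input breaks that.

```python
from graphlib import CycleError, TopologicalSorter

# Toy model only: each key is an operator, each value is the set of
# operators it reads from. Names mirror the chain above.
pipeline = {
    "Read CSV": set(),
    "Filter": {"Read CSV"},
    "GroupBy": {"Filter"},
    "Sum": {"GroupBy"},
    "Write Parquet": {"Sum"},
}


def is_acyclic(graph):
    """True if the graph admits a topological order, i.e. has no cycles."""
    try:
        list(TopologicalSorter(graph).static_order())
        return True
    except CycleError:
        return False


# For a linear chain there is exactly one valid execution order.
order = list(TopologicalSorter(pipeline).static_order())
print(" -> ".join(order))

# Feeding the output back into the input creates a cycle: no longer a DAG.
looped = dict(pipeline, **{"Read CSV": {"Write Parquet"}})
print(is_acyclic(pipeline), is_acyclic(looped))
```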

The action — .write.parquet() — is what triggers execution. At that moment Spark runs the graph through the Catalyst optimizer, applies rules (predicate pushdown, column pruning, constant folding), produces a physical plan, and then chops it into stages.

What a stage is

A stage is a contiguous chunk of the DAG that Spark can run end-to-end without moving data between executors. As long as every operation is narrow — meaning each output partition depends on exactly one input partition — the work stays local. Filter, select, withColumn, cast, simple map: all narrow. Spark can run them all in one pass per partition, no network, no disk shuffling between operators.

A wide transformation breaks the chain. Wide means an output partition depends on many input partitions: groupBy, join (unless one side is broadcast), distinct, orderBy, repartition. To guarantee that all rows with the same key end up in the same partition, Spark has to reshuffle data across executors. That reshuffle is the boundary between two stages.
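Here is a toy, pure-Python model of what a shuffle guarantees. Spark hashes keys with Murmur3; this sketch uses zlib.crc32 only because it is deterministic, and the partition count of 4 is a hypothetical stand-in for spark.sql.shuffle.partitions. After routing every row by the hash of its key, each key sits in exactly one output partition, so per-partition sums can be combined without any merging.

```python
import zlib
from collections import defaultdict

NUM_SHUFFLE_PARTITIONS = 4  # hypothetical; Spark's default is 200

# Three "map-side" input partitions; rows with the same key are scattered.
input_partitions = [
    [("shipped", 10), ("pending", 5), ("shipped", 7)],
    [("pending", 1), ("cancelled", 3)],
    [("shipped", 2), ("cancelled", 4), ("pending", 8)],
]


def target_partition(key, n):
    # Stand-in hash: deterministic crc32, not Spark's Murmur3.
    return zlib.crc32(key.encode("utf-8")) % n


# The "shuffle": each row moves to the output partition its key hashes to.
output_partitions = defaultdict(list)
for part in input_partitions:
    for key, value in part:
        output_partitions[target_partition(key, NUM_SHUFFLE_PARTITIONS)].append((key, value))

# Each output partition is summed independently, like one task per partition.
totals = {}
for rows in output_partitions.values():
    partition_sums = defaultdict(int)
    for key, value in rows:
        partition_sums[key] += value
    totals.update(partition_sums)  # safe: no key spans two partitions

print(totals)
```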

So the rule is simple and worth memorizing:

One stage = one chunk of narrow work. Stage boundaries = shuffles.

If your DAG has three shuffles, you have four stages. If it has zero shuffles (a pure ETL pipeline that just filters and writes), you have one stage. This is the mental model that will save you hours.
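The counting rule can be sketched as a tiny function over a linear chain of operators. This is a simplified model, not Spark’s scheduler: it treats every operator in the wide list above as a shuffle and ignores branching plans such as joins, where each input side gets its own stage.

```python
# Toy model: operators in a single linear chain, cut at every wide operator.
WIDE = {"groupBy", "join", "distinct", "orderBy", "repartition"}


def split_into_stages(operators):
    """Return a list of stages; each stage is a list of operator names."""
    stages = [[]]
    for op in operators:
        if op in WIDE:
            # The shuffle write closes the current stage,
            # the shuffle read opens the next one.
            stages[-1].append(op + " (shuffle write)")
            stages.append([op + " (shuffle read)"])
        else:
            stages[-1].append(op)
    return stages


worked = ["read csv", "filter", "groupBy", "write parquet"]
for i, ops in enumerate(split_into_stages(worked)):
    print(f"Stage {i}: {ops}")
```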

Tasks: the unit of execution

Each stage is split into tasks, one per partition. A stage that operates on 200 partitions becomes 200 tasks. Spark’s scheduler hands those tasks out to executor cores; each core runs one task at a time (with the default spark.task.cpus = 1). With 10 executors of 4 cores each, 40 cores in total, your 200 tasks run in roughly 5 waves of 40.
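The wave arithmetic is just a ceiling division of tasks by task slots. A minimal sketch; real scheduling isn’t lockstep, so treat the result as a rough round count. The cpus_per_task parameter mirrors spark.task.cpus.

```python
import math


def task_waves(num_tasks, executors, cores_per_executor, cpus_per_task=1):
    """Rough number of rounds needed to run every task in a stage."""
    slots = (executors * cores_per_executor) // cpus_per_task
    return math.ceil(num_tasks / slots)


print(task_waves(200, executors=10, cores_per_executor=4))  # 200 tasks / 40 slots -> 5
print(task_waves(201, executors=10, cores_per_executor=4))  # one extra task -> 6
```

The second call shows why partition counts that are a multiple of your total cores are often suggested: a single leftover task costs a whole extra round.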

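That’s the whole execution model. A stage can’t start until its parent stages finish, because it needs their shuffled output, so along any one chain stages run sequentially. Stages that don’t depend on each other, such as the two input sides of a join, can run at the same time. Tasks within a stage run in parallel up to your cluster’s core capacity.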
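Strictly, a stage waits only for its own parents. A toy model of a hypothetical shuffle join makes that visible: stages 0 and 1 each scan one input and write shuffle output, and stage 2 reads both. graphlib’s get_ready() returns every stage whose parents are done, which is a rough stand-in for what the scheduler can submit at each point.

```python
from graphlib import TopologicalSorter

# Hypothetical stage graph for a shuffle join: stage -> set of parent stages.
stage_parents = {0: set(), 1: set(), 2: {0, 1}}

ts = TopologicalSorter(stage_parents)
ts.prepare()
rounds = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # stages whose parents have all finished
    rounds.append(ready)
    ts.done(*ready)                 # pretend every ready stage completes
print(rounds)  # [[0, 1], [2]]
```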

A worked example

Consider this job, which is more or less the canonical “first real Spark workload” everyone writes:

from pyspark.sql.functions import col, sum as _sum

orders = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv("s3://runehold/orders/2026-04/")
)

result = (
    orders
        .filter(col("country") == "IT")
        .groupBy("status")
        .agg(_sum("total").alias("total_revenue"))
)

result.write.mode("overwrite").parquet("s3://runehold/reports/it-revenue/")

How many stages does Spark build? Let’s walk it.

  1. Read CSV + filter: both narrow. Same stage. Call it Stage 0.
  2. groupBy("status").agg(...): wide. Triggers a shuffle. Stage boundary.
  3. After the shuffle, Spark does the final aggregation per group, then writes Parquet. Both narrow. Stage 1.

So this job has two stages. There’s a subtlety: Spark does a partial aggregation before the shuffle (called a HashAggregate with partial mode) so that only the per-partition partial sums get sent across the network — not every row. This is one of the optimizations that makes groupBy cheaper than people fear. We’ll come back to this in lesson 27 when we talk about combiners.

In the Spark UI, that translates to:

  • Stage 0: tasks reading the input files, filtering, partial-aggregating. Output: partial sums written to local disk, partitioned by hash of status.
  • Stage 1: tasks fetching their assigned shuffle partitions, finishing the aggregation, writing Parquet.

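Stage 0’s task count equals the number of input partitions. For files, Spark derives that from file sizes (spark.sql.files.maxPartitionBytes, 128 MB by default): small files get packed together and large splittable files get split, so 200 CSV files won’t necessarily become 200 tasks. Stage 1’s task count depends on spark.sql.shuffle.partitions (default 200; yes, the same default everyone forgets to tune in tiny jobs).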

Reading the Spark UI’s Stages tab

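Open http://localhost:4040 (or whatever your driver UI URL is) and click Stages. You’ll see every stage in the application, grouped into tables by status (active, pending, completed, skipped, failed); to see only the stages of one job, click that job in the Jobs tab. The columns that matter: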

  • Stage Id — identifier, ordered by execution.
  • Description — Spark’s best guess at what the stage does (often shows the line of your code that triggered it).
  • Submitted / Duration — when the stage started, how long it took.
  • Tasks (Succeeded/Total) — how many tasks ran. Big number = high parallelism (good, usually). Number of 1 = serial bottleneck (bad).
  • Input — bytes read from external source (S3, HDFS, local FS).
  • Output — bytes written.
  • Shuffle Read — bytes this stage pulled from previous stages over the network.
  • Shuffle Write — bytes this stage wrote to local disk for the next stage to consume.
  • GC Time — time spent in JVM garbage collection. If this is more than ~10% of duration, you have a memory problem.

Click any stage to drill in. The detail page has the task summary: min, 25th percentile, median, 75th percentile, max for duration, GC time, input size, shuffle read/write, and so on.

This is where you find skew. Compare median task time to max task time:

  • Median: 2 seconds. Max: 3 seconds. Healthy.
  • Median: 2 seconds. Max: 90 seconds. One task takes 45× as long as the typical one. That’s skew: usually one key has far more rows than the others, and the single task that owns it drags out the whole stage. Lesson 30 covers what to do about it (salting, broadcast joins, AQE).
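The two rules of thumb from this section (GC above roughly 10% of the time, and a max task far above the median) fit in a small helper you could feed with numbers copied from the task summary. A sketch only: the skew_ratio threshold of 5 is an assumption, not something Spark uses, and GC share is approximated as total GC time over total task time.

```python
from statistics import median


def stage_health(task_seconds, task_gc_seconds, skew_ratio=5.0, gc_fraction=0.10):
    """Apply two rule-of-thumb checks to one stage's per-task metrics.

    Thresholds are illustrative assumptions, not Spark settings.
    """
    med = median(task_seconds)
    worst = max(task_seconds)
    gc_share = sum(task_gc_seconds) / sum(task_seconds)
    return {
        "max_over_median": worst / med,
        "skewed": worst / med > skew_ratio,
        "gc_share": gc_share,
        "gc_heavy": gc_share > gc_fraction,
    }


healthy = stage_health([2, 2, 2, 3], [0.1, 0.1, 0.1, 0.1])
skewed = stage_health([2, 2, 2, 90], [0.1, 0.1, 0.1, 2.0])
print(healthy["max_over_median"], skewed["max_over_median"])  # 1.5 45.0
```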

The DAG Visualization link at the top of the stage detail page draws the operator graph for just that stage. Useful for verifying “yes, the filter pushed down, the project is narrow, the partial aggregation happened where I expected.”

Pipelining inside a stage

Within a single stage, Spark doesn’t materialize intermediate results between operators. If your stage is Read CSV → Filter → Project → Write, each row (or vectorized batch) flows through all the operators in sequence before the next one starts. Operators are fused together, often into generated bytecode — Tungsten’s whole-stage code generation.
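Python generators offer a rough analogy for this pull-based flow. It is not how Tungsten works (Spark generates Java code), only a sketch of the idea: no operator materializes its full output, and each row travels through every operator before the next row is read. The trace list records the order in which the toy operators touch rows.

```python
# Analogy only: generators standing in for fused operators in one stage.
trace = []


def scan(n):
    for i in range(n):
        trace.append(f"scan {i}")
        yield {"id": i, "country": "IT" if i % 2 == 0 else "FR"}


def filter_rows(rows):
    for row in rows:
        trace.append(f"filter {row['id']}")
        if row["country"] == "IT":
            yield row


def project(rows):
    for row in rows:
        trace.append(f"project {row['id']}")
        yield {"id": row["id"]}


# No intermediate list between operators: rows are pulled through one by one.
out = list(project(filter_rows(scan(3))))
print(out)
print(trace)
```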

That’s why narrow transformations are so cheap. The cost of a stage is dominated by reading the input and writing the output. Adding a withColumn in the middle is, give or take, free. The exception is anything that breaks the pipeline (a Python UDF that Spark can’t inline, for instance). Lesson 41 covers UDF performance.

When a stage spills

If a stage’s working set doesn’t fit in executor memory, Spark spills intermediate data to local disk. The Spark UI shows this in the per-task detail under Shuffle Spill (Memory) and Shuffle Spill (Disk).

A small spill is fine — Spark is designed to handle it gracefully. A large spill is a giant red flag: the stage is running 5-20× slower than it should because instead of working in RAM it’s pumping data through disk.

Common causes:

  • Skewed key in a groupBy or join (one task gets too much).
  • spark.sql.shuffle.partitions set too low (each partition is huge).
  • Executor memory too small for the workload.

We’ll go deep on memory tuning in lesson 57. For now, when you see “Shuffle Spill (Disk): 12.3 GB” in the UI, your move is: increase shuffle partitions, increase executor memory, or look for skew.
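When raising shuffle partitions, a back-of-the-envelope calculation helps: divide the stage’s Shuffle Write bytes by a target partition size. The 128 MiB target below is an assumed rule of thumb, not a Spark default, and the 100 GiB figure is a made-up example.

```python
import math


def suggest_shuffle_partitions(shuffle_bytes, target_partition_bytes=128 * 1024**2):
    """Partition count that puts each shuffle partition near the target size.

    The 128 MiB default target is an assumption for illustration.
    """
    return max(1, math.ceil(shuffle_bytes / target_partition_bytes))


gib = 1024**3
# Hypothetical 100 GiB shuffle at the default 200 partitions: ~512 MiB per task.
print(100 * gib / 200 / 1024**2)              # 512.0
print(suggest_shuffle_partitions(100 * gib))  # 800
```

You would then apply the result with spark.conf.set("spark.sql.shuffle.partitions", ...) before running the query.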

The relationship between jobs, stages, and tasks

A quick hierarchy refresher because the Spark UI uses all three terms:

  • Job: what an action triggers. Each .count(), .write.parquet(), or .collect() produces one job. (Sometimes more: schema inference on read, the sampling pass behind orderBy, a broadcast join, or AQE running shuffle stages one at a time can each show up as extra jobs.)
  • Stage: a chunk of narrow work within a job. A job has 1+ stages, separated by shuffles.
  • Task: one stage running on one partition. A stage has N tasks where N = number of partitions.

So: job → stages → tasks. The Spark UI has tabs for all three. Most of your debugging life will be in the Stages tab, occasionally drilling into a specific task.
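The hierarchy can be written down as two small dataclasses. This is a hypothetical model for counting, not a Spark class: a job owns stages, and each stage contributes one task per partition. The numbers assume the worked example with 200 input partitions and the default 200 shuffle partitions, ignoring AQE coalescing.

```python
from dataclasses import dataclass, field


@dataclass
class Stage:
    stage_id: int
    num_partitions: int

    @property
    def num_tasks(self):
        return self.num_partitions  # one task per partition


@dataclass
class Job:
    job_id: int
    stages: list = field(default_factory=list)

    def total_tasks(self):
        return sum(s.num_tasks for s in self.stages)


job = Job(0, [Stage(0, 200), Stage(1, 200)])
print(len(job.stages), job.total_tasks())  # 2 400
```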

How AQE rewrites the DAG at runtime

One last twist worth mentioning early. Modern Spark (3.0+) has Adaptive Query Execution (spark.sql.adaptive.enabled = true, default since 3.2). When AQE is on, the DAG is not fully fixed at submission time. After each shuffle, Spark looks at the actual sizes of the shuffled data and may:

  • Coalesce many tiny shuffle partitions into fewer larger ones (avoids 200 nano-tasks on a small result).
  • Switch a sort-merge join into a broadcast join if the build side turned out to be small.
  • Split skewed partitions into sub-partitions that multiple tasks can chew on in parallel.

So the stages you see in the UI may have a slightly different shape than the ones you’d predict from the code alone. In the SQL tab, AQE plans are wrapped in an AdaptiveSparkPlan node, and isFinalPlan=true marks the plan Spark actually ran. We’ll come back to AQE in detail in lesson 33; we flag it now so you don’t get confused when the UI shows fewer tasks than spark.sql.shuffle.partitions would imply.

Run this on your own machine

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, rand

spark = (
    SparkSession.builder
        .appName("dag-demo")
        .config("spark.sql.shuffle.partitions", "16")
        .getOrCreate()
)

# Synthesize 1M rows across 200 partitions
df = (
    spark.range(0, 1_000_000, numPartitions=200)
        .withColumn("country", (col("id") % 5).cast("string"))
        .withColumn("status", (col("id") % 3).cast("string"))
        .withColumn("total", rand() * 1000)
)

# A two-stage job: filter (narrow) + groupBy (wide)
result = (
    df.filter(col("country") == "1")
      .groupBy("status")
      .agg(_sum("total").alias("revenue"))
)

result.show()

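# Now go open http://localhost:4040, click Jobs, click the job(s) that just ran,
# click into each stage, look at the task summary. Find the shuffle write
# in stage 0 and the matching shuffle read in stage 1.
# With AQE on (the default since Spark 3.2), the two stages may be listed under
# separate jobs, and a stage you already saw may reappear marked "skipped".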

input("Press Enter to exit (keeps the UI alive)... ")
spark.stop()

The input() keeps the Spark session alive so you can poke around the UI before the driver shuts down. Once it shuts down, the UI is gone (unless you’ve configured the History Server, which is a topic for lesson 60).

Two stages, one shuffle, one obvious aggregation. The simplest non-trivial DAG, and the one you should be able to read in your sleep before moving on.

Next lesson is about caching and persistence — when you tell Spark “remember this DataFrame, I’m going to use it again.” We’ll cover the seven storage levels and the patterns where caching genuinely pays off, before lesson 24 turns around and explains when caching is actually a bad idea.
