PySpark, from the ground up Lesson 19 / 60

Lazy evaluation: why nothing happens until you ask

Why your transformation chain doesn't actually compute when you call it, what 'lazy' really means in Spark, and the Pandas mental-model adjustment every newcomer has to make.

The first time I sat down with a colleague who was migrating a Pandas script to PySpark, this is exactly what happened. He typed out a chain — read, filter, select, group, aggregate — pressed Shift+Enter, and the cell came back in about 12 milliseconds. He looked at me. “Did it work?” Then he typed df.show() and waited 90 seconds while a job he thought had already run got around to actually running.

That’s lazy evaluation in one paragraph. Welcome to Module 4.

The thing that confuses everyone coming from Pandas

In Pandas, every line of code does work. df = df[df.country == "IT"] reads a chunk of memory, scans it, and produces a new DataFrame in RAM, right now. The next line operates on the result. Pandas is eager: each call computes immediately.

PySpark is lazy. Transformations don’t compute anything. They build a plan.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("LazyDemo")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [
        (1, "IT", 59.0,  "2026-03-05"),
        (2, "IT", 29.0,  "2026-03-18"),
        (3, "NL", 149.0, "2026-02-15"),
        (4, "IT", 89.5,  "2026-03-22"),
        (5, "DE", 14.0,  "2026-03-10"),
        (6, "IT", 42.42, "2026-03-26"),
    ],
    "OrderId INT, Country STRING, Total DOUBLE, OrderDate STRING",
)

# Build a chain of transformations
result = (orders
    .where(col("Country") == "IT")
    .withColumn("Total", col("Total") * 1.22)   # add VAT
    .groupBy("Country")
    .agg(F.sum("Total").alias("TotalWithVat"),
         F.count("*").alias("OrderCount")))

print(result)
# DataFrame[Country: string, TotalWithVat: double, OrderCount: bigint]

That print(result) returned almost instantly. Nothing was filtered. Nothing was multiplied. Nothing was grouped. Spark resolved the column names and worked out the output schema, which is why print can show the column types, but it never touched a single row. All result is, at this point, is a description of work to do.

If you’ve never seen this, it feels broken. It isn’t. It’s the whole point.
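To make "a description of work to do" concrete, here is a toy sketch in plain Python. It is not how Spark is implemented; LazyTable, with_column, and describe_plan are hypothetical names invented for this illustration. The point is only that chaining records steps, and the recorded steps run when something asks for rows.

```python
class LazyTable:
    """Toy stand-in for a lazy DataFrame (hypothetical, not a Spark class)."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)

    def where(self, predicate):
        # Record the step and return a new object; no row is inspected here.
        return LazyTable(self._rows, self._steps + (("where", predicate),))

    def with_column(self, name, fn):
        # Same idea: remember what to do, do nothing yet.
        return LazyTable(self._rows, self._steps + (("with_column", (name, fn)),))

    def describe_plan(self):
        return [kind for kind, _ in self._steps]

    def collect(self):
        # The "action": only now are the recorded steps applied to the rows.
        out = [dict(row) for row in self._rows]
        for kind, arg in self._steps:
            if kind == "where":
                out = [row for row in out if arg(row)]
            else:
                name, fn = arg
                out = [{**row, name: fn(row)} for row in out]
        return out


calls = []  # records every time the predicate actually runs


def is_italian(row):
    calls.append(row["OrderId"])
    return row["Country"] == "IT"


orders = [
    {"OrderId": 1, "Country": "IT", "Total": 59.0},
    {"OrderId": 2, "Country": "NL", "Total": 149.0},
]

plan = (LazyTable(orders)
        .where(is_italian)
        .with_column("WithVat", lambda row: row["Total"] * 1.22))

calls_before = len(calls)   # predicate has not run while building the chain
result = plan.collect()     # the action runs every recorded step
calls_after = len(calls)    # predicate ran once per input row

print(plan.describe_plan(), calls_before, calls_after, result)
```

Spark's real plan is a tree that an optimizer rewrites before execution, which this toy skips entirely; it only shows the "record now, run later" half of the story.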

Why lazy: the optimizer needs the whole plan

The reason is simple and worth taking seriously: an optimizer that sees one operation at a time can only make local decisions. An optimizer that sees the entire pipeline before any of it runs can rearrange things.

Concrete examples of what Spark does once it can see the whole plan:

  • Filter pushdown. You wrote read.parquet(...).select("a","b","country").where(col("country")=="IT"). Spark notices the filter and pushes it down into the Parquet read, so row groups whose statistics show they hold no IT rows can be skipped instead of loaded from disk. If your chain were eager, the read would have already happened by the time the filter showed up.
  • Column pruning. You read 80 columns from Parquet but only use 4 of them downstream. Spark loads only those 4. Same trick — the optimizer needs to see the downstream usage before deciding what to read.
  • Predicate combination. You wrote .where(col("a") > 10).where(col("b") < 100) as two calls. Spark fuses them into a single filter pass.
  • Join strategy choice. Spark can pick between broadcast joins, shuffle hash joins, and sort-merge joins. The right choice depends on table sizes and join keys, which it can only know once the whole plan is assembled.
  • Operation fusion. Multiple withColumn and select calls get compiled into one pass over the data instead of N passes.

You get all of this for free, but only because nothing executes until an action forces it to. Eager evaluation would have to commit to a strategy on every line; lazy evaluation gets to wait for the full picture and pick the best one.

This is the same idea as a SQL query planner. You write a SELECT with a WHERE, and the database decides whether to use an index, scan a heap, or do something clever with statistics. PySpark transformations are the dataframe equivalent — you describe the what, the engine picks the how.

Transformations describe; actions execute

The mental model to internalize:

  • A transformation returns a new DataFrame and adds a node to the logical plan. No computation. Examples: select, where, withColumn, join, groupBy.agg, orderBy, distinct, union.
  • An action returns a result (a Python value, a write to disk, a side effect) and triggers execution of everything in the plan it depends on. Examples: show, count, collect, take, first, toPandas, write.parquet, write.csv, foreach.

Lesson 20 has the full catalog. For now the test is: “does this method return a DataFrame?” If yes, almost certainly a transformation. If it returns anything else — a number, a list of Row objects, None after writing to disk — it’s an action.

# Transformations: instant. The chain is just being described.
print("Building plan...")
plan = (orders
    .where(col("Country") == "IT")
    .withColumn("Total", col("Total") * 1.22)
    .groupBy("Country")
    .agg(F.sum("Total").alias("TotalWithVat")))
print("Plan built. Type:", type(plan).__name__)
# DataFrame

# Action: this is when work happens
print("Calling show()...")
plan.show()
# Now the executors actually compute. The first .show() on a non-trivial
# pipeline is the moment your laptop's fan spins up.

If you put a time.time() around the transformation chain, you’ll see a few milliseconds at most. Around the show(), you’ll see whatever the actual computation costs: milliseconds for tiny in-memory data, minutes or hours for big files.

The “I called .count() and waited three minutes” surprise

This is the canonical newcomer experience. You’ve built a 12-step chain that reads a 200 GB Parquet table, joins it to two reference tables, filters, aggregates, and you want to peek at the result. You type result.count(). You wait. And wait. And the kettle boils, and the meeting starts, and you’re still waiting.

What happened is exactly what’s supposed to happen: count() is the first action in the chain. Up until that moment, none of the 12 steps had run. They were all sitting in the logical plan. count() is what made the engine actually go open the 200 GB table, do the joins, and tally the rows.

This isn’t a bug. The bug is your expectation that count() is cheap. In Pandas it is: the data is already in RAM, so counting rows is a length lookup. In Spark, count() has to run the whole pipeline that produces the DataFrame before there is anything to count.

The notebook anti-pattern

Now, the worse version of this. You’re prototyping. You build the chain step by step, peeking at each stage:

# Build the chain piece by piece, with .show() at each step (DON'T)
df1 = orders.where(col("Country") == "IT")
df1.show()                          # action #1: runs the read + filter

df2 = df1.withColumn("Total", col("Total") * 1.22)
df2.show()                          # action #2: re-runs read + filter + withColumn

df3 = df2.groupBy("Country").agg(F.sum("Total").alias("TotalWithVat"))
df3.show()                          # action #3: re-runs everything from scratch

print(df3.count())                  # action #4: re-runs everything AGAIN
print(df3.count())                  # action #5: re-runs everything ONE MORE TIME

Five actions, five separate runs from the source. Spark does not cache results between actions by default: every action starts from the original source and re-walks whatever part of the plan it depends on.

On 6-row in-memory data this doesn’t matter. On a real pipeline reading from S3 or HDFS, you’ve just charged your team for 5x the compute and 5x the network IO to look at the same numbers from five different angles.

The fix is .cache() (or .persist()), which we’ll cover in lesson 23. The short version: .cache() is a marker that says “after the next action, keep the result of this DataFrame in memory so subsequent actions can reuse it.” Apply it to expensive intermediates that you’re going to peek at multiple times during development.

The shorter version: while you’re learning, don’t sprinkle .show() and .count() calls between every line. Build the whole chain. Run one action at the end. If you want intermediate visibility, add a .cache() before the visibility calls.

See it for yourself

Three concrete experiments worth running once.

Experiment 1: a chain with no action prints instantly.

import time

t0 = time.time()
big_chain = (orders
    .where(col("Country") == "IT")
    .withColumn("Total", col("Total") * 1.22)
    .withColumn("VatRate", F.lit(0.22))
    .withColumn("PreVat", col("Total") / (1 + col("VatRate")))
    .groupBy("Country")
    .agg(F.sum("Total").alias("TotalWithVat"),
         F.sum("PreVat").alias("PreVatTotal"),
         F.count("*").alias("OrderCount"))
    .orderBy(F.desc("TotalWithVat")))
print(f"Building chain took {(time.time() - t0)*1000:.2f} ms")
# Building chain took 4.13 ms

Seven chained calls. Sub-10 ms. Nothing computed.

Experiment 2: the action triggers the whole chain.

t0 = time.time()
big_chain.show()
print(f"show() took {(time.time() - t0)*1000:.2f} ms")
# show() took 850 ms (or whatever — depends on your machine)

That 850 ms is the scan, the filter, the three withColumn ops, the group, the aggregate, the order, and the shuffles for the groupBy and the orderBy. All of it was triggered by this one action because no action came before it.

Experiment 3: .explain() shows you the plan without running it.

big_chain.explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Sort [TotalWithVat#... DESC NULLS LAST], true, 0
#    +- Exchange rangepartitioning(TotalWithVat#... DESC NULLS LAST, 200), ...
#       +- HashAggregate(keys=[Country#...], functions=[sum(Total#...), ...])
#          +- Exchange hashpartitioning(Country#..., 200), ENSURE_REQUIREMENTS, ...
#             +- HashAggregate(keys=[Country#...], functions=[partial_sum(...), ...])
#                +- Project [Country#..., (Total#... * 1.22) AS Total#..., ...]
#                   +- Filter (isnotnull(Country#...) AND (Country#... = IT))
#                      +- Scan ExistingRDD[OrderId#..., Country#..., Total#..., ...]

Read it bottom-up. Scan the source, filter to IT rows, project the new columns, partial aggregate per partition, shuffle (Exchange), final aggregate, shuffle again to sort, sort. That’s the entire physical plan, and it never ran. explain() is the fastest way to develop intuition for what Spark is going to do, and you’ll see it again when we cover the DAG.

What this changes about how you write code

Three habits that come with internalizing lazy evaluation:

  1. Build long chains; resist the urge to peek. Each peek is a full re-run. Save the inspection for the end, or cache once and inspect cheaply after that.
  2. Trust that filters get pushed down. You don’t need to micro-optimize the order of your where clauses. The optimizer will move them as close to the read as it can.
  3. .explain() early when something is slow. If a job is taking longer than you expect, the physical plan tells you why far faster than guessing. Look for unexpected Exchange nodes (shuffles), missing filter pushdown, or full table scans where you expected partition pruning.

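This also means you have a new debugging trap: code can look like it’s working when it isn’t, because some errors don’t surface until the action. Be clear about which ones. On classic PySpark, a typo in a column name inside a withColumn fails right away with an AnalysisException on the line where you wrote it, because Spark resolves column names against the schema while it builds the plan (Spark Connect sessions can defer that check until the plan reaches the server). What waits for the action is anything that depends on the data itself: a Python UDF that raises on one bad row, a malformed record, a file that turns out to be unreadable. Those blow up when you finally call .show(), possibly hundreds of lines after the transformation that caused them. Read the stack trace carefully when this happens; the original exception is usually buried inside the job-failure message.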

Next lesson: the full catalog of transformations and actions, including the awkward middle category (cache, persist) that look like actions but are technically transformations. After that, we’ll dig into the DAG that Spark builds from your plan, and finally — the lesson the rest of the course pivots on — the difference between narrow transformations (cheap) and wide ones (expensive). That distinction is the single most important Spark concept and it’s three lessons away.
