PySpark, from the ground up Lesson 43 / 60

Parquet: why it's the default for a reason

Columnar storage explained, compression codecs, predicate pushdown, and the row-group structure that makes selective reads fast.

We’re now in Module 8, which is the part of the course where we stop talking about how Spark thinks and start talking about how Spark eats. Every job starts with reading something and ends with writing something. The format you pick for that something has more impact on performance than almost any tuning knob in the engine.

This module covers the formats you’ll actually meet in the wild — Parquet, ORC, Avro, Delta, JSON, CSV, JDBC sources, cloud object stores. We’ll start with the one you’ll use 80% of the time and the one Spark itself defaults to: Parquet.

If you only remember one thing from this module, make it this: for analytical workloads, Parquet is almost always the right answer. The rest of this lesson is about why, and what the file is doing under the hood when you call spark.read.parquet(...).

What columnar means, concretely

Imagine a table with five columns and a million rows. There are two ways to store it on disk.

Row-oriented stores all the values for row 1, then all the values for row 2, then row 3 — the way CSV does it, the way most transactional databases do it on disk. If you want the third column from row 783, the storage layer has to scan past every preceding row in full, plus the first two columns of row 783 itself, or use an index to jump there.

Columnar stores all the values for column 1 contiguously, then all the values for column 2 contiguously, and so on. If you want column 3, you read one contiguous chunk and skip the other four entirely.

Now think about an analytical query: SELECT AVG(amount) FROM orders WHERE country = 'IT'. You touch two columns out of maybe forty. With a row-oriented format, you read the whole file to extract two columns’ worth of data — most of the I/O is wasted. With a columnar format, you read those two columns and ignore the other thirty-eight.

That’s the entire reason columnar formats exist, and the reason Parquet has eaten the analytical world. Selective reads only fetch the columns you ask for. A naive SELECT col1, col2 FROM t against a 50-column Parquet table reads roughly 1/25 of the column data that the same query would read from CSV, assuming the columns are of similar size.
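To make the 25x figure concrete, here is a minimal pure-Python sketch that packs the same toy table both ways and counts the bytes a reader has to touch to fetch two columns. It is not Parquet's real encoding: the fixed 8-byte values, the 50-column width, and the function names are assumptions chosen for illustration.

```python
import struct

NUM_ROWS = 1_000
NUM_COLS = 50                 # hypothetical table width
VALUE = struct.Struct("<q")   # every value is an 8-byte integer in this toy model

# table[r][c] is the value in row r, column c
table = [[r * NUM_COLS + c for c in range(NUM_COLS)] for r in range(NUM_ROWS)]

# Row-oriented: all of row 0, then all of row 1, ...
row_layout = b"".join(VALUE.pack(v) for row in table for v in row)

# Columnar: all of column 0, then all of column 1, ...
col_layout = b"".join(
    VALUE.pack(table[r][c]) for c in range(NUM_COLS) for r in range(NUM_ROWS)
)

def read_columns_row_layout(data, wanted):
    """Walk every row and pick out the wanted columns; touches the whole buffer."""
    row_size = NUM_COLS * VALUE.size
    out = {c: [] for c in wanted}
    for r in range(NUM_ROWS):
        row = data[r * row_size:(r + 1) * row_size]
        for c in wanted:
            out[c].append(VALUE.unpack_from(row, c * VALUE.size)[0])
    return out, len(data)

def read_columns_col_layout(data, wanted):
    """Jump to each wanted column's contiguous chunk and read only that chunk."""
    chunk_size = NUM_ROWS * VALUE.size
    out, touched = {}, 0
    for c in wanted:
        chunk = data[c * chunk_size:(c + 1) * chunk_size]
        out[c] = [v[0] for v in VALUE.iter_unpack(chunk)]
        touched += len(chunk)
    return out, touched

row_values, row_bytes = read_columns_row_layout(row_layout, [0, 1])
col_values, col_bytes = read_columns_col_layout(col_layout, [0, 1])
print(f"row layout: {row_bytes} bytes, columnar: {col_bytes} bytes")
```

Both readers return the same values; the columnar one touches 2 of 50 equally sized chunks. Real files differ because columns are not equally wide and are compressed, so treat the ratio as an upper-level intuition rather than a benchmark.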

There’s a second benefit that compounds the first: values within one column tend to look like each other. A country column is mostly two-letter strings. A timestamp column is monotonically increasing 8-byte integers. A status column has maybe four distinct values across millions of rows. When similar values sit next to each other, compression algorithms eat them alive. Compression ratios of 5-10x on real data are routine.
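A small sketch of why adjacency matters, using a toy run-length encoder rather than Parquet's actual encodings (Parquet combines dictionary encoding with an RLE/bit-packing hybrid, which is more elaborate). The table, the clustering of country and status values, and the helper name are all made up for the example.

```python
from itertools import groupby

def run_length_encode(values):
    """Collapse consecutive equal values into (value, run_length) pairs."""
    return [(value, sum(1 for _ in run)) for value, run in groupby(values)]

# Hypothetical orders table: (order_id, country, status), loaded in clustered batches
countries = ["IT", "FR", "DE", "ES"]
statuses = ["NEW", "PAID"]
rows = [(i, countries[i // 250 % 4], statuses[i // 500 % 2]) for i in range(1_000)]

# Row-oriented stream: id, country, status, id, country, status, ...
row_stream = [value for row in rows for value in row]

# Columnar streams: one per column
id_col, country_col, status_col = (list(col) for col in zip(*rows))

row_runs = len(run_length_encode(row_stream))
country_runs = run_length_encode(country_col)
status_runs = run_length_encode(status_col)

print(f"row-oriented stream: {row_runs} runs")
print(f"country column: {country_runs}")
print(f"status column:  {status_runs}")
```

Interleaving the columns leaves no two equal neighbours, so the row-oriented stream does not shrink at all, while the country and status columns collapse to a handful of runs. The unique order_id column gains nothing from this particular encoder, which is why real formats pick an encoding per column.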

The on-disk structure

A Parquet file is not just “all of column 1, then all of column 2”. It’s organized hierarchically:

File
├── Row Group 0           (target ~128 MB of source data)
│   ├── Column Chunk: id           (all id values for rows in this group)
│   │   ├── Page 0   (~1 MB by default)
│   │   ├── Page 1
│   │   └── ...
│   ├── Column Chunk: country
│   ├── Column Chunk: amount
│   └── ...
├── Row Group 1
├── Row Group 2
└── Footer
    ├── Schema
    ├── Row group metadata (min, max, null count per column chunk)
    └── Key/value metadata

Three things deserve attention:

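  1. Row groups. A Parquet file is split into row groups, each holding a contiguous range of rows. The default size depends on the writer: the Parquet library Spark writes with targets 128 MB (parquet.block.size), while the Parquet documentation recommends 512 MB to 1 GB for large HDFS deployments. The practical sweet spot is 100-500 MB per row group. Row groups are the smallest unit of read parallelism: Spark never splits a row group across tasks, although a single task may read several of them, depending on how the file is carved into input splits (spark.sql.files.maxPartitionBytes, 128 MB by default).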
  2. Column chunks. Within a row group, each column gets its own chunk. This is where the columnar magic happens at the file level: you can seek directly to the column chunk you want and read only its bytes.
  3. The footer. At the very end of the file, Parquet stores a metadata block: schema, row group offsets, and — critically — statistics for every column chunk: minimum value, maximum value, null count. Spark reads the footer first, looks at those stats, and uses them to decide which row groups it can skip without reading at all.

That last bit is the foundation of predicate pushdown.
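Finding the footer is cheap because of how a Parquet file is framed: it ends with the footer metadata, then a 4-byte little-endian footer length, then the 4-byte magic PAR1. The sketch below locates the footer from the last 8 bytes of a file. The function name is hypothetical, and the demo file is a synthetic stand-in with correct framing but a fake payload; decoding the real (Thrift-encoded) metadata is out of scope here.

```python
import os
import struct
import tempfile

MAGIC = b"PAR1"

def footer_location(path):
    """Return (offset, length) of the footer metadata block in a Parquet-framed file."""
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        file_size = f.tell()
        if file_size < 12:  # leading magic + footer length + trailing magic
            raise ValueError("too small to be a Parquet file")
        f.seek(-8, os.SEEK_END)
        trailer = f.read(8)
    (footer_len,) = struct.unpack("<I", trailer[:4])
    if trailer[4:] != MAGIC:
        raise ValueError("missing PAR1 magic at end of file")
    return file_size - 8 - footer_len, footer_len

# Synthetic stand-in, NOT a readable Parquet file: only the framing is real.
fake_data = b"column chunk bytes"
fake_footer = b"<thrift-encoded metadata would go here>"
fake_file = MAGIC + fake_data + fake_footer + struct.pack("<I", len(fake_footer)) + MAGIC

with tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") as tmp:
    tmp.write(fake_file)
offset, length = footer_location(tmp.name)
os.remove(tmp.name)

recovered = fake_file[offset:offset + length]
print(offset, length)
```

A reader therefore needs one small read at the end of the file to learn where the metadata lives, and one more to fetch it, before it touches any column data.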

Predicate pushdown

When you write df.filter(F.col("amount") > 1000) against a Parquet source, Spark doesn’t have to read the file and then filter. It pushes the predicate down to the scan layer. At read time, for each row group, it checks the footer stats: if max(amount) <= 1000, the entire row group can be skipped. No I/O for that range, no decompression, no rows materialized.
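The per-row-group check can be sketched in a few lines of plain Python. This is a simplified model, not Spark's or Parquet's code: RowGroupStats and groups_to_read are hypothetical names, and the statistics are invented.

```python
from dataclasses import dataclass

@dataclass
class RowGroupStats:
    """Stand-in for the footer statistics of one column chunk (hypothetical)."""
    index: int
    min_amount: float
    max_amount: float

def groups_to_read(stats, threshold):
    """Keep only row groups that might contain a row with amount > threshold."""
    return [s.index for s in stats if s.max_amount > threshold]

# Invented footer statistics for the amount column of four row groups
footer_stats = [
    RowGroupStats(0, 5.0, 980.0),      # max <= 1000: skipped
    RowGroupStats(1, 12.0, 4200.0),    # may contain matches: read
    RowGroupStats(2, 1000.0, 1000.0),  # max == 1000, and the filter is strict: skipped
    RowGroupStats(3, 1500.0, 9000.0),  # every row matches: read
]

selected = groups_to_read(footer_stats, 1000)
print(selected)  # [1, 3]
```

Note that row group 1 survives even though most of its rows may fail the filter. Statistics can only rule a group out, so the rows of every surviving group still have to be filtered individually.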

This is invisible from your code. You write a normal .filter(). Spark and Parquet collaborate to skip data you didn’t need. You can confirm it’s happening by reading .explain():

from pyspark.sql import functions as F

df = spark.read.parquet("s3://lake/orders/")
df.filter(F.col("amount") > 1000).select("order_id", "amount").explain()
# == Physical Plan ==
# *(1) Project [order_id#3, amount#5]
# +- *(1) Filter (isnotnull(amount#5) AND (amount#5 > 1000.0))
#    +- *(1) ColumnarToRow
#       +- FileScan parquet [order_id#3,amount#5]
#          PushedFilters: [IsNotNull(amount), GreaterThan(amount,1000.0)],
#          ReadSchema: struct<order_id:bigint,amount:double>

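The PushedFilters list is the proof. Filters that landed in that list are handed to the Parquet reader, which uses them to skip whole row groups; Spark still runs the Filter node on the rows that survive, which is why it stays in the plan. Filters that don’t appear there are evaluated only in Spark after the scan: still correct, just less efficient.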

What pushes down well: equality, comparison, IsNull/IsNotNull, In with a small set, all of them comparing a column against a literal. What doesn’t: LIKE patterns with leading wildcards, function calls like upper(country) = 'IT' (rewrite as country = 'IT' if the data is already uppercase), arithmetic on the column being filtered (amount + 10 > 1000 generally won’t push; amount > 990 will), and comparisons between two columns (amount > tax gives the reader no literal to test against the statistics). Watch the plan; if your filter isn’t in PushedFilters, refactor it until it is.

ReadSchema is the column-projection sibling. Spark only reads order_id and amount because those are the only columns the query references — even though the table has another twenty.

Compression codecs

Parquet compresses each column chunk independently. The codec is configurable. The four you’ll meet:

  • Snappy — Spark’s default. Fast compress, fast decompress, decent ratio (~2-3x). The right choice for hot pipelines where CPU during read matters more than disk space.
  • Gzip — Slower but smaller (~3-4x). Older, well-supported everywhere. Good for cold archival when read frequency is low.
  • Zstd (zstandard) — The modern winner. Faster than gzip, smaller than gzip, often comparable to snappy on read speed. If you’re choosing today, this is probably the answer. Spark supports it natively from 3.2.
  • Lz4 — Very fast, modest ratio. Niche; rarely the best choice for either hot or cold paths.

You set the codec per write or globally:

# Per write
df.write.option("compression", "zstd").parquet("s3://lake/orders/")

# Or globally for the session
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
df.write.parquet("s3://lake/orders/")

A practical rule: if your data is read often (dashboards, recurring jobs), use snappy or zstd — read CPU dominates. If your data is read once a quarter for compliance, use gzip or zstd at a higher level — disk cost dominates. Don’t pick gzip for hot paths just because it’s smaller; you’ll feel it on every query.
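The ratio-versus-CPU trade-off is easy to feel with a quick experiment. Snappy and zstd are not in Python's standard library, so the sketch below swaps in zlib (the DEFLATE implementation behind gzip) at three compression levels. That substitution is an assumption made for convenience: the numbers say nothing about snappy or zstd specifically, and the data is synthetic.

```python
import random
import time
import zlib

rng = random.Random(0)  # fixed seed; the rows are synthetic
lines = [
    f"{i},{rng.choice(['IT', 'FR', 'DE', 'ES'])},{rng.uniform(0, 1000):.2f}\n"
    for i in range(200_000)
]
raw = "".join(lines).encode()

results = {}
for level in (1, 6, 9):
    start = time.perf_counter()
    compressed = zlib.compress(raw, level)
    compress_s = time.perf_counter() - start

    start = time.perf_counter()
    restored = zlib.decompress(compressed)
    decompress_s = time.perf_counter() - start

    assert restored == raw  # compression must be lossless
    results[level] = (len(raw) / len(compressed), compress_s, decompress_s)

for level, (ratio, c_s, d_s) in results.items():
    print(f"level {level}: ratio {ratio:.2f}x, compress {c_s:.3f}s, decompress {d_s:.3f}s")
```

Timings depend on the machine, so no expected output is shown. The thing to look at is how much extra compress time each step up in level costs compared with how much smaller the output gets, and then to weigh that against how often the data will be read.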

The vectorized reader

We saw Tungsten in lesson 42. Parquet is one of the places where Tungsten visibly pays off. Spark’s Parquet reader is vectorized: instead of materializing one Row object at a time, it decodes column chunks into batches of values stored in flat column vectors (on-heap by default, off-heap if you set spark.sql.columnVector.offheap.enabled). The ColumnarToRow node in the plan above is the boundary where Spark converts those batches back to rows for the row-based operators above it, which in that plan are the Filter and the Project.

You don’t configure this; it’s on by default for primitive types. The relevant setting if you ever need to disable it for debugging is spark.sql.parquet.enableVectorizedReader, defaulting to true. Leave it alone in production.

Reading and writing in practice

Reading is the simple case:

# Whole table
df = spark.read.parquet("s3://lake/orders/")

# Column projection — only reads the two columns from each row group
df = spark.read.parquet("s3://lake/orders/").select("order_id", "amount")

# With filter — predicate pushdown plus row-group skipping
df = (spark.read.parquet("s3://lake/orders/")
        .filter(F.col("country") == "IT")
        .filter(F.col("amount") > 1000))

Writing has more knobs:

(df.write
   .mode("overwrite")
   .option("compression", "zstd")
   .parquet("s3://lake/orders/"))

Best practices for the write side:

  • Partition by a low-cardinality column that matches your query patterns (lesson 34). partitionBy("year", "month") is the canonical example. Partitioning is not the same as row groups; it’s the directory layout above the file. A partitioned table can still have predicate pushdown within a file via row-group stats.
  • Target 100-500 MB per file. Smaller and you pay metadata overhead and listing cost. Much bigger and you end up with fewer, longer-running write tasks that are costlier to retry; reads suffer less, because Spark can still split a large file at row-group boundaries. The coalesce / repartition step before write controls this.
  • Sort within partitions if you have a predictable filter column. Sorted data means tighter min/max ranges per row group, which means more aggressive row-group skipping. df.sortWithinPartitions("event_time").write... is cheap insurance.
  • Don’t write tiny files. A common antipattern is writing 1,000 partition directories with 200 files each, all 2 MB. Spark spends more time listing files than reading them. Coalesce or compact.
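The effect of sorting on row-group skipping can be simulated without Spark. The sketch below is a toy model under stated assumptions: 10,000 made-up event_time values, 1,000 rows per row group, and two hypothetical helpers that compute per-group min/max and count the groups a range filter would have to read.

```python
import random

def row_group_ranges(values, rows_per_group):
    """Split values into consecutive row groups and return each group's (min, max)."""
    chunks = (values[i:i + rows_per_group] for i in range(0, len(values), rows_per_group))
    return [(min(chunk), max(chunk)) for chunk in chunks]

def groups_scanned(ranges, low, high):
    """Count row groups whose [min, max] overlaps the filter range [low, high]."""
    return sum(1 for group_min, group_max in ranges if group_max >= low and group_min <= high)

rng = random.Random(7)                # fixed seed for a repeatable shuffle
event_times = list(range(10_000))     # hypothetical event_time values
shuffled = event_times[:]
rng.shuffle(shuffled)

unsorted_ranges = row_group_ranges(shuffled, rows_per_group=1_000)
sorted_ranges = row_group_ranges(sorted(shuffled), rows_per_group=1_000)

# Filter: 2_000 <= event_time <= 2_999
unsorted_hits = groups_scanned(unsorted_ranges, 2_000, 2_999)
sorted_hits = groups_scanned(sorted_ranges, 2_000, 2_999)
print(f"unsorted: {unsorted_hits} of {len(unsorted_ranges)} row groups read")
print(f"sorted:   {sorted_hits} of {len(sorted_ranges)} row groups read")
```

With shuffled data every row group spans almost the whole value range, so the statistics rule nothing out. After sorting, each group covers a narrow slice and the same filter touches a single group.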

Schema-on-read, but with types

CSV is schema-on-read in the lazy sense: the file has no idea what types its columns are, and you (or Spark’s inference) guess at read time. Parquet is schema-on-read in the better sense: the file declares its schema in the footer, types are enforced at write, and reads always produce the same types regardless of who’s reading. You get the flexibility of not having to define a table up front and the safety of typed columns.

The type system is also genuinely rich. Parquet has nested structs, repeated fields (arrays), maps, decimals with precision/scale, timestamps with logical metadata distinguishing instant-in-time from local-datetime. You can store a struct of arrays of structs and Spark will round-trip it cleanly. This matters when your data is event-shaped — JSON-like nested objects compress and project beautifully in Parquet.

When Parquet isn’t the right answer

Parquet is the default, but it’s not universal. Skip it when:

  • You’re streaming events one at a time. Parquet’s columnar layout means writes have to buffer a row group before flushing. For low-latency append-only ingest (Kafka → durable storage), row-oriented Avro is a better fit. We’ll meet it next lesson.
  • You need atomic updates and deletes. Parquet files are immutable. UPDATE against a Parquet table means rewriting files. Delta Lake (also next lesson) sits on top of Parquet to add transactional semantics.
  • The data is already small and structured for human reading. A 50-row config file doesn’t need columnar storage. Use JSON or YAML.

Outside those cases, default to Parquet. Almost every team I’ve worked with that tried to be clever (“we’ll use CSV for now, it’s simpler”) wasted weeks of compute six months later when they migrated.

Try this

Write the same DataFrame in three formats and compare:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

spark = (SparkSession.builder
         .appName("ParquetDemo")
         .master("local[*]")
         .getOrCreate())

df = spark.range(0, 1_000_000).select(
    F.col("id").alias("order_id"),
    (F.col("id") % 100).alias("country_id"),
    (F.rand() * 1000).alias("amount"),
    F.current_timestamp().alias("created_at"),
)

df.write.mode("overwrite").csv("/tmp/demo/orders_csv")
df.write.mode("overwrite").parquet("/tmp/demo/orders_parquet")
df.write.mode("overwrite").option("compression", "zstd").parquet("/tmp/demo/orders_zstd")

def size_mb(path):
    total = 0
    for root, _, files in os.walk(path):
        for f in files:
            total += os.path.getsize(os.path.join(root, f))
    return total / (1024 * 1024)

print(f"CSV:            {size_mb('/tmp/demo/orders_csv'):.1f} MB")
print(f"Parquet snappy: {size_mb('/tmp/demo/orders_parquet'):.1f} MB")
print(f"Parquet zstd:   {size_mb('/tmp/demo/orders_zstd'):.1f} MB")

# Predicate pushdown — check the plan
(spark.read.parquet("/tmp/demo/orders_parquet")
   .filter(F.col("amount") > 500)
   .select("order_id", "amount")
   .explain())

Exact sizes depend on your Spark version and the random values, but expect the CSV output to be a few times larger than either Parquet output, with Parquet zstd a touch smaller than Parquet snappy. The plan for the filtered read will show PushedFilters: [..., GreaterThan(amount,500.0)] and a ReadSchema with only the two projected columns. That’s the file format earning its keep.

Next lesson we look at the alternatives — ORC, Avro, and the Delta/Iceberg/Hudi family that sits on top of Parquet to make it transactional.


References: Apache Parquet documentation (https://parquet.apache.org/docs/) and Apache Spark SQL data sources guide (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html). Retrieved 2026-05-01.
