PySpark, from the ground up · Lesson 50 / 60

Structured Streaming basics: readStream, writeStream, triggers

The streaming entry points, the trigger semantics, and the checkpoint that everything depends on.

Last lesson was conceptual: streams are infinite tables, batch and streaming are a continuum, micro-batch is the model, DStreams are dead. This lesson is the mechanics. By the end you’ll have written a complete streaming job and know what every line does.

There are exactly four things you need to understand:

  1. readStream — how data gets in.
  2. writeStream — how data gets out.
  3. Triggers — when each micro-batch fires.
  4. Checkpoints — how Spark remembers what it’s already processed.

That’s it. The rest is variations.

readStream: the input side

The streaming entry point is spark.readStream. Same shape as spark.read, but it returns a streaming DataFrame instead of a finite one:

events = (spark.readStream
            .format("csv")
            .schema("user_id STRING, action STRING, amount DOUBLE, ts TIMESTAMP")
            .option("header", "true")
            .load("/data/incoming/"))

The DataFrame events looks normal — same printSchema, same column references — but events.isStreaming returns True, and you can’t .show() it or .count() it. Streaming DataFrames don’t have a “current snapshot” you can inspect; they only make sense when wired to a sink.

Spark ships with several built-in streaming sources:

  • file — poll a directory for new files. The format can be csv, json, parquet, orc, or text. Spark scans the directory each trigger; any file that wasn’t there last time gets read. This is the easiest source to test with because you control it from a shell — cp file.json /data/incoming/ and watch the job pick it up.
  • kafka — subscribe to one or more Kafka topics. The dominant production source. Lesson 51 is dedicated to it.
  • socket — read newline-delimited text from a TCP socket. Strictly for toy examples and demos; not fault tolerant, no offsets, never use in production. But it’s handy for tutorials because you can nc -lk 9999 in a terminal and type events by hand.
  • rate and rate-micro-batch — synthetic sources that generate (timestamp, value) rows at a configurable rate. Useful for benchmarking, smoke tests, and learning. They’re documented but rarely used in real jobs.

Third parties add more: Delta Lake adds delta, Iceberg adds iceberg, AWS Kinesis has its own connector, etc. The pattern is always the same — format("...") plus options.

A few file source options worth knowing:

events = (spark.readStream
            .format("json")
            .schema(my_schema)
            .option("maxFilesPerTrigger", 10)
            .option("latestFirst", "false")
            .option("cleanSource", "archive")
            .option("sourceArchiveDir", "/data/archived/")
            .load("/data/incoming/"))

maxFilesPerTrigger caps how many new files get processed in one micro-batch, which matters if you’re catching up on a backlog and don’t want a single huge micro-batch to OOM. latestFirst reverses the file order (by modification time) if you want the newest first. cleanSource tells Spark what to do with files it has processed (archive, delete, or off, which is the default); archive moves them into sourceArchiveDir, which you must set alongside it. It’s useful for keeping the source directory tidy without writing your own cleanup script.
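To make the ordering and capping concrete, here is a toy model in plain Python of how a file source might choose the files for its next micro-batch: skip files already recorded as seen, sort by modification time (newest first when latestFirst is set), then take at most maxFilesPerTrigger. It is a sketch under assumptions, not Spark’s implementation: the (path, modification_time) pairs and the name plan_file_batch are made up, and details such as maxFileAge are ignored.

```python
from typing import List, Optional, Set, Tuple

# Hypothetical listing entry: (path, modification_time_in_seconds)
FileEntry = Tuple[str, float]


def plan_file_batch(
    listing: List[FileEntry],
    seen: Set[str],
    max_files_per_trigger: Optional[int] = None,
    latest_first: bool = False,
) -> List[str]:
    """Toy model: choose which files the next micro-batch reads."""
    # Only files not ingested by an earlier micro-batch are candidates.
    new_files = [entry for entry in listing if entry[0] not in seen]
    # Oldest first by default; newest first when latest_first is set.
    new_files.sort(key=lambda entry: entry[1], reverse=latest_first)
    # Cap the batch size, the way maxFilesPerTrigger does.
    if max_files_per_trigger is not None:
        new_files = new_files[:max_files_per_trigger]
    return [path for path, _ in new_files]


listing = [("a.json", 1.0), ("b.json", 2.0), ("c.json", 3.0), ("d.json", 4.0)]
seen = {"a.json"}  # pretend a.json was handled by an earlier micro-batch

print(plan_file_batch(listing, seen, max_files_per_trigger=2))
# ['b.json', 'c.json']
print(plan_file_batch(listing, seen, max_files_per_trigger=2, latest_first=True))
# ['d.json', 'c.json']
```

With a large backlog, the cap is what keeps each micro-batch a predictable size; latestFirst only changes which end of the backlog gets drained first.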

The schema matters. You must provide a schema for streaming file sources by default. Spark won’t infer it, because it wants one consistent schema for the lifetime of the query, even across failures and restarts. If you really want inference, set spark.sql.streaming.schemaInference to true, but it’s a footgun: the schema is inferred when the query starts and then locked, so files that arrive later with new or changed columns won’t change it.

Transformations: the same API you already know

Once you have a streaming DataFrame, you transform it with the same operators as a batch DataFrame:

from pyspark.sql.functions import col, upper

high_value = (events
                .filter(col("action") == "purchase")
                .withColumn("user_id_upper", upper("user_id"))
                .select("user_id_upper", "ts", "amount"))

filter, select, withColumn, groupBy, agg, join, union: they all work, with one consistent rule. Operations that need to look at the entire dataset don’t work directly on streams. You can’t .sort() a stream globally because there’s no “end” to sort to (Spark only allows sorting after an aggregation, in complete output mode). You can’t .limit(100) because the meaning of “the first 100” depends on order. You can’t compute a percentile because it requires the whole dataset.

What you can do:

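  • Stateless transformations: anything per-row (select, filter, withColumn, cast, explode, UDFs that don’t reference other rows). These are easy.
  • Stateful aggregations: groupBy + count, sum, avg, etc. Spark maintains state across micro-batches, so you can keep running counts over an unbounded stream indefinitely. For event-time aggregations, watermarks let Spark drop old state and bound memory (lesson 52).
  • Windowed aggregations: groupBy(window("ts", "5 minutes")) for tumbling windows, window("ts", "5 minutes", "1 minute") for sliding. Each window is its own group. In append mode (which needs a watermark), each window’s result is emitted once the window closes; in update mode a window’s result is re-emitted whenever it changes, and complete mode re-emits every window on each trigger.
  • Stream-static joins: joining a stream against a regular batch DataFrame (a dimension table, a lookup, a config). Inner joins are supported, and so are outer joins that preserve the streaming side (for example, the stream left-outer-joined with a static table); full outer joins and outer joins that preserve the static side are not. None of them need state.
  • Stream-stream joins: joining two streams with watermarks and time bounds. Supported but with caveats; lesson 55.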
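To see which window or windows a single row lands in, here is a small plain-Python sketch. It assumes windows are aligned to the Unix epoch in UTC with [start, end) bounds, which is how window() behaves when no startTime is passed; windows_for is a made-up helper, not a Spark function. A tumbling window yields exactly one match, while a 5-minute window sliding every minute yields five.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def windows_for(
    ts: datetime, size: timedelta, slide: Optional[timedelta] = None
) -> List[Tuple[datetime, datetime]]:
    """Toy model: every [start, end) window of length `size`, stepping by `slide`, containing ts.

    slide=None means slide == size, i.e. a tumbling window.
    """
    slide = slide or size
    # Latest epoch-aligned window start at or before ts (timedelta % timedelta is a timedelta).
    start = ts - (ts - EPOCH) % slide
    windows = []
    # Walk backwards while the window [start, start + size) still covers ts.
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


ts = datetime(2024, 1, 1, 12, 7, 30, tzinfo=timezone.utc)
for w_start, w_end in windows_for(ts, timedelta(minutes=5)):
    print("tumbling:", w_start.time(), "-", w_end.time())  # 12:05:00 - 12:10:00
for w_start, w_end in windows_for(ts, timedelta(minutes=5), timedelta(minutes=1)):
    print("sliding: ", w_start.time(), "-", w_end.time())  # five windows, starting 12:03 through 12:07
```

This is why sliding windows cost more state than tumbling ones: every row contributes to size / slide groups instead of one.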

Anything you’d compute over an infinite list, basically, has a streaming-shaped version. Anything that requires “all the data right now” doesn’t translate.

writeStream: the output side

The output side mirrors the input. You configure a sink and start the query:

query = (high_value.writeStream
            .format("parquet")
            .outputMode("append")
            .option("path", "/data/output/")
            .option("checkpointLocation", "/data/checkpoints/high_value/")
            .trigger(processingTime="30 seconds")
            .start())

Three things are happening here: the output mode, the trigger, and the sink. Let me unpack them one at a time.

Output modes

outputMode tells Spark what to write each micro-batch. Three options:

  • append: only emit rows that are new since the last micro-batch and won’t change again. This is the default mode and the natural one for stateless queries. Required for sinks that don’t support updates (Parquet, Avro, plain files). For aggregations, append mode requires a watermark so Spark knows when an aggregation is “done” and can be emitted (lesson 52 again; it shows up everywhere).
  • complete: emit the entire current result table on every micro-batch. Only valid for aggregations (anything else has an unbounded result). Useful for dashboards or console debugging where you want to see the running total.
  • update: emit only the rows that changed in this micro-batch. Useful for upsert-style destinations: a MERGE into Delta Lake or an upsert over JDBC inside foreachBatch, or a store like Cassandra where every write is an upsert anyway. More efficient than complete, more flexible than append.

Lesson 53 is the deep dive on output modes — when each is legal, when each is what you want, and how watermarks interact. For now: use append for append-only sinks, complete for small aggregations to a console or memory sink, update for upsert-capable sinks.
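A toy model makes the complete/update difference concrete. The sketch below is plain Python, not Spark: it keeps a running count per action across micro-batches (roughly what groupBy("action").count() keeps in state) and shows what each mode would hand to the sink. Append is left out on purpose, since for an aggregation it needs a watermark to know when a count is final. run_micro_batch and the sample actions are made up.

```python
from collections import Counter
from typing import Dict, List, Tuple


def run_micro_batch(state: Counter, batch: List[str]) -> Tuple[Dict[str, int], Dict[str, int]]:
    """Fold one micro-batch into the running counts; return (complete, update) output."""
    batch_counts = Counter(batch)
    state.update(batch_counts)  # the state persists across micro-batches
    complete = dict(sorted(state.items()))                      # the whole result table
    update = {key: state[key] for key in sorted(batch_counts)}  # only rows this batch changed
    return complete, update


state: Counter = Counter()
for batch in (["view", "view", "purchase"], ["view"], ["refund"]):
    complete, update = run_micro_batch(state, batch)
    print("complete:", complete, "| update:", update)
# complete: {'purchase': 1, 'view': 2} | update: {'purchase': 1, 'view': 2}
# complete: {'purchase': 1, 'view': 3} | update: {'view': 3}
# complete: {'purchase': 1, 'refund': 1, 'view': 3} | update: {'refund': 1}
```

Complete output grows with the number of keys ever seen, while update output is proportional to what each batch touched, which is why update is the cheaper choice for upserts.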

Triggers

The trigger controls when each micro-batch runs. Four flavors:

.trigger(processingTime="10 seconds")   # every 10 seconds
.trigger(availableNow=True)             # process all available, then stop
.trigger(continuous="1 second")         # experimental low-latency mode (interval = checkpoint interval)
# .trigger() omitted                    # default: as fast as possible

The default trigger (no .trigger() call at all) tells Spark to start the next micro-batch as soon as the previous one finishes. If your processing takes 2 seconds per batch, you get a batch every 2 seconds. If it takes 200 ms, you get five per second. This is the right default for most production jobs: Spark just goes as fast as it can.

processingTime pins the interval. If you set "10 seconds", Spark runs a micro-batch every 10 seconds regardless of processing time. If a batch takes 4 seconds, Spark waits 6 seconds before the next one. If a batch takes 14 seconds (longer than the trigger), the next one starts immediately and you get a warning in the logs about falling behind. Use this when you want predictable, low-frequency batching — say, hourly summaries written to a table.
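If it helps to see the timing arithmetic, here is a toy scheduler in plain Python (schedule is a made-up name, not Spark code). It assumes batch starts snap to multiples of the interval, counting from t=0, which matches the behavior described above; interval_s=0 stands in for the default trigger.

```python
from typing import List, Tuple


def schedule(durations_s: List[float], interval_s: float = 0) -> List[Tuple[float, bool]]:
    """Toy model: (start_time, fell_behind) for each micro-batch, starting at t=0.

    interval_s == 0 models the default trigger (back-to-back batches).
    interval_s > 0 models processingTime: wait for the next interval boundary,
    or start immediately if the previous batch overran it.
    """
    result = []
    now = 0.0
    for duration in durations_s:
        start = now
        finish = start + duration
        fell_behind = interval_s > 0 and duration > interval_s
        result.append((start, fell_behind))
        if interval_s > 0:
            next_boundary = (start // interval_s) * interval_s + interval_s
            now = max(finish, next_boundary)
        else:
            now = finish
    return result


print(schedule([4, 14, 4, 3], interval_s=10))
# [(0.0, False), (10.0, True), (24.0, False), (30.0, False)]
print(schedule([2, 2, 2]))
# [(0.0, False), (2.0, False), (4.0, False)]
```

In the first run, batch two overruns the 10-second interval, so batch three starts the moment it finishes (t=24) instead of waiting, and the schedule then snaps back to the next boundary (t=30).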

availableNow=True is the “batch-style streaming” trigger. Spark reads all data available in the source when the query starts, processes it (possibly across multiple micro-batches if there’s a lot), writes it out, and stops. The streaming query terminates cleanly. This is enormously useful for jobs you want to run on a schedule (Airflow, cron) but write with the streaming API: say, an hourly job that reads from Kafka, processes whatever’s there, and exits. It replaces the older Trigger.Once mode, now deprecated in favor of availableNow, which handles backlogs better: it honors rate limits like maxFilesPerTrigger and splits the work into several micro-batches, where Once crams everything into one.
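A plain-Python sketch of that difference, assuming a file source with maxFilesPerTrigger set: availableNow snapshots the backlog at start and works through it in capped chunks, while the deprecated Once trigger takes everything in one batch. Both functions are hypothetical stand-ins, not Spark APIs.

```python
from typing import List


def plan_available_now(backlog: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Toy model: split the files present at query start into capped micro-batches."""
    snapshot = list(backlog)  # files arriving after the query starts wait for the next run
    return [
        snapshot[i : i + max_files_per_trigger]
        for i in range(0, len(snapshot), max_files_per_trigger)
    ]


def plan_once(backlog: List[str]) -> List[List[str]]:
    """Toy model of the deprecated Once trigger: one batch, rate limits ignored."""
    return [list(backlog)] if backlog else []


backlog = [f"part-{i:02d}.json" for i in range(12)]
print([len(batch) for batch in plan_available_now(backlog, max_files_per_trigger=5)])  # [5, 5, 2]
print([len(batch) for batch in plan_once(backlog)])  # [12]
```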

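continuous is the experimental millisecond-latency mode I mentioned in lesson 49. It supports only map-like operations (selections and projections; no aggregations) and a handful of sources and sinks, so it’s a niche tool. Mostly ignore it.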

Sinks

format and the associated options pick the destination:

  • parquet, orc, json, csv: file sinks. Append mode only. Specify path and checkpointLocation. Each micro-batch writes new files into the path.
  • kafka: write to a Kafka topic. Useful for stream → stream pipelines. Specify kafka.bootstrap.servers and topic (or supply a topic column), and make sure the DataFrame has a value column; key is optional.
  • console: print to stdout. Debugging only. Has options for numRows and truncate like .show().
  • memory: write to an in-memory table named by .queryName("my_query"), which you can then read with spark.sql("SELECT * FROM my_query"). Debugging and notebook exploration. Don’t use in production; the output is stored in driver memory, so in append mode the table grows without bound.
  • foreach and foreachBatch: escape hatches for custom sinks. foreachBatch gives you each micro-batch as a regular DataFrame, along with its batch ID, and lets you do whatever you want with it (write to JDBC, call an HTTP API, run a custom merge into Delta, etc.). The everyday tool when no built-in sink fits.

Third-party connectors add more sinks: delta, iceberg, Kinesis, etc. Same pattern. JDBC has no built-in streaming sink; writing to a database is a foreachBatch job.
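Because a micro-batch can be re-run after a failure, the Spark docs describe foreachBatch as at-least-once and suggest using the batch ID it passes you to deduplicate. Here is a plain-Python sketch of that idea with a hypothetical ToyTable that keeps per-user totals: a replayed batch ID is skipped, so totals aren’t double-counted. In a real job the set of applied IDs would have to live in the target system and be updated in the same transaction as the data; that part is assumed away here.

```python
from typing import Dict, List, Set


class ToyTable:
    """Hypothetical target table that remembers which batch IDs it has applied."""

    def __init__(self) -> None:
        self.totals: Dict[str, float] = {}
        self.applied_batches: Set[int] = set()

    def write_batch(self, rows: List[dict], batch_id: int) -> None:
        # Same (batch, batch_id) shape as a foreachBatch function, which in PySpark
        # receives (DataFrame, int); plain dicts stand in for the DataFrame here.
        if batch_id in self.applied_batches:
            return  # replay of an already-applied batch: skip it
        for row in rows:
            user = row["user_id"]
            self.totals[user] = self.totals.get(user, 0.0) + row["amount"]
        self.applied_batches.add(batch_id)


table = ToyTable()
first = [{"user_id": "u1", "amount": 150.0}, {"user_id": "u2", "amount": 200.0}]
table.write_batch(first, 0)
table.write_batch(first, 0)  # replay after a simulated failure
table.write_batch([{"user_id": "u1", "amount": 120.0}], 1)
print(table.totals)  # {'u1': 270.0, 'u2': 200.0}
```

Without the batch-ID check, the replay would have pushed u1 to 420.0 and u2 to 400.0.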

The checkpoint

Now the part that’s mandatory and that beginners forget every single time.

.option("checkpointLocation", "/data/checkpoints/my_job/")

The checkpoint location is a directory (local, HDFS, S3, anywhere durable) where Spark writes:

  1. Consumed offsets: for each source, what’s the last position we’ve processed? For Kafka, the offset per partition. For files, which files have been ingested. For sockets, well, you can’t checkpoint sockets, which is one reason they’re not for production.
  2. Aggregation state: for stateful queries, snapshots of the running counts/sums/windows so they survive a restart.
  3. Commit logs: markers that say “micro-batch N completed successfully and was written to the sink.”
  4. Metadata: the stable query ID, plus per-batch details recorded alongside the offsets (watermark, batch timestamp, a few relevant SQL configs) and, for stateful queries, the state schema. Spark checks these on restart to resume consistently and to catch some incompatible changes.

Without a checkpoint, every restart of your streaming query starts over from scratch. For a file source, that means re-reading every file in the directory. For Kafka, that means re-reading from the configured startingOffsets. For an aggregation, that means recomputing all running totals from zero. None of these are usually what you want.

With a checkpoint, restart picks up where the last successful micro-batch ended. Same offsets, same aggregation state. The query continues seamlessly.
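The offset and commit logs work together like a write-ahead log: the planned offset range for batch N is recorded before the batch runs, and the commit marker is recorded after its output is written. Here is a toy model in plain Python of how that decides what to do on restart (ToyCheckpoint and run_batch are hypothetical, offsets are plain integers, and a list plays the sink): a batch that was planned but never committed is re-run with exactly the same range.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class ToyCheckpoint:
    offsets: Dict[int, Tuple[int, int]] = field(default_factory=dict)  # batch_id -> [start, end)
    commits: Set[int] = field(default_factory=set)                     # batch_ids fully written


def run_batch(ckpt: ToyCheckpoint, latest: int, sink: List[int], crash: bool = False) -> Optional[int]:
    """Run one micro-batch against a source whose newest offset is `latest`."""
    if ckpt.offsets and max(ckpt.offsets) not in ckpt.commits:
        batch_id = max(ckpt.offsets)           # planned but never committed: redo it
        start, end = ckpt.offsets[batch_id]    # same range as before, not a new one
    else:
        batch_id = max(ckpt.offsets, default=-1) + 1
        start = ckpt.offsets[batch_id - 1][1] if batch_id > 0 else 0
        end = latest
        if start >= end:
            return None                        # no new data, no batch
        ckpt.offsets[batch_id] = (start, end)  # 1. write-ahead: log the range first
    sink.extend(range(start, end))             # 2. write the output
    if crash:
        return None                            # simulate dying before the commit
    ckpt.commits.add(batch_id)                 # 3. mark the batch committed
    return batch_id


ckpt, sink = ToyCheckpoint(), []
run_batch(ckpt, latest=3, sink=sink)              # batch 0 reads [0, 3)
run_batch(ckpt, latest=5, sink=sink, crash=True)  # batch 1 reads [3, 5), then crashes
run_batch(ckpt, latest=7, sink=sink)              # restart: batch 1 again, still [3, 5)
run_batch(ckpt, latest=7, sink=sink)              # batch 2 reads [5, 7)
print(sink)  # [0, 1, 2, 3, 4, 3, 4, 5, 6]
```

The duplicated 3 and 4 are the point: re-running batch 1 is only safe if the sink can recognize a replay. Spark’s file sink does this by tracking the batch IDs it has already written in its _spark_metadata log; for custom sinks, that’s your job.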

A few rules:

  • One checkpoint location per query. Two queries sharing a checkpoint will corrupt each other.
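  • The checkpoint is tied to the query. If you change the stateful parts significantly (add a stateful aggregation, change a join, change grouping keys), Spark will either refuse to resume from the old checkpoint or behave in undefined ways, and you’ll need to start fresh. Small changes such as adding a filter, changing a projection without changing the output schema, or a different numRows for console are usually fine. Changing the output directory of a file sink, on the other hand, is not allowed.
  • Don’t put it on local disk in a clustered deployment. The driver writes the offset and commit logs while executors write state files, so all of them need to see the same durable storage; if a node dies and its work moves elsewhere, anything on its local disk is gone. Use HDFS, S3, ABFS, GCS, anything durable.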
  • console and memory sinks technically work without a checkpoint for debugging, but Spark will warn. In production, always set it.

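Forgetting or misconfiguring checkpointLocation is the single most common Structured Streaming mistake I’ve seen. For file and Kafka sinks, Spark refuses to start without one, so the trap is usually subtler: the checkpoint sits somewhere ephemeral (a temp directory, a node’s local disk), its path changes between deploys, or the job relies on the session-wide spark.sql.streaming.checkpointLocation without a queryName, so Spark picks a fresh random subdirectory on every run. The query starts, runs, looks healthy. Then somebody restarts it for a totally unrelated reason and the data shows up in the output a second time. Because there was no usable checkpoint, the source got re-read.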

A complete worked example

Putting it all together — read JSON files from a directory, filter purchases over 100, write Parquet:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("PurchaseFilter").getOrCreate()

schema = "user_id STRING, action STRING, amount DOUBLE, ts TIMESTAMP"

events = (spark.readStream
            .format("json")
            .schema(schema)
            .option("maxFilesPerTrigger", 5)
            .load("/data/incoming/"))

high_value = (events
                .filter(col("action") == "purchase")
                .filter(col("amount") > 100)
                .select("user_id", "amount", "ts"))

query = (high_value.writeStream
            .format("parquet")
            .outputMode("append")
            .option("path", "/data/high_value_purchases/")
            .option("checkpointLocation", "/data/checkpoints/high_value/")
            .trigger(processingTime="30 seconds")
            .start())

query.awaitTermination()

Run it. Drop a JSON file into /data/incoming/. Wait up to 30 seconds. Look in /data/high_value_purchases/ for new Parquet files. Look in /data/checkpoints/high_value/ and you’ll see directories named offsets, commits, and sources, plus a metadata file: the durable record of what’s been processed.

Kill the process with Ctrl-C. Drop another JSON file. Restart the same script. The query resumes, picks up only the new file (the first one is already recorded in the checkpoint’s source log), and writes the new Parquet. That’s the checkpoint earning its keep.

Inspecting a running query

writeStream.start() returns a StreamingQuery handle. A few useful methods:

query.id                # Stable query ID across restarts (from checkpoint)
query.runId             # ID for this specific run; changes on restart
query.status            # Dict: message, isDataAvailable, isTriggerActive
query.lastProgress      # Detailed metrics for the last micro-batch
query.recentProgress    # The most recent progress reports (100 by default)
query.exception()       # If the query failed, the exception
query.stop()            # Graceful shutdown
query.awaitTermination(timeout=60)   # Block, optionally with timeout

lastProgress is the one I look at constantly during development. It’s a dict-like report (None until the first micro-batch completes) with the input rate, the processing rate, a breakdown of how long each phase of the batch took, the start and end offsets for each source (for Kafka in recent versions, also how far behind the latest offsets you are), and per-operator state size. We’ll spend more time on monitoring in lesson 57; for now, just know query.lastProgress is your best friend when something looks wrong.
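Here is a rough health check you could run on query.lastProgress, assuming it behaves like the plain dict PySpark 3.x returns. It flags a batch whose triggerExecution time overran the trigger interval, or whose processing rate trailed the input rate. The heuristic and the name falling_behind are mine, not Spark’s, and the sample report below is made up and contains only a few of the real fields.

```python
from typing import Optional


def falling_behind(progress: Optional[dict], trigger_interval_ms: int) -> Optional[bool]:
    """Rough heuristic over one progress report (e.g. query.lastProgress)."""
    if not progress:
        return None  # lastProgress is None until the first micro-batch completes
    duration_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    overran = trigger_interval_ms > 0 and duration_ms > trigger_interval_ms
    return overran or processed_rate < input_rate


# Made-up sample containing a subset of the fields a real progress report has.
sample = {
    "batchId": 42,
    "numInputRows": 6000,
    "inputRowsPerSecond": 200.0,
    "processedRowsPerSecond": 1500.0,
    "durationMs": {"triggerExecution": 4000},
}
print(falling_behind(sample, trigger_interval_ms=30_000))  # False
print(falling_behind(None, trigger_interval_ms=30_000))    # None
```

A single report is noisy, so in practice you would look at a trend across query.recentProgress rather than alerting on one batch.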

What to take from this

Three things, in priority order:

  1. The streaming API is readStream + transformations + writeStream. The transformations in the middle are the same DataFrame API you’ve spent forty-eight lessons on. Don’t overthink it.
  2. Pick a trigger that matches your latency budget. Default (back-to-back micro-batches) for low latency, processingTime for predictable scheduling, availableNow for batch-on-streaming jobs.
  3. Always set checkpointLocation. Always. Even in dev. Even for console. The cost is one line; the upside is that everything else works correctly.

Next lesson, the source you’ll actually use in production: Kafka. Offsets, deserialization, the tradeoffs between at-least-once and exactly-once.


References: Apache Spark Structured Streaming Programming Guide (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), particularly the sections on input sources, output sinks, and triggers. Retrieved 2026-05-01.
