PySpark, from the ground up: Lesson 54 / 60

Output modes and idempotent sinks: foreachBatch and the upsert pattern

Append vs update vs complete, the sinks Spark ships, and the foreachBatch escape hatch for everything else.

So your stateful streaming query is computing things. Where do those things go? And how do you make sure they get there exactly once, even when Spark retries a batch after a crash?

This lesson is about the output side: the three output modes, the sinks Spark ships, and the foreachBatch pattern that’s the answer to “but I want to write to X” for any X that isn’t on the built-in list.

The three output modes, recapped

Every streaming query has an outputMode — what gets written each batch.

append — only new rows since the last batch. The most common mode. Works for:

  • Non-aggregating queries (filters, projections, stream-stream joins). Each input row maps to zero or more output rows that are emitted once.
  • Aggregations bounded by a watermark. The window’s row is emitted when the watermark closes it, and never again.

What it can’t do: emit aggregations that haven’t been finalized yet. If your business wants to see “running total updates as they happen,” append won’t give you that — the running total only appears when the window closes, possibly hours after the events arrived.

update — rows that changed since the last batch. Works for aggregations and gives you running visibility — every batch emits the windows that got new data, with their current totals. The downside: the same key can be emitted many times with different values, so the sink has to be upsert-capable. Writing to an append-only sink (raw Parquet, Kafka without a key strategy) loses the semantics: you’d see five rows for the same window with five different counts.

complete — the entire result table after each batch. Only valid for aggregations. Only practical for small results. Spark literally re-emits the full set of group keys each batch. If you have 10 distinct keys and a dashboard that overwrites a single file, complete mode is fine. If you have 10 million keys, you’re going to have a bad time.
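Here is a toy model in plain Python that shows the three modes side by side for a windowed count. It is not how Spark implements them. It is a sketch under stated assumptions: simulate_output_modes is a hypothetical helper, each micro-batch is a list of window ids plus a watermark, a window counts as closed once the watermark passes its id, and late events are not dropped the way Spark would drop them.

```python
from collections import Counter


def simulate_output_modes(batches):
    """Toy model (hypothetical helper) of what each output mode emits per batch.

    batches: list of (events, watermark) pairs; each event is an int window id.
    A window counts as closed once the batch's watermark is greater than its id.
    Late events are NOT dropped here, unlike in Spark.
    Returns {"append": [...], "update": [...], "complete": [...]}, holding one
    sorted list of (window, count) rows per batch.
    """
    totals = Counter()  # the running result table (Spark keeps this as state)
    finalized = set()   # windows already emitted in append mode
    out = {"append": [], "update": [], "complete": []}
    for events, watermark in batches:
        changed = Counter(events)
        totals.update(changed)
        # update: only windows that received rows this batch, with current totals
        out["update"].append(sorted((w, totals[w]) for w in changed))
        # complete: the whole result table, every batch
        out["complete"].append(sorted(totals.items()))
        # append: windows the watermark has closed, each emitted exactly once
        closed = sorted((w, totals[w]) for w in totals
                        if w < watermark and w not in finalized)
        finalized.update(w for w, _ in closed)
        out["append"].append(closed)
    return out


if __name__ == "__main__":
    result = simulate_output_modes([([1, 1, 2], 0), ([2, 3], 3), ([3], 4)])
    for mode, rows in result.items():
        print(mode, rows)
```

With these three batches, update emits window 2 twice (count 1, then 2). Append emits it once, as (2, 2), after the watermark passes it. Complete repeats every window on every batch. The repeated emission in update mode is why that mode needs an upsert-capable sink.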

The mode you pick is constrained by what your query does and what your sink can do. You don’t always get a free choice.

The built-in sinks

Out of the box, Spark ships:

File sink: format("parquet"), format("orc"), format("json"), format("csv"). Writes append-only files into a directory, partitioned if you specify partitionBy. Append mode only. Good for landing zones, lakes, raw archives.

(query.writeStream
    .format("parquet")
    .option("path", "s3://lake/views/")
    .option("checkpointLocation", "s3://ck/views/")
    .partitionBy("dt")
    .outputMode("append")
    .start())

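Kafka sink: format("kafka"). Writes each output row as a Kafka record. Requires a value column (string or binary). The key, headers, and partition columns are optional, and topic can be supplied either as a column or through the "topic" option. Delivery is at-least-once; you get exactly-once by making consumers downstream idempotent.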

(query.selectExpr("CAST(user_id AS STRING) AS key",
                  "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "page-aggs")
    .option("checkpointLocation", "...")
    .start())
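On the consuming side, "idempotent" can be as simple as treating each record as an upsert by its key. The sketch below stays in plain Python and skips the Kafka client entirely. apply_records and the dict standing in for the consumer's store are hypothetical. Last-write-wins is only safe here because records with the same key land in the same partition under Kafka's default partitioner, and Kafka preserves order within a partition.

```python
import json


def apply_records(store, records):
    """Apply (key, value) Kafka-style records to a keyed store, last write wins.

    store: dict mapping record key -> latest decoded value (stand-in for a table).
    records: iterable of (key, value) pairs, value being a JSON string.
    Re-applying the most recent records for a key (what a Spark replay
    produces) leaves the store unchanged.
    """
    for key, value in records:
        store[key] = json.loads(value)
    return store


if __name__ == "__main__":
    batch = [("1", '{"user_id": 1, "views": 3}'),
             ("2", '{"user_id": 2, "views": 5}')]
    store = apply_records({}, batch)
    before = dict(store)
    apply_records(store, batch)  # the same batch delivered again after a replay
    print(store == before)  # True
```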

Console sink: format("console"). Prints to driver stdout. Supports append, update, and complete. Strictly for development: the driver will OOM if you try to console-print a real workload.

.format("console").outputMode("update").option("truncate", False)

Memory sink: format("memory"). Writes into a named in-driver table you can query with regular Spark SQL. Useful for tests and notebooks.

.format("memory").queryName("session_results").outputMode("append")
# in another cell:
spark.sql("SELECT * FROM session_results").show()

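foreachBatch: the escape hatch, covered in its own section below. (There's also foreach, which hands your code one row at a time; this lesson sticks to foreachBatch.)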

That’s the official list. Notice what’s not on it: Postgres, MySQL, Snowflake, BigQuery, Mongo, Elasticsearch, Cassandra, Redis. Almost every real destination beyond a file or Kafka is a foreachBatch job.

The foreachBatch pattern

foreachBatch hands you the micro-batch as a regular DataFrame and a batch ID, and lets you do whatever you want with both:

def write_batch(batch_df, batch_id):
    # batch_df is a regular DataFrame
    # batch_id is a monotonically increasing long
    batch_df.write.format("jdbc").options(...).mode("append").save()

(query.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "...")
    .start())

Inside the function, you have the full batch API. Multi-target writes, complex transformations, calls to external systems, conditional logic — all of it. The streaming engine takes care of triggering, ordering, and replaying batches.

Two things to internalize:

  1. batch_df is a regular DataFrame. It’s not a stream. You can call .write on it, or .collect() if it’s small, or .cache() if you need it twice. It has a defined size; this batch is over when your function returns.
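  2. Batches can replay. If your function throws or the driver crashes mid-batch, the checkpoint never marks that batch as committed. When the query restarts, Spark calls write_batch again with the same batch_id. For a deterministic query over a replayable source, it passes the same contents too. Your code has to handle that.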
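The replay contract follows from the order in which Spark's checkpoint records work. Below is a toy micro-batch loop in plain Python that assumes that order: write the batch's output first, then record the batch as committed. run_stream, crash_before_commit, and the list sink are hypothetical. A crash between the two steps means the same batch_id runs again on restart, and a naive append sink ends up with duplicates.

```python
def run_stream(batches, write_batch, committed, crash_before_commit=None):
    """Toy micro-batch loop (hypothetical): write each batch, then commit it.

    batches: list of row lists, indexed by batch_id.
    committed: set of batch_ids already recorded in the "checkpoint"; mutated.
    crash_before_commit: batch_id whose commit step is skipped, simulating a
    driver crash after the sink write but before the checkpoint update.
    Returns True if the run finished, False if it "crashed".
    """
    for batch_id, rows in enumerate(batches):
        if batch_id in committed:
            continue  # finished before the restart; never re-run
        write_batch(rows, batch_id)
        if batch_id == crash_before_commit:
            return False  # the sink saw the batch, the checkpoint did not
        committed.add(batch_id)
    return True


if __name__ == "__main__":
    sink = []  # a naive append-only sink

    def naive_append(rows, batch_id):
        sink.extend(rows)  # ignores batch_id, so replays duplicate rows

    batches = [["a", "b"], ["c"], ["d"]]
    checkpoint = set()
    run_stream(batches, naive_append, checkpoint, crash_before_commit=1)
    run_stream(batches, naive_append, checkpoint)  # restart replays batch 1
    print(sink)  # ['a', 'b', 'c', 'c', 'd']
```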

That second point is where idempotence comes in.

The idempotent-sink rule

The contract is: the same batch_id plus the same batch_df should always produce the same end state in the sink, no matter how many times your function runs.

If your sink supports upsert (insert or update by primary key), idempotence is mostly free — re-running with the same data overwrites itself. If your sink only supports append, you need to deduplicate by the batch_id, usually with a target-side primary key like (batch_id, source_offset).

Two concrete examples follow.

Pattern 1: foreachBatch into a Delta merge (upsert)

Delta Lake supports MERGE — atomic upsert by predicate. This is the gold standard for exactly-once aggregation output:

from delta.tables import DeltaTable

def upsert_to_delta(batch_df, batch_id):
    target = DeltaTable.forPath(spark, "s3://lake/page_aggs/")
    (target.alias("t")
        .merge(
            batch_df.alias("s"),
            "t.window_start = s.window_start AND t.page = s.page",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(windowed.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", "s3://ck/page_aggs/")
    .start())

If batch 17 fails partway through, Spark replays it. The MERGE runs again with the same batch_df. Rows that were already merged get matched and updated to the same values; new rows get inserted. End state is identical either way.

Pair this with outputMode("update") so each batch only carries the windows that changed. Combined, you get a Delta table that always reflects the latest aggregate per window, updated continuously, with exactly-once semantics from a strictly at-least-once stream.

Pattern 2: foreachBatch into a Postgres upsert

Postgres doesn’t have streaming-native integration, but it does have INSERT ... ON CONFLICT, which is enough:

def upsert_to_postgres(batch_df, batch_id):
    rows = batch_df.collect()  # safe only if batch is small
    if not rows:
        return

    sql = """
        INSERT INTO page_aggs (window_start, page, views, batch_id)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (window_start, page) DO UPDATE
        SET views = EXCLUDED.views,
            batch_id = EXCLUDED.batch_id
        WHERE page_aggs.batch_id < EXCLUDED.batch_id
    """

    import psycopg2
    conn = psycopg2.connect(...)
    try:
        with conn, conn.cursor() as cur:
            cur.executemany(sql, [
                (r.window_start, r.page, r.views, batch_id)
                for r in rows
            ])
    finally:
        conn.close()

A few details that matter:

  • ON CONFLICT (window_start, page) DO UPDATE is the upsert clause. Requires a unique index or PK on the conflict columns.
  • The WHERE page_aggs.batch_id < EXCLUDED.batch_id guard makes the upsert monotonic. If a batch replays out of order or you have late stragglers from an older batch, you don’t overwrite a newer state with older data.
  • .collect() materializes the batch on the driver. Only safe if batches are small (a few thousand rows). For larger batches, use batch_df.foreachPartition so the executors write in parallel, opening one connection per partition rather than one per row.
  • Open the connection inside the function, not in driver code that the function closes over. Connections aren't picklable, which breaks as soon as the function is shipped to executors (as with foreachPartition). Even if they were picklable, you'd be reusing a single connection across batches in unpredictable ways.
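You can try the upsert, the monotonic guard, and the one-connection-per-partition advice locally with sqlite3, which accepts the same INSERT ... ON CONFLICT ... DO UPDATE ... WHERE form (SQLite 3.24 or newer). This is a sketch, not a drop-in. make_partition_writer, chunk_size, and the connect factory are hypothetical, and against Postgres you would pass a psycopg2 connect function and switch the ? placeholders to %s. The returned function has the shape batch_df.foreachPartition expects: it takes an iterator of rows.

```python
import functools
import os
import sqlite3
import tempfile
from collections import namedtuple
from itertools import islice

# Same statement as upsert_to_postgres, with sqlite's ? placeholders.
UPSERT_SQL = """
    INSERT INTO page_aggs (window_start, page, views, batch_id)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (window_start, page) DO UPDATE
    SET views = excluded.views,
        batch_id = excluded.batch_id
    WHERE page_aggs.batch_id < excluded.batch_id
"""


def make_partition_writer(connect, batch_id, chunk_size=500):
    """Return a function for batch_df.foreachPartition (hypothetical helper).

    connect: zero-argument callable returning a connection that works as a
    transaction context manager (sqlite3 and psycopg2 connections both do).
    Each call opens one connection, upserts the partition's rows in chunks,
    and commits them as a single transaction.
    """
    def write_partition(rows):
        rows = iter(rows)  # islice below must consume, not restart, the input
        conn = connect()   # opened where the function runs, never shipped over
        try:
            with conn:  # commit on success, roll back on error
                cur = conn.cursor()
                while True:
                    chunk = list(islice(rows, chunk_size))
                    if not chunk:
                        break
                    cur.executemany(UPSERT_SQL, [
                        (r.window_start, r.page, r.views, batch_id) for r in chunk
                    ])
        finally:
            conn.close()
    return write_partition


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "aggs.db")
    init = sqlite3.connect(path)
    init.execute("CREATE TABLE page_aggs (window_start TEXT, page TEXT, "
                 "views INTEGER, batch_id INTEGER, PRIMARY KEY (window_start, page))")
    init.commit()
    init.close()

    Row = namedtuple("Row", "window_start page views")  # stands in for pyspark Row
    connect = functools.partial(sqlite3.connect, path)
    make_partition_writer(connect, 5)([Row("10:00", "/home", 10)])
    make_partition_writer(connect, 4)([Row("10:00", "/home", 7)])   # stale straggler
    make_partition_writer(connect, 5)([Row("10:00", "/home", 10)])  # replay of batch 5

    check = sqlite3.connect(path)
    print(check.execute("SELECT views, batch_id FROM page_aggs").fetchall())  # [(10, 5)]
    check.close()
```

Neither the stale batch 4 write nor the batch 5 replay changes the row. The guard rejects the first, and the second leaves identical values in place.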

For really large batches, a more robust pattern stages into a temporary table per batch and merges:

def upsert_to_postgres_staged(batch_df, batch_id):
    staging = f"page_aggs_stage_{batch_id}"
    (batch_df.write
        .format("jdbc")
        .options(...)  # JDBC url, user, password, as in write_batch
        .option("dbtable", staging)
        .mode("overwrite")
        .save())
    # Then run a single MERGE/UPSERT statement from staging into target,
    # transactionally, then drop the staging table.

Spark writes the staging table in parallel; one transaction does the merge atomically; cleanup at the end. Idempotent because writing the staging table with mode("overwrite") is itself idempotent on batch_id, and the merge is too.
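Here is the same staging flow end to end, again with sqlite3 standing in for Postgres. The batch loads into a table named after batch_id, and any leftover from a failed attempt is dropped first, which is what mode("overwrite") gives you. The merge into page_aggs and the drop of the staging table then run in one transaction. upsert_staged and the schema are assumptions carried over from the Postgres example above. On Postgres the merge step would be an INSERT ... SELECT ... ON CONFLICT, or MERGE on PostgreSQL 15 and later.

```python
import sqlite3

# SQLite needs "WHERE true" on INSERT ... SELECT ... ON CONFLICT to avoid a
# parsing ambiguity; the guard is the same monotonic one as upsert_to_postgres.
MERGE_SQL = """
    INSERT INTO page_aggs (window_start, page, views, batch_id)
    SELECT window_start, page, views, ? FROM {staging} WHERE true
    ON CONFLICT (window_start, page) DO UPDATE
    SET views = excluded.views,
        batch_id = excluded.batch_id
    WHERE page_aggs.batch_id < excluded.batch_id
"""


def upsert_staged(conn, batch_id, rows):
    """Stage one batch in its own table, then merge it into page_aggs.

    rows: iterable of (window_start, page, views) tuples.
    batch_id is forced to int, so formatting it into the table name is safe.
    """
    staging = f"page_aggs_stage_{int(batch_id)}"
    # Step 1, like mode("overwrite"): drop leftovers from a failed attempt
    # and reload the batch from scratch.
    with conn:
        conn.execute(f"DROP TABLE IF EXISTS {staging}")
        conn.execute(f"CREATE TABLE {staging} "
                     "(window_start TEXT, page TEXT, views INTEGER)")
        conn.executemany(f"INSERT INTO {staging} VALUES (?, ?, ?)", rows)
    # Step 2: merge and clean up in one transaction, so a crash leaves either
    # the old target plus the staging table, or the merged target alone.
    with conn:
        conn.execute(MERGE_SQL.format(staging=staging), (batch_id,))
        conn.execute(f"DROP TABLE {staging}")


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE page_aggs (window_start TEXT, page TEXT, views INTEGER, "
                 "batch_id INTEGER, PRIMARY KEY (window_start, page))")
    rows = [("10:00", "/home", 10), ("10:00", "/about", 2)]
    upsert_staged(conn, 3, rows)
    upsert_staged(conn, 3, rows)  # replay: same end state, no leftover staging table
    print(conn.execute("SELECT * FROM page_aggs ORDER BY page").fetchall())
    # [('10:00', '/about', 2, 3), ('10:00', '/home', 10, 3)]
```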

Delivery semantics, recapped

Putting the streaming model together end-to-end:

  • At-least-once is automatic. Spark’s checkpointing guarantees that every input event is processed, and every output batch is delivered, at least once. Crashes, restarts, and replays preserve this.
  • Exactly-once requires sink cooperation. The sink has to be idempotent under replay of the same batch_id. Append-only sinks need a dedup key; upsert-capable sinks need PK semantics.
  • The checkpoint is the source of truth. It records which Kafka offsets were consumed, what state was held, and which batches committed. Lose it and you lose your guarantee.

A common mistake is to ship a streaming pipeline that writes plain Parquet from foreachBatch with mode("append"), then call it “exactly-once” because nothing crashed during testing. Then a real outage happens, the job restarts, the last batch replays, and you have duplicate rows in the lake. Idempotence isn't free; you have to build for it.

Picking the mode + sink for a use case

A few common combinations and what they imply:

  • Streaming filter/transform → file sink. outputMode("append"). Files are append-only. There's no idempotence problem because the sink records each batch's files in its _spark_metadata log under the batch ID, and Spark reads of that directory only pick up files listed there.
  • Windowed aggregation → console for dev, Delta merge for prod. outputMode("update") with foreachBatch. Running totals visible immediately; correct under replay.
  • Sessionization (flatMapGroupsWithState, or applyInPandasWithState from PySpark) → Kafka. outputMode("append"). Sessions are emitted when they close, and downstream consumers handle idempotence using the session_id key.
  • Real-time dashboard backed by Postgres. outputMode("update") + foreachBatch upsert. The dashboard reads the current snapshot; the upsert keeps it consistent.

If you find yourself wanting complete mode in production, step back. It’s almost always wrong at scale. The right move is usually update mode with an upsert sink, plus a periodic snapshot job if downstream really needs a full picture.

Where this leaves us

You now have, end-to-end, the streaming model: sources (lesson 50), micro-batch execution (lesson 51), event time and watermarks (lesson 52), state and stateful operators (lesson 53), and now output modes plus idempotent sinks. That’s enough to design and ship a streaming pipeline.

What you don’t have yet is the war-stories vocabulary for when it goes wrong — back-pressured queries, exploding state, watermark stalls, slow micro-batches, cluster-level resource contention. That’s Module 10: production debugging. We start with reading a Streaming UI tab like a doctor reads an EKG.


References: Apache Spark Structured Streaming Programming Guide, sections on output modes, output sinks, and foreachBatch (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes); Delta Lake MERGE documentation. Retrieved 2026-05-01.
