PySpark, from the ground up Lesson 46 / 60

Writing to JDBC: parallelism, batches, idempotency

How to write Spark output back to a relational database without crushing it, breaking transactions, or losing data on retry.

Last lesson we pulled data from Postgres into Spark. Now we go the other way: a DataFrame full of computed results, and a relational database waiting to receive them. The dashboard team wants the aggregates in their reporting Postgres. The application team wants enrichments back in MySQL so the API can serve them. Whatever the reason, you need df.write.format("jdbc").

The good news: the API mirrors the read API. The bad news: the failure modes are worse. A bad read wastes time. A bad write corrupts data, and corrupting data is the kind of mistake that lingers in your performance review for a long time.

This lesson is the safe-and-fast playbook for writing to JDBC sources.

The basic call

(df.write
   .format("jdbc")
   .option("url", "jdbc:postgresql://db.example.com:5432/reports")
   .option("dbtable", "public.daily_summary")
   .option("user", "spark_writer")
   .option("password", os.environ["DB_PASSWORD"])
   .option("driver", "org.postgresql.Driver")
   .mode("append")
   .save())

Same URL, same dbtable, same driver, same authentication story we covered in lesson 45. The new piece is .mode(...), the save mode, and a small constellation of write-specific options that change everything about how this runs.

Parallelism is implicit (and that’s the trap)

A read with no partitionColumn runs as one task. A write runs with however many partitions the DataFrame has, one task and one database connection per partition. If df.rdd.getNumPartitions() returns 1000, Spark runs 1000 insert tasks, and as many of them hit your database at once as the cluster has free task slots (executor cores).

That sounds great until you remember that your Postgres has max_connections = 100 and the application also wants to use it. On a cluster with a few hundred cores, your write doesn’t fail cleanly: it fails partway, with some partitions already committed, every available connection held open, the remaining tasks erroring out on refused connections, and the application throwing connection errors of its own.

So the first knob, every time, before you even think about save modes:

df_to_write = df.coalesce(16)   # or .repartition(16) if you need a shuffle

A reasonable default for a write target is 16 to 32 partitions, capped by what the database can tolerate. The DBA will tell you the connection budget; do not exceed it. Spark doesn’t do the polite thing here. It will happily open one connection per running task, however many that is, and watch the world burn.

Note: coalesce is cheap (no shuffle) but only reduces partitions. If your DataFrame has too few partitions and they’re skewed, you may want repartition(N, key) to redistribute the load, accepting the shuffle cost.
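One way to turn the connection budget into a partition count is sketched below. The helper write_partitions and its parameters are hypothetical, not part of Spark; the only assumption is that each running write task holds one connection.

```python
def write_partitions(current_partitions: int, connection_budget: int, cap: int = 32) -> int:
    """Pick a partition count for a JDBC write (hypothetical helper, not a Spark API).

    The result never exceeds the DBA's connection budget, the soft cap, or the
    partition count the DataFrame already has (coalesce cannot increase it).
    """
    if connection_budget < 1:
        raise ValueError("connection_budget must be at least 1")
    return max(1, min(current_partitions, connection_budget, cap))


# Usage with a real DataFrame (not executed here):
#   n = write_partitions(df.rdd.getNumPartitions(), connection_budget=20)
#   df_to_write = df.coalesce(n)
print(write_partitions(1000, connection_budget=20))
print(write_partitions(8, connection_budget=20))
```

Spark's JDBC source also accepts a numPartitions option; on writes, if the DataFrame has more partitions than that limit, Spark coalesces down to it before writing. Setting it is a useful second line of defense next to an explicit coalesce.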

batchsize: how many rows per round-trip

By default, Spark groups inserts into batches of 1000 rows and sends each batch to the driver as a single executeBatch call. The option:

.option("batchsize", 5000)

Bigger batches mean fewer network round-trips, which on a write-heavy job is the difference between minutes and hours. The tradeoffs:

  • Memory: each batch buffers batchsize rows on the executor before flushing. 5000 rows of skinny data is nothing; 5000 rows of wide JSON columns can OOM a task.
  • Lock duration: when the database supports transactions, Spark wraps all of a partition’s batches in a single transaction and commits once at the end of the partition. Lock time is therefore set mostly by partition size, not batchsize: fewer, larger partitions mean longer transactions and longer locks held on the destination table.
  • Failure granularity: if a batch fails on row 4500, the task fails and the partition’s transaction rolls back, so the retry redoes the whole partition whatever the batchsize. Only without transactions (isolationLevel=NONE, or a database that doesn’t support them) do the earlier batches stay committed.

A useful number for typical write workloads: 5000-10000. Tune up if writes are slow and the network is the bottleneck. Tune down if you see OOMs or batches timing out.
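The round-trip and memory sides of that tradeoff are simple arithmetic. A rough sketch, where batch_plan is a hypothetical helper and the 40-bytes-per-row figure is an assumed number for skinny rows; it ignores driver and protocol overhead:

```python
import math


def batch_plan(rows_per_partition: int, batchsize: int, avg_row_bytes: int) -> dict:
    """Rough per-partition cost of a batchsize choice (illustrative arithmetic only)."""
    return {
        # one executeBatch call per full or partial batch
        "round_trips": math.ceil(rows_per_partition / batchsize),
        # rows held in the batch buffer before each flush
        "buffered_bytes": batchsize * avg_row_bytes,
    }


# 100M rows spread evenly over 16 partitions, ~40 bytes per row (assumed)
rows = 100_000_000 // 16
for size in (1000, 5000, 10000):
    print(size, batch_plan(rows, size, avg_row_bytes=40))
```

Swap in a realistic row width for your own data: with wide JSON columns the buffered_bytes figure is the one that grows fast.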

isolationLevel

Spark’s default is READ_UNCOMMITTED; if the database doesn’t support the requested level, Spark falls back to the database’s own default (Postgres, for one, treats READ UNCOMMITTED as READ COMMITTED). Each partition runs in its own transaction. If any batch in a partition fails, that partition rolls back; partitions that already committed stay committed.

Other levels (READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE, NONE) are mostly relevant when the destination table is being read concurrently and you care about what those readers see. For background batch loads against a table no one else is reading, the default is fine. Setting isolationLevel=NONE (no transaction at all, so statements autocommit as they execute) sounds faster but is almost always slower and breaks any sense of recovery boundaries.

Save modes: pick carefully

Four modes, and the choice has real consequences:

append — INSERTs new rows into the existing table. The table must exist with a compatible schema, or Spark creates it on first write using inferred types. Additive, which is the safest mode for production. The risk is duplicates if you re-run.

overwrite — and here’s where people get hurt. By default, overwrite does DROP TABLE followed by CREATE TABLE and inserts. The original table is gone, including any indexes, constraints, foreign keys, grants, and triggers. The recreated table has only the columns and types Spark inferred — no constraints, no indexes, no nothing. This will silently break the application three days later when someone tries to query by a missing index.

The fix:

.option("truncate", "true")
.mode("overwrite")

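With truncate=true, Spark issues TRUNCATE TABLE instead of DROP, then inserts. Schema, indexes, constraints all preserved. Use this. Always. One caveat: whether the option is honored depends on Spark’s JDBC dialect for your database. Where the dialect doesn’t support truncation, Spark silently ignores truncate=true and falls back to drop-and-recreate, and Postgres support has changed across Spark releases, so check the JDBC docs for your Spark version. If yours falls back, use a staging-table pattern (below) instead.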

There’s also option("cascadeTruncate", "true") for Postgres if you have foreign keys pointing at your table, which you probably shouldn’t on an analytical write target.

error (the default) — fail if the table exists. Good for “I want to be sure I’m not stomping on something.”

ignore — silently do nothing if the table exists. Useful in idempotent setup scripts; dangerous in actual writes (you might silently skip your write).

The idempotency problem

Now the real one. Imagine this sequence:

  1. Spark starts a write with 16 partitions.
  2. Partitions 1-12 finish successfully: 12 partition transactions committed to the database.
  3. Partition 13 fails — network blip, OOM, executor lost, whatever.
  4. Spark retries partition 13 on a different executor.
  5. Partition 13’s retry succeeds.

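Looks fine, right? Except the failure might have hit after partition 13’s transaction committed but before Spark recorded the task as done (a lost executor does exactly this). And with isolationLevel=NONE or a target without transactions, the task might have committed half its batches before failing. Either way, the retry commits all its rows again, and you have duplicate rows for whatever the original attempt wrote. The same thing happens at job scale: if the whole job fails after 12 partitions committed and you re-run it, those 12 partitions land twice.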

This is not theoretical. Spark guarantees at-least-once execution per partition on retry. With append mode and no other safeguards, that means at-least-once at the row level too — which means duplicates on failure.
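The failure is easy to reproduce without a cluster. The sketch below uses an in-memory SQLite table as a stand-in for the target database and a hypothetical append_partition function as a stand-in for one Spark write task; it is a simulation of the retry, not Spark's actual writer.

```python
import sqlite3


def append_partition(conn, rows):
    """Plain INSERTs committed at the end: the shape of an append-mode task."""
    conn.executemany("INSERT INTO daily_summary VALUES (?, ?, ?)", rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
# No primary key or unique constraint, so nothing stops a second copy
conn.execute("CREATE TABLE daily_summary (report_date TEXT, country TEXT, total REAL)")

partition_13 = [("2026-05-01", "DE", 10.0), ("2026-05-01", "FR", 20.0)]

append_partition(conn, partition_13)  # first attempt commits...
# ...but the executor is lost before Spark records the task as successful,
# so the scheduler runs the same partition again.
append_partition(conn, partition_13)  # the retry commits the same rows

row_count = conn.execute("SELECT COUNT(*) FROM daily_summary").fetchone()[0]
print(row_count)  # 4 rows for 2 logical records
```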

There are two production-grade patterns to fix this.

Pattern 1: staging table with atomic swap

Write to a temporary table that doesn’t matter, then in a single transaction copy or swap into the real table:

# Step 1: write to staging — duplicates are fine here
(df.coalesce(16).write
   .format("jdbc")
   .option("url", url)
   .option("dbtable", "public.daily_summary_staging")
   .option("user", user).option("password", pw)
   .option("driver", driver)
   .option("batchsize", 5000)
   .mode("overwrite")
   .option("truncate", "true")
   .save())

# Step 2: atomic swap, executed on the database directly
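import psycopg2

conn = psycopg2.connect(host="...", dbname="reports", user="...", password="...")
try:
    # psycopg2 opens a transaction on the first statement; `with conn` commits it
    # on success and rolls it back on any exception, so no explicit BEGIN/COMMIT.
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                "DELETE FROM public.daily_summary WHERE report_date = %s",
                (report_date,),
            )
            cur.execute("""
                INSERT INTO public.daily_summary
                SELECT DISTINCT * FROM public.daily_summary_staging
            """)
finally:
    conn.close()  # `with conn` ends the transaction but does not close the connection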

The Spark job is allowed to be sloppy because no one downstream reads the staging table. The atomic swap into the real table is a single transaction at the database level, so there’s no half-state visible to readers. If the swap fails, the staging table still has the data and you can retry. If the Spark job fails, you re-run it; the staging table gets overwritten. Idempotent at every step.

This is the most reliable pattern. It costs you 2x the storage briefly and one extra hop, and it’s worth it.
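To see why every step can be re-run safely, here is the same flow against in-memory SQLite. The functions load_staging and swap are hypothetical stand-ins for the Spark write and the database-side swap; the table names follow the example above.

```python
import sqlite3


def load_staging(conn, rows):
    """Stand-in for the Spark write: empty staging, then insert (duplicates allowed)."""
    conn.execute("DELETE FROM staging")
    conn.executemany("INSERT INTO staging VALUES (?, ?, ?)", rows)
    conn.commit()


def swap(conn, report_date):
    """Replace one day's rows in the target inside a single transaction."""
    with conn:  # sqlite3 connection: commit on success, rollback on exception
        conn.execute("DELETE FROM daily_summary WHERE report_date = ?", (report_date,))
        conn.execute("INSERT INTO daily_summary SELECT DISTINCT * FROM staging")


conn = sqlite3.connect(":memory:")
for table in ("staging", "daily_summary"):
    conn.execute(f"CREATE TABLE {table} (report_date TEXT, country TEXT, total REAL)")

rows = [("2026-05-01", "DE", 10.0), ("2026-05-01", "FR", 20.0)]
load_staging(conn, rows + rows[:1])  # a retried task left one duplicate in staging
swap(conn, "2026-05-01")
swap(conn, "2026-05-01")             # re-running the swap changes nothing

final_rows = conn.execute("SELECT * FROM daily_summary ORDER BY country").fetchall()
print(final_rows)
```

The DELETE is scoped to one report_date, so the sketch assumes staging holds only that date's rows, the same assumption the Postgres version makes.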

Pattern 2: idempotent upserts via primary key

If your destination table has a primary key or unique constraint and you can rely on it, use the database’s native upsert:

-- Postgres
INSERT INTO daily_summary (report_date, country, total)
VALUES (...)
ON CONFLICT (report_date, country) DO UPDATE
  SET total = EXCLUDED.total;

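-- MySQL (VALUES() in the UPDATE clause is deprecated since 8.0.20 in favor of a row alias, but still works)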
INSERT INTO daily_summary (report_date, country, total)
VALUES (...)
ON DUPLICATE KEY UPDATE total = VALUES(total);

Spark’s JDBC writer doesn’t support upserts natively. You have two options:

Option A: write to a staging table (as above), then run the INSERT ... ON CONFLICT on the database side as a single SQL statement covering all rows.

Option B: drop into df.foreachPartition() and write your own JDBC upserts. The shape:

def upsert_partition(rows):
    # Import inside the function so it resolves on the executor, not the driver
    import psycopg2

    sql = """
        INSERT INTO daily_summary (report_date, country, total)
        VALUES (%s, %s, %s)
        ON CONFLICT (report_date, country) DO UPDATE
          SET total = EXCLUDED.total
    """
    conn = psycopg2.connect(...)
    try:
        with conn.cursor() as cur:
            batch = []
            for row in rows:
                batch.append((row.report_date, row.country, row.total))
                if len(batch) >= 5000:
                    cur.executemany(sql, batch)
                    conn.commit()  # each batch is its own transaction
                    batch.clear()
            if batch:  # flush the final partial batch
                cur.executemany(sql, batch)
                conn.commit()
    finally:
        conn.close()  # release the connection even if a batch fails

df.coalesce(16).foreachPartition(upsert_partition)

Each row is now idempotent at the database level: a retry of the same row produces the same result. psycopg2’s executemany sends one statement per row, so this is slower than a straight append through Spark’s batched executeBatch, but at-least-once delivery plus idempotent writes gives you effectively-once results. A pattern worth keeping in your back pocket.
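The retry behavior can be checked locally. This sketch runs the same upsert twice against in-memory SQLite, which accepts the Postgres-style ON CONFLICT clause from version 3.24 onward (an assumption about your Python build); the second pass stands in for a retried task.

```python
import sqlite3

UPSERT = """
    INSERT INTO daily_summary (report_date, country, total)
    VALUES (?, ?, ?)
    ON CONFLICT (report_date, country) DO UPDATE
      SET total = excluded.total
"""

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE daily_summary (
        report_date TEXT, country TEXT, total REAL,
        PRIMARY KEY (report_date, country)
    )
""")

batch = [("2026-05-01", "DE", 10.0), ("2026-05-01", "FR", 20.0)]
for attempt in range(2):  # the original task plus one retry
    conn.executemany(UPSERT, batch)
    conn.commit()

upserted = conn.execute("SELECT * FROM daily_summary ORDER BY country").fetchall()
print(upserted)  # still two rows after the retry
```

This only holds if the retry carries the same values. A nondeterministic column (rand(), current_timestamp()) means the retry overwrites rather than repeats, which is usually still acceptable but worth knowing.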

Numbers, because they help

Concrete benchmarks from a real workload — 100 million skinny rows (timestamp, key, three numerics) into Postgres on the same VPC:

Configuration                                                  Wall-clock
Default append, 1000 partitions, batchsize 1000                ~30 minutes
append, coalesce(16), batchsize 5000                           ~3 minutes
Staging table + atomic swap, 16 partitions, batchsize 10000    ~3.5 minutes
foreachPartition upserts, 16 partitions, batchsize 5000        ~7 minutes

The first row is what you get if you forget about partitioning. The second is the parallel-and-batched sweet spot. The third costs almost nothing extra and gives you idempotency. The fourth is slower but gives you per-row idempotent upserts. Pick the one that matches your reliability needs.

A few more knobs

createTableOptions — extra clauses appended to CREATE TABLE when Spark creates the target. Useful for things like WITH (autovacuum_enabled=false) on a load-only Postgres table.

createTableColumnTypes — overrides Spark’s inferred column types. Use this when you want VARCHAR(255) instead of TEXT, or NUMERIC(18,2) instead of DOUBLE PRECISION.

queryTimeout — kills any single query that runs too long. Set this in production. Default is unlimited, which is exactly what you don’t want when a deadlock has frozen one of your batches.

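sessionInitStatement: SQL run on each new database session. Useful for SET statement_timeout = '5min', SET synchronous_commit = off (Postgres performance hack on a non-critical destination), or SET search_path = .... Spark’s JDBC docs list this as a read-side option, so confirm it actually fires on writes in your Spark version before relying on it; if it doesn’t, set the same parameters as role defaults on the database side (ALTER ROLE spark_writer SET statement_timeout = '5min').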
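Keeping these options in one place makes them harder to forget. A minimal sketch: jdbc_write_options is a hypothetical helper, the column names in createTableColumnTypes come from this lesson's daily_summary example, and the 300-second timeout is an arbitrary choice.

```python
def jdbc_write_options(url: str, table: str, user: str, password: str) -> dict:
    """Collect JDBC writer options in one dict (hypothetical helper, not a Spark API)."""
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
        "batchsize": "5000",
        # Seconds; 0 (the default) means no limit
        "queryTimeout": "300",
        # Only used when Spark creates the table; types are Spark SQL types
        "createTableColumnTypes": "country VARCHAR(2), total DECIMAL(18,2)",
    }


opts = jdbc_write_options(
    "jdbc:postgresql://db.example.com:5432/reports",
    "public.daily_summary",
    "spark_writer",
    "not-a-real-password",
)
# With a real DataFrame (not executed here):
#   df.coalesce(16).write.format("jdbc").options(**opts).mode("append").save()
print(sorted(opts))
```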

The DON’T list, write edition

Don’t write a 1000-partition DataFrame straight to a transactional database. Coalesce or repartition to a sane number first. Always.

Don’t use mode("overwrite") without option("truncate", "true") unless you genuinely want the table dropped. And even then, make sure no application is depending on the indexes or grants you’re about to vaporize.

Don’t assume append is idempotent. It isn’t, on retry. Either commit to the staging-table pattern, the upsert pattern, or accept that occasional duplicates exist and dedupe downstream.

Don’t skip the destination index strategy. Bulk inserts against a heavily indexed table are slow because every row updates every index. If you control the schema and writes are bursty, a common pattern is: drop indexes, bulk insert, recreate indexes. But this is dangerous on a live table and is much easier with a staging table: if you swap by renaming the staging table into place rather than copying rows, you can build the indexes on staging before the swap.

Don’t write to your prod transactional DB from Spark for anything time-sensitive. Same warning as last lesson, in reverse: Spark write jobs hold connections, do bulk inserts, run transactions. The application will notice. Land the data in a warehouse or analytics DB instead, and let the application pull it from there.

Try this

from pyspark.sql import SparkSession
import os

spark = (SparkSession.builder
         .appName("JdbcWriteDemo")
         .master("local[*]")
         .config("spark.jars.packages", "org.postgresql:postgresql:42.7.0")
         .getOrCreate())

# Build a synthetic DataFrame with 5M rows, 200 partitions
df = (spark.range(5_000_000)
        .withColumnRenamed("id", "user_id")
        .selectExpr(
            "user_id",
            "user_id % 100 as country_id",
            "rand() * 1000 as amount",
        )
        .repartition(200))

url = "jdbc:postgresql://localhost:5432/demo"
props = {
    "user": "spark_writer",
    "password": os.environ["DB_PASSWORD"],
    "driver": "org.postgresql.Driver",
    "batchsize": "5000",
}

# Don't do this — too many connections
# df.write.jdbc(url=url, table="public.results_bad",
#               mode="overwrite", properties=props)

# Do this — coalesce first, truncate not drop
(df.coalesce(16).write
   .option("truncate", "true")
   .jdbc(url=url, table="public.results",
         mode="overwrite", properties=props))

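Watch pg_stat_activity while it runs. With the bad version (commented), Spark opens and closes a connection for each of the 200 partitions; on local[*] only as many run at once as your machine has cores, but on a cluster with 200 free cores all 200 would hit the database together. With the good version, you see at most 16 connections, each chewing through batches until done.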

Next lesson, cloud object storage: S3, GCS, Azure Blob — the new “default filesystem” for Spark in 2026. The consistency story is finally simple, but the rename problem still bites, and that’s why direct-write committers exist.


References: Apache Spark JDBC data source documentation (https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html). Retrieved 2026-05-01.
