Nella scorsa lezione abbiamo tirato dati da Postgres dentro Spark. Adesso facciamo il percorso opposto: un DataFrame pieno di risultati calcolati, e un database relazionale che aspetta di riceverli. Il team della dashboard vuole gli aggregati nel loro Postgres di reporting. Il team applicativo vuole gli arricchimenti dentro MySQL così che l’API possa servirli. Qualunque sia la ragione, ti serve df.write.format("jdbc").
La buona notizia: l’API rispecchia quella di lettura. La cattiva notizia: i modi di fallire sono peggiori. Una lettura sbagliata fa perdere tempo. Una scrittura sbagliata corrompe i dati, e corrompere i dati è il tipo di errore che ti resta appiccicato addosso in performance review per parecchio tempo.
Questa lezione è il manuale per scrivere su sorgenti JDBC in modo veloce e sicuro.
La chiamata di base
(df.write
.format("jdbc")
.option("url", "jdbc:postgresql://db.example.com:5432/reports")
.option("dbtable", "public.daily_summary")
.option("user", "spark_writer")
.option("password", os.environ["DB_PASSWORD"])
.option("driver", "org.postgresql.Driver")
.mode("append")
.save())
Stesso URL, stesso dbtable, stesso driver, stessa storia di autenticazione che abbiamo coperto nella lezione 45. Il pezzo nuovo è .mode(...), il save mode, e una piccola costellazione di opzioni specifiche per la scrittura che cambiano tutto su come gira.
Il parallelismo è implicito (e questa è la trappola)
Una lettura senza partitionColumn gira come un unico task. Una scrittura gira con tante partizioni quante ne ha il DataFrame. Se df.rdd.getNumPartitions() restituisce 1000, Spark apre 1000 connessioni al tuo database, ognuna che fa INSERT in parallelo.
Sembra fantastico finché non ti ricordi che il tuo Postgres ha max_connections = 100 e anche l’applicazione lo vuole usare. La tua scrittura non fallisce in modo pulito: fallisce a metà, tenendo aperte 100 connessioni, con le altre 900 partizioni bloccate in attesa nella pool, mentre l’applicazione inizia a sputare errori di connessione.
Quindi la prima manopola, ogni volta, prima ancora di pensare ai save mode:
df_to_write = df.coalesce(16) # o .repartition(16) se ti serve uno shuffle
Un default ragionevole per un target di scrittura sono da 16 a 32 partizioni, con un cap pari a quello che il database può tollerare. Il DBA ti dirà il budget di connessioni; non superarlo. Spark non fa il gentile qui. Proverà allegramente 1000 connessioni simultanee e guarderà il mondo bruciare.
Nota: coalesce è poco costoso (niente shuffle) ma riduce solo le partizioni. Se il tuo DataFrame ne ha troppo poche e sono skewed, potresti voler fare repartition(N, key) per ridistribuire il carico, accettando il costo dello shuffle.
batchsize: quante righe per round-trip
Di default, il driver JDBC raggruppa gli insert in batch di 1000 righe e li manda come una singola chiamata executeBatch. L’opzione:
.option("batchsize", 5000)
Batch più grandi significano meno round-trip di rete e meno commit di transazione per riga, che in un job write-heavy è la differenza tra minuti e ore. I trade-off:
- Memoria: ogni batch bufferizza
batchsizerighe sull’executor prima di scaricarle. 5000 righe di dati magri non sono nulla; 5000 righe di colonne JSON larghe possono mandare in OOM un task. - Durata dei lock: un singolo batch tipicamente gira in una transazione. Batch più grande = transazione più lunga = lock più lunghi tenuti sulla tabella di destinazione.
- Granularità del fallimento: se un batch fallisce sulla riga 4500, l’intero batch fa rollback. Con
batchsize=10000, sono 10000 righe da rifare. Conbatchsize=100, solo 100.
Un numero utile per workload di scrittura tipici: 5000-10000. Alza se le scritture sono lente e la rete è il collo di bottiglia. Abbassa se vedi OOM o batch che vanno in timeout.
isolationLevel
Il default è READ_COMMITTED per la maggior parte dei driver. Ogni batch gira nella sua transazione. Se un batch fallisce, quel batch fa rollback; i batch precedentemente committati restano committati.
Altri livelli, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, NONE, sono perlopiù rilevanti quando la tabella di destinazione viene letta in concorrenza e ti importa di cosa vedono quei lettori. Per caricamenti batch in background contro una tabella che nessun altro sta leggendo, il default va bene. Mettere isolationLevel=NONE (autocommit per riga) suona più veloce ma è quasi sempre più lento e rompe ogni senso di confine di recovery.
Save mode: scegli con attenzione
Quattro mode, e la scelta ha conseguenze concrete:
append: INSERT di nuove righe nella tabella esistente. La tabella deve esistere con uno schema compatibile, oppure Spark la crea alla prima scrittura usando i tipi inferiti. Additivo, che è il mode più sicuro per la produzione. Il rischio sono i duplicati se rilanci il job.
overwrite: ed è qui che la gente si fa male. Di default, overwrite fa DROP TABLE seguito da CREATE TABLE e insert. La tabella originale non c’è più, inclusi tutti gli indici, vincoli, foreign key, grant e trigger. La tabella ricreata ha solo le colonne e i tipi che Spark ha inferito: zero vincoli, zero indici, zero di tutto. Questo romperà silenziosamente l’applicazione tre giorni dopo, quando qualcuno proverà una query su un indice mancante.
Il fix:
.option("truncate", "true")
.mode("overwrite")
Con truncate=true, Spark emette TRUNCATE TABLE invece di DROP, poi inserisce. Schema, indici, vincoli tutti preservati. Usalo. Sempre. Alcuni driver (Postgres vecchi) ignorano truncate=true e ricadono nel drop-and-recreate; controlla la documentazione della tua versione del driver. Se il tuo lo fa, usa il pattern della staging table (sotto).
C’è anche option("cascadeTruncate", "true") per Postgres se hai foreign key che puntano alla tua tabella, cosa che probabilmente non dovresti avere su un target di scrittura analitico.
error (il default): fallisce se la tabella esiste. Buono per “voglio essere sicuro di non pestare niente”.
ignore: silenziosamente non fa nulla se la tabella esiste. Utile in script di setup idempotenti; pericoloso in scritture vere (potresti saltare silenziosamente la tua scrittura).
Il problema dell’idempotenza
Adesso quello vero. Immagina questa sequenza:
- Spark inizia una scrittura con 16 partizioni.
- Le partizioni 1-12 finiscono con successo: 12 batch committati al database.
- La partizione 13 fallisce: blip di rete, OOM, executor perso, qualunque cosa.
- Spark ritenta la partizione 13 su un altro executor.
- Il retry della partizione 13 ha successo.
Sembra a posto, no? Tranne che la partizione 13 potrebbe aver committato metà dei suoi batch prima di fallire. Il retry poi committa tutti i suoi batch. Hai righe duplicate per qualunque cosa il task originale avesse parzialmente scritto.
Non è teorico. Spark garantisce esecuzione at-least-once per partizione al retry. Con append mode e nessun’altra protezione, vuol dire at-least-once anche a livello di riga: cioè duplicati al fallimento.
Ci sono due pattern production-grade per risolverlo.
Pattern 1: staging table con swap atomico
Scrivi su una tabella temporanea che non conta, poi in una singola transazione copi o fai swap nella tabella vera:
# Step 1: scrivi su staging - i duplicati qui vanno bene
(df.coalesce(16).write
.format("jdbc")
.option("url", url)
.option("dbtable", "public.daily_summary_staging")
.option("user", user).option("password", pw)
.option("driver", driver)
.option("batchsize", 5000)
.mode("overwrite")
.option("truncate", "true")
.save())
# Step 2: swap atomico, eseguito direttamente sul database
import psycopg2
with psycopg2.connect(host="...", dbname="reports", user="...", password="...") as conn:
with conn.cursor() as cur:
cur.execute("""
BEGIN;
DELETE FROM public.daily_summary
WHERE report_date = %s;
INSERT INTO public.daily_summary
SELECT DISTINCT * FROM public.daily_summary_staging;
COMMIT;
""", (report_date,))
Al job Spark è permesso essere sciatto perché nessuno a valle legge la staging table. Lo swap atomico nella tabella vera è una singola transazione a livello di database, quindi non c’è nessuno stato a metà visibile ai lettori. Se lo swap fallisce, la staging table ha ancora i dati e puoi ritentare. Se il job Spark fallisce, lo rilanci; la staging table viene sovrascritta. Idempotente a ogni passo.
Questo è il pattern più affidabile. Ti costa brevemente 2x lo storage e un hop in più, e ne vale la pena.
Pattern 2: upsert idempotenti tramite primary key
Se la tua tabella di destinazione ha una primary key o un vincolo unique e puoi farci affidamento, usa l’upsert nativo del database:
-- Postgres
INSERT INTO daily_summary (report_date, country, total)
VALUES (...)
ON CONFLICT (report_date, country) DO UPDATE
SET total = EXCLUDED.total;
-- MySQL
INSERT INTO daily_summary (report_date, country, total)
VALUES (...)
ON DUPLICATE KEY UPDATE total = VALUES(total);
Il writer JDBC di Spark non supporta gli upsert nativamente. Hai due opzioni:
Opzione A: scrivi su una staging table (come sopra), poi lanci l’INSERT ... ON CONFLICT lato database come una singola istruzione SQL che copre tutte le righe.
Opzione B: cala in df.foreachPartition() e scrivi i tuoi upsert JDBC. La forma:
def upsert_partition(rows):
import psycopg2
conn = psycopg2.connect(...)
cur = conn.cursor()
sql = """
INSERT INTO daily_summary (report_date, country, total)
VALUES (%s, %s, %s)
ON CONFLICT (report_date, country) DO UPDATE
SET total = EXCLUDED.total
"""
batch = []
for row in rows:
batch.append((row.report_date, row.country, row.total))
if len(batch) >= 5000:
cur.executemany(sql, batch)
conn.commit()
batch.clear()
if batch:
cur.executemany(sql, batch)
conn.commit()
cur.close()
conn.close()
df.coalesce(16).foreachPartition(upsert_partition)
Ogni riga è adesso idempotente a livello di database: un retry della stessa riga produce lo stesso risultato. Stai facendo più lavoro per riga rispetto all’executeBatch vettorizzato di Spark, quindi è più lento di un append semplice, ma ottieni vere semantiche at-most-once. Un pattern che vale la pena tenere in tasca.
Numeri, perché aiutano
Benchmark concreti da un workload reale, 100 milioni di righe magre (timestamp, key, tre numerici) dentro Postgres sulla stessa VPC:
| Configurazione | Wall-clock |
|---|---|
append di default, 1000 partizioni, batchsize 1000 | ~30 minuti |
append, coalesce(16), batchsize 5000 | ~3 minuti |
| Staging table + swap atomico, 16 partizioni, batchsize 10000 | ~3.5 minuti |
Upsert con foreachPartition, 16 partizioni, batchsize 5000 | ~7 minuti |
La prima riga è quello che ottieni se ti dimentichi del partitioning. La seconda è lo sweet spot parallelo-e-batched. La terza non costa quasi nulla in più e ti dà idempotenza. La quarta è più lenta ma ti dà upsert idempotenti per riga. Scegli quella che corrisponde alle tue necessità di affidabilità.
Qualche manopola in più
createTableOptions: clausole extra appese al CREATE TABLE quando Spark crea il target. Utile per cose come WITH (autovacuum_enabled=false) su una tabella Postgres dedicata al solo carico.
createTableColumnTypes: sovrascrive i tipi di colonna inferiti da Spark. Usalo quando vuoi VARCHAR(255) invece di TEXT, o NUMERIC(18,2) invece di DOUBLE PRECISION.
queryTimeout: ammazza qualsiasi singola query che giri troppo a lungo. Mettilo in produzione. Il default è illimitato, che è esattamente quello che non vuoi quando un deadlock ha congelato uno dei tuoi batch.
sessionInitStatement: SQL eseguito su ogni connessione prima di qualsiasi scrittura. Utile per SET statement_timeout = '5min', SET synchronous_commit = off (hack di performance Postgres su una destinazione non critica), o SET search_path = ....
La lista dei NON fare, edizione scrittura
Non scrivere un DataFrame da 1000 partizioni dritto in un database transazionale. Coalesce o repartition a un numero sano prima. Sempre.
Non usare mode("overwrite") senza option("truncate", "true") a meno che tu non voglia genuinamente che la tabella venga droppata. E anche allora, assicurati che nessuna applicazione stia dipendendo dagli indici o dai grant che stai per vaporizzare.
Non assumere che append sia idempotente. Non lo è, al retry. O ti impegni col pattern della staging table, col pattern dell’upsert, o accetti che ogni tanto esistano duplicati e fai dedup a valle.
Non saltare la strategia degli indici sulla destinazione. Gli insert di massa contro una tabella pesantemente indicizzata sono lenti perché ogni riga aggiorna ogni indice. Se controlli lo schema e le scritture sono a raffica, un pattern comune è: droppi gli indici, fai bulk insert, ricrei gli indici. Ma è pericoloso su una tabella live ed è molto più facile col pattern staging-and-swap, dove puoi costruire gli indici sulla staging prima dello swap.
Non scrivere sul tuo DB transazionale di prod da Spark per cose time-sensitive. Stesso warning della scorsa lezione, al contrario: i job di scrittura Spark tengono connessioni, fanno bulk insert, lanciano transazioni. L’applicazione lo noterà. Atterra i dati in un warehouse o in un DB analitico, e lascia che l’applicazione li tiri da lì.
Provalo
from pyspark.sql import SparkSession
import os
spark = (SparkSession.builder
.appName("JdbcWriteDemo")
.master("local[*]")
.config("spark.jars.packages", "org.postgresql:postgresql:42.7.0")
.getOrCreate())
# Costruisci un DataFrame sintetico con 5M di righe, 200 partizioni
df = (spark.range(5_000_000)
.withColumnRenamed("id", "user_id")
.selectExpr(
"user_id",
"user_id % 100 as country_id",
"rand() * 1000 as amount",
)
.repartition(200))
url = "jdbc:postgresql://localhost:5432/demo"
props = {
"user": "spark_writer",
"password": os.environ["DB_PASSWORD"],
"driver": "org.postgresql.Driver",
"batchsize": "5000",
}
# Non farlo - troppe connessioni
# df.write.jdbc(url=url, table="public.results_bad",
# mode="overwrite", properties=props)
# Fai cosi' - prima coalesce, truncate non drop
(df.coalesce(16).write
.option("truncate", "true")
.jdbc(url=url, table="public.results",
mode="overwrite", properties=props))
Guarda pg_stat_activity mentre gira. Con la versione cattiva (commentata), vedresti 200 connessioni litigare per gli slot della pool e il job balbettare. Con la versione buona, vedi 16 connessioni stabili, ognuna che mastica batch fino alla fine.
Prossima lezione, cloud object storage: S3, GCS, Azure Blob, il nuovo “filesystem di default” per Spark nel 2026. La storia della consistency è finalmente semplice, ma il problema del rename morde ancora, ed è per questo che esistono i committer direct-write.
Riferimenti: documentazione del data source JDBC di Apache Spark (https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html). Recuperata il 2026-05-01.