PySpark, dalle fondamenta Lezione 50 / 60

Structured Streaming: le basi di readStream, writeStream, trigger

Gli entry point per lo streaming, la semantica dei trigger, e il checkpoint da cui dipende tutto.

La lezione scorsa era concettuale: gli stream sono tabelle infinite, batch e streaming sono un continuum, micro-batch è il modello, i DStreams sono morti. Questa lezione è la meccanica. Alla fine avrai scritto un job streaming completo e saprai cosa fa ogni riga.

Ci sono esattamente quattro cose che devi capire:

  1. readStream — come entrano i dati.
  2. writeStream — come escono i dati.
  3. Trigger — quando parte ogni micro-batch.
  4. Checkpoint — come Spark si ricorda cosa ha già processato.

Tutto qui. Il resto sono variazioni.

readStream: il lato input

L’entry point dello streaming è spark.readStream. Stessa forma di spark.read, ma ritorna un DataFrame streaming invece di uno finito:

events = (spark.readStream
            .format("csv")
            .schema("user_id STRING, action STRING, ts TIMESTAMP")
            .option("header", "true")
            .load("/data/incoming/"))

Il DataFrame events sembra normale: stesso printSchema, stessi riferimenti a colonne, ma events.isStreaming ritorna True, e non puoi fargli .show().count(). I DataFrame streaming non hanno uno “snapshot corrente” che puoi ispezionare; hanno senso solo quando collegati a un sink.

Spark viene con diverse source streaming integrate:

  • file — fa polling di una directory per nuovi file. Il formato può essere csv, json, parquet, orc o text. Spark scansiona la directory a ogni trigger; ogni file che non c’era l’ultima volta viene letto. Questa è la source più facile da testare perché la controlli da shell: cp file.json /data/incoming/ e guardi il job prenderlo.
  • kafka — si sottoscrive a uno o più topic Kafka. La source dominante in produzione. La lezione 51 le è dedicata.
  • socket — legge testo delimitato da newline da un socket TCP. Strettamente per esempi giocattolo e demo; non è fault tolerant, niente offset, mai usare in produzione. Ma è comoda per i tutorial perché puoi nc -lk 9999 in un terminale e digitare eventi a mano.
  • rate e rate-micro-batch — source sintetiche che generano righe (timestamp, value) a un rate configurabile. Utili per benchmark, smoke test e per imparare. Sono documentate ma raramente usate in job veri.

Le terze parti aggiungono altro: Delta Lake aggiunge delta, Iceberg aggiunge iceberg, AWS Kinesis ha il suo connettore, ecc. Il pattern è sempre lo stesso: format("...") più opzioni.

Qualche opzione della file source che vale la pena conoscere:

events = (spark.readStream
            .format("json")
            .schema(my_schema)
            .option("maxFilesPerTrigger", 10)
            .option("latestFirst", "false")
            .option("cleanSource", "archive")
            .option("sourceArchiveDir", "/data/archived/")
            .load("/data/incoming/"))

maxFilesPerTrigger limita quanti nuovi file vengono processati in un micro-batch, importante se stai recuperando un backlog e non vuoi che un singolo micro-batch enorme finisca in OOM. latestFirst inverte l’ordine dei file se vuoi prima i più nuovi. cleanSource dice a Spark cosa fare con i file processati (archive, delete, o off), utile per tenere ordinata la directory source senza scrivere il proprio script di pulizia.

Lo schema conta. Devi fornire uno schema per le file source streaming di default. Spark non lo inferirà perché richiederebbe leggere dati a ogni trigger, e Spark vuole che lo schema sia stabile per tutta la vita della query. Se vuoi davvero l’inferenza, imposta spark.sql.streaming.schemaInference a true, ma è un footgun: una volta che la query parte, lo schema è bloccato.

Trasformazioni: la stessa API che già conosci

Una volta che hai un DataFrame streaming, lo trasformi con gli stessi operatori di un DataFrame batch:

from pyspark.sql.functions import col, upper

high_value = (events
                .filter(col("action") == "purchase")
                .withColumn("user_id_upper", upper("user_id"))
                .select("user_id_upper", "ts", "amount"))

filter, select, withColumn, groupBy, agg, join, union: funzionano tutti, con una regola consistente: le operazioni che hanno bisogno di guardare l’intero dataset non funzionano direttamente sugli stream. Non puoi fare .sort() di uno stream globalmente perché non c’è una “fine” rispetto a cui ordinare. Non puoi fare .limit(100) perché il significato delle “prime 100” dipende dall’ordine. Non puoi calcolare un percentile perché richiede l’intero dataset.

Cosa puoi fare:

  • Trasformazioni stateless — qualsiasi cosa per riga (select, filter, withColumn, cast, explode, UDF che non riferiscono ad altre righe). Sono facili.
  • Aggregazioni statefulgroupBy + count, sum, avg, ecc. Spark mantiene state attraverso i micro-batch. Puoi tenere conteggi in corso di stream unbounded indefinitamente (con watermark per limitare la memoria, lezione 52).
  • Aggregazioni windowedgroupBy(window("ts", "5 minutes")) per tumbling window, window("ts", "5 minutes", "1 minute") per sliding. Ogni window è il proprio gruppo; i risultati emettono quando la window si chiude.
  • Stream-static join — fare il join di uno stream contro un DataFrame batch normale (una tabella di dimensione, un lookup, una config). Sempre supportato.
  • Stream-stream join — fare il join di due stream con watermark e bound temporali. Supportato ma con caveat; lezione 55.

Qualsiasi cosa calcoleresti su una lista infinita, in pratica, ha una versione a forma di streaming. Qualsiasi cosa che richieda “tutti i dati adesso” non si traduce.

writeStream: il lato output

Il lato output rispecchia l’input. Configuri un sink e fai partire la query:

query = (high_value.writeStream
            .format("parquet")
            .outputMode("append")
            .option("path", "/data/output/")
            .option("checkpointLocation", "/data/checkpoints/high_value/")
            .trigger(processingTime="30 seconds")
            .start())

Stanno succedendo tre cose qui. Le srotolo in ordine.

Output mode

outputMode dice a Spark cosa scrivere a ogni micro-batch. Tre opzioni:

  • append — emette solo righe nuove dall’ultimo micro-batch e che non cambieranno più. Default per query stateless. Richiesto per sink che non supportano update (Parquet, Avro, file semplici). Per le aggregazioni, append mode richiede un watermark così Spark sa quando un’aggregazione è “fatta” e può essere emessa (di nuovo lezione 52, salta fuori ovunque).
  • complete — emette l’intera result table corrente a ogni micro-batch. Valido solo per aggregazioni (qualsiasi altra cosa avrebbe un risultato unbounded). Utile per dashboard o debugging su console dove vuoi vedere il running total.
  • update — emette solo le righe cambiate in questo micro-batch. Utile per sink che supportano upsert: Delta Lake, Cassandra, JDBC con merge logic. Più efficiente di complete, più flessibile di append.

La lezione 53 è il deep dive sugli output mode: quando ognuno è legale, quando ognuno è quello che vuoi, e come interagiscono i watermark. Per ora: usa append per sink append-only, complete per piccole aggregazioni verso un sink console o memory, update per sink upsert-capable.

Trigger

Il trigger controlla quando ogni micro-batch gira. Quattro varianti:

.trigger(processingTime="10 seconds")   # ogni 10 secondi
.trigger(availableNow=True)             # processa tutto il disponibile, poi ferma
.trigger(continuous="100 ms")           # modalità sperimentale a bassa latenza
# .trigger() omesso                     # default: il più veloce possibile

Il trigger default (nessuna chiamata .trigger(), o omessa) dice a Spark di iniziare il prossimo micro-batch appena finisce il precedente. Se il tuo processing impiega 2 secondi a batch, ottieni un batch ogni 2 secondi. Se impiega 200 ms, ne ottieni cinque al secondo. Questo è il default giusto per la maggior parte dei job di produzione: Spark va più veloce che può.

processingTime fissa l’intervallo. Se imposti "10 seconds", Spark esegue un micro-batch ogni 10 secondi a prescindere dal processing time. Se un batch impiega 4 secondi, Spark aspetta 6 secondi prima del successivo. Se un batch impiega 14 secondi (più del trigger), il successivo parte subito e ottieni un warning nei log che stai rimanendo indietro. Usa questo quando vuoi batching prevedibile, a bassa frequenza, ad esempio summary orari scritti su una tabella.

availableNow=True è il trigger “batch-style streaming”. Spark legge tutti i dati attualmente disponibili nella source, li processa (eventualmente attraverso più micro-batch interni se ce n’è tanti), li scrive, e si ferma. La query streaming termina puliti. È enormemente utile per job che vuoi far girare a uno schedule (Airflow, cron) ma scritti con l’API streaming, ad esempio un job orario che legge da Kafka, processa quello che c’è, ed esce. Sostituisce la vecchia modalità Trigger.Once (ora deprecata a favore di availableNow, che gestisce meglio i backlog).

continuous è la modalità sperimentale a latenza millisecondo che ho menzionato nella lezione 49. Operazioni ristrette, caso d’uso di nicchia, in pratica ignorala.

Sink

format e le opzioni associate scelgono la destinazione:

  • parquet, orc, json, csv — file sink. Solo append mode. Specifica path e checkpointLocation. Ogni micro-batch scrive nuovi file nel path.
  • kafka — scrive su un topic Kafka. Utile per pipeline stream a stream. Specifica il topic e i bootstrap server.
  • console — stampa su stdout. Solo per debug. Ha opzioni per numRows e truncate come .show().
  • memory — scrive su una tabella in-memory accessibile per nome con spark.sql("SELECT * FROM my_query"). Debug ed esplorazione in notebook. Non usare in produzione; la tabella cresce unbounded nella memoria del driver.
  • foreach e foreachBatch — vie di fuga per sink custom. foreachBatch ti dà il micro-batch come DataFrame normale e ti lascia fare quello che vuoi (scrivere su JDBC, chiamare un’API HTTP, eseguire un merge custom su Delta, ecc.). Lo strumento di tutti i giorni quando nessun sink integrato si adatta.

Sink di terze parti: delta, iceberg, JDBC, Kinesis, ecc. Stesso pattern.

Il checkpoint

Adesso la parte che è obbligatoria e che i principianti dimenticano ogni singola volta.

.option("checkpointLocation", "/data/checkpoints/my_job/")

La checkpoint location è una directory (locale, HDFS, S3, dovunque sia durabile) dove Spark scrive:

  1. Offset consumati — per ogni source, qual è l’ultima posizione che abbiamo processato? Per Kafka, l’offset per partition. Per i file, quali file sono stati ingeriti. Per i socket, beh, non puoi fare il checkpoint dei socket, che è uno dei motivi per cui non sono per la produzione.
  2. State delle aggregazioni — per query stateful, snapshot dei conteggi/somme/window in corso così sopravvivono a un restart.
  3. Commit log — marker che dicono “il micro-batch N è stato completato con successo ed è stato scritto al sink”.
  4. Metadati — query ID, schema, configurazione di source/sink. Usato per rilevare mismatch al restart.

Senza checkpoint, ogni restart della tua query streaming riparte da zero. Per una file source, significa rileggere ogni file nella directory. Per Kafka, significa rileggere dagli startingOffsets configurati. Per un’aggregazione, significa ricalcolare tutti i totali in corso da zero. Niente di tutto questo è di solito quello che vuoi.

Con un checkpoint, il restart riprende da dove l’ultimo micro-batch riuscito è finito. Stessi offset, stesso state di aggregazione. La query continua senza interruzioni.

Qualche regola:

  • Una checkpoint location per query. Due query che condividono un checkpoint si corromperanno a vicenda.
  • Il checkpoint è legato al query plan. Se cambi le operazioni in modo significativo (aggiungi un’aggregazione stateful, cambi un join), Spark rifiuterà di riprendere dal vecchio checkpoint e dovrai partire da zero. Cambi cosmetici (path di output diverso, numRows diverso per console) di solito vanno bene.
  • Non metterlo su disco locale in un deployment a cluster. Se l’executor 0 muore e ne parte uno nuovo su un altro nodo, i file di checkpoint sono andati. Usa HDFS, S3, ABFS, GCS, qualsiasi cosa durabile.
  • I sink console e memory tecnicamente funzionano senza checkpoint per debug, ma Spark farà un warning. In produzione, impostalo sempre.

Dimenticare checkpointLocation è il singolo errore più comune di Structured Streaming che ho visto. La query parte, gira, sembra in salute. Poi qualcuno la riavvia per un motivo totalmente non correlato e i dati appaiono nell’output una seconda volta. Perché non c’era checkpoint, la source è stata riletta.

Un esempio completo, lavorato

Mettiamo tutto insieme: leggere file JSON da una directory, filtrare gli acquisti sopra 100, scrivere Parquet:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("PurchaseFilter").getOrCreate()

schema = "user_id STRING, action STRING, amount DOUBLE, ts TIMESTAMP"

events = (spark.readStream
            .format("json")
            .schema(schema)
            .option("maxFilesPerTrigger", 5)
            .load("/data/incoming/"))

high_value = (events
                .filter(col("action") == "purchase")
                .filter(col("amount") > 100)
                .select("user_id", "amount", "ts"))

query = (high_value.writeStream
            .format("parquet")
            .outputMode("append")
            .option("path", "/data/high_value_purchases/")
            .option("checkpointLocation", "/data/checkpoints/high_value/")
            .trigger(processingTime="30 seconds")
            .start())

query.awaitTermination()

Lancialo. Butta un file JSON in /data/incoming/. Aspetta fino a 30 secondi. Guarda in /data/high_value_purchases/ per nuovi Parquet. Guarda in /data/checkpoints/high_value/ e vedrai directory chiamate offsets, commits, sources, metadata: il record durabile di cosa è stato processato.

Ammazza il processo con Ctrl-C. Butta un altro file JSON. Riavvia lo stesso script. La query riprende, prende solo il nuovo file (perché l’offset del vecchio è registrato), e scrive il nuovo Parquet. È il checkpoint che si guadagna lo stipendio.

Ispezionare una query in esecuzione

writeStream.start() ritorna un handle StreamingQuery. Qualche metodo utile:

query.id                # Query ID stabile attraverso i restart (dal checkpoint)
query.runId             # ID per questo run specifico; cambia al restart
query.status            # Stato corrente — "PROCESSING", "WAITING", ecc.
query.lastProgress      # Metriche dettagliate per l'ultimo micro-batch
query.recentProgress    # Gli ultimi 100 micro-batch
query.exception()       # Se la query è fallita, l'eccezione
query.stop()            # Shutdown graceful
query.awaitTermination(timeout=60)   # Blocca, opzionalmente con timeout

lastProgress è quello che guardo costantemente durante lo sviluppo. È un dict con input rate, processing rate, durata del batch, lag specifico della source (lag di offset Kafka, file indietro), e dimensione dello state per operatore. Passeremo più tempo sul monitoring nella lezione 57; per ora, sappi che query.lastProgress è il tuo migliore amico quando qualcosa sembra andato storto.

Cosa portarsi via da questo

Tre cose, in ordine di priorità:

  1. L’API streaming è readStream + trasformazioni + writeStream. Le trasformazioni in mezzo sono la stessa API DataFrame su cui hai passato quarantotto lezioni. Non pensarci troppo.
  2. Scegli un trigger che corrisponda al tuo budget di latenza. Default (continuous fast) per bassa latenza, processingTime per scheduling prevedibile, availableNow per job batch-on-streaming.
  3. Imposta sempre checkpointLocation. Sempre. Anche in dev. Anche per console. Il costo è una riga; il vantaggio è che tutto il resto funziona correttamente.

Prossima lezione, la source che userai davvero in produzione: Kafka. Offset, deserializzazione, i tradeoff tra at-least-once ed exactly-once.


Riferimenti: Apache Spark Structured Streaming Programming Guide (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), in particolare le sezioni su input source, output sink e trigger. Consultati il 2026-05-01.

Cerca