PySpark, dalle fondamenta Lezione 52 / 60

Watermark ed event time: la parte che quasi tutti i principianti sbagliano

Perché l'event time conta più del processing time, cosa fa davvero un watermark, e l'esempio guidato con timestamp concreti.

Se c’è una parte di Structured Streaming che fa inciampare quasi tutti la prima volta, è questa: ci sono due orologi, e non sono d’accordo tra loro. Finché non interiorizzi questa cosa, le tue aggregazioni produrranno numeri che a prima vista sembrano giusti e che invece sono sottilmente, silenziosamente sbagliati.

Questa lezione parla di quei due orologi, e del meccanismo che Spark ti dà per riconciliarli: il watermark.

I due tempi

Ogni evento che scorre nella tua pipeline ha almeno due timestamp associati.

L’event time è quando la cosa è effettivamente accaduta. L’utente ha cliccato “compra” alle 10:03:47. Il sensore IoT ha registrato una lettura di temperatura alle 10:03:47. Quel momento è un fatto immutabile sul mondo.

Il processing time è quando Spark vede la riga. Il click ha attraversato una CDN, è rimasto in una partizione Kafka per qualche secondo, è stato preso da un trigger di micro-batch, e Spark potrebbe processarlo alle 10:03:51, oppure alle 10:04:30, o, se Kafka era in ritardo, alle 10:18:00.

In un mondo perfettamente sincrono, questi due valori sarebbero uguali. Nei sistemi reali non lo sono mai. I client mobile bufferizzano gli eventi quando sono offline e li scaricano un’ora dopo. Le interruzioni di rete causano retry. Un processo producer riparte e rigioca il suo commit log. Un job batch ingerisce oggi il file CSV di ieri. Tutte queste situazioni producono eventi con event time vecchi che arrivano con processing time recenti.

Ora immagina di voler calcolare “page view per finestra di 10 minuti”. L’approccio ingenuo è:

from pyspark.sql.functions import window, count

views_by_processing_time = (events
    .groupBy(window("processing_ts", "10 minutes"), "page")
    .agg(count("*").alias("views")))

È facile, sembra giusto, ed è sbagliato. Non stai misurando “le view nella finestra 10:00-10:10 del mondo”. Stai misurando “le righe che Spark ha ingerito tra le 10:00 e le 10:10”. Sono insiemi diversi, e il divario tra loro è da dove arrivano i bug nei tuoi report.

Quello che vuoi quasi sempre è il windowing per event time:

views_by_event_time = (events
    .groupBy(window("event_time", "10 minutes"), "page")
    .agg(count("*").alias("views")))

window("event_time", "10 minutes") dice a Spark: assegna ogni riga a una finestra di 10 minuti basata sulla sua colonna event_time. Una riga con event_time = 10:03:47 finisce nella finestra [10:00, 10:10) indipendentemente da quando Spark la processa. Una riga con event_time = 09:58:12 che capita di arrivare al processing time 10:05 finisce nella finestra [09:50, 10:00), non in quella corrente.

Questo è il modello. Ora il problema.

Il problema dello stato non limitato

Se Spark accetta eventi in ritardo per sempre, lo stato delle finestre passate non può mai essere rilasciato. Ogni finestra di 10 minuti ha il suo running count seduto in memoria (o in RocksDB, o dovunque viva il tuo state store), in attesa della possibilità che un altro ritardatario delle 09:50 possa ancora presentarsi. Dopo una settimana di esecuzione hai 1.008 finestre di dieci minuti, ognuna con stato per ogni page key che sia mai apparsa. Dopo un mese ne hai 4.320. La memoria cresce senza limiti, lo state store rallenta, e alla fine il job streaming muore.

Ti serve un modo per dire a Spark: “dopo questo punto, smetti di aspettare. Chiudi la finestra, emetti il risultato finale, butta via lo stato”.

Questo è il watermark.

Cos’è davvero un watermark

Un watermark è una garanzia da te a Spark che non arriveranno più eventi con event_time precedente a una soglia X. Una volta che Spark osserva quella garanzia per una particolare finestra (cioè una volta che il watermark ha superato la fine della finestra), considera la finestra chiusa, finalizza l’aggregazione, e scarta lo stato.

Spark calcola il watermark in continuazione. La policy di base è:

watermark = max(event_time osservato finora) − lateness_budget

Tu configuri il lateness budget. Quella è la manopola. Un budget di 5 minuti dice: “Spark, se hai visto un evento alle 10:14, presumi che nulla di precedente alle 10:09 si presenterà ancora”.

L’API:

from pyspark.sql.functions import window, count

aggregated = (events
    .withWatermark("event_time", "5 minutes")
    .groupBy(window("event_time", "10 minutes"), "page")
    .agg(count("*").alias("views")))

withWatermark("event_time", "5 minutes") fa due cose insieme:

  1. Dice a Spark quale colonna porta l’event time (cosi sa cosa tracciare).
  2. Imposta il lateness budget: gli eventi che arrivano con piu di 5 minuti di ritardo rispetto al watermark corrente sono considerati troppo in ritardo e scartati silenziosamente.

Quella seconda parte è critica e facile da non vedere. Un watermark scambia correttezza con limitatezza. Stai dicendo esplicitamente “preferisco scartare lo 0,1% dei ritardatari piuttosto che lasciar crescere il mio stato per sempre”. La dimensione di quello 0,1% è una tua scelta; tara il lateness budget in base a quanto tardi arrivano davvero i tuoi eventi reali.

Esempio guidato: tracciare eventi concreti

Facciamo passare degli eventi reali attraverso questo meccanismo e guardiamo cosa succede. Setup:

  • Finestra: 10 minuti, tumbling. Quindi le finestre sono [10:00, 10:10), [10:10, 10:20), eccetera.
  • Lateness del watermark: 5 minuti.
  • Gli eventi arrivano in questo ordine di processing time (con il loro event_time tra parentesi):
#processing_tsevent_timefinestra
110:00:3010:00[10:00,10:10)
210:09:1010:09[10:00,10:10)
310:11:0510:11[10:10,10:20)
410:11:4010:08 LATE[10:00,10:10)
510:14:2010:14[10:10,10:20)
610:18:0510:06 TOO LATEscartato

Andiamo passo per passo:

Evento 1 (event_time 10:00). Primo evento visto. Max event time finora = 10:00. Watermark = 10:00 − 5min = 09:55. La finestra [10:00,10:10) ha running count = 1.

Evento 2 (event_time 10:09). Stessa finestra. Max event time = 10:09. Watermark = 10:09 − 5min = 10:04. Il count della finestra adesso è 2. Il watermark è a 10:04, ancora dentro la finestra [10:00,10:10), quindi la finestra resta aperta.

Evento 3 (event_time 10:11). Atterra nella finestra successiva. Max event time = 10:11, watermark = 10:06. Ancora dentro [10:00,10:10), quindi quella finestra resta aperta. Count della nuova finestra [10:10,10:20) = 1.

Evento 4 (event_time 10:08). In ritardo! Il suo event_time è 10:08, ma il processing time è 10:11:40, ben oltre la fine nominale della finestra. È dentro il budget del watermark? Il watermark corrente è 10:06, e 10:08 > 10:06, quindi sì, viene accettato. Il count della finestra [10:00,10:10) sale a 3. Questo è il watermark che si guadagna lo stipendio: gli eventi in ritardo vengono inclusi finché non sono troppo in ritardo.

Evento 5 (event_time 10:14). Max event time = 10:14, watermark = 10:09. Ora 10:09 ha raggiunto la fine di [10:00,10:10). Spark chiude quella finestra, emette il count finale di 3, e scarta lo stato relativo. Il nuovo watermark significa anche che gli eventi futuri precedenti alle 10:09 verranno rifiutati.

Evento 6 (event_time 10:06). Il watermark è 10:09. 10:06 < 10:09, quindi questo evento è scartato silenziosamente. È troppo in ritardo. La finestra non c’è più. Spark non la riaprirà.

Un paio di cose da portarsi a casa da questo trace:

  • Il watermark si muove solo in avanti. Anche se l’evento 6 avesse avuto event time 10:00:00, non avrebbe tirato indietro il watermark.
  • Gli eventi in ritardo dentro il budget aggiornano la finestra ancora aperta: non ricevi un evento separato di “late update”; l’aggregazione corrente li assorbe e basta.
  • La finestra si chiude quando il watermark passa la sua fine. L’output (in append mode) per [10:00,10:10) viene emesso solo dopo che l’evento 5 è stato processato, anche se l’orologio da parete è ben oltre le 10:10.
  • Gli eventi fuori budget vengono scartati. Non c’è errore, non c’è warning nei tuoi dati, solo un contatore nelle metriche.

Quest’ultimo punto è il motivo per cui il monitoring è importante.

Monitoraggio del watermark

Nel tab Streaming dell’UI di Spark, ogni micro-batch mostra due valori che dovresti imparare a leggere:

  • Event time max: il piu grande event time osservato in questo batch.
  • Watermark: il valore corrente del watermark.

Plottali nel tempo. Dovrebbero salire insieme, con un gap approssimativamente costante (il tuo lateness budget). Se il watermark smette di muoversi, il tuo stream è bloccato (nessun nuovo max event time, di solito una sorgente in stallo). Se il gap esplode, il tuo input è improvvisamente molto in ritardo.

Tieni d’occhio anche la metrica numRowsDroppedByWatermark. Un valore non zero significa che stai scartando eventi in ritardo; un valore che cresce costantemente significa che il tuo lateness budget è troppo piccolo per il tuo pattern di arrivo reale.

Scegliere il lateness budget

Non c’è una formula. Misuri i tuoi dati. Calcola, su un campione rappresentativo:

from pyspark.sql.functions import col, unix_timestamp

lateness = (events.select(
    (unix_timestamp("processing_ts") - unix_timestamp("event_time")).alias("lag_seconds")
))
lateness.summary("min", "50%", "95%", "99%", "max").show()

Il 99esimo percentile è di solito un budget sensato: accetti l’1% di perdita in cambio di stato limitato. Se letteralmente non puoi tollerare di scartare eventi, allora non vuoi proprio l’aggregazione streaming; vuoi una fact table append-only scritta su un lake, con un job batch separato che ricalcola gli ultimi giorni ogni notte.

Una query funzionante end-to-end

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, col, from_json
from pyspark.sql.types import StructType, StringType, TimestampType

spark = SparkSession.builder.appName("EventTimeDemo").getOrCreate()

schema = StructType().add("page", StringType()).add("event_time", TimestampType())

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "pageviews")
       .load())

events = (raw
    .select(from_json(col("value").cast("string"), schema).alias("e"))
    .select("e.*"))

windowed = (events
    .withWatermark("event_time", "5 minutes")
    .groupBy(window("event_time", "10 minutes"), "page")
    .agg(count("*").alias("views")))

query = (windowed.writeStream
         .outputMode("append")
         .format("console")
         .option("checkpointLocation", "/tmp/ck/views")
         .start())

outputMode("append") qui è la coppia naturale dei watermark: ogni finestra viene emessa esattamente una volta, dopo che il watermark ne supera la fine. Passeremo le prossime due lezioni a esplorare gli output mode con piu cura.

Quello di cui non abbiamo ancora parlato è cosa succede dietro il watermark: dove Spark conserva i running count, come sopravvive a un restart del job, e quali altri pattern stateful esistono oltre la semplice aggregazione. Questa è la lezione 53: lo state store, la sessionizzazione e mapGroupsWithState.


Riferimenti: Apache Spark Structured Streaming Programming Guide, sezioni su event time e watermarking (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking). Recuperato il 2026-05-01.

Cerca