PySpark, dalle fondamenta Lezione 48 / 60

Schema evolution: quando le colonne ti cambiano sotto

Perché i formati schema-on-read gestiscono male il cambiamento, perché Avro+registry lo gestisce bene, e il modo Delta/Iceberg di avere entrambe le cose.

Una colonna viene aggiunta a monte. Una colonna viene rinominata. Un tipo passa da int a bigint. Il messaggio Slack del martedì mattina: “abbiamo deployato un nuovo payload eventi, puoi controllare le tue pipeline?”

Questa è la quotidianità del data engineer. La schema evolution è la disciplina di tenere in piedi le pipeline quando i dati sotto cambiano forma. Formati diversi la gestiscono con grazia molto diversa, e la scelta conta più di quanto la gente si renda conto, fino al giorno in cui la nuova colonna arriva mancante dalla dashboard di ieri.

Questa lezione parla di cosa cambia, perché ogni formato di storage reagisce in modo diverso, e quali pattern di produzione funzionano nel 2026.

Tre tipi di cambiamento

Prima di parlare di formati, la tassonomia. I cambi di schema arrivano in tre categorie:

Backward-compatible: il nuovo schema può leggere i dati vecchi. Aggiungere una colonna opzionale con un default. Allargare un tipo (int a bigint, float a double). Il codice nuovo legge file vecchi e ottiene null o default per il nuovo campo. Questo è il caso sicuro.

Forward-compatible: lo schema vecchio può leggere i dati nuovi. Eliminare una colonna. Il codice vecchio legge file nuovi e ignora il campo eliminato. Meno comune, ma conta quando i consumer si aggiornano con tempi diversi rispetto ai producer.

Breaking: nessuna direzione funziona senza migrazione di dati. Rinominare una colonna. Cambiare un tipo in modo incompatibile (string a int). Rimuovere un campo non opzionale. Questi richiedono un cutover coordinato.

L’obiettivo della macchina di schema evolution è rendere i cambi backward-compatible gratuiti, i cambi forward-compatible possibili, e i cambi breaking impossibili da fare per sbaglio.

Parquet: schema-on-read, evoluzione a livello di formato

Parquet memorizza il suo schema nel footer di ogni file. Quando Spark legge un singolo file Parquet, legge lo schema del file e usa quello.

Quando Spark legge più file Parquet con schema diversi, ha un problema. Per default, Spark assume che tutti i file in un path condividano lo stesso schema. Ne sceglie uno (spesso il primo elencato) e lo usa per tutto il DataFrame. I file con schema diversi nel migliore dei casi producono risultati sbagliati, nel peggiore tirano un’eccezione in lettura.

Ci sono due livelli di aiuto:

mergeSchema

Il reader Parquet supporta il merging dello schema:

df = (spark.read
        .option("mergeSchema", "true")
        .parquet("s3a://bucket/events/"))

Con mergeSchema=true, Spark legge il footer di ogni file nel path di input, calcola l’unione di tutti gli schema, e la usa come schema del DataFrame. I file a cui manca una colonna restituiscono null per quella colonna. Il costo è un round-trip di metadati per file prima che venga letto qualsiasi dato: per una directory di 10000 file, è un ritardo iniziale non banale.

mergeSchema è ottimo in sviluppo e nei job di piccola scala. In produzione su un lakehouse da un milione di file, è abbastanza costoso da spingerti a essere selettivo.

Cosa fa l’union Parquet in pratica

Lo schema merging di Parquet segue regole specifiche:

  • Aggiungere una colonna: funziona. I file senza la colonna restituiscono null.
  • Rinominare una colonna: non funziona. Spark vede due colonne; i file vecchi ne hanno una, i nuovi l’altra.
  • Allargamento di tipo (INT32 a INT64, FLOAT a DOUBLE): funziona nel 2026; le versioni più vecchie di Spark tiravano eccezione.
  • Restringimento di tipo (INT64 a INT32): fallisce sulle righe che non ci stanno.
  • Cambi di tipo tra categorie diverse (string a int): fallisce direttamente.
  • Aggiunta di campo in struct annidata: funziona.
  • Rinomina di campo in struct annidata: fallisce allo stesso modo del rename top-level.

Il punto: Parquet gestisce bene l’evoluzione additiva, e per niente i rename. In un lake solo Parquet, non rinomini colonne. Aggiungi la nuova colonna, fai dual-write per un periodo di transizione, poi elimini la vecchia.

Avro: lo schema viaggia con i dati

Avro prende un approccio diverso. Ogni file Avro memorizza lo schema del writer nell’header. Quando leggi, fornisci uno schema del reader. La libreria Avro risolve campo per campo, con regole esplicite per campi mancanti, promozione di tipi e ordine dei campi.

Un campo presente nello schema del writer ma mancante in quello del reader viene scartato in silenzio. Un campo presente nello schema del reader ma mancante in quello del writer viene riempito col valore di default lato reader (specificato al momento della definizione dello schema). Un campo con lo stesso nome e un tipo compatibile-ma-diverso viene promosso (int a long, float a double, string a bytes).

I rename si gestiscono con gli alias: nello schema del reader marchi il nuovo campo con "aliases": ["old_name"], e Avro riconosce che i dati vecchi che portano old_name devono popolare new_name.

Questo è fondamentalmente più flessibile di Parquet. Il costo: encoding riga per riga (niente pruning colonnare), niente predicate pushdown, in genere rapporti di compressione più piccoli.

Per Avro su scala servono anche un schema registry: un servizio tipo Confluent Schema Registry che memorizza tutte le versioni storiche dello schema per un topic, assegna a ognuna un ID numerico, e fa accordare producer e consumer su cosa stanno scambiando. Ogni record Avro riceve un prefisso di 4 byte con l’ID dello schema; il reader recupera lo schema corrispondente dal registry. Con un registry, i producer possono evolvere gli schema liberamente, e il registry impone le regole di compatibilità (BACKWARD, FORWARD, FULL, NONE) che hai configurato.

È per questo che Avro domina nelle pipeline di streaming e di event-ingest nel 2026. I topic Kafka portano milioni di eventi con schema che evolvono settimanalmente; Avro più Schema Registry rende la cosa lavorabile. Parquet allo stesso ritmo di churn è una sofferenza.

Leggere Avro in Spark:

df = (spark.read
        .format("avro")
        .load("s3a://bucket/events/"))

Oppure con schema esplicito e integrazione con Schema Registry (tipicamente tramite una libreria Spark-Avro Confluent o open-source):

from pyspark.sql.avro.functions import from_avro

# In un job di streaming che legge da Kafka
events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "...")
          .option("subscribe", "user_events")
          .load())

# Decodifica lo schema di ogni record per ID e proietta sullo schema del reader
decoded = events.select(
    from_avro("value", reader_schema_json,
              {"mode": "PERMISSIVE",
               "schemaRegistryUrl": "https://schema-registry.internal"})
    .alias("event")
)

Delta e Iceberg: lo schema nel log della tabella

Delta Lake (il formato che abbiamo visto nella lezione 44) e Apache Iceberg prendono un terzo approccio: lo schema della tabella è memorizzato in un transaction log insieme ai file di dati. Ogni cambio allo schema è una entry del log. I file di dati vecchi non vengono riscritti quando lo schema cambia; il reader proietta sempre il file sullo schema corrente.

Questo ti dà il meglio dei due mondi:

  • Evoluzione a basso costo. Aggiungere una colonna non tocca i file esistenti. Il log registra l’aggiunta e quale valore di default usare leggendo file vecchi.
  • Validazione forte. Una scrittura che non corrisponde allo schema corrente fallisce per default. Si entra esplicitamente nell’evoluzione con mergeSchema o opzioni equivalenti per singola scrittura.
  • Time travel. Lo schema è versionato. Interrogare la tabella a una vecchia versione usa lo schema di quella versione.
  • Rename che funzionano. Iceberg supporta i rename di colonna nativamente (traccia le colonne per ID, non per nome). Delta ha aggiunto il supporto al rename in column-mapping mode (1.2+): è una feature opt-in perché richiede reader Parquet che rispettino i metadati di column-mapping.

Un tipico flusso di evoluzione Delta:

# Scrittura iniziale
(orders_v1
   .write
   .format("delta")
   .mode("overwrite")
   .save("s3a://bucket/orders/"))

# Una settimana dopo, a monte aggiungono una colonna discount_code
# La vogliamo aggiungere alla tabella senza riscrivere la storia
(orders_v2_with_discount
   .write
   .format("delta")
   .option("mergeSchema", "true")
   .mode("append")
   .save("s3a://bucket/orders/"))

# Le letture dopo questo vedono la nuova colonna. I file vecchi riportano null per essa.
(spark.read.format("delta").load("s3a://bucket/orders/")
   .select("order_id", "discount_code")
   .show())

Senza mergeSchema=true, la seconda scrittura fallisce con un errore di schema mismatch. L’opt-in è il meccanismo di sicurezza. Dici esplicitamente “sì, sto evolvendo lo schema questa volta”.

Per i cambi breaking, sia Delta sia Iceberg hanno DDL espliciti:

-- SQL Delta
ALTER TABLE orders ADD COLUMN region STRING AFTER country;
ALTER TABLE orders RENAME COLUMN customer TO customer_id;
ALTER TABLE orders ALTER COLUMN amount TYPE DECIMAL(18,4);

I rename funzionano perché il Parquet sottostante mantiene un column ID stabile, e i reader capiscono il mapping. Storicamente questo è stato il motivo principale per scegliere Iceberg o Delta rispetto al Parquet semplice, e lo resta nel 2026.

Il pattern medallion: dove l’evoluzione vive davvero

In produzione, la storia più pulita di schema evolution usa formati diversi a stadi diversi:

Kafka (Avro + registry)
       v
Layer raw: Delta o Parquet, schema che rispecchia l'Avro a monte
       v
Layer clean: Delta, con uno schema curato stabile controllato dal data team
       v
Layer serving: Delta, denormalizzato per i pattern di query

I producer controllano gli schema Avro che emettono. Il schema registry impone la compatibilità: i producer non possono rompere i consumer senza passare da una review. Lo Spark Structured Streaming legge da Kafka, deserializza tramite il registry, e atterra i record in una tabella Delta “raw”. La tabella raw ha uno schema permissivo che rispecchia qualsiasi Avro sia corrente.

Il layer “clean” è dove il data team prende il controllo. Le trasformazioni da raw a clean proiettano esplicitamente su uno schema stabile. Le nuove colonne da monte compaiono in raw automaticamente; non compaiono in clean finché qualcuno non aggiorna la trasformazione. Questo disaccoppia l’evoluzione a monte dalla stabilità a valle.

Questo pattern gestisce i cambi backward-compatible con zero modifiche alla pipeline (il layer raw li assorbe; il layer clean ignora i campi che non proietta), e fa emergere i cambi breaking in code review quando qualcuno prova ad aggiornare la proiezione del layer clean.

Validazione ai confini

Dentro una pipeline tipata, gli errori di tipo si prendono al compile time (o almeno al schema-mismatch time). Al confine (la lettura dei dati a monte), i tipi sono quelli che produce la sorgente, e la sorgente può mentire. Un paio di pattern che funzionano:

Cast esplicito al confine. Non fidarti dei tipi inferiti. Casta tutto man mano che entra nel layer clean:

from pyspark.sql import functions as F

cleaned = (raw
   .select(
       F.col("order_id").cast("long"),
       F.col("amount").cast("decimal(18,2)"),
       F.to_timestamp("created_at").alias("created_at"),
       F.col("country").cast("string"),
   ))

Fail-fast su null inattesi. Se un campo dovrebbe essere presente e non lo è, vuoi saperlo adesso, non in un report a valle:

null_count = cleaned.filter(F.col("order_id").isNull()).count()
if null_count > 0:
    raise RuntimeError(f"{null_count} rows missing order_id")

In una pipeline reale questo diventa un data quality check, spesso tramite una libreria come Great Expectations o Soda o le DataQuality API native di Spark. Il principio: valida al confine, non dopo che tre trasformazioni sono già girate.

Whitelist delle colonne che ti interessano. Se fai .select() di una lista stabile di colonne dal raw, le aggiunte a monte non si propagano per sbaglio:

expected = ["order_id", "amount", "created_at", "country"]
cleaned = raw.select(*expected)  # Fallisce in modo rumoroso se ne manca qualcuna

Cosa può ancora andare storto

Un paio di trappole:

Allargamento di tipo implicito che perde precisione. Un decimal(38,18) sommato da Spark restituisce decimal(38,18) con overflow su scala. Una media di float restituisce un float con errori di arrotondamento. Casta su un tipo abbastanza largo prima di aggregare.

Default null di Avro che non sono quello che pensi. In Avro, un campo opzionale è tipicamente ["null", "string"] con null come default. Se sbagli a digitare il default nello schema (es. stringa vuota invece di null), i record vecchi caricano per sempre il default sbagliato.

Delta mergeSchema con column-mapping disabilitato. Se provi a rinominare una colonna con mergeSchema=true su una tabella Delta default (senza column-mapping mode), Delta lo vede come elimina-vecchia-colonna-aggiungi-nuova. Perdi i dati nella colonna rinominata. Abilita sempre il column-mapping prima dei rename.

Rename Parquet via “ALTER” su Hive metastore. Hive metastore rinomina volentieri una colonna nei suoi metadati. I file Parquet sottostanti hanno ancora il nome vecchio. Le letture restituiscono null per la colonna rinominata. Questo bug è più vecchio della maggior parte dei data engineer e si presenta ancora.

Provalo

Un piccolo esercizio di evolution-over-time su Delta:

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
    .appName("EvolutionDemo")
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

path = "/tmp/delta_orders"

# Giorno 1: orders con id e amount
day1 = spark.range(10).selectExpr("id as order_id", "id * 1.5 as amount")
(day1.write.format("delta").mode("overwrite").save(path))

print("After day 1:")
spark.read.format("delta").load(path).show()

# Giorno 2: a monte e' stata aggiunta una colonna country
day2 = (spark.range(10, 20)
        .selectExpr("id as order_id",
                    "id * 1.5 as amount",
                    "'IT' as country"))
(day2.write
     .format("delta")
     .option("mergeSchema", "true")  # opt in all'evoluzione
     .mode("append")
     .save(path))

print("After day 2:")
spark.read.format("delta").load(path).show()
# id 0..9 hanno country=null, id 10..19 hanno country='IT'

# Time travel a prima che esistesse la colonna country
print("Time travel to version 0:")
spark.read.format("delta").option("versionAsOf", 0).load(path).show()
# Lo schema non include country

Lancialo e guarda lo schema evolvere. La storia è preservata. Una query a versione 0 vede lo schema originale. Una query alla versione più recente vede la nuova colonna. Nessun dato è stato riscritto.

Questo è il Modulo 8 finito. In queste otto lezioni hai coperto i formati di file (Parquet, ORC, Avro, Delta), il connettore JDBC per le sorgenti relazionali, l’object storage cloud, e i pattern di schema evolution che tengono insieme tutto. Il Modulo 9 prende in mano lo streaming: Spark Structured Streaming, il modello di watermarking, la semantica exactly-once, e come tutti i formati di storage di oggi servono sia da source sia da sink per i job di streaming.


Riferimenti: documentazione di Apache Spark, documentazione di Delta Lake (https://docs.delta.io/), documentazione di Apache Iceberg (https://iceberg.apache.org/), documentazione di Confluent Schema Registry. Recuperato il 2026-05-01.

Cerca