PySpark, de la zero Lecția 48 / 60

Schema evolution: cand coloanele se schimba sub tine

De ce formatele schema-on-read gestioneaza prost schimbarea, de ce Avro+registry o gestioneaza bine si modul Delta/Iceberg de a le obtine pe ambele.

Se adaugă o coloană în upstream. O coloană este redenumită. Un tip se schimbă din int în bigint. Mesajul Slack de marți dimineață: „am deployat un payload de eveniment nou, poți să-ți verifici pipeline-urile?”

Asta e cotidianul data engineer-ului. Schema evolution este disciplina menținerii pipeline-urilor funcționale când datele de sub ele își schimbă forma. Diferite formate o gestionează cu grade de eleganță foarte diferite, iar alegerea contează mai mult decât realizează oamenii până în ziua când coloana nouă apare lipsă din dashboard-ul de ieri.

Lecția asta este despre ce se schimbă, de ce fiecare format de stocare reacționează diferit și ce pattern-uri de producție funcționează în 2026.

Trei tipuri de schimbări

Înainte să vorbim despre formate, taxonomia. Schimbările de schemă vin în trei categorii:

Backward-compatible: schema nouă poate citi date vechi. Adăugarea unei coloane opționale cu o valoare implicită. Lărgirea unui tip (int la bigint, float la double). Cod nou citește fișiere vechi și primește null-uri sau valori implicite pentru câmpul nou. Aceasta este cea sigură.

Forward-compatible: schema veche poate citi date noi. Eliminarea unei coloane. Cod vechi citește fișiere noi și ignoră câmpul eliminat. Mai puțin comună dar contează când consumatorii se actualizează pe alte programe decât producătorii.

Breaking: nicio direcție nu funcționează fără migrare de date. Redenumirea unei coloane. Schimbarea unui tip incompatibil (string la int). Eliminarea unui câmp neopțional. Acestea au nevoie de o tăiere coordonată.

Scopul mașinăriei de schema evolution este să facă schimbările backward-compatible gratuite, schimbările forward-compatible posibile și schimbările breaking imposibile de făcut din greșeală.

Parquet: schema-on-read, evoluție la nivel de format

Parquet stochează schema în footer-ul fiecărui fișier. Când Spark citește un singur fișier Parquet, citește schema fișierului și o folosește.

Când Spark citește mai multe fișiere Parquet cu scheme diferite, are o problemă. Implicit, Spark presupune că toate fișierele dintr-o cale partajează aceeași schemă. Alege una (deseori prima listată) și o folosește pentru întregul DataFrame. Fișierele cu scheme diferite produc în cel mai bun caz rezultate greșite, în cel mai rău caz aruncă la momentul citirii.

Există două straturi de ajutor:

mergeSchema

Cititorul Parquet suportă unirea schemelor:

df = (spark.read
        .option("mergeSchema", "true")
        .parquet("s3a://bucket/events/"))

Cu mergeSchema=true, Spark citește footer-ul fiecărui fișier din calea de input, calculează uniunea tuturor schemelor și o folosește ca schema DataFrame-ului. Fișierele cărora le lipsește o coloană returnează null pentru acea coloană. Costul este un round-trip de metadate per fișier înainte ca orice dată să fie citită, pentru un director cu 10000 de fișiere, asta înseamnă o întârziere up-front non-trivială.

mergeSchema e excelent în dezvoltare și job-uri la scară mică. În producție pe un lakehouse cu un milion de fișiere, e suficient de scump încât ar trebui să fii selectiv.

Ce unește efectiv Parquet

Unirea schemelor Parquet urmează reguli specifice:

  • Adăugarea unei coloane: funcționează. Fișierele fără coloană returnează null.
  • Redenumirea unei coloane: nu funcționează. Spark vede două coloane; fișierele vechi au una, fișierele noi au cealaltă.
  • Lărgire de tip (INT32 la INT64, FLOAT la DOUBLE): funcționează în 2026; versiunile mai vechi de Spark aruncau.
  • Îngustare de tip (INT64 la INT32): eșuează pe rândurile care nu se potrivesc.
  • Schimbări de tip între categorii (string la int): eșuează direct.
  • Adăugarea unui câmp în struct imbricat: funcționează.
  • Redenumirea unui câmp în struct imbricat: eșuează la fel ca redenumirea de la nivel superior.

Concluzia: Parquet gestionează bine evoluția aditivă și deloc redenumirile. Într-un lac doar Parquet, nu redenumești coloanele. Adaugi coloana nouă, scrii dual pentru o perioadă de tranziție, apoi o elimini pe cea veche.

Avro: schema călătorește cu datele

Avro adoptă o abordare diferită. Fiecare fișier Avro stochează schema writer-ului în header. Când citești, oferi o schemă a reader-ului. Biblioteca Avro rezolvă câmp cu câmp, cu reguli explicite pentru câmpuri lipsă, promovare de tip și ordine a câmpurilor.

Un câmp prezent în schema writer-ului dar lipsă din schema reader-ului este eliminat în liniște. Un câmp prezent în schema reader-ului dar lipsă din schema writer-ului este completat cu valoarea implicită din partea reader-ului (specificată la momentul definirii schemei). Un câmp cu același nume și un tip compatibil-dar-diferit este promovat (int la long, float la double, string la bytes).

Redenumirile sunt gestionate prin alias-uri: în schema reader-ului marchezi câmpul nou cu "aliases": ["old_name"], iar Avro recunoaște că datele vechi care poartă old_name ar trebui să populeze new_name.

Asta este fundamental mai flexibil decât Parquet. Costul: codare rând cu rând (fără pruning columnar), fără predicate pushdown, în general rate de compresie mai mici.

Pentru Avro la scară ai nevoie și de un schema registry, un serviciu precum Confluent Schema Registry care stochează toate versiunile istorice de schemă pentru un topic, atribuie fiecăreia un ID numeric și face ca producătorii și consumatorii să fie de acord cu privire la ce trimit. Fiecare înregistrare Avro primește un prefix de 4 bytes cu ID-ul schemei; reader-ul aduce schema potrivită din registry. Cu un registry, producătorii pot evolua schemele liber, iar registry-ul aplică regulile de compatibilitate (BACKWARD, FORWARD, FULL, NONE) pe care le-ai configurat.

De aceea Avro domină pipeline-urile de streaming și ingestion de evenimente în 2026. Topicurile Kafka transportă milioane de evenimente cu scheme care evoluează săptămânal; Avro plus Schema Registry face asta posibil. Parquet la aceeași rată de schimbare e o suferință.

Citirea Avro în Spark:

df = (spark.read
        .format("avro")
        .load("s3a://bucket/events/"))

Sau cu schemă explicită și integrare cu Schema Registry (de obicei printr-o bibliotecă Spark-Avro Confluent sau open-source):

from pyspark.sql.avro.functions import from_avro

# Intr-un job de streaming care trage din Kafka
events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "...")
          .option("subscribe", "user_events")
          .load())

# Decodeaza schema fiecarei inregistrari dupa ID si proiecteaza pe schema reader-ului
decoded = events.select(
    from_avro("value", reader_schema_json,
              {"mode": "PERMISSIVE",
               "schemaRegistryUrl": "https://schema-registry.internal"})
    .alias("event")
)

Delta și Iceberg: schema în log-ul tabelei

Delta Lake (formatul pe care l-am acoperit în lecția 44) și Apache Iceberg adoptă o a treia abordare: schema tabelei este stocată într-un log de tranzacții alături de fișierele de date. Fiecare schimbare a schemei este o intrare în log. Fișierele de date vechi nu sunt rescrise când se schimbă schema; cititorul proiectează întotdeauna fișierul pe schema curentă.

Asta îți oferă ce e mai bun din ambele lumi:

  • Evoluție ieftină. Adăugarea unei coloane nu atinge fișierele existente. Log-ul înregistrează adăugarea și ce valoare implicită să folosească la citirea fișierelor vechi.
  • Validare puternică. O scriere care nu se potrivește schemei curente eșuează implicit. Optezi pentru evoluție cu mergeSchema sau opțiuni echivalente per-scriere.
  • Time travel. Schema este versionată. Interogarea tabelei la o versiune veche folosește schema din acea versiune.
  • Redenumiri care funcționează. Iceberg suportă redenumirea coloanelor nativ (urmărește coloanele după ID, nu după nume). Delta a adăugat suport pentru redenumire în modul column-mapping (1.2+), e o funcționalitate opt-in pentru că necesită cititori Parquet care respectă metadatele de column-mapping.

Un flux tipic de evoluție Delta:

# Scriere initiala
(orders_v1
   .write
   .format("delta")
   .mode("overwrite")
   .save("s3a://bucket/orders/"))

# O saptamana mai tarziu, upstream adauga o coloana discount_code
# Vrem sa o adaugam in tabela fara sa rescriem istoricul
(orders_v2_with_discount
   .write
   .format("delta")
   .option("mergeSchema", "true")
   .mode("append")
   .save("s3a://bucket/orders/"))

# Citirile dupa asta vad coloana noua. Fisierele vechi raporteaza null pentru ea.
(spark.read.format("delta").load("s3a://bucket/orders/")
   .select("order_id", "discount_code")
   .show())

Fără mergeSchema=true, a doua scriere eșuează cu o eroare schema-mismatch. Opt-in-ul este mecanismul de siguranță. Spui explicit „da, evoluez schema de data asta.”

Pentru schimbări breaking, atât Delta cât și Iceberg au DDL explicit:

-- Delta SQL
ALTER TABLE orders ADD COLUMN region STRING AFTER country;
ALTER TABLE orders RENAME COLUMN customer TO customer_id;
ALTER TABLE orders ALTER COLUMN amount TYPE DECIMAL(18,4);

Redenumirile funcționează pentru că Parquet-ul de bază păstrează un ID de coloană stabil, iar cititorii înțeleg maparea. Aceasta a fost istoric cel mai mare motiv pentru a alege Iceberg sau Delta în detrimentul Parquet simplu și rămâne așa în 2026.

Pattern-ul medallion: unde trăiește efectiv evoluția

În producție, cea mai curată poveste de schema evolution folosește formate diferite la etape diferite:

Kafka (Avro + registry)
       |
       v
Raw layer: Delta sau Parquet, schema oglindeste Avro-ul upstream
       |
       v
Clean layer: Delta, cu o schema curatata stabila controlata de echipa de date
       |
       v
Serving layer: Delta, denormalizat pentru pattern-uri de interogare

Producătorii controlează schemele Avro pe care le emit. Schema registry aplică compatibilitatea, producătorii nu pot rupe consumatorii fără să treacă prin review. Spark Structured Streaming citește din Kafka, deserializează prin registry și aterizează înregistrările într-o tabelă Delta „raw”. Tabela raw are o schemă permisivă care oglindește orice Avro este curent.

Stratul „clean” este unde echipa de date preia controlul. Transformările de la raw la clean proiectează explicit pe o schemă stabilă. Coloanele noi din upstream apar automat în raw; nu apar în clean până când cineva nu actualizează transformarea. Asta decuplează evoluția upstream de stabilitatea downstream.

Acest pattern gestionează schimbările backward-compatible cu zero modificări în pipeline (stratul raw le absoarbe; stratul clean ignoră câmpurile pe care nu le proiectează) și scoate la suprafață schimbările breaking în code review când cineva încearcă să actualizeze proiecția stratului clean.

Validare la granițe

În interiorul unui pipeline tipat, erorile de tip sunt prinse la momentul compilării (sau cel puțin la momentul nepotrivirii de schemă). La graniță, citirea datelor upstream, tipurile sunt orice produce sursa, iar sursa poate minți. Câteva pattern-uri care funcționează:

Cast explicit la graniță. Nu te încrede în tipurile inferate. Castează totul când intră în stratul clean:

from pyspark.sql import functions as F

cleaned = (raw
   .select(
       F.col("order_id").cast("long"),
       F.col("amount").cast("decimal(18,2)"),
       F.to_timestamp("created_at").alias("created_at"),
       F.col("country").cast("string"),
   ))

Fail-fast la null-uri neașteptate. Dacă un câmp ar trebui să fie prezent și nu este, vrei să afli acum, nu într-un raport din downstream:

null_count = cleaned.filter(F.col("order_id").isNull()).count()
if null_count > 0:
    raise RuntimeError(f"{null_count} rows missing order_id")

Într-un pipeline real asta devine un check de calitate a datelor, deseori printr-o bibliotecă precum Great Expectations sau Soda sau API-urile DataQuality proprii Spark. Principiul: validează la graniță, nu după ce trei transformări au rulat deja.

Whitelist coloanele de care îți pasă. Dacă faci .select() pe o listă stabilă de coloane din raw, adăugările din upstream nu se propagă din greșeală:

expected = ["order_id", "amount", "created_at", "country"]
cleaned = raw.select(*expected)  # Esueaza zgomotos daca lipseste vreuna

Ce mai poate merge prost

Câteva capcane:

Lărgire implicită de tip care pierde precizie. Un decimal(38,18) însumat de Spark returnează decimal(38,18) cu overflow la scară. Un float mediat returnează un float cu erori de rotunjire. Castează la un tip suficient de larg înainte de agregare.

Valori implicite Avro null care nu sunt ce crezi. În Avro, un câmp opțional este de obicei ["null", "string"] cu null ca implicit. Dacă greșești tipărirea valorii implicite în schemă (de exemplu string gol în loc de null), înregistrările vechi încarcă valoarea implicită greșită pentru totdeauna.

Delta mergeSchema cu column-mapping dezactivat. Dacă încerci să redenumești o coloană cu mergeSchema=true pe o tabelă Delta implicită (fără modul column-mapping), Delta vede asta ca elimină-coloana-veche-adaugă-una-nouă. Pierzi datele din coloana redenumită. Activează întotdeauna column-mapping înainte de redenumiri.

Redenumire Parquet prin „ALTER” pe Hive metastore. Hive metastore redenumește bucuros o coloană în metadatele lui. Fișierele Parquet de bază au în continuare numele vechi. Citirile returnează null pentru coloana redenumită. Bug-ul ăsta e mai vechi decât majoritatea data engineer-ilor și încă se livrează.

Încearcă asta

Un mic exercițiu de evoluție-în-timp pe Delta:

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
    .appName("EvolutionDemo")
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

path = "/tmp/delta_orders"

# Ziua 1: comenzi cu id si amount
day1 = spark.range(10).selectExpr("id as order_id", "id * 1.5 as amount")
(day1.write.format("delta").mode("overwrite").save(path))

print("After day 1:")
spark.read.format("delta").load(path).show()

# Ziua 2: upstream a adaugat o coloana country
day2 = (spark.range(10, 20)
        .selectExpr("id as order_id",
                    "id * 1.5 as amount",
                    "'IT' as country"))
(day2.write
     .format("delta")
     .option("mergeSchema", "true")  # opt-in pentru evolutie
     .mode("append")
     .save(path))

print("After day 2:")
spark.read.format("delta").load(path).show()
# id 0..9 au country=null, id 10..19 au country='IT'

# Time travel inapoi inainte sa existe coloana country
print("Time travel to version 0:")
spark.read.format("delta").option("versionAsOf", 0).load(path).show()
# Schema nu include country

Rulează asta și urmărește schema evoluând. Istoricul este păstrat. O interogare la versiunea 0 vede schema originală. O interogare la versiunea cea mai recentă vede coloana nouă. Nicio dată nu a fost rescrisă.

Acesta este Modulul 8 încheiat. De-a lungul acestor opt lecții ai acoperit formatele de fișier (Parquet, ORC, Avro, Delta), conectorul JDBC pentru surse relaționale, cloud object storage și pattern-urile de schema evolution care țin totul la un loc. Modulul 9 reia cu streaming: Spark Structured Streaming, modelul de watermarking, semantica exactly-once și cum toate formatele de stocare de azi servesc atât ca sursă cât și ca sink pentru job-urile de streaming.


Referințe: documentația Apache Spark, documentația Delta Lake (https://docs.delta.io/), documentația Apache Iceberg (https://iceberg.apache.org/), documentația Confluent Schema Registry. Consultat 2026-05-01.

Caută