PySpark, de la zero Lecția 44 / 60

ORC, Avro, Delta: alternativele si cand castiga fiecare

Trei familii de formate care nu sunt Parquet, cand este fiecare alegerea corecta si de ce Delta a tot preluat in liniste controlul.

Lecția trecută am argumentat că Parquet e default-ul corect pentru muncă analitică. Majoritatea echipelor se opresc cu cititul acolo și nu mai investighează restul. E în mare parte ok, dar ocazional vei întâlni pipeline-uri unde Parquet nu e răspunsul și ar trebui să știi de ce.

Trei familii de formate domină spațiul „nu Parquet”:

  • ORC: fratele mai mare al lui Parquet, născut în ecosistemul Hive.
  • Avro: un format orientat pe rânduri, optimizat pentru streaming și evoluția schemei.
  • Delta Lake / Iceberg / Hudi: formate de tabel tranzacționale care stau deasupra Parquet și adaugă semanticile de bază de date care îi lipsesc.

Fiecare câștigă într-o situație specifică. Lecția asta parcurge toate trei, ca să recunoști când ești în acea situație și să acționezi în consecință.

ORC: nativ Hive

ORC vine de la Optimized Row Columnar și e structural foarte similar cu Parquet. Fișierele se împart în stripes (row group-urile lui Parquet), stripes se împart în column streams (column chunks), fiecare stream are propria compresie, iar un footer stochează statistici la nivel de stripe. Dacă ai mijit ochii la layout-ul unui fișier ORC lângă un fișier Parquet, te-ai chinui să le deosebești.

# Reading is identical to Parquet
df = spark.read.orc("s3://lake/orders_orc/")

# Writing too
(df.write
   .mode("overwrite")
   .option("compression", "zstd")
   .orc("s3://lake/orders_orc/"))

Motivul pentru care ambele formate există se reduce la istorie. ORC a fost construit la Hortonworks pentru Hive; Parquet a fost construit la Twitter și Cloudera, inițial pentru Impala. Câțiva ani au avut puncte forte genuin diferite: predicate pushdown-ul ORC și indexurile încorporate (un concept „min/max + bloom filter” pe care îl avea înaintea Parquet) îi dădeau un avantaj pe anumite sarcini Hive, în timp ce Parquet avea suport mai bun cross-language și o poveste mai puternică pentru tipuri imbricate.

Până în 2026, acele diferențe s-au îngustat. Cititorul vectorizat al lui Parquet e excelent. Parquet a primit bloom filters. ORC a primit o integrare Spark mai bună. Cele două formate dau performanță aproximativ echivalentă pe majoritatea benchmark-urilor, cu cazuri marginale care înclină într-o direcție sau alta în funcție de forma datelor și de codec.

Ce nu s-a îngustat e gravitația ecosistemului. Parquet a câștigat ecosistemul analitic mai larg: Pandas, DuckDB, Polars, BigQuery External Tables, Athena, suportul pentru fișiere externe al Snowflake, dbt. Aproape orice citește Parquet first-class. ORC e citit peste tot, dar tratat ca un cetățean de clasa a doua în uneltele care nu sunt în sfera Hive.

Recomandarea practică: default la Parquet, doar dacă nu ești într-un Hive shop unde ORC e standardul existent. Dacă lucrezi într-un mediu Cloudera sau Hortonworks cu ani de tabele ORC, păstrează ORC: costul conversiei nu merită urmărit pentru 5% performanță. Dacă pornești de la zero, alege Parquet și nu te mai uita înapoi.

Avro: când orientat pe rânduri e forma corectă

Avro e cel ciudat din rând. E orientat pe rânduri, nu pe coloane. Înregistrările sunt stocate una după alta, câmpurile fiecărei înregistrări împachetate contiguu. Citirea coloanei 3 înseamnă citirea fiecărei înregistrări complet. Proiecția pe coloane e falsă: citești octeții și îi arunci pe cei pe care nu îi vrei.

Sună a pas înapoi, iar pentru analitică chiar este. Atunci de ce există Avro?

Două motive: scrieri append cu latență mică și evoluția schemei.

Când ingerezi evenimente Kafka unul câte unul și ai nevoie să le stochezi durabil pe măsură ce sosesc, nu poți tampona 128 MB în memorie așteptând ca un row group să se umple. Avro îți permite să scrii o singură înregistrare și s-o flushuiești. Formatul de fișier e proiectat pentru append în streaming. De asta pipeline-urile de tip „topic Kafka arhivat în S3” aterizează aproape întotdeauna în Avro, apoi sunt reîmpachetate în Parquet în aval de un job batch.

Povestea evoluției schemei e și mai importantă. Avro stochează schema cu datele: fiecare fișier Avro își declară schema scriitorului în header. Cititorii compară schema scriitorului cu propria schemă a cititorului și rezolvă diferențele automat, urmând reguli de compatibilitate bine definite. Poți:

  • Adăuga un câmp cu o valoare implicită. Cititorii vechi îl ignoră; cititorii noi văd valoarea implicită pentru înregistrări vechi.
  • Elimina un câmp care are o valoare implicită. Cititorii vechi văd valoarea implicită; cititorii noi încetează să-l citească.
  • Redenumi un câmp prin alias-uri. Codul vechi continuă să meargă sub vechiul nume.

Acest tip de compatibilitate forward-and-backward e esențial pentru streaming de evenimente, unde producătorii și consumatorii fac deploy independent și nu poți coordona schimbările de schemă între echipe. Tiparul e împerecheat cu un Schema Registry (cel de la Confluent e implementarea canonică) care stochează schemele central și atribuie fiecăreia un ID. Înregistrările Avro pe fir referă ID-ul schemei, nu schema completă, păstrându-le mici.

Citirea și scrierea Avro cu Spark are nevoie de pachetul spark-avro, care e împachetat în distribuțiile Spark standard, dar s-ar putea să trebuiască să-l declari explicit:

spark = (SparkSession.builder
         .appName("AvroDemo")
         .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0")
         .getOrCreate())

# Read
events = spark.read.format("avro").load("s3://kafka-archive/events/")

# Write
(df.write
   .format("avro")
   .mode("append")
   .save("s3://kafka-archive/events/"))

Când să apelezi la Avro:

  • Consumi topicuri Kafka și scrii evenimentele brute în stocare pe termen lung.
  • Producătorii și consumatorii fac deploy pe planificări diferite și au nevoie de evoluție de schemă care să nu rupă niciuna dintre părți.
  • Înregistrările tale sunt de obicei citite în întregime, nu proiectate pe coloane.

Când să nu apelezi la Avro:

  • Interogări analitice care ating câteva coloane din multe. Lipsa pruning-ului pe coloane va doare.
  • Oriunde ai apela la Parquet din motive de performanță. Beneficiul streaming-ului nu se aplică.

O arhitectură de producție obișnuită: evenimentele aterizează în Avro pe stocare brută de obiecte; un job batch rulează în fiecare oră și reîmpachetează în Parquet partiționat pentru analitică. Avro la margine, Parquet la depozit.

Delta Lake: Parquet plus un transaction log

Acum formatul care a tot mâncat lumea în liniște.

Delta Lake nu e un format de fișier nou: e un strat care stă deasupra Parquet. Fișierele de date sunt în continuare Parquet. Ce adaugă Delta e un transaction log: un director numit _delta_log/ lângă datele tale, plin cu fișiere JSON care înregistrează fiecare commit (adaugă fișierul ăsta, scoate fișierul celălalt, schimbă metadata asta) în ordine. Fiecare scriere produce o nouă intrare în log. Fiecare citire consultă mai întâi log-ul ca să afle ce fișiere fac parte din versiunea curentă a tabelului, apoi citește doar pe acelea.

Acea structură deblochează patru lucruri pe care Parquet singur nu le poate da:

1. Scrieri atomice. O scriere Parquet în S3 listează fișiere pe jumătate scrise în timpul scrierii: cititorii pot vedea stare parțială. Delta scrie fișiere noi, apoi face commit atomic la o intrare în log care le face vizibile. Fie întreaga scriere e vizibilă, fie deloc.

2. UPDATE / DELETE / MERGE. Fișierele Parquet sunt imutabile, deci schimbarea rândurilor înseamnă rescrierea fișierelor. Delta automatizează asta: un UPDATE rescrie fișierele afectate și înregistrează în log că cele vechi sunt scoase. Cititorii văd automat starea nouă.

from delta.tables import DeltaTable

orders = DeltaTable.forPath(spark, "s3://lake/orders_delta/")

# UPDATE rows in place
orders.update(
    condition="status = 'pending' AND age_days > 7",
    set={"status": "'expired'"},
)

# DELETE rows
orders.delete(condition="amount = 0")

3. MERGE INTO (upserts). Funcționalitatea-ucigaș. Combină inserturile și update-urile într-o singură operațiune, ceea ce e exact ce are nevoie fiecare pipeline CDC (change-data-capture):

updates = spark.read.parquet("s3://staging/orders_changes/")
target = DeltaTable.forPath(spark, "s3://lake/orders_delta/")

(target.alias("t")
   .merge(updates.alias("u"), "t.order_id = u.order_id")
   .whenMatchedUpdateAll()
   .whenNotMatchedInsertAll()
   .execute())

Înainte de Delta, tiparul ăsta cerea o rescriere completă a tabelului sau un job complex cu windowing. Acum sunt trei rânduri.

4. Time travel. Pentru că log-ul înregistrează fiecare versiune a tabelului, poți citi orice versiune trecută după număr de versiune sau timestamp:

# As of 2 days ago
old = spark.read.format("delta").option("timestampAsOf", "2026-04-21").load(path)

# As of version 47
old = spark.read.format("delta").option("versionAsOf", 47).load(path)

Time travel e neprețuit pentru depanare („cum arăta tabelul înainte de jobul prost de ieri?”) și pentru reproductibilitate („modelul a fost antrenat pe acest snapshot exact”). Nu e gratis: fișierele vechi sunt păstrate până sunt vacuumate, mâncând stocare, dar valoarea operațională a depanării e greu de supraestimat.

Citirea și scrierea sunt directe odată ce ai adăugat pachetul:

spark = (SparkSession.builder
         .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

df.write.format("delta").mode("overwrite").save("s3://lake/orders_delta/")
df = spark.read.format("delta").load("s3://lake/orders_delta/")

Delta a început ca o chestie doar Databricks, a fost open-sourced în 2019, iar până în 2026 e un proiect Linux Foundation cu suport first-class pentru Spark, Trino, Flink și principalele depozite cloud. Nu mai e o poveste de vendor lock-in.

Iceberg și Hudi: aceeași idee, alte pariuri

Delta nu e singurul format de tabel tranzacțional. Apache Iceberg (inițial de la Netflix) și Apache Hudi (de la Uber) atacă aceeași problemă cu alegeri de design diferite.

  • Iceberg are un strat de metadate mai sofisticat (manifest files care descriu manifest lists care descriu snapshot-uri) care scalează mai bine la tabele cu adevărat enorme și suportă o evoluție mai curată a schemei/partițiilor. AWS, Snowflake, BigQuery au pariat puternic pe interoperabilitatea cu Iceberg.
  • Hudi se concentrează pe upserts streaming la latență mică, cu un compromis copy-on-write vs merge-on-read expus utilizatorilor.
  • Delta are cele mai largi unelte în jurul lui (în special optimizările Databricks precum Z-ORDER și liquid clustering) și cel mai simplu model mental.

Pentru proiecte noi în 2026, alegerea e în mare parte tribală. Echipele Databricks folosesc Delta. Echipele AWS-centrice tind spre Iceberg. Echipele heavy-streaming aleg uneori Hudi. Vestea bună: toate trei rezolvă aceleași probleme de bază, toate trei stau deasupra Parquet și toate trei converg spre seturi similare de funcționalități. Alege-l pe cel pe care platforma ta îl suportă cel mai bine și nu te chinui.

Configurația terenului în 2026

Punând totul cap la cap, iată cum aș recomanda să te gândești la alegerea formatului:

Caz de utilizareFormat
Data lake analitic static, în mare parte appendParquet
Hive shop cu tabele existenteORC (nu migra)
Arhivă streaming KafkaAvro, reîmpachetat în Parquet în aval
Lakehouse mutabil (UPDATE/DELETE/MERGE)Delta (sau Iceberg, sau Hudi)
Depanare cu time travel necesarăDelta (sau Iceberg)
Fișier mic ad-hoc pentru citire umanăJSON / CSV

Tendința macro e clară: data lake-urile Parquet pure migrează încet spre Delta sau Iceberg, pentru că odată ce ai experimentat scrieri atomice și MERGE INTO, nu te mai întorci. Migrarea nu e trivială (reorganizarea layout-urilor de partiție, reantrenarea echipei, asumarea costului operațional al unui transaction log), dar se plătește singură prima dată când un job de la 3 dimineața moare la jumătatea drumului și tabelul e încă consistent.

Încearcă asta

Scrie același DataFrame ca Parquet, ORC, Avro și Delta, apoi explorează fiecare:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("FormatsDemo")
         .master("local[*]")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-avro_2.12:3.5.0,"
                 "io.delta:delta-spark_2.12:3.1.0")
         .config("spark.sql.extensions",
                 "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

df = spark.range(0, 100_000).select(
    F.col("id").alias("order_id"),
    (F.col("id") % 50).alias("user_id"),
    (F.rand() * 1000).alias("amount"),
    F.lit("pending").alias("status"),
)

df.write.mode("overwrite").parquet("/tmp/demo/orders_parquet")
df.write.mode("overwrite").orc("/tmp/demo/orders_orc")
df.write.mode("overwrite").format("avro").save("/tmp/demo/orders_avro")
df.write.mode("overwrite").format("delta").save("/tmp/demo/orders_delta")

# MERGE INTO with Delta
from delta.tables import DeltaTable

updates = spark.range(0, 100).select(
    F.col("id").alias("order_id"),
    F.lit(0).alias("user_id"),
    F.lit(99.99).alias("amount"),
    F.lit("paid").alias("status"),
)

target = DeltaTable.forPath(spark, "/tmp/demo/orders_delta")
(target.alias("t")
   .merge(updates.alias("u"), "t.order_id = u.order_id")
   .whenMatchedUpdateAll()
   .whenNotMatchedInsertAll()
   .execute())

# How many rows have status = 'paid' now?
print(spark.read.format("delta").load("/tmp/demo/orders_delta")
      .filter("status = 'paid'").count())   # 100

# Time travel — read the version before the merge
v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/demo/orders_delta")
print(v0.filter("status = 'paid'").count())  # 0

Deschide /tmp/demo/orders_delta/_delta_log/ după ce rulezi asta. Vei vedea două fișiere .json: 00000000000000000000.json pentru scrierea inițială, 00000000000000000001.json pentru merge. Ăsta e transaction log-ul în jurul căruia e construit Delta. Deschide unul și citește-l: e doar JSON care listează fișierele adăugate și scoase.

Lecția următoare lăsăm formatele de fișier și începem cu surse JDBC: extragerea de date din Postgres, MySQL și SQL Server, inclusiv trucul partitionColumn care te împiedică să faci accidental DDoS pe baza ta de date de producție.

Câteva referințe înainte: lecția 47 acoperă atenționările despre cloud storage (costurile de listare S3, consistența eventuală și flagul _SUCCESS) care mușcă atunci când aceste formate trăiesc pe stocare de obiecte în loc de HDFS. Lecția 48 intră mai adânc în evoluția schemei: subiectul Avro primește tratament mai amplu acolo, alături de modul în care Parquet și Delta gestionează adăugările și eliminările de coloane.


Referințe: documentația Apache ORC (https://orc.apache.org/docs/), specificația Apache Avro (https://avro.apache.org/docs/current/specification/) și documentația Delta Lake (https://docs.delta.io/latest/index.html). Consultat 2026-05-01.

Caută