PySpark, de la zero Lecția 43 / 60

Parquet: de ce e default-ul si pe buna dreptate

Stocarea pe coloane explicata, codecuri de compresie, predicate pushdown si structura row-group care face citirile selective rapide.

Suntem acum în Modulul 8, partea cursului unde nu mai vorbim despre cum gândește Spark și începem să vorbim despre cum mănâncă Spark. Fiecare job pornește cu citirea a ceva și se încheie cu scrierea a ceva. Formatul pe care îl alegi pentru acel ceva are mai mult impact asupra performanței decât aproape orice buton de tuning din motor.

Modulul ăsta acoperă formatele pe care le vei întâlni efectiv în sălbăticie: Parquet, ORC, Avro, Delta, JSON, CSV, surse JDBC, depozite cloud de obiecte. Vom începe cu cel pe care îl vei folosi 80% din timp și pe care însuși Spark îl folosește ca default: Parquet.

Dacă reții un singur lucru din modulul ăsta, fie acela: pentru sarcini analitice, Parquet e aproape întotdeauna răspunsul corect. Restul lecției e despre de ce și ce face fișierul sub capotă când apelezi spark.read.parquet(...).

Ce înseamnă concret „pe coloane”

Imaginează-ți un tabel cu cinci coloane și un milion de rânduri. Există două moduri de a-l stoca pe disc.

Orientat pe rânduri stochează toate valorile pentru rândul 1, apoi toate valorile pentru rândul 2, apoi rândul 3, așa cum face CSV, așa cum fac majoritatea bazelor de date tranzacționale pe disc. Dacă vrei a treia coloană din rândul 783, stratul de stocare trebuie să scaneze peste primele două coloane din fiecare rând precedent sau să folosească un index ca să sară acolo.

Pe coloane stochează toate valorile pentru coloana 1 contiguu, apoi toate valorile pentru coloana 2 contiguu și așa mai departe. Dacă vrei coloana 3, citești o singură bucată contiguă și sari peste celelalte patru complet.

Acum gândește-te la o interogare analitică: SELECT AVG(amount) FROM orders WHERE country = 'IT'. Atingi două coloane din vreo patruzeci. Cu un format orientat pe rânduri, citești tot fișierul ca să extragi date dintr-o valoare de două coloane: cea mai mare parte din I/O e irosită. Cu un format pe coloane, citești cele două coloane și le ignori pe celelalte treizeci și opt.

Ăsta e motivul integral pentru care există formatele pe coloane și motivul pentru care Parquet a înghițit lumea analitică. Citirile selective aduc doar coloanele pe care le ceri. Un SELECT col1, col2 FROM t naiv pe un tabel Parquet de 50 de coloane e cam de 25x mai ieftin decât aceeași interogare pe aceleași date în CSV.

Există un al doilea beneficiu care îl amplifică pe primul: valorile dintr-o coloană tind să semene între ele. O coloană country e în mare parte șiruri de două litere. O coloană timestamp e numere întregi pe 8 octeți care cresc monoton. O coloană status are poate patru valori distincte pe milioane de rânduri. Când valorile similare stau una lângă alta, algoritmii de compresie le mănâncă de vii. Rapoarte de compresie de 5-10x pe date reale sunt obișnuite.

Structura pe disc

Un fișier Parquet nu e doar „toată coloana 1, apoi toată coloana 2”. E organizat ierarhic:

File
├── Row Group 0           (target ~128 MB of source data)
│   ├── Column Chunk: id           (all id values for rows in this group)
│   │   ├── Page 0   (~1 MB compressed)
│   │   ├── Page 1
│   │   └── ...
│   ├── Column Chunk: country
│   ├── Column Chunk: amount
│   └── ...
├── Row Group 1
├── Row Group 2
└── Footer
    ├── Schema
    ├── Row group metadata (min, max, null count per column chunk)
    └── Key/value metadata

Trei lucruri merită atenție:

  1. Row groups. Un fișier Parquet e împărțit în row groups, fiecare conținând un interval contiguu de rânduri. Dimensiunea implicită e undeva între 128 MB și 1 GB, în funcție de writer, iar punctul dulce practic e 100-500 MB per row group. Row groups sunt unitatea de paralelism: o sarcină Spark citește de obicei un row group.
  2. Column chunks. În cadrul unui row group, fiecare coloană primește propriul chunk. Aici se întâmplă magia coloanelor la nivel de fișier: poți căuta direct la column chunk-ul pe care îl vrei și să citești doar octeții lui.
  3. Footer-ul. La capătul fișierului, Parquet stochează un bloc de metadate: schema, offset-urile row group-urilor și, esențial, statistici pentru fiecare column chunk: valoarea minimă, valoarea maximă, numărul de null-uri. Spark citește mai întâi footer-ul, se uită la acele statistici și le folosește ca să decidă ce row groups poate sări fără a le citi deloc.

Bucățica aceea finală e fundamentul pentru predicate pushdown.

Predicate pushdown

Când scrii df.filter(F.col("amount") > 1000) pe o sursă Parquet, Spark nu trebuie să citească fișierul și apoi să filtreze. Împinge predicatul jos la stratul de scan. La citire, pentru fiecare row group, verifică statisticile din footer: dacă max(amount) <= 1000, întregul row group poate fi sărit. Niciun I/O pentru intervalul ăla, nicio decompresie, niciun rând materializat.

Asta e invizibil din codul tău. Tu scrii un .filter() normal. Spark și Parquet colaborează ca să sară peste date pe care nu le-ai cerut. Poți confirma că se întâmplă citind .explain():

df = spark.read.parquet("s3://lake/orders/")
df.filter(F.col("amount") > 1000).select("order_id", "amount").explain()
# == Physical Plan ==
# *(1) Project [order_id#3, amount#5]
# +- *(1) Filter (isnotnull(amount#5) AND (amount#5 > 1000))
#    +- *(1) ColumnarToRow
#       +- FileScan parquet [order_id#3,amount#5]
#          PushedFilters: [IsNotNull(amount), GreaterThan(amount,1000)],
#          ReadSchema: struct<order_id:bigint,amount:double>

Lista PushedFilters e dovada. Filtrele care au ajuns în lista aceea sunt evaluate la nivel de fișier/row-group. Filtrele care nu apar acolo sunt evaluate în Spark după scan: tot corect, doar mai puțin eficient.

Ce se împinge bine: egalitate, comparație, IsNull/IsNotNull, In cu o mulțime mică. Ce nu: tipare LIKE cu wildcard la început, apeluri de funcții precum upper(country) = 'IT' (rescrie ca country = 'IT' dacă datele sunt deja cu majuscule), aritmetică pe coloana filtrată (amount + tax > 1000 nu se va împinge; amount > 1000 - tax s-ar putea). Urmărește planul; dacă filtrul tău nu e în PushedFilters, refactorizează-l până e.

ReadSchema e fratele proiecției pe coloane. Spark citește doar order_id și amount pentru că acelea sunt singurele coloane pe care interogarea le referă, chiar dacă tabelul mai are alte douăzeci.

Codecuri de compresie

Parquet comprimă fiecare column chunk independent. Codecul e configurabil. Cele patru pe care le vei întâlni:

  • Snappy: default-ul Spark. Compresie rapidă, decompresie rapidă, raport decent (~2-3x). Alegerea corectă pentru pipeline-uri fierbinți, unde CPU-ul în timpul citirii contează mai mult decât spațiul pe disc.
  • Gzip: mai lent, dar mai mic (~3-4x). Mai vechi, bine suportat peste tot. Bun pentru arhivare la rece când frecvența citirii e mică.
  • Zstd (zstandard): câștigătorul modern. Mai rapid decât gzip, mai mic decât gzip, deseori comparabil cu snappy la viteza de citire. Dacă alegi azi, ăsta e probabil răspunsul. Spark îl suportă nativ din 3.2.
  • Lz4: foarte rapid, raport modest. De nișă; rareori cea mai bună alegere pentru calea fierbinte sau cea rece.

Setezi codecul per scriere sau global:

# Per write
df.write.option("compression", "zstd").parquet("s3://lake/orders/")

# Or globally for the session
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
df.write.parquet("s3://lake/orders/")

O regulă practică: dacă datele tale sunt citite des (dashboard-uri, joburi recurente), folosește snappy sau zstd, CPU-ul de citire domină. Dacă datele tale sunt citite o dată pe trimestru pentru conformitate, folosește gzip sau zstd la un nivel mai înalt, costul discului domină. Nu alege gzip pentru căi fierbinți doar pentru că e mai mic; o vei simți la fiecare interogare.

Cititorul vectorizat

Am văzut Tungsten în lecția 42. Parquet e unul dintre locurile unde Tungsten își răsplătește vizibil munca. Cititorul de Parquet din Spark e vectorizat: în loc să materializeze un obiect Row la un moment dat, decodifică column chunks în loturi de valori stocate în array-uri off-heap plate, iar operatorii din aval (filtre, proiecții, agregări) lucrează direct pe loturi. Nodul ColumnarToRow din planul de mai sus e granița unde Spark convertește în final loturile înapoi în rânduri pentru operatorii nevectorizați.

Nu configurezi asta; e activ implicit pentru tipuri primitive. Setarea relevantă, dacă ai nevoie vreodată să-l dezactivezi pentru depanare, e spark.sql.parquet.enableVectorizedReader, cu valoare implicită true. Las-o în pace în producție.

Citire și scriere în practică

Citirea e cazul simplu:

# Whole table
df = spark.read.parquet("s3://lake/orders/")

# Column projection — only reads the two columns from each row group
df = spark.read.parquet("s3://lake/orders/").select("order_id", "amount")

# With filter — predicate pushdown plus row-group skipping
df = (spark.read.parquet("s3://lake/orders/")
        .filter(F.col("country") == "IT")
        .filter(F.col("amount") > 1000))

Scrierea are mai multe butoane:

(df.write
   .mode("overwrite")
   .option("compression", "zstd")
   .parquet("s3://lake/orders/"))

Bune practici pentru partea de scriere:

  • Partiționează după o coloană cu cardinalitate mică care se potrivește cu tiparele tale de interogare (lecția 34). partitionBy("year", "month") e exemplul canonic. Partiționarea nu e același lucru cu row groups; e layout-ul de directoare deasupra fișierului. Un tabel partiționat poate avea în continuare predicate pushdown în interiorul unui fișier prin statisticile row group-urilor.
  • Țintește 100-500 MB per fișier. Mai mic și plătești overhead de metadate și costul listării. Mai mare și pierzi paralelism la citire pentru că o sarcină gestionează un singur row group (mare). Pasul coalesce / repartition înainte de scriere controlează asta.
  • Sortează în interiorul partițiilor dacă ai o coloană de filtrare predictibilă. Datele sortate înseamnă intervale min/max mai strânse per row group, ceea ce înseamnă sărire mai agresivă peste row groups. df.sortWithinPartitions("event_time").write... e o asigurare ieftină.
  • Nu scrie fișiere mici. Un antipattern obișnuit e să scrii 1.000 de directoare de partiție cu 200 de fișiere fiecare, toate de 2 MB. Spark petrece mai mult timp listând fișierele decât citindu-le. Coalesce sau compactează.

Schema-on-read, dar cu tipuri

CSV e schema-on-read în sensul leneș: fișierul nu are idee ce tipuri au coloanele lui, iar tu (sau inferența Spark) ghicești la citire. Parquet e schema-on-read în sensul mai bun: fișierul își declară schema în footer, tipurile sunt impuse la scriere, iar citirile produc întotdeauna aceleași tipuri indiferent cine citește. Primești flexibilitatea de a nu trebui să definești un tabel în avans și siguranța coloanelor tipate.

Sistemul de tipuri e și genuin bogat. Parquet are struct-uri imbricate, câmpuri repetate (array-uri), map-uri, decimale cu precizie/scară, timestamp-uri cu metadate logice care disting instant-in-time de local-datetime. Poți stoca un struct de array-uri de struct-uri și Spark îl va face round-trip curat. Asta contează când datele tale au formă de eveniment: obiecte imbricate de tip JSON se comprimă și se proiectează frumos în Parquet.

Când Parquet nu e răspunsul corect

Parquet e default-ul, dar nu e universal. Sări peste el când:

  • Streamuiești evenimente unul câte unul. Layout-ul pe coloane al Parquet înseamnă că scrierile trebuie să tamponeze un row group înainte de flush. Pentru ingestie append-only cu latență mică (Kafka spre stocare durabilă), Avro orientat pe rânduri e o potrivire mai bună. Îl vom întâlni în lecția următoare.
  • Ai nevoie de update-uri și delete-uri atomice. Fișierele Parquet sunt imutabile. UPDATE pe un tabel Parquet înseamnă rescrierea fișierelor. Delta Lake (tot în lecția următoare) stă deasupra Parquet ca să adauge semantici tranzacționale.
  • Datele sunt deja mici și structurate pentru citire umană. Un fișier de configurare cu 50 de rânduri nu are nevoie de stocare pe coloane. Folosește JSON sau YAML.

În afara acelor cazuri, default la Parquet. Aproape fiecare echipă cu care am lucrat care a încercat să fie deșteaptă („vom folosi CSV deocamdată, e mai simplu”) a irosit săptămâni de calcul șase luni mai târziu, când a migrat.

Încearcă asta

Scrie același DataFrame în trei formate și compară:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

spark = (SparkSession.builder
         .appName("ParquetDemo")
         .master("local[*]")
         .getOrCreate())

df = spark.range(0, 1_000_000).select(
    F.col("id").alias("order_id"),
    (F.col("id") % 100).alias("country_id"),
    (F.rand() * 1000).alias("amount"),
    F.current_timestamp().alias("created_at"),
)

df.write.mode("overwrite").csv("/tmp/demo/orders_csv")
df.write.mode("overwrite").parquet("/tmp/demo/orders_parquet")
df.write.mode("overwrite").option("compression", "zstd").parquet("/tmp/demo/orders_zstd")

def size_mb(path):
    total = 0
    for root, _, files in os.walk(path):
        for f in files:
            total += os.path.getsize(os.path.join(root, f))
    return total / (1024 * 1024)

print(f"CSV:           {size_mb('/tmp/demo/orders_csv'):.1f} MB")
print(f"Parquet snappy: {size_mb('/tmp/demo/orders_parquet'):.1f} MB")
print(f"Parquet zstd:  {size_mb('/tmp/demo/orders_zstd'):.1f} MB")

# Predicate pushdown — check the plan
(spark.read.parquet("/tmp/demo/orders_parquet")
   .filter(F.col("amount") > 500)
   .select("order_id", "amount")
   .explain())

Vei vedea CSV în jur de 30-40 MB, Parquet snappy în jur de 6-8 MB, Parquet zstd un strop mai mic. Planul pentru citirea filtrată va arăta PushedFilters: [..., GreaterThan(amount, 500)] și un ReadSchema cu doar cele două coloane proiectate. Ăla e formatul de fișier care își câștigă pâinea.

Lecția următoare ne uităm la alternative: ORC, Avro și familia Delta/Iceberg/Hudi care stă deasupra Parquet ca să-l facă tranzacțional.


Referințe: documentația Apache Parquet (https://parquet.apache.org/docs/) și ghidul Apache Spark SQL data sources (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html). Consultat 2026-05-01.

Caută