PySpark, de la zero Lecția 11 / 60

Scrierea datelor: moduri, partitii si problema numarului de fisiere

Save modes, scrieri partitionate, diferenta dintre multe fisiere mici si unul gigant si de ce Parquet e default-ul cu motiv.

Un job Spark care nu scrie nimic e un job Spark care a rulat într-un notebook și nu a ajutat pe nimeni. Astăzi facem munca noastră durabilă. Scrierea seamănă cu oglinda citirii, df.write în loc de spark.read, dar are propriul morman de capcane. Save modes pe care le vei uita că există până șterg producția. Layout-uri de partiții care transformă un job de cinci minute într-unul de cinci ore. Notoria problemă „am zece mii de fișiere de 4KB în S3 și acum citirea e mai lentă decât scrierea”.

Dacă ai citit cu atenție lecția 9, ai văzut deja oasele lui df.write.parquet(...). Ridicăm capacul cum trebuie.

Forma unei scrieri

Aceeași construcție duală ca la citire:

# Metode de conveniență
df.write.csv("./out/orders.csv")
df.write.json("./out/orders.json")
df.write.parquet("./out/orders.parquet")

# Forma builder
(df.write
   .format("parquet")
   .mode("overwrite")
   .option("compression", "snappy")
   .save("./out/orders.parquet"))

Cele două forme sunt echivalente. Eu trec implicit la builder odată ce o scriere are mai mult de două opțiuni; se citește de sus în jos și e mai ușor de comparat în code review.

Pregătire, în caz că pornești de la zero:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("WritingData")
         .master("local[*]")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

orders = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/orders.csv"))

Cele patru save modes

.mode(...) decide ce se întâmplă când calea destinație există deja. Există exact patru valori și ar trebui să le memorezi:

df.write.mode("error").parquet(path)             # default
df.write.mode("errorifexists").parquet(path)     # alias pentru "error"
df.write.mode("append").parquet(path)
df.write.mode("overwrite").parquet(path)
df.write.mode("ignore").parquet(path)

error (și sinonimul lui errorifexists) e implicit. Dacă calea există deja, Spark refuză să scrie și aruncă o excepție. E default-ul corect; te oprește să suprascrii din greșeală datele de ieri. Multe scripturi one-off folosesc error pur și simplu nespecificând un mod.

append scrie fișiere noi alături de ce există deja acolo. Fișierele existente nu sunt atinse; fișiere part-*.parquet noi apar în același folder. Așa faci scrieri zilnice incrementale:

todays_orders.write.mode("append").parquet("./data/orders_lake.parquet")

Un pericol subtil: modul append nu face dedupe. Dacă faci append cu aceleași date de două ori, le ai de două ori. Nu există INSERT IGNORE, niciun upsert. Append e literalmente un append. (Delta Lake și Apache Iceberg rezolvă asta cu MERGE INTO. Parquet simplu nu.)

overwrite șterge calea existentă și scrie de la zero. Asta e distructiv și ar trebui să-l tratezi ca atare:

final.write.mode("overwrite").parquet("./data/orders_clean.parquet")

Overwrite e modul corect pentru pipeline-uri „reconstruiește acest dataset de la zero de fiecare dată”, tipic pentru tabele dimensiune și mart-uri agregate. E modul greșit pentru pipeline-uri „adaugă lotul de azi în lake”. Dacă le confunzi, fie acumulezi duplicate, fie ștergi un an de istorie.

Există o variantă cu granularitate mai fină numită dynamic partition overwrite, care suprascrie doar partițiile prezente în scrierea ta (nu toată calea). E un flag de configurare, nu un mod:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

todays_orders.write \
    .mode("overwrite") \
    .partitionBy("OrderDate") \
    .parquet("./data/orders_lake.parquet")

Cu dynamic, doar folderul OrderDate=2026-03-28/ e înlocuit; celelalte date rămân. Fără dynamic (modul default static), tot folderul orders_lake.parquet/ e șters. Numărul de echipe care au învățat această diferență pe pielea lor e mare. Setează dynamic global în orice pipeline care suprascrie pe partiție.

ignore e cel ciudat. Dacă calea există, nu face nimic, în tăcere. Fără eroare, fără overwrite, fără append. Util în scripturi de pregătire idempotente („creează acest tabel de lookup dacă nu e deja acolo”) și aproape nimic altceva. Am livrat probabil două apeluri mode("ignore") în cinci ani.

Scrieri partiționate: layout-ul de directoare care face citirile rapide

partitionBy(col1, col2, ...) scrie o structură de directoare în stil Hive, unde fiecare valoare de partiție devine un nume de folder:

orders.write \
    .mode("overwrite") \
    .partitionBy("Country") \
    .parquet("./data/orders_by_country.parquet")

Pe disc:

orders_by_country.parquet/
  _SUCCESS
  Country=NL/
    part-00000-...snappy.parquet
  Country=IT/
    part-00000-...snappy.parquet
  Country=DE/
    part-00000-...snappy.parquet
  Country=RO/
    part-00000-...snappy.parquet

Segmentul Country= nu e decorație. E un lucru parsabil, interogabil. Când recitești acest layout cu un filtru pe Country, Spark deschide doar folderele care se potrivesc:

italian_only = (spark.read
                .parquet("./data/orders_by_country.parquet")
                .filter("Country = 'IT'"))
italian_only.explain(True)

Uită-te la planul fizic și vei vedea PartitionFilters: [isnotnull(Country#X), (Country#X = IT)]; Spark a împins filtrul chiar în listarea fișierelor. Folderele DE, NL și RO nu sunt deschise niciodată. Asta se numește partition pruning, iar pe un dataset multi-TB e diferența dintre a scana 4TB și a scana 4GB.

Poți partiționa pe mai multe coloane:

orders.write \
    .mode("overwrite") \
    .partitionBy("Country", "OrderDate") \
    .parquet("./data/orders_by_c_d.parquet")
orders_by_c_d.parquet/
  Country=IT/
    OrderDate=2026-02-15/
      part-00000-...snappy.parquet
    OrderDate=2026-03-22/
      part-00000-...snappy.parquet
  Country=NL/
    ...

Spark prună de la stânga la dreapta: un filtru pe Country sare peste majoritatea arborelui; un filtru pe Country ȘI OrderDate sare peste și mai mult.

Coloana de partiție nu este stocată în interiorul fișierelor Parquet; e codificată în cale. Când Spark citește datasetul înapoi, reconstruiește coloana din numele folderelor. Efect secundar: nu poți avea un rând în care Country lipsește, pentru că nu există niciun folder pentru asta. Valorile null de partiție primesc un nume special de folder precum Country=__HIVE_DEFAULT_PARTITION__. În cea mai mare parte e ok, ocazional surprinzător.

Capcana cardinalității

Partiționarea e un cuțit, iar întrebarea e în ce direcție taie. Regula este:

Partiționează pe coloane cu cardinalitate scăzută până la medie. Țintește partiții de cel puțin ~100MB fiecare, ideal 256MB până la 1GB.

Country are, ce, 200 de valori distincte la nivel mondial. Poate 4 în datasetul tău. E o coloană de partiție grozavă.

OrderDate e cardinalitate medie: un folder pe zi, deci 365 de foldere pe an. Tot bună pentru date de tip serii temporale și șablonul de facto pentru lake-uri de date.

OrderId ar fi catastrofal. Un folder per comandă. Zece milioane de comenzi înseamnă zece milioane de foldere, fiecare cu un fișier Parquet care conține un rând, fiecare fișier cu un header Parquet mai mare decât datele lui. Recitirea datasetului devine o operație de metadate împotriva a zece milioane de fișiere, iar factura de S3 plânge. Asta e problema fișierelor mici în forma ei pură.

Un exemplu real: un analist a partiționat odată un dataset de 50GB pe (Country, OrderDate, CustomerId). Rezultatul a fost 1.4 milioane de fișiere Parquet minuscule. Datasetul original se citea în 30 de secunde. Versiunea partiționată dura 40 de minute. L-am rescris partiționat doar pe (Country, OrderDate) și a coborât înapoi la 35 de secunde. CustomerId aparținea în date, nu în cale.

Euristică pentru alegerea coloanei de partiție:

  • Cardinalitate: sub câteva mii de valori distincte, total. Maxim zeci de mii.
  • Selectivitate la filtrare: oamenii interoghează des pe această coloană. Dacă nimeni nu filtrează după ea, partiționarea după ea e cost pur.
  • Distribuție uniformă: aproximativ aceleași date per partiție. O coloană unde 90% din rânduri au aceeași valoare îți dă o partiție gigantică și o mie minuscule, ce e mai rău din ambele lumi.

Pentru majoritatea dataseturilor analitice, răspunsul corect e partitionBy("date_column") și gata. Orice mai fin necesită un motiv puternic.

Numărul de fișiere, dimensiunea și .coalesce(1)

Chiar și în cadrul unei singure partiții, Spark scrie un fișier per task de executor care era activ când a rulat scrierea. Cu 8 partiții în DataFrame-ul tău, vei obține 8 part-fișiere în output:

orders.write.mode("overwrite").parquet("./data/multi_file.parquet")
import os
print(sorted(os.listdir("./data/multi_file.parquet")))
# ['_SUCCESS', 'part-00000-...snappy.parquet', 'part-00001-...snappy.parquet', ...]

Dacă ai nevoie de un singur fișier output, să zicem că livrezi un CSV unui partener care refuză un folder, tentația e coalesce(1):

orders.coalesce(1).write.mode("overwrite").csv("./data/single.csv")

coalesce(1) colapsează tot DataFrame-ul într-o singură partiție înainte de a scrie. Rezultatul e un fișier. Simplu, intuitiv și o capcană.

Ce se întâmplă de fapt: Spark forțează toate datele printr-un singur task de executor. Toate. Un pipeline care a rulat pe 100 de core-uri în paralel rulează deodată pe 1 core, iar acel core trebuie să țină fiecare rând în memorie cât scrie. Pe 100MB e ok. Pe 100GB dă OOM. Pe 1TB s-a întâmplat să dărâme clustere.

Cele două utilizări sigure ale lui coalesce(1):

  1. Output-ul e cu adevărat mic (sub 1GB, comod).
  2. Chiar ai nevoie de un singur fișier (un CSV pentru un consumator non-Spark, un raport zilnic mic).

Pentru orice altceva, acceptă output-ul multi-fișier. Majoritatea uneltelor (Spark, Hive, Presto, DuckDB, Pandas via glob) gestionează nativ folderele Parquet multi-fișier.

Dacă ai problema opusă, prea multe fișiere mici, soluția corectă e repartition(N) înainte de scriere, unde N e „aproximativ, câte fișiere vreau?”:

# Țintește fișiere de ~256MB. Dacă DataFrame-ul tău e cam 25GB, asta e ~100 de fișiere.
big_df.repartition(100).write.mode("overwrite").parquet("./out/big.parquet")

repartition face un shuffle complet și îți dă exact N partiții. coalesce reduce doar numărul de partiții fără shuffle, dar nu poate crește. Regulă de bază: repartition(N) ca să crești, coalesce(N) ca să micșorezi, ambele cu N ales ca să aterizezi fiecare fișier în intervalul 100MB-1GB.

Un script complet de scriere

Adunând șabloanele, genul de script care aterizează la sfârșitul unui job ETL real:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month

spark = (SparkSession.builder
         .appName("WriteShowcase")
         .master("local[*]")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

# Fă din dynamic partition overwrite default-ul: o gentilețe pentru viitorul tău.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

orders = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/orders.csv"))

# Adaugă year/month pentru coloanele de partiție. Nu partiționa pe zi pe un dataset mic,
# dar pe un lake real, year/month/day e layout-ul standard.
enriched = (orders
            .withColumn("Year",  year(col("OrderDate")))
            .withColumn("Month", month(col("OrderDate"))))

# 1. Intern: Parquet, partiționat, overwrite (dynamic).
(enriched.write
         .mode("overwrite")
         .partitionBy("Year", "Month")
         .option("compression", "snappy")
         .parquet("./data/orders_lake.parquet"))

# 2. Append pentru lotul de azi: aceeași cale, fișiere noi.
todays = enriched.filter("OrderDate = '2026-03-28'")
(todays.write
       .mode("append")
       .partitionBy("Year", "Month")
       .parquet("./data/orders_lake.parquet"))

# 3. Raport zilnic: CSV mic, un singur fișier, pentru un consumator non-Spark.
report = (orders.groupBy("Country").sum("Total")
                .withColumnRenamed("sum(Total)", "total_revenue"))
(report.coalesce(1)
       .write
       .mode("overwrite")
       .option("header", "true")
       .csv("./data/daily_report.csv"))

# 4. Citește înapoi ca să dovedești că merge.
spark.read.parquet("./data/orders_lake.parquet").show()

spark.stop()

Trei scrieri, trei forme diferite: Parquet pentru stocare internă, CSV pentru consumator, append pentru felia zilnică. Orice pipeline real se termină într-o combinație din acestea.

Stocare în cloud, pe scurt

Totul de aici funcționează la fel pe S3, ADLS sau GCS; schimbă ./data/... în s3a://bucket/path/... (sau abfss://, sau gs://) și codul e identic. Singurele diferențe practice sunt:

  • Scrierile în cloud sunt mai lente decât local, mai ales pentru multe fișiere mici (fiecare fișier e un round-trip de rețea).
  • S3 în special are semantică slabă de redenumire, așa că markerele _SUCCESS și scrierile atomice au nevoie de grijă suplimentară. Spark gestionează asta intern; clienții S3 moderni sunt acum strict consistenți, deci e mai puțin un foc decât a fost odată.
  • Politicile de lifecycle, versionarea bucket-ului și IAM sunt complet în afara scopului jobului Spark; trăiesc cu un nivel mai jos.

Dacă faci muncă serioasă în cloud, upgrade-ul obișnuit e la un format de tabel tranzacțional (Delta, Iceberg, Hudi) deasupra Parquet-ului din cloud. Aceleași scrieri pe care tocmai le-ai învățat, plus ACID, plus MERGE INTO, plus time travel. Modulul 9 are propria lecție despre Delta Lake.

Vom reveni la profunzimea partiționării în Modulul 6, unde acoperim bucketing, Z-ordering și cum să proiectezi un layout de partiție pentru o încărcătură de query pe care chiar o înțelegi. Pentru moment, titlul e: alege o coloană de tip dată sau categorie cu cardinalitate scăzută, țintește partiții în intervalul 100MB-1GB și nu face niciodată coalesce(1) pe ceva ce nu încape pe laptopul tău.

Lecția următoare e ultima din acest modul: modul local versus un cluster real. Ce se schimbă, ce nu și bug-urile care apar doar când există executori reali în peisaj.

Caută