PySpark, de la zero Lecția 13 / 60

Schema: explicita vs dedusa

Cand sa lasi Spark sa deduca, cand sa o declari tu si de ce in productie codul aproape mereu o declara.

Un DataFrame e două lucruri prinse împreună: un sac distribuit de rânduri și o schema, adică numele coloanelor și tipurile lor, care spun cum arată acele rânduri. Datele fără schema sunt doar bytes opaci. Schema fără date e un contract. Spark are nevoie de amândouă, de fiecare dată.

Lecția de azi e despre de unde vine schema asta. Există exact două răspunsuri, Spark o ghicește pentru tine sau o scrii tu pe hârtie, iar alegerea dintre ele e una din acele decizii mici care fac diferența între un notebook care rulează în 30 de secunde și un job de producție care rulează în 30 de minute.

Ce e într-o schema

df.schema e un StructType. Un StructType e doar o listă de obiecte StructField, fiecare cu trei lucruri: un nume, un tip de date și un flag nullable. Atât. Nu e nicio magie, niciun strat de metadate, nicio stare ascunsă. Când Spark planifică o interogare, parcurge acest obiect ca să afle ce coloane există și cum să le citească.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("schemas").getOrCreate()

df = spark.read.csv("orders.csv", header=True, inferSchema=True)
df.printSchema()
# root
#  |-- order_id: integer (nullable = true)
#  |-- customer_id: integer (nullable = true)
#  |-- amount: double (nullable = true)
#  |-- ts: timestamp (nullable = true)

printSchema() îți dă arborele lizibil pentru oameni. df.schema returnează obiectul StructType real, util când vrei să refolosești o schema sau să compari programatic două DataFrame-uri. df.dtypes returnează o listă de tupluri (nume, tip_string), forma cea mai ușor de copiat într-un script.

df.dtypes
# [('order_id', 'int'),
#  ('customer_id', 'int'),
#  ('amount', 'double'),
#  ('ts', 'timestamp')]

Trei moduri de a vedea aceeași informație. Folosește-l pe cel care se potrivește momentului.

Un detaliu mic, dar util: schema e pozițională în unele operații și după nume în altele. Când faci union, Spark potrivește coloanele după poziție în mod implicit, deci două DataFrame-uri cu aceleași nume de coloane în ordini diferite vor produce, în liniște, o uniune-gunoi. Soluția e unionByName. Vom ajunge la asta în lecția despre joins, dar e un memento că schema nu e o etichetă pasivă, ci modul în care Spark identifică coloanele la fiecare pas al planului.

Schema dedusă: ieftină când e mică, brutală când e mare

Spark poate afla schema unui fișier CSV sau JSON citindu-l. Pentru CSV, ceri asta cu inferSchema=True:

df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("orders.csv")

Ce face de fapt: Spark citește fișierul întreg o dată, doar ca să se uite la valori, decide ce tip ar trebui să aibă fiecare coloană (int? double? string? timestamp?), aruncă acel rezultat și apoi citește fișierul din nou ca să încarce efectiv datele. Două pase complete peste input.

Pentru un CSV exploratoriu de 10MB într-un notebook, nu observi. Totul se termină într-o secundă. Pentru un dump de 500GB într-o landing zone pe S3, tocmai ți-ai dublat costul de citire, timpul de citire și factura de I/O, ca să afli informații pe care aproape sigur le știai deja.

JSON e și mai rău. Nu există o opțiune inferSchema=False pentru JSON la fel; Spark deduce mereu, fiindcă JSON nu are header. Cu JSON imbricat sau profund variabil, pasul de inferență poate scana miliarde de înregistrări ca să unească schemele între rânduri.

Cealaltă problemă cu inferența, cea care mușcă în producție: tipurile deduse sunt presupuneri și se pot schimba între rulări. Să zicem că coloana ta customer_id e în mod normal numerică. Într-o zi, un singur rând ajunge cu valoarea "unknown", vreo santinelă null a cuiva care s-a strecurat prin sistemul din amonte. Spark vede un string într-o coloană pe care ar fi numit-o int și lărgește în liniște întreaga coloană la string. Codul tău din aval, care făcea df.customer_id + 1, acum explodează. Sau, mai rău, nu explodează și produce prostii.

Schema drift e un mod real de eșec. Inferența te face vulnerabil la el.

Mai e și costul pur al greșelii cu tipurile. Inferența va decide bucuroasă că o coloană e int când de fapt ar trebui să fie bigint, fiindcă rândurile-eșantion pe care le-a văzut încăpeau toate în 32 de biți. Apoi, peste o lună, ajunge un rând cu un id peste două miliarde și cast-ul dă overflow. Sau decide că o coloană de date e string, fiindcă un rând avea "N/A". Inferența e optimistă într-un fel pe care sistemele de producție nu-l răsplătesc.

Schema explicită: alegerea implicită din producție

Soluția e să declari tu schema și să o transmiți. Atunci Spark nu mai ghicește, ci parsează fiecare valoare în tipul pe care l-ai specificat, iar rândurile care nu se potrivesc devin null (cu mode="PERMISSIVE", valoarea implicită) sau eșuează la citire (cu mode="FAILFAST").

Forma explicită folosește StructType și StructField:

from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, DoubleType, StringType, TimestampType,
)

orders_schema = StructType([
    StructField("order_id",    IntegerType(),   nullable=False),
    StructField("customer_id", IntegerType(),   nullable=False),
    StructField("amount",      DoubleType(),    nullable=True),
    StructField("ts",          TimestampType(), nullable=True),
])

df = spark.read \
    .option("header", True) \
    .schema(orders_schema) \
    .csv("orders.csv")

O singură pasă de citire. Tipuri previzibile. Dacă un rând are "banana" în coloana amount, doar acea valoare devine null; restul rândului tot se parsează. Dacă preferi ca jobul să eșueze zgomotos atunci când se întâmplă asta, adaugă .option("mode", "FAILFAST").

Forma scurtă folosește un string DDL. E aceeași informație, mai puțină ceremonie:

orders_schema = "order_id INT, customer_id INT, amount DOUBLE, ts TIMESTAMP"

df = spark.read \
    .option("header", True) \
    .schema(orders_schema) \
    .csv("orders.csv")

Eu prefer string-ul DDL pentru scheme plate. E două linii în loc de șapte, se citește ca un CREATE TABLE și e prietenoasă cu diff-urile: când cineva adaugă o coloană, diff-ul e o linie, nu cinci.

Pentru date profund imbricate (array-uri de struct-uri, map-uri de struct-uri etc.), forma explicită e mai clară. Folosește-o pe cea care se potrivește.

Flag-ul nullable e mai mult documentație

Vei observa că fiecare StructField ia un argument nullable. Ai putea presupune că setarea lui pe False îl face pe Spark să refuze rândurile în care acea coloană e null.

Nu o face. Sau, mai bine zis, o face uneori și nu o face alteori, iar comportamentul nu e cel pe care l-ai spera.

nullable=False e un hint pentru optimizatorul Catalyst. Îi spune Spark-ului „poți presupune că această coloană nu e niciodată null când planifici interogări împotriva ei”, ceea ce permite optimizatorului să sară peste căile de cod pentru tratarea null-urilor. Dacă efectiv bagi un null acolo unde ai promis că nu va fi unul, nu primești o eroare curată. Primești comportament nedefinit: uneori rândul e eliminat în liniște, alteori crapă adânc într-un generator de cod, alteori trece pur și simplu.

Concluzia: setează nullable=True (valoarea implicită) pe tot, decât dacă ești 100% sigur. Dacă vrei să impui non-null, fă-o explicit cu un filtru sau un check după citire:

bad_rows = df.filter(df.order_id.isNull())
if bad_rows.count() > 0:
    raise ValueError(f"Got {bad_rows.count()} rows with null order_id")

Ăla e un check care chiar se declanșează. Flag-ul nullable=False e un contract pe care Spark nu-l va impune pentru tine.

Când inferența e ok

Nu zic să nu deduci niciodată. Sunt cazuri în care e unealta potrivită:

  • Explorare punctuală într-un notebook. Ai primit un fișier, nu știi ce e în el, vrei să te uiți. Inferența e mai rapidă decât să tastezi o schema pentru date pe care le vei arunca în zece minute.
  • Fișiere destul de mici încât dubla citire să nu conteze. Sub o sută de megabyți, pe disc local, vei petrece mai mult timp scriind schema decât rulând a doua pasă.
  • Parquet, ORC, Delta, Avro. Aceste formate includ schema chiar în fișier. Spark o citește din footer la cost esențial nul. Nu există un pas de inferență. Citirea Parquet „fără o schema” e fundamental diferită de citirea CSV fără o schema. Nicio penalizare de două pase, niciun risc de drift.

Deci regula e mai îngustă decât „declară mereu”. Sună așa:

Pentru formatele text fără schema (CSV, JSON) în producție, declară mereu. Pentru formatele binare auto-descriptive (Parquet, ORC, Delta, Avro), lasă fișierul să-ți spună.

Ăsta e unul dintre motivele pentru care lumea data engineering mută CSV/JSON spre Parquet cât poate de repede. Schema devine o proprietate a datelor, nu un lucru pe care trebuie să-l întreții într-un fișier Python separat care iese din sincron cu realitatea.

Citirea a ceea ce ți-a dat Spark

Când ți se pune în mână un DataFrame venit din altă parte, un join, un apel de funcție, un notebook început de un coleg, cele trei metode de inspecție au fiecare locul lor:

df.printSchema()        # vedere arbore, pentru oameni
df.schema               # obiect StructType, pentru cod
df.dtypes               # lista de tupluri (nume, tip), pentru copy-paste rapid

printSchema() e ce vei folosi 80% din timp. E diagnosticul. Când o operație din aval eșuează cu o eroare de tip, ăsta e primul lucru de verificat.

df.schema e ce vei folosi când scrii cod reutilizabil. Poți salva o schema pe disc ca JSON și o poți încărca înapoi:

schema_json = df.schema.json()
# {"fields":[{"name":"order_id","type":"integer","nullable":false,...}], ...}

# Mai târziu, în altă parte:
from pyspark.sql.types import StructType
import json
restored = StructType.fromJson(json.loads(schema_json))

Așa funcționează sub capotă uneltele de tip schema registry și straturile de verificare a contractelor. E bine de știut că există; nu o vei scrie de mână prea des.

Un pattern care merită cunoscut: când schema unui job este contractul dintre două echipe, stochează schema ca un fișier .json pus în Git, alături de codul care citește datele. Citirile încarcă schema de pe disc prin StructType.fromJson(...). Schimbările de schema devin diff-uri Git care trec prin code review ca orice altă modificare. Așa transformi „schema” din cunoaștere tribală într-un lucru pe care un nou angajat îl poate găsi din prima zi.

Rulează asta pe propria mașină

Aruncă asta într-un notebook sau script. Va genera un mic CSV, apoi îl citește în trei moduri diferite, ca să poți vedea singur diferența.

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, DoubleType, StringType, TimestampType,
)
from pathlib import Path

spark = SparkSession.builder.appName("schemas-demo").getOrCreate()

# Make a tiny CSV with a deliberately tricky row
csv_path = Path("/tmp/orders_demo.csv")
csv_path.write_text(
    "order_id,customer_id,amount,ts\n"
    "1,42,59.00,2026-01-03 10:32:00\n"
    "2,42,29.00,2026-01-04 14:22:00\n"
    "3,17,banana,2026-01-05 09:15:00\n"   # bad amount
    "4,8,149.00,2026-01-06 11:40:00\n"
)

# 1. No schema, no inference: everything is a string
df1 = spark.read.option("header", True).csv(str(csv_path))
print("=== Default (everything string) ===")
df1.printSchema()
df1.show()

# 2. Inferred: Spark guesses, and guesses wrong because of 'banana'
df2 = spark.read.option("header", True).option("inferSchema", True).csv(str(csv_path))
print("=== Inferred ===")
df2.printSchema()
df2.show()
# Notice: amount becomes string, because of one bad row.

# 3. Explicit DDL string: amount is DOUBLE, the bad row becomes null
schema_ddl = "order_id INT, customer_id INT, amount DOUBLE, ts TIMESTAMP"
df3 = spark.read.option("header", True).schema(schema_ddl).csv(str(csv_path))
print("=== Explicit (DDL) ===")
df3.printSchema()
df3.show()

# 4. Explicit StructType: same result, more verbose
schema_obj = StructType([
    StructField("order_id",    IntegerType()),
    StructField("customer_id", IntegerType()),
    StructField("amount",      DoubleType()),
    StructField("ts",          TimestampType()),
])
df4 = spark.read.option("header", True).schema(schema_obj).csv(str(csv_path))
print("=== Explicit (StructType) ===")
df4.printSchema()
df4.show()

Rulează. Uită-te ce tip a primit amount în fiecare caz. Versiunea dedusă care se lărgește la string din cauza unui singur rând stricat e întreg argumentul de producție pentru declararea schemelor, în opt caractere vizibile.

Lecția următoare: select și filter, cele două operații pe care le vei face de mii de ori, inclusiv cele patru moduri diferite de a te referi la o coloană, dintre care doar trei sunt sigure.


Referință: Apache Spark Python API (https://spark.apache.org/docs/latest/api/python/), consultat 2026-05-01.

Caută