Un job Spark care nu citește date este un job Spark care nu face muncă. Astăzi punem date în Spark din cele trei formate pe care le vei întâlni în 95% din job-urile reale: CSV, JSON și Parquet. Fiecare are valori implicite diferite, capcane diferite și un cost-pe-rând foarte diferit. Alegerea formatului potrivit și a opțiunilor de citire potrivite este diferența dintre un job de 10 secunde și un job de 10 minute.
Vom atinge și compromisul schema-on-read: îi permiți lui Spark să ghicească tipurile coloanelor sau i le spui? Alegerea greșită pe un dataset de 200GB îți va arde o oră din dimineață înainte să-ți dai seama.
Setup
Același SparkSession ca în lecția trecută, presupunem că spark este deja viu:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("ReadingData")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
spark.sparkContext.setLogLevel("WARN")
Avem nevoie de niște date sample. Salvează snippet-ul ăsta și rulează-l o dată ca să generezi trei fișiere mici:
import json
from pathlib import Path
data_dir = Path("./data")
data_dir.mkdir(exist_ok=True)
# CSV
(data_dir / "orders.csv").write_text(
"OrderId,CustomerId,Total,Country,OrderDate\n"
"1001,1,59.00,NL,2026-03-05\n"
"1002,1,29.00,NL,2026-03-18\n"
"1003,2,149.00,IT,2026-02-15\n"
"1004,2,89.50,IT,2026-03-22\n"
"1005,3,199.00,DE,2026-03-10\n"
"1006,4,42.42,RO,2026-03-28\n"
)
# JSON Lines (o inregistrare pe linie - formatul preferat de Spark)
(data_dir / "customers.jsonl").write_text(
'{"CustomerId": 1, "Name": "Anna", "Country": "NL"}\n'
'{"CustomerId": 2, "Name": "Marco", "Country": "IT"}\n'
'{"CustomerId": 3, "Name": "Dieter","Country": "DE"}\n'
'{"CustomerId": 4, "Name": "Ioana", "Country": "RO"}\n'
)
# JSON multi-linie (un singur array mare - se intinde pe mai multe linii per "inregistrare")
(data_dir / "customers_pretty.json").write_text(json.dumps([
{"CustomerId": 1, "Name": "Anna", "Country": "NL"},
{"CustomerId": 2, "Name": "Marco", "Country": "IT"},
{"CustomerId": 3, "Name": "Dieter", "Country": "DE"},
{"CustomerId": 4, "Name": "Ioana", "Country": "RO"},
], indent=2))
Acum avem ceva de citit.
Două moduri de a scrie aceeași citire
PySpark îți oferă două forme de builder. Fac exact același lucru.
# Forma scurta
df = spark.read.csv("./data/orders.csv", header=True)
# Builder
df = (spark.read
.format("csv")
.option("header", "true")
.load("./data/orders.csv"))
Folosește forma scurtă pentru citiri punctuale. Folosește builder-ul când ai multe opțiuni sau când formatul este parametrizat (de exemplu citește dintr-o configurație care zice csv astăzi și parquet mâine).
CSV: formatul care minte că e simplu
df = spark.read.csv("./data/orders.csv")
df.show()
df.printSchema()
Rulează asta și uită-te:
+----+----------+------+-------+----------+
| _c0| _c1| _c2| _c3| _c4|
+----+----------+------+-------+----------+
| Id|CustomerId| Total|Country| OrderDate|
|1001| 1| 59.00| NL|2026-03-05|
|1002| 1| 29.00| NL|2026-03-18|
...
Două probleme deja. Rândul de header a fost tratat ca date și fiecare coloană este string. Valorile implicite CSV ale lui Spark sunt pesimiste: fără să-i spui altceva, presupune:
- Fără rând de header.
- Fiecare coloană este string.
- Virgula este delimitatorul.
- Ghilimelele duble sunt caracterul de quote.
Aproape întotdeauna trebuie să suprascrii cel puțin primele două:
df = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("./data/orders.csv"))
df.show()
df.printSchema()
+-------+----------+-----+-------+----------+
|OrderId|CustomerId|Total|Country| OrderDate|
+-------+----------+-----+-------+----------+
| 1001| 1| 59.0| NL|2026-03-05|
...
root
|-- OrderId: integer (nullable = true)
|-- CustomerId: integer (nullable = true)
|-- Total: double (nullable = true)
|-- Country: string (nullable = true)
|-- OrderDate: date (nullable = true)
Mai bine. Dar observă ce face de fapt inferSchema=True: Spark citește întreg fișierul o dată ca să stabilească tipurile, apoi îl citește din nou ca să încarce datele. Pe un CSV de 6 rânduri asta e gratis. Pe un CSV de 200GB asta e o scanare suplimentară de 200GB pe care n-aveai nevoie să o faci. Pentru orice mai mare decât „explorare ad-hoc”, asta e greșit.
Răspunsul corect în producție este să declari schema explicit:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType
orders_schema = StructType([
StructField("OrderId", IntegerType(), nullable=False),
StructField("CustomerId", IntegerType(), nullable=False),
StructField("Total", DoubleType(), nullable=False),
StructField("Country", StringType(), nullable=False),
StructField("OrderDate", DateType(), nullable=False),
])
df = (spark.read
.option("header", "true")
.schema(orders_schema)
.csv("./data/orders.csv"))
df.show()
df.printSchema()
O singură trecere prin fișier. Tipuri stricte. Erorile de tip eșuează rapid (sau devin null, în funcție de mode).
Opțiuni CSV pe care le vei folosi efectiv
Există aproximativ 30 de opțiuni CSV. Mâna care contează:
df = (spark.read
.option("header", "true") # Primul rand este nume de coloane
.option("inferSchema", "false") # Nu ghici - furnizeaza schema explicit
.option("delimiter", ",") # Sau ";", "\t", "|"...
.option("quote", '"') # Caracterul de quoting al campului
.option("escape", '"') # Caracter escape in interiorul campurilor in ghilimele
.option("nullValue", "") # Ce string inseamna NULL
.option("dateFormat", "yyyy-MM-dd") # Implicit ISO-8601
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("mode", "PERMISSIVE") # PERMISSIVE | DROPMALFORMED | FAILFAST
.schema(orders_schema)
.csv("./data/orders.csv"))
mode este cel despre care nu vorbește nimeni și de care toată lumea are nevoie:
- PERMISSIVE (implicit): rândurile rele se umplu cu null. Rândul malformed merge într-o coloană magică numită
_corrupt_recorddacă schema ta o include. Altfel, pierdut în tăcere. - DROPMALFORMED: rândurile rele sunt aruncate în tăcere. Nu folosi asta. Nu vei ști niciodată câte rânduri ai pierdut.
- FAILFAST: rândurile rele ridică o excepție imediat. Folosește asta în producție. Eșecurile zgomotoase sunt mai bune decât corupția silențioasă.
JSON: două formate cu aceeași extensie
Cititorul JSON al lui Spark așteaptă JSON Lines implicit: un obiect JSON pe linie, fără array înconjurător, fără virgule între înregistrări:
df = spark.read.json("./data/customers.jsonl")
df.show()
df.printSchema()
+-------+----------+------+
|Country|CustomerId| Name|
+-------+----------+------+
| NL| 1| Anna|
| IT| 2| Marco|
| DE| 3|Dieter|
| RO| 4| Ioana|
+-------+----------+------+
root
|-- Country: string (nullable = true)
|-- CustomerId: long (nullable = true)
|-- Name: string (nullable = true)
Observă că JSON îți oferă tipuri gratis. Formatul codifică integers, float-uri, boolean-uri și string-uri diferit, deci Spark nu trebuie să ghicească. (Tot scanează fișierul o dată ca să descopere schema, adică ce câmpuri există, dar tipurile sunt neambigue.)
Cealaltă formă JSON pe care o vei vedea este un singur array mare care se întinde pe mai multe linii:
[
{"CustomerId": 1, "Name": "Anna", ...},
{"CustomerId": 2, "Name": "Marco", ...},
...
]
Asta returnează majoritatea API-urilor. Cititorul JSON implicit al lui Spark îl va mutila: fiecare linie este parsată independent, iar liniile cu paranteze și virgule eșuează la parsing. Ca să citești formatul ăsta, setează multiLine=True:
df = (spark.read
.option("multiLine", "true")
.json("./data/customers_pretty.json"))
df.show()
Notă de performanță: multiLine=True înseamnă că Spark nu poate împărți fișierul între executori, parserul JSON trebuie să vadă întregul fișier ca un singur stream. Pe un JSON pretty-printed de 50GB primești un singur task și un singur nucleu CPU. Convertește la JSON Lines (jq -c '.[]' input.json > output.jsonl) în momentul în care fișierul devine mare.
JSON gestionează și câmpuri imbricate natural:
nested = spark.read.json(spark.sparkContext.parallelize([
'{"order": 1001, "customer": {"id": 1, "name": "Anna"}}',
'{"order": 1002, "customer": {"id": 2, "name": "Marco"}}',
]))
nested.printSchema()
nested.select("order", "customer.name").show()
root
|-- customer: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
|-- order: long (nullable = true)
+-----+-----+
|order| name|
+-----+-----+
| 1001| Anna|
| 1002|Marco|
+-----+-----+
Coloana customer vine prin ca un struct. Vom ajunge la gestionarea datelor imbricate cum trebuie în Modulul 4.
Parquet: cel pe care Spark chiar îl place
Parquet este formatul pentru care Spark este construit. Este un format binar columnar cu o schemă încorporată, compresie per coloană și statistici de row-group care îi permit lui Spark să sară peste bucăți întregi dintr-un fișier când filtrează.
Hai să scriem DataFrame-ul orders în Parquet, apoi să-l citim înapoi:
df = (spark.read
.option("header", "true")
.schema(orders_schema)
.csv("./data/orders.csv"))
df.write.mode("overwrite").parquet("./data/orders.parquet")
Asta scrie un folder, nu un singur fișier. În interiorul ./data/orders.parquet/ vei vedea ceva de genul:
_SUCCESS
part-00000-abc123.snappy.parquet
part-00001-abc123.snappy.parquet
...
Un fișier per partiție, fiecare comprimat individual (Snappy implicit). Marker-ul _SUCCESS îi spune lui Spark și uneltelor Hadoop „scrierea asta s-a terminat curat.” Dacă nu vezi _SUCCESS, scriitorul s-a prăbușit la mijlocul scrierii și ar trebui să tratezi output-ul ca fiind rău.
Citește-l înapoi:
df2 = spark.read.parquet("./data/orders.parquet")
df2.show()
df2.printSchema()
+-------+----------+-----+-------+----------+
|OrderId|CustomerId|Total|Country| OrderDate|
+-------+----------+-----+-------+----------+
| 1001| 1| 59.0| NL|2026-03-05|
...
Fără opțiuni necesare. Fără inferență de schemă. Aceleași tipuri ca sursa. Mai rapid. Mai mic. Mai strict. De asta câștigă Parquet.
De ce Parquet este rapid: column pruning
Un row-store precum CSV stochează 1001,1,59.00,NL,2026-03-05 ca o singură bucată. Ca să citești doar coloana Total, trebuie să scanezi fiecare byte al fiecărui rând.
Parquet stochează toate valorile OrderId împreună, apoi toate valorile CustomerId, apoi toate valorile Total. Ca să citească doar Total, Spark citește doar acea coloană de pe disc. Pe un tabel lat (peste 50 de coloane), asta este adesea de 10 până la 50 de ori mai rapid decât CSV.
Rulează asta ca să simți diferența:
# Citeste Parquet, proiecteaza o singura coloana
spark.read.parquet("./data/orders.parquet").select("Total").show()
# Citeste CSV, proiecteaza o coloana - Spark tot trebuie sa parseze fiecare rand
(spark.read
.option("header", "true")
.schema(orders_schema)
.csv("./data/orders.csv")
.select("Total")
.show())
Pe 6 rânduri nu poți vedea diferența. Pe 6 milioane de rânduri, citirea Parquet se termină înainte ca citirea CSV să se încălzească.
Moduri de scriere Parquet
df.write.mode("overwrite").parquet(path) # Sterge si rescrie
df.write.mode("append").parquet(path) # Adauga fisiere noi pe langa cele vechi
df.write.mode("ignore").parquet(path) # Daca path-ul exista, nu face nimic
df.write.mode("error").parquet(path) # Implicit - ridica daca path-ul exista
error este implicit și este implicitul corect: te oprește din a strica accidental date. În scripturile de producție folosesc de obicei overwrite doar cu partiționare a path-ului bazată pe dată, astfel încât fiecare zi scrie în propriul folder, iar „overwrite” afectează doar slot-ul de azi.
Partiționare la scriere
df.write.mode("overwrite").partitionBy("Country").parquet("./data/orders_part.parquet")
Asta scrie un subdirector per valoare Country:
orders_part.parquet/
Country=NL/
part-00000-...snappy.parquet
Country=IT/
part-00000-...snappy.parquet
Country=DE/
part-00000-...snappy.parquet
Country=RO/
part-00000-...snappy.parquet
Acum spark.read.parquet(...).filter("Country = 'IT'") citește doar folderul Country=IT. Săritul peste date este cea mai ieftină victorie de performanță existentă.
Dar, și asta contează, partiționează doar după coloane cu cardinalitate scăzută. Partiționarea după OrderId ar crea un fișier per comandă. Partiționarea după Country (4 până la 50 de valori) este excelentă. Partiționarea după OrderDate (un folder per zi) este pattern-ul standard pentru date time-series. Vom reveni la asta în lecția 35.
Compromisul schema-on-read
Trei poziții pe care le poți lua când citești date:
1. Schemă inferată (inferSchema=True pentru CSV; implicit pentru JSON).
- Avantaje: zero ceremonie, Spark își dă seama.
- Dezavantaje: trecere suplimentară completă peste fișier pentru CSV. Fragil: dacă fișierul de vineri are un string într-o coloană numerică, job-ul tău de luni se rupe. Fără garanții de tip în codul tău.
- Când să folosești: explorare ad-hoc. Notebook-uri cu un singur dezvoltator. Fișiere suficient de mici încât trecerea suplimentară este gratis.
2. Schemă explicită (.schema(StructType(...))).
- Avantaje: o singură trecere. Tipuri stricte. Erorile de tip prinse devreme. Auto-documentare.
- Dezavantaje: mai mult cod. Trebuie ținută în sincron cu sursa dacă se adaugă coloane.
- Când să folosești: producție. Orice rulează pe schedule. Orice unde tipurile greșite ar cauza probleme de calitate a datelor în aval.
3. Schemă în fișier (Parquet, ORC, Avro, Delta).
- Avantaje: fără schemă de menținut. Tipuri stricte. Schema evoluează cu datele.
- Dezavantaje: necesită controlul și asupra părții de scriere: nu poți face asta cu CSV-uri care sosesc de la un partener.
- Când să folosești: orice date pe care le produce propriul tău pipeline. Mută din CSV în Parquet la primul hop intern, apoi nu mai atinge niciodată CSV.
Într-un data lake tipic, pattern-ul este: CSV (cu schemă explicită) la marginea de ingestie, Parquet (sau Delta) pentru orice intern. Vom petrece lecția 13 chiar pe definiția de schemă: fiecare tip PySpark, când să folosești ce, cum să gestionezi nullability, schema merging la append.
Un script complet de citire și afișare
Punând totul cap la cap:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField,
IntegerType, DoubleType, StringType, DateType)
spark = (SparkSession.builder
.appName("ReadingData")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
spark.sparkContext.setLogLevel("WARN")
orders_schema = StructType([
StructField("OrderId", IntegerType(), False),
StructField("CustomerId", IntegerType(), False),
StructField("Total", DoubleType(), False),
StructField("Country", StringType(), False),
StructField("OrderDate", DateType(), False),
])
# CSV cu schema explicita - pattern de productie
orders = (spark.read
.option("header", "true")
.option("mode", "FAILFAST")
.schema(orders_schema)
.csv("./data/orders.csv"))
# JSON Lines - tipurile vin gratis
customers = spark.read.json("./data/customers.jsonl")
# Scrie in Parquet pentru job-uri din aval
orders.write.mode("overwrite").partitionBy("Country").parquet("./data/orders.parquet")
# Citeste inapoi - rapid, schema incorporata
parq = spark.read.parquet("./data/orders.parquet")
# Join rapid
joined = parq.join(customers, on="CustomerId", how="inner") \
.select("OrderId", "Name", "Country", "Total", "OrderDate")
joined.show()
spark.stop()
Dacă asta rulează cap la cap și tipărește un tabel join-uit cu 6 rânduri cu clienți italieni, olandezi, germani și români, ai pipeline-ul de citire funcțional. Tot restul în PySpark se construiește pe asta.
Modulul următor: lucrul efectiv cu DataFrame-uri. Selectarea coloanelor, filtrarea rândurilor, diferența dintre select și selectExpr și de ce .show() te minte despre ce face job-ul tău.