PySpark, de la zero Lecția 10 / 60

Show, count, collect: actiunile pe care orice incepator le ruleaza primele

Cele trei actiuni cu care incepe orice notebook PySpark, diferenta dintre ele si de ce e periculos sa le confunzi la scara mare.

Lecția trecută am adus date în Spark. Astăzi ne uităm la ele. Cele trei comenzi pe care le vei tasta în primele zece secunde ale fiecărui notebook sunt .show(), .count() și .collect(). Par inofensive. Două chiar sunt. A treia este responsabilă pentru mai multe crash-uri out-of-memory decât oricare alt apel PySpark pe care l-am văzut.

Asta e și lecția în care strecurăm cel mai important model mental din Spark: transformările sunt leneșe, acțiunile sunt nerăbdătoare. Poți înlănțui o sută de apeluri .filter() și .select(), iar Spark nu face nimic. În momentul în care apelezi o acțiune, fie ea .show(), .count(), .collect() sau .write, Spark se trezește, planifică tot pipeline-ul și îl execută. Dacă greșești distincția, vei petrece o după-amiază întrebându-te de ce codul tău „rapid” e lent.

Pregătire

Aceeași SparkSession de pornire. Reutilizăm datele orders/customers generate în lecția 9:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ShowCountCollect")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())

spark.sparkContext.setLogLevel("WARN")

orders = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/orders.csv"))

Dacă nu ai ./data/orders.csv rămas de la lecția trecută, întoarce-te la lecția 9 și rerulează snippet-ul de generare de date. Vom folosi același fișier de șase rânduri pe tot parcursul.

.show(): cel pe care îl vei folosi de o mie de ori

.show() afișează rânduri în consolă ca un tabel ASCII formatat. Atât. Fără valoare returnată, fără DataFrame înapoi, fără obiect Pandas. E un efect secundar.

orders.show()
+-------+----------+-----+-------+----------+
|OrderId|CustomerId|Total|Country| OrderDate|
+-------+----------+-----+-------+----------+
|   1001|         1| 59.0|     NL|2026-03-05|
|   1002|         1| 29.0|     NL|2026-03-18|
|   1003|         2|149.0|     IT|2026-02-15|
|   1004|         2| 89.5|     IT|2026-03-22|
|   1005|         3|199.0|     DE|2026-03-10|
|   1006|         4|42.42|     RO|2026-03-28|
+-------+----------+-----+-------+----------+

Semnătura completă este show(n=20, truncate=True, vertical=False). Trei butoane, toate utile.

n este câte rânduri să afișeze. Implicit 20. Trece orice întreg pozitiv:

orders.show(3)         # Primele 3 rânduri
orders.show(1000)      # Primele 1000, dar Spark nu va aduce mai multe rânduri decât există

truncate controlează coloanele cu string-uri lungi. Implicit True, ceea ce taie la 20 de caractere și afișează ... pentru restul. Trece False ca să vezi tot, sau trece un întreg pentru o lățime personalizată:

orders.show(truncate=False)   # String-uri complete
orders.show(truncate=50)      # Taie la 50 de caractere

Dacă ai apelat vreodată .show() pe un DataFrame plin de blob-uri JSON de 5KB și ți-ai văzut terminalul plângând, de asta truncate=True e implicit.

vertical întoarce layout-ul de la coloane-ca-coloane la coloane-ca-rânduri, ceea ce e minunat pentru tabele late:

orders.show(2, vertical=True)
-RECORD 0--------------
 OrderId    | 1001
 CustomerId | 1
 Total      | 59.0
 Country    | NL
 OrderDate  | 2026-03-05
-RECORD 1--------------
 OrderId    | 1002
 ...

Folosesc modul vertical de fiecare dată când un DataFrame are mai mult de 8-10 coloane. E diferența dintre a citi și a mijloci ochii.

Ce face .show() de fapt sub capotă: Spark planifică pipeline-ul tău, rulează doar atât cât trebuie pentru a produce n rânduri, apoi colectează acele n rânduri la driver și le afișează. E o acțiune, dar una mică. A cere 20 de rânduri înseamnă că Spark ar putea avea nevoie să citească doar o partiție de fișier înainte să aibă destul.

.count(): cel care îți rulează tot pipeline-ul

.count() returnează numărul de rânduri, ca un int Python:

n = orders.count()
print(n)   # 6

Pare inofensiv. Nu e. Ca să numere rânduri, Spark trebuie să execute tot pipeline-ul până în acest punct. Fiecare citire, fiecare filtrare, fiecare join. Nu există scurtătură; nu poți ști câte rânduri supraviețuiesc unui join fără să faci join-ul.

Așa că asta:

big = (spark.read.parquet("./data/huge_orders.parquet")
       .filter("OrderDate >= '2026-01-01'")
       .join(customers, "CustomerId")
       .filter("Country = 'IT'"))

print(big.count())

…citește întregul dataset Parquet, aplică ambele filtre, rulează join-ul și numără ce rămâne. Pe un dataset de 200GB, asta e o sarcină de mai multe minute, chiar dacă ai vrut doar un număr.

Greșeala pe care o face toată lumea (inclusiv eu, în mod repetat) e să presari apeluri .count() printr-un notebook pentru „verificare”:

df = read_orders()
print("After read:",   df.count())          # Job 1
df = df.filter(...)
print("After filter:", df.count())          # Job 2: recitește totul
df = df.join(customers)
print("After join:",   df.count())          # Job 3: recitește + refiltrează
df.write.parquet("...")                     # Job 4: recitește + refiltrează + reface join

Fiecare dintre acele apeluri .count() rulează tot pipeline-ul până în acel punct. De la zero. Spark nu cache-uiește rezultate între acțiuni decât dacă îi spui. Așa că o „verificare rapidă” îți poate cvadrupla discret timpul de rulare al jobului.

Există o soluție, .cache() / .persist(), dar e subiect pentru lecția 23. Pentru moment, disciplina e: nu da .count() la mijlocul unui pipeline decât dacă vrei să plătești pentru asta.

.collect(): cel pe care aproape niciodată nu ar trebui să-l apelezi

.collect() returnează fiecare rând al DataFrame-ului, ca o listă Python de obiecte Row, pe driver:

rows = orders.collect()

print(type(rows))      # <class 'list'>
print(len(rows))       # 6
print(rows[0])         # Row(OrderId=1001, CustomerId=1, Total=59.0, ...)
print(rows[0].Total)   # 59.0
print(rows[0]["Total"])# 59.0: merge și acces ca atribut, și ca dict

Pentru un DataFrame de șase rânduri, e în regulă. E o listă cu șase tuple. Poate un kilobyte de memorie.

Pentru un DataFrame de 100GB, e un dezastru. .collect() trage fiecare rând într-un singur JVM, driverul, și încearcă să le încapă pe toate în memoria driverului. Driverul tău are, ce, 4GB? 16GB? 64GB dacă ai noroc? Va da OOM. Zgomotos. Adesea după ce a rulat zece minute, deci ai parte și să aștepți înainte să primești crash-ul.

Regula pe care o scriu pe orice whiteboard:

Nu apela niciodată .collect() pe un DataFrame a cărui dimensiune nu o cunoști deja ca fiind mică.

Dacă vii din Pandas, asta e cea mai mare diferență comportamentală. pandas.DataFrame e mereu în memorie prin definiție. DataFrame-urile PySpark de obicei nu sunt în memorie și ar putea să nu încapă în memoria niciunei mașini. .collect() e singura operație care se preface că Spark e Pandas, iar Spark te pedepsește pentru asta.

Când .collect() e cu adevărat în regulă

  • Tabele de lookup mici (țări, coduri valutare, hărți de configurație).
  • Agregări unde ai redus deja numărul de rânduri, precum df.groupBy("Country").count().collect() pe 30 de țări.
  • Teste, unde input-urile sunt deliberat mici.

Pentru orice altceva, folosește una dintre alternativele mai sigure.

Alternativele sigure: .take(), .first(), .head()

Trei acțiuni care trag câteva rânduri la driver fără să se prefacă că le poate ține pe toate:

first_three = orders.take(3)        # listă cu 3 obiecte Row
print(first_three)

one_row = orders.first()            # un singur Row, sau None dacă e gol
print(one_row)

head_three = orders.head(3)         # listă cu 3 Row-uri (alias pentru .take())
print(head_three)

.take(n) e .collect() cu lesă. Trage n rânduri și se oprește. Spark e suficient de inteligent să citească doar atâtea partiții câte îi trebuie pentru a obține n rânduri, așa că pe un dataset de 100GB, .take(5) citește aproximativ o partiție și se întoarce în secunde.

.first() e .take(1)[0], doar că returnează None în loc să crape pe un DataFrame gol. Folosește-l când vrei cu adevărat un singur rând, de exemplu să arunci o privire la o înregistrare-exemplu pentru schemă.

.head(n) și .take(n) sunt aliasuri. Unii preferă .head() pentru că așa îi spune Pandas; alții preferă .take() pentru că așa l-au numit RDD-urile mereu. Alege unul.

Dacă te trezești că întinzi mâna spre .collect(), întreabă „chiar am nevoie de fiecare rând?”. Răspunsul onest e aproape întotdeauna nu, iar .take(100) îți va da mai mult decât suficient pentru debug.

Companionii: .printSchema(), .describe(), .summary()

Încă trei metode „uită-te la date” pe care le vei folosi constant. Niciuna nu e periculoasă.

.printSchema() afișează schema. Ieftin, instantaneu; Spark cunoaște deja schema, nu se citesc date:

orders.printSchema()
root
 |-- OrderId:    integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Total:      double  (nullable = true)
 |-- Country:    string  (nullable = true)
 |-- OrderDate:  date    (nullable = true)

Afișez schema după fiecare citire. Prinde imediat erorile de inferență de tip; în momentul în care vezi OrderDate: string știi că inferSchema n-a funcționat și ai nevoie de o schemă explicită.

.describe() calculează count / mean / stddev / min / max pentru fiecare coloană numerică:

orders.describe().show()
+-------+------------------+------------------+------------------+-------+
|summary|           OrderId|        CustomerId|             Total|Country|
+-------+------------------+------------------+------------------+-------+
|  count|                 6|                 6|                 6|      6|
|   mean|            1003.5|               2.5| 94.65333333333334|   null|
| stddev|1.8708286933869707|1.2247448713915890| 65.42818134849872|   null|
|    min|              1001|                 1|              29.0|     DE|
|    max|              1006|                 4|             199.0|     RO|
+-------+------------------+------------------+------------------+-------+

De reținut: returnează un DataFrame, nu un tabel afișat. Așa că tot ai nevoie de .show() ca să-l vezi. (Una dintre acele mici ciudățenii PySpark care prinde pe toată lumea o dată.)

.summary() este fratele mai puternic. Implicit adaugă cuartilele:

orders.summary().show()

Primești count, mean, stddev, min, 25%, 50%, 75%, max. Poți trece și percentile personalizate:

orders.summary("count", "min", "25%", "50%", "75%", "99%", "max").show()

Pentru un parcurs exploratoriu peste un dataset nou, .summary() e cel mai util one-liner din PySpark. E echivalentul describe(include='all', percentiles=[...]) din Pandas.

Atât .describe() cât și .summary() sunt operații full-pass; citesc tot DataFrame-ul. Pe un CSV minuscul e gratis; pe un lake de 200GB, e o sarcină reală. Rulează-le cu măsură pe date de dimensiune de producție, sau rulează-le mai întâi pe un .sample(0.01).

Acțiune vs transformare: previzualizarea eager/lazy

Vom face asta cum trebuie în lecția 19, dar iată titlul acum, ca restul cursului să aibă sens.

O transformare descrie un DataFrame nou în termenii unuia existent. select, filter, withColumn, join, groupBy, orderBy, union: toate sunt transformări. Returnează un DataFrame nou fără să facă nicio muncă. Spark doar înregistrează rețeta.

O acțiune îi cere Spark un rezultat concret: un tabel afișat, un număr, o listă de Row-uri sau o scriere pe disc. Acțiunile declanșează execuția. Spark se uită la rețetă, planifică o execuție fizică optimă și o rulează.

Cu alte cuvinte:

# Aceste patru linii nu fac absolut nimic încă.
filtered  = orders.filter("Country = 'IT'")
projected = filtered.select("OrderId", "Total")
ordered   = projected.orderBy("Total")
big_only  = ordered.filter("Total > 100")

# Această linie rulează tot ce e mai sus, plus show-ul.
big_only.show()

Spark optimizează tot lanțul înainte să execute. Ar putea împinge filtrele în scanarea fișierului („nici nu citi rândurile non-IT”), reordona operații, elimina coloane. Planul de query pe care îl primești e adesea radical diferit de ce ai scris.

De asta și un .show() la jumătatea unui notebook poate părea suspect de rapid: Spark a calculat doar atâtea rânduri cât să umple afișarea, în timp ce .count() pe același DataFrame durează minute. Au rulat cantități diferite de muncă.

Un script-tur

Adunând lecția laolaltă, iată un script care exersează fiecare acțiune pe care am acoperit-o, aproximativ în ordinea în care le-ai folosi pe un dataset proaspăt:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("ActionsTour")
         .master("local[*]")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

orders = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/orders.csv"))

# 1. Cum arată?
orders.printSchema()
orders.show(5)

# 2. Statistici rapide
orders.summary("count", "min", "25%", "50%", "75%", "max").show()

# 3. Numărătoare ieftină de rânduri (dataset mic, e ok)
print("Total rows:", orders.count())

# 4. Aruncă o privire la datele filtrate fără să colectezi tot
italian = orders.filter(col("Country") == "IT")
italian.show()                             # safe
print("IT rows:", italian.count())         # safe pe date mici

sample_rows = italian.take(2)              # întotdeauna safe
print(sample_rows)

# 5. Collect: doar pentru că știm că e mic
country_totals = (orders.groupBy("Country")
                        .sum("Total")
                        .collect())
for row in country_totals:
    print(f"{row['Country']}: {row['sum(Total)']:.2f}")

spark.stop()

Șase rânduri de input. Șase linii de output relevant. Niciun OOM. Asta e fluxul.

Lecția următoare mergem în direcția opusă: scrierea datelor înapoi, save modes, scrieri partiționate și problema numărului de fișiere care mușcă din orice echipă mai devreme sau mai târziu.

Caută