PySpark, de la zero Lecția 40 / 60

UDF-uri: cand ai nevoie de ele, de ce ar trebui sa le eviti

Taxa de serializare Python a UDF-urilor obisnuite, de ce te salveaza pandas_udf si cazurile rare in care Scala e singurul raspuns.

Un UDF e un User Defined Function: o funcție Python obișnuită pe care îi ceri lui Spark să o aplice unei coloane dintr-un DataFrame. Prima dată când întinzi mâna după unul, ți se pare cel mai natural lucru din lume. Ai o coloană, ai o funcție Python care știe ce să facă cu valorile din ea, le împachetezi una într-alta și Spark o rulează în tot clusterul. Gata.

Problema e că „Spark o rulează în tot clusterul” ascunde un detaliu foarte costisitor. Fiecare valoare pe care o vede UDF-ul tău trebuie să iasă din JVM-ul în care trăiește Spark, să fie serializată în procesul Python, să fie procesată, să fie serializată înapoi și să reintre în JVM. Pe un job de un milion de rânduri, nu observă nimeni. Pe un job de un miliard de rânduri, e diferența dintre o pauză de cafea și o escală.

Lecția asta e despre costul acelei călătorii dus-întors, despre cele două mecanisme PySpark care îl atenuează și despre setul mic de cazuri în care chiar nu poți evita un UDF și pur și simplu trebuie să-l scrii cât se poate de curat.

Taxa de serializare

Când scrii asta:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(returnType=StringType())
def shout(s):
    if s is None:
        return None
    return s.upper() + "!"

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df.withColumn("loud", shout("word")).show()

iată ce se întâmplă fizic pentru fiecare rând la momentul execuției. Spark are coloana word în JVM ca un UnsafeRow codificat Tungsten. Ca să apeleze funcția ta Python, executorul trebuie să:

  1. Scoată valoarea din formatul de rând binar și să o convertească într-un obiect Java generic.
  2. Serializeze acel obiect folosind protocolul de pickling al PySpark.
  3. Trimită octeții printr-un socket local către un proces Python worker pe care executorul îl gestionează.
  4. Pună workerul Python să facă unpickle octeților într-un obiect Python.
  5. Ruleze funcția ta.
  6. Facă pickle rezultatului.
  7. Îl trimită înapoi prin socket către JVM.
  8. Deserializeze octeții într-un obiect Java.
  9. Recodifice acel obiect în format Tungsten ca să-l poată folosi următorul operator.

Nouă pași per valoare. Per valoare. Nimic din această muncă nu e paralelizabil dincolo de ce face deja Spark la nivel de partiție și nimic din ea nu e vizibil în query plan: Catalyst vede un nod BatchEvalPython și renunță să mai înțeleagă ce e înăuntru.

Trei lucruri rezultă din asta. Primul, lovitura asupra throughput-ului poate fi enormă: am văzut aceeași operație logică rulând de șaizeci de ori mai încet ca UDF decât ca expresie built-in. Al doilea, optimizatorul nu poate împinge filtre prin UDF-ul tău. Dacă ai df.filter(shout("word").startswith("HELLO")), filtrul nu poate fi împins în jos până la scanarea Parquet pentru că Catalyst nu are idee ce face shout. Al treilea, workerul Python e un proces separat cu memoria lui; dacă UDF-ul tău reține date, poți face OOM pe partea de Python fără să atingi vreodată heap-ul JVM al executorului.

Vindecarea începe cu a nu scrie UDF-uri.

Ordinea reparațiilor

Când cineva spune „am nevoie de un UDF”, răspunsul e aproape întotdeauna „nu, n-ai, ai nevoie să te uiți mai atent la pyspark.sql.functions.” Expresiile built-in din Spark sunt scrise în Scala, rulează în JVM fără călătorie dus-întors de serializare și participă la optimizarea Catalyst și codegen-ul Tungsten (lecțiile 41 și 42). Sunt cu ordine de mărime mai rapide decât UDF-ul echivalent.

Câteva categorii care îi prind pe oameni:

# Manipulare de string-uri, aproape mereu un built-in
F.regexp_extract("col", r"^([A-Z]+)-(\d+)$", 1)
F.regexp_replace("col", r"\s+", " ")
F.split("col", ",").getItem(0)
F.concat_ws("-", "year", "month", "day")
F.lower("col")
F.translate("col", "abc", "xyz")

# JSON, exista un parser
F.from_json("payload", schema)
F.get_json_object("payload", "$.user.email")
F.to_json("struct_col")

# Array-uri, suport complet de higher-order functions de la Spark 2.4
F.transform("arr", lambda x: x * 2)
F.filter("arr", lambda x: x > 0)
F.aggregate("arr", F.lit(0), lambda acc, x: acc + x)
F.array_distinct("arr")
F.array_intersect("a", "b")

# Date, mult mai mult decat folosesc oamenii
F.date_trunc("month", "ts")
F.unix_timestamp("ts", "yyyy-MM-dd HH:mm:ss")
F.date_add("dt", 7)
F.months_between("end", "start")

# Conditionale
F.when(F.col("x") > 0, "pos").when(F.col("x") < 0, "neg").otherwise("zero")

Dacă ai scris un UDF care face vreuna dintre acele operații, ai lăsat performanță pe masă. Înainte să întinzi mâna după un UDF, caută în modulul de funcții. Pagina e lungă, dar timpul pe care îl petreci citind-o o singură dată se va plăti singur data viitoare când ai de scris o transformare.

Dacă după o căutare reală built-in-urile tot nu acoperă ce-ți trebuie, a doua mutare nu e un UDF obișnuit. E un pandas_udf.

pandas_udf: scurtătura prin Arrow

Un pandas_udf e aceeași idee ca un UDF obișnuit, funcția ta Python aplicată unei coloane, dar trimite datele în loturi folosind formatul columnar Apache Arrow. În loc să treacă granița JVM-Python câte un rând odată, Spark serializează câteva mii de rânduri într-un Arrow record batch (zero-copy unde se poate), predă întregul lot în Python ca un pandas.Series, te lasă să-l procesezi vectorizat și trimite înapoi alt lot Arrow.

Costul de serializare per rând scade cu unu sau două ordine de mărime. Partea Python rulează operații vectorizate NumPy sau pandas în loc de overhead per rând al interpretorului Python. Iar datele rămân columnare pe tot parcursul, ceea ce înseamnă că se înțeleg bine cu layout-ul de memorie columnar al Tungsten.

Cea mai comună variantă e Series → Series:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def standardize(s: pd.Series) -> pd.Series:
    return (s - s.mean()) / s.std()

df.withColumn("z_score", standardize("amount"))

Spark va apela standardize în mod repetat, de fiecare dată cu un pandas.Series care conține o bucată din coloană (mărimea e guvernată de spark.sql.execution.arrow.maxRecordsPerBatch, implicit 10.000). În interiorul funcției ai tot ce-ți oferă pandas: aritmetică vectorizată, ufunc-uri NumPy, orice operează pe Series ca întreg.

Un punct subtil: fiecare lot e o bucată din coloană, deci funcții ca s.mean() îți dau media lotului, nu pe cea globală. Dacă ai nevoie de o statistică globală, calculeaz-o întâi cu o agregare obișnuită și pasează-o ca literal, sau folosește un pattern groupby().applyInPandas în schimb.

A doua variantă e Iterator[Series] → Iterator[Series]. Folosește-o când ai muncă de setup costisitoare pe care vrei să o amortizezi pe toate loturile dintr-o partiție: încărcarea unui model, deschiderea unei conexiuni la bază de date, alocarea unui buffer mare:

from typing import Iterator

@pandas_udf(DoubleType())
def predict(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Setup-ul ruleaza O DATA per task de executor, nu per lot
    model = load_model_from_disk("/mnt/models/v3.pkl")
    for batch in batches:
        yield pd.Series(model.predict(batch.to_numpy()))

df.withColumn("pred", predict("features"))

Pattern-ul ăsta e modul standard de a face scoring cu un model din interiorul Spark. Modelul se încarcă o dată per task, iar fiecare lot îl reutilizează.

A treia variantă e Iterator[Tuple[Series, ...]] → Iterator[Series], aceeași idee dar pentru funcții cu multiple argumente:

@pandas_udf(DoubleType())
def weighted(batches: Iterator[tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for prices, weights in batches:
        yield prices * weights

df.withColumn("rev", weighted("price", "qty"))

Mai există și variante grupate, groupby().applyInPandas(...) pentru operații grupate cu DataFrame-in, DataFrame-out, și mapInPandas pentru transformări la nivel de partiție, dar acelea sunt mai adânci decât merge lecția asta. Cele trei de mai sus acoperă majoritatea cazurilor de producție.

Ca să se activeze efectiv Arrow, trebuie să fie activat și PyArrow instalat:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Implicit e true pe versiuni recente de Spark; verifică înainte să presupui că e dezactivat.

Când chiar ai nevoie de un UDF obișnuit

O listă scurtă de cazuri în care nici built-in-urile, nici pandas_udf nu ajută clar și un UDF obișnuit e onest:

O bibliotecă Python fără API vectorizat. Trebuie să apelezi un parser terț, să zicem un format binar industrial personalizat, sau o schemă XML legacy de nișă, sau o linie de log specifică unui domeniu, iar biblioteca acceptă doar o înregistrare odată. Uneori îl poți împacheta într-un pandas_udf cu un loop Python înăuntru, ceea ce tot bate un UDF obișnuit datorită batching-ului Arrow, dar dacă biblioteca își face propriul setup per apel, câștigul scade.

Logică stateful rând-cu-rând care nu se vectorizează. Unele parsere poartă stare între rânduri (un tokenizer cu un mode flag, un detector de delimitatori dezechilibrați). Nu poți exprima asta ușor ca operație vectorizată pandas. Un pandas_udf cu varianta iterator tot ajută, pentru că poți purta starea între loturi, dar în interiorul fiecărui lot tot iterezi.

Apelarea unui sistem extern pe rând. Apelarea unui REST API lent pe rând dintr-un UDF e aproape întotdeauna greșită, dar uneori e ce ai. Folosește pandas_udf cu varianta iterator și grupează cererile. Dacă API-ul nu suportă deloc batching, probabil ar trebui să stagezi datele și să apelezi API-ul în afara Spark. Dacă chiar nu poți, atunci e un UDF.

În fiecare dintre aceste cazuri, pandas_udf rămâne punctul de start mai bun decât un udf simplu. UDF-ul simplu e fundul ordinii de reparații, folosit doar când le-ai exclus pe celelalte.

Portița de evadare prin Scala UDF

Uneori cea mai curată soluție e să scrii UDF-ul în Scala sau Java, să-l împachetezi ca JAR, să-l înregistrezi din PySpark și să-l apelezi ca pe orice built-in. Granița JVM-Python dispare complet; funcția rulează în interiorul JVM-ului executorului, participă la codegen și nu plătește nicio taxă de serializare. Cazuri în care merită:

  • Apelezi biblioteci Java/Scala care nu au echivalente Python (unele biblioteci financiare sau științifice).
  • UDF-ul e o cale fierbinte într-un workload care rulează de multe ori pe zi și până și overhead-ul pandas_udf e inacceptabil.
  • Scrii o funcție reutilizabilă pentru o echipă de platformă și vrei să fie disponibilă identic din PySpark, Scala Spark și Spark SQL.

Mecanica: scrie funcția ca un UserDefinedFunction în Scala, construiește un JAR, atașează-l la sesiune via --jars și înregistrează-l:

spark = (SparkSession.builder
         .config("spark.jars", "/path/to/myudfs.jar")
         .getOrCreate())
spark.udf.registerJavaFunction("shout", "com.example.udfs.Shout", StringType())
df.selectExpr("shout(word) AS loud").show()

E mai multă bătaie de cap operațională decât vor oamenii: acum ai un build Scala, un pas de deploy și un JAR de aliniat ca versiune cu Spark. De aceea majoritatea echipelor nu merg acolo. Dar pentru o echipă mică ce rulează workload-uri grele de producție, o bibliotecă mică de UDF-uri Scala poate fi un câștig semnificativ.

Ce să reții

UDF-urile te lasă să evadezi din lumea SQL/DataFrame în Python și exact de aceea sunt lente: fiecare evadare plătește o taxă de serializare, iar Catalyst nu poate optimiza prin ele. Ordinea reparațiilor e built-in-urile, apoi pandas_udf, apoi UDF simplu, cu Scala UDFs ca portiță de evadare de ultimă instanță pentru căile fierbinți. Cele mai multe „am nevoie de un UDF” se transformă în „am nevoie să citesc o dată documentația de funcții”.

Lecția următoare mergem direct la optimizatorul care a făcut toată rescrierea asta în spatele tău: Catalyst.


Surse: ghidul Apache Spark Python user guide despre integrarea cu Arrow (https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html) și PySpark API reference pentru pyspark.sql.functions. Consultat 2026-05-01.

Caută