Una UDF è una User Defined Function: una normale funzione Python che chiedi a Spark di applicare a una colonna di un DataFrame. La prima volta che ne usi una sembra la cosa più naturale del mondo. Hai una colonna, hai una funzione Python che sa cosa fare con i valori contenuti, avvolgi una nell’altra e Spark la esegue su tutto il cluster. Fatto.
Il problema è che “Spark la esegue su tutto il cluster” nasconde un dettaglio molto costoso. Ogni valore che la tua UDF vede deve uscire dalla JVM dove Spark vive, essere serializzato dentro il processo Python, essere processato, essere serializzato di nuovo, e rientrare nella JVM. Su un job da un milione di righe nessuno se ne accorge. Su un job da un miliardo di righe è la differenza tra una pausa caffè e uno scalo aeroportuale.
Questa lezione parla del costo di quel viaggio di andata e ritorno, dei due meccanismi PySpark che lo mitigano, e del piccolo insieme di casi in cui davvero non puoi evitare una UDF e devi scriverne semplicemente la più pulita possibile.
La tassa di serializzazione
Quando scrivi questo:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
@F.udf(returnType=StringType())
def shout(s):
if s is None:
return None
return s.upper() + "!"
df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df.withColumn("loud", shout("word")).show()
ecco cosa succede fisicamente per ogni riga al momento dell’esecuzione. Spark ha la colonna word nella JVM come un UnsafeRow codificato in formato Tungsten. Per chiamare la tua funzione Python, l’executor deve:
- Estrarre il valore dal formato binario della riga e convertirlo in un oggetto Java generico.
- Serializzare quell’oggetto usando il protocollo di pickling di PySpark.
- Inviare i byte attraverso un socket locale a un processo Python worker gestito dall’executor.
- Far sì che il Python worker decodifichi (unpickle) i byte in un oggetto Python.
- Eseguire la tua funzione.
- Fare il pickle del risultato.
- Rispedirlo attraverso il socket alla JVM.
- Deserializzare i byte in un oggetto Java.
- Ricodificare quell’oggetto in formato Tungsten cosicché il prossimo operatore possa usarlo.
Nove passaggi per valore. Per valore. Niente di questo lavoro è parallelizzabile oltre quanto Spark già fa a livello di partition, e niente di questo è visibile nel query plan: Catalyst vede un nodo BatchEvalPython e rinuncia a ragionare su cosa ci sia dentro.
Tre conseguenze. Primo, il colpo al throughput può essere enorme: ho visto la stessa operazione logica girare sessanta volte più lenta come UDF rispetto a un’espressione built-in. Secondo, l’optimizer non può spingere i filtri attraverso la tua UDF. Se hai df.filter(shout("word").startswith("HELLO")), il filtro non può essere pushato fino allo scan Parquet perché Catalyst non ha idea di cosa faccia shout. Terzo, il Python worker è un processo separato con la sua memoria; se la tua UDF tiene aggrappati dei dati, puoi mandare in OOM il lato Python senza mai toccare l’heap JVM dell’executor.
La cura inizia con il non scrivere UDF.
L’ordine di intervento
Quando qualcuno dice “mi serve una UDF”, la risposta è quasi sempre “no non ti serve, ti serve guardare meglio dentro pyspark.sql.functions”. Le espressioni built-in di Spark sono scritte in Scala, girano dentro la JVM senza viaggio di serializzazione, e partecipano all’ottimizzazione di Catalyst e al codegen di Tungsten (lezioni 41 e 42). Sono ordini di grandezza più veloci della UDF equivalente.
Qualche categoria che frega le persone:
# String manipulation: quasi sempre c'è una built-in
F.regexp_extract("col", r"^([A-Z]+)-(\d+)$", 1)
F.regexp_replace("col", r"\s+", " ")
F.split("col", ",").getItem(0)
F.concat_ws("-", "year", "month", "day")
F.lower("col")
F.translate("col", "abc", "xyz")
# JSON: c'è un parser
F.from_json("payload", schema)
F.get_json_object("payload", "$.user.email")
F.to_json("struct_col")
# Array: supporto completo per higher-order function da Spark 2.4
F.transform("arr", lambda x: x * 2)
F.filter("arr", lambda x: x > 0)
F.aggregate("arr", F.lit(0), lambda acc, x: acc + x)
F.array_distinct("arr")
F.array_intersect("a", "b")
# Date: molto piu' di quanto la gente usi
F.date_trunc("month", "ts")
F.unix_timestamp("ts", "yyyy-MM-dd HH:mm:ss")
F.date_add("dt", 7)
F.months_between("end", "start")
# Condizionali
F.when(F.col("x") > 0, "pos").when(F.col("x") < 0, "neg").otherwise("zero")
Se hai scritto una UDF che fa una qualunque di queste cose, hai lasciato performance sul tavolo. Prima di buttarti su una UDF, cerca nel modulo functions. La pagina è lunga, ma il tempo che ci spendi a leggerla una volta si ripaga la prossima volta che hai una trasformazione da scrivere.
Se dopo una vera ricerca le built-in ancora non coprono quello che ti serve, la tua seconda mossa non è una UDF normale. È una pandas_udf.
pandas_udf: la scorciatoia di Arrow
Una pandas_udf è la stessa idea di una UDF normale (la tua funzione Python applicata a una colonna) ma spedisce dati a batch usando il formato colonnare di Apache Arrow. Invece di attraversare il confine JVM-Python una riga alla volta, Spark serializza qualche migliaio di righe in un record batch Arrow (zero-copy quando possibile), passa l’intero batch a Python come pandas.Series, ti lascia processarlo in modo vettoriale, e rispedisce indietro un altro batch Arrow.
Il costo di serializzazione per riga scende di uno o due ordini di grandezza. Il lato Python esegue operazioni vettoriali NumPy o pandas invece dell’overhead per riga dell’interprete Python. E i dati restano colonnari per tutto il percorso, il che significa che si sposano bene con il layout di memoria colonnare di Tungsten.
Il sapore più comune è Series -> Series:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
@pandas_udf(DoubleType())
def standardize(s: pd.Series) -> pd.Series:
return (s - s.mean()) / s.std()
df.withColumn("z_score", standardize("amount"))
Spark chiamerà standardize ripetutamente, ogni volta con una pandas.Series che contiene un pezzo della colonna (la dimensione è regolata da spark.sql.execution.arrow.maxRecordsPerBatch, di default 10.000). Dentro la funzione hai tutto pandas: aritmetica vettoriale, ufunc di NumPy, qualunque cosa operi sulla Series nel suo insieme.
Un punto sottile: ogni batch è un pezzo della colonna, quindi funzioni come s.mean() ti danno la media del batch, non quella globale. Se ti serve una statistica globale, calcolala prima con un’aggregazione normale e passala come literal, oppure usa invece un pattern groupby().applyInPandas.
Il secondo sapore è Iterator[Series] -> Iterator[Series]. Usalo quando hai un setup costoso che vuoi ammortizzare su tutti i batch in una partition: caricare un modello, aprire una connessione al database, allocare un buffer grande:
from typing import Iterator
@pandas_udf(DoubleType())
def predict(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
# Il setup gira UNA volta per task dell'executor, non per batch
model = load_model_from_disk("/mnt/models/v3.pkl")
for batch in batches:
yield pd.Series(model.predict(batch.to_numpy()))
df.withColumn("pred", predict("features"))
Questo pattern è il modo standard di fare scoring con un modello da dentro Spark. Il modello viene caricato una volta per task, e ogni batch lo riutilizza.
Il terzo sapore è Iterator[Tuple[Series, ...]] -> Iterator[Series]: stessa idea ma per funzioni con più argomenti:
@pandas_udf(DoubleType())
def weighted(batches: Iterator[tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for prices, weights in batches:
yield prices * weights
df.withColumn("rev", weighted("price", "qty"))
Esistono anche varianti raggruppate: groupby().applyInPandas(...) per operazioni di gruppo full DataFrame-in, DataFrame-out, e mapInPandas per trasformazioni a livello di partition, ma vanno più in profondità di quanto questa lezione si spinga. Le tre qui sopra coprono la maggior parte dei casi di produzione.
Perché Arrow effettivamente entri in gioco, devi averlo abilitato e PyArrow installato:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
Di default è true nelle versioni recenti di Spark; verificalo prima di dare per scontato che sia spento.
Quando ti serve davvero una UDF normale
Una breve lista di casi in cui né le built-in né pandas_udf aiutano in modo pulito, e una UDF normale è onesta:
Una libreria Python senza API vettoriale. Devi chiamare un parser di terze parti (un formato binario industriale custom, o uno schema XML legacy di nicchia, o una riga di log specifica di un dominio) e la libreria accetta un solo record alla volta. A volte puoi avvolgerla in una pandas_udf con un loop Python all’interno, che batte comunque una UDF normale grazie al batching di Arrow, ma se la libreria fa il proprio setup per chiamata, il vantaggio si riduce.
Logica stateful riga per riga che non si vettorializza. Alcuni parser portano stato attraverso le righe (un tokenizer con un flag di modalità, un rilevatore di delimitatori sbilanciati). Non puoi esprimerlo facilmente come operazione pandas vettoriale. Una pandas_udf con il sapore iterator aiuta comunque perché puoi portare stato attraverso i batch, ma dentro ogni batch fai comunque un loop.
Chiamare un sistema esterno per riga. Chiamare una REST API lenta per ogni riga da una UDF è quasi sempre sbagliato, ma a volte è quello che hai. Usa pandas_udf con il sapore iterator e batcha le richieste. Se l’API non supporta affatto il batching, probabilmente dovresti mettere i dati in staging e chiamare l’API fuori da Spark. Se proprio non puoi, allora è una UDF.
In ognuno di questi casi, pandas_udf resta comunque un punto di partenza migliore di una udf semplice. La UDF semplice è in fondo all’ordine di intervento, usata quando hai escluso le altre.
La via di fuga della UDF Scala
A volte la soluzione più pulita è scrivere la UDF in Scala o Java, impacchettarla come JAR, registrarla da PySpark e chiamarla come una qualunque built-in. Il confine JVM-Python sparisce del tutto; la funzione gira dentro la JVM dell’executor, partecipa al codegen, e non paga alcuna tassa di serializzazione. Casi in cui ne vale la pena:
- Stai chiamando librerie Java/Scala che non hanno equivalenti Python (alcune librerie finanziarie o scientifiche).
- La UDF è un hot path in un workload che gira molte volte al giorno, e anche l’overhead di
pandas_udfè inaccettabile. - Stai scrivendo una funzione riutilizzabile per un team di piattaforma e la vuoi disponibile in modo identico da PySpark, Scala Spark e Spark SQL.
La meccanica: scrivi la funzione come UserDefinedFunction Scala, costruisci un JAR, attaccalo alla session via --jars, e registralo:
spark = (SparkSession.builder
.config("spark.jars", "/path/to/myudfs.jar")
.getOrCreate())
spark.udf.registerJavaFunction("shout", "com.example.udfs.Shout", StringType())
df.selectExpr("shout(word) AS loud").show()
È più rogna operativa di quanto la gente voglia: ora hai una build Scala, uno step di deploy, e un JAR da tenere allineato di versione con Spark. Per questo la maggior parte dei team non ci va mai. Ma per un piccolo team che gira workload di produzione pesanti, una piccola libreria di UDF Scala può essere una vittoria significativa.
Cosa ricordare
Le UDF ti permettono di scappare dal mondo SQL/DataFrame dentro Python, e per questo sono lente: ogni fuga paga una tassa di serializzazione, e Catalyst non può ottimizzare attraverso di loro. L’ordine di intervento è built-in, poi pandas_udf, poi UDF semplice, con le UDF Scala come via di fuga di ultima istanza per gli hot path. La maggior parte dei “mi serve una UDF” si trasforma in “mi serve leggere una volta la documentazione di functions”.
Prossima lezione, andiamo dritti all’optimizer che ha fatto tutto questo rewriting alle tue spalle: Catalyst.
Riferimenti: guida utente Python di Apache Spark sull’integrazione Arrow (https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html) e il riferimento API PySpark per pyspark.sql.functions. Consultati il 2026-05-01.