PySpark, dalle fondamenta Lezione 2 / 60

L'idea di MapReduce, e perché ha contato

Il paper di Google del 2004, il modello che ha reso trattabile il processing distribuito, e perché tutti l'hanno superato nel giro di un decennio.

Non puoi davvero capire Spark senza capire la cosa che è stato costruito per sostituire. Quindi prima di arrivare a Spark stesso nella lezione 3, dobbiamo passare una lezione su MapReduce: il modello di programmazione che ha definito un intero decennio di processing distribuito dei dati, e il sistema che, più di ogni altra cosa, ha reso possibile a un normale programmatore applicativo lavorare su mille macchine senza dover prima diventare un esperto di sistemi distribuiti.

Se hai iniziato a lavorare con i dati dopo, diciamo, il 2018, potresti aver saltato MapReduce del tutto, allo stesso modo in cui qualcuno che impara JavaScript oggi può perlopiù evitare jQuery. Va bene, ma lascia un buco. La maggior parte del vocabolario dei dati distribuiti (map, reduce, shuffle, partition, job, stage) viene direttamente da MapReduce. Spark ha ereditato il modello in blocco e poi ha ottimizzato le parti che facevano più male. Sapere cos’era MapReduce, cosa ha azzeccato e cosa ha sbagliato in modo catastrofico ti dice gran parte di quello che ti serve sapere sul perché Spark ha l’aspetto che ha.

Il paper del 2004

La storia inizia nel dicembre 2004, al Sixth USENIX Symposium on Operating Systems Design and Implementation, dove due ingegneri di Google di nome Jeffrey Dean e Sanjay Ghemawat presentarono un paper intitolato “MapReduce: Simplified Data Processing on Large Clusters”. È uno dei paper di sistemi più citati nella storia dell’informatica, e se non l’hai mai letto vale una serata: sono undici pagine, scritte in inglese piano, ed è ingannevolmente semplice.

Il contesto è importante. Google nel 2003 stava facendo girare un’intera web crawl, un indice di ricerca, un sistema di ad e una dozzina di altri servizi interni su quella che era già una flotta su scala planetaria di server commodity economici. I loro ingegneri scrivevano programmi distribuiti custom continuamente: per parsare log, per costruire indici inversi, per contare cose, per ordinare cose, per deduplicare cose. Ognuno di quei programmi doveva risolvere indipendentemente gli stessi problemi difficili: come spezzi l’input tra le macchine, come gestisci gli inevitabili guasti, come coordini i worker, come rimetti insieme la risposta. Ogni programma reinventava la stessa impalcatura, male.

L’intuizione di Dean e Ghemawat fu che quasi tutti questi programmi, nonostante avessero un aspetto superficialmente diverso, rientravano nella stessa forma di base. Prendi un input grande, trasformi ogni record in uno o più record intermedi (lo step di map), raggruppi tutti i record intermedi per qualche chiave (lo step di shuffle, che il paper a malapena nomina), e poi combini i record per ogni chiave in una risposta finale (lo step di reduce). Se potessi dare ai programmatori un’API in cui scrivessero solo la funzione di map e la funzione di reduce, e il framework gestisse tutto il resto (splitting, scheduling, shuffling, fault tolerance, retry), allora un’enorme quantità di programmi distribuiti diventerebbe d’un tratto banale da scrivere.

Questa è l’intera idea. Map. Shuffle. Reduce.

Il modello a parole

Un job MapReduce ha tre fasi logiche.

La fase di map gira in parallelo su molti worker. Ogni worker legge un pezzo dell’input (tipicamente un blocco dal filesystem distribuito) ed esegue una funzione map fornita dall’utente su ogni record. La funzione di map prende un record di input ed emette zero, una o molte coppie (chiave, valore). Tutto qui. Nessuna coordinazione con altri worker, nessuno stato condiviso, nessuna variabile globale. Ogni map task è imbarazzantemente parallelo e completamente isolato.

La fase di shuffle è la parte a cui nessuno pensa finché non la sta debuggando alle 2 di notte. Dopo che tutti gli output di map sono prodotti, il framework li raggruppa per chiave. Tutte le coppie (chiave, valore) con la stessa chiave, indipendentemente da quale mapper le abbia prodotte, vengono spedite allo stesso reducer. Questo richiede di spostare dati attraverso la rete, che è la parte costosa. Nell’implementazione originale di Google, gli output dei mapper vengono scritti su disco locale, poi tirati dai reducer attraverso la rete. Lo shuffle è dove sparisce la maggior parte del wall-clock time di un job MapReduce.

La fase di reduce gira anch’essa in parallelo su molti worker. Ogni reducer è responsabile di un sottoinsieme delle chiavi. Per ogni chiave, riceve l’intera lista di valori che qualunque mapper ha emesso per quella chiave, e esegue una funzione reduce fornita dall’utente su quella lista per produrre l’output finale. L’output viene scritto sul filesystem distribuito.

Questo è l’intero modello. Due funzioni, uno shuffle, tre fasi, tutto in parallelo. Il framework decide quanti mapper e reducer far girare, dove farli girare, cosa fare quando uno di loro muore, e come mettere insieme i file finali.

Un esempio concreto: il word count

Ogni tutorial di MapReduce dal 2004 ha usato lo stesso esempio (contare quanto spesso ogni parola appare in un grande corpus) perché cattura il modello alla perfezione senza dettagli estranei. Vediamolo nello stesso modo.

Immagina che l’input sia una directory piena di file di testo su HDFS, in totale diciamo 2 TB. Vuoi, per ogni parola che appare in qualsiasi punto del corpus, il numero di volte che appare in tutti i file.

Lo pseudocodice è genuinamente di circa quattro righe, che è tutto il punto.

map(filename, line):
    for word in line.split():
        emit(word, 1)

reduce(word, counts):
    emit(word, sum(counts))

Tutto qui. Il framework legge l’input, passa ogni riga di ogni file a un mapper, e il mapper emette (word, 1) per ogni parola che vede. Il framework fa lo shuffle di tutte queste tuple in modo che tutte le coppie (the, 1) finiscano sullo stesso reducer, tutte le coppie (spark, 1) finiscano su un altro reducer, e così via. Ogni reducer vede la chiave assegnata più un iterabile di tutti gli 1 che sono stati emessi per quella chiave, li somma e scrive il totale.

Se il tuo corpus ha 10 miliardi di parole e 1 milione di parole distinte, MapReduce farà girare felicemente questo su mille macchine, recupererà da qualche guasto di worker lungo la strada, e ti restituirà una singola tabella deduplicata di conteggi di parole. Il programmatore ha scritto otto righe di pseudocodice. Il framework ha fatto il resto.

Questa è la svolta. Non la velocità (le prime versioni di MapReduce non erano veloci in alcun senso assoluto) ma l’astrazione. Il processing distribuito è passato da “scrivi un sistema RPC custom, un partitioner custom, un layer di retry custom, e prega” a “scrivi una funzione di map e una funzione di reduce”. D’un tratto, centinaia di ingegneri di Google che non avevano motivo di scrivere codice di sistemi distribuiti stavano scrivendo codice di sistemi distribuiti, e perlopiù funzionava.

Hadoop: trasformare il paper in open source

Il paper di Google descriveva un sistema interno a Google. L’implementazione non fu rilasciata. Ma due ingegneri a Yahoo, Doug Cutting e Mike Cafarella, stavano già lavorando a un web crawler open source chiamato Nutch, e stavano sbattendo esattamente contro gli stessi problemi di scaling che Google aveva incontrato qualche anno prima. Lessero il paper. Lessero il paper correlato sul Google File System del 2003. E nel 2006 estrassero i pezzi rilevanti di Nutch in un nuovo progetto chiamato come l’elefante di peluche del figlio di Cutting: Hadoop.

Hadoop era, in sostanza, un clone open source dei due sistemi fondazionali di Google. Lo Hadoop Distributed File System (HDFS) era l’analogo di GFS: un modo per memorizzare file enormi su molte macchine con replica per la fault tolerance. Hadoop MapReduce era l’analogo di MapReduce: un framework Java per scrivere funzioni di map e reduce e farle girare su un cluster.

La combinazione (HDFS per lo storage, MapReduce per il compute) è diventata la piattaforma open source di default per i big data per circa un decennio. Cloudera, Hortonworks, MapR e una dozzina di distro più piccole hanno costruito business sul packaging e sul supporto. Verso il 2012, ogni azienda Fortune 500 che avesse una qualche pretesa di essere “data-driven” aveva un cluster Hadoop da qualche parte in uno scantinato, spesso a malapena usato, quasi sempre gestito da esattamente un consulente terrorizzato di nome Steve.

L’ecosistema Hadoop è cresciuto enorme. Hive (SQL sopra MapReduce). Pig (un linguaggio di scripting sopra MapReduce). HBase (un database NoSQL sopra HDFS). Oozie (uno scheduler di workflow per concatenare job MapReduce). YARN (un resource manager per far girare framework multipli sullo stesso cluster, introdotto in Hadoop 2.0 nel 2012). La maggior parte di questi è ancora in giro nel 2026 in qualche forma, anche se di solito in ruoli molto ridotti. Hive in particolare sopravvive come layer di metadata (il Hive Metastore) molto dopo che quasi nessuno fa girare query Hive via MapReduce.

Perché tutti l’hanno superato

Verso il 2014, le crepe in MapReduce erano impossibili da ignorare. Il modello era elegante, ma l’implementazione originale aveva tre proprietà che la rendevano sempre più dolorosa da usare davvero.

Tutto è in due stage. Map e poi reduce. Quella è l’intera API. Il lavoro coi dati nel mondo reale quasi mai assomiglia esattamente a un map e un reduce. Una pipeline tipica è: leggi l’input, filtra, parsa, fai join con una lookup table, raggruppa, aggrega, ordina, fai di nuovo join, scrivi l’output. In MapReduce, ognuna di queste operazioni diventa un proprio job, con il proprio step di map, il proprio step di reduce, e la propria scrittura su HDFS in mezzo. Una pipeline che dovrebbe essere una query in cinque step diventa una catena di sette job MapReduce separati orchestrati da Oozie, ognuno un programma Java autocontenuto con cento righe di boilerplate.

Lo stato intermedio finisce su disco tra ogni job. Questo è il killer. L’output di un job MapReduce viene sempre scritto su HDFS, in triplice copia (replication factor di default pari a 3), e poi il job successivo nella catena lo rilegge. Se la tua pipeline ha sei stage, hai scritto i tuoi dati su disco sei volte e li hai riletti cinque volte. Sui dischi rotativi del 2010, con la banda di rete del 2010, era semplicemente come funzionavano le cose, e la gente lo accettava. Per il 2014, con gli SSD e le reti a 10 gigabit, era visibilmente assurdo.

Gli algoritmi iterativi sono catastroficamente lenti. Il machine learning, gli algoritmi su grafi, e qualunque altra cosa che esegue la stessa logica sugli stessi dati molte volte (k-means clustering, PageRank, regressione logistica) fanno tutti questo. Ogni iterazione è un intero job MapReduce, il che vuol dire che ogni iterazione scrive l’intero dataset su HDFS e lo rilegge. Un job di k-means clustering che dovrebbe metterci 30 secondi ne mette 30 minuti, quasi tutti dei quali sono I/O di disco per dati che non sono nemmeno cambiati tra iterazioni. Questo era il punto di dolore specifico che ha motivato Spark.

C’erano altre lamentele (l’API Java era verbosa, le modalità di guasto erano oscure, i file di configurazione erano XML infinito, l’overhead di startup della JVM rendeva i job piccoli sproporzionatamente lenti) ma il problema più profondo era architetturale. MapReduce era un modello progettato nel 2003 attorno alle assunzioni hardware del 2003, e per il 2014 quelle assunzioni non reggevano più.

La soluzione, come probabilmente stai già intuendo, era costruire un sistema che mantenesse lo stato intermedio in memoria tra le operazioni invece di scriverlo su disco ogni singola volta. Lasciare che un programmatore concatenasse quante trasformazioni gli pareva in un singolo job logico, con il motore a decidere quanti stage fisici fossero davvero necessari. Rendere l’iterazione economica facendo cache dei working set in RAM. Smettere di fingere che ogni computazione fosse esattamente un map più un reduce.

Quel sistema era Spark, ed è il soggetto della lezione 3.

Per approfondire: il paper originale di Dean-Ghemawat su MapReduce è genuinamente uno dei paper di sistemi più leggibili mai pubblicati, e vale la lettura anche se non scriverai mai un job MapReduce in vita tua. La documentazione del progetto Apache Hadoop è ancora mantenuta, perlopiù per le parti (HDFS, YARN, il Metastore) che sono sopravvissute alla transizione. Entrambi sono background utile per il resto del corso, anche se non scriveremo codice MapReduce vero.

Cerca