Arhitectura datelor și a sistemelor, de la zero Lecția 38 / 80

Batch idempotent: cum faci job-urile sigur de re-rulat

Overwrite vs append vs upsert. Tiparul MERGE. De ce 'job-ul ăsta a rulat de două ori' ar trebui să fie un non-eveniment.

Lecția 16, în modulul 2, a acoperit idempotența pentru mesagerie: cum să proiectezi un consumer astfel încât procesarea aceluiași mesaj de două ori să fie inofensivă. Lecția asta e pendantul ei pentru batch. Aceeași idee, aplicată job-urilor programate care citesc bucăți mari de input și scriu bucăți mari de output: rularea aceluiași job de două ori ar trebui să producă aceeași stare finală.

Dacă iei un singur sfat din tot modulul ăsta, ia-l pe ăsta. Batch-ul idempotent e proprietatea care transformă restul platformei de date dintr-un morman de scripturi de unică folosință în ceva ce poți opera calm. Eșecurile încetează să mai fie incidente. Backfill-urile încetează să mai fie expediții. Fraza „job-ul ăsta a rulat de două ori” n-ar trebui să facă pe nimeni să tresară. Aproape fiecare durere operațională într-o echipă de date pe care am văzut-o, urmărită până la rădăcină, se reduce la un job pe undeva care nu e sigur de re-rulat.

De ce contează

Job-urile batch eșuează. Eșuează din cauza unor sughițuri de rețea, probleme tranzitorii la furnizorul de cloud, un nod worker care face OOM, un dataset upstream care întârzie, un bug, sau un deploy. Orchestratorul (Airflow, Dagster, oricare) reîncearcă. Dacă prima încercare a ajuns la jumătate și a scris niște output înainte de crash, iar reîncercarea rulează totul din nou, cum arată output-ul?

Într-un job care nu e idempotent, ai acum două jumătăți ale primei încercări plus toată a doua încercare, toate amestecate. Poți avea rânduri duplicate. Poți avea rânduri dintr-o rulare incompletă care arată reale dar sunt bazate pe input incomplet. Probabil nu știi fără inspecție care rânduri sunt care.

Același tipar apare în alte trei locuri care nu sunt determinate de eșec:

  • Backfill-uri. Realizezi că rularea de săptămâna trecută a avut un bug. Vrei s-o re-rulezi pentru datele afectate. Dacă job-ul e idempotent, re-rularea pentru orice interval de date produce output corect. Dacă nu, trebuie întâi să ștergi ce e acolo, apoi să re-rulezi, sperând că ștergi ce trebuie.
  • Programări care se suprapun. Un job zilnic rulează la 02:00. Rularea de la 02:00 pentru ziua de 18 e întârziată și se termină la 02:50. Rularea de la 02:00 pentru ziua de 19 începe acum. Dacă citesc input-uri care se suprapun, ar fi bine să nu producă output incorect care se suprapune.
  • Replay-uri. Ceva downstream s-a corupt și trebuie să-l reconstruiești de la upstream. Re-rularea fiecărui job din graful de dependențe pentru fereastra afectată ar fi bine să producă același răspuns ca rulările originale.

Toate patru (retry la eșec, backfill, suprapunere, replay) sunt rutină în orice echipă de date. Toate patru funcționează curat dacă job-urile tale sunt idempotente și devin exerciții forensice dacă nu sunt.

Vestea bună: idempotența pentru batch nu e grea. E un set mic de tipare, aplicate consistent. Restul lecției sunt tiparele alea.

Cele trei moduri de scriere

Scrierea fiecărui job batch cade într-unul din trei moduri, iar modul determină povestea de idempotență.

Append. Job-ul scrie rânduri noi în tabelul existent. Re-rularea adaugă rândurile din nou. Implicit, asta nu e idempotent: o re-rulare produce duplicate. Append e ok când e împerecheat cu deduplicare downstream (consumer-ul următor colapsează duplicatele după cheie) sau când aplicația e ok cu un tabel în formă de log care are voie să conțină duplicate. Ca mod principal de scriere pentru tabelele de analiză, e cea mai proastă alegere.

Overwrite. Job-ul înlocuiește o partiție sau întreg tabelul atomic cu output-ul rulării. Re-rularea cu același input produce același output, iar înlocuirea atomică înseamnă că nu vezi niciodată o jumătate de stare. E calea cea mai simplă spre idempotență. Unitatea de overwrite e de obicei o partiție, nu tot tabelul: scrii event_date=2026-01-21 și înlocuiești ce era acolo.

Upsert / MERGE. Job-ul inserează rânduri noi și actualizează cele existente după o cheie. Re-rularea cu același input e idempotentă pentru că a doua rulare vede rândurile pe care le-a scris prima și se face merge cu ele, lăsând aceeași stare finală. E modul cel mai flexibil și cel mai scump.

Regula de bază: preferă overwrite pentru date în formă de partiție, preferă MERGE pentru date în formă de entitate, evită append-ul curat decât dacă ai un motiv specific.

flowchart TB
    subgraph A[Append + dedup]
      A1[Job writes new rows] --> A2[Downstream dedup]
      A2 --> A3[Idempotent end state]
    end
    subgraph O[Partition overwrite]
      O1[Job computes partition] --> O2[Atomic replace partition]
      O2 --> O3[Idempotent end state]
    end
    subgraph M[MERGE / upsert]
      M1[Job stages records] --> M2[MERGE INTO target ON key]
      M2 --> M3[Idempotent end state]
    end

Tiparul partition-overwrite

Ăsta e tiparul de tracțiune al batch-ului modern. Majoritatea tabelelor de analiză sunt partiționate după dată, iar majoritatea job-urilor batch rulează o dată pe zi pentru a produce output-ul unei zile. Forma e:

INSERT OVERWRITE TABLE silver.orders PARTITION (event_date = '2026-01-21')
SELECT ...
FROM bronze.orders
WHERE event_date = '2026-01-21'

Instrucțiunea aia e idempotentă. Rul-o o dată, de două ori, de zece ori pentru aceeași dată: partiția ajunge cu același conținut de fiecare dată. Formatul de tabel (Delta, Iceberg sau Hudi) face înlocuirea de partiție atomică, deci cititorii nu văd niciodată o partiție pe jumătate înlocuită.

Backfill-ul e trivial. Pentru a re-rula pentru săptămâna 14-21 ianuarie, parcurgi datele și rulezi job-ul pentru fiecare. Output-ul fiecărei rulări e mărginit la propria partiție, deci backfill-ul nu poate afecta accidental datele altor zile.

Disciplina pe care o cere tiparul ăsta e ca job-ul să fie o funcție de partiție: aceeași partiție de input produce aceeași partiție de output, fără dependențe de când rulează job-ul sau ce alte job-uri au rulat. Dacă job-ul silver pentru ziua 21 depinde de ce era în tabelul silver pentru ziua 20, proprietatea de funcție-de-partiție se rupe, iar re-rularea devine periculoasă.

De-asta job-urile batch aliniate la partiție din stack-urile moderne sunt aproape întotdeauna scrise ca citește partiția X, transformă, suprascrie partiția X. Simplitatea contractului e tot rostul.

Tiparul MERGE

Pentru tabele care nu sunt în formă de partiție, trucul partition-overwrite nu funcționează. Un tabel silver.customers are un rând per client, nu partiții de evenimente datate. Actualizarea înregistrării unui client înseamnă schimbarea unui rând pe loc, nu înlocuirea unei partiții.

Tiparul MERGE, suportat de Delta, Iceberg și Hudi (lecția 37), e răspunsul idempotent:

MERGE INTO silver.customers AS t
USING staging.customers_today AS s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

MERGE e atomic la nivelul formatului de tabel: ori se aplică tot merge-ul, ori nimic din el. E idempotent pentru că a doua rulare vede rândurile pe care le-a scris prima, le potrivește și le actualizează la aceleași valori. Starea finală după rularea numărul doi e identică cu starea finală după rularea numărul unu.

Două note practice. Întâi, clauza ON trebuie să folosească o cheie reală. Dacă cheia nu e unică în sursă, MERGE va eșua sau va produce gunoi în funcție de engine; deduplichează sursa întâi. În al doilea rând, dacă faci merge doar pe înregistrările schimbate (nu pe snapshot-ul complet), output-ul e corect doar dacă feed-ul de înregistrări schimbate e el însuși complet. A pierde un update pe partea sursei înseamnă că ținta rămâne învechită până următoarea oară când înregistrarea apare în feed.

Tiparul watermark / high-water-mark

Atât partition-overwrite cât și MERGE au nevoie ca job-ul să știe „ce input procesez de data asta”. Pentru input partiționat după timp, răspunsul e „partiția pentru care rulez”. Pentru o sursă care crește continuu și unde vrei procesare incrementală, răspunsul e un watermark: timestamp-ul (sau numărul de secvență) al celei mai recente înregistrări procesate la ultima rulare.

Tiparul:

  1. Job-ul citește watermark-ul din state store-ul lui (deseori un tabel de metadate în lakehouse-ul însuși).
  2. Selectează înregistrările de input cu event_time > watermark.
  3. Le procesează și scrie output-ul (idempotent, prin overwrite sau MERGE).
  4. Avansează watermark-ul la event_time maxim pe care tocmai l-a procesat.

Combinat cu MERGE, devine rețeta canonică de procesare incrementală-și-idempotentă. Fiecare rulare reia de unde s-a oprit ultima. Eșecurile și re-rulările nu fac dublă procesare: watermark-ul e avansat doar după ce scrierea de output reușește, deci o rulare eșuată lasă watermark-ul neschimbat și încercarea următoare procesează același input din nou, idempotent.

Atenție la datele care sosesc târziu. Dacă înregistrările pot ajunge cu event_time mai vechi decât watermark-ul curent, un filtru strict „mai mare decât watermark” le va rata. Soluția e fie să întârzii watermark-ul în spatele ceasului real cu o fereastră de toleranță (procesează înregistrări mai vechi decât now() - 1h), fie să permiți job-ului să se uite înapoi cu un număr fix de partiții la fiecare rulare.

Tiparul append-cu-dedup

Uneori append-ul e impus. Cea mai ieftină scriere într-un tabel rezident pe object store, mai ales într-un handoff streaming-spre-batch, e un append. Dacă nu poți evita, tiparul de dedup păstrează idempotența cu prețul unei mici munci suplimentare downstream:

  1. Adaugă înregistrările (append), incluzând cheia naturală a sursei și un timestamp de ingestie.
  2. Consumer-ul downstream (sau un pas batch ulterior) grupează după cheie și păstrează rândul cu cel mai recent timestamp de ingestie.

Starea finală e idempotentă: re-rularea pasului de append adaugă duplicate, dar pasul de dedup le colapsează. Costul e că dedup-ul e muncă suplimentară la fiecare citire, sau infrastructură batch suplimentară pentru a materializa o vedere deduplicată.

Ăsta e tiparul pe care-l recomand cel mai puțin. Funcționează, și sunt situații în care e singura opțiune realistă, dar e operațional mai greu decât overwrite sau MERGE. Dacă ai de ales, alege unul dintre celelalte.

Capcane

Câteva capcane care fac job-urile care arată idempotente să nu fie pe tăcute idempotente.

Side-effect-uri care nu sunt idempotente. Un job batch care trimite un email la fiecare rulare, taxează un card de credit sau face POST la un webhook nu e idempotent: re-rularea retrimite, retaxează, repostează. Mută side-effect-ul în afara traseului batch sau învelește-l într-o verificare de idempotency-key (lecția 16) astfel încât sistemul receptor să absoarbă duplicatele.

Dependențe de aleator sau de ceasul real. Un job care folosește now() pentru a eticheta înregistrări sau random() pentru a sparge egalitățile va produce output diferit la fiecare rulare. Soluția e să fixezi timestamp-ul la timpul logic de rulare al programării (orchestratorul de obicei expune asta ca parametru, ex: execution_date sau logical_date) și să sămânci orice aleatorism dintr-o sursă deterministă. Disciplina e: fiecare valoare pe care job-ul o scrie ar trebui să fie o funcție de input, nu de când a rulat job-ul.

Resurse externe care se schimbă. Un job care lovește un API extern cu un query gen „dă-mi cele mai recente cursuri de schimb” nu e idempotent: re-rularea mâine returnează cursuri diferite. Dacă nu poți evita asta, fă un snapshot al datelor externe în bronze întâi, cu o etichetă de dată-logică, și pune restul job-ului să citească din snapshot. Bronze devine granița dintre lumea exterioară non-idempotentă și pipeline-ul batch idempotent.

Transformări dependente de ordine. O transformare care folosește un row-number sau rank fără un tie-breaker determinist poate produce output diferit pentru același input la rulări diferite (execuția paralelă a engine-ului poate ordona rândurile la egalitate diferit de fiecare dată). Adaugă o coloană explicită de tie-breaker. Dacă două comenzi au același timestamp, ordonează și după order_id, ca rank-ul să fie complet determinat.

Sărirea avansării watermark-ului la eșec. Dacă watermark-ul e avansat înainte ca scrierea de output să facă commit, un crash între cele două lasă watermark-ul mutat dincolo de înregistrări care n-au fost scrise niciodată. Ăsta e bug-ul canonic „date pierdute la retry”. Întotdeauna avansează watermark-ul în aceeași tranzacție cu output-ul, sau după ce output-ul e durabil.

Disciplina

Starea finală a tot ce e mai sus: fiecare job batch din platforma ta ar trebui să fie sigur de re-rulat pentru orice interval de input, de oricine, oricând, fără coordonare. Posesorul job-ului ar trebui să poată spune „da, doar rul-l pentru săptămâna trecută” fără să se gândească. Orchestratorul ar trebui să poată reîncerca fără a declanșa o alertă. Un bug, găsit în producție după trei luni, ar trebui să fie reparabil printr-o re-rulare.

Proprietatea aia se construiește un job pe rând, prin tiparele de mai sus și prin norme de echipă care spun „dacă nu e idempotent, nu e gata”. E fundația pe care lecția următoare (backfill-uri și replay-uri) și mare parte din restul modulului se sprijină. Fără ea, fiecare operație ulterioară e un exercițiu atent de a nu strica producția. Cu ea, platforma rămâne calmă.

Caută