Arhitectura datelor și a sistemelor, de la zero Lecția 51 / 80

CI pentru data pipelines: testare fără să arzi un cluster

Unit testing pe transformări, integration tests pe sample data, loop-ul de dezvoltare local-first. De ce CI pentru date e diferit de CI pentru servicii web.

Integrarea continuă pentru un serviciu web e un drum bătătorit. La fiecare push, suita de teste rulează contra codului nou, suita termină în câteva minute, iar lumina verde îți spune că schimbarea e sigură de făcut merge. Forma piramidei de teste e în mare parte stabilită: multe unit tests, mai puține integration tests, un strat subțire de teste end-to-end, iar totul încape confortabil într-un CI runner cu câțiva gigabytes de RAM.

CI pentru data pipelines nu încape confortabil nicăieri. Lucrul pe care vrei să-l testezi rulează peste terabytes. Cluster-ul care îl rulează costă bani reali pe minut. Bug-ul pe care încerci să-l prinzi apare adesea doar când input-ul are distribuția dezordonată a datelor reale de producție, pe care nu o poți copia într-un CI runner. Rulezi pipeline-ul real la fiecare pull request și factura cloud devine propria problemă structurală; nu o rulezi deloc și livrezi bug-uri în producție.

Compromisul pe care s-a oprit industria e stratificat, aceeași idee ca piramida de teste pentru software, dar cu grosimi de straturi diferite și tooling diferit. Lecția aceasta parcurge straturile, uneltele care se potrivesc fiecăruia și pattern-urile care fac CI de pipeline atât rapid, cât și de încredere.

De ce CI pentru date e diferit

Trei proprietăți ale muncii cu date trag CI în direcții pe care CI pentru servicii web nu trebuie să le gestioneze.

Pipelines sunt cu stare prin definiție. Output-ul unei rulări e input-ul următoarei. Un unit test pe o singură transformare e informativ, dar nu e suficient; vrei să știi și că transformarea se compune corect cu tot ce e în jurul ei. Testele de servicii web rareori trebuie să se gândească la continuitatea stării, pentru că fiecare request e gândit să fie independent.

Raza de explozie a unui bug e mai mare și mai tăcută. Un bug într-un serviciu produce erori 5xx pe care monitoring-ul le prinde în minute. Un bug într-un pipeline scrie rânduri greșite într-un tabel pe care joburi downstream și dashboards îl folosesc cu încredere. Până când observă cineva că numărul de utilizatori e greșit cu un factor de doi, datele greșite sunt deja într-o duzină de tabele derivate. Argumentul pentru a prinde bug-uri de pipeline înainte de merge e mai puternic, nu mai slab, decât pentru serviciile web.

Datele de test reale sunt grele. Nu poți expedia date de producție în CI fără să dai peste probleme de confidențialitate și reglementare. Nu poți genera date sintetice care să se potrivească distribuției lucrului real fără efort. Default-ul „testează contra unui sample mic conservat” e corect, dar trebuie să fii cinstit că sample-ul ratează categorii întregi de bug-uri pe care doar distribuția reală le expune.

Aceste trei proprietăți modelează fiecare alegere de mai jos.

Piramida CI pentru pipelines

Piramida de teste pentru servicii web are trei straturi: unit tests la bază, integration tests în mijloc, end-to-end tests în vârf. Aceeași formă funcționează pentru pipelines, cu conținutul straturilor redefinit.

Unit tests sunt stratul de jos. Rulează pe date mici în memorie, în milisecunde, în limbajul în care e scris pipeline-ul. Transformările pure-function sunt unit-testable: dai un fixture, asertezi pe output. O funcție care primește un DataFrame de rânduri de cursă și returnează un DataFrame de rânduri de cursă curățate poate fi testată cu un input de cinci rânduri. Cod PySpark, dbt macros, Flink user-defined functions, transformări Pandas: toate au nuclee unit-testable dacă codul e structurat în jurul funcțiilor pure în loc de un singur script de job de 800 de linii.

Disciplina care face unit tests posibile e separarea logicii de business de I/O. Un job care citește din S3, transformă, scrie într-un warehouse și orchestrează retries e greu de unit-testat. Același job, refactorat astfel încât transformarea să fie o funcție care primește un DataFrame și returnează un DataFrame iar I/O-ul să fie într-un wrapper subțire, e ușor de unit-testat. Majoritatea plângerilor de tipul „codul de date e greu de testat” se reduc la „acest job are transformările împletite cu I/O-ul”.

Integration tests sunt stratul de mijloc. Rulează pipeline-ul end-to-end pe un dataset sample, câteva mii de rânduri, în CI. Ideea e să prinzi bug-uri pe care unit tests le ratează: nepotriviri de schemă între etape, coloane scăpate accidental, chei de join greșite, contracte rupte între transformări. Sample-ul e suficient de mic ca să încapă într-un CI runner, suficient de mare ca să exerseze fiecare cod path.

Sample fixtures trăiesc în repo. Input-uri canonice în fișiere CSV sau Parquet, commit-uite, tratate ca parte din suita de teste. Când pipeline-ul își schimbă forma, fixtures sunt actualizate în același pull request ca și codul care are nevoie de ele. Reviewerii le văd pe ambele odată.

End-to-end tests sunt stratul de sus. Rulează pe un cluster de staging contra unor date realiste-dar-anonimizate. Sunt lente și scumpe, deci rulează nocturn sau săptămânal, nu la fiecare pull request. Prind lucrurile pe care sample-ul mic nu le poate prinde: bug-uri de data-skew, regresii de performanță pe volumul real, interacțiuni cu orchestratorul, comportament de retry sub eșec parțial.

Forma de piramidă contează. Cele mai multe echipe care fac CI pentru date prost o fac inversând piramida: multe rulări end-to-end scumpe, puține unit tests, niciun strat de integration între ele. Factura de cloud crește și loop-ul de feedback încetinește, iar bug-urile pe care echipa le are de fapt sunt bug-uri unit-test-shaped pe care testele lente end-to-end le prind doar din întâmplare.

dbt: teste declarative ca primă oprire

Pentru pipelines în formă de SQL, convențiile de proiect dbt sunt primul loc unde să te uiți. dbt vine cu un set de teste declarative pe care le atașezi coloanelor sau modelelor în YAML.

Cele patru teste built-in acoperă majoritatea bug-urilor pe care le vei întâlni:

  • not_null: această coloană n-ar trebui să aibă vreodată valori null.
  • unique: această coloană n-ar trebui să aibă duplicate.
  • accepted_values: această coloană ar trebui să conțină doar valori dintr-un set cunoscut.
  • relationships: această cheie străină ar trebui să existe în tabelul referențiat.

Custom tests sunt query-uri SQL arbitrare care ar trebui să returneze zero rânduri când datele sunt sănătoase. „Niciun rând n-ar trebui să aibă o sumă negativă.” „Niciun client n-ar trebui să aibă mai multe abonamente deschise decât e permis.” „Totalurile de ieri ar trebui să fie în limita a cinci procente față de media pe șapte zile.” Fiecare e un query al cărui rezultat nenul e un test eșuat.

În CI, dbt tests rulează contra unui warehouse de dev mai mic: un dataset BigQuery separat, o schemă Snowflake separată, o bază de date Postgres separată. Pull request-ul declanșează un job care rulează dbt build contra acestei ținte izolate, rulează dbt test ca să verifice aserțiunile și raportează înapoi. Dacă ceva eșuează, pull request-ul devine roșu.

Great Expectations e un văr mai flexibil pentru setup-uri non-dbt. Modulul 8 acoperă data quality în mai mult detaliu; pentru moment, tratează dbt tests și Great Expectations ca stratul declarativ, iar pytest ca stratul imperativ pentru logica în formă de cod care nu se potrivește curat într-o aserțiune SQL.

pytest pentru partea de Python

Pentru pipelines bazate pe Python (PySpark, Pandas, biblioteci custom de transformare), pytest e standardul. Pattern-urile sunt aceleași ca pentru orice codebase Python, cu două extra-uri pe care merită să le menționăm.

Datele de fixture trăiesc în repo. Un director tests/fixtures/ cu fișiere CSV sau Parquet mici. O fixture pytest le încarcă într-un DataFrame la începutul fiecărui test. Fixtures sunt parte din code review: dacă schimbi schema input-ului, schimbi fixture-ul, iar reviewerii le văd pe amândouă.

Pentru PySpark, un SparkSession configurat pentru local mode în partea de sus a modulului de test îți dă un Spark real în CI. E mai lent decât Pandas pur, dar exerseză cod path-ul real. Pattern-ul pe care îl adoptă majoritatea echipelor e: testează logica de transformare cu fixtures echivalente pentru Pandas pentru viteză, apoi ai o suită mai mică de integrare care confirmă că aceeași logică funcționează pe Spark cu o sesiune mică.

Pattern-ul fresh-warehouse

Stratul de integrare are adesea nevoie de mai mult decât doar o schemă de dev. Versiunea curată e: fiecare pull request pornește un warehouse temporar, complet izolat, rulează pipeline-ul contra lui, validează output-ul și îl dă jos.

Pentru BigQuery, asta înseamnă un dataset proaspăt numit după pull request, șters la merge sau la close. Pentru Snowflake, o schemă proaspătă clonată dintr-o schemă mică de referință. Pentru Postgres, o bază de date proaspătă într-o instanță Postgres de CI.

Beneficiul e izolarea: două pull requests nu pot interfera unul cu altul, iar mediul de test e curat de fiecare dată. Costul e orchestrarea: setup-ul și teardown-ul trebuie să fie de încredere, iar curățarea trebuie să gestioneze pull requests care sunt abandonate fără merge.

Pattern-ul își câștigă pâinea în echipele unde mai multe schimbări de pipeline sunt simultan în zbor. Pentru o echipă mică cu unul sau doi ingineri, un singur warehouse de dev cu convenții despre cine îl folosește merge fără probleme.

flowchart TB
    PR[Pull request opened] --> SETUP[Spin up dev schema]
    SETUP --> BUILD[dbt build on sample data]
    BUILD --> TEST[dbt test plus pytest]
    TEST --> REPORT[Report status to PR]
    REPORT --> TEARDOWN[Teardown dev schema]
    TEARDOWN --> APPROVE[Approve and merge]

Când „testează în prod” e răspunsul corect

Există un curent de sfat în comunitatea de data engineering că „nu poți testa cu adevărat un pipeline până nu rulează pe date reale”. E prea puternic ca afirmație, dar arată ceva adevărat. Datele reale au bug-urile pe care chiar vrei să le prinzi: distribuții asimetrice, rânduri sosite târziu, valori malformate de la surse upstream, coada lungă de cazuri marginale pe care datele sintetice nu le reproduc niciodată.

Disciplina care face asta sigur nu e „sari peste CI”. E „deploy către un strat paralel, validează, apoi cut-over”. Lecția 36 a introdus arhitectura medallion; stratul silver e un loc natural pentru a ateriza output-ul unei versiuni noi fără ca consumatorii s-o vadă. O versiune nouă de pipeline scrie într-un tabel lateral o săptămână; echipa compară output-ul tabelului lateral cu output-ul de producție; odată ce diferențele sunt mici și explicabile, pointer-ul de consumator se schimbă.

Acesta e dark-launch pattern-ul din lecția 52, aplicat la nivelul de testare în loc de nivelul de deployment. CI îți dă încredere că codul e structural corect. Dark launch-ul îți dă încredere că e corect pe distribuția reală. Sunt straturi, nu alternative.

Ce înseamnă asta în practică

Setup-ul realist de CI pentru o echipă de date începe mic și crește.

Ziua unu e o suită pytest pentru funcțiile de transformare și un pas dbt test pentru modelele SQL. Asta singură prinde majoritatea bug-urilor și rulează în câteva minute.

Ziua treizeci adaugă stratul de integrare: un input canonic mic, rulat end-to-end prin pipeline în CI, output validat. Fixtures trăiesc în repo, pull request-ul vede aceeași formă de date pe care o vede pipeline-ul de producție.

Ziua nouăzeci adaugă pattern-ul fresh-warehouse dacă mai multe schimbări sunt simultan în zbor sau rămâne cu un singur warehouse de dev dacă nu. Rularea nocturnă end-to-end începe să existe ca pipeline separat în orchestrator, contra unor date realiste anonimizate.

Ce nu face echipa, niciodată, e să ruleze pipeline-ul de producție complet pe datele de producție complete la fiecare pull request. Acela e drumul de pe care factura de cloud nu se mai recuperează.

CI e upstream de CD. Lecția 52 a acoperit pattern-urile de deployment odată ce schimbarea e făcută merge. Cele două lucrează împreună: CI prinde bug-urile, CD limitează raza de explozie a celor care scapă.

Citări

  • Documentația dbt, „Tests” (https://docs.getdbt.com/docs/build/tests, consultat 2026-05-01).
  • Documentația Great Expectations (https://docs.greatexpectations.io/, consultat 2026-05-01).
  • Documentația pytest (https://docs.pytest.org/, consultat 2026-05-01).
  • „Continuous integration for data” pe blog-ul dbt (https://www.getdbt.com/blog/, consultat 2026-05-01).
Caută