
Prefect on the lakehouse: write-audit-publish pattern with transactions and Git-for-data

September 27, 2024
Chris White
CTO
Jacopo Tagliabue
Founder, Bauplan

Garbage in, garbage out (a.k.a. good data practices start at the source!)

As interest in AI applications - and the data pipelines that power them - keeps growing, it is risky to ignore two hard-fought lessons from the previous data / ML / MLOps cycle: first, the quality of your final AI feature depends first and foremost on the data you feed it; second, the surface of data-first applications is, in a sense, unbounded - since you run pipelines every day on ever-changing inputs, the situation is starkly different from traditional software development, where all possibilities are often entirely captured by a well-defined, often repetitive set of options.

Since data teams often sit downstream of the data-generating process, with little control over it, the risk of “data cascades” causing compounding negative consequences is very real. Consider the following common ingestion scenario:

  • as input: an S3 bucket receiving Parquet files from a data-generating process at a given interval - e.g., events generated by user interactions with a consumer application;
  • as output: a high-quality dataset that can be queried by downstream processes, such as an AI training job.

Fig. 1: a typical ingestion flow, going from new files in a raw cloud bucket to some artifacts usable by downstream systems. What do we need to bridge the gap?

What is depicted above as a single “box” actually has to solve two sets of related but, in the end, “nested” tasks: the outer loop, so to speak, in which we need to reliably schedule, retry, and manage the ingestion as we would any other production task; and the inner loop, in which we need to perform fine-grained manipulation of cloud data assets - creation, quality checks, appends, etc.

Certainly, one would be excused for thinking that setting this up requires at least a dozen tools from the current data market: instead, we show how Prefect + Bauplan will do the job in ~150 lines of no-nonsense Python. Clone the repo and tag along!

Design principles

We start by expressing what an ideal system would look like and then work backwards to concrete choices. Importantly, everything we design in what follows can run in an internet-free environment if we choose to do so, with no data ever leaving your AWS account: we assume this “without loss of generality”, as mathematicians would say, as it is now clear that the lakehouse architecture can provide all the nice things warehouse folks have had for a while without compromising on security or cost.

In the outer loop, we want:

  • Flexible triggering: new files may arrive every 5 minutes, but we may also want to batch-process historical data - the same code should work incrementally, in an event-driven fashion, and as a stateless, one-off job (see the sketch right after this list);
  • Automated clean-up: if things go wrong, we should be able to automatically revert to the state of the system before the pipeline run started.
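To make the triggering requirement concrete, here is a minimal sketch of how the same Prefect flow can run as a one-off job or be served on a schedule (the flow name, deployment name, and bucket path are illustrative placeholders, not code from the repo; event-driven deployments are also available through deployment triggers):

from prefect import flow

@flow(log_prints=True)
def wap_ingestion(source_s3_pattern: str = "s3://my-bucket/raw/*.parquet"):
    # Placeholder body: the write / audit / publish steps discussed below go here.
    print(f"Ingesting files matching {source_s3_pattern}")

if __name__ == "__main__":
    # One-off, stateless run: call the flow like any other Python function.
    wap_ingestion()

    # Incremental runs: serve the same flow on a schedule (every 5 minutes here).
    wap_ingestion.serve(name="wap-ingestion-every-5-minutes", cron="*/5 * * * *")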

The implementation sketch therefore looks like this: we leverage the new Prefect transactions to wrap our three main data tasks, identified here as write, audit and publish - if something goes wrong, a rollback hook provides the clean-up we desire; otherwise a “commit” finalizes the ingestion / append into our final data asset. As for triggering flexibility, Prefect provides batch and event-based workflows out of the box, so we are covered!
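As a sketch of the mechanism (adapted to WAP naming from the pattern in the Prefect transactions documentation; the task names and the toy file-based example are placeholders, not the repo's code), a rollback hook attached to the write task fires only if a later task inside the same transaction fails:

import os

from prefect import flow, task
from prefect.transactions import transaction

@task
def write_staging_data(contents: str):
    # "Write": stage the new data somewhere isolated (a local file in this toy example).
    with open("staging.txt", "w") as f:
        f.write(contents)

@write_staging_data.on_rollback
def cleanup_staging(txn):
    # Runs only if a task later in the same transaction fails.
    os.unlink("staging.txt")

@task
def audit_staging_data():
    # "Audit": fail loudly if the staged data does not meet our quality bar.
    with open("staging.txt") as f:
        if not f.read().strip():
            raise ValueError("Audit failed: staged data is empty")

@flow
def toy_wap(contents: str = "hello"):
    with transaction():
        write_staging_data(contents)
        audit_staging_data()
        # "Publish" would go here; on success, on_commit hooks run instead.

if __name__ == "__main__":
    toy_wap()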

We now have to unpack the lakehouse foundation, i.e. the inner loop in charge of data manipulations. What should we aspire to? At the very least, we want:

  • Production isolation: instead of naively moving new files from input to output, we sandbox the changes before making them visible to downstream consumers, in order to avoid dataset pollution in production (this is indeed the meaning of “write-audit-publish”!);
  • Replayability: downstream systems should be able to access the dataset with point-in-time queries, including versions with a schema that is different from the current one (e.g. a renamed column).

When we “peek inside the lakehouse box”, unsurprisingly, Prefect tasks map nicely onto lakehouse operations - adding rows to the training dataset, checking the data integrity / quality of the new version of the dataset.

Perhaps more surprising is that we see a “recursive transaction” down at the data level: Bauplan datasets are stored as Iceberg tables, so modifications can happen on a zero-copy clone of the production tables - simply put, Bauplan allows us to create a new branch of our data lake and perform ingestion and data quality checks completely sandboxed from production. Only when “audit” succeeds do we “merge” back into the production branch, making all the changes (i.e. the newly appended rows) visible to downstream consumers; if “audit” fails, we raise the exception to the outer loop, which triggers the clean-up via rollback.
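In Bauplan terms, the inner loop might look roughly like the sketch below. Note that the client method names and parameters (create_branch, import_data, query, merge_branch, delete_branch), the table name, and the branch name are assumptions meant only to convey the branch-then-merge flow; refer to the repo and the Bauplan documentation for the exact SDK calls:

import bauplan

# Assumes a Bauplan API key is configured locally.
client = bauplan.Client()

INGESTION_BRANCH = "my_user.wap_ingestion"  # hypothetical branch name

# Write: sandbox the new files in a zero-copy branch of the data lake.
client.create_branch(INGESTION_BRANCH, from_ref="main")
client.import_data(
    table="training_dataset",
    search_uri="s3://my-bucket/raw/*.parquet",
    branch=INGESTION_BRANCH,
)

# Audit: run a quality check against the branch and raise if it fails
# (assuming query() returns a PyArrow Table).
rows = client.query("SELECT COUNT(*) AS c FROM training_dataset", ref=INGESTION_BRANCH)
if rows.to_pylist()[0]["c"] == 0:
    raise ValueError("Audit failed: no rows were ingested")

# Publish: merge the audited branch back into production, then clean up.
client.merge_branch(source_ref=INGESTION_BRANCH, into_branch="main")
client.delete_branch(INGESTION_BRANCH)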

Notably, Bauplan is a fully programmable lakehouse: all these functionalities are available through a simple import bauplan and standard Python methods. Even if you have never seen a lakehouse or Bauplan before, you can just read through the Python code and follow along - no knowledge of Iceberg, no JVM driver, no SQL is needed: all data-related optimizations are left to the platform (including, but not limited to, creating a data branch without copying any actual files!). As a huge bonus, Bauplan tables support not just branching but point-in-time querying, as every operation on the lakehouse is tracked like a Git commit that users can revert to: we refer to our SIGMOD24 paper for a deeper dive on immutable lakehouse jobs and instant debuggability.
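For instance, point-in-time access could look something like the following (again, the query method, its ref parameter, and the ref values are assumptions used only to illustrate the Git-for-data idea):

import bauplan

client = bauplan.Client()

# Query the table as it exists on the production branch today...
latest = client.query("SELECT * FROM training_dataset LIMIT 10", ref="main")

# ...or as it existed at an earlier, Git-like commit (placeholder ref below),
# e.g. before a column was renamed.
historical = client.query("SELECT * FROM training_dataset LIMIT 10", ref="<some-past-commit-ref>")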

Running the code

Once you have the prerequisites out of the way (TL;DR: a virtualenv with Prefect >= 3.0 and a Bauplan API key), you can run the full WAP script like any other Prefect flow: cd into src and run python wap_flow.py. If you want to quickly get a feel for the developer experience, here is a live demo running the script from our laptop: https://www.loom.com/share/0387703f204e4b3680b1cb14302a04da?sid=536f3a9f-c590-4548-a3c2-b5861b8c17c0

In particular, the core logic is entirely contained in this simple sequence of functions, wrapped in a transaction: Prefect executes the three WAP tasks one after the other, and then runs either the on_commit hook (in case of success) or the on_rollback hook (in case of failure):

with transaction():
    # WRITE: append the new files to the table on a sandboxed ingestion branch
    source_to_iceberg_table(
        bauplan_client,
        table_name,
        source_s3_pattern,
        bauplan_ingestion_branch
    )
    # AUDIT: run quality checks on the new version of the table in the branch
    run_quality_checks(
        bauplan_client,
        bauplan_ingestion_branch,
        table_name=table_name
    )
    # PUBLISH: merge the audited branch back into production
    merge_branch(
        bauplan_client,
        bauplan_ingestion_branch
    )

The rest of the script simply wraps Bauplan SDK calls to the lakehouse (ingestion, merging, deleting, etc.) in Prefect tasks, so it should be pretty straightforward. As promised, Prefect and Bauplan combined solve one of the most important data engineering patterns for a data lakehouse in ~150 lines of pure Python!
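For example, the glue between the two loops might look roughly like this sketch: a task analogous to the repo's source_to_iceberg_table, with a rollback hook that drops the sandbox branch if a later step in the transaction fails (the Bauplan method names, table name, and branch name are assumptions; see the repo for the actual implementation):

import bauplan
from prefect import task

INGESTION_BRANCH = "my_user.wap_ingestion"  # hypothetical branch name

@task
def write_to_branch(client: bauplan.Client, source_s3_pattern: str):
    # Write: land the new files into an isolated branch of the lakehouse.
    client.create_branch(INGESTION_BRANCH, from_ref="main")
    client.import_data(
        table="training_dataset",
        search_uri=source_s3_pattern,
        branch=INGESTION_BRANCH,
    )

@write_to_branch.on_rollback
def drop_ingestion_branch(txn):
    # If audit or publish fails later in the transaction, delete the sandbox
    # branch so no half-ingested state survives the failed run.
    bauplan.Client().delete_branch(INGESTION_BRANCH)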

See you, space cowboys

While we showed above that nested transaction concepts are sufficient for WAP at scale in a lakehouse, we have not discussed whether they are necessary: we believe they are. If we only had inner, data-level transactions, users would have to roll out their own custom recovery workflows, which can quickly become cumbersome and complex; on the other hand, if we only had outer, task-level transactions, we would not be able to surgically manipulate our data assets and easily map computational tasks to (versioned) changes in the underlying lakehouse.

Of course, we have barely scratched the surface of lakehouses and orchestration: for example, while black-box scheduling of entire notebooks has been the only option for many data science teams (due to the complexity of integration, containerization, cluster management, etc.), the Bauplan programmable lakehouse, in which “everything is just a Python function”, and Prefect flows, in which - you guessed it - “everything is just a Python function”, may provide a much more natural and cost-effective way to run data workloads in production.

If you want to try out Prefect, come join us on GitHub.

If you want to try Bauplan, ask to join the private trial here.