Prefect 3.0: Now Generally Available

August 27, 2024
Chris White
CTO

Update: This post has been updated as of September 3rd, 2024.

We’re excited to announce the official release of Prefect 3.0, happening today, September 3rd.

This release is the product of six months of careful engineering and design, refined by months of community feedback from over a dozen release candidates.

The result? Prefect 3.0 is 10x faster than its predecessor, fully event-driven, and ships with a robust transactions API. This release is backwards compatible with Prefect Cloud, and requires few changes - if any - to upgrade. Don’t take our word for it - nearly 10% of Prefect developers have already chosen to adopt Prefect 3.0 before its official release.

In this article, we’ll review what’s new in Prefect 3.0, what’s changed and why, and where we’re headed. We invite you to explore the new improvements and give us your feedback on GitHub.

Key Features

Events & Automations

Thousands of Prefect Cloud users have already built event-driven workflows in production, and we’re excited to bring this functionality to Prefect Open-Source. Prefect 3.0 supports event-driven workflows and corresponding automations, so your data can tell your workflows when it’s ready to be processed. You can configure workflows to listen to events from anywhere - inside or outside Prefect - to start workflows, instead of asking your data every 5 minutes “are we there yet?”.

For example, say you have a critical workflow that runs on a schedule. This workflow rarely fails, but when it does, it requires some manual intervention before the next run. Ideally, the workflow would pause after a failure to give an engineer time to fix any issues. We can set up this automation via Prefect’s UI or Python SDK. Here’s an example of how to create this automation via the SDK:

from prefect.automations import Automation
from prefect.events.actions import PauseDeployment
from prefect.events.schemas.automations import EventTrigger

my_automation = Automation(
    name="Pause critical workflow",
    trigger=EventTrigger(
        expect={"prefect.flow-run.Failed"},
        match={"prefect.resource.id": ["prefect.flow-run.*"]},
        match_related={
            "prefect.resource.role": "flow",
            "prefect.resource.name": [
                "critical_workflow",
            ],
        },
    ),
    actions=[PauseDeployment(source="inferred")],
)

my_automation.create()

This automation will pause the deployment for the critical flow, which will give the supporting team time to troubleshoot and resolve the issue!

You can also use Prefect’s events and automations system to gather and react to events outside of Prefect: let’s say that you have a web API that gathers data and you want to run a workflow if you see a certain number of events in a given timeframe. To gather those events, you can use Prefect’s emit_event utility to send those events to your Prefect server:

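from fastapi import FastAPI
from prefect.events import emit_event
from pydantic import BaseModel

app = FastAPI()

class ImportantData(BaseModel):  # illustrative request body; adapt to your API
    id: str
    name: str

@app.post("/gimme_data/")
async def get_that_data(data: ImportantData):
    ...  # Do what you need to do
    emit_event(event=f"{data.name}.important.received",
               resource={"prefect.resource.id": data.id})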

You’ll be able to observe these events in real time via the Prefect UI and create automations that react to those events.
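
To make the "certain number of events in a given timeframe" condition concrete, here is a small plain-Python sketch of a threshold check over a sliding time window. It is a conceptual model only, not Prefect's implementation; the class name ThresholdWindow and its parameters are hypothetical and exist purely for illustration.

```python
from collections import deque


class ThresholdWindow:
    """Hypothetical helper: fires once `threshold` events fall within `within` seconds."""

    def __init__(self, threshold: int, within: float) -> None:
        self.threshold = threshold
        self.within = within
        self._timestamps = deque()  # event times, oldest first

    def observe(self, timestamp: float) -> bool:
        """Record one event and report whether the threshold is currently met."""
        self._timestamps.append(timestamp)
        # Drop events that fall outside the window, measured from the newest event.
        while timestamp - self._timestamps[0] >= self.within:
            self._timestamps.popleft()
        return len(self._timestamps) >= self.threshold


window = ThresholdWindow(threshold=3, within=60.0)
# Two early events, a long gap, then three events in quick succession.
fired = [window.observe(t) for t in (0.0, 10.0, 200.0, 210.0, 220.0)]
```

Only the last observation meets the threshold, because the two early events have aged out of the 60-second window by the time the later burst arrives.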

Transactions & Caching

Perhaps the most novel component of the 3.0 release is Prefect’s introduction of transactional semantics. Abstractly, this gives users stronger guarantees and control over their task execution and pipeline idempotency. Every task in Prefect 3.0 is executed within a transaction that governs when and where the task’s result record is persisted. This record then represents the source of truth for that task’s completion. If the task runs again under an identical context, it will not re-execute but instead load its previous result. Additionally, these task transactions can be grouped and nested, allowing for granular control over which tasks execute (and retry) together. If another task or operation within the same transaction fails, all transactions are rolled back and the corresponding results are not persisted.

import os

from prefect import task, flow
from prefect.transactions import transaction, get_transaction


@task
def write_file(fpath: str, contents: str):
    "Writes to a file."
    with open(fpath, "w") as f:
        f.write(contents)

    # persist the filename that was actually written so hooks can access it
    get_transaction().set("fpath", fpath)


@write_file.on_rollback
def del_file(transaction):
    "Deletes file."
    fpath = transaction.get("fpath")
    os.unlink(fpath)


@task
def quality_test(fpath: str):
    "Checks contents of file."
    with open(fpath, "r") as f:
        data = f.readlines()

    if len(data) < 2:
        raise ValueError("Not enough data!")


@flow
def pipeline(contents: str, fpath: str = "side-effect.txt"):
    # both tasks share one transaction: if quality_test fails, write_file rolls back
    with transaction():
        write_file(fpath, contents)
        quality_test(fpath)

For users who don’t care about transactions per se, this feature still has concrete benefits. In particular, Prefect 3.0’s task caching layer is built entirely on top of transactions. This all provides:

  • an interface for on_commit and on_rollback hooks for individual tasks that allow users to undo task side effects and react to failures from other tasks
  • improved caching semantics in the form of composable cache policies
  • a far more transparent implementation of task caching that is based entirely on the presence or absence of a cached result: this makes Prefect’s caching implementation analogous to tools like Snakemake and Luigi

For more information check out the relevant 3.0 documentation.
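
The idea of caching "based entirely on the presence or absence of a cached result" can be sketched in plain Python. The following is a conceptual model under stated assumptions, not Prefect's implementation: the cached decorator, the JSON-on-disk record format, and the key derivation are all hypothetical.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def cached(storage: Path):
    """Hypothetical decorator: skip the call when a result record already exists."""

    def decorator(fn):
        def wrapper(*args):
            # Derive a key from the function name and its inputs (an assumed policy).
            raw = json.dumps([fn.__name__, args], sort_keys=True)
            record = storage / (hashlib.sha256(raw.encode()).hexdigest() + ".json")
            if record.exists():
                # The record is the source of truth: load it instead of re-running.
                return json.loads(record.read_text())
            result = fn(*args)
            record.write_text(json.dumps(result))
            return result

        return wrapper

    return decorator


calls = []
storage = Path(tempfile.mkdtemp())


@cached(storage)
def add(x, y):
    calls.append((x, y))  # track real executions
    return x + y


first = add(1, 2)   # runs the function and writes a record
second = add(1, 2)  # finds the record and loads it without running
```

In this model, deleting the record file is all it takes to force a re-run, which is what makes presence-based caching easy to reason about.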

Performance

Many users of Prefect go far beyond ETL pipelines and data warehousing: they build machine learning flows, custom internal tooling pipelines, and web scraping flows, and even back user-facing applications with Prefect. Accordingly, Prefect must support a diverse assortment of workflow topologies, from single-task workflows to complex workflows of tens of thousands of dynamically generated tasks.

The transactional layer sketched above allows Prefect 3.0 to avoid many of the round-trip API calls necessary in earlier versions of Prefect and instead rely on the information content of task result records to determine whether a task or group of tasks should run.

Another significant change is that all code now runs on the main thread by default. This change improves performance and leads to more intuitive behavior, especially when dealing with shared resources or non-thread-safe operations (for example, when passing a client or database connection to a task).

To demonstrate this improvement, we benchmarked the same flow against Prefect Cloud’s API across all three major Prefect SDK versions. Each flow iterates over a list of 250 integers in sequence and executes a basic task that increments each integer by one.

ControlFlow

ControlFlow is Prefect’s agentic AI orchestration framework that is uniquely enabled by Prefect 3.0’s new capabilities. At its core, ControlFlow allows developers to define high-level objectives as a sequence of tasks, then delegates the execution details to AI agents. These agents, powered by large language models, determine in real-time which functions to call, what data to process, and how to structure the workflow. This approach enables highly adaptive workflows capable of tackling complex, open-ended tasks, but it also introduces significant orchestration challenges. Chief among these is the need to track and manage the agents' activities: their messages, token usage, tool calls, delegations to other agents, and any nested workflows they might create.


Prefect 3.0's dynamic orchestration engine rises to meet these challenges, capturing and orchestrating task executions as they unfold, regardless of their origin, timing, or nesting level. This allows ControlFlow to seamlessly integrate various elements—traditional code, LLM-generated function calls, and direct LLM interactions—into a single, dynamically discovered workflow graph. As AI agents adapt to changing conditions and make complex decisions, Prefect's orchestration features maintain comprehensive visibility and control over the entire process. Developers can monitor the flow of execution, manage retries and error handling for dynamically generated task sequences, and control concurrency in these evolving pipelines. By providing this level of insight and control, Prefect 3.0 enables ControlFlow to push the boundaries of AI orchestration, facilitating the development of intelligent workflows that are not just powerful and adaptive, but also observable and manageable in production environments.

Upgrade Considerations

As much as we’d love for everyone to upgrade overnight, it’s important to be intentional with major version upgrades, so we understand that this will take some time. To help you scope the upgrade path, we’ve collected a few items that we expect most people will need to consider when upgrading. We have worked with a number of teams on proof-of-concept upgrades and, for teams already using workers, the time commitment for code changes is typically measured in hours. After making any code changes, we recommend running a few test cases before fully switching over. If you uncover anything unexpected or confusing in the process, feel free to ask us a question by opening a discussion or filing a bug report!

Agents and Workers

Prefect 3.0 officially puts the final nail in the coffin of Prefect Agents. If you were using agents previously, upgrading will require you to migrate your work to the worker paradigm. Workers provide greater control and governance over infrastructure. For information on how to upgrade, please review the 3.0 documentation.

Asynchronous code

Prefect 3.0 simplifies our support for asynchronous and synchronous code. In Prefect 2.0, we maintained thousands of lines of complex logic to interleave our async engine with your sync workflows. This not only proved difficult to maintain but also created inscrutable edge cases, so Prefect 3.0 now exposes separate synchronous and asynchronous engines. This simplification does mean that certain patterns are no longer supported. Specifically, the ability to execute an asynchronous task within a synchronous caller is not valid in 3.0. These changes were critical in unlocking the performance enhancements referenced above, and have also allowed Prefect to ensure that all user code runs on the main thread by default.
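
In plain Python terms, this constraint mirrors the language's own rule: a synchronous function cannot await a coroutine, so async work needs an async caller or an explicit event-loop boundary. The sketch below uses only asyncio with hypothetical function names; it does not use Prefect, and how Prefect 3.0 surfaces the unsupported pattern may differ.

```python
import asyncio
import inspect


async def fetch_value() -> int:
    """Stand-in for an asynchronous task."""
    await asyncio.sleep(0)
    return 42


def sync_caller() -> bool:
    # Calling an async function from sync code only creates a coroutine object;
    # nothing has run yet.
    pending = fetch_value()
    is_coroutine = inspect.iscoroutine(pending)
    pending.close()  # discard it cleanly to avoid a "never awaited" warning
    return is_coroutine


async def async_caller() -> int:
    # An async caller can simply await the async function.
    return await fetch_value()


got_coroutine = sync_caller()
# The event loop is entered once, at the outermost synchronous boundary.
value = asyncio.run(async_caller())
```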

If you find a situation in which a Prefect awaitable is incorrectly run synchronously, or an interface that you thought was synchronous now expects you to await it, Prefect provides a special _sync keyword to enforce the behavior you expect. Setting _sync=True guarantees that the function is always executed synchronously; conversely, setting _sync=False guarantees that the function returns an awaitable. A common interface to which this applies is Block.load(**kwargs):

my_block = Block.load(**kwargs, _sync=True)
my_block = await Block.load(**kwargs, _sync=False)

We plan to lean further into the explicit approach and will begin issuing warnings in certain scenarios, with suggestions for how to harden your code. For more information on our forward-looking plan, see this GitHub issue.

Other notable changes

There are a few other notable changes worth keeping in mind as you upgrade:

  • Autonomous tasks: Prefect 3.0 has removed the restriction that tasks always be orchestrated and executed within the context of a flow, enabling users to rely on Prefect as a background task scheduler. For examples of use and setup, check out this repository.
  • Prefect 3.0 requires pydantic>=2.0: this change is most important for users who rely on advanced models for their workflow parameters as those models will need to be updated
  • Prefect future resolution: in service of performance, Prefect futures no longer auto-resolve, which means that any time you use a task runner and the task.submit interface, you will need to explicitly wait for that future’s resolution using either the future.wait method, the top-level wait utility, or by returning those futures from your flow function. Please note that Prefect will still auto-resolve dependencies between futures, so you need only wait on terminal futures
  • Prefect server compatibility: while Prefect Cloud is forward and backwards compatible with both 2.x and 3.x, self-hosted OSS Prefect servers only support clients on the same major version. This means that you’ll need to upgrade both your server and client simultaneously in order to benefit from 3.0 enhancements
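
The "wait only on terminal futures" rule from the list above can be illustrated with Python's standard concurrent.futures module. This is a stand-in analogy, not Prefect's API: here the downstream function resolves its upstream future by hand, whereas Prefect resolves dependencies between futures for you. The function names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def extract() -> list[int]:
    return [1, 2, 3]


def transform(upstream) -> list[int]:
    # The downstream step resolves its own upstream dependency...
    return [x * 10 for x in upstream.result()]


with ThreadPoolExecutor(max_workers=2) as pool:
    extracted = pool.submit(extract)
    transformed = pool.submit(transform, extracted)
    # ...so only the terminal future needs an explicit wait.
    done, not_done = wait([transformed])
    result = transformed.result()
```

Waiting on the last future in a chain is enough here because it cannot finish until everything upstream of it has finished.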

You can find more information that is continually updated in the 3.0 documentation.

What’s next

The release of Prefect 3.0 marks the beginning of a recommitment to our open source user base. We are committing to more public OSS roadmaps (currently represented by GitHub Milestones), more public discussions of feature design, and a holistic re-investment in making the open source product the best open source orchestrator around.

In the coming months, you can expect our focus to gravitate to a few general areas:

  • Scale: as seen above, Prefect is already a natural choice for scalable and performant orchestration; we want to invest further in this aspect of Prefect and make sure that not only is Prefect robust to performance edge cases, but also that achieving scale is both easy to set up and easy to maintain.
  • Cloud provider setup: we know that self-hosting Prefect is not always easy, and our documentation has been lacking on this front. In addition to improving our documentation on self-hosting, we plan to add various utilities for provisioning infrastructure on popular third-party cloud providers (e.g., AWS, GCP) that make setup a breeze.
  • Third-party integrations: we know how important it is to many of our users to have opinionated off-the-shelf integrations with popular data tooling. Given the extensible nature of 3.0, we plan to make sure our integrations have a more seamless developer experience so that you don’t need to maintain a digital twin of all your tooling within Prefect. In addition, we plan to bring improved idempotency through transactions to our integration collections.

You can expect more communication around our plans both on GitHub and in follow-up blog posts.

Conclusion

I've never been a fan of writing conclusions. They always feel a bit redundant, like I'm just repeating myself, but with more flourish. So, in the spirit of Prefect 3.0's embrace of AI-driven workflows, I've decided to delegate this task to an LLM and you are now reading what it came up with.

Prefect 3.0 represents a significant step forward in our journey to make orchestration more resilient, efficient, and adaptable. Whether you're excited about the new transactional semantics interface, eager to play with event-driven workflows, or just looking forward to that sweet 10x performance boost, there's something here for everyone.

We're thrilled to share this release with our open-source community, and we can't wait to see what you'll build with it. Don't forget to check out our comprehensive documentation for all the nitty-gritty details. In any case, whether you're a human reading this or another AI analyzing text, thanks for your attention!