

There’s a growing demand for data pipelines to respond immediately to external triggers—files arriving in cloud storage, a table getting updated, or a webhook call from a third-party service. Prefect supports these scenarios by providing event-driven scheduling capabilities that let you trigger a flow the instant new data is ready.
Below, we’ll walk through a simple example of detecting a file in cloud storage, see how to configure Prefect to execute a flow on that event, and discuss why capturing and storing those events is helpful.
In many workflows, you might rely on time-based schedules (like CRON) or an ongoing “sensor” that polls for a condition. For instance, a scheduled flow might wake up on an interval and check the bucket until the expected file appears, as in this minimal sketch (the file_exists task is a placeholder for a real check against your cloud provider’s SDK):
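import time

from prefect import flow, task

@task
def file_exists(bucket: str, file_name: str) -> bool:
    # Placeholder check: in practice you would list the bucket with
    # boto3, google-cloud-storage, or a similar SDK and look for the file.
    return False

@flow
def wait_for_file(bucket: str, file_name: str, poll_seconds: int = 60, max_checks: int = 30):
    # A time-based schedule starts this flow, which then polls until the file shows up.
    for _ in range(max_checks):
        if file_exists(bucket, file_name):
            # ... hand the file off to the actual processing logic ...
            return f"Found {file_name} in {bucket}"
        time.sleep(poll_seconds)
    raise RuntimeError(f"{file_name} never arrived in {bucket}")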
This polling approach works, but it runs constantly and may obscure the actual cause of the run. The pipeline technically starts at a set interval, then waits until the file arrives.
Prefect includes a concept of events, which are structured notifications about something that happened at a specific time. Instead of passively waiting, you can arrange for an event to be sent to Prefect whenever a file is created in an S3 bucket or GCS bucket—or whenever a specific change occurs in a database. Once that event arrives, Prefect can trigger the relevant flow immediately.
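As a rough illustration, the handler that receives the storage notification (a small cloud function, say) could forward it to Prefect with emit_event, the same helper used later in this post. The handler name and the storage.file.created event name are hypothetical; in a real setup you might instead point the bucket’s notification system at a Prefect Cloud webhook that turns the POST request into an event for you.

from prefect.events import emit_event

def handle_bucket_notification(file_name: str) -> None:
    # Hypothetical handler invoked by your cloud storage notification system
    # whenever a new object lands in the bucket. It forwards the occurrence
    # to Prefect as an event that triggers (and the event log) can act on.
    emit_event(
        event="storage.file.created",  # hypothetical event name
        resource={"prefect.resource.id": f"file.{file_name}"},
    )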
Every event that Prefect receives or emits is stored. This means you can view them in the UI for debugging, auditing, and understanding exactly what triggered each flow run. Having a native concept of events is crucial for real event-driven scheduling—no hidden triggers, no guesswork.
Let’s imagine a scenario: You want a flow to run as soon as a client uploads a new CSV file into a cloud bucket. After the file arrives, you’ll load it into a warehouse (BigQuery, Snowflake, etc.) and optionally delete it afterward.
With Prefect, you can set up an incoming webhook or direct API call that fires whenever the file lands in storage. For instance, you might configure the cloud storage notification system to send a POST request to Prefect. Here’s a simplified Python snippet of the processing flow itself, which emits a custom event once the file has been handled (in production you’d likely rely on the bucket’s built-in eventing system rather than emitting the triggering event by hand):
from prefect import flow, task
from prefect.events import emit_event

@task
def load_file_to_warehouse(file_name: str):
    # ... Logic to load the file into your data warehouse ...
    return f"Loaded {file_name} into warehouse"

@task
def cleanup(file_name: str):
    # ... Logic to remove the file after loading ...
    return f"Cleaned up {file_name}"

@flow
def process_new_file(file_name: str):
    loaded = load_file_to_warehouse(file_name)
    # Only clean up once the load has finished
    cleanup(file_name, wait_for=[loaded])
    # Emit a custom event noting successful processing
    emit_event(
        event="custom.file_processed",
        resource={"prefect.resource.id": f"file.{file_name}"},
    )
End to end, the sequence looks like this: a client uploads a file to the cloud bucket, the bucket’s notification system (or a handler like the one above) sends an event to Prefect, and Prefect immediately kicks off process_new_file with the file’s name as a parameter.
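How does the incoming event actually start the flow? One way, sketched below under the assumption of a recent Prefect version (import paths and field names vary across releases), is to attach an event trigger to the deployment so that any storage.file.created event whose resource matches file.* kicks off process_new_file:

from prefect.events import DeploymentEventTrigger

# Serve the flow with an event trigger: when a matching event arrives,
# Prefect starts a run and templates the event's resource id into the
# flow's file_name parameter. The event name and resource pattern mirror
# the hypothetical notification handler above; note the template passes
# the full resource id (e.g. "file.data.csv"), so you may want to strip
# the prefix inside the flow.
process_new_file.serve(
    name="process-new-file",
    triggers=[
        DeploymentEventTrigger(
            expect={"storage.file.created"},
            match={"prefect.resource.id": "file.*"},
            parameters={"file_name": "{{ event.resource.id }}"},
        )
    ],
)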
Because every event is stored, you can then open Prefect’s UI to see the event timestamp, which file triggered the run, and the tasks executed as a result.
Event storage makes it easy to answer questions like: which event triggered this flow run, when exactly did the file arrive, and did every upload actually result in a run?
Having a native concept of events means there is no guesswork: every external trigger is recorded, which is especially useful when you need to troubleshoot or audit your processes.
Event-driven scheduling is essential for modern data pipelines where new data can arrive at any time. Prefect integrates event handling into its core, so you can trigger flows as soon as external conditions change and keep a detailed record of those events. By letting you see exactly why and when each flow ran, Prefect’s event storage provides transparency and helps ensure that your pipelines run reliably whenever the data is ready.
Want to see Prefect’s event-driven system in action? Book a demo with our Python engineers.








