Building resilient data pipelines in minutes
Simplifying data pipeline development
The hardest part about writing a blog is getting started - writing the outline and filling out the first few key points. The same can be said for writing data pipelines: you need to inspect docs, determine data structures, write tests, etc.
What if you could build a resilient, production-ready data pipeline that is scheduled and running in just a few minutes? We’ll show you how to do just that with dlt and Prefect.
dlt
dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets. It abstracts away the need to hunt through docs, interpret APIs, and reinvent the wheel every time. Instead of hand-writing a custom pipeline for every source, you can use dlt as a framework for building pipelines across any combination of tools.
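To give a feel for the API, here is a minimal sketch of a dlt pipeline that loads a small list of Python dictionaries. The pipeline, dataset, and table names are ours, and we use DuckDB only so the example runs locally without any credentials; swap in BigQuery once yours are configured.

import dlt

# Any iterable of dicts works as a source; dlt infers (and evolves) the schema.
members = [
    {"id": "W012A3CDE", "name": "spengler", "profile": {"real_name": "Egon Spengler"}},
]

# A pipeline bundles a destination, a dataset, and the load state.
pipeline = dlt.pipeline(
    pipeline_name="slack_demo",
    destination="duckdb",  # swap for "bigquery" once credentials are in place
    dataset_name="slack_data",
)

# run() normalizes the nested dicts into relational tables and loads them.
load_info = pipeline.run(members, table_name="members")
print(load_info)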
Moving Slack data into BigQuery
We use BigQuery as our data warehouse, and try to centralize as much information there as possible. Given that our Slack community includes over 25,000 people, it makes sense to use that data to better serve the community: we can identify the types of questions our users struggle with most and take action to improve Prefect.
If you Google “load slack into bigquery”, you’ll see a bunch of listings for no-code tools like Zapier that can help you move data… for a fee, of course. What if you want to do this yourself? Slack has an API, but take a look at it: it would take some effort to interpret even a simple response, like this one for users:
{
  "ok": true,
  "members": [
    {
      "id": "W012A3CDE",
      "team_id": "T012AB3C4",
      "name": "spengler",
      "deleted": false,
      "color": "9f69e7",
      "real_name": "spengler",
      "tz": "America/Los_Angeles",
      "tz_label": "Pacific Daylight Time",
      "tz_offset": -25200,
      "profile": {
        "avatar_hash": "ge3b51ca72de",
        "status_text": "Print is dead",
        "status_emoji": ":books:",
        "real_name": "Egon Spengler",
        "display_name": "spengler",
        "real_name_normalized": "Egon Spengler",
        "display_name_normalized": "spengler",
        "email": "spengler@ghostbusters.example.com",
        "image_24": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
        "image_32": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
        "image_48": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
        "image_72": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
        "image_192": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
        "image_512": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
        "team": "T012AB3C4"
      },
      "is_admin": true,
      "is_owner": false,
      "is_primary_owner": false,
      "is_restricted": false,
      "is_ultra_restricted": false,
      "is_bot": false,
      "updated": 1502138686,
      "is_app_user": false,
      "has_2fa": false
    }
  ]
}
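For contrast, rolling this yourself means paging through the API, flattening that nested profile object, and choosing column types for BigQuery by hand. Here is a rough sketch of just the extraction step (the token is a placeholder, and error handling is omitted):

import requests

SLACK_TOKEN = "xoxb-..."  # placeholder token
url = "https://slack.com/api/users.list"
headers = {"Authorization": f"Bearer {SLACK_TOKEN}"}

members, cursor = [], None
while True:
    params = {"limit": 200}
    if cursor:
        params["cursor"] = cursor
    resp = requests.get(url, headers=headers, params=params).json()
    members.extend(resp.get("members", []))
    # Slack paginates with a cursor; an empty next_cursor means we're done.
    cursor = resp.get("response_metadata", {}).get("next_cursor")
    if not cursor:
        break

# ...and that's before flattening the nested "profile" object, handling
# rate limits, or mapping types for BigQuery.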
With dlt
You can use dlt to build a Slack to BigQuery pipeline in just a few seconds with a single command. Seriously, it is that simple. In preparation, let’s make sure to install what we need:
pip install dlt
pip install prefect
Then just run a simple init command:
dlt init slack bigquery
In the .dlt/secrets.toml file, enter your Slack and BigQuery credentials:
[sources.slack]
access_token="*****"

[destination.bigquery]
location = "US"

[destination.bigquery.credentials]
project_id = "*****"
private_key = "*****"
client_email = "*****"
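If you want to double-check that dlt can resolve those values before running anything, it exposes a dictionary-like accessor; this is just a quick sanity check, not part of the generated pipeline:

import dlt

# dlt resolves values from .dlt/secrets.toml (or environment variables) at
# runtime; this only confirms the Slack token is visible to dlt.
token = dlt.secrets["sources.slack.access_token"]
print("slack token loaded:", bool(token))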
With a single command plus some credentials, we now have the framework of a pipeline! Take a look at what has been generated, with a couple of small customizations. Note that we are redacting some of the code in the preview for brevity; to follow along completely, navigate to the repo.
1"""Pipeline to load slack into bigquery."""
2
3from typing import List
4
5import dlt
6import pendulum
7from pendulum import datetime
8
9from slack import slack_source
10
11def load_channels() -> None:
12 """Execute a pipeline that will load a list of all the Slack channels in the workspace to BigQuery"""
13 ...
14
15def get_resources() -> List[str]:
16 """Fetch a list of available dlt resources so we can fetch them one at a time"""
17 ...
18
19def load_channel_history(channel: str, start_date: datetime) -> None:
20 """Execute a pipeline that will load the given Slack channel
21 incrementally beginning at the given start date."""
22 ...
23
24def get_users() -> None:
25 """Execute a pipeline that will load Slack users list."""
26 ...
27
28
29if __name__ == "__main__":
30 channels = None
31 start_date = pendulum.now().subtract(days=1).date()
32
33 load_channels()
34
35 resources = get_resources()
36 for resource in resources:
37 if channels is not None and resource not in channels:
38 continue
39
40 load_channel_history(resource, start_date=start_date)
41
42 get_users()
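The elided bodies live in the repo, but to give a sense of their shape, here is a rough, illustrative sketch of what a function like load_channels could look like, assuming the generated slack_source exposes a channels resource; the real implementation is in the repo.

def load_channels() -> None:
    """Illustrative sketch only -- see the repo for the actual implementation."""
    pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination="bigquery",
        dataset_name="slack_data",
    )
    # slack_source() is the generated verified source; with_resources()
    # narrows this run to just the channels table.
    load_info = pipeline.run(slack_source().with_resources("channels"))
    print(load_info)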
What if it fails?
Great, we’ve got a pipeline that moves data from Slack to BigQuery and we didn’t have to format any JSON - that alone is a win. However, some issues may still arise. What if Slack rate-limits you? What if BigQuery is down (😅)? What about a networking issue? What if the execution environment where this script lives isn’t working?
These questions are the difference between a pipeline and a resilient pipeline. They’re the difference between getting sleep at night and looking like a hero (or a dummy) to your stakeholders.
Scheduling with Prefect
Prefect provides workflow orchestration and observability, turning your pipelines into scheduled, repeatable, and resilient workflows. With Prefect you get scheduling, observability, and automations that make sure your pipelines aren’t causing you stress in the middle of the night.
Make sure you’re logged in to Prefect Cloud by signing up and using the following command:
prefect cloud login
Luckily, Prefect is also incredibly Pythonic. Turning any pipeline into an observable, scheduled Prefect flow is as simple as adding decorators to your functions and serving it up. Here’s our dlt generated pipeline, scheduled daily:
1"""Pipeline to load slack into bigquery."""
2
3from typing import List
4
5import dlt
6import pendulum
7from pendulum import datetime
8from prefect import flow, task
9
10from slack import slack_source
11
12@task
13def load_channels() -> None:
14 """Execute a pipeline that will load a list of all the Slack channels in the
15 workspace to BigQuery"""
16 ...
17
18@task
19def get_resources() -> List[str]:
20 """Fetch a list of available dlt resources so we can fetch them one at a time"""
21 ...
22
23@task
24def load_channel_history(channel: str, start_date: datetime) -> None:
25 """Execute a pipeline that will load the given Slack channel
26 incrementally beginning at the given start date."""
27 ...
28
29@task
30def get_users() -> None:
31 """Execute a pipeline that will load Slack users list."""
32 ...
33
34@flow
35def slack_pipeline(
36 channels=None, start_date=pendulum.now().subtract(days=1).date()
37) -> None:
38 load_channels()
39
40 resources = get_resources()
41 for resource in resources:
42 if channels is not None and resource not in channels:
43 continue
44
45 load_channel_history(resource, start_date=start_date)
46
47 get_users()
48
49if __name__ == "__main__":
50 slack_pipeline.serve("slack_pipeline", cron="0 0 * * *")
We’ve added @task to our individual functions. These will be treated as individual units of work by Prefect when they are executed. We decorate our primary function (slack_pipeline) with @flow, which references our task functions. We will schedule and kick off flows, which in turn will execute tasks based on the decorators within them.
Finally, adding .serve to our if __name__ == "__main__" block means that a Prefect deployment will be automatically created and scheduled to run daily at midnight (the 0 0 * * * cron schedule). We can see our deployment and scheduled runs in the Prefect UI, and we’ll know when they ran or, more importantly, if they didn’t. We can further extend our pipeline by:
- Setting up remote infrastructure with workers
- Adding automations to notify us when the pipeline has failed
- Setting up retries (see the sketch below)
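For example, retries are a one-line change on any task; the counts below are arbitrary:

from prefect import task

# Retry up to 3 times, waiting 60 seconds between attempts, so a transient
# Slack rate limit or network blip doesn't fail the whole flow.
@task(retries=3, retry_delay_seconds=60)
def load_channels() -> None:
    ...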
Where to handle failure
There are many levels of failure, you could say, from “accidentally liking your ex’s social media post from five years ago” to “trying to assemble IKEA furniture without instructions”. So which ones should we handle where, and what are some quick solutions?
With dlt, your pipelines are resilient at the API level. From schema changes to network issues or memory overflow, there is automated resiliency and recovery that is specific to working with the pesky APIs of your tools.
With Prefect, your pipelines become resilient at the function level. If your workflows never run, break and fail, or break and never end, Prefect will be your backstop - notifying you and taking the appropriate action in case of failure.
Building resilient pipelines faster with dlt + Prefect
Getting into production is hard. First you need to build your pipeline, and then you need to make it resilient. With this tutorial, we’ve shown you how to quickly build pipelines with dlt and then turn that pipeline into a resilient, repeatable workflow with Prefect.
Prefect makes complex workflows simpler, not harder. Try Prefect Cloud for free for yourself, download our open source package, join our Slack community, or talk to one of our engineers to learn more.