
Beyond Loops: How Prefect's Task Mapping Scales to Thousands of Parallel Tasks


Data pipelines frequently need to process thousands—if not tens of thousands—of independent items in parallel. Whether you're downloading data from multiple APIs, processing events from a message queue, or training models on different datasets, the ability to efficiently parallelize and monitor these operations at scale becomes a critical engineering challenge.
Traditional workflow orchestrators were designed around static, predefined DAGs with a central scheduler managing all tasks. As data volumes grow and processing requirements become more dynamic, this centralized approach creates increasingly significant bottlenecks and operational blind spots. When processing thousands of similar operations, engineers are frequently confronted with a troubling scenario: a small percentage of tasks fail (perhaps due to API rate limits or data anomalies), but identifying precisely which ones requires tedious log parsing and detective work. Even worse, recovering often means rerunning the entire workflow rather than just the failed components.
In these traditional architectures, the decision about what qualifies as an "orchestrated task" versus inline code is often dictated by system limitations rather than engineering best practices. Operations that ideally should be tracked individually are instead buried within larger monolithic functions simply because the orchestrator cannot efficiently handle thousands of discrete tasks. This architectural constraint forces suboptimal design choices, sacrificing observability and resilience for performance.
Task mapping in Prefect addresses these challenges through a fundamentally different architectural approach: a decoupled execution model, support for dynamic task discovery, and pluggable distributed task runners. This architecture enables data engineers and scientists to build workflows that truly scale with their data. Let's explore how this works in practice, examining the underlying architecture and common usage patterns that make Prefect's task mapping an elegant solution for data parallelism at scale.
The Challenge of Scale
Let's consider a common scenario: you need to process data for thousands of customers, with each customer's data requiring the same sequence of operations. In a traditional orchestrator like Airflow, you typically end up with one of two problematic approaches:
- The monolithic task approach: Write a single task that loops through all customers, processing each one sequentially.
@task
def process_all_customers(customer_ids):
    results = []
    for customer_id in customer_ids:
        # Process each customer
        result = process_customer_logic(customer_id)
        results.append(result)
    return results
- The DAG explosion approach: Define thousands of individual tasks from a static list, one for each customer, creating a massive DAG.
for customer_id in customer_ids:
    process_customer_task = ProcessCustomerOperator(
        task_id=f'process_customer_{customer_id}',
        customer_id=customer_id,
        dag=dag
    )
Both approaches fall short. The monolithic approach lacks parallelism and observability—if one customer fails, the entire task fails. Moreover, individual customer processing issues cannot be independently retried or monitored. The DAG explosion approach creates a maintenance burden and puts immense pressure on the central scheduler. In addition, the list of customer_ids needs to be static for the scheduler to properly track each task.
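To make the failure-isolation point concrete, here is a minimal plain-Python sketch that uses only the standard library and involves no orchestrator. The function `process_customer_logic` is a hypothetical stand-in that fails for one ID, and `process_independently` is a name invented for this illustration. Unlike the monolithic loop, it records an outcome per customer, so the failed IDs are known and could be retried on their own.

```python
from concurrent.futures import ThreadPoolExecutor


def process_customer_logic(customer_id: str) -> str:
    """Hypothetical per-customer work; fails for one ID to simulate a bad record."""
    if customer_id == "customer3":
        raise ValueError(f"bad data for {customer_id}")
    return f"Processed {customer_id}"


def process_independently(
    customer_ids: list[str],
) -> tuple[dict[str, str], dict[str, BaseException]]:
    """Run each customer as its own unit of work and keep per-item outcomes."""
    succeeded: dict[str, str] = {}
    failed: dict[str, BaseException] = {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {cid: pool.submit(process_customer_logic, cid) for cid in customer_ids}
    # Leaving the `with` block waits for all submitted work to finish.
    for cid, future in futures.items():
        error = future.exception()
        if error is None:
            succeeded[cid] = future.result()
        else:
            failed[cid] = error
    return succeeded, failed


succeeded, failed = process_independently([f"customer{i}" for i in range(1, 6)])
print(sorted(failed))  # the IDs that would need a retry
```

An orchestrator adds persistence, retries, and a UI on top of this idea, but the core benefit is the same: one failure does not hide or discard the other results.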
Prefect's Task Mapping Approach
Prefect approaches this problem with its task mapping functionality, which allows you to dynamically spawn task instances at runtime based on a sequence of inputs. Here's how you'd implement our customer processing example with Prefect:
from prefect import flow, task

@task
def get_customer_ids():
    # Fetch customer IDs from a database or API
    # (generated here as a stand-in: "customer1" through "customer1000")
    return [f"customer{i}" for i in range(1, 1001)]

@task
def process_customer(customer_id):
    # Process a single customer
    return f"Processed {customer_id}"

@flow
def customer_processing_flow():
    customer_ids = get_customer_ids()
    # Map the process_customer task across all customer IDs
    results = process_customer.map(customer_ids)
    return results
With this simple pattern, Prefect dynamically creates individual task runs at runtime which can number in the thousands. Each task processes a single customer and has its own retry lifecycle, caching logic and observability. But what makes this interesting is not just the syntax—it's the architecture beneath.
The Architecture That Makes It Possible
Prefect's ability to scale to tens of thousands of mapped tasks stems from its unique architectural approach:
Decoupled Execution Model
In Prefect, once a flow run is created, it operates independently from the central scheduler. The central scheduler's only job is to determine when a scheduled flow should start; it doesn't need to track every task within the flow. For manually triggered runs, the central scheduler is not involved at all.
This decoupling means that each flow run can manage its own task graph, determining which tasks to execute and when. This is fundamentally different from centralized architectures like Airflow, where the scheduler must track every single task across all DAGs.
Dynamic Task Discovery
In a traditional static DAG approach, all tasks must be defined at DAG-parse time, so the structure is fixed before any data is seen. This constraint necessitates placeholder tasks and complex branching operators to achieve conditional execution. As a result, the DAG structure cannot adapt to the data it processes: any change to what is being processed requires a full redeployment of the workflow definition.
In Prefect, because the flow manages its own execution, tasks can be discovered and created dynamically at runtime. There's no need to pre-register the entire task graph. This enables truly dynamic workflows where the number and structure of tasks can be determined based on runtime data without sacrificing reliability or observability, and the resulting code is much easier on the eyes.
@flow
def dynamic_etl_flow():
    # Determine which datasets to process at runtime
    datasets = discover_new_datasets()

    # Process each dataset
    results = []
    for dataset in datasets:
        # Dynamic conditional logic
        if dataset.size > 1000:
            result = process_large_dataset.map(dataset.chunks)
        else:
            result = process_small_dataset(dataset)
        results.append(result)

    return results
Pluggable Task Runners
Prefect abstracts task execution through the concept of a TaskRunner, which decouples the definition of what to execute from how to execute it. This pluggable architecture allows a single workflow definition to run across different execution environments without code changes:
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner  # provided by the prefect-dask package
# RayTaskRunner lives in the separate prefect-ray package:
# from prefect_ray import RayTaskRunner

# For local development with thread-based concurrency
@flow(name="dev-flow", task_runner=ConcurrentTaskRunner())
def customer_flow():
    customer_ids = get_customer_ids()
    results = process_customer.map(customer_ids)
    return results

# Create a new flow for production with distributed execution
prod_flow = customer_flow.with_options(
    name="prod-flow",
    task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 5}),
)
This task runner architecture allows Prefect to leverage the specialized task execution capabilities of systems like Dask and Ray, which are specifically designed to handle distributed computation across large clusters. The extensible nature of this interface also means that task runners can be written for virtually any execution environment, allowing Prefect to integrate with custom infrastructure or other distributed systems.
By using a DaskTaskRunner, Prefect can distribute these tasks across a Dask cluster, unlocking multi-machine parallelism. Prefect can either connect to your existing Dask cluster or dynamically provision one for you. This means your mapped tasks can scale far beyond the limits of a single machine, easily handling workflows with 20,000+ tasks distributed across dozens of workers.
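The separation between what to execute and how to execute it can be illustrated without Prefect at all. The sketch below is a plain-Python analogy, not Prefect's TaskRunner interface: `Runner`, `SequentialRunner`, `ThreadedRunner`, and `run_all` are hypothetical names made up for this example. The same workflow function runs unchanged on either runner.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Protocol


class Runner(Protocol):
    """Hypothetical interface: how to execute a function over many inputs."""

    def run_all(self, fn: Callable[[str], str], inputs: Iterable[str]) -> list[str]: ...


class SequentialRunner:
    """Runs every call one after another in the current thread."""

    def run_all(self, fn, inputs):
        return [fn(item) for item in inputs]


class ThreadedRunner:
    """Runs calls concurrently on a thread pool."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers

    def run_all(self, fn, inputs):
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            # Executor.map preserves input order in its results.
            return list(pool.map(fn, inputs))


def process_customer(customer_id: str) -> str:
    return f"Processed {customer_id}"


def customer_workflow(runner: Runner) -> list[str]:
    """The workflow says what to run; the runner decides how."""
    customer_ids = [f"customer{i}" for i in range(1, 4)]
    return runner.run_all(process_customer, customer_ids)


sequential_results = customer_workflow(SequentialRunner())
threaded_results = customer_workflow(ThreadedRunner())
```

Swapping the runner changes the execution strategy but not the workflow code or its results, which is the property the with_options example above relies on.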
A Few Examples
Let's explore a few practical examples of how task mapping can be applied to real data problems.
Example: Parallel API Requests
Imagine you need to scrape product data from thousands of e-commerce product pages:
from prefect import flow, task

@task
def get_product_urls():
    # Return a list of product URLs to scrape
    # This could come from a sitemap, database, or prior crawling step
    return [
        "https://example-store.com/products/1234",
        "https://example-store.com/products/5678",
        # ... thousands more product URLs
    ]

@task(retries=3, retry_delay_seconds=30, retry_jitter_factor=0.2)
def scrape_product_page(url):
    """Scrape a single product page with retry logic for intermittent failures"""
    ...
    return product

@task
def analyze_product(product_data):
    """Calculate metrics and transform scraped product data"""
    # Analyze product data for business metrics
    product_data["price_tier"] = (
        "premium" if product_data["price"] > 100 else "standard"
    )
    product_data["relevance_score"] = product_data["rating"] * (
        1.5 if product_data["in_stock"] else 0.5
    )
    return product_data

@flow(name="E-commerce Product Scraper")
def scrape_product_catalog():
    """Orchestrates the scraping of the entire product catalog"""
    # Get list of product URLs
    product_urls = get_product_urls()

    # Scrape each product page in parallel
    # Each scraping task runs independently with its own retry logic
    product_data = scrape_product_page.map(product_urls)

    # Process and analyze each product's data
    analyzed_products = analyze_product.map(product_data)

    return analyzed_products
Here, scrape_product_page.map(product_urls) spawns one task run per product page, so the requests can run in parallel given a suitable task runner. Even without parallel execution, each task run has independent retry logic: if one request fails, only that specific task run retries.
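The retry settings above combine a base delay with jitter so that many failing tasks do not all retry at the same instant. The following standard-library sketch shows one way such a policy could behave. It assumes the jitter only lengthens the base delay, by at most the factor times the delay, and draws it uniformly; Prefect's actual distribution may differ. `jittered_delay`, `call_with_retries`, and `flaky_scrape` are hypothetical names for this illustration.

```python
import random


def jittered_delay(base_seconds: float, jitter_factor: float, rng: random.Random) -> float:
    """Assumed model: uniform draw from [base, base * (1 + jitter_factor)]."""
    return base_seconds + rng.uniform(0.0, base_seconds * jitter_factor)


def call_with_retries(fn, retries: int, base_seconds: float, jitter_factor: float,
                      rng: random.Random, sleep=lambda seconds: None):
    """Call fn, retrying up to `retries` times; returns (result, delays_used)."""
    delays = []
    for attempt in range(retries + 1):
        try:
            return fn(), delays
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            delay = jittered_delay(base_seconds, jitter_factor, rng)
            delays.append(delay)
            sleep(delay)  # no-op by default so the sketch runs instantly


attempts = {"count": 0}


def flaky_scrape():
    """Hypothetical scraper that fails twice before succeeding."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"price": 120.0}


result, delays = call_with_retries(
    flaky_scrape, retries=3, base_seconds=30, jitter_factor=0.2, rng=random.Random(0)
)
```

Under these assumptions, a 30-second delay with a factor of 0.2 yields waits between 30 and 36 seconds, and only the failing call is repeated.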
Example: ETL for Multiple Data Sources
Processing data from multiple sources often follows similar patterns:
from prefect import flow, task, unmapped

@task
def list_data_sources():
    return ["postgres", "mysql", "mongodb", "cassandra", "elasticsearch"]

@task
def extract_from_source(source):
    if source == "postgres":
        return extract_from_postgres()
    elif source == "mysql":
        return extract_from_mysql()
    # etc.

@task
def transform_data(data, source, config):
    # Transform based on source type
    return {"source": source, "transformed_data": data}

@flow
def multi_source_etl():
    sources = list_data_sources()
    raw_data = extract_from_source.map(sources)

    # Pass both the data and source name to transform
    transformed_data = transform_data.map(
        raw_data,
        sources,
        config=unmapped({}),  # Pass some common config to each mapped task
    )

    return transformed_data
The unmapped() wrapper tells Prefect to pass the same argument to every mapped task instance instead of iterating over it. While the values of raw_data and sources vary for each mapped task call, they all receive the same config.
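One way to picture these semantics is as a zip over the mapped arguments with the unmapped value repeated for every call. The sketch below models that in plain Python; it is an analogy rather than Prefect's implementation, and `Unmapped` and `expand_calls` are hypothetical names. It also assumes that all mapped arguments must have the same length.

```python
from itertools import repeat


class Unmapped:
    """Hypothetical marker wrapping a value that every call should share."""

    def __init__(self, value):
        self.value = value


def expand_calls(**kwargs):
    """Turn mapped and unmapped keyword arguments into one kwargs dict per call."""
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("need at least one mapped argument, all of equal length")
    n = lengths.pop()
    columns = {
        k: repeat(v.value, n) if isinstance(v, Unmapped) else v
        for k, v in kwargs.items()
    }
    # Zip the per-argument columns into rows and restore the argument names.
    return [dict(zip(columns, row)) for row in zip(*columns.values())]


calls = expand_calls(
    data=["rows_pg", "rows_mysql"],
    source=["postgres", "mysql"],
    config=Unmapped({"strict": True}),
)
```

Each entry in `calls` corresponds to one task run: `data` and `source` differ per run, while `config` is the same object in all of them.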
Example: Nested Mapping for Complex Workflows
Task mapping becomes even more powerful when combined with itself for nested operations:
@task
def get_customer_segments():
    return ["segment_a", "segment_b", "segment_c"]

@task
def get_customers_in_segment(segment):
    # Get customer IDs for a specific segment
    return [f"{segment}_customer_{i}" for i in range(1000)]

@task
def process_customer(customer_id):
    # Process each customer
    return f"Processed {customer_id}"

@flow
def segmented_customer_processing():
    segments = get_customer_segments()
    # Map over segments
    segment_customers = get_customers_in_segment.map(segments)

    # For each segment, map over its customers
    # This creates a nested mapping structure
    all_results = []
    for customers in segment_customers:
        segment_results = process_customer.map(customers)
        all_results.append(segment_results)

    return all_results
This creates a two-level mapping where it first maps over customer segments to get a list of customers for each one. Then, for each segment, it maps again to process those customers in parallel.
This pattern is useful when one layer of work depends on the results of another, like fetching users from each organization or files from multiple folders. Instead of flattening everything into one giant list or writing complex orchestration logic, Prefect lets you express that relationship directly in code. You get clear structure, clean parallelism, and the ability to scale each layer independently.
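The shape of the data in a two-level mapping can be seen in a small plain-Python sketch. It uses ordinary functions in place of tasks and a reduced, assumed count of three customers per segment, so it only illustrates the grouping, not the parallelism.

```python
from itertools import chain


def get_customers_in_segment(segment: str, count: int = 3) -> list[str]:
    """Hypothetical lookup; the example above returns 1000 IDs per segment."""
    return [f"{segment}_customer_{i}" for i in range(count)]


def process_customer(customer_id: str) -> str:
    return f"Processed {customer_id}"


segments = ["segment_a", "segment_b"]

# Outer level: one list of customers per segment.
# Inner level: one result per customer, still grouped by segment.
results_by_segment = {
    segment: [process_customer(c) for c in get_customers_in_segment(segment)]
    for segment in segments
}

# Flatten only when a downstream step wants a single list.
all_results = list(chain.from_iterable(results_by_segment.values()))
```

Keeping the results grouped preserves the relationship between layers, and flattening remains a one-line step when it is actually needed.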
Performance Considerations
When implementing task mapping at scale, consider these practical performance aspects:
Task Runner Selection
The default ConcurrentTaskRunner uses threads within a single Python process. This is suitable for IO-bound tasks (like API calls) but doesn't provide true CPU parallelism due to Python's Global Interpreter Lock (GIL).
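The reason threads work well for IO-bound tasks can be checked with the standard library alone. In this sketch, `fake_api_call` is a hypothetical stand-in that sleeps for 0.2 seconds; sleeping releases the GIL in the same way waiting on a network response does, so eight calls on eight threads overlap instead of running back to back.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_api_call(item: int) -> int:
    """Stand-in for an IO-bound call; the sleep releases the GIL like a network wait."""
    time.sleep(0.2)
    return item * 2


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(fake_api_call, range(8)))
elapsed = time.perf_counter() - start

print(f"8 calls of 0.2s each finished in {elapsed:.2f}s")
```

Run sequentially, the same calls would take at least 1.6 seconds. A CPU-bound function would not see this speedup from threads, which is where process-based or distributed runners come in.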
For CPU-intensive workloads or when you need to scale beyond a single machine, use a distributed task runner:
# For large-scale distributed execution
from prefect import flow
from prefect_dask import DaskTaskRunner  # provided by the prefect-dask package

# Connect to an existing cluster
@flow(task_runner=DaskTaskRunner(address="dask-scheduler:8786"))
def connect_to_existing_cluster_flow():
    # This flow will distribute tasks across your existing Dask cluster
    ...

# Or dynamically provision a cluster
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
))
def large_scale_flow():
    # This flow will distribute tasks across up to 10 adaptively scaled Dask workers
    # Intended for workloads with 50,000+ mapped tasks
    ...
Even without deep knowledge of Dask, you can use a local Dask cluster to get process-based parallelism that bypasses Python's GIL:
# Local process-based parallelism (no thread limitations)
# Worker settings are passed to the Dask cluster through cluster_kwargs
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
def local_parallel_flow():
    # Uses 4 local processes instead of threads
    ...
Task Granularity
There's a tradeoff between task size and overhead. Tasks that complete in under a second can create more orchestration overhead than value, especially when you're running thousands of them. Each task still has to be tracked, cached and logged, even if it finishes almost instantly. That overhead can add up quickly and can slow down your system without improving throughput. On the flip side, if tasks are too large or slow, you limit the amount of observability and parallelism available. Long-running tasks can hog resources, delay feedback, and make failures more painful to recover from due to lack of more granular checkpointing.
A good rule of thumb is that each task should take at least a few seconds to execute. For shorter operations, consider batching:
@task
def process_customer_batch(customer_ids_batch):
    results = []
    for customer_id in customer_ids_batch:
        result = process_single_customer(customer_id)
        results.append(result)
    return results

@flow
def batched_processing_flow():
    customer_ids = get_all_customer_ids()
    # Split into batches of 100
    batches = [customer_ids[i:i+100] for i in range(0, len(customer_ids), 100)]
    results = process_customer_batch.map(batches)
    return results
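The batch size of 100 above is a fixed choice. If you have a rough estimate of how long one item takes, the rule of thumb about task duration can be turned into a calculation. The sketch below is one possible approach; `choose_batch_size` and `split_into_batches` are hypothetical helpers, and the per-item time of 0.25 seconds and the 5-second target are assumed values for illustration.

```python
import math


def choose_batch_size(seconds_per_item: float, target_task_seconds: float) -> int:
    """Smallest batch size whose estimated runtime reaches the target."""
    return max(1, math.ceil(target_task_seconds / seconds_per_item))


def split_into_batches(items: list, batch_size: int) -> list[list]:
    """Split items into consecutive batches; the last batch may be shorter."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


customer_ids = [f"customer{i}" for i in range(1, 1001)]
batch_size = choose_batch_size(seconds_per_item=0.25, target_task_seconds=5.0)
batches = split_into_batches(customer_ids, batch_size)

print(batch_size, len(batches))
```

With these assumed numbers, 1,000 customers become 50 mapped task runs of 20 customers each, so the orchestration overhead is paid 50 times rather than 1,000 times. The cost is coarser retries: a failure reruns the whole batch.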
Memory Management
While Prefect does support in-memory data exchange between tasks, being mindful of memory usage when mapping over large datasets is still important, especially when tasks return large results.
One scaling strategy for mapped tasks is to minimize memory pressure by offloading large data to external storage systems when possible. Rather than keeping large datasets in memory as they move through your workflow, use external storage as a buffer between tasks:
@task
def get_large_dataset_chunks():
    # Return references to data chunks, not the actual data
    return [
        "s3://bucket/chunk1",
        "s3://bucket/chunk2",
        # ... more chunk references
    ]

@task
def process_chunk(chunk_location):
    # Load chunk, process it, and store results externally
    data = load_from_s3(chunk_location)
    result = process(data)
    result_location = store_in_s3(result)
    # Return a reference, not the full result
    return result_location

@flow
def large_data_processing():
    chunk_locations = get_large_dataset_chunks()
    result_locations = process_chunk.map(chunk_locations)
    return result_locations
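The helpers load_from_s3 and store_in_s3 above are left undefined. To show the pass-by-reference pattern end to end, here is a runnable sketch that substitutes a temporary local directory for the object store. `store_chunk` and this version of `process_chunk` are hypothetical, and doubling the values stands in for real processing.

```python
import json
import tempfile
from pathlib import Path


def store_chunk(directory: Path, name: str, rows: list[int]) -> str:
    """Write a chunk to storage and return only its location."""
    path = directory / f"{name}.json"
    path.write_text(json.dumps(rows))
    return str(path)


def process_chunk(chunk_location: str) -> str:
    """Load a chunk by reference, process it, store the result, return a reference."""
    source = Path(chunk_location)
    rows = json.loads(source.read_text())
    doubled = [value * 2 for value in rows]
    return store_chunk(source.parent, f"{source.stem}_result", doubled)


with tempfile.TemporaryDirectory() as tmp:
    storage = Path(tmp)  # local directory standing in for an object store
    chunk_locations = [
        store_chunk(storage, f"chunk{i}", list(range(i, i + 3))) for i in range(2)
    ]
    result_locations = [process_chunk(location) for location in chunk_locations]
    first_result = json.loads(Path(result_locations[0]).read_text())
```

Only short strings travel between the steps, so the memory held by the coordinating process stays small regardless of chunk size.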
Remember that the distributed systems behind Prefect's task runners, such as Dask and Ray, have their own memory management considerations, so consult their documentation when working at scale.
The Orchestration-Performance Tradeoff
Prefect tasks are inherently heavier than native Dask or Ray tasks because they include additional metadata, state tracking, and orchestration capabilities. This means that while Prefect can scale to tens of thousands of tasks, it will not achieve the same raw performance as using Dask or Ray directly for millions of lightweight computations.
This tradeoff is often worthwhile: what you sacrifice in raw performance, you gain in observability, reliability, and operational control. For data engineering workflows where each task represents a meaningful unit of work (API calls, data transformations, model training), the benefits usually outweigh the overhead.
API Scalability vs. Scheduler Scalability
For high-scale operations, the bottleneck shifts from Prefect's scheduler (which is no longer involved in task execution) to its API, which handles state updates. This is actually an advantage: unlike schedulers, APIs are often stateless and can be horizontally scaled behind load balancers as needed. This architectural choice gives Prefect much better scalability characteristics for large workloads.
Knowledge Requirements for Different Scales
The knowledge requirements for task mapping vary depending on your scale:
- Small to Medium Scale (hundreds to a few thousand tasks):
  - Using the default ConcurrentTaskRunner with thread-based parallelism requires no additional infrastructure knowledge
  - Using a local Dask cluster provides process-based parallelism (bypassing Python's GIL) with minimal setup
- Large Scale (tens of thousands of tasks):
  - Requires familiarity with Dask or Ray cluster configuration
  - Understanding of resource allocation, worker scaling, and fault tolerance becomes important
  - Knowledge of infrastructure management (Kubernetes, cloud resources) may be necessary
Even without deep expertise in distributed systems, data engineers can benefit from smaller-scale task mapping. As you scale up, the learning curve becomes steeper, but the payoffs in workflow efficiency are substantial.
Comparison with Airflow
You might be wondering how this compares with the dynamic task mapping feature introduced in Airflow 2.3 and its expand() method. Several fundamental technical differences affect scalability:
- Architecture: Airflow maintains a centralized scheduler that needs to track every task in the system. When mapping creates thousands of tasks, this central registry becomes a bottleneck as the scheduler must manage each task individually.
- Execution Model: In addition to being tracked, each task instance must be scheduled for execution individually by the central scheduler. This introduces latency between task executions and places hard limits on throughput. It also requires the central scheduler to have network access to all execution environments, which Prefect's worker model does not require.
- Distributed Execution: While Airflow supports executors like Celery and Kubernetes, the scheduler still acts as a central coordinator. Also, submitting large jobs containing many individually important operations to distributed systems loses the granular observability and retryability that are at the core of orchestration. Prefect's task runners can connect directly to high-performance distributed systems like Dask and Ray, inheriting their specialized task scheduling optimizations while enhancing them with centralized observability.
- Static DAG Model: Airflow's core design philosophy centers around static, pre-defined DAGs that must be fully specified during the DAG parsing phase. This introduces challenges for parametrizing workflows as well as embracing true dynamism based on runtime data. Prefect, by contrast, embraces a dynamic execution model where the workflow structure itself can emerge during runtime, allowing for truly adaptive data pipelines that respond to the actual data they encounter rather than following a rigid structure defined in advance.
The technical differences might seem subtle, but they result in entirely different scaling characteristics. Data teams using Prefect for massive parallel workloads have reported processing large volumes of records daily across tens of thousands of dynamically generated tasks, something that would require significant custom engineering in traditional orchestrators.
Putting It All Together
Task mapping in Prefect is more than just a convenient syntax—it's the outcome of architectural decisions that enable scaling to levels that wouldn't be possible with centralized orchestration systems.
The combination of:
- Decoupled flow execution from the central scheduler
- Dynamic task discovery at runtime
- Pluggable task runners that integrate with distributed systems
creates a setup where data engineers and data scientists can build workflows that truly scale with their data. A single task definition mapped over 20,000 inputs becomes 20,000 individually tracked, retryable, and observable task runs, all without overwhelming the orchestration layer.
Further Reading
For more information check out Prefect’s documentation, or join our Slack community to get personalized support.