Automated Data Pipelines On Dask With Coiled & Prefect
Operationalizing Pure Python Data Engineering Workloads
Dask is widely used among data scientists and engineers proficient in Python for interacting with big data, doing statistical analysis, and developing machine learning models. Operationalizing this work has traditionally required lengthy code rewrites, which makes moving from development to production hard. This gap slows business progress and increases risk for data science and data engineering projects in an enterprise setting. The need to remove this bottleneck has prompted the emergence of production deployment solutions that allow code written by data scientists and engineers to be deployed directly to production, unlocking the power of continuous deployment for pure Python data science and data engineering.
Here, we demonstrate the use of Coiled and Prefect to automate a common component of a data engineering pipeline. The motivating example is a file ingestion pipeline that a) runs on a scheduled basis, b) leverages the distributed computing capability of Dask to process highly variable and potentially large volumes of data, and c) allows for reprocessing the data when a change in business logic or in the incoming data requires it.
The Problem To Solve

The New York City Taxi Dataset is well-known in the big data community. This dataset is now being further enriched by including For-Hire Vehicle records (Uber, Lyft, etc). In May 2022, data stewards moved from a bi-annual to monthly update cadence, and switched to Parquet format. These changes open the door to using the data to experiment with modern data engineering and data science workloads.
Experimentation with the data demonstrates that, while we can take advantage of the Parquet file format, row groups are nonuniform and at times very large, which makes working with the data cumbersome. Additionally, there are no guarantees about data quality or uniformity of datatypes. This creates unnecessary barriers and risk to using the data in a real-world business setting, where data quality is of paramount importance.
Here, we take the first step of a common data engineering pipeline by automating the ingestion and initial cleaning of the raw data for downstream consumers. This preprocessing step ensures the data is properly formatted for downstream users. We do this in a way that minimizes cost: we wait until we know there are files to process before creating a Dask cluster on Coiled, and we automatically tear down the cluster when the workload is complete.
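To make this concrete, the per-file cleaning step with Dask might look roughly like the sketch below. The column names, partition size, and function name are illustrative assumptions rather than the repository's actual code.

```python
import dask.dataframe as dd


def clean_one_file(source_path: str, sink_path: str) -> None:
    """Read one raw Parquet file, normalize it, and write it to the S3 sink."""
    # Row groups in the raw files are nonuniform and sometimes very large,
    # so repartition into predictable ~128 MB chunks.
    ddf = dd.read_parquet(source_path)
    ddf = ddf.repartition(partition_size="128MB")

    # Enforce consistent datatypes for downstream consumers
    # (columns shown here are illustrative examples from the FHVHV schema).
    ddf = ddf.astype({"PULocationID": "int64", "DOLocationID": "int64"})

    ddf.to_parquet(sink_path, write_index=False)
```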
Getting Started
The following is an architecture diagram of the solution we will build.

We assume users have both Coiled and Prefect accounts, are using Prefect >=2.0, and have a private S3 bucket with credentials for landing the output of our data transformation pipeline. Clone the coiled-from-prefect repository, create the Python environment, and collect the following information:
Coiled

- Coiled User and Team Account information, and a Coiled API token. These allow creating Coiled clusters from Prefect.

S3

- The NYC Taxi Dataset is in a public S3 bucket (labeled Source above).
- An AWS Access Key Id and AWS Secret Access Key to write to your private S3 bucket (labeled Sink above). The bucket must be configured for write access.

Prefect

Create the following Prefect Blocks in your Prefect Workspace for storing secrets (a sketch of loading these Blocks at runtime follows this list):

- An AWS Credentials Block named `prefectexample0`. Add the AWS Access Key Id and AWS Secret Access Key for your S3 sink here. Dask workers need these credentials to write the output of your data transformation pipeline.
- An S3 Block named `prefect-s3-storage`. This is where user code is stored for Prefect to run in automation. In our example, this is the same S3 bucket as our target, but we specify a Bucket Path of `bucket/prefect-s3-storage`. This Block also expects an AWS Access Key ID and AWS Secret Access Key.
- Three separate Secret Blocks, named `coiled-account`, `coiled-user`, and `coiled-token`. These enable authentication to your Coiled account.
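Inside a flow or task, these Blocks can be loaded at runtime instead of hard-coding secrets. Below is a minimal sketch, assuming the Block names above and that Coiled reads its settings from the Dask configuration system (as it does when configured via `coiled login`):

```python
import dask
from prefect.blocks.system import Secret
from prefect_aws import AwsCredentials

# Credentials for writing to the S3 sink (the AWS Credentials Block above).
aws_credentials = AwsCredentials.load("prefectexample0")

# Coiled account details stored as Secret Blocks.
coiled_account = Secret.load("coiled-account").get()
coiled_user = Secret.load("coiled-user").get()
coiled_token = Secret.load("coiled-token").get()

# Hand the credentials to Coiled via Dask's configuration system.
dask.config.set({
    "coiled.account": coiled_account,
    "coiled.user": coiled_user,
    "coiled.token": coiled_token,
})
```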
In a terminal, from your active Python environment, running `python flows/coiled_flow.py` should trigger the execution of your Prefect flow with the following output:

Prefect creates a flow from the `check_for_files` Python function and generates a random name, `tuscan-cormorant`. The Flow is parameterized, for testing purposes, to choose only the first file from the list, and so returns the file `s3://nyc-tlc/trip data/fhvhv_tripdata_2019-02.parquet`. Since we have a file that needs to be processed, this triggers execution of a second Prefect Flow that dynamically uses Coiled to create a Dask cluster and submits the file processing job for execution.
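The full flow code lives in the repository; a condensed sketch of this flow-of-flows pattern is shown below. Function names, worker count, and cluster options are illustrative assumptions, not the repository's exact implementation.

```python
import coiled
from distributed import Client
from prefect import flow, task


@task
def check_s3_for_new_files(intent: str = "") -> list:
    # Return FHVHV files modified in the last seven days, only the first file
    # when intent == "test_subset", or every file when intent == "reprocess".
    ...


@flow
def process_files(files: list) -> None:
    # The Coiled cluster is created only once we know there is work to do,
    # and it is torn down when the context manager exits. Package Sync
    # replicates the local Python environment on the cluster's workers.
    with coiled.Cluster(name="fhvhv-etl", n_workers=10, package_sync=True) as cluster:
        client = Client(cluster)  # route Dask work to the Coiled cluster
        for path in files:
            # submit the per-file cleaning work (see the earlier Dask sketch)
            clean_one_file(path, "s3://<your-sink-bucket>/fhvhv/")


@flow
def check_for_files(intent: str = "") -> None:
    files = check_s3_for_new_files(intent)
    if files:  # skip cluster creation entirely when nothing needs processing
        process_files(files)
```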
Inspecting the logs shows a few noteworthy items:
- The cluster comes up quickly; the Dask dashboard is available about 90 seconds after being requested.
- Package Sync makes it easy to replicate our local software environment on the remote cluster.
Create a Prefect Deployment, Work Queue, and Agent
In the terminal, run:
prefect deployment build flows/coiled_flow.py:check_for_files -n check_for_files -q test -sb s3/prefect-s3-storage
This will create `check_for_files-deployment.yaml` (example below), provision a Prefect Work Queue, and push all the code needed for remote execution to the S3 bucket defined in the Prefect S3 Block.
```yaml
###
### A complete description of a Prefect Deployment for flow 'Check for files'
###
name: check_for_files
description: |-
  We're going to run this on a weekly schedule. Our intention is, as a default behavior,
  to find files in the S3 bucket that match `fhvhv_tripdata_*.parquet` and evaluate
  their modification timestamp. We only want to process files that were
  modified in the last seven days.

  We also have the option to either: a) `reprocess` the entire dataset, which will delete
  the existing data and rewrite it, or b) `test_subset`, which will process only the first file
  in the list. This is for testing only.

  Parameters
  ----------
  intent: str: one of `test_subset`, `reprocess`, or ""
version: ae6b4b09dd5bdfeb4e0de153ef60b7c6
# The work queue that will handle this deployment's runs
work_queue_name: test
tags: []
parameters:
  intent: ""  # <--- Verify this line is populated
schedule: null
infra_overrides: {}

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: Check for files
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  _block_document_id: <XXX>
  _block_document_name: <YYY>
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage:
  bucket_path: <S3 Bucket>/prefect-s3-storage
  aws_access_key_id: '**********'
  aws_secret_access_key: '**********'
  _block_document_id: <ZZZ>
  _block_document_name: prefect-s3-storage
  _is_anonymous: false
  block_type_slug: s3
  _block_type_slug: s3
path: ''
entrypoint: flows/coiled_flow.py:check_for_files
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    intent:
      title: intent
      type: string
  required:
  - intent
  definitions: null
```
Let’s verify that the Deployment contains `parameters: {"intent": ""}`, which sets the default behavior of our Prefect Flow to incrementally process data. If necessary, update this setting, then push the changes to our Deployment with:
prefect deployment apply ./check_for_files-deployment.yaml
Then start a Prefect Agent on our local machine, which watches the Prefect Work Queue:
prefect agent start -q 'test'
Now we can see both the Deployment, named `Check for files/my_run`, and the Work Queue in our Prefect workspace.


Our first test of the pipeline is to create a custom run named `test_subset`. This validates that our pipeline writes data to the target S3 bucket.
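The screenshots below use the Prefect UI, but the same test run can also be triggered from the CLI by overriding the `intent` parameter; for example, assuming the deployment name produced by the build command above and a recent Prefect 2 CLI:

```
prefect deployment run 'Check for files/check_for_files' --param intent=test_subset
```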


Now, we run the pipeline against the full dataset, and confirm new data lands as expected.

There we have it: an automated, pure Python, big data engineering pipeline running Dask on Coiled, orchestrated with Prefect. From here, users should follow Prefect guidelines for operationalizing their Agent and scheduling execution of their Deployment.
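For example, the weekly cadence mentioned in the Deployment description could be attached when rebuilding the Deployment using Prefect 2's cron option; the cron string below is just an illustration:

```
prefect deployment build flows/coiled_flow.py:check_for_files \
    -n check_for_files -q test -sb s3/prefect-s3-storage \
    --cron "0 6 * * 1"
```

Re-running `prefect deployment apply ./check_for_files-deployment.yaml` then updates the schedule on the existing Deployment.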