Automated Data Pipelines On Dask With Coiled & Prefect

Operationalizing Pure Python Data Engineering Workloads

Dask is widely used by data scientists and engineers proficient in Python for working with big data, performing statistical analysis, and developing machine learning models. Operationalizing this work has traditionally required lengthy code rewrites, which makes moving from development to production hard. This gap slows business progress and increases risk for data science and data engineering projects in an enterprise setting. The need to remove this bottleneck has prompted the emergence of production deployment solutions that allow code written by data scientists and engineers to be deployed directly to production, unlocking the power of continuous deployment for pure Python data science and data engineering.

Here, we demonstrate the use of Coiled and Prefect to automate a common component of a data engineering pipeline. The motivating example is a file ingestion pipeline that a) runs on a schedule, b) leverages the distributed computing capabilities of Dask to process highly variable and potentially large volumes of data, and c) allows the data to be reprocessed when a change in business logic or incoming data mandates it.

The Problem To Solve


The New York City Taxi Dataset is well known in the big data community. This dataset is now being further enriched with For-Hire Vehicle records (Uber, Lyft, etc.). In May 2022, the data stewards moved from a bi-annual to a monthly update cadence and switched to the Parquet format. These changes open the door to using the data to experiment with modern data engineering and data science workloads.

Experimentation with the data demonstrates that, while we can take advantage of the Parquet file format, row groups are nonuniform and at times very large, which makes working with the data cumbersome. Additionally, there are no guarantees about data quality or uniformity of datatypes. This creates unnecessary barriers and risk to using the data in a real-world business setting, where data quality is of paramount importance.

Here, we take the first step in a common data engineering pipeline from a raw data source by automating the ingestion and initial cleaning of the data for downstream consumers. This preprocessing step ensures the data is properly formatted for downstream users. We do this in a way that minimizes cost by waiting until we know there are files to process before creating a Dask cluster on Coiled, and by automatically tearing down the cluster when the workload is complete.
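
The cost-saving pattern described above can be sketched in plain Python. This is a minimal illustration, not the code from the repository: `run_if_files`, `cluster_factory`, `process`, and `fake_cluster` are hypothetical names, and the stand-in cluster only records when it is started and stopped. In real use, the factory could plausibly be something like `lambda: coiled.Cluster(n_workers=10)`, assuming the cluster object is used as a context manager.

```python
from contextlib import contextmanager


def run_if_files(files, cluster_factory, process):
    """Create a cluster only when there is work, and always tear it down."""
    if not files:
        # Nothing to process, so no cluster is ever requested (and no cost incurred).
        return None
    # The context manager guarantees teardown, even if processing raises.
    with cluster_factory() as cluster:
        return process(cluster, files)


@contextmanager
def fake_cluster(events):
    """Hypothetical stand-in for a real cluster; records start and stop events."""
    events.append("up")
    try:
        yield "cluster"
    finally:
        events.append("down")


events = []
# No files: the factory is never called.
run_if_files([], lambda: fake_cluster(events), lambda cluster, files: len(files))
# Two files: the cluster comes up, the work runs, and the cluster is torn down.
count = run_if_files(["a.parquet", "b.parquet"],
                     lambda: fake_cluster(events),
                     lambda cluster, files: len(files))
print(count, events)
```

Passing the factory as an argument keeps the decision to start a cluster in one place and makes the logic easy to exercise without any cloud resources.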

Getting Started

[Figure: architecture diagram of the solution to build]

We assume users have both Coiled and Prefect accounts, are using Prefect >=2.0, and have a private S3 bucket with credentials for landing the output of our data transformation pipeline. Clone the coiled-from-prefect repository, create the Python environment, and collect the following information:

Coiled

S3

  • The NYC Taxi Dataset is in a public S3 bucket (labeled Source above)

  • An AWS Access Key ID and AWS Secret Access Key to write to your private S3 bucket (labeled Sink above). The bucket must be configured for write access

Prefect

  • Create the following Prefect Blocks in your Prefect Workspace for storing secrets:

    1. AWS Credentials Block named prefectexample0. Add the AWS Access Key ID and AWS Secret Access Key for your S3 sink here. Dask workers need these credentials to write the output of your data transformation pipeline.

    2. S3 Block named prefect-s3-storage. This is where user code is stored for Prefect to run in automation. In our example, this is the same S3 bucket as our target, but we specify a Bucket Path of bucket/prefect-s3-storage. This Block also expects an AWS Access Key ID and AWS Secret Access Key.

    3. Three separate Secret Blocks, named: coiled-account, coiled-user, and coiled-token. These enable authentication to your Coiled account.
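
As a rough sketch of how the three Secret Blocks listed above might be read at run time, the snippet below collects them into a dictionary. It assumes Prefect 2's `Secret.load(name).get()` call for reading a Secret Block. The helper names `prefect_secret` and `coiled_settings` are hypothetical, and the `coiled.account`, `coiled.user`, and `coiled.token` keys are an assumption about how Coiled reads its settings from the Dask configuration, so check them against your installed versions.

```python
def prefect_secret(name):
    """Read a Prefect Secret Block by name (requires Prefect >= 2.0)."""
    # Imported lazily so this sketch can be loaded without Prefect installed.
    from prefect.blocks.system import Secret
    return Secret.load(name).get()


def coiled_settings(load_secret=prefect_secret):
    """Map the three Secret Blocks to assumed Coiled configuration keys."""
    return {
        "coiled.account": load_secret("coiled-account"),
        "coiled.user": load_secret("coiled-user"),
        "coiled.token": load_secret("coiled-token"),
    }
```

One possible use is passing the result to `dask.config.set(...)` before requesting a cluster. The `load_secret` argument exists only so the mapping can be checked with a plain dictionary lookup instead of a live Prefect workspace.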

In a terminal, with your Python environment active, running python flows/coiled_flow.py should trigger the execution of your Prefect flow.

[Screenshot: terminal output of the flow run]

Prefect creates a flow from the check_for_files Python function and generates a random name, tuscan-cormorant. The Flow is parameterized for testing purposes to choose only the first file from the list, and so returns the file s3://nyc-tlc/trip data/fhvhv_tripdata_2019-02.parquet. Since we have a file that needs to be processed, this triggers execution of a second Prefect Flow that dynamically uses Coiled to create a Dask cluster and submits the file processing job for execution.

Inspecting the logs shows a few noteworthy items:

  • The cluster comes up quickly; the Dask dashboard is available about 90 seconds after being requested.

  • Package Sync makes it easy to replicate our local software environment on the remote cluster.

Create a Prefect Deployment, Work Queue, and Agent

In the terminal, run:

prefect deployment build flows/coiled_flow.py:check_for_files -n check_for_files  -q test -sb s3/prefect-s3-storage

This will create check_for_files-deployment.yaml (example below), provision a Prefect Work Queue, and push all the code needed for remote execution to the S3 bucket defined in the Prefect S3 Block.

###
### A complete description of a Prefect Deployment for flow 'Check for files'
###
name: check_for_files
description: |-
  We're going to run this on a weekly schedule.  Our intention is, as a default behavior,
  to find files in the S3 bucket that match `fhvhv_tripdata_*.parquet` and evaluate
  their modification timestamp.  We only want to process files that were
  modified in the last seven days.

  We also have the option to either: a) `reprocess` the entire dataset, which will delete
  the existing data and rewrite it, or b) `test_subset`, which will process only the first file
  in the list.  This is for testing only.

  Parameters
  ----------

  intent: str: one of `test_subset`, `reprocess`, or ""
version: ae6b4b09dd5bdfeb4e0de153ef60b7c6
# The work queue that will handle this deployment's runs
work_queue_name: test
tags: []
parameters:
  intent: ""  # <--- Verify this line is populated
schedule: null
infra_overrides: {}

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: Check for files
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  _block_document_id: <XXX>
  _block_document_name: <YYY>
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage:
  bucket_path: <S3 Bucket>/prefect-s3-storage
  aws_access_key_id: '**********'
  aws_secret_access_key: '**********'
  _block_document_id: <ZZZ>
  _block_document_name: prefect-s3-storage
  _is_anonymous: false
  block_type_slug: s3
  _block_type_slug: s3
path: ''
entrypoint: flows/coiled_flow.py:check_for_files
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    intent:
      title: intent
      type: string
  required:
  - intent
  definitions: null
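
The selection behavior described in the deployment's description can be sketched as a small pure-Python function. This is an illustration under stated assumptions rather than the repository's implementation: `select_files` is a hypothetical name, and the input is assumed to be a list of `(path, last_modified)` tuples with timezone-aware timestamps, as might be built from an S3 listing.

```python
from datetime import datetime, timedelta, timezone


def select_files(files, intent="", now=None, window_days=7):
    """Choose which files to process based on the `intent` parameter.

    files: list of (path, last_modified) tuples, timestamps timezone-aware.
    """
    if intent not in ("", "test_subset", "reprocess"):
        raise ValueError(f"unknown intent: {intent!r}")
    if intent == "reprocess":
        # Rewrite the entire dataset: every file is selected.
        return [path for path, _ in files]
    if intent == "test_subset":
        # Testing only: process just the first file in the list.
        return [path for path, _ in files[:1]]
    # Default: only files modified within the last `window_days` days.
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=window_days)
    return [path for path, modified in files if modified >= cutoff]


now = datetime(2022, 6, 15, tzinfo=timezone.utc)
listing = [
    ("fhvhv_tripdata_2019-02.parquet", now - timedelta(days=30)),
    ("fhvhv_tripdata_2022-04.parquet", now - timedelta(days=2)),
]
print(select_files(listing, "", now=now))
print(select_files(listing, "test_subset", now=now))
```

Accepting `now` as an argument keeps the seven-day window deterministic when the function is exercised outside a scheduled run.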

Let’s verify that parameters is set to {"intent": ""}, which makes incremental processing the default behavior of our Prefect Flow. If necessary, update this setting, then push the changes to our Deployment with:

prefect deployment apply ./check_for_files-deployment.yaml

Then start a Prefect Agent on our local machine, which watches the Prefect Work Queue:

prefect agent start -q 'test'

Now we can see both the Deployment, named Check for files/my_run, and the Work Queue in our Prefect workspace.


Our first test of the pipeline is to create a custom run named test_subset. This validates that our pipeline writes data to the target S3 bucket.


Now, we run the pipeline against the full dataset, and confirm new data lands as expected.

There we have it: an automated, pure Python, big data engineering pipeline running Dask on Coiled and Prefect. From here, users should follow Prefect guidelines for operationalizing their Agent and scheduling execution of their deployment.