September 10, 202413 minute read
Building a cost-effective analytics stack with Modal, dlt, and dbt
author
Kenny Ning@kenny_ning
Growth Engineer
author
Kasper Ramström
Forward Deployed Engineer

We’ve previously talked about why you should move your ETL stack to Modal, highlighting:

  • Cost savings of thousands of dollars compared to the largest ETL vendors
  • Flexibility in your ETL logic
  • No management of infrastructure

This post expands upon that simple example and shows you how to run your entire analytics stack on Modal:

We’ve been using this system ourselves and are enjoying:

  • Moving millions of rows of data a day for less than $1 a day
  • Clean code built on open source frameworks with strong community support

etl-arch-diagram

Project structure

Here’s a snapshot of how our project is laid out:

.github/workflows/
├── ci-cd.yml # manages auto deploy
dbt
├── models/ # dbt transformation code
├── build_dbt.py # invoke dbt build via python sdk, this is what gets run by Github Actions in CI/CD
├── dbt_project.yml
├── modal_profiles.yml # specifies snowflake credentials, sensitive info uses env variables which will be passed via Modal Secrets
└── packages.yml
dlt
├── clickhouse_pipeline.py
├── postgres_pipeline.py
└── sql_database/ # dlt helpers
.pre-commit-config.yml # ruff and sqlfmt
cicd.py # script to find and deploy jobs on Modal
requirements.txt

Data loading with dlt

At Modal, one of our most important data loading use cases is copying our production read replica Postgres instance to Snowflake, our data warehouse. Some of these tables write millions of rows a day, leaving vendor solutions like Fivetran out of the question.

The first version of this system used each database’s client SDK and passed the data as a list of tuples:

import psycopg2

conn = psycopg2.connect(
  host=os.environ["PGHOST"],
  database=os.environ["PGDATABASE"],
  user=os.environ["PGUSER"],
  password=os.environ["PGPASSWORD"],
  port=os.environ["PGPORT"],
)
cur = conn.cursor()

cur.execute(
  """
  select * from task
  """
)
result = cur.fetchall()
cur.close()
conn.close()

return [tuple(row) for row in result]

However, we quickly realized we were spending a lot of time writing boilerplate ETL tasks like:

  • Automatic schema inference: Snowflake needs to know the schema of the data before it can create a table; to solve this we were manually defining columns and their types for every table we wanted to sync, which we knew wouldn’t scale well
  • Snowflake DDL: Creating a table, uploading raw data as a stage, running copy commands; there’s a lot of boilerplate Snowflake DDL required to simply copy data into a Snowflake table
  • Incremental loading: We were hand-rolling our own incremental logic to only copy data from the last X days. This was error prone and often resulted in duplicates

We had been following the momentum around dlt, an open source data loading Python library; we even noticed that some of our own Modal users were using Modal to run dlt. We gave it a spin and were impressed with how straightforward it was to deploy on Modal and how much cleaner it made our data loading pipeline code.

How to run dlt on Modal

Here’s our dlt setup copying data from our Postgres read replica into Snowflake:

  1. Run the dlt SQL database setup to initialize their sql_database_pipeline.py template:
dlt init sql_database snowflake
  1. Open the file and define the Modal Image you want to run dlt in:
import dlt
import pendulum

from sql_database import sql_database, ConnectionStringCredentials, sql_table

import modal
import os

image = (
    modal.Image.debian_slim()
    .apt_install(["libpq-dev"]) # system requirement for postgres driver
    .pip_install(
        "sqlalchemy>=1.4", # how dlt establishes connections
        "dlt[snowflake]>=0.4.11",
        "psycopg2-binary", # postgres driver
        "dlt[parquet]",
        "psutil==6.0.0", # for dlt logging
        "connectorx", # creates arrow tables from database for fast data extraction
    )
)

app = modal.App("dlt-postgres-pipeline", image=image)
  1. Wrap the provided load_table_from_database with the Modal Function decorator, Modal Secrets containing your database credentials, and a daily cron schedule
@app.function(
    secrets=[
        modal.Secret.from_name("snowflake-secret"),
        modal.Secret.from_name("postgres-read-replica-prod"),
    ],
    # run this pipeline daily
    schedule=modal.Cron("24 6 * * *"),
    timeout=3000,
)
def load_table_from_database(
    table: str,
    incremental_col: str,
    dev: bool = False,
) -> None:
  1. Write your dlt pipeline:
    # Modal Secrets are loaded as environment variables which are used here to create the SQLALchemy connection string
    pg_url = f'postgresql://{os.environ["PGUSER"]}:{os.environ["PGPASSWORD"]}@localhost:{os.environ["PGPORT"]}/{os.environ["PGDATABASE"]}'
    snowflake_url = f'snowflake://{os.environ["SNOWFLAKE_USER"]}:{os.environ["SNOWFLAKE_PASSWORD"]}@{os.environ["SNOWFLAKE_ACCOUNT"]}/{os.environ["SNOWFLAKE_DATABASE"]}'

    # Create a pipeline
    schema = "POSTGRES_DLT_DEV" if dev else "POSTGRES_DLT"
    pipeline = dlt.pipeline(
        pipeline_name="task",
        destination=dlt.destinations.snowflake(snowflake_url),
        dataset_name=schema,
        progress="log",
    )

    credentials = ConnectionStringCredentials(pg_url)

    # defines the postgres table to sync (in this case, the "task" table)
    source_1 = sql_database(credentials, backend="connectorx").with_resources("task")

    # defines which column to reference for incremental loading (i.e. only load newer rows)
    source_1.task.apply_hints(
        incremental=dlt.sources.incremental(
            "enqueued_at",
            initial_value=pendulum.datetime(2024, 7, 24, 0, 0, 0, tz="UTC"),
        )
    )

    # if there are duplicates, merge the latest values
    info = pipeline.run(source_1, write_disposition="merge")
    print(info)

The last run of this job wrote 1,375,896 rows (~20MB) in 47 seconds.

modal-dlt-runtime

Syncing over 1 million rows a day (30 million rows a month) on Fivetran would cost you $4,738 a month. Meanwhile, here’s how the math works out for Modal:

  • $0.000038 / CPU core / sec • 0.125 cores • 47 seconds = $0.0002 in compute cost
  • $0.00000667 / GiB / sec • .02 GiB • 47 seconds = $0.000006 in memory cost

This comes out to $0.006 a month; it’s effectively free to move 30 million rows a month if you use dlt with Modal.

Advanced configuration

Modal Proxy

If your database is in a private VPN, you can use Modal Proxy as a bastion server (only available to Enterprise customers). We use Modal Proxy to connect to our production read replica by attaching it to the Function definition and changing the hostname to localhost:

@app.function(
    secrets=[
        modal.Secret.from_name("snowflake-secret"),
        modal.Secret.from_name("postgres-read-replica-prod"),
    ],
    schedule=modal.Cron("24 6 * * *"),
    proxy=modal.Proxy.from_name("prod-postgres-proxy", environment_name="main"),
    timeout=3000,
)
def task_pipeline(dev: bool = False) -> None:
    pg_url = f'postgresql://{os.environ["PGUSER"]}:{os.environ["PGPASSWORD"]}@localhost:{os.environ["PGPORT"]}/{os.environ["PGDATABASE"]}'

Capturing deletes

One limitation of our simple approach above is that it does not capture updates or deletions of data. This isn’t a hard requirement yet for our use cases, but it appears that dlt does have a Postgres CDC replication feature that we are considering.

Scaling out

The example above syncs one table from our Postgres data source. In practice, we are syncing multiple tables and mapping each table copy job to a single container using Modal.starmap:

@app.function(timeout=3000, schedule=modal.Cron("29 11 * * *"))
def main(dev: bool = False):
    tables = [
        ("task", "enqueued_at", dev),
        ("worker", "launched_at", dev),
        ...
    ]
    list(load_table_from_database.starmap(tables))

This allows us to easily add more tables to our ETL batch processing system without increasing the overall runtime of our ETL system.

Transform with dbt

After our data has been loaded into Snowflake, we still need to transform it to make it analysis ready. dbt is the de facto standard for this and also works great with Modal.

Data gets transformed in this order:

  • base: one base model per table for basic sanitizing e.g. conforming namespaces, removing deleted rows, removing banned users
  • activities: activity schema-inspired data modeling format where base models are combined into customer events unique on a Modal workspace (i.e. “account” or “business”) and timestamp

Each model is materialized as a view, which is effectively a “saved query”. This means that querying a view will always return the most recent data, and we don’t have to rebuild the data more than once.

Let’s show how the example task table synced in the above section gets transformed with dbt.

  1. Create a base model ( base_prod__task ) that standardizes column names:
select
    -- ids
    id as task_id,
    account_id as workspace_id,

    -- numerics
    gpu_count,

    -- timestamps
    enqueued_at,
    started_at,
    finished_at

from {{ source('postgres_dlt', 'task') }}
  1. Create two activities: one for workspace_stream_started_task and one for workspace_stream_finished_task
# workspace_stream_started_task.sql
select
    workspace_id,
    started_at as ts,
    gpu_count

from {{ ref('base_prod__task') }}
# workspace_stream_finished_task.sql
select
    workspace_id,
    finished_at as ts,
    gpu_count

from {{ ref('base_prod__task') }}

This transformation makes it easy to combine activities into a long stream of events which can be a very convenient data structure to query. For instance, we can calculate the number of concurrent GPU tasks a workspace is running at any given time in a pretty straightforward way:

with activity_stream as (
    select *

    from workspace_stream_started_task

    union all

    select

    from workspace_stream_finished_task
)

select
    *,
    sum(
        case when activity = 'started_task'
        then gpu_count
        else -gpu_count
        end
    ) over (partition by workspace_id order by ts) as concurrent_tasks
from activity_stream

We use this query to monitor workspaces that are getting close to their GPU concurrency limit (10 for Starter, 30 for Team) and send them an email to upgrade if they are close to their limit.

Data modeling strategy

We are fans of of the events-style architecture of Activity Schema, but ultimately found its strict schema guidelines too hard to query to buy into it 100%:

  • Querying data from feature_json object is annoying and AI autocomplete won’t be able to help since it generally doesn’t know the contents of your data
  • Modal’s primarily engineering workforce wants to query data in as raw a form as possible and doesn’t like unnecessary abstractions
  • There aren’t many use cases for a single workspace_stream table of all events and it can be a burden to materialize because it’s so large

Instead, we use an Activity Schema “lite” version as a guideline for data modeling:

  • Require all activity columns to have an activity_id primary key column, a workspace_id column and a ts column, but can add arbitrary number of additional metadata columns
  • Build as much BI on base tables as possible so the lineage from raw data to metric is very clear; only build a dbt activity model if it will be used by more than 3 or more reports
  • Break out each activity table into its own view (i.e. no single huge activity table)

CI/CD

We use Github workflows for our CI/CD and have set up the following workflow to deploy and run our ETL applications on PRs and whenever we merge to main:

name: CI/CD

on:
  push:
    branches:
      - main
  pull_request:

# Cancel previous runs of the same PR but do not cancel previous runs on main
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}

jobs:
  deploy:
    name: Deploy
    runs-on: ubuntu-latest
    env:
      MODAL_ENVIRONMENT: modal-etl
      MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
      MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}

    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4

      - name: Install Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"

      - name: Install Packages
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: CD job
        if: github.ref == 'refs/heads/main'
        run: |
          sha=${{ github.sha }}
          sha_short=$(git rev-parse --short $sha)
          python cicd.py --cd --tag $sha_short

      - name: CI job
        if: github.ref != 'refs/heads/main'
        run: |
          branch=${{ github.head_ref }}
          python cicd.py --ci --tag $branch

It’s a version of the workflow guide on our docs page with some additions. Instead of running modal deploy directly from our workflow we invoke a python script cicd.py with either the short commit sha (for deployments) or the branch name (for CI) which discovers all the modal apps in our repository and runs or deploys them as necessary based on the flags passed. This is great if you (like us) have multiple directories and subdirectories within your project and each of them contain one or more modal apps. Another alternative is to use modal deploy pkg which is great if you have apps spanning multiple files.

The script below is the cicd.py code that gets called by Github Actions.

import os
import re
import shutil
import subprocess
import sys
import pathlib
import argparse
from typing import Dict

def get_modules():
    modules_to_deploy = []
    modules_to_run_on_cd = []
    modules_to_run_on_ci = []
    current_dir = pathlib.Path.cwd()

    for subdir, dirs, files in os.walk(current_dir):
        subdir_path = pathlib.Path(subdir)
        relative_subdir = subdir_path.relative_to(current_dir)

        if re.match(r"^(\.|__)", str(relative_subdir)):
            continue

        for file in files:
            filepath = subdir_path / file

            if filepath.suffix == ".py":
                with open(filepath, "r") as f:
                    content = f.read()

                    if "# deploy: true" in content:
                        modules_to_deploy.append({"fp": filepath})
                    if "# cd-run: true" in content:
                        modules_to_run_on_cd.append({"fp": filepath})
                    if "# ci-run: true" in content:
                        modules_to_run_on_ci.append({"fp": filepath})

    return {
        "cd": {
            "deploy": modules_to_deploy,
            "run": modules_to_run_on_cd,
        },
        "ci": {
            "run": modules_to_run_on_ci,
        },
    }


def run_command(cmd, extra_env: Dict[str, str] = {}):
    print(f"  Running command: {cmd}")
    env = {
        "MODAL_ENVIRONMENT": os.environ.get("MODAL_ENVIRONMENT"),
        "MODAL_TOKEN_ID": os.environ.get("MODAL_TOKEN_ID"),
        "MODAL_TOKEN_SECRET": os.environ.get("MODAL_TOKEN_SECRET"),
        **extra_env,
    }
    print(f"  Environment: {env}")
    subprocess.run(
        cmd,
        env={k: v for k, v in env.items() if v},
        check=True,
        stdout=sys.stdout,
        stderr=sys.stderr,
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ci", action="store_true")
    parser.add_argument("--cd", action="store_true")
    parser.add_argument("--tag")
    args = parser.parse_args()


    if args.ci and args.cd:
        raise ValueError("Please provide either --ci or --cd flag, not both")

    modules = get_modules()
    bin_path = shutil.which("modal")

    extra_env = {"tag": args.tag} if args.tag else {}

    if args.cd:
        print(f"Deploying {len(modules['cd']['deploy'])} apps")

        tag = ["--tag", args.tag] if args.tag else []

        for module in modules['cd']['deploy']:
            print(f"\n- Deploying {str(module['fp'])}")
            run_command([bin_path, "deploy", module["fp"]] + tag, extra_env)

        print(f"Running {len(modules['cd']['run'])} apps")
        for module in modules['cd']['run']:
            print(f"\n- Running {str(module['fp'])}")
            run_command([bin_path, "run", module["fp"]], extra_env)
    elif args.ci:
        print(f"Running {len(modules['ci']['run'])} apps")
        for module in modules['ci']['run']:
            print(f"\n- Running {str(module['fp'])}")
            run_command([bin_path, "run", module["fp"]], extra_env)
    else:
        raise ValueError("Please provide either --ci or --cd flag")

This script will find all python files with the following comments and run the appropriate command:

  • # deploy: true - deploy the app with modal deploy <filepath> ... and tag the deployment with the short commit sha
  • # cd-run: true - run the app on a CD workflow with modal run <filepath> ...
  • # ci-run: true - run the app on a CI workflow with modal run <filepath> ...

The ci-run flag is useful for running the applications in the CI steps to make sure they don’t break before you merge. This is especially handy for things like dbt where we both want to deploy the new modal function which runs dbt but also run dbt for all the changed models.

With our deployment rollback feature (available on Team and Enterprise plans) you could roll back your deployments automatically in the workflow in case they fail with modal app rollback <app-name>.

We are using dbt’s custom schema name macro to generate the schema name based on the git branch or tag name so we can have multiple environments on Modal with different schema names. This allows us to write our dbt models to custom CI schemas per branch and run dbt CI tests without worrying about conflicting with other branches or production.

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}
    {%- set tag = var('tag', 'local').replace('-', '_').replace('/', '_').replace('.', '_') -%}

    {%- if target.name == 'ci' -%}

        {%- if custom_schema_name is none -%}

            ci_{{ tag }}

        {%- else -%}

            ci_{{ tag }}_{{ custom_schema_name | trim }}

        {%- endif -%}

    {%- elif custom_schema_name is none -%}

        {{ default_schema }}

    {%- else -%}

        {{ default_schema }}_{{ custom_schema_name | trim }}

    {%- endif -%}

{%- endmacro %}

Conclusion

dlt and dbt abstracts away the boilerplate code data engineers would previously have had to write for ETL. This includes things like managing and updating database state, automatic schema inference, and incremental loading.

Modal abstracts away the process of cloud deployment and running things in production. Combined, dlt X dbt X Modal multiplies the productivity of data engineers, saves you thousands of dollars of cost per month, and keeps your code base clean as you scale your data team.

Ship your first app in minutes

with $30 / month free compute