Multiple instances of a data pipeline: full, mini, midiΒΆ

In general, data pipelines are processing a considerable amount of information be it tables with 100k to 100 million rows or even billions. Thus processing times will be many minutes or hours. However, iteration speed of software development on the pipeline is key because the pipeline is used to transform the data in a way that increases understanding and from better understanding come changes to the code in the data pipeline.

As a consequence, you should not just have one data pipeline. You should always have at least two little siblings for any pipeline:

  • mini: The minimal amount of data that allows the pipeline code to run through technically.

  • midi: A somewhat reasonable selection of data which reaches a high level of code coverage, triggers most edge cases the pipeline code is concerned with, and may be sampled in a way that allows for statistically sound conclusions be it with reduced statistical prediction power or higher error margins. If all goals cannot be met with one subset of the input data, more pipeline instances may be needed.

In pydiverse.pipedag, there is support for multiple pipeline instances readily built in. The idea is that there is a base configuration (instance any) on which more specialized instance definitions may be based. Nearly every aspect can be configured per pipeline instance. The most important setting to be different is the instance_id. The instance_id must either be a part of the database engine URL (i.e. database name) or a schema prefix or suffix or a component in path arguments specifying directories where a pipeline instance puts files. This way it can be guaranteed that two pipeline instances never overwrite any data from each other. It is also possible to specify an arbitrary dictionary of values in the attrs section which can be used at pipeline construction time and even fed into tasks for processing at runtime.

Here is an example pipeline configuration with multiple instances:

Please make sure, you installed psycopg2 and adbc-driver-postgresql. When installing via pip, it might be easier to install psycopg2-binary instead of psycopg.

pip install pydiverse-pipedag pydot psycopg2-binary adbc-driver-postgresql
conda install -c conda-forge pydiverse-pipedag pydot psycopg2 adbc-driver-postgresql

or much faster than conda after installing pixi:

mkdir my_data_proj
cd my_data_proj
pixi init
pixi add pydiverse-pipedag pydot psycopg2 adbc-driver-postgresql

Please save the following file as pipedag.yaml or download the following zip:

name: data_pipeline
table_store_connections:
  postgres_instances:
    args:
      url: "postgresql://{username}:{password}@{host}:{port}/pipedag"
      url_attrs_file: "{$POSTGRES_PASSWORD_CFG}"
      # using one database for multiple instances enables queries across instances
      schema_prefix: "{instance_id}_"

  postgres:
    args:
      # Postgres: this can be used after running `docker-compose up`
      url: "postgresql://{$POSTGRES_USERNAME}:{$POSTGRES_PASSWORD}@127.0.0.1:6543/pipedag"

blob_store_connections:
  file:
    args:
      base_path: "/tmp/pipedag/blobs"

technical_setups:
  default:
    # listen-interface for pipedag context server which synchronizes some task state during DAG execution
    network_interface: "127.0.0.1"
    # classes to be materialized to table store even without pipedag Table wrapper
    auto_table: ["pandas.DataFrame", "sqlalchemy.sql.expression.TextClause", "sqlalchemy.sql.expression.Selectable"]
    # abort as fast a possible on task failure and print most readable stack trace
    fail_fast: true

    # Attention: For disable_kroki: false, stage and task names might be sent to the kroki_url.
    #   You can self-host kroki if you like:
    #   https://docs.kroki.io/kroki/setup/install/
    #   You need to install optional dependency 'pydot' for any visualization
    #   URL to appear.
    disable_kroki: true
    kroki_url: "https://kroki.io"

    instance_id: pipedag_default

    cache_validation:
      mode: "normal"
      disable_cache_function: false
      ignore_task_version: false

    table_store:
      class: "pydiverse.pipedag.backend.table.SQLTableStore"

      # Postgres: this can be used after running `docker-compose up`
      table_store_connection: postgres

      args:
        create_database_if_not_exists: True
        # print select statements before being encapsulated in materialize expressions and tables before writing to
        # database
        print_materialize: true
        # print final sql statements
        print_sql: true

    blob_store:
      class: "pydiverse.pipedag.backend.blob.FileBlobStore"
      blob_store_connection: file

    lock_manager:
      class: "pydiverse.pipedag.backend.lock.DatabaseLockManager"

    orchestration:
      class: "pydiverse.pipedag.engine.SequentialEngine"

instances:
  __any__:
    technical_setup: default
    # The following Attributes are handed over to the flow implementation (pipedag does not care)
    attrs:
      # by default we load source data and not a sampled version of a loaded database
      copy_filtered_input: false

  full:
    # Full dataset is using default database connection and schemas
    instance_id: full
    table_store:
      table_store_connection: postgres_instances

  midi:
    # Full dataset is using default database connection and schemas
    instance_id: midi
    table_store:
      table_store_connection: postgres_instances
    attrs:
      # copy filtered input from full instance
      copy_filtered_input: true
      copy_source: full
      copy_per_user: false
      copy_filter_cnt: 2  # this is just dummy input where we sample 2 rows

  mini:
    # Full dataset is using default database connection and schemas
    instance_id: mini
    table_store:
      table_store_connection: postgres_instances
    attrs:
      copy_filtered_input: true
      copy_source: full
      copy_per_user: false
      copy_filter_cnt: 1  # this is just dummy input where we sample 1 row

This can be used with the following code:

import os
from pathlib import Path
from typing import Any

import pandas as pd
import sqlalchemy as sa

from pydiverse.pipedag import (
    Flow,
    Stage,
    Table,
    materialize,
    input_stage_versions,
    ConfigContext,
)
from pydiverse.pipedag.context import StageLockContext
from pydiverse.pipedag.core.config import PipedagConfig
from pydiverse.common.util.structlog import setup_logging


# these global variables are just a mock for information that can be sourced from somewhere for the full pipeline
dfA_source = pd.DataFrame(
    {
        "a": [0, 1, 2, 4],
        "b": [9, 8, 7, 6],
    }
)
dfA = dfA_source.copy()
input_hash = hash(str(dfA))


def has_new_input():
    global input_hash
    return input_hash


@materialize(nout=2, cache=has_new_input, version="1.0")
def input_task():
    global dfA
    return Table(dfA, "dfA"), Table(dfA, "dfB")


def has_copy_source_fresh_input(
    tbls: dict[str, sa.Alias],
    other_tbls: dict[str, sa.Alias],
    source_cfg: ConfigContext,
    *,
    attrs: dict[str, Any],
    stage: Stage,
):
    _ = tbls, other_tbls, attrs
    with source_cfg:
        _hash = source_cfg.store.table_store.get_stage_hash(stage)
    return _hash


@input_stage_versions(
    input_type=sa.Table,
    cache=has_copy_source_fresh_input,
    pass_args=["attrs", "stage"],
    lazy=True,
)
def copy_filtered_inputs(
    tbls: dict[str, sa.Alias],
    source_tbls: dict[str, sa.Alias],
    source_cfg: ConfigContext,
    *,
    attrs: dict[str, Any],
    stage: Stage,
):
    # we assume that tables can be copied within database engine just from different schema
    _ = source_cfg, stage
    # we expect this schema to be still empty, one could check for collisions
    _ = tbls
    filter_cnt = attrs["copy_filter_cnt"]
    ret = {
        name.lower(): Table(sa.select(tbl).limit(filter_cnt), name)
        for name, tbl in source_tbls.items()
    }
    return ret


@materialize(input_type=pd.DataFrame, version="1.0")
def double_values(df: pd.DataFrame):
    return Table(df.transform(lambda x: x * 2))


# noinspection PyTypeChecker
def get_flow(attrs: dict[str, Any], pipedag_config):
    with Flow("test_instance_selection") as flow:
        with Stage("stage_1") as stage:
            if not attrs["copy_filtered_input"]:
                a, b = input_task()
            else:
                other_cfg = pipedag_config.get(
                    attrs["copy_source"], attrs["copy_per_user"]
                )
                tbls = copy_filtered_inputs(other_cfg, stage=stage, attrs=attrs)
                a, b = tbls["dfa"], tbls["dfb"]
            a2 = double_values(a)

        with Stage("stage_2"):
            b2 = double_values(b)
            a3 = double_values(a2)

    return flow, b2, a3


def check_result(result, out1, out2, *, head=999):
    assert result.successful
    v_out1, v_out2 = result.get(out1), result.get(out2)
    pd.testing.assert_frame_equal(dfA_source.head(head) * 2, v_out1, check_dtype=False)
    pd.testing.assert_frame_equal(dfA_source.head(head) * 4, v_out2, check_dtype=False)


if __name__ == "__main__":
    password_cfg_path = str(Path(__file__).parent / "postgres_password.yaml")

    old_environ = dict(os.environ)
    os.environ["POSTGRES_PASSWORD_CFG"] = password_cfg_path

    setup_logging()  # you can setup the logging and/or structlog libraries as you wish
    pipedag_config = PipedagConfig(Path(__file__).parent / "pipedag.yaml")

    cfg = pipedag_config.get(instance="full")
    flow, out1, out2 = get_flow(cfg.attrs, pipedag_config)

    with StageLockContext():
        result = flow.run(config=cfg)
        check_result(result, out1, out2)

The file multi_instance_pipeline.zip includes a slightly more sophisticated project setup even though it is still missing separate src and tests folders. Furthermore, it lacks proper configuration of pytest markers. However, it already configures mini/midi pipelines to source data from the full pipeline. See best practices for configuring instances for an even more realistic setup of pipeline instances distinguishing fresh and stable pipelines.

multi_instance_pipeline.zip can be used as follows:

Install pixi.

unzip multi_instance_pipeline.zip
cd multi_instance_pipeline
pixi run docker-compose up

and in another terminal within the same directory:

cd multi_instance_pipeline
# for full pipeline
pixi run python multi_instance_pipeline.py
# for running mini/midi as unit tests
pixi run pytest -s -v