pydiverse.pipedag
Pydiverse.pipedag is a data pipeline orchestration library designed to streamline the organization of code into stages, ranging from data ingestion and transformations to model training and evaluation. The library facilitates the interoperability of tasks written in various data transformation languages that can serialize/deserialize tables to/from a relational database.
Currently, pipedag supports tasks written with pandas, polars, tidypolars, SQLAlchemy, ibis, and pydiverse.transform. It simplifies the integration of existing code and eliminates boilerplate code, thus accelerating SQL table (de)materialization, caching, and cache invalidation.
Refer to the quickstart guide for instructions on getting started. Below is a simple example of a pipedag pipeline:
# Define how the different tasks should be wired
with Flow() as flow:
    with Stage("inputs"):
        names, ages = input_tables()
    with Stage("features"):
        joined_table = join_tables(names, ages)
        print_dataframe(joined_table)

# Execute the flow
flow.run()
Graphical representations of the flow are generated automatically:
In the above example, each task like join_tables() can choose the format it prefers for table access.
Here, SQLAlchemy is used to generate a SQL JOIN query from two table references:
@materialize(lazy=True, input_type=sa.Table)
def join_tables(names: sa.Alias, ages: sa.Alias):
    return (
        sa.select(names.c.id, names.c.name, ages.c.age)
        .join_from(names, ages, names.c.id == ages.c.id)
    )
The same task can also be written using pandas:
@materialize(version="1.0.0", input_type=pd.DataFrame)
def join_tables(names: pd.DataFrame, ages: pd.DataFrame):
    return names.merge(ages, on="id", how="inner")[["id", "name", "age"]]
Or handwritten SQL:
def ref(table: sa.Alias):
    # Build a quoted "schema"."table" reference for the underlying table
    return f'"{table.original.schema}"."{table.original.name}"'

@materialize(lazy=True, input_type=sa.Table)
def join_tables(names: sa.Alias, ages: sa.Alias):
    sql = f"""
        SELECT names.id, names.name, ages.age
        FROM {ref(names)} AS names INNER JOIN {ref(ages)} AS ages ON names.id = ages.id
    """
    return sa.text(sql)
Or with pydiverse.transform:
@materialize(lazy=True, input_type=pdt.SQLTableImpl)
def join_tables(names: pdt.Table, ages: pdt.Table):
    return (
        names
        >> join(ages, names.id == ages.id)
        >> select(names.id, names.name, ages.age)
    )
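All of the variants above describe the same inner join on id. To see what that join returns without setting up pipedag, the following minimal sketch runs an equivalent query against an in-memory SQLite database using only the Python standard library. The sample rows, the "main" schema name, and the two-argument ref() helper are assumptions made for this illustration; they are not part of pipedag.

```python
import sqlite3

# In-memory database with two small, made-up input tables.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE names (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE ages (id INTEGER PRIMARY KEY, age INTEGER);
    INSERT INTO names VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol');
    INSERT INTO ages VALUES (1, 30), (2, 25), (4, 50);
    """
)


def ref(schema: str, name: str) -> str:
    # Hypothetical helper: quoted "schema"."table" reference, mirroring the
    # handwritten SQL variant. SQLite's default schema is called "main".
    return f'"{schema}"."{name}"'


sql = f"""
    SELECT names.id, names.name, ages.age
    FROM {ref("main", "names")} AS names
    INNER JOIN {ref("main", "ages")} AS ages ON names.id = ages.id
    ORDER BY names.id
"""
rows = conn.execute(sql).fetchall()
print(rows)  # only ids present in both tables survive the inner join
```

Only ids 1 and 2 appear in both tables, so the rows for id 3 and id 4 are dropped.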
The main goal of pipedag is to enhance agility and iteration speed for teams developing data pipelines. It offers several key features:
Easy embedding of existing data transformation, modeling, or model evaluation code into a pipedag flow.
Value-add out of the box, such as:
Simple setup of multiple pipeline instances with varying input data sizes.
Materialization of all tables in the database for easy inspection using explorative SQL queries.
Automatic caching and cache invalidation for improved speed.
Incremental improvement of data pipeline code, task by task, stage by stage.
Convenient unit and integration testing across multiple pipeline instances.
The stage transactionality concept ensures that a big input pipeline can be analyzed with explorative SQL while the pipeline is being updated.
Pipeline runs can be triggered from IDE debugger, continuous integration framework, or pipeline orchestration UI without worrying about race conditions.
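The idea behind automatic caching and cache invalidation in the list above can be illustrated with a toy model: a task is skipped when nothing that affects its output has changed, and a change upstream propagates downstream. The sketch below is a conceptual illustration only and does not show how pipedag implements caching; ToyCache, cache_key, and run_pipeline are hypothetical names, and the key is assumed to depend only on the task name, its version string, and the keys of its inputs.

```python
import hashlib


def cache_key(task_name, version, input_keys):
    """Hash everything that should invalidate a task's cached output."""
    payload = "|".join([task_name, version, *input_keys])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ToyCache:
    def __init__(self):
        self.store = {}     # task name -> (cache key, cached result)
        self.executed = []  # names of tasks that actually ran

    def run(self, task_name, version, fn, inputs=()):
        # Each input is a (cache_key, value) pair returned by an upstream run().
        key = cache_key(task_name, version, [k for k, _ in inputs])
        cached = self.store.get(task_name)
        if cached is not None and cached[0] == key:
            return key, cached[1]  # cache hit: skip execution
        result = fn(*[value for _, value in inputs])
        self.executed.append(task_name)
        self.store[task_name] = (key, result)
        return key, result


def run_pipeline(cache, load_version):
    raw = cache.run("load", load_version, lambda: [1, 2, 3])
    return cache.run("double", "1.0.0", lambda xs: [2 * x for x in xs], [raw])


cache = ToyCache()
run_pipeline(cache, "1.0.0")              # first run executes both tasks
first_run = list(cache.executed)
cache.executed.clear()
run_pipeline(cache, "1.0.0")              # nothing changed: all cache hits
second_run = list(cache.executed)
cache.executed.clear()
_, result = run_pipeline(cache, "1.0.1")  # upstream version bump
third_run = list(cache.executed)
```

Bumping the version of the upstream task changes its key, which in turn changes the key of the downstream task, so both are executed again in the third run even though the downstream code is unchanged.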
A realistic pipeline typically includes the following stages:
Raw ingestion
Cleaning for easier inspection (e.g., improving types for pandas.read_sql)
Transformation into the best possible representation for economic reasoning
Feature engineering (both stateless and stateful)
Model training
Model evaluation
To try it out, refer to the quickstart guide.
Here, you can find more examples.
The Pydiverse Library Collection
Pydiverse is a collection of libraries for describing data transformations and data processing pipelines.
Pydiverse.pipedag is designed to encapsulate any form of data processing pipeline code, providing immediate benefits. It simplifies the operation of multiple pipeline instances with varying input data sizes and enhances performance through automatic caching and cache invalidation. A key objective is to facilitate the iterative improvement of data pipeline code, task by task, stage by stage.
Pydiverse.transform is designed to provide a single syntax for data transformation code that can be executed reliably on both in-memory dataframes and SQL databases. The interoperability of tasks in pipedag allows transform to narrow its scope and concentrate on quality. The results should be identical across different backends, and good error messages should be raised before sending a query to a backend if a specific feature is not supported.
We are placing increased emphasis on simplifying unit and integration testing across multiple pipeline instances, which may warrant a separate library called pydiverse.pipetest.
In line with our goal to develop data pipeline code on small input data pipeline instances, generating test data from the full input data could be an area worth exploring. This may lead to the creation of a separate library, pydiverse.testdata.
Check out the Pydiverse libraries on GitHub:
Check out the Pydiverse libraries on Read the Docs:
Concepts for future feature extensions
Pydiverse.pipedag aims to solve all common data-pipeline-related problems of typical econometric machine learning modeling, where flexibility and iteration speed matter most (rather than brute-force performance in crunching ever larger amounts of data, which requires limiting the space of communication patterns). The target range is 100 GB to 100 TB of unencrypted data, mostly in tables of 1 to 100 million rows.
Pipedag is most powerful when it controls cache invalidation for the whole data processing pipeline, from raw ingestion with filtering to various sample sizes (many pipeline instances) through to model training and evaluation. A few features are still missing before pipedag can handle this job in all envisioned cases:
In some cases, performance might not permit materializing all tables in the database. Thus, we envision a feature to mark a task output with an “inline” annotation (compare with the “inline” keyword in C++). If two tasks can be connected directly (i.e., by passing an Apache Arrow dataframe), the materialization of the intermediate step will be skipped. It should also be possible to control this behavior globally per pipeline instance, so that this optimization is only used on the biggest pipeline instances, where performance and data storage constraints require it. In general, we like materialization of intermediate results since it massively speeds up debugging without the need for a debugger. Please note that we already have a feature called local table cache, which can avoid some of the database-to-Python transfers.
Interactive debugging might currently not be as nice as it could be. The aim is that given the flow graph, you may ask for the input of any task and then interactively run the code within the task right up to materialization. We already have a feature to trigger materialization interactively when debugging inside a task. This breaks the flow execution since cache validation is infeasible in this scenario, but it greatly helps debugging materialization problems like duplicate keys or violation of database specific constraints like maximum row size.
Unit testing is an area we still want to greatly expand. It is probably worth another pydiverse library. The idea is that a unit test should be able to ask for any data within the graph of any pipeline instance. It can use that data for testing code that is best tested with real data. Additionally, it might specify what to do in case the data is not valid given the current code checkout/commit. We like to have a layer inside the pipeline that builds the best possible form for economic reasoning. This layer should be kept agile and not set in stone. Thus, any code working with this data layer should be heavily unit tested. Testing this code with real data often yields small tests that read their input from rather stable and big pipeline instances. Thus skipping such tests in case the big pipeline instance is cache invalid is the appropriate behavior. In other cases, making a dataset cache valid given the current code would be appropriate.
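The "skip when the big pipeline instance is cache invalid" behavior described above can be sketched with the standard unittest module. Everything pipeline-related here is hypothetical: CACHE_VALID, TABLES, and get_table are stand-ins invented for this illustration, since the envisioned testing library does not exist yet.

```python
import unittest

# Hypothetical registry: which pipeline instances are cache valid for the
# current code checkout. A real tool would derive this from the pipeline.
CACHE_VALID = {"full": False, "mini": True}

# Hypothetical table contents per pipeline instance.
TABLES = {
    "full": {"customers": [{"id": 1, "age": 41}]},
    "mini": {"customers": [{"id": 1, "age": 41}, {"id": 2, "age": 29}]},
}


def get_table(instance, table):
    """Return table rows, or skip the calling test if the cache is stale."""
    if not CACHE_VALID[instance]:
        raise unittest.SkipTest(f"pipeline instance {instance!r} is cache invalid")
    return TABLES[instance][table]


class AgeChecks(unittest.TestCase):
    def test_ages_on_full_instance(self):
        rows = get_table("full", "customers")
        self.assertTrue(all(row["age"] >= 0 for row in rows))

    def test_ages_on_mini_instance(self):
        rows = get_table("mini", "customers")
        self.assertTrue(all(row["age"] >= 0 for row in rows))


# Run the tests programmatically and collect the outcome.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(AgeChecks)
result = unittest.TestResult()
suite.run(result)
print(result.testsRun, len(result.skipped), result.wasSuccessful())
```

The test against the cache-invalid "full" instance is reported as skipped rather than failed, while the test against the "mini" instance runs normally. The alternative behavior mentioned above, making the dataset cache valid first, would replace the SkipTest branch with a pipeline run.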
Currently, each pipeline instance has exactly one cache slot. When developing on multiple branches, it would be nice to avoid generating an unbounded number of test instances that then need complex space reclamation mechanisms. Instead, we envision that a pipeline instance can have a defined number of cache slots. This would speed up tests when up to that many branches modify early stages of the pipeline at the same time.
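One way to picture a fixed number of cache slots is a small store that keeps the outputs for the most recently used code states and evicts the least recently used one when it is full. This is a sketch of the envisioned concept under assumptions, not an existing pipedag feature: the CacheSlots class, the use of a branch name as the fingerprint, and the least-recently-used eviction policy are all hypothetical choices made for this example.

```python
from collections import OrderedDict


class CacheSlots:
    """Keep cached outputs for at most max_slots code fingerprints."""

    def __init__(self, max_slots):
        self.max_slots = max_slots
        self.slots = OrderedDict()  # fingerprint -> cached outputs

    def get(self, fingerprint):
        if fingerprint not in self.slots:
            return None  # cache miss: the pipeline would have to run
        self.slots.move_to_end(fingerprint)  # mark as most recently used
        return self.slots[fingerprint]

    def put(self, fingerprint, outputs):
        self.slots[fingerprint] = outputs
        self.slots.move_to_end(fingerprint)
        while len(self.slots) > self.max_slots:
            self.slots.popitem(last=False)  # evict least recently used slot


slots = CacheSlots(max_slots=2)
slots.put("branch-a", {"rows": 10})
slots.put("branch-b", {"rows": 12})
hit = slots.get("branch-a")          # touch branch-a so it stays cached
slots.put("branch-c", {"rows": 11})  # evicts branch-b, the oldest slot
remaining = list(slots.slots)
```

With two slots, switching between two branches never recomputes anything; only a third branch forces an eviction.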
Together, pydiverse.pipedag and pydiverse.transform offer a way to manage complex computation graphs in two complementary syntax styles. Transformation tools would have full access to the underlying computations and could convert them to other forms (such as ONNX) in order to optimize execution performance, target specific hardware, or make production deployments more robust.
Currently, pipedag supports the database backends Postgres, DuckDB, MSSQL/T-SQL, and DB2; these are tested and used in real projects. There is also a Parquet-based backend which uses DuckDB but makes sure each table is written to Parquet files, so that only views of those Parquet files end up in the .duckdb file. Pipedag can be extended by users. However, out-of-the-box support for Snowflake, BigQuery, and common cloud-scalable databases with a Postgres-compatible SQL dialect is planned for the future. An in-memory table backend based on Apache Arrow is also likely to come.