Simple pipeline
This example (see also example.zip) shows a simple pipeline with a few tasks and stages. It is the same example as used in Database Testing, but with a DuckDB connection that requires neither docker-compose nor pipedag.yaml.
It also shows how to unit-test a pipeline by dematerializing tables after running the flow with result.get(lazy_1, as_type=pd.DataFrame).
```python
import tempfile

import pandas as pd
import sqlalchemy as sa

from pydiverse.pipedag import Flow, Stage, Table, materialize
from pydiverse.pipedag.context import StageLockContext
from pydiverse.pipedag.core.config import create_basic_pipedag_config
from pydiverse.common.util.structlog import setup_logging


@materialize(lazy=True)
def lazy_task_1():
    return sa.select(
        sa.literal(1).label("x"),
        sa.literal(2).label("y"),
    )


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_2(input1: sa.Alias, input2: sa.Alias):
    query = sa.select(
        (input1.c.x * 5).label("x5"),
        input2.c.a,
    ).select_from(input1.outerjoin(input2, input2.c.x == input1.c.x))
    return Table(query, name="task_2_out", primary_key=["a"])


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_3(input1: sa.Alias):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_4(input1: sa.Alias):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")


@materialize(nout=2, version="1.0.0")
def eager_inputs():
    dfA = pd.DataFrame(
        {
            "a": [0, 1, 2, 4],
            "b": [9, 8, 7, 6],
        }
    )
    dfB = pd.DataFrame(
        {
            "a": [2, 1, 0, 1],
            "x": [1, 1, 2, 2],
        }
    )
    return Table(dfA, "dfA"), Table(dfB, "dfB_%%")


@materialize(version="1.0.0", input_type=pd.DataFrame)
def eager_task(tbl1: pd.DataFrame, tbl2: pd.DataFrame):
    return tbl1.merge(tbl2, on="x")


def main():
    with tempfile.TemporaryDirectory() as temp_dir:
        cfg = create_basic_pipedag_config(
            f"duckdb:///{temp_dir}/db.duckdb",
            disable_stage_locking=True,  # This is special for duckdb
            # Attention: If uncommented, stage and task names might be sent to the
            # following URL. You can self-host kroki if you like:
            # https://docs.kroki.io/kroki/setup/install/
            # You need to install optional dependency 'pydot' for any visualization
            # URL to appear.
            # kroki_url="https://kroki.io",
        ).get("default")
        with cfg:
            with Flow() as f:
                with Stage("stage_1"):
                    lazy_1 = lazy_task_1()
                    a, b = eager_inputs()

                with Stage("stage_2"):
                    lazy_2 = lazy_task_2(lazy_1, b)
                    lazy_3 = lazy_task_3(lazy_2)
                    eager = eager_task(lazy_1, b)

                with Stage("stage_3"):
                    lazy_4 = lazy_task_4(lazy_2)

                _ = lazy_3, lazy_4, eager  # unused terminal output tables

            # Run flow
            result = f.run()
            assert result.successful

            # Run again for testing: StageLockContext keeps stage locks until the
            # context exits, so result.get() reads the tables of this run
            with StageLockContext():
                result = f.run()
                assert result.successful
                assert result.get(lazy_1, as_type=pd.DataFrame)["x"][0] == 1


if __name__ == "__main__":
    setup_logging()  # you can set up the logging and/or structlog libraries as you wish
    main()
```
What happens when f.run() is called
When you call f.run(), pipedag executes your pipeline stage by stage. Here’s a step-by-step breakdown of what happens:
1. Stage-by-stage execution
Pipedag processes stages in order (stage_1 → stage_2 → stage_3). All tasks within a stage are executed before
moving to the next stage. Tasks within a stage may run in parallel if you use a parallel orchestration engine (like Dask
or Prefect).
If only a subset of stages is specified for execution (using f.run(stage_2, stage_3)), only those stages are run.
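The ordering rule can be illustrated with a small pure-Python model. This is not pipedag's scheduler. `run_stages`, the stage dictionary, and the `selected` parameter are hypothetical names. They are used only to show two things: every task of a stage finishes before the next stage starts, and selecting a subset skips the other stages.

```python
from typing import Callable, Optional


def run_stages(
    stages: dict[str, list[Callable[[], str]]],
    selected: Optional[set[str]] = None,
) -> list[str]:
    """Hypothetical model: run tasks stage by stage and log what ran."""
    log: list[str] = []
    # dicts keep insertion order, which stands in for the declared stage order
    for stage_name, tasks in stages.items():
        if selected is not None and stage_name not in selected:
            continue  # stages outside the selection are not run
        # every task of this stage completes before the next stage begins
        for task in tasks:
            log.append(f"{stage_name}:{task()}")
    return log


stages = {
    "stage_1": [lambda: "lazy_1", lambda: "eager_inputs"],
    "stage_2": [lambda: "lazy_2", lambda: "lazy_3", lambda: "eager"],
    "stage_3": [lambda: "lazy_4"],
}
print(run_stages(stages))
print(run_stages(stages, selected={"stage_2", "stage_3"}))
```

A parallel engine would relax only the inner loop, running tasks of the same stage concurrently. The outer loop still waits for each stage to finish.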
2. Using a temporary schema per stage
A stage corresponds to a production schema in the database (e.g., stage_1) and a temporary schema (e.g.,
stage_1__tmp).
When executing a stage, pipedag uses a temporary schema (e.g., stage_1__tmp) where all task outputs for
that stage are written. This keeps the work-in-progress isolated from any previously committed results.
If the temporary schema already exists (e.g., from a previous run), all of its contents are dropped at the beginning of the stage execution.
Note: Depending on the backend, schema renaming may not be supported. In such cases, each stage alternates between two schemas with __odd and __even suffixes (e.g., stage_1__odd and stage_1__even).
This behavior is enabled by setting stage_commit_technique: READ_VIEWS in pipedag.yaml.
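Below is a minimal in-memory sketch of this lifecycle. It assumes a database can be modeled as a dict that maps schema names to tables. `begin_stage` and `read_views_target` are hypothetical helpers, not pipedag functions. The second one only illustrates the __odd/__even alternation described in the note.

```python
# Hypothetical model: schema name -> {table name -> rows}
Database = dict[str, dict[str, list[tuple]]]


def begin_stage(db: Database, stage: str) -> str:
    """Prepare the temporary schema of `stage` and return its name."""
    tmp = f"{stage}__tmp"
    # leftovers from a previous run are dropped before any task writes
    db[tmp] = {}
    return tmp


def read_views_target(active_suffix: str) -> str:
    """READ_VIEWS-style alternation: write into the schema that is not active."""
    return "__even" if active_suffix == "__odd" else "__odd"


db: Database = {
    "stage_1": {"dfA": [(0, 9)]},        # committed result of an earlier run
    "stage_1__tmp": {"stale": [(1,)]},   # leftover from an older run
}
tmp = begin_stage(db, "stage_1")
db[tmp]["dfA"] = [(0, 9), (1, 8)]      # work in progress stays isolated
print(db["stage_1"], db[tmp])
print("stage_1" + read_views_target("__odd"))
```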
3. Task execution, caching, and materialization
Whether and how a task runs depends on its type (lazy or eager) and on its cache validity.
- Lazy tasks (like lazy_task_1): The task function is always executed to produce its result. This is typically a lightweight operation, e.g., building a SQL query or a small dataframe. The task's output (e.g., the SQL query or the dataframe) is used to determine whether the task is cache-valid. The output is only materialized (i.e., written to the database) if the task is cache-invalid (e.g., because the generated SQL query changed).
- Eager tasks (like eager_inputs): The cache is checked before execution. If the task is cache-valid, it is skipped entirely and the cached result is reused. Only cache-invalid eager tasks actually run their Python code.
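The difference between the two kinds of task can be modeled in a few lines. This is a simplified sketch. `cache_key`, `run_lazy`, and `run_eager` are hypothetical names, and the hashing scheme is an assumption for illustration, not how pipedag computes its cache keys. The eager key here uses the task's version and input keys because those are known before the code runs. The lazy key uses the produced query because that is only known afterwards.

```python
import hashlib
from typing import Callable

cache: dict[str, str] = {}  # task name -> cache key of its last materialization
calls: list[str] = []  # task functions that actually ran
materialized: list[str] = []  # outputs written to the (simulated) database


def cache_key(*parts: str) -> str:
    """Hash the given parts into one cache key (illustrative scheme only)."""
    return hashlib.sha256("\x00".join(parts).encode()).hexdigest()


def run_lazy(name: str, build_query: Callable[[], str]) -> None:
    query = build_query()  # always executed: cheap, e.g. building SQL text
    calls.append(name)
    key = cache_key(name, query)  # the produced query decides cache validity
    if cache.get(name) != key:
        materialized.append(name)  # written only when cache-invalid
        cache[name] = key


def run_eager(
    name: str, version: str, input_keys: list[str], compute: Callable[[], object]
) -> None:
    key = cache_key(name, version, *input_keys)  # known before running the code
    if cache.get(name) == key:
        return  # cache-valid: skipped entirely, cached result reused
    calls.append(name)
    compute()
    materialized.append(name)
    cache[name] = key


query = "SELECT 1 AS x, 2 AS y"
run_lazy("lazy_task_1", lambda: query)
run_lazy("lazy_task_1", lambda: query)  # runs again, but the output is unchanged
run_eager("eager_inputs", "1.0.0", [], lambda: None)
run_eager("eager_inputs", "1.0.0", [], lambda: None)  # skipped
print(calls)  # ['lazy_task_1', 'lazy_task_1', 'eager_inputs']
print(materialized)  # ['lazy_task_1', 'eager_inputs']
```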
During task execution, all outputs are written to the temporary schema of the current stage. If a task is cache-valid, its output is either copied from the production schema or exposed through a view or synonym created in the temporary schema. To avoid a mixture of copies and views/synonyms within a stage, the outputs of all cache-valid tasks are copied into the temporary schema as soon as at least one task in the stage is cache-invalid.
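The copy-versus-view rule can be written as a small decision function. `plan_stage` is a hypothetical helper, not part of pipedag. It assumes that a stage in which every task is cache-valid exposes its outputs through views or synonyms. The exact choice may depend on the backend.

```python
def plan_stage(task_validity: dict[str, bool]) -> dict[str, str]:
    """Decide how each task's output ends up in the temporary schema."""
    any_invalid = not all(task_validity.values())
    plan: dict[str, str] = {}
    for task, valid in task_validity.items():
        if not valid:
            plan[task] = "materialize"  # cache-invalid: write a fresh result
        elif any_invalid:
            plan[task] = "copy"  # never mix copies and views within one stage
        else:
            plan[task] = "view"  # whole stage cache-valid (assumed: view/synonym)
    return plan


print(plan_stage({"lazy_2": True, "lazy_3": True, "eager": True}))
print(plan_stage({"lazy_2": True, "lazy_3": False, "eager": True}))
```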
4. Schema swapping
Once all tasks in a stage complete successfully, pipedag swaps the temporary schema with the production schema. In this example, stage_1 and stage_1__tmp are swapped. After the swap, stage_1 contains the newly computed tables, and stage_1__tmp contains the previous version (which will be dropped on the next run).
This ensures that the production schema always contains a consistent set of tables from a fully completed stage.
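The swap itself can be sketched as three renames through a scratch name. The statements use PostgreSQL's ALTER SCHEMA ... RENAME TO syntax. The __swap scratch name and both helpers are assumptions, and the statements pipedag actually issues may differ by backend. Running the three renames in a single transaction (PostgreSQL supports transactional DDL) keeps readers from observing a half-swapped state. `apply_renames` simulates the effect on an in-memory dict.

```python
def swap_statements(stage: str) -> list[str]:
    """Hypothetical: three renames that exchange `stage` and `stage__tmp`."""
    tmp = f"{stage}__tmp"
    scratch = f"{stage}__swap"  # assumed to be an unused schema name
    return [
        f"ALTER SCHEMA {stage} RENAME TO {scratch}",
        f"ALTER SCHEMA {tmp} RENAME TO {stage}",
        f"ALTER SCHEMA {scratch} RENAME TO {tmp}",
    ]


def apply_renames(schemas: dict[str, str], statements: list[str]) -> None:
    """Simulate the renames on a dict mapping schema name -> contents."""
    for stmt in statements:
        words = stmt.split()  # ALTER SCHEMA <old> RENAME TO <new>
        old, new = words[2], words[5]
        schemas[new] = schemas.pop(old)


schemas = {"stage_1": "previous tables", "stage_1__tmp": "newly computed tables"}
apply_renames(schemas, swap_statements("stage_1"))
print(schemas)  # {'stage_1': 'newly computed tables', 'stage_1__tmp': 'previous tables'}
```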
5. Final result
After all stages complete, f.run() returns a Result object. You can:
- check result.successful to verify that the pipeline completed;
- use result.get(task_output, as_type=pd.DataFrame) to retrieve any task's output as a specific type.