API

Public

class pydiverse.pipedag.Flow(name: str = 'default')[source]

A flow represents a collection of dependent Tasks.

A flow is defined by using it as a context manager. Any stages and tasks defined inside the flow context will automatically get added to the flow with the correct dependency wiring.

Parameters:

name – The name of the flow.

Examples

@materialize
def my_materializing_task():
    return Table(...)

with Flow("my_flow") as flow:
    with Stage("stage_1") as stage_1:
        task_1 = my_materializing_task()
        task_2 = another_materializing_task(task_1)

    with Stage("stage_2") as stage_2:
        task_3 = yet_another_task(task_3)
        ...

result = flow.run()
__getitem__(name) Stage[source]

Retrieves a stage by name.

Parameters:

name – The name of the stage.

Raises:

KeyError – if no stage with the specified name exists.

run(*components: Task | TaskGetItem | Stage, config: ConfigContext = None, orchestration_engine: OrchestrationEngine = None, trace_hook: TraceHook = None, fail_fast: bool | None = None, cache_validation_mode: CacheValidationMode | None = None, disable_cache_function: bool | None = None, ignore_task_version: bool | None = None, ignore_position_hashes: bool = False, inputs: dict[Task, ExternalTableReference] | None = None, **kwargs) Result[source]

Execute the flow. This will execute the flow and all tasks within using the orchestration engine.

Parameters:
  • components

    An optional selection of tasks or stages that should get executed. If no values are provided, all tasks and stages get executed.

    If you specify a subset of tasks, those tasks get executed even when they are cache valid, and they don’t get committed.

    If you specify a subset of stages, all tasks and substages of those stages get executed and committed.

  • config – A ConfigContext to use as the configuration for executing this flow. If no config is provided, then pipedag uses the current innermost open ConfigContext, or if no such config exists, it gets the default config from PipedagConfig.default.

  • orchestration_engine – The orchestration engine to use. If no engine is provided, the orchestration engine from the config gets used.

  • fail_fast – Whether exceptions should get raised or swallowed. If set to True, exceptions that occur get immediately raised and the flow gets aborted.

  • cache_validation_mode – Override the cache validation mode. See CacheValidationMode. For None, the cache validation mode from the config gets used. See Pipedag Config File Cache Validation options for more information.

  • disable_cache_function – Override the disable_cache_function setting from the config. See Pipedag Config File Cache Validation options for more information.

  • ignore_task_version – Override the ignore_task_version setting from the config. See Pipedag Config File Cache Validation options for more information.

  • ignore_position_hashes – If True, the position hashes of tasks are not checked when retrieving the inputs of a task from the cache. This simplifies execution of subgraphs if you don’t care whether inputs to that subgraph are cache invalid. This allows multiple modifications in the Graph before the next run updating the cache. Attention: This may break automatic cache invalidation. And for this to work, any task producing an input for the chosen subgraph may never be used more than once per stage. NOTE: This is only supported for the SequentialEngine and SQLTablestore

  • inputs

    Optionally provide the outputs for a subset of tasks. The format is expected as dict[Task, ExternalTableReference]. When the output of said tasks are retrieved as inputs for another dependent task they will be fetched from the ExternalTableReference instead.

    NOTE: This feature is experimental and is not compatible with all aspects of pipedag. Only ExternalTableReferences are supported as inputs. This feature does not work well in combination with Tasks returning multiple outputs (nout > 1 or RawSql tasks returning multiple tables). In the case of multiple outputs, the provided ExternalTableReference will override all outputs of the task. It is currently only supported when using the SQLTablestore at the moment. Using it disables caching.

  • kwargs – Other keyword arguments that get passed on directly to the run() method of the orchestration engine. Consequently, these keyword arguments are engine dependant.

Returns:

A Result object for the current flow run.

Examples

with Flow() as flow:
    with Stage("stage_1") as stage_1:
        task_1 = ...
        task_2 = ...

    with Stage("stage_2") as stage_2:
        task_3 = ...
        task_4 = ...

# Execute the entire flow
flow.run()

# Execute (and commit) only stage 1
flow.run(stage_1)

# Execute (but DON'T commit) only task 1 and task 4
flow.run(task_1, task_4)
get_stage(name: str) Stage[source]

Retrieves a stage by name. Alias for Flow.__getitem__().

Parameters:

name – The name of the stage.

Returns:

The stage.

Raises:

KeyError – if no stage with the specified name exists.

visualize(result: Result | None = None, visualization_tag: str | None = None)[source]

Visualizes the flow as a graph.

If you are running in a jupyter notebook, the graph will get displayed inline. Otherwise, it will get rendered to a pdf that then gets opened in your browser.

Requires Graphviz to be installed on your computer.

Parameters:

result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.

visualize_url(result: Result | None = None, visualization_tag: str | None = None) str[source]

Visualizes the flow as a graph and returns a URL to view the visualization.

If you don’t have Graphviz installed on your computer (and thus aren’t able to use the visualize() method) then this is the easiest way to visualize the flow. For this we use a free service called Kroki.

Parameters:

result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.

Returns:

A URL that, when opened, displays the graph.

visualize_pydot(result: Result | None = None, visualization_tag: str | None = None) None[source]

Visualizes the flow as a graph and return a pydot.Dot graph.

Parameters:

result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.

Returns:

A pydot.Dot graph.

class pydiverse.pipedag.Stage(name: str, materialization_details: str | None = None, group_node_tag: str | None = None, force_committed=False)[source]

A stage represents a collection of related tasks.

The main purpose of a Stage is to allow for a transactionality mechanism. Only if all tasks inside a stage finish successfully does the stage get committed.

All task that get defined inside the stage’s context will automatically get added to the stage.

Parameters:
  • name – The name of the stage. Two stages with the same name may not be used inside the same flow.

  • materialization_details – The label of the materialization_details to be used. Overwrites the label given by the stage.

property name: str

The name of the stage.

property transaction_name: str

The name temporary transaction stage.

property current_name: str

The name of the stage where the data currently lives.

Before a task has been committed this is the transaction name, after the commit it is the normal name.

__getitem__(item: str | tuple[str, int]) Task[source]

Retrieves a task inside the stage by name.

You can either subscribe a Stage using just a string (stage["name"]) or using a tuple containing a string and an integer (stage["name", 3]).

The string is always interpreted as the name of the task to retrieve. If you also pass in an integer, it is interpreted as the index of the task. This means, that if the stage contains multiple tasks with the same name, then you can retrieve a specific instance of that task based on the order in which they were defined.

Raises:
  • KeyError – If no task with the name can be found.

  • IndexError – If the index is out of bounds.

  • ValueError – If multiple matching tasks have been found, but no index has been provided.

get_task(name: str, index: int | None = None) Task[source]

Retrieves a task inside the stage by name. Alias for Stage.__getitem__().

Parameters:
  • name – The name of the task to retrieve.

  • index – If multiple task instances with the same name appear inside the stage, you can request a specific one by passing an index.

@pydiverse.pipedag.materialize(*, name: str | None = None, input_type: type | None = None, version: str | None = None, cache: Callable[[...], Any] | None = None, lazy: bool = False, nout: int = 1, add_input_source: bool = False, ordering_barrier: bool | dict[str, Any] = False, call_context: Callable[[], Any] | None = None, allow_fresh_input: bool = False) Callable[[CallableT], CallableT | UnboundMaterializingTask][source]
@pydiverse.pipedag.materialize(fn: CallableT, /) CallableT | UnboundMaterializingTask

Decorator to create a task whose outputs get materialized.

This decorator takes a class and turns it into a MaterializingTask. This means, that this function can only be used as part of a flow. Any outputs it produces get written to their appropriate storage backends (either the table or blob store). Additionally, any Table or Blob objects this task receives as an input get replaced with the appropriate object retrieved from the storage backend. In other words: All outputs from a materializing task get written to the store, and all inputs get retrieved from the store.

Because of how caching is implemented, task inputs and outputs must all be “materializable”. This means that they can only contain objects of the following types: dict, list, tuple, int, float, str, bool, None, datetime.date, datetime.datetime, pathlib.Path, Table, RawSql, Blob, or Stage.

Parameters:
  • fn – The function that gets executed by this task.

  • name – The name of this task. If no name is provided, the name of fn is used instead.

  • input_type – The data type as which to retrieve table objects from the store. All tables passed to this task get loaded from the table store and converted to this type. See Table Backends for more information.

  • version

    The version of this task. Unless the task is lazy or version=AUTO_VERSION, you always need to manually change this version number when you change the implementation to ensure that the task gets executed and the cache flushed.

    If the version is None and the task isn’t lazy, then the task always gets executed, and all downstream tasks get invalidated.

  • cache

    An explicit cache function used to determine cache validity of the task inputs.

    This function gets called every time before the task gets executed. It gets called with the same arguments as the task.

    An explicit function for validating cache validity. If the output of this function changes while the source parameters are the same (e.g. the source is a filepath and cache loads data from this file), then the cache will be deemed invalid and is not used.

  • lazy

    Whether this task is lazy or not.

    Unlike a normal task, lazy tasks always get executed. However, before table returned by a lazy task gets materialized, the table store checks if the same table has been materialized before. If this is the case, then the table doesn’t get materialized, and instead, the table gets copied from the cache.

    This is efficient for tasks that return SQL queries, because the query only gets generated but will not be executed again if the resulting table is cache-valid.

    The same also works for ExternalTableReference, where the “query” is just the identifier of the table in the store.

    Note

    For tasks returning an ExternalTableReference pipedag cannot automatically

    know if the external tables have changed of not. This should be controlled via a cache function given via the cache argument of materialize. See ExternalTableReference for an example.

    For tasks returning a Polars or Pandas DataFrame, the output is deemed cache-valid if the hash of the resulting DataFrame is the same as the hash of the previous run. So, even though the task always gets executed, downstream tasks can remain cache-valid if the DataFrame is the same as before. This is useful for small tasks that are hard to implement using only LazyFrames, but where the DataFrame generation is cheap.

    In both cases, you don’t need to manually bump the version of a lazy task.

    Warning

    A task returning a Polars LazyFrame should not be marked as lazy. Use version=AUTO_VERSION instead. See AUTO_VERSION.

  • group_node_tag – Set a tag that may add this task to a configuration based group node.

  • nout – The number of objects returned by the task. If set, this allows unpacking and iterating over the results from the task.

  • add_input_source – If true, Table and Blob objects are provided as tuple together with the dematerialized object. task(a=tbl) will result in a=(dematerialized_tbl, tbl) in the task.

  • ordering_barrier – If true, the task will be surrounded by a GroupNode(ordering_barrier=True). If a dictionary is provided, it is surrounded by a GroupNode(**ordering_barrier). This allows passing style, style_tag, and label arguments.

  • call_context – An optional context manager function that is opened before the task or its optional cache function is called and closed afterward.

  • allow_fresh_input – If true, it allows the task to update outputs (cache invalid) for cache_validation.mode=ASSERT_NO_FRESH_INPUT and with cache function indicating that this is an input task.

Example

@materialize(version="1.0", input_type=pd.DataFrame)
def task(df: pd.DataFrame) -> pd.DataFrame:
    df = ...  # Do something with the dataframe
    return Table(df)

@materialize(lazy=True, input_type=sa.Table)
def lazy_task(tbl: sa.Alias) -> sa.Table:
    query = sa.select(tbl).where(...)
    return Table(query)

You can use the cache argument to specify an explicit cache function. In this example we define a task that reads a dataframe from a csv file. Without specifying a cache function, this task would only get rerun when the path argument changes. Instead, we define a function that calculates a hash based on the contents of the csv file, and pass it to the cache argument of @materialize. This means that the task now gets invalidated and rerun if either the path argument changes or if the content of the file pointed to by path changes:

import hashlib

def file_digest(path):
    with open(path, "rb") as f:
        return hashlib.file_digest(f, "sha256").hexdigest()

@materialize(version="1.0", cache=file_digest)
def task(path):
    df = pd.read_scv(path)
    return Table(df)

Setting nout allows you to use unpacking assignment with the task result:

@materialize(version="1.0", nout=2)
def task():
    return 1, 2

# Later, while defining a flow, you can use unpacking assignment
# because you specified that the task returns 2 objects (nout=2).
one, two = task()

If a task returns a dictionary or a list, you can use square brackets to explicitly wire individual values / elements to task inputs:

@materialize(version="1.0")
def task():
    return {
        "x": [0, 1],
        "y": [2, 3],
    }

# Later, while defining a flow
# This would result in another_task being called with ([0, 1], 3).
task_out = task()
another_task(task_out["x"], task_out["y"][1])
@pydiverse.pipedag.input_stage_versions(*, name: str | None = None, input_type: type | None = None, version: str | None = None, cache: Callable[[...], Any] | None = None, lazy: bool = False, group_node_tag: str | None = None, nout: int = 1, add_input_source: bool = False, ordering_barrier: bool | dict[str, Any] = True, call_context: Callable[[], Any] | None = None, allow_fresh_input: bool = False, include_views=True, lock_source_stages=True, pass_args: Iterable[str] = tuple()) Callable[[CallableT], CallableT | UnboundMaterializingTask][source]
@pydiverse.pipedag.input_stage_versions(fn: CallableT, /) CallableT | UnboundMaterializingTask

A decorator that marks a function as a task that receives tables from two versions of the same stage. This can either be the currently active schema of this stage before schema swapping, or it can be an active schema of another pipeline instance.

The arguments to the resulting task are significantly different from a task created with @materialize decorator. An arbitrary expression with lists, tuples, dicts, and Table/Blob/RawSQL references can be provided. This decorator will only collect tables referenced and will pass them as one dictionary per version to the task. It maps table names to some reference for the respective stage version in the form specified by input_type parameter.

When a ConfigContext object is passed as toplevel parameter or named parameter in the wiring call of decorated task, it is used for referencing the second stage version. The idea is that the other configuration is just another pipeline instance accessible with the same SQLAlchemy engine of the table store. It may work also if this is not the case, but it may be harder for the task to process the dictionaries it receives. Currently, it is not allowed to pass more than one ConfigContext object.

When no parameters (except for an optional ConfigContext) is passed, all currently existing tables of both stage versions (or their respective schema) are passed in the dictionaries to the task.

With pass_blobs=True, two additional dictionaries are passed to the task at the end. In addition to the arguments mentioned above, pass_args can be used to feed defined keyword arguments through to the decorated task.

Parameters:
  • fn – The function that gets executed by this task.

  • name – The name of this task. If no name is provided, the name of fn is used instead.

  • input_type – The data type as which to retrieve table objects from the store. All tables passed to this task get loaded from the table store and converted to this type. See Table Backends for more information.

  • version

    The version of this task. Unless the task is lazy, you always need to manually change this version number when you change the implementation to ensure that the task gets executed and the cache flushed.

    If the version is None and the task isn’t lazy, then the task always gets executed, and all downstream tasks get invalidated.

  • cache

    An explicit cache function used to determine cache validity of the task inputs.

    This function gets called every time before the task gets executed. It gets called with the same arguments as the task.

    An explicit function for validating cache validity. If the output of this function changes while the source parameters are the same (e.g. the source is a filepath and cache loads data from this file), then the cache will be deemed invalid and is not used.

  • lazy

    Whether this task is lazy or not.

    Unlike a normal task, lazy tasks always get executed. However, if a lazy task produces a lazy table (e.g. a SQL query), the table store checks if the same query has been executed before. If this is the case, then the query doesn’t get executed, and instead, the table gets copied from the cache.

    This behaviour is very useful, because you don’t need to manually bump the version of a lazy task. This only works because for lazy tables generating the query is very cheap compared to executing it.

  • group_node_tag – Set a tag that may add this task to a configuration based group node.

  • nout – The number of objects returned by the task. If set, this allows unpacking and iterating over the results from the task.

  • add_input_source – If true, Table and Blob objects are provided as tuple together with the dematerialized object. task(a=tbl) will result in a=(dematerialized_tbl, tbl) in the task.

  • ordering_barrier – If true, the task will be surrounded by a GroupNode(ordering_barrier=True). If a dictionary is provided, it is surrounded by a GroupNode(**ordering_barrier). This allows passing style, style_tag, and label arguments. Attention: In contrast to @materialize, the default value is True.

  • call_context – An optional context manager function that is opened before the task or its optional cache function is called and closed afterward.

  • allow_fresh_input – If true, it allows the task to update outputs (cache invalid) for cache_validation.mode=ASSERT_NO_FRESH_INPUT and with cache function indicating that this is an input task.

  • include_views – In case no explicit table references are given, if true, include views when collecting all tables of both stage versions.

  • lock_source_stages – If true, lock the other stage version when a ConfigContext object is passed to this task.

  • pass_args – A list of named arguments that would be passed from the call to the task function. By default, no arguments are passed, and just tables are extracted.

Example

@input_stage_versions(lazy=True, input_type=sa.Table)
def validate_stage(
    transaction: dict[str, sa.Alias],
    other: dict[str, sa.Alias],
):
    compare_tables(transaction, other)

def get_flow():
    with Flow() as f:
        a = produce_a_table()
        validate_stage()  # implicitly receives all tables written before
pydiverse.pipedag.AUTO_VERSION = AUTO_VERSION

Special constant used to indicate that pipedag should automatically determine a version number for a task.

The version is determined by running the task once to construct a representation of the computation performed on the tables (e.g. construct a query plan / computational graph). Using this representation, a unique version number is constructed such that if the computation changes the version number also changes. Then, if the task is deemed to be cache invalid, it is run again, but this time with actual data.

This puts the following constraints on which tasks can use AUTO_VERSION:

  • The task must be a pure function.

  • The task may not inspect the contents of the input tables.

  • The task must return at least one table.

  • The task may not return RawSql.

  • The task may not return Blob.

Polars

Automatic versioning is best supported by polars. To use it, you must specify polars.LazyFrame as the task input type and only use LazyFrames inside your task. For Polars, “Not inspecting the contents of the input tables” means “not collecting the input tables or any tables derived from them and not working eagerly with any external data sources”.

Warning

Be careful not to collect any LazyFrames or work eagerly in such tasks (especially don’t use collected data in conditional statements). Changes in any external data sources that are referenced (e.g. by scan_csv, scan_parquet,…) must be covered by providing a cache function to the cache argument of @materialize which detects any changes within any scanned data sources (e.g. hash complete file content).

Pandas

Pandas support is still experimental. It is implemented using a technique we call “computation tracing”, where we run the task with proxy tables as inputs that record all operations performed on them. This requires heavily monkey patching the pandas and numpy modules. As long as you only use pandas and numpy functions inside your task, computation tracing should work successfully. However, for the monkey patching to work, you only access pandas / numpy functions through their namespace (e.g. pd.concat(...) is allowed, while from pandas import concat; concat(...) is not allowed).

Requires dask to be installed.

Example

@materialize(input_type=pl.LazyFrame, version=AUTO_VERSION)
def polars_task(x: pl.LazyFrame):
    # Some operations that only utilize pl.LazyFrame...
    return x.with_columns(
        (pl.col("col1") * pl.col("col2")).alias("col3")
    )
class pydiverse.pipedag.Table(obj: T | None = None, name: str | None = None, *, primary_key: str | list[str] | None = None, indexes: list[list[str]] | None = None, type_map: dict[str, Any] | None = None, nullable: list[str] | None = None, non_nullable: list[str] | None = None, materialization_details: str | None = None, annotation: type | None = None, stage: 'Stage | None' = None)[source]

Container for storing Tables.

Used to wrap table objects that get returned from materializing tasks. Tables get stored using the table store.

Example: How to return a table from a task.
 @materialize()
 def task():
     df = pd.DataFrame({"x": [0, 1, 2, 3]}
     return Table(df, "name")
Parameters:
  • obj – The table object to wrap

  • name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.

  • primary_key – Optional name of the primary key that should be used when materializing this table. Only supported by some table stores.

  • indexes – Optional list of indexes to create. Each provided index should be a list of column names. Only supported by some table stores.

  • type_map – Optional map of column names to types. Depending on the table store this will allow you to control the datatype as which the specified columns get materialized.

  • nullable – List of columns that should be nullable. If nullable is not None, all other columns will be non-nullable.

  • non_nullable – List of columns that should be non-nullable. If non_nullable is not None, all other columns will be nullable.

  • materialization_details – The label of the materialization_details to be used. Overwrites the label given by the stage.

  • stage – This parameter should not be set when returning Table objects in pipedag tasks. But it can be used to manually retrieve a table from the table store.

See also

You can specify which types of objects should automatically get converted to tables using the auto_table config option.

class pydiverse.pipedag.View(src: Any | collections.abc.Iterable[Any], *, sort_by: str | Any | collections.abc.Iterable[str] | collections.abc.Iterable[pydiverse.pipedag.container.SortCol] | collections.abc.Iterable[Any] | None = None, columns: collections.abc.Iterable[str] | collections.abc.Iterable[Any] | collections.abc.Mapping[str, str] | collections.abc.Mapping[str, Any] | str | Any | None = None, limit: int | None = None, assert_normalized: bool = False)[source]

Produces a view on pipedag managed tables with caching support.

Unlike for Table, pipedag needs to understand view queries much better in order to be able to do proper cache invalidation with source tables changing their name despite staying cache valid. That is why this View object includes all information about how to create a view.

Only supported by SQLTableStore and ParquetTableStore.

Examples

A view can combine a set of parquet files as one read_parquet operation:

import sqlalchemy as sa

@materialize(input_type=sa.Table, lazy=True)
def task(tbls: list[sa.Alias]):
    return Table(View(src=tbls), name="union")

It also works specifying views with other input_type values that support lazy table references. In this case, you need to make sure that the src parameter only references input tables exactly as they come in as parameters to the task:

import pydiverse.transform as pdt

@materialize(input_type=pdt.SqlAlchemy, lazy=True)
def task(tbls: list[pdt.Table]):
    return Table(View(src=tbls), name="union")

Furthermore, it can be used to load only a subset of a table and rename columns:

@materialize(input_type=sa.Table)
def task(tbl: sa.Alias):
    cols = [c.name for c in tbl.c if c.name.startswith("a")]
    return Table(
        View(
            src=tbl,
            columns={f"c{col}": col for col in cols},
            sort_by=[tbl.c.id.desc().nulls_first(), "id2"],
            limit=10,
        ),
        name="selection",
    )

Please note that the columns and sort_by parameters can be provided either as strings or as column objects of the respective input_type. This works at least for input_type values sa.Table and pdt.SqlAlchemy.

You can also specify sort order with an explicit class:

from pydiverse.pipedag import SortCol, SortOrder
sort_by = [SortCol("id", SortOrder.DESC, nulls_first=True)]

If you just want to filter columns without renaming, any iterable will do:

@materialize(input_type=sa.Table)
def task(tbl: sa.Alias):
    cols = [c.name for c in tbl.c if c.name.startswith("a")]
    return Table(View(tbl, columns=cols, sort_by=tbl.c.id), name="selection")
class pydiverse.pipedag.RawSql(sql: str | None = None, name: str | None = None, separator: str = ';')[source]

Container for raw sql strings.

This allows returning sql query strings that then get executed in the table store. This is only intended to help with transitioning legacy sql pipelines to pipedag, and should be replaced with pipedag managed tables as soon as possible.

Attention

When using RawSql, make sure that you only write tables to the stage that the corresponding task is running in. Otherwise, schema swapping won’t work. To do this, pass the current stage as an argument to your task and then access the current stage name using Stage.current_name.

Parameters:
  • sql – The sql query string to execute. Depending on the database dialect and the separator parameter, the query will be split into multiple subqueries that then get executed sequentially.

  • name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.

  • separator – The separator used when splitting the query into subqueries. Default: ";"

Example

When you want to use tables produced by a RawSql task in a downstream task, you can either use square brackets during flow definition to pass a specific table to a child task, or you can pass the entire RawSql object as an input, in which case all tables get loaded and the child task can access them using square brackets (or any of the dict-like methods defined by RawSql).

@materialize(lazy=True)
def raw_sql_task(stage):
    schema = stage.current_name
    return RawSql(f"""
        CREATE TABLE {schema}.tbl_1 AS SELECT 1 as x;
        CREATE TABLE {schema}.tbl_2 AS SELECT 2 as x;
    """)

@materialize(input_type=sa.Table)
def foo(tbl_1, tbl_2):
    ...

@materialize(input_type=sa.Table)
def bar(raw_sql):
    tbl_1 = raw_sql["tbl_1"]
    tbl_2 = raw_sql["tbl_2"]
    ...

with Flow() as f:
    with Stage("stage") as s:
        out = raw_sql_task(s)

        # Manually pass specific tables into the task
        foo(out["tbl_1"], out["tbl_2"])

        # Or pass the entire RawSql object into the task
        bar(out)
__iter__() Iterable[str][source]

Yields all names of tables produced by this RawSql object.

__contains__(table_name: str) bool[source]

Check if this RawSql object produced a table with name table_name.

__getitem__(table_name: str)[source]

Gets the table produced by this RawSql object with name table_name.

items() Iterable[tuple[str, Any]][source]

Returns pairs of (table_name, table).

get(table_name: str, default=None)[source]

Returns the table with name table_name, or if no such table exists, the default value.

class pydiverse.pipedag.Blob(obj: T | None = None, name: str | None = None)[source]

Blob (binary large object) container.

Used to wrap arbitrary Python objects that get returned from materializing tasks. Blobs get stored in the blob store.

Example: How to return a blob from a task.
 @materialize()
 def task():
     obj = SomePicklableClass()
     return Blob(obj, "name")
Parameters:
  • obj – The object to wrap

  • name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.

See also

You can specify which types of objects should automatically get converted to blobs using the auto_blob config option.

class pydiverse.pipedag.GroupNode(label: str | None = None, style: pydiverse.pipedag.core.group_node.VisualizationStyle | None = None, *, ordering_barrier: bool = False, style_tag: str | None = None)[source]

A group node represents a collection of related tasks.

The group can be used as a display element in the visualization, and it can be used to ensure all tasks before/after this group are executed before/after tasks in this group.

Group nodes can contain stages and can be contained by stages.

Example: How to group tasks with a group node.
 with Flow() as flow:
     with Stage("stage1"):
         _ = any_task()
         with GroupNode("group1"):
             task1 = task_within_group()
             _ = task_within_group2(task1)
         _ = any_task()

https://kroki.io/graphviz/svg/eNqljk1LxDAQQO_5FUO8rtjUk5R4XdnjXhcJkyZtw45JyQeyiP_dbsOCyiKoOc7kvTfGjRHnCbbwxvrwMofijcyx2I6louuup5KyjSqplHG0Yvmqxz5QiJLfNOtrG96xOtKE_bFjQ_D5y4BQW5LVsMjziaxMgZz5VOKXlFfz7knst8N0F4Lk_ymOMZT5e1HAYXBEFyUaOzxovoGKZExH9ery5Lxa8Q1Ump8ha_hzx9rfGdorinfGmh8t6E_qbLrC3v8VXKICbh-hBTjQZNHI5TTK6EiKdf0Bc8SyVQ==

Parameters:
  • label – label displayed in the visualization

  • style – visualization style for this group node

  • ordering_barrier – If True, a barrier task will be added to the stage before and after this group to ensure all tasks before/after this group are executed before/after tasks in this group.

  • style_tag – Style tag to be used for visualization

class pydiverse.pipedag.VisualizationStyle(hide_box: bool = False, hide_content: bool = False, hide_label: bool = False, box_color_always: str | None = None, box_color_any_failure: str | None = None, box_color_none_cache_valid: str | None = None, box_color_any_cache_valid: str | None = None, box_color_all_cache_valid: str | None = None, box_color_all_skipped: str | None = None) NoneType[source]

Visualization style for group nodes.

This can be configured via class GroupNode or via configuration in pipedag.yaml.

Parameters:
  • box – Box style of group node

  • hide_content – Hide content of group node in visualization

  • hide_label – Hide label of group node in visualization

  • box_color_always – Color of group node box if specified

  • box_color_any_failure – Color of group node box if any failure occurred within included tasks

  • box_color_none_cache_valid – Color of group node box if no failure occurred and no included task cache valid

  • box_color_any_cache_valid – Color of group node box if no failure occurred and some but not all included tasks cache valid

  • box_color_all_cache_valid – Color of group node box if no failure occurred and all included tasks cache valid

  • box_color_all_skipped – Color of group node box if no failure occurred and all included tasks were skipped

class pydiverse.pipedag.Schema(name: str, prefix: str = '', suffix: str = '') NoneType[source]

Class for holding a schema name with separable prefix and suffix.

name: str

The schema name.

prefix: str

The prefix to be added to the schema name.

suffix: str

The suffix to be added to the schema name.

get() str[source]

Get the schema name with prefix and suffix.

class pydiverse.pipedag.Result(flow: 'Flow', subflow: 'Subflow', underlying: Any, successful: bool, config_context: pydiverse.pipedag.context.context.ConfigContext | None, task_values: dict[pydiverse.pipedag.core.task.Task, Any], task_states: dict[pydiverse.pipedag.core.task.Task, pydiverse.pipedag.context.run_context.FinalTaskState], exception: Exception | None) None[source]

Flow execution result.

flow: Flow

The flow that produced this result

underlying: Any

The underlying result object returned by the orchestration engine. Depending on the engine, this object might have a different type.

successful: bool

Whether the flow execution was successful or not.

task_values: dict[pydiverse.pipedag.core.task.Task, Any]

A dictionary mapping from tasks to the values returned by them.

task_states: dict[pydiverse.pipedag.core.task.Task, pydiverse.pipedag.context.run_context.FinalTaskState]

A dictionary mapping from tasks to their final states.

exception: Exception | None

If an exception was raised during execution, it will get stored in this attribute.

get(task: Task | TaskGetItem, as_type: type = None, write_local_table_cache: bool = False) Any[source]

Retrieve the output produced by a task.

Any tables and blobs returned by a task get loaded from their corresponding store.

If strict_result_get_locking is set to True, this call, as well as as Flow.run() must be wrapped inside a StageLockContext.

Parameters:
  • task – The task for which you want to retrieve the output.

  • as_type – The type as which tables produced by this task should be dematerialized. If no type is specified, the input type of the task is used.

  • write_local_table_cache

    Flag that determines whether the table should be stored in the local table cache, if it is not already there and cache valid. If no local table cache is configured or the type as which the table is retrieved, is not compatible with the local table cache, this flag has no effect.

    Warning

    It is not safe to call this method with write_local_table_cache=True from several threads at the same time.

Returns:

The results of the task.

visualize(visualization_tag: str | None = None)[source]

Wrapper for Flow.visualize().

visualize_url(visualization_tag: str | None = None) str[source]

Wrapper for Flow.visualize_url().

visualize_pydot(visualization_tag: str | None = None) pydot.Dot[source]

Wrapper for Flow.visualize_pydot().

class pydiverse.pipedag.PipedagConfig(path: str | pathlib.Path | dict[str, Any])[source]

This class represents a pipedag config file.

Parameters:

path – Path to the config yaml file to load or a dictionary containing the raw config as it would be loaded from yaml file.

default: PipedagConfig

The default config file.

If the environment variable PIPEDAG_CONFIG is set, then this file will be used as the config file. Otherwise, pipedag searches for a file called pipedag.yaml or pipedag.yml in:

  • The current working directory

  • Any parent directories of the working directory

  • The user folder

get(instance: str | None = None, flow: str | None = None, per_user: bool = False) ConfigContext[source]

Constructs a ConfigContext. For more details how the specific ConfigContext instance is constructed, check out the specifying instances and flows section.

Parameters:
  • instance – Name of the instance. If no value is provided the __any__ instance gets used.

  • flow – Name of the flow. If no value is provided the __any__ flow gets used.

  • per_user – Whether to customize the instance id for each user according to per_user_template.

class pydiverse.pipedag.ConfigContext(config_dict: dict, pipedag_name: str, flow_name: str | None, instance_name: str, fail_fast: bool, strict_result_get_locking: bool, instance_id: str, stage_commit_technique: pydiverse.pipedag.context.context.StageCommitTechnique, cache_validation: box.box.Box, visualization: dict[str, pydiverse.pipedag.context.context.VisualizationConfig], network_interface: str, disable_kroki: bool, kroki_url: str | None, attrs: box.box.Box, table_hook_args: box.box.Box, swallow_exceptions: bool = False, is_evolved: bool = False) NoneType[source]

Configuration context for running a particular pipedag instance.

To create a ConfigContext instance use PipedagConfig.get().

pipedag_name: str

Name of the config file.

flow_name: str | None

Name of the flow used for instantiating this config.

instance_name: str

Name of the instance used for instantiating this config.

instance_id: str

The instance_id.

attrs: box.box.Box

Values from the attrs config section, stored in a Box object. Useful for passing any custom, use case specific config options to your flow.

evolve(**changes) ConfigContext[source]

Create a new config context instance with the changes applied.

Because ConfigContext is immutable, this is the only valid way to derive a new instance with some values mutated.

Wrapper around attrs.evolve().

static new(config: dict[str, Any], pipedag_name: str, flow: str, instance: str)[source]

Create a new ConfigContext instance from dictionary and a few names.

Parameters:
  • config – dictionary with config values

  • pipedag_name – name of pipedag config

  • flow – name of flow

  • instance – name of instance

Returns:

ConfigContext instance

static parse_in_object(key: str, value: dict[str, Any], class_type: Any, within: str, *, inout: dict[str, Any])[source]

Parse a dictionary into a dataclass instance.

Parameters:
  • key – key of inout dictionary to be modified

  • value – dictionary to be parsed into a dataclass instance and stored in inout[key]

  • class_type – dataclass type for what should be written to inout[key]

  • within – context for error messages

  • inout – dictionary to be modified by this function call

Returns:

None

classmethod get() T[source]

Returns the current, innermost context instance.

Raises:

LookupError – If no such context has been entered yet.

class pydiverse.pipedag.StageLockContext[source]

Context manager used to keep stages locked until after Flow.run().

By default, pipedag releases stage locks as soon as it is done processing a stage. This means that by the time the flow has finished running, another flow run might already have overwritten any data in those stages. Consequently, calling Result.get() might not return the values produced by the current run, but instead those from the other (newer) run.

To prevent this, you can wrap the calls to Flow.run() and Result.get() in a StageLockContext. This keeps all stages modified by the flow to remain locked until the end of StageLockContext.

Example

with StageLockContext():
    result = flow.run()
    df = result.get(task_x)
classmethod get() T[source]

Returns the current, innermost context instance.

Raises:

LookupError – If no such context has been entered yet.

Backend Classes

Table Store

class pydiverse.pipedag.backend.table.SQLTableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str, str | list[str]]] | None = None, default_materialization_details: str | None = None, max_concurrent_copy_operations: int = 5, sqlalchemy_pool_size: int = 12, sqlalchemy_pool_timeout: int = 300, force_transaction_suffix: str | None = None)[source]

Table store that materializes tables to a SQL database

Uses schema swapping for transactions: Creates a schema for each stage and a temporary schema for each transaction. If all tasks inside a stage succeed, swaps the schemas by renaming them.

The correct dialect specific subclass of SQLTableStore gets initialized when based on the dialect found in the provided engine url during initialization.

Supported Tables

The SQLTableStore can materialize (that is, store task output) and dematerialize (that is, retrieve task input) the following Table types:

Framework

Materialization

Dematerialization

SQLAlchemy

sa.Table

Pandas

pd.DataFrame

pd.DataFrame

Polars

tidypolars

tp.Tibble

tp.Tibble

Ibis

ibis.api.Table

ibis.api.Table

pydiverse.transform

pdt.Table

pdt.Polars
pdt.SqlAlchemy

pydiverse.pipedag table reference

ExternalTableReference (no materialization)

Can be read with all dematerialization methods above

pydiverse.pipedag view

View (view with support for src union, column renaming, and sorting)

Can be read with all dematerialization methods above

Parameters:
  • url

    The SQLAlchemy engine url used to connect to the database.

    This URL may contain placeholders like {name} or {instance_id} (additional ones can be defined in the url_attrs_file) or environment variables like {$USER} which get substituted with their respective values.

    Attention: passwords including special characters like @ or : need to be URL encoded.

  • url_attrs_file

    Filename of a yaml file which is read shortly before rendering the final engine URL and which is used to replace custom placeholders in url.

    Just like url, this value may also contain placeholders and environment variables which get substituted.

  • create_database_if_not_exists – If the engine url references a database name that doesn’t yet exists, then setting this value to True tells pipedag to create the database before trying to open a connection to it.

  • schema_prefix – A prefix that gets placed in front of all schema names created by pipedag.

  • schema_suffix – A suffix that gets placed behind of all schema names created by pipedag.

  • avoid_drop_create_schema – If True, no CREATE SCHEMA or DROP SCHEMA statements get issued. This is mostly relevant for databases that support automatic schema creation like IBM DB2.

  • print_materialize – If True, all tables that get materialized get logged.

  • print_sql – If True, all executed SQL statements get logged.

  • no_db_locking – Speed up database by telling it we will not rely on it’s locking mechanisms. Currently not implemented.

  • strict_materialization_details

    If True: raise an exception if
    • the argument materialization_details is given even though the table store does not support it.

    • a table references a materialization_details tag that is not defined in the config.

    If False: Log an error instead of raising an exception

  • materialization_details

    A dictionary of tags that each define properties used for materialization by the table store. If the tag __any__ is present, the other labels will inherit properties from it if they do not override them.

    An example config for DB2 that goes into the args section of the table_store config in pipedag.yaml:

    materialization_details:
      __any__:
        compression: ["COMPRESS YES ADAPTIVE", "VALUE COMPRESSION"]
        table_space_data: "USERSPACE1"
      no_compression:
        # user-defined tag. Inherits table_space_data from __any__
        # but overwrites compression.
        compression: ""
    

    Then, by default, all tables will be created with the __any__ config, unless the label no_compression is specified at stage or table level.

    See the documentation of the SQLTableStore Dialects for supported options for each table store.

  • default_materialization_details – The materialization_details that will be used if materialization_details is not specified on table level. If not set, the __any__ tag (if specified) will be used.

  • max_concurrent_copy_operations – In case of a partially cache-valid stage, we need to copy tables from the cache schema to the new transaction schema. This parameter specifies the maximum number of workers we use for concurrently copying tables.

  • sqlalchemy_pool_size – The number of connections to keep open inside the connection pool. It is recommended to choose a larger number than max_concurrent_copy_operations to avoid running into pool_timeout.

  • sqlalchemy_pool_timeout – The number of seconds to wait before giving up on getting a connection from the pool. This may be relevant in case the connection pool is saturated with concurrent operations each working with one or more database connections.

  • force_transaction_suffix – This option is for use without proper pipedag flow. It disables lookup in stage metadata table for determining correct transaction slot suffix. (default: None)

class pydiverse.pipedag.backend.table.ParquetTableStore(engine_url: str, parquet_base_path: upath.core.UPath, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str | list[str]]] | None = None, default_materialization_details: str | None = None, max_concurrent_copy_operations: int = 5, sqlalchemy_pool_size: int = 12, sqlalchemy_pool_timeout: int = 300, force_transaction_suffix: str | None = None, s3_endpoint_url: str | None = None, s3_url_style: str | None = None, s3_region: str | None = None, allow_overwrite: bool = False)[source]

Table store that materializes tables as parquet files.

Additionally, a duckdb file is created which links to parquet files.

Uses schema swapping for transactions: Creates a schema for each stage and a temporary schema for each transaction. If all tasks inside a stage succeed, swaps the schemas by renaming them.

Supported Tables

The ParquetTableStore can materialize (store task output) and dematerialize (retrieve task input) the following Table types:

Framework

Materialization

Dematerialization

SQLAlchemy

sa.Table

Pandas

pd.DataFrame

pd.DataFrame

Polars

tidypolars

tp.Tibble

tp.Tibble

Ibis

ibis.api.Table

ibis.api.Table

pydiverse.transform

pdt.Table

pdt.Polars
pdt.SqlAlchemy

pydiverse.pipedag table reference

ExternalTableReference (no materialization)

Can be read with all dematerialization methods above

pydiverse.pipedag view

View (view with support for src union, column renaming, and sorting)

Can be read with all dematerialization methods above

Parameters:
  • url

    The SQLAlchemy engine url used to connect to the database.

    This URL may contain placeholders like {name} or {instance_id} (additional ones can be defined in the url_attrs_file) or environment variables like {$USER} which get substituted with their respective values.

    Attention: passwords including special characters like @ or : need to be URL encoded.

  • url_attrs_file

    Filename of a yaml file which is read shortly before rendering the final engine URL and which is used to replace custom placeholders in url.

    Just like url, this value may also contain placeholders and environment variables which get substituted.

  • create_database_if_not_exists – If the engine url references a database name that doesn’t yet exists, then setting this value to True tells pipedag to create the database before trying to open a connection to it.

  • schema_prefix – Prefix placed in front of all schema names created by pipedag.

  • schema_suffix – Suffix placed behind all schema names created by pipedag.

  • avoid_drop_create_schema – If True, no CREATE SCHEMA or DROP SCHEMA statements get issued. This is mostly relevant for databases that support automatic schema creation like IBM DB2.

  • print_materialize – If True, log every table materialization.

  • print_sql – If True, log every executed SQL statement.

  • no_db_locking – Speed up database by telling it we will not rely on it’s locking mechanisms. Currently not implemented.

  • strict_materialization_details

    If True: raise an exception if
    • the argument materialization_details is given even though the table store does not support it.

    • a table references a materialization_details tag that is not defined in the config.

    If False: Log an error instead of raising an exception

  • materialization_details – A dictionary with each entry describing a tag for materialization details of the table store. See subclasses of BaseMaterializationDetails for details.

  • default_materialization_details – The materialization_details that will be used if materialization_details is not specified on table level. If not set, the __any__ tag (if specified) will be used.

  • max_concurrent_copy_operations – In case of a partially cache-valid stage, we need to copy tables from the cache schema to the new transaction schema. This parameter specifies the maximum number of workers we use for concurrently copying tables.

  • sqlalchemy_pool_size – The number of connections to keep open inside the connection pool. It is recommended to choose a larger number than max_concurrent_copy_operations to avoid running into pool_timeout.

  • sqlalchemy_pool_timeout – The number of seconds to wait before giving up on getting a connection from the pool. This may be relevant in case the connection pool is saturated with concurrent operations each working with one or more database connections.

  • force_transaction_suffix – This option is for use without proper pipedag flow. It disables lookup in stage metadata table for determining correct transaction slot suffix. (default: None)

  • parquet_base_bath – This is an fsspec compatible path to either a directory or an S3 bucket key prefix. Examples are /tmp/pipedag/parquet/ or s3://pipedag-test-bucket/table_store/. instance_id will automatically be appended to parquet_base_bath.

  • s3_endpoint_url – When using a non-standard S3 endpoint (like minio), this can be used to specify the URL. Unfortunately, the AWS_ENDPOINT_URL environment variable is not automatically picked up by duckdb. (default: None)

  • s3_use_ssl – Whether to use SSL when connecting to S3 endpoint. (default: None)

  • s3_url_style – Specify URL style when connecting to S3 endpoint. Minio for example requires s3_url_style=’path’. (default: None)

  • allow_overwrite – Allow overwriting tables with store_table. (default: False)

sync_metadata(flow: Flow | None = None)[source]

Sync this DuckDB file with the metadata store.

This method brings the local DuckDB file completely in-sync with the shared metadata store. It creates any missing schemas and syncs views from other users for all stages defined in the flow or for all schemas for which metadata exists.

This is useful when a user wants to access tables created by other users without running the full flow.

Example usage:

cfg = PipedagConfig.default.get(instance_id)
store = cfg.store.table_store
store.sync_metadata()

Attention: No stage locking is performed, so parquet files can get out-of-sync if pipeline instance is executed by other team members while exploring data in duckdb file. However, stage level transactionality keeps your data in main schema stable until the second pipeline run starts.

Parameters:

flow – The flow containing stages to sync. If no flow is given, all schemas for which metadata exists are synchronized.

SQLTableStore Dialects

PostgreSQL
class pydiverse.pipedag.backend.table.sql.dialects.PostgresTableStore(*args, use_adbc: bool = True, **kwargs)[source]

SQLTableStore that supports PostgreSQL.

Takes the same arguments as SQLTableStore.

Supports the PostgresMaterializationDetails materialization details.

class pydiverse.pipedag.backend.table.sql.dialects.postgres.PostgresMaterializationDetails(unlogged: bool = False) NoneType[source]

Materialization details specific to PostgreSQL.

Parameters:

unlogged – Whether to use unlogged tables or not. This reduces safety in case of a crash or unclean shutdown, but can significantly increase write performance.

Example

materialization_details:
    __any__:
        unlogged: true
    my_label:
        unlogged: false

For more general information on materialization details, see materialization_details parameter of SQLTableStore.

DuckDB
class pydiverse.pipedag.backend.table.sql.dialects.DuckDBTableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str, str | list[str]]] | None = None, default_materialization_details: str | None = None, max_concurrent_copy_operations: int = 5, sqlalchemy_pool_size: int = 12, sqlalchemy_pool_timeout: int = 300, force_transaction_suffix: str | None = None)[source]

SQLTableStore that supports DuckDB.

Takes the same arguments as SQLTableStore

Microsoft SQL Server / T-SQL
class pydiverse.pipedag.backend.table.sql.dialects.MSSqlTableStore(*args, disable_pytsql: bool = False, disable_arrow_odbc: bool = False, disable_bulk_insert: bool = False, pytsql_isolate_top_level_statements: bool = True, max_query_print_length: int = 500000, **kwargs)[source]

SQLTableStore that supports Microsoft SQL Server.

Requires ODBC driver for Microsoft SQL Server to be installed. https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server For default arguments, both msodbcsql and mssql-tools need to be installed. Debug installation issues with odbcinst -j. On Linux, conda-forge pyodbc package expects ini files in different location than Microsoft installs it (symlink helps).

Supports the MSSqlMaterializationDetails materialization details.

In addition to the arguments of SQLTableStore, it also takes the following arguments:

Parameters:
  • disable_pytsql – For mssql, a package called pytsql is used for executing RawSql scripts. It has the advantage that it allows for some kind of SQL based print statements. However, it may fail for some statements. For those cases, you can set disable_pytsql: true to use another logic for splitting up Raw SQL scripts and handing that over to sqlalchemy. This is actually quite a complex process for mssql. Sorry for any inconveniences. We will try to make it work for most tsql code that should be integrated in pipedag pipelines. However, the ultimate goal is to split up monolithic blocks of dynamic sql statements into defined transformations with dynamic aspects written in python.

  • disable_arrow_odbc – By default we try to use arrow-odbc to read data from Microsoft SQL Server. However, pyarrow has problems with long string columns. If you want to disable this, set this parameter to True.

  • disable_bulk_insert – By default we try to use bulk insert to write data to Microsoft SQL Server. Unfortunately, the bcp command line tool and bulk insert libraries like bcpandas have trouble with corner cases like linebreaks or other special characters in string columns. If you want to disable this, set this parameter to True. If installed, bulk insert is done with mssqlkit (not open source). Otherwise, bcpandas is used.

  • pytsql_isolate_top_level_statements – This parameter is handed over to pytsql.executes and causes the script to be split in top level statements that are sent to sqlalchemy separately. The tricky part here is that some magic is done to make DECLARE statements reach across, but it is not guaranteed to be identical to scripts executed by a SQL UI.

  • max_query_print_length – Maximum number of characters in a SQL query that are printed when logging the query before the log is truncated. Defaults to 5000 characters.

class pydiverse.pipedag.backend.table.sql.dialects.mssql.MSSqlMaterializationDetails(columnstore: bool = False) NoneType[source]

Materialization details specific to Microsoft SQL Server.

Parameters:

columnstore – Whether to create tables as columnstores by defining a clustered columnstore index on them. This can improve performance. See the Microsoft documentation.

IBM DB2
class pydiverse.pipedag.backend.table.sql.dialects.IBMDB2TableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str, str | list[str]]] | None = None, default_materialization_details: str | None = None, max_concurrent_copy_operations: int = 5, sqlalchemy_pool_size: int = 12, sqlalchemy_pool_timeout: int = 300, force_transaction_suffix: str | None = None)[source]

SQLTableStore that supports IBM Db2. Requires ibm-db-sa to be installed.

Takes the same arguments as SQLTableStore.

Supports the IBMDB2MaterializationDetails materialization details.

class pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2MaterializationDetails(compression: pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2CompressionTypes | list[pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2CompressionTypes] | None = None, table_space_data: str | None = None, table_space_index: str | None = None, table_space_long: str | list[str] | None = None) NoneType[source]

Materialization details specific to IBM DB2.

Parameters:
  • compression – Specify the compression methods to be applied to the table. For possible values see IBMDB2CompressionTypes.

  • table_space_data – The DB2 table space where the data is stored.

  • table_space_index – The DB2 table space where the partitioned index is stored.

  • table_space_long – The DB2 table spaces where the values of any long columns are stored.

Example

materialization_details:
  __any__:
    compression: ["COMPRESS YES ADAPTIVE", "VALUE COMPRESSION"]
    table_space_data: "USERSPACE1"
  no_compression:
    # user-defined tag. Inherits table_space_data from __any__
    # but overwrites compression.
    compression: ""
class pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2CompressionTypes(*values)[source]

Enum for the compression methods for IBM DB2 tables. VALUE COMPRESSION can be combined with one of the other compression methods.

See the IBM DB2 documentation for more details.

NO_COMPRESSION = ''
ROW_COMPRESSION = 'COMPRESS YES'
STATIC_ROW_COMPRESSION = 'COMPRESS YES STATIC'
ADAPTIVE_ROW_COMPRESSION = 'COMPRESS YES ADAPTIVE'
VALUE_COMPRESSION = 'VALUE COMPRESSION'

Local Table Cache

class pydiverse.pipedag.backend.table.cache.ParquetTableCache(*args, base_path: str | upath.core.UPath, **kwargs)[source]

Local Table Cache that stores tables in Parquet files.

Supported Tables

The ParquetTableCache supports Pandas, Polars and pydiverse.transform.

Parameters:

base_path – A path to a folder where the Parquet files should get stored. To differentiate between different instances, the instance_id will automatically be appended to the provided path.

Blob Store

class pydiverse.pipedag.backend.blob.FileBlobStore(base_path: str | upath.core.UPath)[source]

File based blob store

The FileBlobStore stores blobs in a folder structure on a file system. In the base directory there will be two folders for every stage, one for the base and one for the transaction stage. Inside those folders the blobs will be stored as pickled files: base_path/instance_id/STAGE_NAME/BLOB_NAME.pkl.

To commit a stage, the only thing that has to be done is to rename the appropriate folders.

Lock Manager

class pydiverse.pipedag.backend.lock.DatabaseLockManager(engine: sqlalchemy.engine.base.Engine, instance_id: str, lock_schema: pydiverse.pipedag.container.Schema | None = None, create_lock_schema: bool = True)[source]

Lock manager based on database locking mechanisms

Many databases provide some kind of locking mechanism. Depending on the specific database technology, this allows us to implement locking on either the schema level (where each stage can be locked and unlocked individually), or only on the instance level (where the entire instance including all stages get locked and unlocked together).

This lock manager uses the same database as the SQLTableStore. No other configuration in the args section of the config file is required.

We currently support the following databases: PostgreSQL, Microsoft SQL Server, IBM DB2.

class pydiverse.pipedag.backend.lock.ZooKeeperLockManager(client: NoneType, base_path: str)[source]

Apache ZooKeeper based lock manager

Uses Apache ZooKeeper to establish fully distributed locks that are globally synchronous. The advantage of this approach is that we it is highly reliable and that in case our flow crashes, the acquired locks automatically get released (the locks are ephemeral).

Config File

All arguments in the args section get passed as-is to the initializer of kazoo.client.KazooClient. Some useful arguments include:

Parameters:

hosts – Comma separated list of hosts to connect.

class pydiverse.pipedag.backend.lock.FileLockManager(base_path: str | pathlib.Path)[source]

Lock manager that uses lock files

For details on how exactly the file locking is implemented, check out the filelock documentation.

Parameters:

base_path – A path to a folder where the lock files should get stored. To differentiate between different instances, the instance_id will automatically be appended to the provided path.

class pydiverse.pipedag.backend.lock.NoLockManager(logger_kwargs=None)[source]

This lock manager doesn’t do any locking and only serves as a placeholder for an actual lock manager for testing something locally.

Warning

This lock manager is not intended for use in a production environment. Using a lock manager is essential for preventing data corruption.

Orchestration Engine

class pydiverse.pipedag.engine.SequentialEngine[source]

Most basic orchestration engine that just executes all tasks sequentially.

class pydiverse.pipedag.engine.DaskEngine(**dask_compute_kwargs)[source]

Execute a flow in parallel on a single machine using dask.

Parameters:

dask_compute_kwargs – Keyword arguments that get passed to dask.compute(). The main kwarg you might be interested in is num_workers, which allows you to specify how many worker processes dask should spawn. By default, it spawns one worker per CPU core.

class pydiverse.pipedag.PrefectEngine

Alias for either PrefectOneEngine or PrefectTwoEngine depending on the version of Prefect that is installed.

class pydiverse.pipedag.engine.prefect.PrefectOneEngine(flow_kwargs: dict[str, Any] = None)[source]

Hands over execution of a flow to Prefect 1.

Parameters:

flow_kwargs – Optional dictionary of keyword arguments that get passed to the initializer of prefect.Flow.

class pydiverse.pipedag.engine.prefect.PrefectTwoEngine(flow_kwargs: dict[str, Any] = None)[source]

Hands over execution of a flow to Prefect 2.

Parameters:

flow_kwargs – Optional dictionary of keyword arguments that get passed to the initializer of @prefect.flow decorator.

Special Table Types

class pydiverse.pipedag.container.ExternalTableReference(name: str, schema: str, shared_lock_allowed: bool = False)[source]

Reference to a user-created table.

By returning a ExternalTableReference wrapped in a Table from, a task you can tell pipedag about a table, a view or DB2 nickname in an external schema. The schema may be a multi-part identifier like “[db_name].[schema_name]” if the database supports this. It is passed to SQLAlchemy as-is.

Only supported by SQLTableStore.

Warning

When using a ExternalTableReference, pipedag has no way of knowing the cache validity of the external object. Hence, the user should provide a cache function for the Task. It is now allowed to specify a ExternalTableReference to a table in schema of the current stage.

Example

You can use a ExternalTableReference to tell pipedag about a table that exists in an external schema:

@materialize(version="1.0")
def task():
    return Table(ExternalTableReference("name_of_table", "schema"))

By using a cache function, you can establish the cache (in-)validity of the external table:

from datetime import date

# The external table becomes cache invalid every day at midnight
def my_cache_fun():
    return date.today().strftime("%Y/%m/%d")

@materialize(cache=my_cache_fun)
def task():
    return Table(ExternalTableReference("name_of_table", "schema"))

The ExternalTableReference object can also be created at flow wiring time:

with Flow() as f:
    with Stage("stage") as s:
        tbl = Table(ExternalTableReference("name_of_table", "schema"))
        _ = some_task(tbl)