API#

Public#

class pydiverse.pipedag.Flow(name: str = 'default')[source]#

A flow represents a collection of dependent Tasks.

A flow is defined by using it as a context manager. Any stages and tasks defined inside the flow context will automatically get added to the flow with the correct dependency wiring.

Parameters:

name – The name of the flow.

Examples

@materialize
def my_materializing_task():
    return Table(...)

with Flow("my_flow") as flow:
    with Stage("stage_1") as stage_1:
        task_1 = my_materializing_task()
        task_2 = another_materializing_task(task_1)

    with Stage("stage_2") as stage_2:
        task_3 = yet_another_task(task_3)
        ...

result = flow.run()
__getitem__(name) Stage[source]#

Retrieves a stage by name.

Parameters:

name – The name of the stage.

Raises:

KeyError – if no stage with the specified name exists.

run(*components: Task | TaskGetItem | Stage, config: ConfigContext = None, orchestration_engine: OrchestrationEngine = None, fail_fast: bool | None = None, ignore_cache_function: bool = False, **kwargs) Result[source]#

Execute the flow. This will execute the flow and all tasks within using the orchestration engine.

Parameters:
  • components

    An optional selection of tasks or stages that should get executed. If no values are provided, all tasks and stages get executed.

    If you specify a subset of tasks, those tasks get executed even when they are cache valid, and they don’t get committed.

    If you specify a subset of stages, all tasks and substages of those stages get executed and committed.

  • config – A ConfigContext to use as the configuration for executing this flow. If no config is provided, then pipedag uses the current innermost open ConfigContext, or if no such config exists, it gets the default config from PipedagConfig.default.

  • orchestration_engine – The orchestration engine to use. If no engine is provided, the orchestration engine from the config gets used.

  • fail_fast – Whether exceptions should get raised or swallowed. If set to True, exceptions that occur get immediately raised and the flow gets aborted.

  • ignore_cache_function – When set to True, the task’s cache function gets ignored when determining the cache validity of a task.

  • kwargs – Other keyword arguments that get passed on directly to the run() method of the orchestration engine. Consequently, these keyword arguments are engine dependant.

Returns:

A Result object for the current flow run.

Examples

with Flow() as flow:
    with Stage("stage_1") as stage_1:
        task_1 = ...
        task_2 = ...

    with Stage("stage_2") as stage_2:
        task_3 = ...
        task_4 = ...

# Execute the entire flow
flow.run()

# Execute (and commit) only stage 1
flow.run(stage_1)

# Execute (but DON'T commit) only task 1 and task 4
flow.run(task_1, task_4)
get_stage(name: str) Stage[source]#

Retrieves a stage by name. Alias for Flow.__getitem__().

Parameters:

name – The name of the stage.

Returns:

The stage.

Raises:

KeyError – if no stage with the specified name exists.

visualize(result: Result | None = None)[source]#

Visualizes the flow as a graph.

If you are running in a jupyter notebook, the graph will get displayed inline. Otherwise, it will get rendered to a pdf that then gets opened in your browser.

Requires Graphviz to be installed on your computer.

Parameters:

result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.

visualize_url(result: Result | None = None) str[source]#

Visualizes the flow as a graph and returns a URL to view the visualization.

If you don’t have Graphviz installed on your computer (and thus aren’t able to use the visualize() method) then this is the easiest way to visualize the flow. For this we use a free service called Kroki.

Parameters:

result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.

Returns:

A URL that, when opened, displays the graph.

visualize_pydot(result: Result | None = None) pydot.Dot[source]#

Visualizes the flow as a graph and return a pydot.Dot graph.

Parameters:

result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.

Returns:

A pydot.Dot graph.

class pydiverse.pipedag.Stage(name: str, materialization_details: str | None = None)[source]#

A stage represents a collection of related tasks.

The main purpose of a Stage is to allow for a transactionality mechanism. Only if all tasks inside a stage finish successfully does the stage get committed.

All task that get defined inside the stage’s context will automatically get added to the stage.

Parameters:
  • name – The name of the stage. Two stages with the same name may not be used inside the same flow.

  • materialization_details – The label of the materialization_details to be used. Overwrites the label given by the stage.

property name: str#

The name of the stage.

property transaction_name: str#

The name temporary transaction stage.

property current_name: str#

The name of the stage where the data currently lives.

Before a task has been committed this is the transaction name, after the commit it is the normal name.

__getitem__(item: str | tuple[str, int]) Task[source]#

Retrieves a task inside the stage by name.

You can either subscribe a Stage using just a string (stage["name"]) or using a tuple containing a string and an integer (stage["name", 3]).

The string is always interpreted as the name of the task to retrieve. If you also pass in an integer, it is interpreted as the index of the task. This means, that if the stage contains multiple tasks with the same name, then you can retrieve a specific instance of that task based on the order in which they were defined.

Raises:
  • KeyError – If no task with the name can be found.

  • IndexError – If the index is out of bounds.

  • ValueError – If multiple matching tasks have been found, but no index has been provided.

get_task(name: str, index: int | None = None) Task[source]#

Retrieves a task inside the stage by name. Alias for Stage.__getitem__().

Parameters:
  • name – The name of the task to retrieve.

  • index – If multiple task instances with the same name appear inside the stage, you can request a specific one by passing an index.

@pydiverse.pipedag.materialize(*, name: str = None, input_type: type | tuple | dict[str, Any] = None, version: str = None, cache: Callable = None, lazy: bool = False, nout: int = 1) Callable[[CallableT], CallableT | UnboundMaterializingTask][source]#
@pydiverse.pipedag.materialize(fn: CallableT, /) CallableT | UnboundMaterializingTask

Decorator to create a task whose outputs get materialized.

This decorator takes a class and turns it into a MaterializingTask. This means, that this function can only be used as part of a flow. Any outputs it produces get written to their appropriate storage backends (either the table or blob store). Additionally, any Table or Blob objects this task receives as an input get replaced with the appropriate object retrieved from the storage backend. In other words: All outputs from a materializing task get written to the store, and all inputs get retrieved from the store.

Because of how caching is implemented, task inputs and outputs must all be “materializable”. This means that they can only contain objects of the following types: dict, list, tuple, int, float, str, bool, None, datetime.date, datetime.datetime, pathlib.Path, Table, RawSql, Blob, or Stage.

Parameters:
  • fn – The function that gets executed by this task.

  • name – The name of this task. If no name is provided, the name of fn is used instead.

  • input_type – The data type as which to retrieve table objects from the store. All tables passed to this task get loaded from the table store and converted to this type. See Table Backends for more information.

  • version

    The version of this task. Unless the task is lazy, you always need to manually change this version number when you change the implementation to ensure that the task gets executed and the cache flushed.

    If the version is None and the task isn’t lazy, then the task always gets executed, and all downstream tasks get invalidated.

  • cache

    An explicit cache function used to determine cache validity of the task inputs.

    This function gets called every time before the task gets executed. It gets called with the same arguments as the task.

    An explicit function for validating cache validity. If the output of this function changes while the source parameters are the same (e.g. the source is a filepath and cache loads data from this file), then the cache will be deemed invalid and is not used.

  • lazy

    Whether this task is lazy or not.

    Unlike a normal task, lazy tasks always get executed. However, if a lazy task produces a lazy table (e.g. a SQL query), the table store checks if the same query has been executed before. If this is the case, then the query doesn’t get executed, and instead, the table gets copied from the cache.

    This behaviour is very useful, because you don’t need to manually bump the version of a lazy task. This only works because for lazy tables generating the query is very cheap compared to executing it.

  • nout – The number of objects returned by the task. If set, this allows unpacking and iterating over the results from the task.

Example

@materialize(version="1.0", input_type=pd.DataFrame)
def task(df: pd.DataFrame) -> pd.DataFrame:
    df = ...  # Do something with the dataframe
    return Table(df)

@materialize(lazy=True, input_type=sa.Table)
def lazy_task(tbl: sa.Table) -> sa.Table:
    query = sa.select(tbl).where(...)
    return Table(query)

You can use the cache argument to specify an explicit cache function. In this example we define a task that reads a dataframe from a csv file. Without specifying a cache function, this task would only get rerun when the path argument changes. Instead, we define a function that calculates a hash based on the contents of the csv file, and pass it to the cache argument of @materialize. This means that the task now gets invalidated and rerun if either the path argument changes or if the content of the file pointed to by path changes:

import hashlib

def file_digest(path):
    with open(path, "rb") as f:
        return hashlib.file_digest(f, "sha256").hexdigest()

@materialize(version="1.0", cache=file_digest)
def task(path):
    df = pd.read_scv(path)
    return Table(df)

Setting nout allows you to use unpacking assignment with the task result:

@materialize(version="1.0", nout=2)
def task():
    return 1, 2

# Later, while defining a flow, you can use unpacking assignment
# because you specified that the task returns 2 objects (nout=2).
one, two = task()

If a task returns a dictionary or a list, you can use square brackets to explicitly wire individual values / elements to task inputs:

@materialize(version="1.0")
def task():
    return {
        "x": [0, 1],
        "y": [2, 3],
    }

# Later, while defining a flow
# This would result in another_task being called with ([0, 1], 3).
task_out = task()
another_task(task_out["x"], task_out["y"][1])
pydiverse.pipedag.AUTO_VERSION = AUTO_VERSION#

Special constant used to indicate that pipedag should automatically determine a version number for a task.

The version is determined by running the task once to construct a representation of the computation performed on the tables (e.g. construct a query plan / computational graph). Using this representation, a unique version number is constructed such that if the computation changes the version number also changes. Then, if the task is deemed to be cache invalid, it is run again, but this time with actual data.

This puts the following constraints on which tasks can use AUTO_VERSION:

  • The task must be a pure function.

  • The task may not inspect the contents of the input tables.

  • The task must return at least one table.

  • The task may not return RawSql.

  • The task may not return Blob.

Polars

Automatic versioning is best supported by polars. To use it, you must specify polars.LazyFrame as the task input type and only use LazyFrames inside your task.

Pandas

Pandas support is still experimental. It is implemented using a technique we call “computation tracing”, where we run the task with proxy tables as inputs that record all operations performed on them. This requires heavily monkey patching the pandas and numpy modules. As long as you only use pandas and numpy functions inside your task, computation tracing should work successfully. However, for the monkey patching to work, you only access pandas / numpy functions through their namespace (e.g. pd.concat(...) is allowed, while from pandas import concat; concat(...) is not allowed).

Requires dask to be installed.

Example

@materialize(input_type=pl.LazyFrame, version=AUTO_VERSION)
def polars_task(x: pl.LazyFrame):
    # Some operations that only utilize pl.LazyFrame...
    return x.with_columns(
        (pl.col("col1") * pl.col("col2")).alias("col3")
    )
class pydiverse.pipedag.Table(obj: T | None = None, name: str | None = None, *, primary_key: str | list[str] | None = None, indexes: list[list[str]] | None = None, type_map: dict[str, Any] | None = None, materialization_details: str | None = None)[source]#

Container for storing Tables.

Used to wrap table objects that get returned from materializing tasks. Tables get stored using the table store.

Example: How to return a table from a task.#
 @materialize()
 def task():
     df = pd.DataFrame({"x": [0, 1, 2, 3]}
     return Table(df, "name")
Parameters:
  • obj – The table object to wrap

  • name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.

  • primary_key – Optional name of the primary key that should be used when materializing this table. Only supported by some table stores.

  • indexes – Optional list of indexes to create. Each provided index should be a list of column names. Only supported by some table stores.

  • type_map – Optional map of column names to types. Depending on the table store this will allow you to control the datatype as which the specified columns get materialized.

  • materialization_details – The label of the materialization_details to be used. Overwrites the label given by the stage.

See also

You can specify which types of objects should automatically get converted to tables using the auto_table config option.

class pydiverse.pipedag.RawSql(sql: str | None = None, name: str | None = None, separator: str = ';')[source]#

Container for raw sql strings.

This allows returning sql query strings that then get executed in the table store. This is only intended to help with transitioning legacy sql pipelines to pipedag, and should be replaced with pipedag managed tables as soon as possible.

Attention

When using RawSql, make sure that you only write tables to the stage that the corresponding task is running in. Otherwise, schema swapping won’t work. To do this, pass the current stage as an argument to your task and then access the current stage name using Stage.current_name.

Parameters:
  • sql – The sql query string to execute. Depending on the database dialect and the separator parameter, the query will be split into multiple subqueries that then get executed sequentially.

  • name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.

  • separator – The separator used when splitting the query into subqueries. Default: ";"

Example

When you want to use tables produced by a RawSql task in a downstream task, you can either use square brackets during flow definition to pass a specific table to a child task, or you can pass the entire RawSql object as an input, in which case all tables get loaded and the child task can access them using square brackets (or any of the dict-like methods defined by RawSql).

@materialize(lazy=True)
def raw_sql_task(stage):
    schema = stage.current_name
    return RawSql(f"""
        CREATE TABLE {schema}.tbl_1 AS SELECT 1 as x;
        CREATE TABEL {schema}.tbl_2 AS SELECT 2 as x;
    """)

@materialize(input_type=sa.Table)
def foo(tbl_1, tbl_2):
    ...

@materialize(input_type=sa.Table)
def bar(raw_sql):
    tbl_1 = raw_sql["tbl_1"]
    tbl_2 = raw_sql["tbl_2"]
    ...

with Flow() as f:
    with Stage("stage") as s:
        out = raw_sql_task(s)

        # Manually pass specific tables into the task
        foo(out["tbl_1"], out["tbl_2"])

        # Or pass the entire RawSql object into the task
        bar(out)
__iter__() Iterable[str][source]#

Yields all names of tables produced by this RawSql object.

__contains__(table_name: str) bool[source]#

Check if this RawSql object produced a table with name table_name.

__getitem__(table_name: str)[source]#

Gets the table produced by this RawSql object with name table_name.

items() Iterable[tuple[str, Any]][source]#

Returns pairs of (table_name, table).

get(table_name: str, default=None)[source]#

Returns the table with name table_name, or if no such table exists, the default value.

class pydiverse.pipedag.Blob(obj: T | None = None, name: str | None = None)[source]#

Blob (binary large object) container.

Used to wrap arbitrary Python objects that get returned from materializing tasks. Blobs get stored in the blob store.

Example: How to return a blob from a task.#
 @materialize()
 def task():
     obj = SomePicklableClass()
     return Blob(obj, "name")
Parameters:
  • obj – The object to wrap

  • name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.

See also

You can specify which types of objects should automatically get converted to blobs using the auto_blob config option.

class pydiverse.pipedag.Result(flow: 'Flow', subflow: 'Subflow', underlying: 'Any', successful: 'bool', config_context: 'ConfigContext | None', task_values: 'dict[Task, Any]', task_states: 'dict[Task, FinalTaskState]', exception: 'Exception | None') None[source]#

Flow execution result.

flow: Flow#

The flow that produced this result

underlying: Any#

The underlying result object returned by the orchestration engine. Depending on the engine, this object might have a different type.

successful: bool#

Whether the flow execution was successful or not.

task_values: dict[Task, Any]#

A dictionary mapping from tasks to the values returned by them.

task_states: dict[Task, FinalTaskState]#

A dictionary mapping from tasks to their final states.

exception: Exception | None#

If an exception was raised during execution, it will get stored in this attribute.

get(task: Task | TaskGetItem, as_type: type = None) Any[source]#

Retrieve the output produced by a task.

Any tables and blobs returned by a task get loaded from their corresponding store.

If strict_result_get_locking is set to True, this call, as well as as Flow.run() must be wrapped inside a StageLockContext.

Parameters:
  • task – The task for which you want to retrieve the output.

  • as_type – The type as which tables produced by this task should be dematerialized. If no type is specified, the input type of the task is used.

Returns:

The results of the task.

visualize()[source]#

Wrapper for Flow.visualize().

visualize_url() str[source]#

Wrapper for Flow.visualize_url().

visualize_pydot() pydot.Dot[source]#

Wrapper for Flow.visualize_pydot().

class pydiverse.pipedag.PipedagConfig(path: str | pathlib.Path | dict[str, Any])[source]#

This class represents a pipedag config file.

Parameters:

path – Path to the config yaml file to load or a dictionary containing the raw config as it would be loaded from yaml file.

default: PipedagConfig#

The default config file.

If the environment variable PIPEDAG_CONFIG is set, then this file will be used as the config file. Otherwise, pipedag searches for a file called pipedag.yaml or pipedag.yml in:

  • The current working directory

  • Any parent directories of the working directory

  • The user folder

get(instance: str | None = None, flow: str | None = None, per_user: bool = False) ConfigContext[source]#

Constructs a ConfigContext. For more details how the specific ConfigContext instance is constructed, check out the specifying instances and flows section.

Parameters:
  • instance – Name of the instance. If no value is provided the __any__ instance gets used.

  • flow – Name of the flow. If no value is provided the __any__ flow gets used.

  • per_user – Whether to customize the instance id for each user according to per_user_template.

class pydiverse.pipedag.ConfigContext(config_dict: dict, pipedag_name: str, flow_name: str | None, instance_name: str, fail_fast: bool, strict_result_get_locking: bool, ignore_task_version: bool, instance_id: str, stage_commit_technique: pydiverse.pipedag.context.context.StageCommitTechnique, network_interface: str, kroki_url: str | None, attrs: box.box.Box, table_hook_args: box.box.Box, ignore_cache_function: bool = False, swallow_exceptions: bool = False, force_task_execution: bool = False) NoneType[source]#

Configuration context for running a particular pipedag instance.

To create a ConfigContext instance use PipedagConfig.get().

pipedag_name: str#

Name of the config file.

flow_name: str | None#

Name of the flow used for instantiating this config.

instance_name: str#

Name of the instance used for instantiating this config.

instance_id: str#

The instance_id.

attrs: box.box.Box#

Values from the attrs config section, stored in a Box object. Useful for passing any custom, use case specific config options to your flow.

evolve(**changes) ConfigContext[source]#

Create a new config context instance with the changes applied.

Because ConfigContext is immutable, this is the only valid way to derive a new instance with some values mutated.

Wrapper around attrs.evolve().

classmethod get() T[source]#

Returns the current, innermost context instance.

Raises:

LookupError – If no such context has been entered yet.

class pydiverse.pipedag.StageLockContext[source]#

Context manager used to keep stages locked until after Flow.run().

By default, pipedag releases stage locks as soon as it is done processing a stage. This means that by the time the flow has finished running, another flow run might already have overwritten any data in those stages. Consequently, calling Result.get() might not return the values produced by the current run, but instead those from the other (newer) run.

To prevent this, you can wrap the calls to Flow.run() and Result.get() in a StageLockContext. This keeps all stages modified by the flow to remain locked until the end of StageLockContext.

Example

with StageLockContext():
    result = flow.run()
    df = result.get(task_x)
classmethod get() T[source]#

Returns the current, innermost context instance.

Raises:

LookupError – If no such context has been entered yet.

Backend Classes#

Table Store#

class pydiverse.pipedag.backend.table.SQLTableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str | list[str]]] | None = None, default_materialization_details: str | None = None)[source]#

Table store that materializes tables to a SQL database

Uses schema swapping for transactions: Creates a schema for each stage and a temporary schema for each transaction. If all tasks inside a stage succeed, swaps the schemas by renaming them.

The correct dialect specific subclass of SQLTableStore gets initialized when based on the dialect found in the provided engine url during initialization.

Supported Tables

The SQLTableStore can materialize (that is, store task output) and dematerialize (that is, retrieve task input) the following Table types:

Framework

Materialization

Dematerialization

SQLAlchemy

sa.Table

Pandas

pd.DataFrame

pd.DataFrame

Polars

pl.DataFrame

pl.DataFrame

tidypolars

tp.Tibble

tp.Tibble

Ibis

ibis.api.Table

ibis.api.Table

pydiverse.transform

pdt.Table

pdt.eager.PandasTableImpl
pdt.lazy.SQLTableImpl

pydiverse.pipedag

Parameters:
  • url

    The SQLAlchemy engine url use to connect to the database.

    This URL may contain placeholders like {name} or {instance_id} (additional ones can be defined in the url_attrs_file) or environment variables like {$USER} which get substituted with their respective values.

  • url_attrs_file

    Filename of a yaml file which is read shortly before rendering the final engine URL and which is used to replace custom placeholders in url.

    Just like url, this value may also contain placeholders and environment variables which get substituted.

  • create_database_if_not_exists – If the engine url references a database name that doesn’t yet exists, then setting this value to True tells pipedag to create the database before trying to open a connection to it.

  • schema_prefix – A prefix that gets placed in front of all schema names created by pipedag.

  • schema_suffix – A suffix that gets placed behind of all schema names created by pipedag.

  • avoid_drop_create_schema – If True, no CREATE SCHEMA or DROP SCHEMA statements get issued. This is mostly relevant for databases that support automatic schema creation like IBM DB2.

  • print_materialize – If True, all tables that get materialized get logged.

  • print_sql – If True, all executed SQL statements get logged.

  • no_db_locking – Speed up database by telling it we will not rely on it’s locking mechanisms. Currently not implemented.

  • strict_materialization_details

    If True: raise an exception if
    • the argument materialization_details is given even though the table store does not support it.

    • a table references a materialization_details tag that is not defined in the config.

    If False: Log an error instead of raising an exception

  • materialization_details

    A dictionary with each entry describing a tag for materialization details of the table store. See subclasses of :py:class:`BaseMaterializationDetails

    <pydiverse.pipedag.materialize.details.BaseMaterializationDetails>`

    for details.

  • default_materialization_details – The materialization_details that will be used if materialization_details is not specified on table level. If not set, the __any__ tag (if specified) will be used.

SQLTableStore Dialects#

class pydiverse.pipedag.backend.table.sql.dialects.PostgresTableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str | list[str]]] | None = None, default_materialization_details: str | None = None)[source]#

SQLTableStore that supports PostgreSQL.

Takes the same arguments as SQLTableStore

class pydiverse.pipedag.backend.table.sql.dialects.DuckDBTableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str | list[str]]] | None = None, default_materialization_details: str | None = None)[source]#

SQLTableStore that supports DuckDB.

Takes the same arguments as SQLTableStore

class pydiverse.pipedag.backend.table.sql.dialects.MSSqlTableStore(*args, disable_pytsql: bool = False, pytsql_isolate_top_level_statements: bool = True, **kwargs)[source]#

SQLTableStore that supports Microsoft SQL Server.

In addition to the arguments of SQLTableStore, it also takes the following arguments:

Parameters:
  • disable_pytsql – For mssql, a package called pytsql is used for executing RawSql scripts. It has the advantage that it allows for some kind of SQL based print statements. However, it may fail for some statements. For those cases, you can set disable_pytsql: true to use another logic for splitting up Raw SQL scripts and handing that over to sqlalchemy. This is actually quite a complex process for mssql. Sorry for any inconveniences. We will try to make it work for most tsql code that should be integrated in pipedag pipelines. However, the ultimate goal is to split up monolithic blocks of dynamic sql statements into defined transformations with dynamic aspects written in python.

  • pytsql_isolate_top_level_statements – This parameter is handed over to pytsql.executes and causes the script to be split in top level statements that are sent to sqlalchemy separately. The tricky part here is that some magic is done to make DECLARE statements reach across, but it is not guaranteed to be identical to scripts executed by a SQL UI.

class pydiverse.pipedag.backend.table.sql.dialects.IBMDB2TableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str | list[str]]] | None = None, default_materialization_details: str | None = None)[source]#

SQLTableStore that supports IBM Db2. Requires ibm-db-sa to be installed.

Takes the same arguments as SQLTableStore

Local Table Cache#

class pydiverse.pipedag.backend.table.cache.ParquetTableCache(*args, base_path: str | pathlib.Path, **kwargs)[source]#

Local Table Cache that stores tables in Parquet files.

Supported Tables

The ParquetTableCache supports Pandas, Polars and pydiverse.transform.

Parameters:

base_path – A path to a folder where the Parquet files should get stored. To differentiate between different instances, the instance_id will automatically be appended to the provided path.

Blob Store#

class pydiverse.pipedag.backend.blob.FileBlobStore(base_path: str | pathlib.Path)[source]#

File based blob store

The FileBlobStore stores blobs in a folder structure on a file system. In the base directory there will be two folders for every stage, one for the base and one for the transaction stage. Inside those folders the blobs will be stored as pickled files: base_path/instance_id/STAGE_NAME/BLOB_NAME.pkl.

To commit a stage, the only thing that has to be done is to rename the appropriate folders.

Lock Manager#

class pydiverse.pipedag.backend.lock.DatabaseLockManager(engine: sqlalchemy.engine.base.Engine, instance_id: str, lock_schema: pydiverse.pipedag.backend.table.sql.ddl.Schema | None = None, create_lock_schema: bool = True)[source]#

Lock manager based on database locking mechanisms

Many databases provide some kind of locking mechanism. Depending on the specific database technology, this allows us to implement locking on either the schema level (where each stage can be locked and unlocked individually), or only on the instance level (where the entire instance including all stages get locked and unlocked together).

This lock manager uses the same database as the SQLTableStore. No other configuration in the args section of the config file is required.

We currently support the following databases: PostgreSQL, Microsoft SQL Server, IBM DB2.

class pydiverse.pipedag.backend.lock.ZooKeeperLockManager(client: kazoo.client.KazooClient, base_path: str)[source]#

Apache ZooKeeper based lock manager

Uses Apache ZooKeeper to establish fully distributed locks that are globally synchronous. The advantage of this approach is that we it is highly reliable and that in case our flow crashes, the acquired locks automatically get released (the locks are ephemeral).

Config File

All arguments in the args section get passed as-is to the initializer of kazoo.client.KazooClient. Some useful arguments include:

Parameters:

hosts – Comma separated list of hosts to connect.

class pydiverse.pipedag.backend.lock.FileLockManager(base_path: str | pathlib.Path)[source]#

Lock manager that uses lock files

For details on how exactly the file locking is implemented, check out the filelock documentation.

Parameters:

base_path – A path to a folder where the lock files should get stored. To differentiate between different instances, the instance_id will automatically be appended to the provided path.

class pydiverse.pipedag.backend.lock.NoLockManager[source]#

This lock manager doesn’t do any locking and only serves as a placeholder for an actual lock manager for testing something locally.

Warning

This lock manager is not intended for use in a production environment. Using a lock manager is essential for preventing data corruption.

Orchestration Engine#

class pydiverse.pipedag.engine.SequentialEngine[source]#

Most basic orchestration engine that just executes all tasks sequentially.

class pydiverse.pipedag.engine.DaskEngine(**dask_compute_kwargs)[source]#

Execute a flow in parallel on a single machine using dask.

Parameters:

dask_compute_kwargs – Keyword arguments that get passed to dask.compute(). The main kwarg you might be interested in is num_workers, which allows you to specify how many worker processes dask should spawn. By default, it spawns one worker per CPU core.

class pydiverse.pipedag.PrefectEngine#

Alias for either PrefectOneEngine or PrefectTwoEngine depending on the version of Prefect that is installed.

class pydiverse.pipedag.engine.PrefectOneEngine(flow_kwargs: dict[str, Any] = None)[source]#

Hands over execution of a flow to Prefect 1.

Parameters:

flow_kwargs – Optional dictionary of keyword arguments that get passed to the initializer of prefect.Flow.

class pydiverse.pipedag.engine.PrefectTwoEngine(flow_kwargs: dict[str, Any] = None)[source]#

Hands over execution of a flow to Prefect 2.

Parameters:

flow_kwargs – Optional dictionary of keyword arguments that get passed to the initializer of @prefect.flow deecorator.

Special Table Types#

class pydiverse.pipedag.backend.table.sql.ExternalTableReference(name: str, schema: str, shared_lock_allowed: bool = False)[source]#

Reference to a user-created table.

By returning a ExternalTableReference wrapped in a Table from, a task you can tell pipedag about a table, a view or DB2 nickname in an external schema. The schema may be a multi-part identifier like “[db_name].[schema_name]” if the database supports this. It is passed to SQLAlchemy as-is.

Only supported by SQLTableStore.

Warning

When using a ExternalTableReference, pipedag has no way of knowing the cache validity of the external object. Hence, the user should provide a cache function for the Task. It is now allowed to specify a ExternalTableReference to a table in schema of the current stage.

Example

You can use a ExternalTableReference to tell pipedag about a table that exists in an external schema:

@materialize(version="1.0")
def task():
    return Table(ExternalTableReference("name_of_table", "schema"))

By using a cache function, you can establish the cache (in-)validity of the external table:

from datetime import date

# The external table becomes cache invalid every day at midnight
def my_cache_fun():
    return date.today().strftime("%Y/%m/%d")

@materialize(cache=my_cache_fun)
def task():
    return Table(ExternalTableReference("name_of_table", "schema"))