API¶
Public¶
- class pydiverse.pipedag.Flow(name: str = 'default')[source]¶
A flow represents a collection of dependent Tasks.
A flow is defined by using it as a context manager. Any stages and tasks defined inside the flow context will automatically get added to the flow with the correct dependency wiring.
- Parameters:
name – The name of the flow.
Examples
@materialize
def my_materializing_task():
    return Table(...)

with Flow("my_flow") as flow:
    with Stage("stage_1") as stage_1:
        task_1 = my_materializing_task()
        task_2 = another_materializing_task(task_1)

    with Stage("stage_2") as stage_2:
        task_3 = yet_another_task(task_2)
        ...

result = flow.run()
- __getitem__(name) Stage[source]¶
Retrieves a stage by name.
- Parameters:
name – The name of the stage.
- Raises:
KeyError – if no stage with the specified name exists.
- run(*components: Task | TaskGetItem | Stage, config: ConfigContext = None, orchestration_engine: OrchestrationEngine = None, trace_hook: TraceHook = None, fail_fast: bool | None = None, cache_validation_mode: CacheValidationMode | None = None, disable_cache_function: bool | None = None, ignore_task_version: bool | None = None, ignore_position_hashes: bool = False, inputs: dict[Task, ExternalTableReference] | None = None, **kwargs) Result[source]¶
Execute the flow. This will execute the flow and all tasks within using the orchestration engine.
- Parameters:
components –
An optional selection of tasks or stages that should get executed. If no values are provided, all tasks and stages get executed.
If you specify a subset of tasks, those tasks get executed even when they are cache valid, and they don’t get committed.
If you specify a subset of stages, all tasks and substages of those stages get executed and committed.
config – A ConfigContext to use as the configuration for executing this flow. If no config is provided, then pipedag uses the current innermost open ConfigContext, or if no such config exists, it gets the default config from PipedagConfig.default.
orchestration_engine – The orchestration engine to use. If no engine is provided, the orchestration engine from the config gets used.
fail_fast – Whether exceptions should get raised or swallowed. If set to True, exceptions that occur get immediately raised and the flow gets aborted.
cache_validation_mode – Override the cache validation mode. See CacheValidationMode. For None, the cache validation mode from the config gets used. See Pipedag Config File Cache Validation options for more information.
disable_cache_function – Override the disable_cache_function setting from the config. See Pipedag Config File Cache Validation options for more information.
ignore_task_version – Override the ignore_task_version setting from the config. See Pipedag Config File Cache Validation options for more information.
ignore_position_hashes – If True, the position hashes of tasks are not checked when retrieving the inputs of a task from the cache. This simplifies execution of subgraphs if you don’t care whether inputs to that subgraph are cache invalid. This allows multiple modifications in the Graph before the next run updating the cache. Attention: This may break automatic cache invalidation. And for this to work, any task producing an input for the chosen subgraph may never be used more than once per stage. NOTE: This is only supported for the SequentialEngine and SQLTableStore.
inputs –
Optionally provide the outputs for a subset of tasks. The format is expected as dict[Task, ExternalTableReference]. When the outputs of said tasks are retrieved as inputs for another dependent task, they will be fetched from the ExternalTableReference instead.
NOTE: This feature is experimental and is not compatible with all aspects of pipedag. Only ExternalTableReferences are supported as inputs. This feature does not work well in combination with Tasks returning multiple outputs (nout > 1 or RawSql tasks returning multiple tables). In the case of multiple outputs, the provided ExternalTableReference will override all outputs of the task. It is currently only supported when using the SQLTableStore. Using it disables caching.
kwargs – Other keyword arguments that get passed on directly to the run() method of the orchestration engine. Consequently, these keyword arguments are engine dependent.
- Returns:
A Result object for the current flow run.
Examples
with Flow() as flow:
    with Stage("stage_1") as stage_1:
        task_1 = ...
        task_2 = ...

    with Stage("stage_2") as stage_2:
        task_3 = ...
        task_4 = ...

# Execute the entire flow
flow.run()

# Execute (and commit) only stage 1
flow.run(stage_1)

# Execute (but DON'T commit) only task 1 and task 4
flow.run(task_1, task_4)
- get_stage(name: str) Stage[source]¶
Retrieves a stage by name. Alias for Flow.__getitem__().
- Parameters:
name – The name of the stage.
- Returns:
The stage.
- Raises:
KeyError – if no stage with the specified name exists.
- visualize(result: Result | None = None, visualization_tag: str | None = None)[source]¶
Visualizes the flow as a graph.
If you are running in a jupyter notebook, the graph will get displayed inline. Otherwise, it will get rendered to a pdf that then gets opened in your browser.
Requires Graphviz to be installed on your computer.
- Parameters:
result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.
- visualize_url(result: Result | None = None, visualization_tag: str | None = None) str[source]¶
Visualizes the flow as a graph and returns a URL to view the visualization.
If you don’t have Graphviz installed on your computer (and thus aren’t able to use the visualize() method) then this is the easiest way to visualize the flow. For this we use a free service called Kroki.
- Parameters:
result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.
- Returns:
A URL that, when opened, displays the graph.
- visualize_pydot(result: Result | None = None, visualization_tag: str | None = None) None[source]¶
Visualizes the flow as a graph and returns a pydot.Dot graph.
- Parameters:
result – An optional Result instance. If provided, the visualization will contain additional information such as which tasks ran successfully, or failed.
- Returns:
A pydot.Dot graph.
- class pydiverse.pipedag.Stage(name: str, materialization_details: str | None = None, group_node_tag: str | None = None, force_committed=False)[source]¶
A stage represents a collection of related tasks.
The main purpose of a Stage is to allow for a transactionality mechanism. Only if all tasks inside a stage finish successfully does the stage get committed.
All tasks that get defined inside the stage’s context will automatically get added to the stage.
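The commit-on-success behaviour described above can be pictured with a small, library-independent sketch. The names `run_stage`, `committed` and `staging` are hypothetical and not part of pipedag, and plain dictionaries stand in for whatever the table store actually manages.

```python
from typing import Any, Callable


def run_stage(
    committed: dict[str, Any],
    tasks: dict[str, Callable[[], Any]],
) -> dict[str, Any]:
    """Run all tasks and return the new committed state.

    Hypothetical helper for illustration only (not pipedag API).
    Outputs are first written to a staging area. They replace the
    committed state only if every task finished without raising.
    """
    staging: dict[str, Any] = {}
    try:
        for name, task in tasks.items():
            staging[name] = task()
    except Exception:
        # At least one task failed: keep the previous committed state.
        return committed
    # All tasks succeeded: the staged outputs become the committed state.
    return staging


def failing_task() -> int:
    raise RuntimeError("task failed")


old = {"a": 0}
ok = run_stage(old, {"a": lambda: 1, "b": lambda: 2})
failed = run_stage(old, {"a": lambda: 1, "b": failing_task})
```

In this toy model, `ok` holds the freshly produced outputs, while `failed` is still the untouched previous state because one task raised.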
- Parameters:
name – The name of the stage. Two stages with the same name may not be used inside the same flow.
materialization_details – The label of the materialization_details to be used. Overwrites the label given by the stage.
- property current_name: str¶
The name of the stage where the data currently lives.
Before a task has been committed this is the transaction name, after the commit it is the normal name.
- __getitem__(item: str | tuple[str, int]) Task[source]¶
Retrieves a task inside the stage by name.
You can either subscript a Stage using just a string (stage["name"]) or using a tuple containing a string and an integer (stage["name", 3]).
The string is always interpreted as the name of the task to retrieve. If you also pass in an integer, it is interpreted as the index of the task. This means that if the stage contains multiple tasks with the same name, you can retrieve a specific instance of that task based on the order in which they were defined.
- Raises:
KeyError – If no task with the name can be found.
IndexError – If the index is out of bounds.
ValueError – If multiple matching tasks have been found, but no index has been provided.
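The lookup rules and the three exceptions listed above can be sketched as follows. This is a minimal illustration that works on a plain list of task names; `find_task` is a hypothetical helper and not how pipedag implements `Stage.__getitem__()`.

```python
from typing import Optional


def find_task(task_names: list[str], name: str, index: Optional[int] = None) -> int:
    """Return the position of the requested task in definition order.

    Hypothetical stand-in for the lookup rules described above; it works
    on a plain list of task names instead of real Task objects.
    """
    matches = [i for i, n in enumerate(task_names) if n == name]
    if not matches:
        raise KeyError(name)
    if index is None:
        if len(matches) > 1:
            raise ValueError(f"multiple tasks named {name!r}; pass an index")
        return matches[0]
    # Indexing the list of matches raises IndexError when out of bounds.
    return matches[index]


# Task names in the order in which they were defined inside the stage.
names = ["load", "clean", "load"]
clean_position = find_task(names, "clean")
second_load_position = find_task(names, "load", 1)
```

Asking for `"load"` without an index is ambiguous here and raises `ValueError`, which mirrors why the tuple form `stage["name", 3]` exists.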
- get_task(name: str, index: int | None = None) Task[source]¶
Retrieves a task inside the stage by name. Alias for Stage.__getitem__().
- Parameters:
name – The name of the task to retrieve.
index – If multiple task instances with the same name appear inside the stage, you can request a specific one by passing an index.
- @pydiverse.pipedag.materialize(*, name: str | None = None, input_type: type | None = None, version: str | None = None, cache: Callable[[...], Any] | None = None, lazy: bool = False, nout: int = 1, add_input_source: bool = False, ordering_barrier: bool | dict[str, Any] = False, call_context: Callable[[], Any] | None = None, allow_fresh_input: bool = False) Callable[[CallableT], CallableT | UnboundMaterializingTask][source]¶
- @pydiverse.pipedag.materialize(fn: CallableT, /) CallableT | UnboundMaterializingTask
Decorator to create a task whose outputs get materialized.
This decorator takes a function and turns it into a MaterializingTask. This means that this function can only be used as part of a flow. Any outputs it produces get written to their appropriate storage backends (either the table or blob store). Additionally, any Table or Blob objects this task receives as an input get replaced with the appropriate object retrieved from the storage backend. In other words: All outputs from a materializing task get written to the store, and all inputs get retrieved from the store.
Because of how caching is implemented, task inputs and outputs must all be “materializable”. This means that they can only contain objects of the following types: dict, list, tuple, int, float, str, bool, None, datetime.date, datetime.datetime, pathlib.Path, Table, RawSql, Blob, or Stage.
- Parameters:
fn – The function that gets executed by this task.
name – The name of this task. If no name is provided, the name of fn is used instead.
input_type – The data type as which to retrieve table objects from the store. All tables passed to this task get loaded from the table store and converted to this type. See Table Backends for more information.
version –
The version of this task. Unless the task is lazy or version=AUTO_VERSION, you always need to manually change this version number when you change the implementation to ensure that the task gets executed and the cache flushed.
If the version is None and the task isn’t lazy, then the task always gets executed, and all downstream tasks get invalidated.
cache –
An explicit cache function used to determine cache validity of the task inputs.
This function gets called every time before the task gets executed. It gets called with the same arguments as the task.
An explicit function for validating cache validity. If the output of this function changes while the source parameters are the same (e.g. the source is a filepath and cache loads data from this file), then the cache will be deemed invalid and is not used.
lazy –
Whether this task is lazy or not.
Unlike a normal task, lazy tasks always get executed. However, before a table returned by a lazy task gets materialized, the table store checks if the same table has been materialized before. If this is the case, then the table doesn’t get materialized, and instead, the table gets copied from the cache.
This is efficient for tasks that return SQL queries, because the query only gets generated but will not be executed again if the resulting table is cache-valid.
The same also works for ExternalTableReference, where the “query” is just the identifier of the table in the store.
Note
For tasks returning an ExternalTableReference, pipedag cannot automatically know if the external tables have changed or not. This should be controlled via a cache function given via the cache argument of materialize. See ExternalTableReference for an example.
For tasks returning a Polars or Pandas DataFrame, the output is deemed cache-valid if the hash of the resulting DataFrame is the same as the hash of the previous run. So, even though the task always gets executed, downstream tasks can remain cache-valid if the DataFrame is the same as before. This is useful for small tasks that are hard to implement using only LazyFrames, but where the DataFrame generation is cheap.
In both cases, you don’t need to manually bump the version of a lazy task.
Warning
A task returning a Polars LazyFrame should not be marked as lazy. Use version=AUTO_VERSION instead. See AUTO_VERSION.
group_node_tag – Set a tag that may add this task to a configuration based group node.
nout – The number of objects returned by the task. If set, this allows unpacking and iterating over the results from the task.
add_input_source – If true, Table and Blob objects are provided as tuple together with the dematerialized object. task(a=tbl) will result in a=(dematerialized_tbl, tbl) in the task.
ordering_barrier – If true, the task will be surrounded by a GroupNode(ordering_barrier=True). If a dictionary is provided, it is surrounded by a GroupNode(**ordering_barrier). This allows passing style, style_tag, and label arguments.
call_context – An optional context manager function that is opened before the task or its optional cache function is called and closed afterward.
allow_fresh_input – If true, it allows the task to update outputs (cache invalid) for cache_validation.mode=ASSERT_NO_FRESH_INPUT and with cache function indicating that this is an input task.
Example
@materialize(version="1.0", input_type=pd.DataFrame)
def task(df: pd.DataFrame) -> pd.DataFrame:
    df = ...  # Do something with the dataframe
    return Table(df)

@materialize(lazy=True, input_type=sa.Table)
def lazy_task(tbl: sa.Alias) -> sa.Table:
    query = sa.select(tbl).where(...)
    return Table(query)
You can use the cache argument to specify an explicit cache function. In this example we define a task that reads a dataframe from a csv file. Without specifying a cache function, this task would only get rerun when the path argument changes. Instead, we define a function that calculates a hash based on the contents of the csv file, and pass it to the cache argument of @materialize. This means that the task now gets invalidated and rerun if either the path argument changes or if the content of the file pointed to by path changes:
import hashlib

def file_digest(path):
    with open(path, "rb") as f:
        return hashlib.file_digest(f, "sha256").hexdigest()

@materialize(version="1.0", cache=file_digest)
def task(path):
    df = pd.read_csv(path)
    return Table(df)
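Note that `hashlib.file_digest` is only available from Python 3.11 onwards. On older interpreters, a cache function with the same effect can read the file in chunks, as in the following sketch. The name `file_digest_compat` and the chunk size are assumptions chosen for illustration; they are not part of pipedag.

```python
import hashlib
import os
import tempfile


def file_digest_compat(path, chunk_size=1 << 16):
    """Return the SHA-256 hex digest of a file's contents.

    Hypothetical helper (not part of pipedag) for Python < 3.11.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read fixed-size chunks so large files are not loaded at once.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Small demonstration with a temporary file.
with tempfile.NamedTemporaryFile("wb", suffix=".csv", delete=False) as tmp:
    tmp.write(b"x,y\n1,2\n")
demo_digest = file_digest_compat(tmp.name)
os.remove(tmp.name)
```

Such a function could then be passed as `cache=file_digest_compat` in the same way as `file_digest` in the example above.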
Setting nout allows you to use unpacking assignment with the task result:
@materialize(version="1.0", nout=2)
def task():
    return 1, 2

# Later, while defining a flow, you can use unpacking assignment
# because you specified that the task returns 2 objects (nout=2).
one, two = task()
If a task returns a dictionary or a list, you can use square brackets to explicitly wire individual values / elements to task inputs:
@materialize(version="1.0")
def task():
    return {
        "x": [0, 1],
        "y": [2, 3],
    }

# Later, while defining a flow
# This would result in another_task being called with ([0, 1], 3).
task_out = task()
another_task(task_out["x"], task_out["y"][1])
- @pydiverse.pipedag.input_stage_versions(*, name: str | None = None, input_type: type | None = None, version: str | None = None, cache: Callable[[...], Any] | None = None, lazy: bool = False, group_node_tag: str | None = None, nout: int = 1, add_input_source: bool = False, ordering_barrier: bool | dict[str, Any] = True, call_context: Callable[[], Any] | None = None, allow_fresh_input: bool = False, include_views=True, lock_source_stages=True, pass_args: Iterable[str] = tuple()) Callable[[CallableT], CallableT | UnboundMaterializingTask][source]¶
- @pydiverse.pipedag.input_stage_versions(fn: CallableT, /) CallableT | UnboundMaterializingTask
A decorator that marks a function as a task that receives tables from two versions of the same stage. This can either be the currently active schema of this stage before schema swapping, or it can be an active schema of another pipeline instance.
The arguments to the resulting task are significantly different from a task created with the @materialize decorator. An arbitrary expression with lists, tuples, dicts, and Table/Blob/RawSql references can be provided. This decorator will only collect the tables referenced and will pass them as one dictionary per version to the task. Each dictionary maps table names to a reference for the respective stage version in the form specified by the input_type parameter.
When a ConfigContext object is passed as a top-level parameter or named parameter in the wiring call of the decorated task, it is used for referencing the second stage version. The idea is that the other configuration is just another pipeline instance accessible with the same SQLAlchemy engine of the table store. It may also work if this is not the case, but it may be harder for the task to process the dictionaries it receives. Currently, it is not allowed to pass more than one ConfigContext object.
When no parameters (except for an optional ConfigContext) are passed, all currently existing tables of both stage versions (or their respective schema) are passed in the dictionaries to the task.
With pass_blobs=True, two additional dictionaries are passed to the task at the end. In addition to the arguments mentioned above, pass_args can be used to feed defined keyword arguments through to the decorated task.
- Parameters:
fn – The function that gets executed by this task.
name – The name of this task. If no name is provided, the name of fn is used instead.
input_type – The data type as which to retrieve table objects from the store. All tables passed to this task get loaded from the table store and converted to this type. See Table Backends for more information.
version –
The version of this task. Unless the task is lazy, you always need to manually change this version number when you change the implementation to ensure that the task gets executed and the cache flushed.
If the version is None and the task isn’t lazy, then the task always gets executed, and all downstream tasks get invalidated.
cache –
An explicit cache function used to determine cache validity of the task inputs.
This function gets called every time before the task gets executed. It gets called with the same arguments as the task.
An explicit function for validating cache validity. If the output of this function changes while the source parameters are the same (e.g. the source is a filepath and cache loads data from this file), then the cache will be deemed invalid and is not used.
lazy –
Whether this task is lazy or not.
Unlike a normal task, lazy tasks always get executed. However, if a lazy task produces a lazy table (e.g. a SQL query), the table store checks if the same query has been executed before. If this is the case, then the query doesn’t get executed, and instead, the table gets copied from the cache.
This behaviour is very useful, because you don’t need to manually bump the version of a lazy task. This only works because for lazy tables generating the query is very cheap compared to executing it.
group_node_tag – Set a tag that may add this task to a configuration based group node.
nout – The number of objects returned by the task. If set, this allows unpacking and iterating over the results from the task.
add_input_source – If true, Table and Blob objects are provided as tuple together with the dematerialized object.
task(a=tbl) will result in a=(dematerialized_tbl, tbl) in the task.
ordering_barrier – If true, the task will be surrounded by a GroupNode(ordering_barrier=True). If a dictionary is provided, it is surrounded by a GroupNode(**ordering_barrier). This allows passing style, style_tag, and label arguments. Attention: In contrast to @materialize, the default value is True.
call_context – An optional context manager function that is opened before the task or its optional cache function is called and closed afterward.
allow_fresh_input – If true, it allows the task to update outputs (cache invalid) for cache_validation.mode=ASSERT_NO_FRESH_INPUT and with cache function indicating that this is an input task.
include_views – In case no explicit table references are given, if true, include views when collecting all tables of both stage versions.
lock_source_stages – If true, lock the other stage version when a ConfigContext object is passed to this task.
pass_args – A list of named arguments that would be passed from the call to the task function. By default, no arguments are passed, and just tables are extracted.
Example
@input_stage_versions(lazy=True, input_type=sa.Table)
def validate_stage(
    transaction: dict[str, sa.Alias],
    other: dict[str, sa.Alias],
):
    compare_tables(transaction, other)

def get_flow():
    with Flow() as f:
        a = produce_a_table()
        validate_stage()  # implicitly receives all tables written before
- pydiverse.pipedag.AUTO_VERSION = AUTO_VERSION¶
Special constant used to indicate that pipedag should automatically determine a version number for a task.
The version is determined by running the task once to construct a representation of the computation performed on the tables (e.g. construct a query plan / computational graph). Using this representation, a unique version number is constructed such that if the computation changes the version number also changes. Then, if the task is deemed to be cache invalid, it is run again, but this time with actual data.
This puts the following constraints on which tasks can use AUTO_VERSION:
The task must be a pure function.
The task may not inspect the contents of the input tables.
The task must return at least one table.
The task may not return RawSql.
The task may not return Blob.
Polars
Automatic versioning is best supported by polars. To use it, you must specify polars.LazyFrame as the task input type and only use LazyFrames inside your task. For Polars, “Not inspecting the contents of the input tables” means “not collecting the input tables or any tables derived from them and not working eagerly with any external data sources”.
Warning
Be careful not to collect any LazyFrames or work eagerly in such tasks (especially don’t use collected data in conditional statements). Changes in any external data sources that are referenced (e.g. by scan_csv, scan_parquet,…) must be covered by providing a cache function to the cache argument of @materialize which detects any changes within any scanned data sources (e.g. hash complete file content).
Pandas
Pandas support is still experimental. It is implemented using a technique we call “computation tracing”, where we run the task with proxy tables as inputs that record all operations performed on them. This requires heavily monkey patching the pandas and numpy modules. As long as you only use pandas and numpy functions inside your task, computation tracing should work successfully. However, for the monkey patching to work, you must only access pandas / numpy functions through their namespace (e.g. pd.concat(...) is allowed, while from pandas import concat; concat(...) is not allowed).
Requires dask to be installed.
Example
@materialize(input_type=pl.LazyFrame, version=AUTO_VERSION)
def polars_task(x: pl.LazyFrame):
    # Some operations that only utilize pl.LazyFrame...
    return x.with_columns(
        (pl.col("col1") * pl.col("col2")).alias("col3")
    )
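The general idea behind deriving a version from a representation of the computation, rather than from the data, can be illustrated with a toy model. The sketch below is not pipedag's implementation: `TracedColumn` and `auto_version` are hypothetical names, and real automatic versioning relies on Polars query plans or the pandas computation tracing described above.

```python
import hashlib


class TracedColumn:
    """Proxy that records operations as an expression string (toy model)."""

    def __init__(self, expr: str):
        self.expr = expr

    def __add__(self, other):
        return TracedColumn(f"({self.expr} + {_expr(other)})")

    def __mul__(self, other):
        return TracedColumn(f"({self.expr} * {_expr(other)})")


def _expr(value) -> str:
    # Proxies contribute their recorded expression, constants their repr.
    return value.expr if isinstance(value, TracedColumn) else repr(value)


def auto_version(task, n_inputs: int) -> str:
    """Run `task` on proxies (no real data) and hash the recorded expression."""
    proxies = [TracedColumn(f"in{i}") for i in range(n_inputs)]
    result = task(*proxies)
    return hashlib.sha256(result.expr.encode()).hexdigest()[:16]


def task_v1(a, b):
    return a * b


def task_v1_renamed(x, y):
    # Same computation as task_v1, only the argument names differ.
    return x * y


def task_v2(a, b):
    # The computation changed, so the derived version should change too.
    return a * b + 1


version_1 = auto_version(task_v1, 2)
version_1_renamed = auto_version(task_v1_renamed, 2)
version_2 = auto_version(task_v2, 2)
```

The toy model also hints at why the constraints above exist: a task that branches on actual table contents cannot be described by running it once on proxies.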
- class pydiverse.pipedag.Table(obj: T | None = None, name: str | None = None, *, primary_key: str | list[str] | None = None, indexes: list[list[str]] | None = None, type_map: dict[str, Any] | None = None, nullable: list[str] | None = None, non_nullable: list[str] | None = None, materialization_details: str | None = None, annotation: type | None = None, stage: 'Stage | None' = None)[source]¶
Container for storing Tables.
Used to wrap table objects that get returned from materializing tasks. Tables get stored using the table store.
Example: How to return a table from a task.
@materialize()
def task():
    df = pd.DataFrame({"x": [0, 1, 2, 3]})
    return Table(df, "name")
- Parameters:
obj – The table object to wrap
name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.
primary_key – Optional name of the primary key that should be used when materializing this table. Only supported by some table stores.
indexes – Optional list of indexes to create. Each provided index should be a list of column names. Only supported by some table stores.
type_map – Optional map of column names to types. Depending on the table store this will allow you to control the datatype as which the specified columns get materialized.
nullable – List of columns that should be nullable. If nullable is not None, all other columns will be non-nullable.
non_nullable – List of columns that should be non-nullable. If non_nullable is not None, all other columns will be nullable.
materialization_details – The label of the materialization_details to be used. Overwrites the label given by the stage.
stage – This parameter should not be set when returning Table objects in pipedag tasks. But it can be used to manually retrieve a table from the table store.
See also
You can specify which types of objects should automatically get converted to tables using the auto_table config option.
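The interplay of the nullable and non_nullable parameters described in the parameter list above can be sketched as follows. `resolve_nullability` is a hypothetical helper, not pipedag API. Returning `None` when neither argument is given, and rejecting the case where both are given, are assumptions of this sketch rather than documented pipedag behaviour.

```python
from typing import Optional


def resolve_nullability(
    columns: list[str],
    nullable: Optional[list[str]] = None,
    non_nullable: Optional[list[str]] = None,
) -> dict[str, Optional[bool]]:
    """Map each column to True (nullable), False (non-nullable) or None.

    Hypothetical helper mirroring the parameter descriptions above.
    None means "left to the table store" (an assumption of this sketch).
    """
    if nullable is not None and non_nullable is not None:
        # Assumption: this sketch only covers the two documented cases.
        raise ValueError("this sketch handles only one of the two arguments")
    if nullable is not None:
        # Listed columns are nullable, all others become non-nullable.
        return {c: c in nullable for c in columns}
    if non_nullable is not None:
        # Listed columns are non-nullable, all others become nullable.
        return {c: c not in non_nullable for c in columns}
    return {c: None for c in columns}


cols = ["id", "name", "comment"]
with_nullable = resolve_nullability(cols, nullable=["comment"])
with_non_nullable = resolve_nullability(cols, non_nullable=["id"])
```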
- class pydiverse.pipedag.View(src: Any | collections.abc.Iterable[Any], *, sort_by: str | Any | collections.abc.Iterable[str] | collections.abc.Iterable[pydiverse.pipedag.container.SortCol] | collections.abc.Iterable[Any] | None = None, columns: collections.abc.Iterable[str] | collections.abc.Iterable[Any] | collections.abc.Mapping[str, str] | collections.abc.Mapping[str, Any] | str | Any | None = None, limit: int | None = None, assert_normalized: bool = False)[source]¶
Produces a view on pipedag managed tables with caching support.
Unlike for Table, pipedag needs to understand view queries much better in order to be able to do proper cache invalidation with source tables changing their name despite staying cache valid. That is why this View object includes all information about how to create a view.
Only supported by SQLTableStore and ParquetTableStore.
Examples
A view can combine a set of parquet files as one read_parquet operation:
import sqlalchemy as sa

@materialize(input_type=sa.Table, lazy=True)
def task(tbls: list[sa.Alias]):
    return Table(View(src=tbls), name="union")
Specifying views also works with other input_type values that support lazy table references. In this case, you need to make sure that the src parameter only references input tables exactly as they come in as parameters to the task:
import pydiverse.transform as pdt

@materialize(input_type=pdt.SqlAlchemy, lazy=True)
def task(tbls: list[pdt.Table]):
    return Table(View(src=tbls), name="union")
Furthermore, it can be used to load only a subset of a table and rename columns:
@materialize(input_type=sa.Table)
def task(tbl: sa.Alias):
    cols = [c.name for c in tbl.c if c.name.startswith("a")]
    return Table(
        View(
            src=tbl,
            columns={f"c{col}": col for col in cols},
            sort_by=[tbl.c.id.desc().nulls_first(), "id2"],
            limit=10,
        ),
        name="selection",
    )
Please note that the columns and sort_by parameters can be provided either as strings or as column objects of the respective input_type. This works at least for input_type values sa.Table and pdt.SqlAlchemy.
You can also specify sort order with an explicit class:
from pydiverse.pipedag import SortCol, SortOrder

sort_by = [SortCol("id", SortOrder.DESC, nulls_first=True)]
If you just want to filter columns without renaming, any iterable will do:
@materialize(input_type=sa.Table)
def task(tbl: sa.Alias):
    cols = [c.name for c in tbl.c if c.name.startswith("a")]
    return Table(View(tbl, columns=cols, sort_by=tbl.c.id), name="selection")
- class pydiverse.pipedag.RawSql(sql: str | None = None, name: str | None = None, separator: str = ';')[source]¶
Container for raw sql strings.
This allows returning sql query strings that then get executed in the table store. This is only intended to help with transitioning legacy sql pipelines to pipedag, and should be replaced with pipedag managed tables as soon as possible.
Attention
When using RawSql, make sure that you only write tables to the stage that the corresponding task is running in. Otherwise, schema swapping won’t work. To do this, pass the current stage as an argument to your task and then access the current stage name using Stage.current_name.
- Parameters:
sql – The sql query string to execute. Depending on the database dialect and the separator parameter, the query will be split into multiple subqueries that then get executed sequentially.
name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.
separator – The separator used when splitting the query into subqueries. Default: ";"
Example
When you want to use tables produced by a RawSql task in a downstream task, you can either use square brackets during flow definition to pass a specific table to a child task, or you can pass the entire RawSql object as an input, in which case all tables get loaded and the child task can access them using square brackets (or any of the dict-like methods defined by RawSql).
@materialize(lazy=True)
def raw_sql_task(stage):
    schema = stage.current_name
    return RawSql(f"""
        CREATE TABLE {schema}.tbl_1 AS SELECT 1 as x;
        CREATE TABLE {schema}.tbl_2 AS SELECT 2 as x;
    """)

@materialize(input_type=sa.Table)
def foo(tbl_1, tbl_2):
    ...

@materialize(input_type=sa.Table)
def bar(raw_sql):
    tbl_1 = raw_sql["tbl_1"]
    tbl_2 = raw_sql["tbl_2"]
    ...

with Flow() as f:
    with Stage("stage") as s:
        out = raw_sql_task(s)

        # Manually pass specific tables into the task
        foo(out["tbl_1"], out["tbl_2"])

        # Or pass the entire RawSql object into the task
        bar(out)
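To get a feeling for what the separator parameter of RawSql controls, the following sketch splits a script into statements in the most naive way possible. `split_statements` is a hypothetical helper: it ignores separators inside string literals or comments as well as any dialect-specific rules, so it should not be read as pipedag's actual splitting logic.

```python
def split_statements(sql: str, separator: str = ";") -> list[str]:
    """Naively split a script into non-empty, stripped statements.

    Hypothetical helper: it does not handle separators inside string
    literals or comments, and ignores dialect-specific rules.
    """
    return [part.strip() for part in sql.split(separator) if part.strip()]


script = """
CREATE TABLE s.tbl_1 AS SELECT 1 AS x;
CREATE TABLE s.tbl_2 AS SELECT 2 AS x;
"""
statements = split_statements(script)
```

Each resulting statement would then be executed sequentially, which matches the description of the sql parameter.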
- __contains__(table_name: str) bool[source]¶
Check if this RawSql object produced a table with name table_name.
- class pydiverse.pipedag.Blob(obj: T | None = None, name: str | None = None)[source]¶
Blob (binary large object) container.
Used to wrap arbitrary Python objects that get returned from materializing tasks. Blobs get stored in the blob store.
Example: How to return a blob from a task.
@materialize()
def task():
    obj = SomePicklableClass()
    return Blob(obj, "name")
- Parameters:
obj – The object to wrap
name – Optional, case-insensitive name. If no name is provided, an automatically generated name will be used. To prevent name collisions, you can append "%%" at the end of the name to enable automatic name mangling.
See also
You can specify which types of objects should automatically get converted to blobs using the auto_blob config option.
- class pydiverse.pipedag.GroupNode(label: str | None = None, style: pydiverse.pipedag.core.group_node.VisualizationStyle | None = None, *, ordering_barrier: bool = False, style_tag: str | None = None)[source]¶
A group node represents a collection of related tasks.
The group can be used as a display element in the visualization, and it can be used to ensure all tasks before/after this group are executed before/after tasks in this group.
Group nodes can contain stages and can be contained by stages.
Example: How to group tasks with a group node.
with Flow() as flow:
    with Stage("stage1"):
        _ = any_task()
        with GroupNode("group1"):
            task1 = task_within_group()
            _ = task_within_group2(task1)
        _ = any_task()
- Parameters:
label – label displayed in the visualization
style – visualization style for this group node
ordering_barrier – If True, a barrier task will be added to the stage before and after this group to ensure all tasks before/after this group are executed before/after tasks in this group.
style_tag – Style tag to be used for visualization
- class pydiverse.pipedag.VisualizationStyle(hide_box: bool = False, hide_content: bool = False, hide_label: bool = False, box_color_always: str | None = None, box_color_any_failure: str | None = None, box_color_none_cache_valid: str | None = None, box_color_any_cache_valid: str | None = None, box_color_all_cache_valid: str | None = None, box_color_all_skipped: str | None = None) NoneType[source]¶
Visualization style for group nodes.
This can be configured via the GroupNode class or via configuration in pipedag.yaml.
- Parameters:
hide_box – Hide box of group node in visualization
hide_content – Hide content of group node in visualization
hide_label – Hide label of group node in visualization
box_color_always – Color of group node box if specified
box_color_any_failure – Color of group node box if any failure occurred within included tasks
box_color_none_cache_valid – Color of group node box if no failure occurred and no included task is cache valid
box_color_any_cache_valid – Color of group node box if no failure occurred and some but not all included tasks are cache valid
box_color_all_cache_valid – Color of group node box if no failure occurred and all included tasks are cache valid
box_color_all_skipped – Color of group node box if no failure occurred and all included tasks were skipped
- class pydiverse.pipedag.Schema(name: str, prefix: str = '', suffix: str = '') NoneType[source]¶
Class for holding a schema name with separable prefix and suffix.
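As a rough illustration of what "separable prefix and suffix" can mean, here is a minimal sketch. `SchemaSketch` and its `get()` method are hypothetical stand-ins written for this example; they are not the pipedag class, and the concatenation rule is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SchemaSketch:
    """Hypothetical stand-in for a schema name with separable prefix and suffix."""

    name: str
    prefix: str = ""
    suffix: str = ""

    def get(self) -> str:
        # Assumption: the full schema name is prefix + name + suffix.
        return self.prefix + self.name + self.suffix


schema = SchemaSketch("stage_1", prefix="dev_", suffix="__tmp")
full_name = schema.get()
```

Keeping the three parts separate lets the bare name stay stable while prefix and suffix vary per instance or transaction.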
- class pydiverse.pipedag.Result(flow: 'Flow', subflow: 'Subflow', underlying: Any, successful: bool, config_context: pydiverse.pipedag.context.context.ConfigContext | None, task_values: dict[pydiverse.pipedag.core.task.Task, Any], task_states: dict[pydiverse.pipedag.core.task.Task, pydiverse.pipedag.context.run_context.FinalTaskState], exception: Exception | None) None[source]¶
Flow execution result.
- underlying: Any¶
The underlying result object returned by the orchestration engine. Depending on the engine, this object might have a different type.
- task_values: dict[pydiverse.pipedag.core.task.Task, Any]¶
A dictionary mapping from tasks to the values returned by them.
- task_states: dict[pydiverse.pipedag.core.task.Task, pydiverse.pipedag.context.run_context.FinalTaskState]¶
A dictionary mapping from tasks to their final states.
- exception: Exception | None¶
If an exception was raised during execution, it will get stored in this attribute.
- get(task: Task | TaskGetItem, as_type: type = None, write_local_table_cache: bool = False) Any[source]¶
Retrieve the output produced by a task.
Any tables and blobs returned by a task get loaded from their corresponding store.
If strict_result_get_locking is set to True, this call, as well as Flow.run(), must be wrapped inside a StageLockContext.
- Parameters:
task – The task for which you want to retrieve the output.
as_type – The type as which tables produced by this task should be dematerialized. If no type is specified, the input type of the task is used.
write_local_table_cache –
Flag that determines whether the table should be stored in the local table cache if it is not already there and cache valid. If no local table cache is configured, or if the type as which the table is retrieved is not compatible with the local table cache, this flag has no effect.
Warning
It is not safe to call this method with write_local_table_cache=True from several threads at the same time.
- Returns:
The results of the task.
- visualize(visualization_tag: str | None = None)[source]¶
Wrapper for Flow.visualize().
- visualize_pydot(visualization_tag: str | None = None) pydot.Dot[source]¶
Wrapper for Flow.visualize_pydot().
- class pydiverse.pipedag.PipedagConfig(path: str | pathlib.Path | dict[str, Any])[source]¶
This class represents a pipedag config file.
- Parameters:
path – Path to the config yaml file to load or a dictionary containing the raw config as it would be loaded from yaml file.
- default: PipedagConfig¶
The default config file.
If the environment variable PIPEDAG_CONFIG is set, then this file will be used as the config file. Otherwise, pipedag searches for a file called pipedag.yaml or pipedag.yml in:
- The current working directory
- Any parent directories of the working directory
- The user folder
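The lookup order described above can be sketched as follows. This is an illustration only, not pipedag's implementation: `find_config_sketch` is a hypothetical helper, and the exact precedence between `pipedag.yaml` and `pipedag.yml` within one directory is an assumption.

```python
import tempfile
from pathlib import Path
from typing import Optional


def find_config_sketch(cwd: Path, home: Path, environ: dict) -> Optional[Path]:
    """Illustrative lookup only; not pipedag's actual implementation."""
    # 1. An explicitly configured file takes precedence.
    if "PIPEDAG_CONFIG" in environ:
        return Path(environ["PIPEDAG_CONFIG"])

    # 2. The working directory, then each of its parents, then the user folder.
    for directory in [cwd, *cwd.parents, home]:
        for file_name in ("pipedag.yaml", "pipedag.yml"):
            candidate = directory / file_name
            if candidate.is_file():
                return candidate
    return None


# Demo: a config file in a parent directory of the working directory is found.
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp).resolve() / "project"
    work_dir = project / "src" / "flows"
    work_dir.mkdir(parents=True)
    (project / "pipedag.yaml").write_text("# hypothetical config\n")
    found = find_config_sketch(work_dir, Path(tmp) / "home", {})
    found_in_parent = found == project / "pipedag.yaml"
```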
- get(instance: str | None = None, flow: str | None = None, per_user: bool = False) ConfigContext[source]¶
Constructs a ConfigContext. For more details on how the specific ConfigContext instance is constructed, check out the specifying instances and flows section.
- Parameters:
instance – Name of the instance. If no value is provided the __any__ instance gets used.
flow – Name of the flow. If no value is provided the __any__ flow gets used.
per_user – Whether to customize the instance id for each user according to per_user_template.
- class pydiverse.pipedag.ConfigContext(config_dict: dict, pipedag_name: str, flow_name: str | None, instance_name: str, fail_fast: bool, strict_result_get_locking: bool, instance_id: str, stage_commit_technique: pydiverse.pipedag.context.context.StageCommitTechnique, cache_validation: box.box.Box, visualization: dict[str, pydiverse.pipedag.context.context.VisualizationConfig], network_interface: str, disable_kroki: bool, kroki_url: str | None, attrs: box.box.Box, table_hook_args: box.box.Box, swallow_exceptions: bool = False, is_evolved: bool = False) NoneType[source]¶
Configuration context for running a particular pipedag instance.
To create a ConfigContext instance use PipedagConfig.get().
- instance_id: str¶
The instance_id.
- attrs: box.box.Box¶
Values from the attrs config section, stored in a Box object. Useful for passing any custom, use case specific config options to your flow.
- evolve(**changes) ConfigContext[source]¶
Create a new config context instance with the changes applied.
Because ConfigContext is immutable, this is the only valid way to derive a new instance with some values mutated.
Wrapper around attrs.evolve().
- static new(config: dict[str, Any], pipedag_name: str, flow: str, instance: str)[source]¶
Create a new ConfigContext instance from dictionary and a few names.
- Parameters:
config – dictionary with config values
pipedag_name – name of pipedag config
flow – name of flow
instance – name of instance
- Returns:
ConfigContext instance
- static parse_in_object(key: str, value: dict[str, Any], class_type: Any, within: str, *, inout: dict[str, Any])[source]¶
Parse a dictionary into a dataclass instance.
- Parameters:
key – key of inout dictionary to be modified
value – dictionary to be parsed into a dataclass instance and stored in inout[key]
class_type – dataclass type for what should be written to inout[key]
within – context for error messages
inout – dictionary to be modified by this function call
- Returns:
None
- classmethod get() T[source]¶
Returns the current, innermost context instance.
- Raises:
LookupError – If no such context has been entered yet.
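The "innermost context" behaviour of `get()` can be pictured with a small stack of entered contexts. The sketch below is a generic illustration built on `contextvars`; `ContextSketch` is a hypothetical class and does not claim to mirror how pipedag implements its contexts.

```python
from contextvars import ContextVar


class ContextSketch:
    """Hypothetical context class; an instance is on the stack while its `with` block is open."""

    _stack = ContextVar("context_sketch_stack", default=())

    def __enter__(self):
        # Push this instance and remember the token so the push can be undone.
        self._token = self._stack.set(self._stack.get() + (self,))
        return self

    def __exit__(self, *exc_info):
        # Restore the stack to its state before this context was entered.
        self._stack.reset(self._token)

    @classmethod
    def get(cls):
        stack = cls._stack.get()
        if not stack:
            raise LookupError(f"No {cls.__name__} has been entered yet.")
        return stack[-1]


with ContextSketch() as outer:
    with ContextSketch() as inner:
        innermost_is_inner = ContextSketch.get() is inner
    innermost_is_outer = ContextSketch.get() is outer
```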
- class pydiverse.pipedag.StageLockContext[source]¶
Context manager used to keep stages locked until after Flow.run().
By default, pipedag releases stage locks as soon as it is done processing a stage. This means that by the time the flow has finished running, another flow run might already have overwritten any data in those stages. Consequently, calling Result.get() might not return the values produced by the current run, but instead those from the other (newer) run.
To prevent this, you can wrap the calls to Flow.run() and Result.get() in a StageLockContext. This keeps all stages modified by the flow locked until the end of the StageLockContext.
Example

    with StageLockContext():
        result = flow.run()
        df = result.get(task_x)
- classmethod get() T[source]¶
Returns the current, innermost context instance.
- Raises:
LookupError – If no such context has been entered yet.
Backend Classes¶
Table Store¶
- class pydiverse.pipedag.backend.table.SQLTableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str, str | list[str]]] | None = None, default_materialization_details: str | None = None, max_concurrent_copy_operations: int = 5, sqlalchemy_pool_size: int = 12, sqlalchemy_pool_timeout: int = 300, force_transaction_suffix: str | None = None)[source]¶
Table store that materializes tables to a SQL database
Uses schema swapping for transactions: Creates a schema for each stage and a temporary schema for each transaction. If all tasks inside a stage succeed, swaps the schemas by renaming them.
The correct dialect-specific subclass of SQLTableStore gets initialized based on the dialect found in the provided engine url.
Supported Tables
The SQLTableStore can materialize (that is, store task output) and dematerialize (that is, retrieve task input) the following Table types:
Framework | Materialization | Dematerialization
SQLAlchemy | |
Pandas | |
Polars | |
tidypolars | tp.Tibble | tp.Tibble
Ibis | ibis.api.Table | ibis.api.Table
pydiverse.transform | pdt.Table | pdt.Polars, pdt.SqlAlchemy
pydiverse.pipedag table reference | ExternalTableReference (no materialization) | Can be read with all dematerialization methods above
pydiverse.pipedag view | View (view with support for src union, column renaming, and sorting) | Can be read with all dematerialization methods above
- Parameters:
url –
The SQLAlchemy engine url used to connect to the database.
This URL may contain placeholders like {name} or {instance_id} (additional ones can be defined in the url_attrs_file) or environment variables like {$USER} which get substituted with their respective values.
Attention: passwords including special characters like @ or : need to be URL encoded.
url_attrs_file –
Filename of a yaml file which is read shortly before rendering the final engine URL and which is used to replace custom placeholders in url.
Just like url, this value may also contain placeholders and environment variables which get substituted.
create_database_if_not_exists – If the engine url references a database name that doesn’t yet exist, then setting this value to True tells pipedag to create the database before trying to open a connection to it.
schema_prefix – A prefix that gets placed in front of all schema names created by pipedag.
schema_suffix – A suffix that gets placed behind all schema names created by pipedag.
avoid_drop_create_schema – If True, no CREATE SCHEMA or DROP SCHEMA statements get issued. This is mostly relevant for databases that support automatic schema creation like IBM DB2.
print_materialize – If True, all tables that get materialized get logged.
print_sql – If True, all executed SQL statements get logged.
no_db_locking – Speed up database by telling it we will not rely on its locking mechanisms. Currently not implemented.
strict_materialization_details –
If True: raise an exception if
- the argument materialization_details is given even though the table store does not support it.
- a table references a materialization_details tag that is not defined in the config.
If False: Log an error instead of raising an exception.
materialization_details –
A dictionary of tags that each define properties used for materialization by the table store. If the tag __any__ is present, the other labels will inherit properties from it if they do not override them.
An example config for DB2 that goes into the args section of the table_store config in pipedag.yaml:

    materialization_details:
      __any__:
        compression: ["COMPRESS YES ADAPTIVE", "VALUE COMPRESSION"]
        table_space_data: "USERSPACE1"
      no_compression:
        # user-defined tag. Inherits table_space_data from __any__
        # but overwrites compression.
        compression: ""

Then, by default, all tables will be created with the __any__ config, unless the label no_compression is specified at stage or table level.
See the documentation of the SQLTableStore Dialects for supported options for each table store.
default_materialization_details – The materialization_details that will be used if materialization_details is not specified on table level. If not set, the __any__ tag (if specified) will be used.
max_concurrent_copy_operations – In case of a partially cache-valid stage, we need to copy tables from the cache schema to the new transaction schema. This parameter specifies the maximum number of workers we use for concurrently copying tables.
sqlalchemy_pool_size – The number of connections to keep open inside the connection pool. It is recommended to choose a larger number than max_concurrent_copy_operations to avoid running into pool_timeout.
sqlalchemy_pool_timeout – The number of seconds to wait before giving up on getting a connection from the pool. This may be relevant in case the connection pool is saturated with concurrent operations each working with one or more database connections.
force_transaction_suffix – This option is for use without proper pipedag flow. It disables lookup in stage metadata table for determining correct transaction slot suffix. (default: None)
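The url parameter above notes that passwords with special characters need to be URL encoded. One way to do that is with the standard library; the user name, password, host and database below are hypothetical placeholders, and the PostgreSQL URL scheme is only an example.

```python
from urllib.parse import quote_plus, unquote_plus

password = "p@ss:word/1"  # hypothetical password containing special characters
encoded = quote_plus(password)

# Hypothetical engine URL; only the password part is percent-encoded.
engine_url = f"postgresql://user:{encoded}@localhost:5432/db"

# Decoding returns the original password, so nothing is lost.
round_trip = unquote_plus(encoded)
```

Encoding only the password keeps the separators `:` and `@` of the URL itself unambiguous.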
- class pydiverse.pipedag.backend.table.ParquetTableStore(engine_url: str, parquet_base_path: upath.core.UPath, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str | list[str]]] | None = None, default_materialization_details: str | None = None, max_concurrent_copy_operations: int = 5, sqlalchemy_pool_size: int = 12, sqlalchemy_pool_timeout: int = 300, force_transaction_suffix: str | None = None, s3_endpoint_url: str | None = None, s3_url_style: str | None = None, s3_region: str | None = None, allow_overwrite: bool = False)[source]¶
Table store that materializes tables as parquet files.
Additionally, a duckdb file is created which links to parquet files.
Uses schema swapping for transactions: Creates a schema for each stage and a temporary schema for each transaction. If all tasks inside a stage succeed, swaps the schemas by renaming them.
Supported Tables
The ParquetTableStore can materialize (store task output) and dematerialize (retrieve task input) the following Table types:
Framework | Materialization | Dematerialization
SQLAlchemy | |
Pandas | |
Polars | |
tidypolars | tp.Tibble | tp.Tibble
Ibis | ibis.api.Table | ibis.api.Table
pydiverse.transform | pdt.Table | pdt.Polars, pdt.SqlAlchemy
pydiverse.pipedag table reference | ExternalTableReference (no materialization) | Can be read with all dematerialization methods above
pydiverse.pipedag view | View (view with support for src union, column renaming, and sorting) | Can be read with all dematerialization methods above
- Parameters:
url –
The SQLAlchemy engine url used to connect to the database.
This URL may contain placeholders like {name} or {instance_id} (additional ones can be defined in the url_attrs_file) or environment variables like {$USER} which get substituted with their respective values.
Attention: passwords including special characters like @ or : need to be URL encoded.
url_attrs_file –
Filename of a yaml file which is read shortly before rendering the final engine URL and which is used to replace custom placeholders in url.
Just like url, this value may also contain placeholders and environment variables which get substituted.
create_database_if_not_exists – If the engine url references a database name that doesn’t yet exist, then setting this value to True tells pipedag to create the database before trying to open a connection to it.
schema_prefix – Prefix placed in front of all schema names created by pipedag.
schema_suffix – Suffix placed behind all schema names created by pipedag.
avoid_drop_create_schema – If True, no CREATE SCHEMA or DROP SCHEMA statements get issued. This is mostly relevant for databases that support automatic schema creation like IBM DB2.
print_materialize – If True, log every table materialization.
print_sql – If True, log every executed SQL statement.
no_db_locking – Speed up database by telling it we will not rely on its locking mechanisms. Currently not implemented.
strict_materialization_details –
If True: raise an exception if
- the argument materialization_details is given even though the table store does not support it.
- a table references a materialization_details tag that is not defined in the config.
If False: Log an error instead of raising an exception.
materialization_details – A dictionary with each entry describing a tag for materialization details of the table store. See subclasses of BaseMaterializationDetails for details.
default_materialization_details – The materialization_details that will be used if materialization_details is not specified on table level. If not set, the __any__ tag (if specified) will be used.
max_concurrent_copy_operations – In case of a partially cache-valid stage, we need to copy tables from the cache schema to the new transaction schema. This parameter specifies the maximum number of workers we use for concurrently copying tables.
sqlalchemy_pool_size – The number of connections to keep open inside the connection pool. It is recommended to choose a larger number than max_concurrent_copy_operations to avoid running into pool_timeout.
sqlalchemy_pool_timeout – The number of seconds to wait before giving up on getting a connection from the pool. This may be relevant in case the connection pool is saturated with concurrent operations each working with one or more database connections.
force_transaction_suffix – This option is for use without proper pipedag flow. It disables lookup in stage metadata table for determining correct transaction slot suffix. (default: None)
parquet_base_path – This is an fsspec compatible path to either a directory or an S3 bucket key prefix. Examples are /tmp/pipedag/parquet/ or s3://pipedag-test-bucket/table_store/. instance_id will automatically be appended to parquet_base_path.
s3_endpoint_url – When using a non-standard S3 endpoint (like minio), this can be used to specify the URL. Unfortunately, the AWS_ENDPOINT_URL environment variable is not automatically picked up by duckdb. (default: None)
s3_use_ssl – Whether to use SSL when connecting to S3 endpoint. (default: None)
s3_url_style – Specify URL style when connecting to S3 endpoint. Minio for example requires s3_url_style=’path’. (default: None)
allow_overwrite – Allow overwriting tables with store_table. (default: False)
- sync_metadata(flow: Flow | None = None)[source]¶
Sync this DuckDB file with the metadata store.
This method brings the local DuckDB file completely in-sync with the shared metadata store. It creates any missing schemas and syncs views from other users for all stages defined in the flow or for all schemas for which metadata exists.
This is useful when a user wants to access tables created by other users without running the full flow.
Example usage:

    cfg = PipedagConfig.default.get(instance_id)
    store = cfg.store.table_store
    store.sync_metadata()

Attention: No stage locking is performed, so parquet files can get out-of-sync if the pipeline instance is executed by other team members while you are exploring data in the duckdb file. However, stage level transactionality keeps your data in the main schema stable until the second pipeline run starts.
- Parameters:
flow – The flow containing stages to sync. If no flow is given, all schemas for which metadata exists are synchronized.
SQLTableStore Dialects¶
PostgreSQL¶
- class pydiverse.pipedag.backend.table.sql.dialects.PostgresTableStore(*args, use_adbc: bool = True, **kwargs)[source]¶
SQLTableStore that supports PostgreSQL.
Takes the same arguments as SQLTableStore.
Supports the PostgresMaterializationDetails materialization details.
- class pydiverse.pipedag.backend.table.sql.dialects.postgres.PostgresMaterializationDetails(unlogged: bool = False) NoneType[source]¶
Materialization details specific to PostgreSQL.
- Parameters:
unlogged – Whether to use unlogged tables or not. This reduces safety in case of a crash or unclean shutdown, but can significantly increase write performance.
Example

    materialization_details:
      __any__:
        unlogged: true
      my_label:
        unlogged: false

For more general information on materialization details, see the materialization_details parameter of SQLTableStore.
DuckDB¶
- class pydiverse.pipedag.backend.table.sql.dialects.DuckDBTableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str, str | list[str]]] | None = None, default_materialization_details: str | None = None, max_concurrent_copy_operations: int = 5, sqlalchemy_pool_size: int = 12, sqlalchemy_pool_timeout: int = 300, force_transaction_suffix: str | None = None)[source]¶
SQLTableStore that supports DuckDB.
Takes the same arguments as SQLTableStore.
Microsoft SQL Server / T-SQL¶
- class pydiverse.pipedag.backend.table.sql.dialects.MSSqlTableStore(*args, disable_pytsql: bool = False, disable_arrow_odbc: bool = False, disable_bulk_insert: bool = False, pytsql_isolate_top_level_statements: bool = True, max_query_print_length: int = 500000, **kwargs)[source]¶
SQLTableStore that supports Microsoft SQL Server.
Requires the ODBC driver for Microsoft SQL Server to be installed: https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server. For default arguments, both msodbcsql and mssql-tools need to be installed. Debug installation issues with odbcinst -j. On Linux, the conda-forge pyodbc package expects ini files in a different location than where Microsoft installs them (a symlink helps).
Supports the MSSqlMaterializationDetails materialization details.
In addition to the arguments of SQLTableStore, it also takes the following arguments:
- Parameters:
disable_pytsql – For mssql, a package called pytsql is used for executing RawSql scripts. It has the advantage that it allows for some kind of SQL based print statements. However, it may fail for some statements. For those cases, you can set disable_pytsql: true to use another logic for splitting up Raw SQL scripts and handing that over to sqlalchemy. This is actually quite a complex process for mssql. Sorry for any inconveniences. We will try to make it work for most tsql code that should be integrated in pipedag pipelines. However, the ultimate goal is to split up monolithic blocks of dynamic sql statements into defined transformations with dynamic aspects written in python.
disable_arrow_odbc – By default we try to use arrow-odbc to read data from Microsoft SQL Server. However, pyarrow has problems with long string columns. If you want to disable this, set this parameter to True.
disable_bulk_insert – By default we try to use bulk insert to write data to Microsoft SQL Server. Unfortunately, the bcp command line tool and bulk insert libraries like bcpandas have trouble with corner cases like linebreaks or other special characters in string columns. If you want to disable this, set this parameter to True. If installed, bulk insert is done with mssqlkit (not open source). Otherwise, bcpandas is used.
pytsql_isolate_top_level_statements – This parameter is handed over to pytsql.executes and causes the script to be split in top level statements that are sent to sqlalchemy separately. The tricky part here is that some magic is done to make DECLARE statements reach across, but it is not guaranteed to be identical to scripts executed by a SQL UI.
max_query_print_length – Maximum number of characters in a SQL query that are printed when logging the query before the log is truncated. Defaults to 500000 characters.
- class pydiverse.pipedag.backend.table.sql.dialects.mssql.MSSqlMaterializationDetails(columnstore: bool = False) NoneType[source]¶
Materialization details specific to Microsoft SQL Server.
- Parameters:
columnstore – Whether to create tables as columnstores by defining a clustered columnstore index on them. This can improve performance. See the Microsoft documentation.
IBM DB2¶
- class pydiverse.pipedag.backend.table.sql.dialects.IBMDB2TableStore(engine_url: str, *, create_database_if_not_exists: bool = False, schema_prefix: str = '', schema_suffix: str = '', avoid_drop_create_schema: bool = False, print_materialize: bool = False, print_sql: bool = False, no_db_locking: bool = True, strict_materialization_details: bool = True, materialization_details: dict[str, dict[str, str | list[str]]] | None = None, default_materialization_details: str | None = None, max_concurrent_copy_operations: int = 5, sqlalchemy_pool_size: int = 12, sqlalchemy_pool_timeout: int = 300, force_transaction_suffix: str | None = None)[source]¶
SQLTableStore that supports IBM Db2. Requires ibm-db-sa to be installed.
Takes the same arguments as SQLTableStore.
Supports the IBMDB2MaterializationDetails materialization details.
- class pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2MaterializationDetails(compression: pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2CompressionTypes | list[pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2CompressionTypes] | None = None, table_space_data: str | None = None, table_space_index: str | None = None, table_space_long: str | list[str] | None = None) NoneType[source]¶
Materialization details specific to IBM DB2.
- Parameters:
compression – Specify the compression methods to be applied to the table. For possible values see IBMDB2CompressionTypes.
table_space_data – The DB2 table space where the data is stored.
table_space_index – The DB2 table space where the partitioned index is stored.
table_space_long – The DB2 table spaces where the values of any long columns are stored.
Example

    materialization_details:
      __any__:
        compression: ["COMPRESS YES ADAPTIVE", "VALUE COMPRESSION"]
        table_space_data: "USERSPACE1"
      no_compression:
        # user-defined tag. Inherits table_space_data from __any__
        # but overwrites compression.
        compression: ""
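The inheritance from `__any__` in the example above can be pictured as a simple dictionary merge. This is a minimal sketch of the idea, not pipedag's code; `resolve_materialization_details` is a hypothetical helper.

```python
def resolve_materialization_details(details: dict, tag: str) -> dict:
    """Illustrative merge: a tag inherits from `__any__` unless it overrides a key."""
    resolved = dict(details.get("__any__", {}))  # start from the shared defaults
    resolved.update(details.get(tag, {}))  # tag-specific values win
    return resolved


# The same values as in the YAML example, written as a Python dictionary.
details = {
    "__any__": {
        "compression": ["COMPRESS YES ADAPTIVE", "VALUE COMPRESSION"],
        "table_space_data": "USERSPACE1",
    },
    "no_compression": {"compression": ""},
}

resolved = resolve_materialization_details(details, "no_compression")
```

Here `resolved` keeps `table_space_data` from `__any__` while `compression` is replaced by the empty string.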
- class pydiverse.pipedag.backend.table.sql.dialects.ibm_db2.IBMDB2CompressionTypes(*values)[source]¶
Enum for the compression methods for IBM DB2 tables.
VALUE COMPRESSION can be combined with one of the other compression methods.
See the IBM DB2 documentation for more details.
- NO_COMPRESSION = ''¶
- ROW_COMPRESSION = 'COMPRESS YES'¶
- STATIC_ROW_COMPRESSION = 'COMPRESS YES STATIC'¶
- ADAPTIVE_ROW_COMPRESSION = 'COMPRESS YES ADAPTIVE'¶
- VALUE_COMPRESSION = 'VALUE COMPRESSION'¶
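To make the combination rule concrete, the sketch below copies the documented enum values into a local enum and checks a list of methods. `CompressionSketch` and `is_valid_combination` are hypothetical, and the rule they encode (at most one row compression method, optionally plus VALUE COMPRESSION) is an assumed reading of the sentence above, not a documented validation step.

```python
from enum import Enum


class CompressionSketch(Enum):
    """Local copy of the documented values, for illustration only."""

    NO_COMPRESSION = ""
    ROW_COMPRESSION = "COMPRESS YES"
    STATIC_ROW_COMPRESSION = "COMPRESS YES STATIC"
    ADAPTIVE_ROW_COMPRESSION = "COMPRESS YES ADAPTIVE"
    VALUE_COMPRESSION = "VALUE COMPRESSION"


def is_valid_combination(methods: list) -> bool:
    """Assumed rule: at most one non-VALUE method, and no duplicates."""
    other = [m for m in methods if m is not CompressionSketch.VALUE_COMPRESSION]
    return len(other) <= 1 and len(set(methods)) == len(methods)


# Config strings map to enum members by value.
adaptive = CompressionSketch("COMPRESS YES ADAPTIVE")
ok = is_valid_combination([adaptive, CompressionSketch.VALUE_COMPRESSION])
not_ok = is_valid_combination(
    [CompressionSketch.ROW_COMPRESSION, CompressionSketch.STATIC_ROW_COMPRESSION]
)
```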
Local Table Cache¶
- class pydiverse.pipedag.backend.table.cache.ParquetTableCache(*args, base_path: str | upath.core.UPath, **kwargs)[source]¶
Local Table Cache that stores tables in Parquet files.
Supported Tables
The ParquetTableCache supports Pandas, Polars and pydiverse.transform.
- Parameters:
base_path – A path to a folder where the Parquet files should get stored. To differentiate between different instances, the instance_id will automatically be appended to the provided path.
Blob Store¶
- class pydiverse.pipedag.backend.blob.FileBlobStore(base_path: str | upath.core.UPath)[source]¶
File based blob store
The FileBlobStore stores blobs in a folder structure on a file system. In the base directory there will be two folders for every stage, one for the base and one for the transaction stage. Inside those folders the blobs will be stored as pickled files: base_path/instance_id/STAGE_NAME/BLOB_NAME.pkl.
To commit a stage, the only thing that has to be done is to rename the appropriate folders.
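The commit-by-rename idea can be sketched with the standard library. This is not the FileBlobStore implementation: the helper names, the `__tmp` and `__swap` folder names and the three-rename swap are assumptions made for illustration.

```python
import pickle
import tempfile
from pathlib import Path


def store_blob(stage_dir: Path, name: str, obj) -> Path:
    """Pickle `obj` to stage_dir/name.pkl, creating the folder if needed."""
    stage_dir.mkdir(parents=True, exist_ok=True)
    path = stage_dir / f"{name}.pkl"
    with path.open("wb") as f:
        pickle.dump(obj, f)
    return path


def commit_stage(base_dir: Path, transaction_dir: Path) -> None:
    """Swap the two folders by renaming them via a temporary name."""
    swap_dir = base_dir.with_name(base_dir.name + "__swap")
    base_dir.rename(swap_dir)
    transaction_dir.rename(base_dir)
    swap_dir.rename(transaction_dir)


with tempfile.TemporaryDirectory() as tmp:
    instance_dir = Path(tmp) / "instance_id"
    base = instance_dir / "stage_1"
    transaction = instance_dir / "stage_1__tmp"  # hypothetical folder name
    store_blob(base, "blob", {"run": "old"})
    store_blob(transaction, "blob", {"run": "new"})

    commit_stage(base, transaction)

    # After the commit, the base folder holds the blob of the new run.
    with (base / "blob.pkl").open("rb") as f:
        committed = pickle.load(f)
```

No blob is copied or rewritten during the commit; only folder names change, which is why the step is cheap.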
Lock Manager¶
- class pydiverse.pipedag.backend.lock.DatabaseLockManager(engine: sqlalchemy.engine.base.Engine, instance_id: str, lock_schema: pydiverse.pipedag.container.Schema | None = None, create_lock_schema: bool = True)[source]¶
Lock manager based on database locking mechanisms
Many databases provide some kind of locking mechanism. Depending on the specific database technology, this allows us to implement locking on either the schema level (where each stage can be locked and unlocked individually), or only on the instance level (where the entire instance including all stages get locked and unlocked together).
This lock manager uses the same database as the SQLTableStore. No other configuration in the args section of the config file is required.
We currently support the following databases: PostgreSQL, Microsoft SQL Server, IBM DB2.
- class pydiverse.pipedag.backend.lock.ZooKeeperLockManager(client: NoneType, base_path: str)[source]¶
Apache ZooKeeper based lock manager
Uses Apache ZooKeeper to establish fully distributed locks that are globally synchronous. The advantage of this approach is that it is highly reliable and that in case our flow crashes, the acquired locks automatically get released (the locks are ephemeral).
Config File
All arguments in the args section get passed as-is to the initializer of kazoo.client.KazooClient. Some useful arguments include:
- Parameters:
hosts – Comma separated list of hosts to connect.
- class pydiverse.pipedag.backend.lock.FileLockManager(base_path: str | pathlib.Path)[source]¶
Lock manager that uses lock files
For details on how exactly the file locking is implemented, check out the filelock documentation.
- Parameters:
base_path – A path to a folder where the lock files should get stored. To differentiate between different instances, the instance_id will automatically be appended to the provided path.
- class pydiverse.pipedag.backend.lock.NoLockManager(logger_kwargs=None)[source]¶
This lock manager doesn’t do any locking and only serves as a placeholder for an actual lock manager for testing something locally.
Warning
This lock manager is not intended for use in a production environment. Using a lock manager is essential for preventing data corruption.
Orchestration Engine¶
- class pydiverse.pipedag.engine.SequentialEngine[source]¶
Most basic orchestration engine that just executes all tasks sequentially.
- class pydiverse.pipedag.engine.DaskEngine(**dask_compute_kwargs)[source]¶
Execute a flow in parallel on a single machine using dask.
- Parameters:
dask_compute_kwargs – Keyword arguments that get passed to dask.compute(). The main kwarg you might be interested in is num_workers, which allows you to specify how many worker processes dask should spawn. By default, it spawns one worker per CPU core.
- class pydiverse.pipedag.PrefectEngine¶
Alias for either PrefectOneEngine or PrefectTwoEngine depending on the version of Prefect that is installed.
Special Table Types¶
- class pydiverse.pipedag.container.ExternalTableReference(name: str, schema: str, shared_lock_allowed: bool = False)[source]¶
Reference to a user-created table.
By returning an ExternalTableReference wrapped in a Table from a task, you can tell pipedag about a table, a view or a DB2 nickname in an external schema. The schema may be a multi-part identifier like “[db_name].[schema_name]” if the database supports this. It is passed to SQLAlchemy as-is.
Only supported by SQLTableStore.
Warning
When using an ExternalTableReference, pipedag has no way of knowing the cache validity of the external object. Hence, the user should provide a cache function for the Task. It is now allowed to specify an ExternalTableReference to a table in the schema of the current stage.
Example
You can use an ExternalTableReference to tell pipedag about a table that exists in an external schema:

    @materialize(version="1.0")
    def task():
        return Table(ExternalTableReference("name_of_table", "schema"))

By using a cache function, you can establish the cache (in-)validity of the external table:

    from datetime import date

    # The external table becomes cache invalid every day at midnight
    def my_cache_fun():
        return date.today().strftime("%Y/%m/%d")

    @materialize(cache=my_cache_fun)
    def task():
        return Table(ExternalTableReference("name_of_table", "schema"))

The ExternalTableReference object can also be created at flow wiring time:

    with Flow() as f:
        with Stage("stage") as s:
            tbl = Table(ExternalTableReference("name_of_table", "schema"))
            _ = some_task(tbl)
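To see why the date-based cache function shown earlier makes the external table cache invalid once per day, the sketch below compares the values it would produce on different dates. It assumes that a changed return value of the cache function is what marks the task as cache invalid; `daily_cache_key` is a hypothetical variant that takes the date as an argument so the behaviour can be checked without waiting for midnight.

```python
from datetime import date


def daily_cache_key(today: date) -> str:
    # Same formatting as my_cache_fun, with the date passed in explicitly.
    return today.strftime("%Y/%m/%d")


# Two calls on the same day give the same key, so the task would stay cache valid.
same_day = daily_cache_key(date(2024, 1, 1)) == daily_cache_key(date(2024, 1, 1))

# A call on the next day gives a different key, so the task would be re-run.
next_day = daily_cache_key(date(2024, 1, 1)) == daily_cache_key(date(2024, 1, 2))
```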