Pipedag Config File¶
Note
technical_setups is missing. Do we actually need this?
The pipedag.yaml file is used to configure pipedag.
When executing a flow, pipedag searches for the config file in the following directories:
The current working directory
Any parent directories of the working directory
The user folder
Alternatively you can also specify a config file path using the PIPEDAG_CONFIG environment variable.
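The lookup described above can be sketched in a few lines of Python. This is only an illustration and not pipedag's actual implementation: the function name `find_config` is hypothetical, and it is an assumption that a set PIPEDAG_CONFIG variable takes precedence over the directory search.

```python
from pathlib import Path
from typing import Mapping, Optional

CONFIG_NAME = "pipedag.yaml"  # file name taken from the text above


def find_config(start: Path, home: Path, env: Mapping[str, str]) -> Optional[Path]:
    """Return the first config file found, or None (hypothetical helper)."""
    # Assumption: an explicitly set PIPEDAG_CONFIG wins over the directory search.
    explicit = env.get("PIPEDAG_CONFIG")
    if explicit:
        return Path(explicit)
    # Working directory first, then each parent directory, then the user folder.
    for directory in [start, *start.parents, home]:
        candidate = directory / CONFIG_NAME
        if candidate.is_file():
            return candidate
    return None
```

A call such as `find_config(Path.cwd(), Path.home(), os.environ)` would mimic the search for the current process.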
Attention
Pipedag config files may be used for code execution under a different user ID.
Pipedag may be instructed to read yaml files with key-value pairs and send their content to arbitrary hosts. It can also be instructed to send the content of environment variables (e.g. security tokens). Furthermore, it can be instructed to load python classes available in the python environment and to call their constructors with arbitrary keyword arguments.
Specifying instances and flows¶
One of the main goals of pipedag is to make it easy to keep multiple instances of a data processing DAG up and running at the same time and maximize development efficiency.
Thus, it is important to allow running the same flow with full input data, a sampled version, and a bare minimum data set for quick smoke tests of newly written code.
We call the setups for those three input data sets ‘pipedag instances’; they could be given names like full, midi or mini.
There is no hard binding between a flow and a pipedag configuration file. For each flow, the pipedag configuration file offers configuration options for multiple pipedag instances.
There are three main places in the config file where you can configure individual instances and flows:
instances:
  my_instance:
    # Configuration for instance `my_instance`
    <parameter>: <value>

flows:
  my_flow:
    # Configuration for flow `my_flow`
    <parameter>: <value>

    instances:
      my_instance:
        # Configuration for the combination of
        # flow `my_flow` with instance `my_instance`
        <parameter>: <value>
In this case, if you were to run the flow my_flow on the instance my_instance,
we combine all appropriate config parameters in the instances, flows and flows > instances sections,
where values found in flows > instances replace values found in flows,
and values found in flows replace values found in the instances section.
Additionally, there is a special __any__ instance and a special __any__ flow, which provide base attributes that are inherited by all other named instances and flows respectively.
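As a rough illustration of this precedence, the following sketch resolves a parsed config (a plain dict) for one flow and one instance. The helper names `deep_merge` and `resolve` are hypothetical, and it is an assumption that nested mappings are merged key by key rather than replaced as a whole; pipedag's real resolution logic may differ in detail.

```python
from copy import deepcopy


def deep_merge(base: dict, override: dict) -> dict:
    """Return a copy of `base` in which values from `override` win."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Assumption: nested mappings are merged recursively.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def resolve(config: dict, flow: str, instance: str) -> dict:
    """Combine all matching sections from least to most specific."""
    instances = config.get("instances", {})
    flows = config.get("flows", {})
    layers = [instances.get("__any__", {}), instances.get(instance, {})]
    for flow_name in ("__any__", flow):
        flow_cfg = dict(flows.get(flow_name, {}))
        flow_instances = flow_cfg.pop("instances", {})
        layers.append(flow_cfg)  # flows > <flow>
        layers.append(flow_instances.get(instance, {}))  # flows > <flow> > instances
    result: dict = {}
    for layer in layers:
        result = deep_merge(result, layer)
    return result


config = {
    "instances": {
        "__any__": {"fail_fast": False, "instance_id": "base"},
        "my_instance": {"instance_id": "mine"},
    },
    "flows": {
        "my_flow": {
            "fail_fast": True,
            "instances": {"my_instance": {"instance_id": "mine_for_my_flow"}},
        }
    },
}
resolved = resolve(config, "my_flow", "my_instance")
# resolved == {"fail_fast": True, "instance_id": "mine_for_my_flow"}
```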
Simple Example Config¶
name: pipedag_tests

instances:
  __any__:
    instance_id: pipedag_default
    auto_table:
      - pandas.DataFrame
      - sqlalchemy.sql.expression.TextClause
      - sqlalchemy.sql.expression.Selectable

    table_store:
      class: "pydiverse.pipedag.backend.table.SQLTableStore"
      args:
        url: "postgresql://user:password@127.0.0.1:5432/{instance_id}"
        create_database_if_not_exists: true

    blob_store:
      class: "pydiverse.pipedag.backend.blob.FileBlobStore"
      args:
        base_path: "/tmp/pipedag/blobs"

    lock_manager:
      class: "pydiverse.pipedag.backend.lock.DatabaseLockManager"

    orchestration:
      class: "pydiverse.pipedag.engine.SequentialEngine"
When using this config, please make sure that psycopg2 and adbc-driver-postgresql are installed. When installing via pip, it might be easier to install psycopg2-binary instead of psycopg2.
pip install pydiverse-pipedag pydot psycopg2-binary adbc-driver-postgresql
conda install -c conda-forge pydiverse-pipedag pydot psycopg2 adbc-driver-postgresql
or, after installing pixi, which is much faster than conda:
mkdir my_data_proj
cd my_data_proj
pixi init
pixi add pydiverse-pipedag pydot psycopg2 adbc-driver-postgresql
Specification¶
Top Level Attributes¶
- name
The name of the pipedag configuration. It is also used as the default name for a flow connected with this configuration.
- strict_instance_lookup
If set to true, looking up an instance that was not explicitly defined in the config will fail.
(default: True)
Instance / Flow level attributes¶
Instance or flow level attributes can be placed in the following places, listed from least to most specific. Values in later places overwrite values in earlier ones:
instances:
  __any__:
    attribute: value

instances:
  <xxx>:
    attribute: value

flows:
  __any__:
    attribute: value

flows:
  __any__:
    instances:
      <xxx>:
        attribute: value

flows:
  <yyy>:
    attribute: value

flows:
  <yyy>:
    instances:
      <xxx>:
        attribute: value
- instance_id
An ID for identifying a particular pipedag instance. Optional
Its purpose is to be used in table_store and blob_store configurations for ensuring that different pipedag instances don’t overwrite each other’s tables, schemas, files or folders. Please note that PipedagConfig.get(per_user=True) will modify instance_id such that it is unique for every user ID as taken from environment variables.
The instance_id will also be used by the locking manager together with the stage name to ensure that different runs on the same instance_id will not interfere with identically named schemas. The goal is that flows / pipedag instances can be run from the IDE, Continuous Integration, and the Orchestration Engine UI without collisions, automatically ensuring that the cache is valid for the running code commit at the moment a stage result is transactionally committed.
(default: name of flow)
- stage_commit_technique
We want to prepare the whole output of a Stage before we make it visible to an explorative user looking at the table_store / database. There should never be a time when they see a mix of new and old tables of that schema, and the switch (stage commit) should happen in an instant. We don’t use database transactionality features because of expected slowdowns, and because we do want to look at partial output for debugging.
In order to commit stages, we currently offer the following techniques:
- schema_swap
We prepare output in a <stage>__tmp schema and then swap schemas for <stage> and <stage>__tmp with three rename operations.
- read_views
We use two schemas, <stage>__odd and <stage>__even, and fill schema <stage> just with views to one of those schemas.
Support for different commit techniques¶
Database              schema_swap  read_views
PostgreSQL            yes          yes
DuckDB                no           yes
Microsoft SQL Server  yes          yes
IBM Db2               no           yes
(default: schema_swap)
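To make the "three rename operations" of schema_swap more concrete, here is a minimal sketch that builds the statements for PostgreSQL. It is not taken from pipedag's source: the function name and the intermediate `__swap` schema name are hypothetical, and the exact order and naming pipedag uses may differ.

```python
def schema_swap_statements(stage: str, helper_suffix: str = "__swap") -> list[str]:
    """Build three PostgreSQL renames that swap <stage> and <stage>__tmp."""
    tmp = f"{stage}__tmp"
    helper = f"{stage}{helper_suffix}"  # hypothetical intermediate name
    return [
        f'ALTER SCHEMA "{stage}" RENAME TO "{helper}"',  # move old output aside
        f'ALTER SCHEMA "{tmp}" RENAME TO "{stage}"',  # publish new output
        f'ALTER SCHEMA "{helper}" RENAME TO "{tmp}"',  # old output becomes the next tmp
    ]


for statement in schema_swap_statements("stage_1"):
    print(statement)
```

Each rename only touches catalog metadata, which fits the stated goal of making the switch happen in an instant.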
- per_user_template
In case the config is generated with PipedagConfig.get(per_user=True), the current user’s name gets injected into instance_id. This new per-user instance_id is then used wherever the old instance_id was used.
To customize how this per-user instance_id is constructed, you can provide the per_user_template argument, which must include the template placeholders {username} and {id}.
per_user_template: "{username}_{id}"
(default: {id}_{username})
- network_interface
The network interface to use for communicating with the parent process. Optional
This should be the IP address of the computer on which the flow is being executed. Unless you are running the flow in a distributed manner across multiple computers, you can leave this value blank.
To specify an IPv6 address, you must surround it with square brackets.
(default: 127.0.0.1)
- disable_kroki
If set to True, the Kroki URL will not be displayed at the end of flow execution. Kroki URLs are a convenient way of visualizing the flow execution without the need to install graphviz or to worry about how to display the result. The whole graph is encoded in the URL, and it is only sent to the kroki_url once you click the link. However, since sending stage and task names to a public server risks exposing sensitive information, the Kroki URL is disabled by default. You can also self-host a Kroki service and set kroki_url to your own service.
(default: True)
- kroki_url
A url that points to a Kroki instance. Optional
Pipedag uses a free service called Kroki to visualize flows (see Flow.visualize_url and Result.visualize_url). If you want to self-host a Kroki instance (for example, if your flow contains sensitive information), you can specify a custom url that pipedag should use.
(default: https://kroki.io)
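For readers wondering how a whole graph can fit into a URL: Kroki accepts diagram sources that are deflate-compressed and base64url-encoded in the request path. The sketch below builds such a URL for a Graphviz source. The function name `build_kroki_url` and the tiny example graph are hypothetical, and it is an assumption that pipedag's own URLs are built in the same way.

```python
import base64
import zlib


def build_kroki_url(dot_source: str, base_url: str = "https://kroki.io", fmt: str = "svg") -> str:
    """Encode a Graphviz source into a Kroki GET URL (deflate + base64url)."""
    compressed = zlib.compress(dot_source.encode("utf-8"), 9)
    payload = base64.urlsafe_b64encode(compressed).decode("ascii")
    return f"{base_url}/graphviz/{fmt}/{payload}"


# Stage and task names end up (compressed) inside the URL itself.
url = build_kroki_url("digraph G { stage_1 -> stage_2 }")
```

Because the payload can be decoded by anyone who receives the URL, pointing `base_url` at a self-hosted service keeps stage and task names inside your own network.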
- auto_table
A list of table classes. If a materializing task returns an instance of any class in this list, it automatically gets materialized to the table store. Optional
For example, if you automatically want to store all pandas dataframes, pydiverse transform tables and SQLAlchemy queries in the table store, you would specify it like this:
auto_table:
  - pandas.DataFrame
  - pydiverse.transform.Table
  - sqlalchemy.sql.expression.TextClause
  - sqlalchemy.sql.expression.Selectable
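One way to picture how such a list of class names could be applied is the sketch below, which imports each fully qualified name and runs an isinstance check. Both helper names are hypothetical and this is not pipedag's actual code; in particular, the real implementation may treat classes from packages that are not installed differently. The example uses a standard library class so that it runs without pandas.

```python
import importlib
from collections import OrderedDict


def load_class(qualified_name: str) -> type:
    """Import `package.module.ClassName` and return the class object."""
    module_name, _, class_name = qualified_name.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


def is_auto_table(obj: object, auto_table: list[str]) -> bool:
    """True if `obj` is an instance of any class named in `auto_table`."""
    classes = tuple(load_class(name) for name in auto_table)
    return isinstance(obj, classes)


names = ["collections.OrderedDict"]
print(is_auto_table(OrderedDict(), names))
print(is_auto_table([1, 2, 3], names))
```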
- auto_blob
Same as auto_table, just for blobs. Optional
- fail_fast
When set to True and an exception occurs during execution of a flow, the flow will abort execution and reraise the exception.
(default: False)
- strict_result_get_locking
When set to True, check that Result.get() is only called within a with StageLockContext(...) statement.
This prevents a different flow from overwriting the results before they get fetched. The default is a good choice when (potentially) running tests in parallel. For interactive debugging it might be handy to disable this check.
(default: True)
- cache_validation
See Cache Validation options. Optional
- table_store
See Table Store Config. Required
- blob_store
See Blob Store Config. Required
- lock_manager
See Lock Manager Config. Required
- orchestration
See Orchestration Engine Config. Required
- attrs
A place to put an arbitrary, user specific yaml mapping. During flow execution you will be able to access these attrs using ConfigContext.get().attrs
Cache Validation options¶
- mode
Choose a mode of cache invalidation.
Supported values:
normal: Normal cache invalidation.
assert_no_fresh_input: Same as ignore_fresh_input and additionally fail if tasks having a cache function would still be executed (change in version or lazy query).
ignore_fresh_input: Ignore the output of cache functions that help determine the availability of fresh input. With disable_cache_function=False, it still calls cache functions, so cache invalidation works interchangeably between ignore_fresh_input and normal.
force_fresh_input: Consider all cache function outputs as different and thus make source tasks cache invalid.
force_caches_invalid: Disable caching and thus force all tasks as cache invalid. This option implies force_fresh_input.
(default: normal)
- disable_cache_function
When set to True, cache functions are not called. This is not compatible with mode=normal. The difference to ignore_fresh_input is that in case mode is set back to normal, the cache becomes invalid if disable_cache_function was set to True during the last run.
(default: False)
- ignore_task_version
When set to True, tasks that specify an explicit version for cache invalidation will always be considered cache invalid. This might be useful for instances with short execution time during rapid development cycles when manually bumping version numbers becomes cumbersome.
(default: False)
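The constraints between these options can be summarized in a small consistency check. The sketch below only encodes what is stated above (the supported modes, the defaults, and the incompatibility of disable_cache_function with mode=normal); the function name is hypothetical and pipedag's own validation may behave differently.

```python
VALID_MODES = {
    "normal",
    "assert_no_fresh_input",
    "ignore_fresh_input",
    "force_fresh_input",
    "force_caches_invalid",
}

DEFAULTS = {"mode": "normal", "disable_cache_function": False, "ignore_task_version": False}


def check_cache_validation(options: dict) -> dict:
    """Fill in the documented defaults and reject inconsistent combinations."""
    merged = {**DEFAULTS, **options}
    if merged["mode"] not in VALID_MODES:
        raise ValueError(f"unsupported cache_validation mode: {merged['mode']!r}")
    if merged["disable_cache_function"] and merged["mode"] == "normal":
        raise ValueError("disable_cache_function=True is not compatible with mode=normal")
    return merged


print(check_cache_validation({"mode": "ignore_fresh_input", "disable_cache_function": True}))
```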
Table Store Config¶
This section (labeled table_store) specifies the table store to use.
- table_store_connection
This attribute allows referencing a block of attributes from the top level table_store_connections section.
As an example, this config
table_store_connections:
  postgres:
    class: "pydiverse.pipedag.backend.table.SQLTableStore"
    args:
      url: "postgresql://postgres:pipedag@127.0.0.1/{instance_id}"

instances:
  __any__:
    table_store:
      table_store_connection: postgres
      args:
        schema_prefix: "foo"
is, after parsing, equivalent to the following config:
instances:
  __any__:
    table_store:
      class: "pydiverse.pipedag.backend.table.SQLTableStore"
      args:
        url: "postgresql://postgres:pipedag@127.0.0.1/{instance_id}"
        schema_prefix: "foo"
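The expansion of a table_store_connection reference can be pictured as a dictionary merge. The following sketch reproduces the equivalence stated above on plain dicts; `resolve_table_store` is a hypothetical helper, and it is an assumption that values given directly in table_store win over values from the referenced connection.

```python
def resolve_table_store(table_store: dict, connections: dict) -> dict:
    """Inline the referenced connection block into the table_store section."""
    resolved = dict(table_store)
    name = resolved.pop("table_store_connection", None)
    if name is None:
        return resolved  # nothing to expand
    connection = connections[name]
    # Assumption: `args` from both places are combined, local values win.
    args = {**connection.get("args", {}), **resolved.get("args", {})}
    merged = {**connection, **resolved}
    if args:
        merged["args"] = args
    return merged


connections = {
    "postgres": {
        "class": "pydiverse.pipedag.backend.table.SQLTableStore",
        "args": {"url": "postgresql://postgres:pipedag@127.0.0.1/{instance_id}"},
    }
}
table_store = {"table_store_connection": "postgres", "args": {"schema_prefix": "foo"}}
expanded = resolve_table_store(table_store, connections)
```

After the call, `expanded` holds the class together with both `url` and `schema_prefix` under `args`, while the input dicts are left unchanged.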
- class
The fully qualified name of the class to be used as the table store.
Available classes:
- args
Any values in this subsection will be passed as arguments to the __init__ or, if available, the _init_conf_ method of the table store class. For a list of available options, look at the __init__ method of the table store you are using.
- hook_args
This subsection allows passing custom config arguments to the different table hooks to influence how tables get materialized and retrieved. The builtin hooks respect the following options:
- pandas
- dtype_backend
The default dtype backend to use.
In both cases, the aim is to avoid dtype=object columns in the provided pd.DataFrame tables. That is why date columns are converted to datetime64[s] / pa.date32 and datetimes to datetime64[us] / pa.time64("us"). Nanosecond precision is rarely needed and triggers conversion to object dtype if year < 1677 or > 2262.
Supported values:
numpy: Use pandas’ nullable extension dtypes for numpy.
arrow: Use pyarrow backed dataframes.
(default: ‘numpy’)
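The year limits 1677 and 2262 follow from storing nanoseconds since 1970-01-01 in a signed 64-bit integer. The short calculation below reproduces them with the standard library only; it truncates to microseconds because Python's datetime has no nanosecond resolution.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
MAX_NS = 2**63 - 1  # largest value of a signed 64-bit integer

# Python's datetime resolves microseconds, so drop the last three digits.
span = timedelta(microseconds=MAX_NS // 1000)

latest = EPOCH + span  # falls in the year 2262
earliest = EPOCH - span  # falls in the year 1677
print(earliest.year, latest.year)
```

With microsecond precision the same 64-bit range covers roughly 1000 times as many years, which is why datetime64[us] avoids the fallback to object dtype for such dates.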
- polars
- disable_materialize_annotation_action
Disable annotation based materialize actions.
By default, pipedag will use the validation functionality of dataframely or colspec in case task return values are annotated with column specification classes. This is also a way to specify more precise datatypes when writing to the database. Example:
import polars as pl
import pydiverse.colspec as cs
from pydiverse.pipedag import materialize

class MyColSpec(cs.ColSpec):
    a: cs.Int16
    b: cs.String

@materialize
def my_task() -> MyColSpec:
    return pl.DataFrame(dict(a=[1, 2], b=["x", "y"]))
(default: False)
- disable_retrieve_annotation_action
Disable annotation based retrieve actions.
By default, pipedag will use the validation functionality of dataframely or colspec in case task parameters are annotated with column specification classes. This is also a way to specify more precise datatypes when writing to the database. Example:
import polars as pl
import pydiverse.colspec as cs
from pydiverse.pipedag import materialize

class MyColSpec(cs.ColSpec):
    a: cs.Int16
    b: cs.String

@materialize(input_type=pl.DataFrame)
def my_task(tbl: MyColSpec) -> pl.DataFrame:
    return tbl.filter(pl.col("a") > 0)
(default: False)
- fault_tolerant_annotation_action
If set to True, the annotation based actions will never fail. Instead, an error message is printed to the log.
(default: False)
- sql
- disable_materialize_annotation_action
Disable annotation based materialize actions.
By default, pipedag will use the validation functionality of dataframely or colspec in case task return values are annotated with column specification classes. For input_type=pdt.SqlAlchemy, this is also a way to specify more precise datatypes when writing to the database.
Example:
import pydiverse.colspec as cs
import pydiverse.transform as pdt
from pydiverse.pipedag import materialize

class MyColSpec(cs.ColSpec):
    a: cs.Int16
    b: cs.String

@materialize(input_type=pdt.SqlAlchemy)
def my_task(tbl: pdt.Table) -> MyColSpec:
    return tbl >> pdt.mutate(a=1, b="b") >> pdt.select(tbl.a, tbl.b)
(default: False)
- disable_retrieve_annotation_action
Disable annotation based retrieve actions.
Currently, this has no effect for SQL based table hooks. This argument still exists for consistency with polars table hooks.
(default: False)
- cleanup_annotation_action_on_success
Whether to drop the table used for rows which failed the materialize validation.
While it is nice to clean up empty tables, the downside is that the presence of tables in a schema varies, especially with fault_tolerant_annotation_action=True.
(default: False)
- cleanup_annotation_action_intermediate_state
Whether to drop intermediate tables used for validating output tables.
Some checks require subqueries. Subqueries always carry the risk of confusing the query optimizer, so it is nearly always better to materialize subqueries before their use. Those intermediate tables, however, are not very interesting for the user. Pipedag will still print the queries that produced those intermediate tables.
(default: True)
- fault_tolerant_annotation_action
If set to True, the annotation based actions will never fail. Instead, an error message is printed to the log.
(default: False)
- local_table_cache
See Local Table Cache. Optional
- metadata_store
This subsection can hold all attributes of a table_store that is only used for metadata. Optional
This is useful for choosing a metadata store that is better suited for sharing between multiple users or for locking. ParquetTableStore benefits from this in particular, because sharing access to .duckdb files is not a supported feature of DuckDB. Postgres is a good database for light-weight managing of synchronization.
You can also use DatabaseLockManager as lock manager if the metadata_store supports it.
Implementation Details: The parent metadata tables will be flushed when using metadata_store to avoid errors when changing metadata_store configuration. Specifically for ParquetTableStore, the metadata_store will also store information of changes in views providing access to parquet files. This is necessary to keep all the local .duckdb files in-sync.
(default: None)
Local Table Cache¶
The section (labeled local_table_cache) inside the table_store section specifies a cache for storing tables locally.
Such a local table cache may help speed up local development significantly as tables don’t need to be retrieved from the (potentially slower) table store.
- class
The fully qualified name of the class to be used as the local table cache.
Available classes:
- attrs
Any values in this subsection will be passed as arguments to the __init__ or, if available, the _init_conf_ method of the local table cache class. For a list of available options, look at the __init__ method of the table cache you are using.
- store_input
If true, input dataframes are cached after reading from the table store. This can significantly speed up the retrieval.
(default: True)
- store_output
If true, output dataframes are stored before writing to table store. This is mainly useful for inspecting / using the cache during debugging.
(default: False)
- use_stored_input_as_cache
If true, input dataframes are read from cache instead of table store if cache is valid.
(default: True)
Blob Store Config¶
This section (labeled blob_store) specifies the blob store to use.
It is structured very similarly to table_store and provides the following options:
- blob_store_connection
This attribute allows referencing a block of attributes from the top level blob_store_connections section. For more detail, refer to the documentation of table_store_connection.
- class
The fully qualified name of the class to be used as the blob store.
Available classes:
- args
Any values in this subsection will be passed as arguments to the __init__ or, if available, the _init_conf_ method of the blob store class. For a list of available options, look at the __init__ method of the blob store you are using.
Lock Manager Config¶
This section (labeled lock_manager) specifies the lock manager to use.
- class
The fully qualified name of the class to be used as the lock manager.
Available classes:
- args
Any values in this subsection will be passed as arguments to the __init__ or, if available, the _init_conf_ method of the lock manager class. For a list of available options, look at the __init__ method of the lock manager you are using.
Orchestration Engine Config¶
This section (labeled orchestration) specifies which orchestration engine should be used for executing a flow.
It is structured very similarly to table_store and provides the following options:
- class
The fully qualified name of the class to be used as the orchestration engine.
Available classes:
- args
Any values in this subsection will be passed as arguments to the __init__ or, if available, the _init_conf_ method of the orchestration engine class. For a list of available options, look at the __init__ method of the orchestration engine you are using.
Complex Example Config¶
name: pipedag_tests
strict_instance_lookup: true  # default value: true
table_store_connections:
  postgres:
    args:
      # Postgres: this can be used after running `docker-compose up`
      url: "postgresql://{$POSTGRES_USERNAME}:{$POSTGRES_PASSWORD}@127.0.0.1:6543/{instance_id}"

  postgres2:
    args:
      url: "postgresql://{username}:{password}@{host}:{port}/{instance_id}"
      url_attrs_file: "{$POSTGRES_PASSWORD_CFG}"

  mssql:
    args:
      # SQL Server: this can be used after running `docker-compose up`
      url: "mssql+pyodbc://{$MSSQL_USERNAME}:{$MSSQL_PASSWORD}@127.0.0.1:1433/master?driver=ODBC+Driver+18+for+SQL+Server&encrypt=no"
      # schema_prefix: "master."  # SQL Server needs database.schema (exactly one of prefix and suffix must include a dot)
      schema_prefix: "{instance_id}_"  # SQL Server needs database.schema
      schema_suffix: ".dbo"  # Alternatively SQL Server databases can be used as schemas with .dbo default schema

blob_store_connections:
  file:
    args:
      base_path: "/tmp/pipedag/blobs"

technical_setups:
  default:
    # listen-interface for pipedag context server which synchronizes some task state during DAG execution
    network_interface: "127.0.0.1"
    # classes to be materialized to table store even without pipedag Table wrapper (we have loose coupling between
    # pipedag and pydiverse.transform, so consider adding 'pydiverse.transform.Table' in your config)
    auto_table: ["pandas.DataFrame", "sqlalchemy.sql.expression.TextClause", "sqlalchemy.sql.expression.Selectable"]
    # abort as fast as possible on task failure and print most readable stack trace
    fail_fast: true

    # Attention: For disable_kroki: false, stage and task names might be sent to the kroki_url.
    #   You can self-host kroki if you like:
    #   https://docs.kroki.io/kroki/setup/install/
    #   You need to install optional dependency 'pydot' for any visualization
    #   URL to appear.
    disable_kroki: true
    kroki_url: "https://kroki.io"

    instance_id: pipedag_default
    table_store:
      class: "pydiverse.pipedag.backend.table.SQLTableStore"

      # Postgres: this can be used after running `docker-compose up`
      table_store_connection: postgres

      ## SQL Server: this can be used after running `docker-compose up`
      #table_store_connection: mssql

      args:
        create_database_if_not_exists: True

        # print select statements before being encapsulated in materialize expressions and tables before writing to
        # database
        print_materialize: true
        # print final sql statements
        print_sql: true

    blob_store:
      class: "pydiverse.pipedag.backend.blob.FileBlobStore"
      blob_store_connection: file

    lock_manager:
      class: "pydiverse.pipedag.backend.lock.DatabaseLockManager"

    orchestration:
      class: "pydiverse.pipedag.engine.SequentialEngine"
      ## Activate this class to work either with prefect 1.x or prefect 2.y
      # class: "pydiverse.pipedag.engine.prefect.PrefectEngine"

instances:
  __any__:
    technical_setup: default
    # The following Attributes are handed over to the flow implementation (pipedag does not care)
    attrs:
      # by default we load source data and not a sampled version of a loaded database
      copy_filtered_input: false

  full:
    # Full dataset is using default database connection and schemas
    instance_id: pipedag_full
    table_store:
      table_store_connection: postgres2
    # Run this instance under @pytest.mark.slow5
    tags: pytest_mark_slow5

  midi:
    # Full dataset is using default database connection and schemas
    instance_id: pipedag_midi
    attrs:
      # copy filtered input from full instance
      copy_filtered_input: true
      copy_source: full
      copy_per_user: false
      copy_filter_cnt: 2  # this is just dummy input where we sample 2 rows

    # Run this instance under @pytest.mark.slow4
    tags: pytest_mark_slow4
    # Run only stage_2 under @pytest.mark.slow3
    stage_tags:
      pytest_mark_slow3:
        - simple_flow_stage2

  mini:
    # Full dataset is using default database connection and schemas
    instance_id: pipedag_mini
    attrs:
      copy_filtered_input: true
      copy_source: full
      copy_per_user: false
      copy_filter_cnt: 1  # this is just dummy input where we sample 1 row

    # Run this instance under @pytest.mark.slow2
    tags: pytest_mark_slow2
    # Run only stage_2 under @pytest.mark.slow1
    stage_tags:
      pytest_mark_slow1:
        - simple_flow_stage2

  mssql:
    # Full dataset is using default database connection and schemas
    table_store:
      table_store_connection: mssql

flows:
  # __any__:
  #   instances:
  #     # it would be equivalent to move everything in "instances:" to here
  test_instance_selection:
    instances:
      full:
        table_store:
          schema_suffix: "_full"
    table_store:
      schema_prefix: "instance_selection_"