DAG Visualization and Grouping Tasks/Stages

This example shows how to group tasks and stages within the visualization of a pipeline. Grouping can also be used to ensure that some tasks are executed before or after others, even if they don’t have an explicit dependency.

This is a simple flow visualization example with a group:

        with Flow() as flow:
            with Stage("stage1"):
                _ = any_task()
                with GroupNode("group1"):
                    task1 = task_within_group()
                    _ = task_within_group2(task1)
                _ = any_task()

Visualization of group of tasks within stage

You can also use a group to order tasks within a stage by passing the ordering_barrier=True parameter:

        with Flow() as flow:
            with Stage("stage1"):
                _ = any_task()
                with GroupNode("group1", ordering_barrier=True):
                    task1 = task_within_group()
                    _ = task_within_group2(task1)
                _ = any_task()

In this visualization, the tasks outside the group are linked to the group:

Visualization of group of tasks within stage with barrier
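Conceptually, the barrier turns declaration order into ordering constraints within the stage: every task declared before the group comes before every task in the group, and every task in the group comes before every task declared after it. The toy model below is plain Python, not pipedag's implementation. The barrier_edges helper, the list-of-names input and the renaming of the two any_task() calls to any_task_1 and any_task_2 are assumptions for illustration. It derives the implied edges for the example stage and computes an execution order with graphlib (Python 3.9+):

```python
from graphlib import TopologicalSorter  # Python 3.9+


def barrier_edges(declared, group):
    """Ordering edges implied by a barrier group inside one stage (toy model).

    declared: task names of one stage in declaration order (hypothetical input)
    group:    names of the tasks inside the barrier group; they are assumed to be
              declared contiguously, as they are inside a `with GroupNode(...)` block
    """
    positions = [i for i, name in enumerate(declared) if name in group]
    first, last = min(positions), max(positions)
    before, members, after = declared[:first], declared[first:last + 1], declared[last + 1:]
    # Every task declared before the group precedes every group member ...
    edges = [(b, m) for b in before for m in members]
    # ... and every group member precedes every task declared after the group.
    edges += [(m, a) for m in members for a in after]
    return edges


# The stage from the example above; the two any_task() calls get distinct names here.
stage1 = ["any_task_1", "task_within_group", "task_within_group2", "any_task_2"]
edges = barrier_edges(stage1, {"task_within_group", "task_within_group2"})

# Add the explicit data dependency task_within_group -> task_within_group2
# and derive an execution order that respects all edges.
graph = {}
for pred, succ in edges + [("task_within_group", "task_within_group2")]:
    graph.setdefault(succ, set()).add(pred)
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['any_task_1', 'task_within_group', 'task_within_group2', 'any_task_2']
```

Without the barrier, only the data dependency between the two group tasks would remain in this toy model, so any_task_2 would be free to run before or alongside them.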

The group style can be configured flexibly by passing a VisualizationStyle object as the style parameter:

        with Flow() as flow:
            with Stage("stage1"):
                _ = any_task()
                with GroupNode(
                        "group1", ordering_barrier=True,
                        style=VisualizationStyle(hide_content=True, box_color_always="#ccccff")
                ):
                    task1 = task_within_group()
                    _ = task_within_group2(task1)
                _ = any_task()

In this visualization, the content of the group is hidden and the box color is set explicitly:

Visualization of group of tasks within stage with hide_content
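How these two options could map onto a rendered graph can be sketched with Graphviz DOT text, the graph format that the optional pydot dependency works with. The render_group helper below is hypothetical and only illustrates the idea; pipedag's actual rendering code and DOT attributes may differ. With hide_content the group collapses into a single node, otherwise it becomes a cluster box around its tasks, and box_color_always becomes the fill color:

```python
def render_group(name, tasks, hide_content=False, box_color_always=None):
    """Render one group as a Graphviz DOT fragment (hypothetical helper)."""
    if hide_content:
        # Collapsed group: a single box node stands in for all member tasks.
        attrs = "shape=box"
        if box_color_always:
            attrs += f', style=filled, fillcolor="{box_color_always}"'
        return f'"{name}" [{attrs}];'
    # Expanded group: a cluster subgraph draws a box around the member tasks.
    lines = [f'subgraph "cluster_{name}" {{', f'  label="{name}";']
    if box_color_always:
        lines.append(f'  style=filled; fillcolor="{box_color_always}";')
    lines += [f'  "{task}";' for task in tasks]
    lines.append("}")
    return "\n".join(lines)


tasks = ["task_within_group", "task_within_group2"]
collapsed = render_group("group1", tasks, hide_content=True, box_color_always="#ccccff")
expanded = render_group("group1", tasks)
print(collapsed)  # "group1" [shape=box, style=filled, fillcolor="#ccccff"];
print(expanded)
```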

The box around a group can also be hidden. With ordering_barrier=True, barrier tasks are then shown in the visualization instead:

        with Flow() as flow:
            with Stage("stage1"):
                _ = any_task()
                with GroupNode("group1", ordering_barrier=True, style=VisualizationStyle(hide_box=True)):
                    task1 = task_within_group()
                    _ = task_within_group2(task1)
                _ = any_task()

Visualization of group of tasks within stage with hide_box
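Replacing the box by barrier tasks does not change the displayed ordering: routing the tasks before the group through one barrier node into the group, and the group through a second barrier node into the tasks after it, implies the same precedence as drawing every pair directly. The plain-Python check below verifies that for the example stage; the node names barrier_in and barrier_out are hypothetical and need not match what pipedag draws:

```python
def reachable_pairs(edges):
    """Transitive closure: all (a, b) such that b can be reached from a."""
    succ = {}
    for a, b in edges:
        succ.setdefault(a, set()).add(b)
    pairs = set()
    for start in succ:
        stack, seen = list(succ[start]), set()
        while stack:
            node = stack.pop()
            if node in seen:
                continue
            seen.add(node)
            pairs.add((start, node))
            stack.extend(succ.get(node, ()))
    return pairs


before = ["any_task_1"]
members = ["task_within_group", "task_within_group2"]
after = ["any_task_2"]

# Direct layout: one edge for every ordered pair across the barrier.
direct = [(b, m) for b in before for m in members] + [(m, a) for m in members for a in after]

# Barrier layout: two hypothetical synthetic nodes carry the same ordering.
via_barrier = (
    [(b, "barrier_in") for b in before]
    + [("barrier_in", m) for m in members]
    + [(m, "barrier_out") for m in members]
    + [("barrier_out", a) for a in after]
)

# Compare the precedence between real tasks only, ignoring the barrier nodes.
real_tasks = set(before + members + after)
barrier_order = {p for p in reachable_pairs(via_barrier) if set(p) <= real_tasks}
print(barrier_order == reachable_pairs(direct))  # True
```

For m tasks before the group, n inside it and k after it, the direct layout needs m*n + n*k edges while the barrier layout needs m + 2*n + k. Here that is 4 versus 6, but the barrier layout grows more slowly for larger groups.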

Groups can also span stages like this:

        with Flow() as flow:
            with GroupNode("group1"):
                with Stage("stage1"):
                    task1 = task_within_group()
                    _ = task_within_group2(task1)
            with Stage("stage2"):
                _ = any_task()

Visualization of group of stage

It is also possible to set up visualization groups in the configuration instead of in code. Take this flow without any GroupNode:

        with Flow() as flow:
            with Stage("stage1"):
                _ = any_task()
                task1 = task_within_group()
                _ = task_within_group2(task1)
            with Stage("stage2"):
                _ = any_task()
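The following configuration groups the tasks task_within_group and task_within_group2 as my_group1, styled with my_group_style, and the whole stage2 as my_group2:

        instances:
            __any__:
                visualization:
                    default:
                        styles:
                            my_group_style:
                                hide_content: true
                                box_color_always: "#ccccff"
                        group_nodes:
                            my_group1:
                                label: "Group 1"
                                tasks:
                                  - task_within_group
                                  - task_within_group2
                                style_tag: my_group_style
                            my_group2:
                                label: "Group 2"
                                stages:
                                  - stage2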
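The configuration refers to tasks and stages by name only. Here is a minimal sketch of how such group_nodes and styles entries could be resolved against a flow, assuming a simple mapping from stage name to the task names declared in it. The resolve_group_nodes helper and the stage_tasks input are hypothetical and not part of pipedag. Membership is tracked as (stage, task) pairs because the same task name, any_task here, can occur in several stages:

```python
def resolve_group_nodes(group_nodes, styles, stage_tasks):
    """Map each configured group to its label, member tasks and style (sketch)."""
    resolved = {}
    for group_id, spec in group_nodes.items():
        wanted_tasks = set(spec.get("tasks", []))
        wanted_stages = set(spec.get("stages", []))
        # A task belongs to the group if it is listed by name
        # or if its whole stage is listed.
        members = {
            (stage, task)
            for stage, tasks in stage_tasks.items()
            for task in tasks
            if stage in wanted_stages or task in wanted_tasks
        }
        style = styles[spec["style_tag"]] if "style_tag" in spec else {}
        resolved[group_id] = {
            "label": spec.get("label", group_id),
            "members": members,
            "style": style,
        }
    return resolved


# The group_nodes and styles entries from the configuration above, as Python dicts.
group_nodes = {
    "my_group1": {
        "label": "Group 1",
        "tasks": ["task_within_group", "task_within_group2"],
        "style_tag": "my_group_style",
    },
    "my_group2": {"label": "Group 2", "stages": ["stage2"]},
}
styles = {"my_group_style": {"hide_content": True, "box_color_always": "#ccccff"}}
# Hypothetical summary of the flow: stage name -> task names declared in it.
stage_tasks = {
    "stage1": ["any_task", "task_within_group", "task_within_group2"],
    "stage2": ["any_task"],
}

resolved = resolve_group_nodes(group_nodes, styles, stage_tasks)
print(sorted(resolved["my_group1"]["members"]))
```

In this sketch, an unknown style_tag raises a KeyError.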

For testing, it might be easier to assemble the configuration in code:

        import copy

        from pydiverse.pipedag.core.config import create_basic_pipedag_config, PipedagConfig

        # temp_dir is a temporary directory (e.g. from tempfile.TemporaryDirectory())
        base_cfg = create_basic_pipedag_config(
            f"duckdb:///{temp_dir}/db.duckdb",
            disable_stage_locking=True,  # This is special for duckdb
            # Attention: stage and task names might be sent to the
            #   following URL. You can self-host kroki if you like:
            #   https://docs.kroki.io/kroki/setup/install/
            #   You need to install optional dependency 'pydot' for any visualization
            #   URL to appear.
            kroki_url="https://kroki.io",
        )
        group_nodes = dict(
            my_group1=dict(
                tasks=["task_within_group", "task_within_group2"],
                label="Group 1",
                style_tag="my_group_style",
            ),
            my_group2=dict(stages=["stage2"], label="Group 2"),
        )
        styles = dict(
            my_group_style=dict(hide_content=True, box_color_always="#ccccff"),
        )
        visualization = dict(default=dict(group_nodes=group_nodes, styles=styles), alternative={})
        # deepcopy: a shallow copy would share (and modify) the nested dicts of base_cfg
        raw_cfg = copy.deepcopy(base_cfg.raw_config)
        raw_cfg["instances"]["__any__"]["visualization"] = visualization
        cfg = PipedagConfig(raw_cfg).get()

Visualization of group of stage
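When patching a configuration dictionary like raw_cfg, keep in mind that dict.copy() is shallow: the nested dictionaries are shared with the original, so writing into raw_cfg["instances"]["__any__"] after a plain .copy() would also modify the dictionary it was copied from. The comparison below uses a plain stand-in dictionary (not a real pipedag configuration; "some_option" is a made-up key) to show the difference, and copy.deepcopy leaves the original untouched:

```python
import copy


def make_base():
    # Stand-in for a raw configuration dict; only the nesting matters here.
    return {"instances": {"__any__": {"some_option": 1}}}


base = make_base()
shallow = base.copy()  # copies only the top level
shallow["instances"]["__any__"]["visualization"] = {"default": {}}
leaked = "visualization" in base["instances"]["__any__"]  # nested dict is shared

base = make_base()
deep = copy.deepcopy(base)  # copies all nested dicts as well
deep["instances"]["__any__"]["visualization"] = {"default": {}}
isolated = "visualization" not in base["instances"]["__any__"]

print(leaked, isolated)  # True True
```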

Here is a legend of default colors used for tasks and group nodes:

Legend of default task and group node colors
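The legend distinguishes group nodes by the combined state of their tasks; the legend code further below creates one group for each case (none, any or all tasks cache valid, and any task failed). One way to express that aggregation is sketched here in plain Python. The TaskState names, the order in which the cases are checked, and the rule that an explicit box_color_always overrides everything are assumptions for illustration, not taken from pipedag's source:

```python
from enum import Enum


class TaskState(Enum):
    FAILED = "failed"
    COMPLETED = "completed"      # executed, cache was not valid
    CACHE_VALID = "cache_valid"  # result reused from cache
    SKIPPED = "skipped"          # not executed because an input failed


def group_color_key(states, box_color_always=None):
    """Return the legend category of a group box (hypothetical rule)."""
    if box_color_always is not None:
        return "always"  # assumed: an explicit color overrides state-based colors
    if any(s is TaskState.FAILED for s in states):
        return "any_failure"
    valid = [s is TaskState.CACHE_VALID for s in states]
    if valid and all(valid):
        return "all_cache_valid"
    if any(valid):
        return "any_cache_valid"
    return "none_cache_valid"


# Task states of the legend groups as they would be in the second run.
legend_groups = {
    "group_none_cache_valid": [TaskState.COMPLETED],
    "group_any_cache_valid": [TaskState.COMPLETED, TaskState.CACHE_VALID],
    "group_all_cache_valid": [TaskState.CACHE_VALID],
    "group_any_failed": [TaskState.COMPLETED, TaskState.FAILED, TaskState.SKIPPED],
}
categories = {name: group_color_key(states) for name, states in legend_groups.items()}
print(categories)
```

The states in legend_groups correspond to the second run of the legend code, when the tasks with version="1.0" are cache valid.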

The following code (see also example.zip) is a basis for testing the code snippets above. It can also be executed as a Jupyter notebook with an IPython kernel, e.g. one running in an environment created by conda env create with this environment.yml.

import tempfile

from pydiverse.pipedag import Flow, Stage, materialize, GroupNode, VisualizationStyle
from pydiverse.pipedag.core.config import create_basic_pipedag_config
from pydiverse.common.util.structlog import setup_logging


@materialize
def any_task():
    return 1

@materialize
def task_within_group():
    return 2

@materialize
def task_within_group2(input1: int):
    return input1 + 1


def main():
    with tempfile.TemporaryDirectory() as temp_dir:
        cfg = create_basic_pipedag_config(
            f"duckdb:///{temp_dir}/db.duckdb",
            disable_stage_locking=True,  # This is special for duckdb
            # Attention: stage and task names might be sent to the
            #   following URL. You can self-host kroki if you like:
            #   https://docs.kroki.io/kroki/setup/install/
            #   You need to install optional dependency 'pydot' for any visualization
            #   URL to appear.
            kroki_url="https://kroki.io",
        ).get("default")
        with cfg:
            with Flow() as flow:
                with Stage("stage1"):
                    _ = any_task()
                    with GroupNode(
                            "group1", ordering_barrier=True,
                            style=VisualizationStyle(hide_content=True, box_color_always="#ccccff")
                    ):
                        task1 = task_within_group()
                        _ = task_within_group2(task1)
                    _ = any_task()

            # Run flow
            result = flow.run()
            assert result.successful

            # you can also visualize the flow explicitly:
            # kroki_url = result.visualize_url()
            # result.visualize()


if __name__ == "__main__":
    setup_logging()  # you can set up the logging and/or structlog libraries as you wish
    main()

This code was used to create the color legend:

import tempfile

from pydiverse.pipedag import Flow, GroupNode, Stage, VisualizationStyle, materialize
from pydiverse.pipedag.core.config import create_basic_pipedag_config
from pydiverse.common.util.structlog import setup_logging


@materialize
def failed():
    assert False, "This task is supposed to fail"


@materialize(version=None)
def completed_but_cache_invalid():
    return 1


@materialize(version="1.0")
def cache_valid():
    return 2


@materialize(version="1.0")
def cache_valid2():
    return 3


@materialize
def skipped(out):
    return out + 1


def main():
    with tempfile.TemporaryDirectory() as temp_dir:
        cfg = create_basic_pipedag_config(
            f"duckdb:///{temp_dir}/db.duckdb",
            disable_stage_locking=True,  # This is special for duckdb
            # Attention: stage and task names might be sent to the
            #   following URL. You can self-host kroki if you like:
            #   https://docs.kroki.io/kroki/setup/install/
            #   You need to install optional dependency 'pydot' for any visualization
            #   URL to appear.
            kroki_url="https://kroki.io",
            fail_fast=False,
        ).get("default").evolve(swallow_exceptions=True)
        with cfg:
            with Flow() as flow:
                with Stage("stage1"):
                    _ = completed_but_cache_invalid()
                    _ = cache_valid()
                with Stage("stage2"):
                    out = failed()
                with Stage("stage3"):
                    _ = skipped(out)
                    with GroupNode("group_none_cache_valid", style=VisualizationStyle(hide_content=True)):
                        _ = completed_but_cache_invalid()
                    with GroupNode("group_any_cache_valid", style=VisualizationStyle(hide_content=True)):
                        _ = completed_but_cache_invalid()
                        _ = cache_valid()
                    with GroupNode("group_all_cache_valid", style=VisualizationStyle(hide_content=True)):
                        # avoid memoization (not counted as cache valid)
                        _ = cache_valid2()
                    with GroupNode("group_any_failed", style=VisualizationStyle(hide_content=True)):
                        _ = completed_but_cache_invalid()
                        out = failed()
                        _ = skipped(out)

            # Run flow
            result = flow.run()
            assert not result.successful

            # Run flow again for cache validity
            result = flow.run()
            assert not result.successful

            # you can also visualize the flow explicitly:
            # kroki_url = result.visualize_url()
            # result.visualize()

if __name__ == "__main__":
    setup_logging()  # you can set up the logging and/or structlog libraries as you wish
    main()
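The flow is run twice so that the second run can show cache hits. The toy cache below (plain Python, not pipedag's caching mechanism) illustrates the distinction the task names rely on: a task with an explicit version string and unchanged inputs can be reused on a later run, whereas version=None is treated as never cache valid. The ToyCache class and its cache key layout are assumptions for illustration:

```python
class ToyCache:
    """In-memory stand-in for a task cache (illustration only)."""

    def __init__(self):
        self._store = {}

    def run(self, name, version, func, *args):
        """Execute or look up one task; return (result, cache_valid)."""
        key = (name, version, args)  # assumed cache key: task name, version, inputs
        if version is not None and key in self._store:
            return self._store[key], True  # cache hit: task is not executed again
        result = func(*args)
        if version is not None:
            self._store[key] = result  # results of version=None tasks are never reused
        return result, False


cache = ToyCache()
history = []
for _ in range(2):  # like calling flow.run() twice
    _, invalid_hit = cache.run("completed_but_cache_invalid", None, lambda: 1)
    _, valid_hit = cache.run("cache_valid", "1.0", lambda: 2)
    history.append((invalid_hit, valid_hit))
print(history)  # [(False, False), (False, True)]
```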