Raw SQL example pipeline
Even though it is not the recommended way to run larger raw SQL scripts within pipedag, the aim of pydiverse.pipedag is to enable adoption with the least possible effort given existing code. Many data pipelines have extensive code bases of raw SQL scripts.
Pipedag supports automatic cache invalidation for raw SQL scripts and schema swapping. So these are benefits you get
from pipedag adoption out of the box. It will detect tables created by a raw SQL script, and it will offer to reference
such tables with raw_sql["table_name"] get-item syntax both at flow declaration time (will be lazily evaluated by
consumer task) and at runtime.
The following code can be found in this zip file. It keeps the raw SQL scripts in files and creates
a generic pipedag task called tsql which replaces schema references in the raw SQL text. The prefixes out_, in_,
and helper_ are arbitrary and can be modified and enriched as you see fit. The flow builds a dependency chain between
tsql calls to ensure that SQL files are executed in correct order. It doesn’t manage dependencies on table level though.
See best practices around SQL for how to move from raw SQL scripts to better dependency management
and thus more fine-grained cache invalidation.
import os
from pathlib import Path
import sqlalchemy as sa
from pydiverse.pipedag import Flow, Stage, materialize
from pydiverse.pipedag.container import RawSql
from pydiverse.pipedag.context import ConfigContext, StageLockContext
from pydiverse.common.util.structlog import setup_logging
"""
Attention:
Wrapping Raw SQL statements should always be just the first step of pipedag adoption.
Ideally, the next step is to extract individual transformations (SELECT statements) so
they can be gradually converted from text SQL to programmatically created SQL (python).
"""
@materialize(input_type=sa.Table, lazy=True)
def tsql(
    name: str,
    script_directory: Path,
    *,
    out_stage: Stage | None = None,
    in_sql=None,
    helper_sql=None,
    depend=None,
):
    """Read a raw SQL script and bind its schema placeholders.

    The file ``script_directory / name`` is read as UTF-8 text. The
    ``{{out_schema}}``, ``{{in_schema}}`` and ``{{helper_schema}}``
    placeholders are then substituted through ``raw_sql_bind_schema``.

    :param name: File name of the SQL script inside ``script_directory``.
    :param script_directory: Directory that contains the script.
    :param out_stage: Stage bound to the ``out_`` prefix; it is resolved with
        ``transaction=True``.
    :param in_sql: Stage or ``RawSql`` bound to the ``in_`` prefix.
    :param helper_sql: Stage or ``RawSql`` bound to the ``helper_`` prefix.
    :param depend: Not used by the body; only relevant for adding an
        additional task dependency.
    :return: ``RawSql`` built from the bound SQL text and the script file name.
    """
    _ = depend  # only relevant for adding additional task dependency

    script_file = Path(script_directory / name)
    statement = script_file.read_text(encoding="utf-8")

    # The output schema is the only one resolved with transaction=True.
    statement = raw_sql_bind_schema(statement, "out_", out_stage, transaction=True)
    for prefix, source in (("in_", in_sql), ("helper_", helper_sql)):
        statement = raw_sql_bind_schema(statement, prefix, source)

    return RawSql(statement, script_file.name)
def raw_sql_bind_schema(
    sql, prefix: str, stage: Stage | RawSql | None, *, transaction=False
):
    """Replace the ``{{<prefix>schema}}`` placeholder in ``sql``.

    :param sql: SQL text that may contain the placeholder.
    :param prefix: Placeholder prefix, e.g. ``"out_"`` for ``{{out_schema}}``.
    :param stage: Stage whose schema is substituted. For a ``RawSql`` its
        ``stage`` attribute is used. With ``None`` the text is returned
        unchanged.
    :param transaction: When true, the schema is looked up by the stage's
        ``transaction_name`` instead of its ``name``.
    :return: The SQL text with the placeholder replaced by the schema name
        obtained from the table store.
    """
    target = stage.stage if isinstance(stage, RawSql) else stage

    # The table store is looked up from the active config context even when
    # there is nothing to bind, exactly as callers have come to expect.
    table_store = ConfigContext.get().store.table_store
    if target is None:
        return sql

    if transaction:
        stage_name = target.transaction_name
    else:
        stage_name = target.name
    schema_name = table_store.get_schema(stage_name).get()

    placeholder = "{{%sschema}}" % prefix
    return sql.replace(placeholder, schema_name)
def _run_and_check(flow, prep_stage):
    """Run ``flow`` and verify what the raw SQL scripts created.

    While holding a ``StageLockContext``, the flow is executed and the schema
    of ``prep_stage`` is inspected: the set of tables, the primary keys of
    ``raw01A`` and ``table01``, the indexes on both tables and one string
    value read from ``special_chars_join``.

    :param flow: Flow to execute via ``flow.run()``.
    :param prep_stage: Stage whose schema is inspected after the run.
    """
    expected_tables = {
        "raw01A",
        "table01",
        "special_chars",
        "special_chars2",
        "special_chars_join",
    }
    # (table, primary key columns, required prefix of the constraint name)
    expected_primary_keys = (
        ("raw01A", ["entity", "start_date"], "PK__raw01A__"),
        ("table01", ["entity", "reason"], "PK__table01__"),
    )
    # (index name, indexed columns) in the order the inspector reports them
    expected_raw_indexes = (
        ("raw_start_date", ["start_date"]),
        ("raw_start_date_end_date", ["end_date", "start_date"]),
    )

    with StageLockContext():
        result = flow.run()
        assert result.successful

        table_store = result.config_context.store.table_store
        schema = table_store.get_schema(prep_stage.name).get()
        inspector = sa.inspect(table_store.engine)

        # these constraints might be a bit too harsh in case this test is
        # extended to more databases
        assert set(inspector.get_table_names(schema=schema)) == expected_tables

        for table, key_columns, name_prefix in expected_primary_keys:
            pk = inspector.get_pk_constraint(table, schema=schema)
            assert pk["constrained_columns"] == key_columns
            assert pk["name"].startswith(name_prefix)

        table01_indexes = inspector.get_indexes("table01", schema=schema)
        assert len(table01_indexes) == 0

        raw_indexes = inspector.get_indexes("raw01A", schema=schema)
        for position, (index_name, index_columns) in enumerate(expected_raw_indexes):
            assert raw_indexes[position]["name"] == index_name
            assert raw_indexes[position]["column_names"] == index_columns

        with table_store.engine.connect() as conn:
            query = f"SELECT string_col FROM [{schema}].[special_chars_join]"
            first_row = conn.execute(sa.text(query)).fetchone()
            str_val = first_row[0]
            assert str_val == "äöüßéç"
def test_raw_sql():
    """Declare the raw SQL flow and run it twice with verification.

    The flow has three stages (``helper``, ``raw``, ``prep``). Each ``tsql``
    task receives the result of an earlier task so that the SQL files are
    executed in order.
    """
    scripts_root = Path(__file__).parent / "raw_sql_scripts" / "mssql"
    raw_dir = scripts_root / "raw"
    prep_dir = scripts_root / "prep"

    with Flow() as flow:
        with Stage("helper") as helper_stage:
            helper = tsql(
                "create_db_helpers.sql", scripts_root, out_stage=helper_stage
            )

        with Stage("raw") as raw_stage:
            raw = tsql(
                "raw_views.sql", raw_dir, out_stage=raw_stage, helper_sql=helper
            )

        with Stage("prep") as prep_stage:
            entity_checks = tsql(
                "entity_checks.sql",
                prep_dir,
                in_sql=raw,
                out_stage=prep_stage,
                depend=raw,
            )
            # depend chains this script after entity_checks.sql
            tsql(
                "more_tables.sql",
                prep_dir,
                in_sql=raw,
                out_stage=prep_stage,
                depend=entity_checks,
            )

    # First pass: on a fresh database, this will create indexes with Raw-SQL.
    # Second pass: make sure cached execution creates the same indexes.
    for _ in range(2):
        _run_and_check(flow, prep_stage)
if __name__ == "__main__":
    # Credentials for the example database; presumably they match the MSSQL
    # container started with docker-compose -- verify against its config.
    os.environ.update(
        {
            "MSSQL_USERNAME": "sa",
            "MSSQL_PASSWORD": "PydiQuant27",
        }
    )
    setup_logging()
    test_raw_sql()
SQL scripts look as follows with placeholders like {{out_schema}} since pipedag manages schema names and schema swapping:
-- {{out_schema}} and {{in_schema}} are placeholders; they are replaced with
-- the actual schema names before the script is executed.
-- Start from a clean state: remove table01 from the output schema if present.
DROP TABLE IF EXISTS {{out_schema}}.table01
GO
-- table01 is keyed by the (entity, reason) pair.
CREATE TABLE {{out_schema}}.table01 (
entity VARCHAR(17) NOT NULL
, reason VARCHAR(50) NOT NULL
PRIMARY KEY (entity, reason)
)
GO
-- Insert the distinct entities of raw01 rows with end_date '9999-01-01' for
-- which the LEFT JOIN against the distinct entities of raw01 found no match,
-- each tagged with the reason 'Missing in raw01'.
INSERT INTO {{out_schema}}.table01 WITH (TABLOCKX)
SELECT DISTINCT raw01.entity entity
, 'Missing in raw01' reason
FROM {{in_schema}}.raw01 WITH (NOLOCK)
LEFT JOIN (
SELECT DISTINCT entity
FROM {{in_schema}}.raw01 WITH (NOLOCK)
) raw01x
ON raw01.entity = raw01x.entity
WHERE raw01.end_date = '9999-01-01'
AND raw01x.entity IS NULL
We run it against an MSSQL database that we spin up with docker-compose. However, you still need to install the ODBC driver:
Installing it by following the
instructions here
worked.
However, odbcinst -j revealed that the driver installation placed its configuration in /etc/unixODBC/*, whereas the
conda-installed pyodbc brings its own odbcinst executable, which shows that ODBC config files are expected in /etc/*.
Symlinks were enough to fix the problem. Try python -c 'import pyodbc;print(pyodbc.drivers())' and check whether you get more than an empty list.
Furthermore, make sure you use 127.0.0.1 instead of localhost. It seems that /etc/hosts is ignored.
The file raw_sql.zip can be used as follows:
Install pixi.
unzip raw_sql.zip
cd raw_sql
pixi run docker-compose up
and in another terminal within the same directory:
cd raw_sql
pixi run python raw_sql.py