Best practices: moving from Raw SQL over handwritten SELECT statements to programmatic SQLΒΆ
Pydiverse.pipedag should enable adoption for whatever data pipeline code already exists and then allow for gradual improvements. Many data pipelines start with considerable code bases of raw SQL scripts. The example raw_sql shows how to wrap raw SQL files with pipedag tasks. Such a raw SQL script may look as follows:
DROP TABLE IF EXISTS {{out_schema}}.table01
GO
CREATE TABLE {{out_schema}}.table01 (
entity VARCHAR(17) NOT NULL
, reason VARCHAR(50) NOT NULL
PRIMARY KEY (entity, reason)
)
INSERT INTO {{out_schema}}.table01 WITH (TABLOCKX)
SELECT DISTINCT raw01.entity entity
, 'Missing in raw01' reason
FROM {{in_schema}}.raw01 WITH (NOLOCK)
LEFT JOIN (
SELECT DISTINCT entity
FROM {{in_schema}}.raw01 WITH (NOLOCK)
) raw01x
ON raw01.entity = raw01x.entity
WHERE raw01.end_date = '9999-01-01'
AND raw01x.entity IS NULL
The actual important user specification part of this SQL script, however, is the SELECT statement. As a next step, it is
suggested to extract all SELECT statements and to let pipedag do the materialization in form of a CREATE TABLE AS SELECT
statement. This will also lead to more portable code since the actual materialization is done quite differently for different
database engines and a lot of options may exist how to do this differently (i.e. compression, etc.):
import sqlalchemy as sa
import pydiverse.pipedag as dag
from pydiverse.pipedag import materialize
def ref(table: sa.Alias):
return f"[{table.original.schema}].[{table.original.name}]"
@materialize(lazy=True, input_type=sa.Table)
def table01(raw01: sa.Alias):
sql = f"""
SELECT DISTINCT raw01.entity entity, 'Missing in raw01' reason
FROM {ref(raw01)} as raw01 WITH (NOLOCK)
LEFT JOIN (
SELECT DISTINCT entity
FROM {ref(raw01)} as raw01 WITH (NOLOCK)
) raw01x
ON raw01.entity = raw01x.entity
WHERE raw01.end_date = '9999-01-01'
AND raw01x.entity IS NULL
"""
return dag.Table(sa.text(sql), name="table01", primary_key=["entity", "reason"])
In the raw sql example, this task can be embedded even in the middle of raw SQL scripts:
with Flow() as flow:
with Stage("helper") as out_stage:
helper = tsql("create_db_helpers.sql", parent_dir, out_stage=out_stage)
with Stage("raw") as out_stage:
_dir = parent_dir / "raw"
raw = tsql("raw_views.sql", _dir, out_stage=out_stage, helper_sql=helper)
with Stage("prep") as prep_stage:
_dir = parent_dir / "prep"
# >> Start of tasks with individual select statements (this comment is just a comment):
_table01 = table01(raw["raw01"])
# << End of tasks with individual select statements:
prep = tsql(
"more_tables.sql", _dir, in_sql=raw, out_stage=prep_stage, depend=_table01
)
_ = prep
In the end, the goal is to see the complete dependency tree on table level in codes. Pipedag can handle lists and dictionaries. So it is no problem for a task to return more than one output table.
Once all SELECT statements are extracted, the next step is to convert them to programmatic SQL. This has the advantage
that general software engineering principles can be used to share code and to test parts of the code. Pipedag supports
multiple ways of describing SQL statements in code (See Backends). The most stable way of writing SQL
statements in python code, however, is SQLAlchemy. The task above would look like this in SQLAlchemy:
import sqlalchemy as sa
import pydiverse.pipedag as dag
from pydiverse.pipedag import materialize
@materialize(lazy=True, input_type=sa.Table)
def table01(raw01: sa.Alias):
raw01x = sa.select([raw01.c.entity]).distinct().alias("raw01x")
sql = (
sa.select([raw01.c.entity, sa.literal("Missing in raw01").label("reason")])
.select_from(raw01.outer_join(raw01x, raw01.c.entity == raw01x.c.entity))
.where((raw01.c.end_date == "9999-01-01") & (raw01x.c.entity.is_(None)))
)
return dag.Table(sql, name="table01", primary_key=["entity", "reason"])