duckorch

DAG orchestration for DuckDB with lineage as a side product: SQL-file tasks, parallel execution, Mermaid graphs, and OpenLineage emission

Maintainer(s): nkwork9999

Installing and Loading

INSTALL duckorch FROM community;
LOAD duckorch;

Example

LOAD duckorch;

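-- Set up state schema
PRAGMA orch_init;

-- Sample source table for the task below
CREATE TABLE raw_users AS
SELECT * FROM (VALUES (1, NULL::TIMESTAMP), (2, TIMESTAMP '2024-01-01 00:00:00')) AS t(id, deleted_at);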

-- Register a task directly (in real use, prefer .sql files via PRAGMA orch_register('./tasks/'))
INSERT INTO __orch__.tasks (name, sql, inputs, outputs, retries) VALUES
  ('clean_users',
   'CREATE OR REPLACE TABLE clean_users AS SELECT id FROM raw_users WHERE deleted_at IS NULL',
   ['raw_users']::VARCHAR[],
   ['clean_users']::VARCHAR[],
   0);

-- Run the DAG (topological order, parallel within each layer)
SET orch_max_parallel = 4;
PRAGMA orch_run;

-- Inspect run history
SELECT task_name, status, retry_count FROM __orch__.runs ORDER BY started_at;

-- Get a Mermaid lineage graph
PRAGMA orch_visualize('lineage');

-- Pure helpers (auto-extract I/O from a SQL string)
SELECT orch_extract_io('INSERT INTO out SELECT * FROM a JOIN b ON a.id = b.id');
-- {"inputs":["a","b"],"outputs":["out"]}
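PRAGMA orch_visualize('lineage') returns the graph as Mermaid text. Its exact output format is not documented here, so the following is only a hypothetical Python sketch of the idea: table-level edges, assumed to be (source, target) pairs, are rendered as a left-to-right Mermaid flowchart. The node-id scheme and the function names are illustrative assumptions.

```python
import re


def mermaid_id(name: str) -> str:
    """Turn a table name such as 'analytics.user_stats' into a safe Mermaid node id."""
    return re.sub(r"[^A-Za-z0-9_]", "_", name)


def render_lineage(edges: list[tuple[str, str]]) -> str:
    """Render (source_table, target_table) pairs as a Mermaid flowchart.

    Hypothetical sketch: duplicate edges are collapsed, ids are sanitized,
    and the original table names are kept as node labels.
    """
    lines = ["flowchart LR"]
    for src, dst in sorted(set(edges)):
        lines.append(f'    {mermaid_id(src)}["{src}"] --> {mermaid_id(dst)}["{dst}"]')
    return "\n".join(lines)


print(render_lineage([("raw_users", "clean_users"),
                      ("analytics.clean_users", "analytics.user_stats")]))
```

Sanitizing ids while keeping the dotted name as the label avoids Mermaid misreading schema-qualified names.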

About duckorch

duckorch is a DuckDB extension for DAG-based task orchestration. Table-level lineage emerges as a side product of each task's inputs and outputs, whether they are declared in the task headers or auto-extracted from its SQL.
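Because every task lists what it reads and writes, both graphs can be derived mechanically: a lineage edge runs from each input table to each output table of the same task, and a task dependency runs from the task that produces a table to every task that reads it. A minimal Python sketch of that derivation, using a hypothetical Task record rather than the actual layout of __orch__.lineage_edges or __orch__.task_edges:

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    # Hypothetical record mirroring the name/inputs/outputs columns of __orch__.tasks.
    name: str
    inputs: list[str] = field(default_factory=list)
    outputs: list[str] = field(default_factory=list)


def lineage_edges(tasks: list[Task]) -> set[tuple[str, str, str]]:
    """Table-level edges as (input_table, output_table, task_name)."""
    return {(src, dst, t.name) for t in tasks for src in t.inputs for dst in t.outputs}


def task_edges(tasks: list[Task]) -> set[tuple[str, str]]:
    """Task-level edges as (upstream_task, downstream_task).

    Tables nobody produces (external sources) yield no edge, and a task that
    reads a table it also writes does not depend on itself.
    """
    producer = {out: t.name for t in tasks for out in t.outputs}
    return {
        (producer[inp], t.name)
        for t in tasks
        for inp in t.inputs
        if inp in producer and producer[inp] != t.name
    }


tasks = [
    Task("clean_users", ["raw_users"], ["clean_users"]),
    Task("user_stats", ["clean_users"], ["user_stats"]),
]
print(sorted(task_edges(tasks)))
```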

Task definition (SQLMesh-style headers in plain .sql files):

-- @task name=user_stats
-- @outputs analytics.user_stats
-- @retries 2
-- @incremental_by updated_at
-- @test "SELECT COUNT(*) FROM analytics.user_stats WHERE users < 0" expect 0

CREATE OR REPLACE TABLE analytics.user_stats AS
SELECT country, COUNT(*) AS users
FROM analytics.clean_users
GROUP BY country;
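The header block is a run of "-- @key value" comments at the top of the file, followed by the task's SQL. duckorch's own parser (exposed as orch_parse_task) is not described here, so this is only a hypothetical Python sketch of splitting such a file into metadata and body. The comma-or-space separator for @inputs/@outputs and the tuple shape for @test are assumptions.

```python
import re
import shlex

# Matches header lines such as "-- @retries 2" or "-- @task name=user_stats".
HEADER = re.compile(r"^--\s*@(\w+)\s*(.*)$")


def parse_task_file(text: str) -> tuple[dict, str]:
    """Split a task file into (headers, sql_body); only the leading block is parsed."""
    headers: dict = {"tests": []}
    lines = text.splitlines()
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        if not line:  # blank lines inside or after the header block are skipped
            i += 1
            continue
        m = HEADER.match(line)
        if not m:  # first non-header line starts the SQL body
            break
        key, value = m.group(1), m.group(2).strip()
        if key == "task":
            # "name=user_stats" -> {"name": "user_stats"}
            headers.update(dict(p.split("=", 1) for p in value.split()))
        elif key == "test":
            # '"SELECT ..." expect 0' -> ("SELECT ...", "expect", "0"); n is optional
            parts = shlex.split(value)
            headers["tests"].append((parts[0], parts[1], parts[2] if len(parts) > 2 else None))
        elif key in ("inputs", "outputs"):
            headers[key] = value.replace(",", " ").split()  # assumed separators
        elif key == "retries":
            headers[key] = int(value)
        else:
            headers[key] = value  # e.g. incremental_by
        i += 1
    return headers, "\n".join(lines[i:]).strip()
```

Collecting @test into a list allows several assertions per task, which a plain dict key would overwrite.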

Inputs are auto-extracted from the SQL via sqlparser-rs, so writing @inputs is optional in most cases.
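duckorch relies on a real SQL parser for this. Purely to illustrate the idea, here is a deliberately simplified, regex-based Python stand-in (not sqlparser-rs): tables after INSERT INTO or CREATE [OR REPLACE] TABLE are outputs, and tables after FROM or JOIN are inputs. It reproduces the orch_extract_io result shown in the example, but it has no notion of CTEs, quoted identifiers, or comments, and EXTRACT(... FROM col) would produce a false input.

```python
import re

# Regexes over raw SQL text: a rough stand-in for a real parser.
OUTPUT_RE = re.compile(
    r"\b(?:INSERT\s+INTO|CREATE\s+(?:OR\s+REPLACE\s+)?TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?)\s*([\w.]+)",
    re.IGNORECASE,
)
INPUT_RE = re.compile(r"\b(?:FROM|JOIN)\s+([\w.]+)", re.IGNORECASE)


def extract_io(sql: str) -> dict[str, list[str]]:
    """Return {"inputs": [...], "outputs": [...]} in order of first appearance."""
    outputs = list(dict.fromkeys(OUTPUT_RE.findall(sql)))
    inputs = [t for t in dict.fromkeys(INPUT_RE.findall(sql)) if t not in outputs]
    return {"inputs": inputs, "outputs": outputs}


print(extract_io("INSERT INTO out SELECT * FROM a JOIN b ON a.id = b.id"))
```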

Capabilities:

  • Directory-loaded task files (PRAGMA orch_register('./tasks/'))
  • Topological execution with per-layer parallelism (SET orch_max_parallel)
  • Retries with exponential backoff; downstream tasks are skipped when an upstream task fails
  • Incremental processing with {{ last_processed_at }} / {{ now }} / {{ run_id }} placeholders
  • Per-task assertions via @test "<sql>" followed by expect <n>, expect_gt <n>, expect_lt <n>, expect_empty, or expect_non_empty
  • Mermaid output in lineage, dag, or combined mode (PRAGMA orch_visualize)
  • OpenLineage event emission to Marquez, DataHub, or other OpenLineage-compatible backends (SET orch_openlineage_url)
  • State tables in the __orch__ schema: tasks, runs, lineage_edges, task_edges, tests, schedules
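The execution model in the capability list (topological layers, bounded parallelism per layer, retries with exponential backoff, downstream skip on failure) can be sketched in Python as follows. This illustrates the technique, not duckorch's Rust runtime: run_task is a hypothetical callable that raises on failure, base_delay is an arbitrary choice, and max_parallel plays the role of orch_max_parallel.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def topo_layers(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into layers with Kahn's algorithm; deps maps task -> upstream tasks.

    Upstream names that are not tasks (e.g. external sources) are ignored.
    """
    known = set(deps)
    remaining = {task: set(ups) & known for task, ups in deps.items()}
    layers: list[list[str]] = []
    while remaining:
        ready = sorted(task for task, ups in remaining.items() if not ups)
        if not ready:
            raise ValueError("cycle among tasks: " + ", ".join(sorted(remaining)))
        layers.append(ready)
        for task in ready:
            del remaining[task]
        for ups in remaining.values():
            ups.difference_update(ready)
    return layers


def run_with_retry(task: str, run_task: Callable[[str], None],
                   retries: int, base_delay: float) -> str:
    """Try a task up to retries + 1 times, doubling the wait after each failure."""
    for attempt in range(retries + 1):
        try:
            run_task(task)
            return "success"
        except Exception:
            if attempt < retries:
                time.sleep(base_delay * 2 ** attempt)  # base, 2x base, 4x base, ...
    return "failed"


def run_dag(deps: dict[str, set[str]], run_task: Callable[[str], None],
            max_parallel: int = 4, retries: int = 0,
            base_delay: float = 1.0) -> dict[str, str]:
    """Run layer by layer; tasks within a layer share max_parallel worker threads."""
    status: dict[str, str] = {}
    for layer in topo_layers(deps):
        runnable = []
        for task in layer:
            # Skip a task if any upstream task did not succeed (failed or skipped).
            if any(status.get(up) != "success" for up in deps[task] if up in deps):
                status[task] = "skipped"
            else:
                runnable.append(task)
        with ThreadPoolExecutor(max_workers=max_parallel) as pool:
            results = pool.map(
                lambda t: run_with_retry(t, run_task, retries, base_delay), runnable
            )
            status.update(zip(runnable, results))
    return status
```

In this sketch the backoff sleep happens inside the worker, so a retrying task keeps its slot in the layer; whether duckorch schedules retries the same way is not stated.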
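For incremental tasks, the {{ last_processed_at }}, {{ now }} and {{ run_id }} placeholders are filled in before the SQL runs. How duckorch quotes these values and where it stores the watermark is not documented here; as a minimal sketch, assuming every value is inlined as a single-quoted SQL string literal, the substitution step could look like this (clean_events and raw_events are hypothetical table names):

```python
import re
import uuid
from datetime import datetime, timezone

# {{ name }} with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def sql_literal(value: object) -> str:
    """Render a value as a single-quoted SQL string literal (embedded quotes doubled)."""
    return "'" + str(value).replace("'", "''") + "'"


def render(sql: str, values: dict[str, object]) -> str:
    """Substitute every {{ name }}; an unknown name raises KeyError."""
    return PLACEHOLDER.sub(lambda m: sql_literal(values[m.group(1)]), sql)


# Hypothetical incremental task body: process only rows changed since the last run.
template = (
    "INSERT INTO clean_events "
    "SELECT * FROM raw_events "
    "WHERE updated_at > {{ last_processed_at }} AND updated_at <= {{ now }}"
)
values = {
    "last_processed_at": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "now": datetime.now(timezone.utc),
    "run_id": uuid.uuid4(),
}
print(render(template, values))
```

Failing loudly on an unknown placeholder is safer than leaving {{ ... }} in the SQL, which would only surface later as a parse error.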
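Each @test runs its query and compares the result with the named expectation. The exact comparison rules are not spelled out here; one plausible reading is that expect, expect_gt and expect_lt compare the first column of the first row with <n>, while expect_empty and expect_non_empty check whether any rows came back. The Python sketch below encodes that assumed reading, with the standard-library sqlite3 module standing in for DuckDB so it runs without the extension.

```python
import sqlite3
from typing import Optional


def check(rows: list, mode: str, n: Optional[float] = None) -> bool:
    """Evaluate one @test expectation against fetched rows (assumed semantics)."""
    if mode == "expect_empty":
        return len(rows) == 0
    if mode == "expect_non_empty":
        return len(rows) > 0
    if mode not in ("expect", "expect_gt", "expect_lt"):
        raise ValueError(f"unknown expectation: {mode}")
    # Scalar modes compare the first column of the first row with n.
    value = rows[0][0] if rows else None
    if value is None:
        return False
    if mode == "expect":
        return value == n
    if mode == "expect_gt":
        return value > n
    return value < n


def run_test(con: sqlite3.Connection, sql: str, mode: str,
             n: Optional[float] = None) -> bool:
    return check(con.execute(sql).fetchall(), mode, n)


# sqlite3 stands in for DuckDB; user_stats mirrors the task example above.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE user_stats (country TEXT, users INTEGER)")
con.executemany("INSERT INTO user_stats VALUES (?, ?)", [("JP", 3), ("US", 5)])
print(run_test(con, "SELECT COUNT(*) FROM user_stats WHERE users < 0", "expect", 0))
```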

Companion CLI: duck-orch provides register, run, status, graph, test, validate, impact, lineage, and schedule subcommands, all of which accept --json for machine-readable output (useful for agent integration).

Architecture: a thin C++ shim over a Rust workspace (orch_common, orch_dag, orch_lineage, orch_runtime, orch_ol, orch_core). All orchestration logic lives in Rust, while the C++ layer handles DuckDB-internal calls (PRAGMAs, scalar functions, parallel dispatch).

Added Functions

function_name             function_type
orch_build_dag            scalar
orch_downstream_of        scalar
orch_extract_io           scalar
orch_hello                scalar
orch_init                 pragma
orch_load_directory_json  scalar
orch_parse_task           scalar
orch_register             pragma
orch_render_mermaid       scalar
orch_run                  pragma
orch_test                 pragma
orch_visualize            pragma

Overloaded Functions

This extension does not add any function overloads.

Added Types

This extension does not add any types.

Added Settings

name                      input_type  scope   description
orch_capture_interactive  BOOLEAN     GLOBAL  Capture column lineage for ad-hoc INSERT/CTAS queries via ParserExtension
orch_max_parallel         BIGINT      GLOBAL  Maximum parallel tasks per DAG layer
orch_namespace            VARCHAR     GLOBAL  Job namespace for OpenLineage events
orch_openlineage_api_key  VARCHAR     GLOBAL  OpenLineage API key
orch_openlineage_debug    BOOLEAN     GLOBAL  Log OpenLineage events to stderr
orch_openlineage_url      VARCHAR     GLOBAL  OpenLineage backend URL (e.g. http://localhost:5000/api/v1/lineage)

None of these settings have aliases.
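With orch_openlineage_url set, task runs are reported to the backend as OpenLineage events. As an illustration only, this Python sketch builds a RunEvent shaped after the public OpenLineage spec (eventType, eventTime, run.runId, job.namespace/name, inputs, outputs, producer, schemaURL) and prepares a JSON POST to that URL. Which fields duckorch actually fills in, the "duckdb" dataset namespace, the producer URI, and sending orch_openlineage_api_key as a Bearer token are all assumptions; the job namespace corresponds to orch_namespace.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone
from typing import Optional

SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
PRODUCER = "https://example.com/duckorch-sketch"  # hypothetical producer URI


def run_event(event_type: str, run_id: uuid.UUID, job_namespace: str, job_name: str,
              inputs: list[str], outputs: list[str],
              dataset_namespace: str = "duckdb") -> dict:
    """Build an OpenLineage RunEvent for one task run (field mapping is assumed)."""
    def datasets(names: list[str]) -> list[dict]:
        return [{"namespace": dataset_namespace, "name": n} for n in names]

    return {
        "eventType": event_type,  # e.g. START, COMPLETE, FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(run_id)},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": datasets(inputs),
        "outputs": datasets(outputs),
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


def build_request(url: str, event: dict,
                  api_key: Optional[str] = None) -> urllib.request.Request:
    """Prepare (but do not send) a JSON POST to the lineage endpoint."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"  # assumed auth scheme
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


event = run_event("COMPLETE", uuid.uuid4(), "duckorch", "clean_users",
                  ["raw_users"], ["clean_users"])
req = build_request("http://localhost:5000/api/v1/lineage", event)
# urllib.request.urlopen(req) would send it; omitted so the sketch runs offline.
```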