Running Transformations
Calabi Transform is executed on a schedule via Calabi Pipelines for production workloads. For development and ad-hoc operations, you can also trigger runs through the Calabi AI Agent or the CLI directly. This page covers all three execution paths, how to interpret run results, and patterns for incremental and full-refresh runs.
Execution Architecture
All Calabi Transform executions write run artifacts — manifests, run results, and logs — that are picked up by Calabi Catalogue to update model documentation, lineage, and data quality status.
Running via Calabi Pipelines (Production)
The standard production pattern uses a Calabi Pipelines DAG to orchestrate Calabi Transform runs on a schedule. This gives you full control over execution order, retry behaviour, alerting, and integration with upstream ingestion pipelines.
Recommended DAG Pattern
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "data-engineering",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["data-alerts@example.com"],
}

@dag(
    dag_id="calabi_transform_daily",
    description="Run Calabi Transform models and tests after Calabi Connect syncs complete",
    schedule="0 4 * * *",  # 04:00 UTC daily — after overnight syncs
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["transform", "dbt", "daily"],
)
def calabi_transform_daily():
    check_source_freshness = BashOperator(
        task_id="check_source_freshness",
        bash_command="cd /opt/dbt/calabi_project && dbt source freshness --profiles-dir /opt/dbt/profiles",
    )
    run_staging = BashOperator(
        task_id="run_staging_models",
        bash_command="cd /opt/dbt/calabi_project && dbt run --select staging --profiles-dir /opt/dbt/profiles",
    )
    run_intermediate = BashOperator(
        task_id="run_intermediate_models",
        bash_command="cd /opt/dbt/calabi_project && dbt run --select intermediate --profiles-dir /opt/dbt/profiles",
    )
    run_marts = BashOperator(
        task_id="run_mart_models",
        bash_command="cd /opt/dbt/calabi_project && dbt run --select marts --profiles-dir /opt/dbt/profiles",
    )
    test_models = BashOperator(
        task_id="test_models",
        bash_command="cd /opt/dbt/calabi_project && dbt test --profiles-dir /opt/dbt/profiles",
    )
    generate_docs = BashOperator(
        task_id="generate_docs",
        bash_command="cd /opt/dbt/calabi_project && dbt docs generate --profiles-dir /opt/dbt/profiles",
    )

    check_source_freshness >> run_staging >> run_intermediate >> run_marts >> test_models >> generate_docs

dag_instance = calabi_transform_daily()
```
Running Specific Model Subsets
Use Calabi Pipelines DAG parameters (conf) to pass a model selection to the run:
```python
@task()
def run_selected_models(**context):
    import subprocess

    # Model selection is passed via the DAG run conf; default to all layers.
    # Note: the selection string is interpolated into a shell command, so it
    # should only come from trusted operators via the Airflow UI or API.
    conf = context["dag_run"].conf or {}
    select = conf.get("select", "staging intermediate marts")
    result = subprocess.run(
        f"cd /opt/dbt/calabi_project && dbt run --select {select} --profiles-dir /opt/dbt/profiles",
        shell=True, capture_output=True, text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(f"dbt run failed: {result.stderr}")
    return result.stdout
```
Trigger from the UI with:
```json
{"select": "marts.fct_orders+"}
```
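The same parameterised run can also be triggered from a terminal; a sketch using the standard Airflow CLI, assuming the `calabi_transform_daily` DAG id defined above:

```shell
airflow dags trigger calabi_transform_daily \
  --conf '{"select": "marts.fct_orders+"}'
```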
Running via Calabi AI Agent
The Calabi AI Agent can trigger and monitor Calabi Transform runs conversationally. This is the fastest path for ad-hoc execution during development or incident response.
Example Agent Commands
- "Run the fct_orders model and its dependencies"
- "Run all staging models in the Calabi Transform project"
- "Check source freshness for the Salesforce source"
- "Run tests on the mart models and show me any failures"
- "Do a full refresh of the fct_orders table"
- "What models failed in yesterday's Calabi Transform run?"
The agent translates these instructions into the appropriate dbt CLI commands, executes them against your configured Calabi Transform environment, and returns a summary of results.
CLI Reference
When working locally or in a CI environment, use the dbt CLI directly.
Core Commands
```shell
# Run all models
dbt run

# Run a specific model
dbt run --select fct_orders

# Run a model and all its upstream dependencies
dbt run --select +fct_orders

# Run a model and all its downstream dependents
dbt run --select fct_orders+

# Run a model plus all upstream and downstream
dbt run --select +fct_orders+

# Run by tag
dbt run --select tag:finance

# Run a specific layer
dbt run --select staging
dbt run --select marts

# Run tests
dbt test
dbt test --select fct_orders
dbt test --select source:*

# Run models and then test them
dbt build --select fct_orders+

# Check source freshness
dbt source freshness

# Compile SQL without running (useful for debugging)
dbt compile --select fct_orders

# Generate documentation artifacts
dbt docs generate

# Serve documentation locally
dbt docs serve
```
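Before launching an expensive run, you can preview exactly which nodes a selector resolves to with `dbt ls`, which accepts the same selection flags:

```shell
# List the nodes a selector would include, without running anything
dbt ls --select +fct_orders

# Restrict the listing to models only (no tests, seeds, or sources)
dbt ls --select +fct_orders --resource-type model
```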
Node Selection Syntax
| Selector | Meaning |
|---|---|
| `stg_orders` | That model only |
| `+stg_orders` | `stg_orders` and all its upstream models |
| `stg_orders+` | `stg_orders` and all its downstream models |
| `+stg_orders+` | `stg_orders`, all upstream, and all downstream |
| `staging.*` | All models in the staging subdirectory |
| `tag:daily` | All models with the `daily` tag |
| `source:jaffle_shop` | The `jaffle_shop` source nodes (use `source:jaffle_shop+` to select the models that read from them) |
| `path:models/marts` | All models under the `models/marts` path |
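Selectors can also be combined: a space between selectors is a union, a comma is an intersection, and `--exclude` removes nodes from the selection.

```shell
# Union: everything in staging plus everything tagged daily
dbt run --select staging tag:daily

# Intersection: models that are both in marts and tagged finance
dbt run --select marts,tag:finance

# Exclusion: all marts except fct_orders and its downstream models
dbt run --select marts --exclude fct_orders+
```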
Checking Run Results
In the Terminal
A successful dbt run prints a summary:
```
Running with dbt=1.8.0
Found 24 models, 48 tests, 6 sources
Concurrency: 4 threads (target='prod')

1 of 24 START sql view model staging.stg_orders ........................... [RUN]
1 of 24 OK created sql view model staging.stg_orders ...................... [CREATE VIEW in 1.24s]
2 of 24 START sql view model staging.stg_customers ........................ [RUN]
...
24 of 24 OK created sql table model marts.fct_orders ...................... [CREATE TABLE in 8.73s]

Finished running 24 models in 0 hours 1 minutes and 42.15 seconds (102.15s).

PASS=24 WARN=0 ERROR=0 SKIP=0 TOTAL=24
```
Run Result Codes
| Code | Meaning |
|---|---|
| `OK` | Model created or updated successfully |
| `WARN` | Model created but with non-fatal warnings |
| `ERROR` | Model failed to build — check the SQL or upstream dependency |
| `SKIP` | Model skipped because an upstream dependency errored |
| `PASS` | Test passed |
| `FAIL` | Test failed — check the failure output or `--store-failures` tables |
In Calabi Catalogue
After each run, Calabi Transform writes a run_results.json artifact. Calabi Catalogue ingests this to update:
- Model freshness timestamps
- Test pass/fail status per column
- Model execution duration history
- Documentation currency indicator
Navigate to Calabi Catalogue → [Model Name] → Lineage / Quality to see the latest run status.
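The same artifact is also useful in CI or scripts. A minimal sketch for pulling out failed nodes, assuming a standard dbt `target/` directory and the documented `run_results.json` layout (a `results` array whose entries carry `unique_id` and `status` fields):

```python
import json
from pathlib import Path

def failed_nodes(artifact_path: str) -> list[str]:
    """Return the unique_ids of nodes that errored or failed in the last run."""
    results = json.loads(Path(artifact_path).read_text())["results"]
    # Models report "error" on failure; tests report "fail".
    return [r["unique_id"] for r in results if r["status"] in ("error", "fail")]

# Example: inspect the artifact written by the most recent run
# failed = failed_nodes("target/run_results.json")
```

This is handy for failing a CI job with a readable list of broken nodes instead of raw dbt logs.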
Incremental Models
Incremental models only process new or changed rows since the last run. This is critical for large fact tables where a full rebuild would take hours.
How Incremental Runs Work
- First run: Calabi Transform builds the full table (the same as `materialized='table'`).
- Subsequent runs: only rows matching the `{% if is_incremental() %}` filter are processed and merged (upserted) into the existing table.
```sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
) }}

select
    event_id,
    user_id,
    event_type,
    occurred_at,
    properties
from {{ ref('stg_events') }}

{% if is_incremental() %}
where occurred_at > (select max(occurred_at) from {{ this }})
{% endif %}
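Late-arriving events can slip past a strict `max(occurred_at)` filter. A common variant reprocesses a trailing window and relies on `unique_key` to deduplicate; the sketch below assumes a hypothetical three-day lookback and Snowflake's `dateadd` syntax (the interval function varies by warehouse):

```sql
{% if is_incremental() %}
-- Reprocess a 3-day trailing window to pick up late-arriving events;
-- the merge on unique_key keeps already-loaded rows deduplicated.
where occurred_at > (
    select dateadd('day', -3, max(occurred_at)) from {{ this }}
)
{% endif %}
```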
Running Incremental Models
```shell
# Normal incremental run (only new rows)
dbt run --select fct_events

# Force a full rebuild, ignoring the incremental filter
dbt run --select fct_events --full-refresh
```
When to Full-Refresh
| Scenario | Action |
|---|---|
| Schema change (new column added to model) | --full-refresh |
| Logic change that affects historical rows | --full-refresh |
| Corrupt or incorrect historical data | --full-refresh |
| Normal scheduled run | Incremental (no flag) |
A --full-refresh on a table with billions of rows will take significantly longer and consume more warehouse compute. Schedule full refreshes for off-peak hours and ensure your team is aware before triggering.
The dbt build Command
dbt build is a convenience command that runs models and tests for each node in dependency order. When a node fails, everything downstream of it is skipped rather than built, which prevents downstream models from being constructed on top of failed upstream models. This makes it the preferred command for production pipelines.
```shell
# Build everything
dbt build

# Build a model and its dependencies with their tests
dbt build --select +fct_orders

# Build with full refresh
dbt build --select +fct_orders --full-refresh
```
Each node is tested before downstream nodes are built, catching issues early in the dependency chain.
Threads and Parallelism
Calabi Transform runs models in parallel using threads. The threads setting in profiles.yml controls the maximum number of concurrent model builds.
```yaml
# profiles.yml
calabi_project:
  target: prod
  outputs:
    prod:
      type: snowflake
      threads: 8  # Build up to 8 models simultaneously
      account: "..."
      ...
```
Increase threads for projects with many independent models to reduce total run time. The practical limit depends on your warehouse's concurrency capacity.
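The profile value can also be overridden per invocation, which is useful for experimenting with concurrency before committing a change to profiles.yml:

```shell
# Override the profile's thread count for this run only
dbt run --threads 16
```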
Next Steps
- Models — Understand model types and materialisation
- Sources & Raw Data — Configure and test source freshness
- Data Tests — Set up quality assertions on your models