
Running Transformations

Professional+

Calabi Transform is executed on a schedule via Calabi Pipelines for production workloads. For development and ad-hoc operations, you can also trigger runs through the Calabi AI Agent or the CLI directly. This page covers all three execution paths, how to interpret run results, and patterns for incremental and full-refresh runs.


Execution Architecture

All Calabi Transform executions write run artifacts — manifests, run results, and logs — that are picked up by Calabi Catalogue to update model documentation, lineage, and data quality status.


Running via Calabi Pipelines (Production)

The standard production pattern uses a Calabi Pipelines DAG to orchestrate Calabi Transform runs on a schedule. This gives you full control over execution order, retry behaviour, alerting, and integration with upstream ingestion pipelines.

```python
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "data-engineering",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["data-alerts@example.com"],
}

@dag(
    dag_id="calabi_transform_daily",
    description="Run Calabi Transform models and tests after Calabi Connect syncs complete",
    schedule="0 4 * * *",  # 04:00 UTC daily — after overnight syncs
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["transform", "dbt", "daily"],
)
def calabi_transform_daily():

    check_source_freshness = BashOperator(
        task_id="check_source_freshness",
        bash_command="cd /opt/dbt/calabi_project && dbt source freshness --profiles-dir /opt/dbt/profiles",
    )

    run_staging = BashOperator(
        task_id="run_staging_models",
        bash_command="cd /opt/dbt/calabi_project && dbt run --select staging --profiles-dir /opt/dbt/profiles",
    )

    run_intermediate = BashOperator(
        task_id="run_intermediate_models",
        bash_command="cd /opt/dbt/calabi_project && dbt run --select intermediate --profiles-dir /opt/dbt/profiles",
    )

    run_marts = BashOperator(
        task_id="run_mart_models",
        bash_command="cd /opt/dbt/calabi_project && dbt run --select marts --profiles-dir /opt/dbt/profiles",
    )

    test_models = BashOperator(
        task_id="test_models",
        bash_command="cd /opt/dbt/calabi_project && dbt test --profiles-dir /opt/dbt/profiles",
    )

    generate_docs = BashOperator(
        task_id="generate_docs",
        bash_command="cd /opt/dbt/calabi_project && dbt docs generate --profiles-dir /opt/dbt/profiles",
    )

    check_source_freshness >> run_staging >> run_intermediate >> run_marts >> test_models >> generate_docs

dag_instance = calabi_transform_daily()
```

Running Specific Model Subsets

Use Calabi Pipelines DAG parameters (conf) to pass a model selection to the run:

```python
@task()
def run_selected_models(**context):
    import subprocess

    conf = context["dag_run"].conf or {}
    # Note: select comes from user-supplied conf; validate it before
    # interpolating it into a shell command.
    select = conf.get("select", "staging intermediate marts")
    result = subprocess.run(
        f"cd /opt/dbt/calabi_project && dbt run --select {select} --profiles-dir /opt/dbt/profiles",
        shell=True, capture_output=True, text=True,
    )
    if result.returncode != 0:
        raise Exception(f"dbt run failed: {result.stderr}")
    return result.stdout
```

Trigger from the UI with:

{"select": "marts.fct_orders+"}

Running via Calabi AI Agent

The Calabi AI Agent can trigger and monitor Calabi Transform runs conversationally. This is the fastest path for ad-hoc execution during development or incident response.

Example Agent Commands

  • "Run the fct_orders model and its dependencies"
  • "Run all staging models in the Calabi Transform project"
  • "Check source freshness for the Salesforce source"
  • "Run tests on the mart models and show me any failures"
  • "Do a full refresh of the fct_orders table"
  • "What models failed in yesterday's Calabi Transform run?"

The agent translates these instructions into the appropriate dbt CLI commands, executes them against your configured Calabi Transform environment, and returns a summary of results.
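
As an illustration, the first and fifth instructions above correspond to commands like the following (the agent's exact translation may differ):

```shell
# "Run the fct_orders model and its dependencies"
dbt run --select +fct_orders

# "Do a full refresh of the fct_orders table"
dbt run --select fct_orders --full-refresh
```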


CLI Reference

When working locally or in a CI environment, use the dbt CLI directly.

Core Commands

# Run all models
dbt run

# Run a specific model
dbt run --select fct_orders

# Run a model and all its upstream dependencies
dbt run --select +fct_orders

# Run a model and all its downstream dependents
dbt run --select fct_orders+

# Run a model plus all upstream and downstream
dbt run --select +fct_orders+

# Run by tag
dbt run --select tag:finance

# Run a specific layer
dbt run --select staging
dbt run --select marts

# Run tests
dbt test
dbt test --select fct_orders
dbt test --select source:*

# Run models and then test them
dbt build --select fct_orders+

# Check source freshness
dbt source freshness

# Compile SQL without running (useful for debugging)
dbt compile --select fct_orders

# Generate documentation artifacts
dbt docs generate

# Serve documentation locally
dbt docs serve

Node Selection Syntax

Selector            Meaning
------------------  ------------------------------------------------
stg_orders          That model only
+stg_orders         stg_orders and all its upstream models
stg_orders+         stg_orders and all its downstream models
+stg_orders+        stg_orders, all upstream, all downstream
staging.*           All models in the staging subdirectory
tag:daily           All models with the daily tag
source:jaffle_shop  All models that read from the jaffle_shop source
path:models/marts   All models in the marts path
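
Selectors can also be combined using dbt's standard set-operation syntax:

```shell
# Intersection (comma): models that are both tagged daily and in staging
dbt run --select "tag:daily,staging"

# Union (space): staging models plus mart models
dbt run --select staging marts

# Exclusion: all marts except fct_orders and its downstream models
dbt run --select marts --exclude fct_orders+
```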

Checking Run Results

In the Terminal

A successful dbt run prints a summary:

Running with dbt=1.8.0

Found 24 models, 48 tests, 6 sources

Concurrency: 4 threads (target='prod')

1 of 24 START sql view model staging.stg_orders ........................... [RUN]
1 of 24 OK created sql view model staging.stg_orders ...................... [CREATE VIEW in 1.24s]
2 of 24 START sql view model staging.stg_customers ........................ [RUN]
...
24 of 24 OK created sql table model marts.fct_orders ...................... [CREATE TABLE in 8.73s]

Finished running 24 models in 0 hours 1 minutes and 42.15 seconds (102.15s).

PASS=24 WARN=0 ERROR=0 SKIP=0 TOTAL=24

Run Result Codes

Code   Meaning
-----  ----------------------------------------------------------------
OK     Model created or updated successfully
WARN   Model created but with non-fatal warnings
ERROR  Model failed to build — check the SQL or upstream dependency
SKIP   Model skipped because an upstream dependency errored
PASS   Test passed
FAIL   Test failed — check the failure output or --store-failures tables

In Calabi Catalogue

After each run, Calabi Transform writes a run_results.json artifact. Calabi Catalogue ingests this to update:

  • Model freshness timestamps
  • Test pass/fail status per column
  • Model execution duration history
  • Documentation currency indicator

Navigate to Calabi Catalogue → [Model Name] → Lineage / Quality to see the latest run status.
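
As a sketch of what Calabi Catalogue consumes, you can also summarise the artifact directly. This assumes only the standard dbt artifact layout: a top-level "results" list with a "unique_id" and "status" per node.

```python
import json
from pathlib import Path

def summarize_run_results(path):
    """Count node statuses and collect failures from a dbt run_results.json."""
    artifact = json.loads(Path(path).read_text())
    counts = {}
    failures = []
    for result in artifact.get("results", []):
        status = result.get("status", "unknown")
        counts[status] = counts.get(status, 0) + 1
        # "error" = model build failure, "fail" = test failure
        if status in ("error", "fail"):
            failures.append(result.get("unique_id"))
    return counts, failures
```

Calling `summarize_run_results("target/run_results.json")` after a run returns a status histogram and the unique IDs of any failed models or tests.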


Incremental Models

Incremental models only process new or changed rows since the last run. This is critical for large fact tables where a full rebuild would take hours.

How Incremental Runs Work

  1. First run: Calabi Transform builds the full table (same as materialized='table').
  2. Subsequent runs: Only rows matching the {% if is_incremental() %} filter are processed and upserted (merged) into the existing table.

```sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
) }}

select
    event_id,
    user_id,
    event_type,
    occurred_at,
    properties
from {{ ref('stg_events') }}

{% if is_incremental() %}
where occurred_at > (select max(occurred_at) from {{ this }})
{% endif %}
```
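
A common variant of this filter (a sketch, not specific to Calabi Transform) adds a lookback window so late-arriving events are still picked up; because unique_key='event_id' is set, re-processed rows are merged rather than duplicated. The dateadd syntax shown is Snowflake-style and may differ on other warehouses:

```sql
{% if is_incremental() %}
where occurred_at > (
    select dateadd('day', -3, max(occurred_at)) from {{ this }}
)
{% endif %}
```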

Running Incremental Models

# Normal incremental run (only new rows)
dbt run --select fct_events

# Force a full rebuild, ignoring the incremental filter
dbt run --select fct_events --full-refresh

When to Full-Refresh

Scenario                                   Action
-----------------------------------------  ---------------------
Schema change (new column added to model)  --full-refresh
Logic change that affects historical rows  --full-refresh
Corrupt or incorrect historical data       --full-refresh
Normal scheduled run                       Incremental (no flag)

Full-refresh on large tables

A --full-refresh on a table with billions of rows will take significantly longer and consume more warehouse compute. Schedule full refreshes for off-peak hours and ensure your team is aware before triggering.


The dbt build Command

dbt build is a convenience command that runs and tests each node in dependency order, skipping everything downstream of a failure. This is the preferred command for production pipelines because it prevents downstream models from being built on top of upstream models that failed to build or failed their tests.

# Build everything
dbt build

# Build a model and its dependencies with their tests
dbt build --select +fct_orders

# Build with full refresh
dbt build --select +fct_orders --full-refresh

Each node is tested before downstream nodes are built, catching issues early in the dependency chain.


Threads and Parallelism

Calabi Transform runs models in parallel using threads. The threads setting in profiles.yml controls the maximum number of concurrent model builds.

```yaml
# profiles.yml
calabi_project:
  target: prod
  outputs:
    prod:
      type: snowflake
      threads: 8  # Build up to 8 models simultaneously
      account: "..."
      ...
```

Increase threads for projects with many independent models to reduce total run time. The practical limit depends on your warehouse's concurrency capacity.
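
The profile value can also be overridden for a single invocation with the standard dbt flag:

```shell
# Use 16 threads for this run only
dbt run --threads 16
```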


Next Steps

  • Models — Understand model types and materialisation
  • Sources & Raw Data — Configure and test source freshness
  • Data Tests — Set up quality assertions on your models