
Pipelines (DAGs)

Professional+

A pipeline in Calabi Pipelines is a Directed Acyclic Graph (DAG) — a Python file that describes a set of tasks and the order in which they must run. The "directed acyclic" property means data flows in one direction through the graph and there are no cycles, ensuring every pipeline has a clear start and a clear finish.

Each pipeline file defines:

  • What work to do (tasks)
  • When to do it (schedule)
  • In what order (dependencies between tasks)

Anatomy of a Pipeline File

Every pipeline is a Python (.py) file placed in the dags/ folder of your Calabi Pipelines environment. The file must contain a DAG object; Calabi Pipelines discovers it automatically on the next parse cycle (typically within 30 seconds).

from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Default arguments applied to every task unless overridden
default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["alerts@example.com"],
}


@dag(
    dag_id="customer_orders_daily",
    description="Load and transform customer orders every day at midnight UTC",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["orders", "finance"],
)
def customer_orders_pipeline():
    """
    ### Customer Orders Pipeline
    Ingests raw order data, validates it, and writes to the warehouse.
    """

    extract = BashOperator(
        task_id="extract_orders",
        bash_command="python /opt/scripts/extract_orders.py",
    )

    @task()
    def validate_orders():
        import pandas as pd

        df = pd.read_parquet("/tmp/orders_raw.parquet")
        assert df["order_id"].notnull().all(), "Null order IDs found"
        return len(df)

    @task()
    def load_to_warehouse(row_count: int):
        print(f"Loading {row_count} rows to the warehouse...")
        # warehouse load logic here

    validated = validate_orders()
    extract >> validated >> load_to_warehouse(validated)


dag_instance = customer_orders_pipeline()
Use the @dag decorator

The @dag decorator (shown above) is the modern, recommended way to define pipelines. It keeps your pipeline configuration close to the code and avoids global state.
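
For comparison, a pipeline can also be defined with the classic DAG context manager. The sketch below assumes the same import paths as the example above and omits most of the tasks:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Classic style: the DAG object is created explicitly, and tasks register
# themselves by being instantiated inside the `with` block.
with DAG(
    dag_id="customer_orders_daily_classic",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract = BashOperator(
        task_id="extract_orders",
        bash_command="python /opt/scripts/extract_orders.py",
    )

The decorator style shown earlier keeps the same configuration but scopes everything to a function, which is why it avoids module-level global state.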

Task Types

Tasks are the individual units of work inside a pipeline. Calabi Pipelines provides several built-in task types:

Task Type           | Class / Decorator              | Best For
Python function     | @task / PythonOperator         | Custom Python logic, data processing
Shell command       | BashOperator                   | Running scripts, CLI tools
Kubernetes workload | KubernetesPodOperator          | Containerised jobs, heavy compute
Sensor              | HttpSensor, S3KeySensor, etc.  | Waiting for external events or file arrivals
Empty / dummy       | EmptyOperator                  | Grouping, fanout/fanin anchors
SQL                 | SQLExecuteQueryOperator        | Running SQL directly against a connection

PythonOperator / @task

Use @task for any Python callable. The decorated function is your task body.

from airflow.decorators import task


@task()
def compute_revenue(region: str) -> float:
    # business logic
    return 42_000.0
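
The same callable can also be wired up with the classic PythonOperator if you prefer explicit operator instantiation; a minimal sketch (the task_id and region value are illustrative):

from airflow.operators.python import PythonOperator


def compute_revenue(region: str) -> float:
    # business logic
    return 42_000.0


# Wrap the plain function in an operator and pass its arguments explicitly
compute_emea_revenue = PythonOperator(
    task_id="compute_emea_revenue",
    python_callable=compute_revenue,
    op_kwargs={"region": "EMEA"},
)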

BashOperator

Run any shell command or script:

from airflow.operators.bash import BashOperator

run_script = BashOperator(
    task_id="run_etl_script",
    bash_command="python /opt/etl/run.py --date {{ ds }}",
)
Jinja templating

{{ ds }} is a Calabi Pipelines macro that resolves to the pipeline run date as YYYY-MM-DD. Many task parameters support Jinja templates.
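
For instance, BashOperator's env parameter is also templated, so run-date values can be passed to a script as environment variables. A small sketch (the script path and variable names are placeholders; {{ ds_nodash }} is the same date without dashes):

from airflow.operators.bash import BashOperator

export_snapshot = BashOperator(
    task_id="export_snapshot",
    bash_command="python /opt/etl/export_snapshot.py",
    # Both values are rendered by Jinja before the command runs
    env={"RUN_DATE": "{{ ds }}", "RUN_DATE_COMPACT": "{{ ds_nodash }}"},
)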

KubernetesPodOperator

Launch a Docker container as a one-off Kubernetes Pod. Ideal for heavy workloads or non-Python languages:

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

spark_job = KubernetesPodOperator(
    task_id="run_spark_job",
    name="spark-orders-agg",
    namespace="calabi-pipelines",
    image="my-registry/spark-job:latest",
    cmds=["spark-submit"],
    arguments=["--date", "{{ ds }}"],
    get_logs=True,
)
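
Sensors

Sensors pause a pipeline until an external condition is met, such as a file landing in object storage. A minimal sketch using the S3 key sensor from the Amazon provider (the bucket name and key pattern are placeholders):

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_export = S3KeySensor(
    task_id="wait_for_orders_export",
    bucket_name="example-data-lake",
    bucket_key="exports/orders/{{ ds }}/complete.flag",
    poke_interval=300,      # re-check every 5 minutes
    timeout=6 * 60 * 60,    # give up after 6 hours
)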

Task Dependencies

Dependencies define the execution order. Use the >> (downstream) and << (upstream) operators:

# Linear chain: extract → transform → load
extract >> transform >> load

# Fan-out: one task triggers multiple tasks in parallel
extract >> [validate_schema, validate_counts]

# Fan-in: multiple tasks must all succeed before a final task
[validate_schema, validate_counts] >> publish

You can also chain across complex shapes:

from airflow.sdk import chain

chain(ingest, clean, [enrich_a, enrich_b], aggregate, publish)
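
When chain is given two lists of the same length, it pairs them element-wise instead of fanning out. A small sketch (score_a and score_b are illustrative tasks):

from airflow.sdk import chain

# Creates enrich_a >> score_a and enrich_b >> score_b, with no cross links
chain([enrich_a, enrich_b], [score_a, score_b])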

Schedule Intervals

The schedule parameter controls when a pipeline runs automatically.

Cron Presets

Preset   | Equivalent Cron | Description
@once    |                 | Run exactly one time
@hourly  | 0 * * * *       | Every hour at :00
@daily   | 0 0 * * *       | Every day at midnight UTC
@weekly  | 0 0 * * 0       | Every Sunday at midnight UTC
@monthly | 0 0 1 * *       | First day of each month
@yearly  | 0 0 1 1 *       | 1 January each year
None     |                 | No automatic schedule (manual trigger only)

Cron Expressions

For custom schedules use a standard 5-field cron expression:

┌───── minute (0-59)
│ ┌───── hour (0-23)
│ │ ┌───── day of month (1-31)
│ │ │ ┌───── month (1-12)
│ │ │ │ ┌───── day of week (0-6, Sun=0)
│ │ │ │ │
* * * * *
Expression    | Meaning
0 6 * * 1-5   | Weekdays at 06:00 UTC
*/15 * * * *  | Every 15 minutes
0 2 1 * *     | 1st of each month at 02:00 UTC
0 8,20 * * *  | Twice daily at 08:00 and 20:00 UTC

Key DAG Parameters

Parameter           | Type          | Default  | Description
dag_id              | string        | required | Unique pipeline identifier
schedule            | string / None | None     | Cron expression or preset
start_date          | datetime      | required | Earliest date to schedule runs from
catchup             | bool          | False    | Backfill missed runs since start_date
max_active_runs     | int           | 16       | Max concurrent pipeline runs
max_active_tasks    | int           | 16       | Max concurrent tasks in a single run
default_args        | dict          | {}       | Args inherited by all tasks
tags                | list          | []       | Labels for filtering in the UI
dagrun_timeout      | timedelta     | None     | Fail run if it exceeds this duration
on_failure_callback | callable      | None     | Function called on pipeline failure
Set catchup=False by default

If catchup=True and your start_date is in the past, Calabi Pipelines will immediately queue one run for every missed interval. Always be intentional about this setting.
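
As a sketch of how several of these parameters combine (the timeout, concurrency limit, and failure callback are illustrative, not defaults):

from datetime import datetime, timedelta
from airflow.decorators import dag


def notify_on_failure(context):
    # Hook this up to Slack, PagerDuty, etc.
    print(f"Pipeline failed: {context['dag_run'].dag_id}")


@dag(
    dag_id="orders_backfill_aware",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=True,                       # intentionally backfill missed days
    max_active_runs=1,                  # but only one run at a time
    dagrun_timeout=timedelta(hours=2),  # fail runs that exceed two hours
    on_failure_callback=notify_on_failure,
    tags=["orders"],
)
def orders_backfill_aware():
    ...


orders_backfill_aware()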

Full Example Pipeline

The following pipeline fetches daily sales data, runs quality checks in parallel, and then publishes a summary report:

from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    "owner": "analytics",
    "retries": 1,
    "retry_delay": timedelta(minutes=10),
    "email_on_failure": True,
    "email": ["data-alerts@example.com"],
}


@dag(
    dag_id="daily_sales_report",
    description="Fetch, validate, and publish the daily sales summary",
    schedule="0 5 * * *",  # 05:00 UTC every day
    start_date=datetime(2024, 6, 1),
    catchup=False,
    default_args=default_args,
    tags=["sales", "reporting"],
)
def daily_sales_report():

    start = EmptyOperator(task_id="start")
    finish = EmptyOperator(task_id="finish")

    fetch = BashOperator(
        task_id="fetch_sales_data",
        bash_command="python /opt/etl/fetch_sales.py --date {{ ds }}",
    )

    # Jinja templates are not rendered inside Python task bodies, so these
    # tasks receive the run date through the `ds` context variable instead.
    @task()
    def check_row_count(ds=None) -> int:
        import pandas as pd

        df = pd.read_parquet(f"/tmp/sales_{ds}.parquet")
        assert len(df) > 0, "No rows returned"
        return len(df)

    @task()
    def check_nulls(ds=None):
        import pandas as pd

        df = pd.read_parquet(f"/tmp/sales_{ds}.parquet")
        assert df[["order_id", "revenue"]].notnull().all().all()

    @task()
    def publish_report(row_count: int, ds=None):
        print(f"Publishing report for {ds}: {row_count} rows")
        # send to BI layer

    rows = check_row_count()
    nulls = check_nulls()
    report = publish_report(rows)

    # Quality checks fan out from fetch; the report publishes once both pass
    start >> fetch >> [rows, nulls] >> report >> finish


dag_instance = daily_sales_report()

Next Steps