Pipelines (DAGs)
A pipeline in Calabi Pipelines is a Directed Acyclic Graph (DAG) — a Python file that describes a set of tasks and the order in which they must run. The "directed acyclic" property means data flows in one direction through the graph and there are no cycles, ensuring every pipeline has a clear start and a clear finish.
Each pipeline file defines:
- What work to do (tasks)
- When to do it (schedule)
- In what order (dependencies between tasks)
Anatomy of a Pipeline File
Every pipeline is a Python (.py) file placed in the dags/ folder of your Calabi Pipelines environment. The file must contain a DAG object; Calabi Pipelines discovers it automatically on the next parse cycle (typically within 30 seconds).
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Default arguments applied to every task unless overridden
default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["alerts@example.com"],
}

@dag(
    dag_id="customer_orders_daily",
    description="Load and transform customer orders every day at midnight UTC",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["orders", "finance"],
)
def customer_orders_pipeline():
    """
    ### Customer Orders Pipeline
    Ingests raw order data, validates it, and writes to the warehouse.
    """
    extract = BashOperator(
        task_id="extract_orders",
        bash_command="python /opt/scripts/extract_orders.py",
    )

    @task()
    def validate_orders():
        import pandas as pd

        df = pd.read_parquet("/tmp/orders_raw.parquet")
        assert df["order_id"].notnull().all(), "Null order IDs found"
        return len(df)

    @task()
    def load_to_warehouse(row_count: int):
        print(f"Loading {row_count} rows to the warehouse...")
        # warehouse load logic here

    validated = validate_orders()
    extract >> validated >> load_to_warehouse(validated)

dag_instance = customer_orders_pipeline()
```
The @dag decorator (shown above) is the modern, recommended way to define pipelines. It keeps your pipeline configuration close to the code and avoids global state.
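For comparison, the same kind of pipeline can also be declared with the classic context-manager style. The following is a minimal sketch, not the full example above; the dag_id is changed to a hypothetical variant to avoid a clash:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Classic style: the DAG object is created explicitly and tasks defined
# inside the `with` block are attached to it automatically.
with DAG(
    dag_id="customer_orders_daily_classic",  # hypothetical variant of the DAG above
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract = BashOperator(
        task_id="extract_orders",
        bash_command="python /opt/scripts/extract_orders.py",
    )
```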
Task Types
Tasks are the individual units of work inside a pipeline. Calabi Pipelines provides several built-in task types:
| Task Type | Class / Decorator | Best For |
|---|---|---|
| Python function | @task / PythonOperator | Custom Python logic, data processing |
| Shell command | BashOperator | Running scripts, CLI tools |
| Kubernetes workload | KubernetesPodOperator | Containerised jobs, heavy compute |
| Sensor | HttpSensor, S3KeySensor, etc. | Waiting for external events or file arrivals |
| Empty / dummy | EmptyOperator | Grouping, fanout/fanin anchors |
| SQL | SQLExecuteQueryOperator | Running SQL directly against a connection |
PythonOperator / @task
Use @task for any Python callable. The decorated function is your task body.
```python
from airflow.decorators import task

@task()
def compute_revenue(region: str) -> float:
    # business logic
    return 42_000.0
```
BashOperator
Run any shell command or script:
```python
from airflow.operators.bash import BashOperator

run_script = BashOperator(
    task_id="run_etl_script",
    bash_command="python /opt/etl/run.py --date {{ ds }}",
)
```
{{ ds }} is a Calabi Pipelines macro that resolves to the pipeline run date as YYYY-MM-DD. Many task parameters support Jinja templates.
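Templating is not limited to bash_command. As a sketch, the env parameter of BashOperator is also rendered per run, so the run date can be handed to a script as an environment variable; the task ID and script path here are hypothetical:

```python
from airflow.operators.bash import BashOperator

# env values are Jinja-templated, so {{ ds }} is rendered to YYYY-MM-DD per run.
build_report = BashOperator(
    task_id="build_report",                    # hypothetical task
    bash_command="python /opt/etl/report.py",  # hypothetical script
    env={"RUN_DATE": "{{ ds }}"},
)
```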
KubernetesPodOperator
Launch a Docker container as a one-off Kubernetes Pod. Ideal for heavy workloads or non-Python languages:
```python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

spark_job = KubernetesPodOperator(
    task_id="run_spark_job",
    name="spark-orders-agg",
    namespace="calabi-pipelines",
    image="my-registry/spark-job:latest",
    cmds=["spark-submit"],
    arguments=["--date", "{{ ds }}"],
    get_logs=True,
)
```
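The sensor and SQL task types from the table above follow the same pattern. A minimal sketch, assuming an S3 bucket and a warehouse connection that are purely illustrative:

```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Wait for the day's export to land before downstream tasks run.
wait_for_export = S3KeySensor(
    task_id="wait_for_export",
    bucket_name="raw-data",                       # hypothetical bucket
    bucket_key="orders/{{ ds }}/orders.parquet",  # templated per run date
    poke_interval=300,                            # re-check every 5 minutes
    timeout=6 * 60 * 60,                          # give up after 6 hours
)

# Run SQL directly against a configured connection.
refresh_summary = SQLExecuteQueryOperator(
    task_id="refresh_summary",
    conn_id="warehouse",                          # hypothetical connection ID
    sql="DELETE FROM sales_summary WHERE sales_date = '{{ ds }}'",
)
```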
Task Dependencies
Dependencies define the execution order. Use the >> (downstream) and << (upstream) operators:
```python
# Linear chain: extract → transform → load
extract >> transform >> load

# Fan-out: one task triggers multiple tasks in parallel
extract >> [validate_schema, validate_counts]

# Fan-in: multiple tasks must all succeed before a final task
[validate_schema, validate_counts] >> publish
```
You can also chain across complex shapes:
```python
from airflow.sdk import chain

chain(ingest, clean, [enrich_a, enrich_b], aggregate, publish)
```
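The << operator mentioned above is simply the mirror of >>. In this fragment, both lines declare the same dependency:

```python
# Equivalent ways of saying "transform runs before load".
transform >> load
load << transform
```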
Schedule Intervals
The schedule parameter controls when a pipeline runs automatically.
Cron Presets
| Preset | Equivalent Cron | Description |
|---|---|---|
| @once | — | Run exactly one time |
| @hourly | 0 * * * * | Every hour at :00 |
| @daily | 0 0 * * * | Every day at midnight UTC |
| @weekly | 0 0 * * 0 | Every Sunday at midnight UTC |
| @monthly | 0 0 1 * * | First day of each month |
| @yearly | 0 0 1 1 * | 1 January each year |
| None | — | No automatic schedule (manual trigger only) |
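Any of these presets can be passed straight to the schedule parameter. A sketch reusing the decorator style shown earlier; the dag_id and function are hypothetical:

```python
from datetime import datetime

from airflow.decorators import dag

@dag(
    dag_id="weekly_cleanup",   # hypothetical pipeline
    schedule="@weekly",        # every Sunday at midnight UTC
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def weekly_cleanup():
    ...

weekly_cleanup()
```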
Cron Expressions
For custom schedules use a standard 5-field cron expression:
```
┌───── minute (0-59)
│ ┌───── hour (0-23)
│ │ ┌───── day of month (1-31)
│ │ │ ┌───── month (1-12)
│ │ │ │ ┌───── day of week (0-6, Sun=0)
│ │ │ │ │
* * * * *
```
| Expression | Meaning |
|---|---|
| 0 6 * * 1-5 | Weekdays at 06:00 UTC |
| */15 * * * * | Every 15 minutes |
| 0 2 1 * * | 1st of each month at 02:00 UTC |
| 0 8,20 * * * | Twice daily at 08:00 and 20:00 UTC |
Key DAG Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| dag_id | string | required | Unique pipeline identifier |
| schedule | string / None | None | Cron expression or preset |
| start_date | datetime | required | Earliest date to schedule runs from |
| catchup | bool | False | Backfill missed runs since start_date |
| max_active_runs | int | 16 | Max concurrent pipeline runs |
| max_active_tasks | int | 16 | Max concurrent tasks in a single run |
| default_args | dict | {} | Args inherited by all tasks |
| tags | list | [] | Labels for filtering in the UI |
| dagrun_timeout | timedelta | None | Fail run if it exceeds this duration |
| on_failure_callback | callable | None | Function called on pipeline failure |
catchup=False by default. If catchup=True and your start_date is in the past, Calabi Pipelines will immediately queue one run for every missed interval. Always be intentional about this setting.
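Several of these parameters usually appear together in practice. Here is a hedged sketch combining run limits, a run timeout, and a failure callback; the dag_id and callback body are illustrative only:

```python
from datetime import datetime, timedelta

from airflow.decorators import dag

def notify_on_failure(context):
    # Illustrative callback: the context dict carries run and task metadata.
    print(f"Pipeline {context['dag'].dag_id} failed")

@dag(
    dag_id="orders_with_guardrails",    # hypothetical pipeline
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,                      # do not backfill missed intervals
    max_active_runs=1,                  # never overlap two runs
    dagrun_timeout=timedelta(hours=2),  # fail the run if it exceeds 2 hours
    on_failure_callback=notify_on_failure,
)
def orders_with_guardrails():
    ...
```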
Full Example Pipeline
The following pipeline fetches daily sales data, runs quality checks in parallel, and then publishes a summary report:
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    "owner": "analytics",
    "retries": 1,
    "retry_delay": timedelta(minutes=10),
    "email_on_failure": True,
    "email": ["data-alerts@example.com"],
}

@dag(
    dag_id="daily_sales_report",
    description="Fetch, validate, and publish the daily sales summary",
    schedule="0 5 * * *",  # 05:00 UTC every day
    start_date=datetime(2024, 6, 1),
    catchup=False,
    default_args=default_args,
    tags=["sales", "reporting"],
)
def daily_sales_report():
    start = EmptyOperator(task_id="start")
    finish = EmptyOperator(task_id="finish")

    fetch = BashOperator(
        task_id="fetch_sales_data",
        bash_command="python /opt/etl/fetch_sales.py --date {{ ds }}",
    )

    # Jinja macros are not rendered inside @task function bodies, so the run
    # date is taken from the task context instead: declaring `ds` in the
    # signature makes Calabi Pipelines inject it at runtime.
    @task()
    def check_row_count(ds=None) -> int:
        import pandas as pd

        df = pd.read_parquet(f"/tmp/sales_{ds}.parquet")
        assert len(df) > 0, "No rows returned"
        return len(df)

    @task()
    def check_nulls(ds=None):
        import pandas as pd

        df = pd.read_parquet(f"/tmp/sales_{ds}.parquet")
        assert df[["order_id", "revenue"]].notnull().all().all()

    @task()
    def publish_report(row_count: int, ds=None):
        print(f"Publishing report for {ds} — {row_count} rows")
        # send to BI layer

    rows = check_row_count()
    nulls = check_nulls()
    report = publish_report(rows)

    start >> fetch >> [rows, nulls] >> report >> finish

dag_instance = daily_sales_report()
```
Next Steps
- Triggering Pipelines — Run pipelines manually or via API
- Monitoring Pipelines — Inspect run state, logs, and task history
- Variables & Connections — Manage secrets and reusable config