Monitoring Pipelines
Calabi Pipelines provides several views and tools for understanding the health of your pipelines: the Grid view for run history, the Gantt chart for task-level timing, detailed task logs, SLA miss tracking, and configurable failure notifications. This page covers each in detail.
DAG and Run States
Every pipeline run and every task instance carries a state. Understanding states is the foundation of effective monitoring.
Run (DAGRun) States
| State | Colour | Description |
|---|---|---|
| queued | Purple | Run is scheduled and waiting for a worker slot |
| running | Lime green | At least one task is currently executing |
| success | Green | All tasks completed without error |
| failed | Red | One or more tasks failed and were not retried successfully |
| upstream_failed | Orange | The run itself did not fail, but an upstream DAG it depends on did |
| skipped | Light blue | Run was explicitly skipped by the scheduler (e.g., during backfill with skip=True) |
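Run states can also be inspected programmatically. A minimal sketch using the standard Airflow model layer, assuming your code runs somewhere with access to the scheduler's metadata database:

from airflow.models import DagRun

# Fetch all recorded runs for a pipeline and print their states
runs = DagRun.find(dag_id="customer_orders_daily")
for run in runs:
    print(run.run_id, run.state)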
Task Instance States
| State | Colour | Description |
|---|---|---|
| none | White | Task has not yet been evaluated |
| scheduled | Tan | Task is eligible to run (all upstream done) |
| queued | Purple | Submitted to the executor, waiting for a worker |
| running | Lime green | Task is actively executing |
| success | Green | Task completed successfully |
| failed | Red | Task raised an exception or returned a non-zero exit code |
| up_for_retry | Yellow | Task failed and will be retried after retry_delay |
| up_for_reschedule | Cornflower blue | Sensor task is waiting and will be rescheduled |
| skipped | Light blue | Task was intentionally skipped (e.g., a branch not taken) |
| removed | Light grey | Task existed in a previous DAG version but was removed |
| deferred | Violet | Task deferred execution to an async trigger |
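Several of these states appear only when retries are configured: a task can reach up_for_retry only if it declares retries. A minimal sketch (the task body is hypothetical):

from datetime import timedelta
from airflow.operators.python import PythonOperator

def fetch_orders():  # hypothetical task body
    ...

fetch = PythonOperator(
    task_id="fetch_orders",
    python_callable=fetch_orders,
    retries=3,                         # failed attempts move the task to up_for_retry
    retry_delay=timedelta(minutes=5),  # wait this long before the next attempt
)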
Grid View
The Grid view is the primary monitoring interface. To access it, navigate to Calabi Pipelines → DAGs, click a pipeline name, and select the Grid tab.
DAG: customer_orders_daily
┌────────────────────────────────────────────────────────────┐
│ Task │ Apr 1 │ Apr 2 │ Apr 3 │ Apr 4 │ ...│
│ ─────────────────── ───────────────────────────────────── │
│ extract_orders │ │ │ │ │ │
│ validate_orders │ │ │ │ – │ │
│ load_to_warehouse │ │ │ │ – │ │
└────────────────────────────────────────────────────────────┘
- Each column represents one pipeline run (most recent on the right).
- Each row represents a task.
- Each cell is a task instance — coloured by state.
- Click any column header to see run-level metadata (start time, duration, trigger type).
- Click any cell to open a context menu with options to view logs, clear the task, or mark it as success.
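Clearing tasks is also possible outside the UI. A minimal sketch using the standard Airflow APIs (DagBag and DAG.clear), assuming it runs on a host with the DAG files and metadata database available:

from datetime import datetime
from airflow.models import DagBag

dag = DagBag().get_dag("customer_orders_daily")
# Clear failed task instances in a date window; the scheduler will re-run them
dag.clear(
    start_date=datetime(2026, 4, 1),
    end_date=datetime(2026, 4, 4),
    only_failed=True,
)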
Auto-Refresh
Enable the Auto-refresh toggle (top-right corner) to reload the Grid automatically every 10 seconds. Useful when monitoring a long-running pipeline in real time.
Filtering Runs
Use the date-range picker above the Grid to restrict the view to a time window. For pipelines with long history, filtering prevents the Grid from becoming cluttered.
Gantt Chart
The Gantt tab shows task-level timing for a single pipeline run, making it easy to spot bottlenecks.
To open the Gantt chart:
- In the Grid view, click the run column for the run you want to inspect.
- Select Gantt from the run detail panel, or click the Gantt tab at the top of the pipeline page and use the run selector.
Run: customer_orders_daily | 2026-04-04 00:00:00 UTC
───────────────────────────────────────────────────────────
extract_orders ████████████░░░░░░░░░░ 12.4s
validate_orders ░░░░░░░░░░░░████████░░ 8.1s (parallel)
validate_counts ░░░░░░░░░░░░██████████ 9.8s (parallel)
load_to_warehouse ░░░░░░░░░░░░░░░░░░░░░████████ 18.2s
───────────────────────────────────────────────────────────
Total wall time: 40.5s
The Gantt chart reveals:
- Sequential bottlenecks — long tasks that delay everything downstream
- Parallelism — tasks running simultaneously, shown as overlapping bars
- Scheduling lag — the gap between a task becoming eligible and actually starting (purple bar prefix)
If tasks show a long purple bar before execution begins, your worker pool may be undersized relative to task throughput. Consider increasing worker replicas.
Task Logs
Every task execution writes a full log. Logs are stored in object storage (S3) and streamed to the UI in real time.
Accessing Logs
From the Grid:
- Click the task cell you want to inspect.
- Click Log in the context menu, or select a specific attempt number if the task was retried.
From the task instance detail page:
- Click Task Instances in the pipeline detail view.
- Filter by date, state, or task ID.
- Click any row to open the log viewer.
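Logs can also be retrieved outside the UI through the Airflow stable REST API. A minimal sketch; the base URL and credentials below are placeholders for your deployment:

import requests

BASE = "https://pipelines.example.com/api/v1"  # hypothetical webserver URL
run_id = "scheduled__2026-04-04T00:00:00+00:00"
resp = requests.get(
    f"{BASE}/dags/customer_orders_daily/dagRuns/{run_id}"
    "/taskInstances/load_to_warehouse/logs/1",  # 1 = attempt (try) number
    auth=("username", "password"),
)
print(resp.text)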
Log Structure
Each task log includes:
[2026-04-04, 08:01:23 UTC] {taskinstance.py:1234} INFO - Dependencies all met for ...
[2026-04-04, 08:01:23 UTC] {taskinstance.py:1450} INFO - Starting attempt 1 of 3
[2026-04-04, 08:01:24 UTC] {process_utils.py:183} INFO - Running command: python ...
[2026-04-04, 08:01:24 UTC] {customer_orders.py:45} INFO - Extracted 12,847 rows
[2026-04-04, 08:01:39 UTC] {taskinstance.py:1700} INFO - Marking task as SUCCESS
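The framework lines come from Calabi Pipelines itself; application lines (such as the customer_orders.py entry above) come from ordinary Python logging in your task code, which is captured automatically. A minimal sketch, where fetch_rows is a hypothetical extraction helper:

import logging

log = logging.getLogger(__name__)

def extract_orders():
    rows = fetch_rows()  # hypothetical extraction helper
    log.info("Extracted %s rows", f"{len(rows):,}")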
Log Levels
| Level | When Used |
|---|---|
| INFO | Normal progress — task started, record counts, step completions |
| WARNING | Recoverable issues — deprecated API, slow network |
| ERROR | Exceptions and failures |
| CRITICAL | Unrecoverable failures causing task abort |
| DEBUG | Verbose internals — enabled per-task for deep debugging |
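One way to get DEBUG output for a single task is to raise the logger level inside the callable, sketched here with plain Python logging (whether DEBUG records reach the log file also depends on your handler configuration):

import logging

log = logging.getLogger("customer_orders")

def validate_orders():
    log.setLevel(logging.DEBUG)  # verbose output for this task only
    log.debug("row-level validation details: ...")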
Reading Retry Logs
If a task was retried, the log viewer shows numbered attempts (Attempt 1, Attempt 2, …). Each attempt has its own log. Always check the last attempt first; earlier attempts typically show the same root cause error.
SLA Misses
An SLA (Service Level Agreement) on a task defines the maximum time the task may take to complete, measured from the start of the pipeline run. If the task has not finished within that window, Calabi Pipelines records an SLA miss and can notify your team.
Configuring SLAs
Set sla on individual task operators:
from datetime import timedelta
from airflow.operators.python import PythonOperator
load = PythonOperator(
    task_id="load_to_warehouse",
    python_callable=load_fn,
    sla=timedelta(hours=2),  # must finish within 2 hours of the run's start
)
Alternatively, bound the whole run with dagrun_timeout. Note that this is a hard timeout rather than an SLA: a run that exceeds it is marked failed instead of merely recorded as a miss:
from datetime import datetime, timedelta
from airflow.decorators import dag

@dag(
    dag_id="customer_orders_daily",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    dagrun_timeout=timedelta(hours=4),  # entire run must finish within 4 hours
    catchup=False,
)
def pipeline(): ...
Viewing SLA Misses
Navigate to Browse → SLA Misses in the Calabi Pipelines UI. The table shows:
| Column | Description |
|---|---|
| Pipeline | DAG ID |
| Task | Task ID where the SLA was missed |
| Execution Date | The run logical date |
| Email Sent | Whether the SLA notification email was dispatched |
| Timestamp | When the miss was detected |
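The same records live in the sla_miss metadata table and can be queried programmatically. A minimal sketch using Airflow's SlaMiss model, assuming access to the metadata database:

from airflow.models import SlaMiss
from airflow.utils.session import create_session

with create_session() as session:
    misses = (
        session.query(SlaMiss)
        .filter(SlaMiss.dag_id == "customer_orders_daily")
        .all()
    )
    for miss in misses:
        print(miss.task_id, miss.execution_date, miss.email_sent)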
SLA Miss Callbacks
Define a callback to run custom logic when an SLA is missed:
def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    # slas is a list of SlaMiss records; task_list arrives as a preformatted string
    message = f"SLA missed for tasks: {[sla.task_id for sla in slas]}"
    # send to Slack, PagerDuty, etc.
    print(message)
@dag(
    dag_id="customer_orders_daily",
    sla_miss_callback=sla_miss_alert,
    ...
)
def pipeline(): ...
Alerts and Notifications
Email Alerts on Task Failure
Configure per-task email alerts in default_args:
default_args = {
    "email": ["data-alerts@example.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "email_on_success": False,
}
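These settings take effect once the dict is passed to the DAG, where every task inherits them; a minimal sketch:

from datetime import datetime
from airflow.decorators import dag

@dag(
    dag_id="customer_orders_daily",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,  # every task inherits the email settings
    catchup=False,
)
def pipeline(): ...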
Callback Functions
For programmatic alerting (Slack, PagerDuty, custom webhooks), use operator callbacks:
def notify_slack_failure(context):
    import requests  # imported lazily so the DAG file parses without it

    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    run_id = context["run_id"]
    log_url = context["task_instance"].log_url
    payload = {
        "text": (
            f":red_circle: *Task Failed*\n"
            f"*Pipeline:* `{dag_id}`\n"
            f"*Task:* `{task_id}`\n"
            f"*Run:* `{run_id}`\n"
            f"<{log_url}|View Logs>"
        )
    }
    requests.post("https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK", json=payload)
@dag(
    dag_id="customer_orders_daily",
    on_failure_callback=notify_slack_failure,
    ...
)
def pipeline(): ...
The context dictionary passed to callbacks contains the full execution context including the task instance, DAG run, and exception details.
Common Callback Context Keys
| Key | Type | Description |
|---|---|---|
| dag | DAG | The DAG object |
| task_instance | TaskInstance | The task instance that failed |
| run_id | str | The run identifier |
| execution_date | datetime | The logical run date |
| exception | Exception | The exception that caused the failure |
| task_instance.log_url | str | Direct URL to the task log in the UI |
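Callbacks can also be attached per task rather than at the DAG level, which is useful when only critical tasks warrant an alert; a minimal sketch reusing the callback above:

from airflow.operators.python import PythonOperator

load = PythonOperator(
    task_id="load_to_warehouse",
    python_callable=load_fn,
    on_failure_callback=notify_slack_failure,  # fires only when this task fails
)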
DAG Health Indicators
The DAGs list page shows at-a-glance health indicators for each pipeline:
| Indicator | Location | What It Shows |
|---|---|---|
| Run state badges | Right of pipeline name | Last 10 run outcomes (colour-coded dots) |
| Last run status | Last Run column | State of the most recent run |
| Next run time | Next Run column | When the scheduler will next trigger the pipeline |
| Schedule | Schedule column | Cron expression or preset |
| Active toggle | Left of pipeline name | Whether the scheduler is creating new runs |
Interpreting the Run History Strip
The coloured dot strip gives a quick visual summary:
● ● ● ● ● ● ● ● ●
green green green ... red (most recent failure)
A pattern of mostly green with one recent red indicates a transient failure — likely safe to retry. A pattern of alternating or consecutive reds indicates a structural problem requiring investigation.
Next Steps
- Variables & Connections — Manage credentials and configuration
- Troubleshooting Pipelines — Diagnose and resolve common failures