Monitoring Pipelines

Professional+

Calabi Pipelines provides several views and tools for understanding the health of your pipelines: the Grid view for run history, the Gantt chart for task-level timing, detailed task logs, SLA miss tracking, and configurable failure notifications. This page covers each in detail.


DAG and Run States

Every pipeline run and every task instance carries a state. Understanding states is the foundation of effective monitoring.

Run (DAGRun) States

State            Colour      Description
queued           Purple      Run is scheduled and waiting for a worker slot
running          Lime green  At least one task is currently executing
success          Green       All tasks completed without error
failed           Red         One or more tasks failed and were not retried successfully
upstream_failed  Orange      The run itself did not fail, but an upstream DAG it depends on did
skipped          Light blue  Run was explicitly skipped by the scheduler (e.g., during backfill with skip=True)

Task Instance States

State              Colour           Description
none               White            Task has not yet been evaluated
scheduled          Tan              Task is eligible to run (all upstream done)
queued             Purple           Submitted to the executor, waiting for a worker
running            Lime green       Task is actively executing
success            Green            Task completed successfully
failed             Red              Task raised an exception or returned a non-zero exit code
up_for_retry       Yellow           Task failed and will be retried after retry_delay
up_for_reschedule  Cornflower blue  Sensor task is waiting and will be rescheduled
skipped            Light blue       Task was intentionally skipped (e.g., a branch not taken)
removed            Light grey       Task existed in a previous DAG version but was removed
deferred           Violet           Task deferred execution to an async trigger

Grid View

The Grid view is the primary monitoring interface. To access it, navigate to Calabi Pipelines → DAGs, click a pipeline name, and select the Grid tab.

DAG: customer_orders_daily
┌───────────────────┬───────┬───────┬───────┬───────┬─────┐
│ Task              │ Apr 1 │ Apr 2 │ Apr 3 │ Apr 4 │ ... │
├───────────────────┼───────┼───────┼───────┼───────┼─────┤
│ extract_orders    │   ■   │   ■   │   ■   │   ■   │     │
│ validate_orders   │   ■   │   ■   │   ■   │   –   │     │
│ load_to_warehouse │   ■   │   ■   │   ■   │   –   │     │
└───────────────────┴───────┴───────┴───────┴───────┴─────┘
  • Each column represents one pipeline run (most recent on the right).
  • Each row represents a task.
  • Each cell is a task instance — coloured by state.
  • Click any column header to see run-level metadata (start time, duration, trigger type).
  • Click any cell to open a context menu with options to view logs, clear the task, or mark it as success. These actions are also scriptable; see the sketch below.
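
Clearing can be automated for remediation workflows. A minimal sketch, assuming the deployment exposes an Airflow-style REST API under /api/v1 (the host, credentials, and endpoint here are assumptions carried over from Airflow, not confirmed Calabi Pipelines specifics):

import requests

BASE = "https://pipelines.example.com/api/v1"  # hypothetical host
AUTH = ("monitor-bot", "s3cret")               # hypothetical credentials

# Preview, then perform, clearing a task instance so the scheduler re-runs it.
resp = requests.post(
    f"{BASE}/dags/customer_orders_daily/clearTaskInstances",
    auth=AUTH,
    json={
        "dry_run": True,  # set to False to actually clear
        "task_ids": ["load_to_warehouse"],
        "start_date": "2026-04-04T00:00:00Z",
        "end_date": "2026-04-04T23:59:59Z",
    },
)
resp.raise_for_status()
print(resp.json())  # the task instances that would be cleared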

Auto-Refresh

Enable the Auto-refresh toggle (top-right corner) to reload the Grid automatically every 10 seconds. Useful when monitoring a long-running pipeline in real time.

Filtering Runs

Use the date-range picker above the Grid to restrict the view to a time window. For pipelines with long history, filtering prevents the Grid from becoming cluttered.


Gantt Chart

The Gantt tab shows task-level timing for a single pipeline run, making it easy to spot bottlenecks.

To open the Gantt chart:

  1. In the Grid view, click the run column for the run you want to inspect.
  2. Select Gantt from the run detail panel, or click the Gantt tab at the top of the pipeline page and use the run selector.

Run: customer_orders_daily | 2026-04-04 00:00:00 UTC
──────────────────────────────────────────────────────────────
extract_orders      █████████░░░░░░░░░░░░░░░░░░░░░  12.4s
validate_orders     ░░░░░░░░░██████░░░░░░░░░░░░░░░   8.1s (parallel)
validate_counts     ░░░░░░░░░███████░░░░░░░░░░░░░░   9.8s (parallel)
load_to_warehouse   ░░░░░░░░░░░░░░░░██████████████  18.2s
──────────────────────────────────────────────────────────────
Total wall time: 40.5s

The Gantt chart reveals:

  • Sequential bottlenecks — long tasks that delay everything downstream
  • Parallelism — tasks running simultaneously, shown as overlapping bars
  • Scheduling lag — the gap between a task becoming eligible and actually starting (purple bar prefix)

Look for scheduling lag

If tasks show a long purple bar before execution begins, your worker pool may be undersized relative to task throughput. Consider increasing worker replicas.
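
Scheduling lag can also be measured programmatically by comparing each task instance's queued and start timestamps. A brief sketch against the same assumed Airflow-style REST API (the queued_when and start_date field names follow the Airflow schema and should be verified for your deployment):

from datetime import datetime
import requests

BASE = "https://pipelines.example.com/api/v1"  # hypothetical host
AUTH = ("monitor-bot", "s3cret")               # hypothetical credentials

def parse_ts(ts: str) -> datetime:
    # API timestamps are ISO 8601; normalise a trailing "Z" for fromisoformat.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))

run_id = "scheduled__2026-04-04T00:00:00+00:00"
resp = requests.get(
    f"{BASE}/dags/customer_orders_daily/dagRuns/{run_id}/taskInstances",
    auth=AUTH,
)
resp.raise_for_status()

for ti in resp.json()["task_instances"]:
    if ti.get("queued_when") and ti.get("start_date"):
        lag = (parse_ts(ti["start_date"]) - parse_ts(ti["queued_when"])).total_seconds()
        print(f"{ti['task_id']}: waited {lag:.1f}s in queue")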


Task Logs

Every task execution writes a full log. Logs are stored in object storage (S3) and streamed to the UI in real time.

Accessing Logs

From the Grid:

  1. Click the task cell you want to inspect.
  2. Click Log in the context menu, or select a specific attempt number if the task was retried.

From the task instance detail page:

  1. Click Task Instances in the pipeline detail view.
  2. Filter by date, state, or task ID.
  3. Click any row to open the log viewer.
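
Logs can also be fetched outside the UI, for example to attach them to an alert. A minimal sketch (same assumed Airflow-style REST API and hypothetical credentials as above; the per-attempt log endpoint follows the Airflow REST layout):

import requests

BASE = "https://pipelines.example.com/api/v1"  # hypothetical host
run_id = "scheduled__2026-04-04T00:00:00+00:00"

# Fetch the plain-text log for attempt 1 of a task instance.
resp = requests.get(
    f"{BASE}/dags/customer_orders_daily/dagRuns/{run_id}"
    f"/taskInstances/load_to_warehouse/logs/1",
    auth=("monitor-bot", "s3cret"),
    headers={"Accept": "text/plain"},
)
resp.raise_for_status()
print(resp.text)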

Log Structure

Each task log includes:

[2026-04-04, 08:01:23 UTC] {taskinstance.py:1234} INFO - Dependencies all met for ...
[2026-04-04, 08:01:23 UTC] {taskinstance.py:1450} INFO - Starting attempt 1 of 3
[2026-04-04, 08:01:24 UTC] {process_utils.py:183} INFO - Running command: python ...
[2026-04-04, 08:01:24 UTC] {customer_orders.py:45} INFO - Extracted 12,847 rows
[2026-04-04, 08:01:39 UTC] {taskinstance.py:1700} INFO - Marking task as SUCCESS

Log Levels

Level     When Used
INFO      Normal progress — task started, record counts, step completions
WARNING   Recoverable issues — deprecated API, slow network
ERROR     Exceptions and failures
CRITICAL  Unrecoverable failures causing task abort
DEBUG     Verbose internals — enabled per-task for deep debugging
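
Task code emits these levels through Python's standard logging module. A short sketch (fetch_orders is a hypothetical helper; the "airflow.task" logger name is the Airflow convention and is assumed to apply here):

import logging

logger = logging.getLogger("airflow.task")  # records land in the task log

def extract_orders_callable():
    logger.info("Starting order extraction")   # normal progress
    rows = fetch_orders()                      # hypothetical helper
    logger.info("Extracted %d rows", len(rows))
    if not rows:
        logger.warning("No rows returned; upstream feed may be delayed")
    logger.debug("First row: %r", rows[:1])    # visible only when DEBUG is enabled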

Reading Retry Logs

If a task was retried, the log viewer shows numbered attempts (Attempt 1, Attempt 2, …). Each attempt has its own log. Always check the last attempt first; earlier attempts typically show the same root cause error.
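
Retry behaviour itself is configured per task with standard operator arguments. For example (values illustrative; validate_fn is assumed to be defined elsewhere in the DAG file):

from datetime import timedelta
from airflow.operators.python import PythonOperator

validate = PythonOperator(
    task_id="validate_orders",
    python_callable=validate_fn,
    retries=2,                          # 3 attempts total, as in "attempt 1 of 3"
    retry_delay=timedelta(minutes=5),   # wait between attempts
    retry_exponential_backoff=True,     # double the delay after each failure
)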


SLA Misses

An SLA (Service Level Agreement) on a task defines the maximum time the task may take to complete, measured from the run's scheduled start. If the task has not finished within that window, Calabi Pipelines records an SLA miss and can notify your team.

Configuring SLAs

Set sla on individual task operators:

from datetime import timedelta
from airflow.operators.python import PythonOperator

load = PythonOperator(
    task_id="load_to_warehouse",
    python_callable=load_fn,
    sla=timedelta(hours=2),  # must finish within 2 hours of the scheduled run start
)

Or bound the entire run by setting dagrun_timeout, which marks a run as failed once it exceeds the duration:

from datetime import datetime, timedelta
from airflow.decorators import dag

@dag(
    dag_id="customer_orders_daily",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    dagrun_timeout=timedelta(hours=4),  # entire run must finish within 4 hours
    catchup=False,
)
def pipeline(): ...

Viewing SLA Misses

Navigate to Browse → SLA Misses in the Calabi Pipelines UI. The table shows:

Column          Description
Pipeline        DAG ID
Task            Task ID where the SLA was missed
Execution Date  The run's logical date
Email Sent      Whether the SLA notification email was dispatched
Timestamp       When the miss was detected

SLA Miss Callbacks

Define a callback to run custom logic when an SLA is missed:

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    message = f"SLA missed for tasks: {[t.task_id for t in task_list]}"
    # send to Slack, PagerDuty, etc.
    print(message)

@dag(
    dag_id="customer_orders_daily",
    sla_miss_callback=sla_miss_alert,
    ...
)
def pipeline(): ...

Alerts and Notifications

Email Alerts on Task Failure

Configure per-task email alerts in default_args:

default_args = {
    "email": ["data-alerts@example.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "email_on_success": False,
}
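
Pass the dictionary to the DAG so the keys apply to every task in it:

from datetime import datetime
from airflow.decorators import dag

@dag(
    dag_id="customer_orders_daily",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,  # the dict defined above
)
def pipeline(): ...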

Callback Functions

For programmatic alerting (Slack, PagerDuty, custom webhooks), use operator callbacks:

def notify_slack_failure(context):
    import requests  # imported inside the callback to keep DAG parse time low

    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    run_id = context["run_id"]
    log_url = context["task_instance"].log_url

    payload = {
        "text": (
            f":red_circle: *Task Failed*\n"
            f"*Pipeline:* `{dag_id}`\n"
            f"*Task:* `{task_id}`\n"
            f"*Run:* `{run_id}`\n"
            f"<{log_url}|View Logs>"
        )
    }
    requests.post("https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK", json=payload)


@dag(
    dag_id="customer_orders_daily",
    on_failure_callback=notify_slack_failure,
    ...
)
def pipeline(): ...

The context dictionary passed to callbacks contains the full execution context including the task instance, DAG run, and exception details.
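
For example, a callback can pull the failing task instance and exception directly from the context:

def log_failure_details(context):
    ti = context["task_instance"]
    exc = context.get("exception")  # may be None if the worker was killed
    print(f"{ti.dag_id}.{ti.task_id} failed on run {context['run_id']}: {exc!r}")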

Common Callback Context Keys

Key                    Type          Description
dag                    DAG           The DAG object
task_instance          TaskInstance  The task instance that failed
run_id                 str           The run identifier
execution_date         datetime      The logical run date
exception              Exception     The exception that caused the failure
task_instance.log_url  str           Direct URL to the task log in the UI

DAG Health Indicators

The DAGs list page shows at-a-glance health indicators for each pipeline:

Indicator         Location                What It Shows
Run state badges  Right of pipeline name  Last 10 run outcomes (colour-coded dots)
Last run status   Last Run column         State of the most recent run
Next run time     Next Run column         When the scheduler will next trigger the pipeline
Schedule          Schedule column         Cron expression or preset
Active toggle     Left of pipeline name   Whether the scheduler is creating new runs (scriptable; see below)
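
The Active toggle can also be flipped programmatically, for example to pause a pipeline from an incident-response script. A minimal sketch (same assumed Airflow-style REST API and hypothetical credentials as in earlier examples):

import requests

# Pause the pipeline so the scheduler stops creating new runs.
resp = requests.patch(
    "https://pipelines.example.com/api/v1/dags/customer_orders_daily",
    auth=("monitor-bot", "s3cret"),
    json={"is_paused": True},
)
resp.raise_for_status()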

Interpreting the Run History Strip

The coloured dot strip gives a quick visual summary:

● ● ● ● ● ● ● ●  ●
green × 8        red (most recent failure)

A pattern of mostly green with one recent red indicates a transient failure — likely safe to retry. A pattern of alternating or consecutive reds indicates a structural problem requiring investigation.


Next Steps