
Pipeline Variables & Connections

Professional+

Calabi Pipelines provides two mechanisms for externalising configuration from your pipeline code: Variables (a key-value store for runtime configuration) and Connections (a credential store for external systems). Using these constructs keeps secrets out of your DAG files and makes pipelines portable across environments.


Variables

What Are Variables?

A Variable is a named key-value pair stored in the Calabi Pipelines metadata database. Variables are encrypted at rest and masked in logs when the key name contains sensitive words (password, secret, api_key, token, etc.).

Variables are designed for:

  • Environment-specific configuration (e.g., S3 bucket names, API endpoints)
  • Feature flags (e.g., enable_new_loader=true)
  • Shared constants referenced by multiple pipelines

Managing Variables via the UI

  1. Navigate to Calabi Pipelines → Admin → Variables.
  2. Click + (Add Variable).
  3. Fill in:
    • Key — The variable name (e.g., s3_output_bucket).
    • Value — The value (plain string or JSON).
    • Description — Optional documentation.
  4. Click Save.

To edit: click the pencil icon on the variable row. To delete: click the bin icon. Deletion is immediate and permanent.

No versioning for variables

Variables have no history or versioning. If you overwrite a variable value, the old value is gone. For sensitive secrets with audit requirements, use a dedicated secrets manager (AWS Secrets Manager, HashiCorp Vault) and configure Calabi to read from it.

Managing Variables via CLI

# Set a variable
airflow variables set s3_output_bucket my-data-bucket-prod

# Get a variable
airflow variables get s3_output_bucket

# Set a JSON variable
airflow variables set db_config '{"host": "db.example.com", "port": 5432}'

# List all variables
airflow variables list

# Export all variables to a JSON file
airflow variables export variables_backup.json

# Import variables from a JSON file
airflow variables import variables_backup.json
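The export file is a flat JSON object keyed by variable name. A minimal sketch of what variables_backup.json might contain, with illustrative values:

{
    "s3_output_bucket": "my-data-bucket-prod",
    "db_config": {"host": "db.example.com", "port": 5432}
}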

Reading Variables in Python

Method 1: Direct lookup (avoid in top-level DAG code)

from airflow.decorators import task
from airflow.models import Variable

@task()
def process_data():
    bucket = Variable.get("s3_output_bucket")
    max_rows = Variable.get("max_rows_per_run", default_var=100_000)
    config = Variable.get("db_config", deserialize_json=True)

    print(f"Writing to s3://{bucket}")
    print(f"DB host: {config['host']}")

Method 2: Jinja templating (preferred for operator params)

from airflow.operators.bash import BashOperator

run_script = BashOperator(
    task_id="run_etl",
    bash_command="python /opt/etl/run.py --bucket {{ var.value.s3_output_bucket }} --date {{ ds }}",
)
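Jinja templates can also read individual fields of a JSON variable via var.json. A minimal sketch, assuming the db_config variable from the CLI example above:

from airflow.operators.bash import BashOperator

# Renders a single field of the JSON variable db_config at template time.
check_db_host = BashOperator(
    task_id="check_db_host",
    bash_command="echo 'DB host is {{ var.json.db_config.host }}'",
)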

Method 3: Context-based access (inside @task functions)

from airflow.decorators import task

@task()
def my_task(**context):
    bucket = context["var"]["value"].get("s3_output_bucket")
    config = context["var"]["json"].get("db_config")

Avoid top-level Variable.get() calls

Calling Variable.get() at the module level (outside of task functions or Jinja templates) causes Calabi Pipelines to open a database connection on every DAG parse cycle — which can occur every 30 seconds. This degrades scheduler performance. Always call Variable.get() inside task functions or use Jinja templates.
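A minimal sketch of the anti-pattern and the recommended alternative (names are illustrative):

from airflow.decorators import task
from airflow.models import Variable

# Anti-pattern: this line would run on every DAG parse cycle and open a
# database connection each time.
# bucket = Variable.get("s3_output_bucket")

@task()
def write_report():
    # Recommended: the lookup happens only when the task executes.
    bucket = Variable.get("s3_output_bucket")
    print(f"Writing report to s3://{bucket}")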

JSON Variables

Store structured configuration as JSON by passing a JSON string value:

Key:   warehouse_config
Value: {"host": "dw.example.com", "database": "analytics", "schema": "raw"}

Deserialise in code:

config = Variable.get("warehouse_config", deserialize_json=True)
host = config["host"]

Connections

What Are Connections?

A Connection stores the credentials and endpoint details needed to connect to an external system — a database, cloud service, API, or messaging platform. Connections are stored encrypted in the Calabi Pipelines metadata database.

Operators that interact with external systems (e.g., PostgresOperator, S3Hook, HttpOperator) reference connections by their Connection ID (conn_id).

Managing Connections via the UI

  1. Navigate to Calabi Pipelines → Admin → Connections.
  2. Click + (Add Connection).
  3. Fill in the fields appropriate for the connection type:
    • Connection ID — Unique identifier used in your DAG code (e.g., postgres_warehouse).
    • Connection Type — Select from the dropdown (database type, S3, HTTP, Snowflake, etc.).
    • Host — Hostname or endpoint URL.
    • Schema — Database name or schema.
    • Login — Username.
    • Password — Password or API key (encrypted at rest).
    • Port — Service port (e.g., 5432 for database connections).
    • Extra — JSON for additional options (e.g., SSL mode, region).
  4. Click Test to verify connectivity.
  5. Click Save.

Connection Types and Extra JSON

Some connection types require additional parameters in the Extra field:

Database with SSL:

{"sslmode": "require"}

Snowflake:

{
    "account": "myaccount.us-east-1",
    "warehouse": "COMPUTE_WH",
    "role": "TRANSFORMER",
    "authenticator": "snowflake"
}

AWS S3:

{
    "region_name": "us-east-1",
    "aws_session_token": ""
}

HTTP with headers:

{"Authorization": "Bearer my-api-token"}

Managing Connections via CLI

# Add a database connection
airflow connections add postgres_warehouse \
    --conn-type postgres \
    --conn-host db.example.com \
    --conn-login datauser \
    --conn-password mysecretpassword \
    --conn-schema analytics \
    --conn-port 5432

# List connections
airflow connections list

# Delete a connection
airflow connections delete postgres_warehouse

# Export connections (passwords are masked)
airflow connections export connections_export.json
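As an alternative to individual --conn-* flags, a connection can also be supplied as a single URI. A brief sketch with illustrative credentials:

# Add the same connection from a connection URI
airflow connections add postgres_warehouse \
    --conn-uri "postgres://datauser:mysecretpassword@db.example.com:5432/analytics"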

Using Connections in DAGs

Database connection example:

from airflow.decorators import task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Using the operator directly
run_query = PostgresOperator(
    task_id="run_aggregate",
    postgres_conn_id="postgres_warehouse",  # References the stored Connection
    sql="SELECT count(*) FROM orders WHERE date = '{{ ds }}'",
)

# Using the hook inside a @task function
@task()
def fetch_data():
    hook = PostgresHook(postgres_conn_id="postgres_warehouse")
    df = hook.get_pandas_df("SELECT * FROM customers LIMIT 1000")
    return len(df)

S3 example:

from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task()
def upload_to_s3(**context):
    # Jinja is not rendered inside plain Python strings, so read ds from the task context.
    ds = context["ds"]
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename="/tmp/output.parquet",
        key=f"data/orders/{ds}/output.parquet",
        bucket_name=Variable.get("s3_output_bucket"),
        replace=True,
    )

HTTP example:

from airflow.providers.http.operators.http import SimpleHttpOperator

call_api = SimpleHttpOperator(
    task_id="trigger_downstream_api",
    http_conn_id="my_api_connection",
    endpoint="/api/v1/process",
    method="POST",
    data='{"date": "{{ ds }}"}',
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.status_code == 200,
)

Environment Variable Injection

As an alternative to storing connections in the UI, you can inject connections via environment variables using the prefix AIRFLOW_CONN_. The value must be a URI-encoded connection string:

export AIRFLOW_CONN_POSTGRES_WAREHOUSE="postgresql://datauser:mysecretpassword@db.example.com:5432/analytics"

This is useful in CI/CD deployments where secrets are managed externally.
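Variables can be injected the same way using the AIRFLOW_VAR_ prefix. A brief sketch with illustrative values:

export AIRFLOW_VAR_S3_OUTPUT_BUCKET="my-data-bucket-prod"
export AIRFLOW_VAR_DB_CONFIG='{"host": "db.example.com", "port": 5432}'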


Secrets Backends

For enterprise-grade secret management, Calabi Pipelines can be configured to retrieve Variables and Connections from an external secrets manager instead of (or in addition to) the metadata database.

  • AWS Secrets Manager — Secrets stored under a configurable prefix (e.g., airflow/variables/, airflow/connections/)
  • HashiCorp Vault — KV secrets engine with the Calabi Pipelines Vault backend
  • GCP Secret Manager — For GCP-hosted deployments

When a secrets backend is configured, Calabi Pipelines checks the external store first and falls back to the database. This allows you to manage the most sensitive credentials in your organisation's existing secrets management infrastructure.

Configuring secrets backends

Secrets backend configuration requires access to the Calabi Pipelines deployment settings. Contact your platform administrator to configure a secrets backend.
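For reference, a hedged sketch of the underlying Airflow-style settings an administrator would typically set for the AWS Secrets Manager backend (exact values depend on your deployment):

export AIRFLOW__SECRETS__BACKEND="airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend"
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'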


Best Practices Summary

  • Secret storage — Use Connections for credentials; never hardcode them in DAG files.
  • Variable access — Use Jinja templates or access Variables inside task functions, not at module level.
  • JSON config — Store structured config as JSON Variables and deserialise in tasks.
  • Naming conventions — Use snake_case for connection and variable IDs; prefix by environment (prod_, staging_).
  • Rotation — When rotating credentials, update the Connection in the UI; running tasks pick up changes on the next execution.
  • Audit — Review the Variables and Connections pages periodically; remove unused entries.

Next Steps