Pipeline Variables & Connections
Calabi Pipelines provides two mechanisms for externalising configuration from your pipeline code: Variables (a key-value store for runtime configuration) and Connections (a credential store for external systems). Using these constructs keeps secrets out of your DAG files and makes pipelines portable across environments.
Variables
What Are Variables?
A Variable is a named key-value pair stored in the Calabi Pipelines metadata database. Variables are encrypted at rest and masked in logs when the key name contains sensitive words (password, secret, api_key, token, etc.).
Variables are designed for:
- Environment-specific configuration (e.g., S3 bucket names, API endpoints)
- Feature flags (e.g., enable_new_loader=true; see the sketch below)
- Shared constants referenced by multiple pipelines
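For example, a feature flag can gate behaviour inside a task at runtime. A minimal sketch, using the enable_new_loader key above; the print statements stand in for real loader code:
from airflow.decorators import task
from airflow.models import Variable

@task()
def load_data():
    # Variable values are stored as strings, so compare against "true" explicitly.
    use_new_loader = Variable.get("enable_new_loader", default_var="false") == "true"
    if use_new_loader:
        print("Using the new loader code path")
    else:
        print("Using the legacy loader code path")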
Managing Variables via the UI
- Navigate to Calabi Pipelines → Admin → Variables.
- Click + (Add Variable).
- Fill in:
  - Key — The variable name (e.g., s3_output_bucket).
  - Value — The value (plain string or JSON).
  - Description — Optional documentation.
- Click Save.
To edit: click the pencil icon on the variable row. To delete: click the bin icon. Deletion is immediate and permanent.
Variables have no history or versioning. If you overwrite a variable value, the old value is gone. For sensitive secrets with audit requirements, use a dedicated secrets manager (AWS Secrets Manager, HashiCorp Vault) and configure Calabi to read from it.
Managing Variables via CLI
# Set a variable
airflow variables set s3_output_bucket my-data-bucket-prod
# Get a variable
airflow variables get s3_output_bucket
# Set a JSON variable
airflow variables set db_config '{"host": "db.example.com", "port": 5432}'
# List all variables
airflow variables list
# Export all variables to a JSON file
airflow variables export variables_backup.json
# Import variables from a JSON file
airflow variables import variables_backup.json
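The exported file can be inspected or post-processed with ordinary tooling. A minimal sketch, assuming the export is a flat JSON object mapping variable keys to values:
import json

# Read a backup produced by `airflow variables export variables_backup.json`.
with open("variables_backup.json") as f:
    variables = json.load(f)

for key, value in variables.items():
    print(f"{key} = {value}")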
Reading Variables in Python
Method 1: Direct lookup (avoid in top-level DAG code)
from airflow.decorators import task
from airflow.models import Variable

@task()
def process_data():
    bucket = Variable.get("s3_output_bucket")
    max_rows = Variable.get("max_rows_per_run", default_var=100_000)
    config = Variable.get("db_config", deserialize_json=True)
    print(f"Writing to s3://{bucket}")
    print(f"DB host: {config['host']}")
Method 2: Jinja templating (preferred for operator params)
from airflow.operators.bash import BashOperator

run_script = BashOperator(
    task_id="run_etl",
    bash_command="python /opt/etl/run.py --bucket {{ var.value.s3_output_bucket }} --date {{ ds }}",
)
Method 3: Context-based access (inside @task functions)
@task()
def my_task(**context):
    bucket = context["var"]["value"].get("s3_output_bucket")
    config = context["var"]["json"].get("db_config")
Calling Variable.get() at the module level (outside of task functions or Jinja templates) causes Calabi Pipelines to open a database connection on every DAG parse cycle — which can occur every 30 seconds. This degrades scheduler performance. Always call Variable.get() inside task functions or use Jinja templates.
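To make the difference concrete, here is a minimal sketch contrasting the two patterns, using the s3_output_bucket variable from the earlier examples:
from airflow.decorators import task
from airflow.models import Variable

# Anti-pattern: runs on every DAG parse and hits the metadata database each time.
# bucket = Variable.get("s3_output_bucket")

@task()
def write_output():
    # Correct: the lookup happens only when the task actually executes.
    bucket = Variable.get("s3_output_bucket")
    print(f"Writing to s3://{bucket}")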
JSON Variables
Store structured configuration as JSON by passing a JSON string value:
Key: warehouse_config
Value: {"host": "dw.example.com", "database": "analytics", "schema": "raw"}
Deserialise in code:
config = Variable.get("warehouse_config", deserialize_json=True)
host = config["host"]
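The resulting dictionary behaves like any other Python object inside a task. A minimal sketch using the warehouse_config key above:
from airflow.decorators import task
from airflow.models import Variable

@task()
def report_target():
    config = Variable.get("warehouse_config", deserialize_json=True)
    # Each field of the JSON value is available as a normal dictionary key.
    print(f"Loading into {config['database']}.{config['schema']} on {config['host']}")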
Connections
What Are Connections?
A Connection stores the credentials and endpoint details needed to connect to an external system — a database, cloud service, API, or messaging platform. Connections are stored encrypted in the Calabi Pipelines metadata database.
Operators and hooks that interact with external systems (e.g., PostgresOperator, S3Hook, SimpleHttpOperator) reference connections by their Connection ID (conn_id).
Managing Connections via the UI
- Navigate to Calabi Pipelines → Admin → Connections.
- Click + (Add Connection).
- Fill in the fields appropriate for the connection type:
| Field | Description |
|---|---|
| Connection ID | Unique identifier used in your DAG code (e.g., postgres_warehouse) |
| Connection Type | Select from dropdown (database type, S3, HTTP, Snowflake, etc.) |
| Host | Hostname or endpoint URL |
| Schema | Database name or schema |
| Login | Username |
| Password | Password or API key (encrypted at rest) |
| Port | Service port (e.g., 5432 for database connections) |
| Extra | JSON for additional options (e.g., SSL mode, region) |
- Click Test to verify connectivity.
- Click Save.
Connection Types and Extra JSON
Some connection types require additional parameters in the Extra field:
Database with SSL:
{"sslmode": "require"}
Snowflake:
{
  "account": "myaccount.us-east-1",
  "warehouse": "COMPUTE_WH",
  "role": "TRANSFORMER",
  "authenticator": "snowflake"
}
AWS S3:
{
  "region_name": "us-east-1",
  "aws_session_token": ""
}
HTTP with headers:
{"Authorization": "Bearer my-api-token"}
Managing Connections via CLI
# Add a database connection
airflow connections add postgres_warehouse \
    --conn-type postgres \
    --conn-host db.example.com \
    --conn-login datauser \
    --conn-password mysecretpassword \
    --conn-schema analytics \
    --conn-port 5432
# List connections
airflow connections list
# Delete a connection
airflow connections delete postgres_warehouse
# Export connections (passwords are masked)
airflow connections export connections_export.json
Using Connections in DAGs
Database connection example:
from airflow.decorators import task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Using the operator directly
run_query = PostgresOperator(
    task_id="run_aggregate",
    postgres_conn_id="postgres_warehouse",  # References the stored Connection
    sql="SELECT count(*) FROM orders WHERE date = '{{ ds }}'",
)

# Using the hook inside a @task function
@task()
def fetch_data():
    hook = PostgresHook(postgres_conn_id="postgres_warehouse")
    df = hook.get_pandas_df("SELECT * FROM customers LIMIT 1000")
    return len(df)
S3 example:
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task()
def upload_to_s3(ds=None):
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename="/tmp/output.parquet",
        # Jinja is not rendered inside @task functions, so take ds from the task context.
        key=f"data/orders/{ds}/output.parquet",
        bucket_name=Variable.get("s3_output_bucket"),
        replace=True,
    )
HTTP example:
from airflow.providers.http.operators.http import SimpleHttpOperator

call_api = SimpleHttpOperator(
    task_id="trigger_downstream_api",
    http_conn_id="my_api_connection",
    endpoint="/api/v1/process",
    method="POST",
    data='{"date": "{{ ds }}"}',
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.status_code == 200,
)
Environment Variable Injection
As an alternative to storing connections in the UI, you can inject connections via environment variables using the prefix AIRFLOW_CONN_. The value must be a URI-encoded connection string:
export AIRFLOW_CONN_POSTGRES_WAREHOUSE="postgresql://datauser:mysecretpassword@db.example.com:5432/analytics"
This is useful in CI/CD deployments where secrets are managed externally.
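Because the value is a URI, reserved characters in the password must be percent-encoded. A minimal sketch that builds the string from a Connection object, assuming the standard Airflow Connection model is available in your deployment; the password here is illustrative:
from airflow.models import Connection

conn = Connection(
    conn_id="postgres_warehouse",
    conn_type="postgres",
    host="db.example.com",
    login="datauser",
    password="p@ss/word!",  # illustrative password containing URI-reserved characters
    schema="analytics",
    port=5432,
)
# get_uri() percent-encodes the credentials, producing a value suitable for AIRFLOW_CONN_POSTGRES_WAREHOUSE.
print(conn.get_uri())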
Secrets Backends
For enterprise-grade secret management, Calabi Pipelines can be configured to retrieve Variables and Connections from an external secrets manager instead of (or in addition to) the metadata database.
| Backend | Description |
|---|---|
| AWS Secrets Manager | Secrets stored under a configurable prefix (e.g., airflow/variables/, airflow/connections/) |
| HashiCorp Vault | KV secrets engine with Calabi Pipelines Vault backend |
| GCP Secret Manager | For GCP-hosted deployments |
When a secrets backend is configured, Calabi Pipelines checks the external store first and falls back to the database. This allows you to manage the most sensitive credentials in your organisation's existing secrets management infrastructure.
Secrets backend configuration requires access to the Calabi Pipelines deployment settings. Contact your platform administrator to configure a secrets backend.
Best Practices Summary
| Practice | Recommendation |
|---|---|
| Secret storage | Use Connections for credentials, never hardcode in DAG files |
| Variable access | Use Jinja templates or access inside task functions, not at module level |
| JSON config | Store structured config as JSON Variables, deserialise in tasks |
| Naming conventions | Use snake_case for connection and variable IDs; prefix by environment (prod_, staging_) |
| Rotation | When rotating credentials, update the Connection in the UI; running tasks pick up changes on next execution |
| Audit | Review the Variables and Connections pages periodically; remove unused entries |
Next Steps
- Troubleshooting Pipelines — Debug common failures
- Monitoring Pipelines — Track run states and task logs