Apache Airflow Cheat Sheet
Overview
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. Originally developed at Airbnb in 2014, it became an Apache top-level project in 2019 and is now one of the most widely used tools for data pipeline orchestration. Airflow represents workflows as Directed Acyclic Graphs (DAGs) defined in Python, giving engineers full programmatic control over pipeline logic, dependencies, and scheduling.
Airflow’s architecture consists of a scheduler that triggers tasks, an executor that runs them (local, Celery, or Kubernetes), a metadata database that stores state, and a web UI for monitoring. It supports a rich ecosystem of provider packages for connecting to cloud services, databases, APIs, and more. With features like dynamic DAG generation, XCom for inter-task communication, and a powerful plugin system, Airflow handles everything from simple cron-like schedules to complex multi-stage data pipelines with conditional branching and retry logic.
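To make the DAG idea concrete, here is a minimal sketch in plain Python (no Airflow required) that orders three hypothetical tasks by their dependencies using the standard library's `graphlib`. It only illustrates why an acyclic dependency graph yields a valid run order; it is not how the Airflow scheduler is implemented, and the task names are assumptions chosen for illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def run_order(deps):
    """Return one valid execution order for the dependency graph."""
    return list(TopologicalSorter(deps).static_order())


order = run_order(dependencies)
print(order)

# A cycle makes ordering impossible, which is why workflows must be acyclic.
try:
    run_order({"a": {"b"}, "b": {"a"}})
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

The second call shows the failure mode: once two tasks depend on each other, no execution order exists.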
Installation
Install via pip
# Set Airflow home directory
export AIRFLOW_HOME=~/airflow
# Install Apache Airflow with constraints
AIRFLOW_VERSION=2.9.0
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# Initialize the metadata database (`airflow db init` is deprecated since 2.7)
airflow db migrate
# Create an admin user
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin
# Start the webserver and scheduler
airflow webserver --port 8080 &
airflow scheduler &
Install via Docker Compose
# Download the official docker-compose file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'
# Create required directories
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Initialize Airflow
docker compose up airflow-init
# Start all services
docker compose up -d
Install with Helm on Kubernetes
# Add the Airflow Helm chart repo
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Install Airflow
helm install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--set executor=KubernetesExecutor
Core Concepts
Basic DAG Definition
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['alerts@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1),
}

with DAG(
    dag_id='etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
    max_active_runs=1,
) as dag:
    extract = BashOperator(
        task_id='extract_data',
        bash_command='python /scripts/extract.py',
    )
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=lambda: print("Transforming..."),
    )
    load = BashOperator(
        task_id='load_data',
        bash_command='python /scripts/load.py',
    )

    extract >> transform >> load
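The `retries`, `retry_delay`, and `execution_timeout` values in `default_args` interact, so it can help to work out the worst case. The sketch below is plain Python arithmetic, not an Airflow API. It assumes a fixed delay between attempts (no exponential backoff) and that `execution_timeout` applies to each attempt separately; the helper name `worst_case_duration` is hypothetical.

```python
from datetime import timedelta


def worst_case_duration(retries, retry_delay, execution_timeout):
    """Upper bound on wall-clock time for one task, assuming fixed delays."""
    attempts = retries + 1           # the first try plus each retry
    waiting = retries * retry_delay  # one delay before every retry
    running = attempts * execution_timeout
    return attempts, waiting + running


# Values taken from the default_args in the DAG above.
attempts, total = worst_case_duration(
    retries=3,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=1),
)
print(attempts, total)
```

Under these assumptions, a task that keeps timing out could occupy its place in the pipeline for over four hours, which is worth comparing against the schedule interval.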
TaskFlow API (Airflow 2.x)
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow'],
)
def etl_taskflow():
    @task()
    def extract():
        return {"data": [1, 2, 3]}

    @task()
    def transform(raw_data: dict):
        return {"transformed": [x * 2 for x in raw_data["data"]]}

    @task()
    def load(transformed_data: dict):
        print(f"Loading: {transformed_data}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)


etl_taskflow()
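Return values of TaskFlow tasks are passed between tasks through XCom. A minimal sketch of one way to check such logic outside Airflow: keep the transformation in an undecorated helper (the name `transform_values` is hypothetical) and confirm that its result survives a JSON round trip. This assumes the default XCom backend, which expects JSON-serializable values.

```python
import json


def transform_values(raw_data: dict) -> dict:
    """Plain-Python version of the transform step (hypothetical helper)."""
    return {"transformed": [x * 2 for x in raw_data["data"]]}


raw = {"data": [1, 2, 3]}
transformed = transform_values(raw)

# Round-trip through JSON to mimic the default XCom serialization constraint.
restored = json.loads(json.dumps(transformed))
print(restored)
```

A `@task` function can then simply call the helper, so the business logic stays testable without a scheduler or metadata database.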
CLI Commands
| Command | Description |
|---|---|
| airflow dags list | List all registered DAGs |
| airflow dags trigger <dag_id> | Trigger a DAG run |
| airflow dags pause <dag_id> | Pause a DAG |
| airflow dags unpause <dag_id> | Unpause a DAG |
| airflow dags test <dag_id> <date> | Run a single DAG run locally for debugging |
| airflow tasks list <dag_id> | List tasks in a DAG |
| airflow tasks test <dag_id> <task_id> <date> | Test a single task without checking dependencies or recording state in the DB |
| airflow tasks run <dag_id> <task_id> <date> | Run a single task |
| airflow dags backfill <dag_id> -s <start> -e <end> | Backfill DAG runs |
| airflow db check | Check database connectivity |
| airflow db migrate | Migrate the metadata database schema to the latest version (replaces db upgrade, deprecated since 2.7) |
| airflow db reset | Reset the metadata database (deletes all data) |
| airflow connections list | List all configured connections |
| airflow connections add <conn_id> --conn-uri <uri> | Add a connection |
| airflow variables set <key> <value> | Set an Airflow variable |
| airflow variables get <key> | Get an Airflow variable |
| airflow pools list | List all resource pools |
| airflow pools set <name> <slots> <description> | Create/update a pool |
| airflow config list | List current configuration |
| airflow info | Show Airflow environment info |
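When these commands are scripted, building the argument list explicitly avoids shell-quoting mistakes. The sketch below assembles the `airflow dags backfill` invocation from the table above and prints it without executing anything. The helper name `backfill_command` and the example dates are assumptions; passing the list to `subprocess.run` is left to the caller.

```python
import shlex


def backfill_command(dag_id: str, start: str, end: str) -> list[str]:
    """Build the argument list for `airflow dags backfill` (not executed here)."""
    return ["airflow", "dags", "backfill", dag_id, "-s", start, "-e", end]


command = backfill_command("etl_pipeline", "2024-01-01", "2024-01-07")

# shlex.join renders the list as a copy-pasteable shell command.
rendered = shlex.join(command)
print(rendered)
```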
Configuration
airflow.cfg Key Settings
[core]
dags_folder = /opt/airflow/dags
executor = CeleryExecutor
load_examples = False
default_timezone = UTC
max_active_runs_per_dag = 16
parallelism = 32
dag_file_processor_timeout = 150
[webserver]
web_server_port = 8080
# rbac was removed in Airflow 2.0; the role-based UI is always enabled
expose_config = False
[scheduler]
min_file_process_interval = 30
dag_dir_list_interval = 300
parsing_processes = 4
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://my-bucket/airflow/logs
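Any `airflow.cfg` option can also be supplied as an environment variable named `AIRFLOW__{SECTION}__{KEY}` (note the double underscores), which is convenient in containers. A minimal sketch that derives those names for a few of the settings above; the helper `airflow_env_var` is hypothetical and only formats strings.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Return the environment variable name that overrides [section] key."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Settings taken from the airflow.cfg example above.
overrides = {
    airflow_env_var("core", "executor"): "CeleryExecutor",
    airflow_env_var("core", "load_examples"): "False",
    airflow_env_var("celery", "broker_url"): "redis://redis:6379/0",
}

for name, value in overrides.items():
    print(f"{name}={value}")
```

The printed lines can be pasted into a `.env` file or the `environment` section of a Compose service.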
Connection Configuration
# Add connections via CLI
airflow connections add 'postgres_default' \
--conn-type 'postgres' \
--conn-host 'db.example.com' \
--conn-port 5432 \
--conn-login 'user' \
--conn-password 'pass' \
--conn-schema 'mydb'
airflow connections add 'aws_default' \
--conn-type 'aws' \
--conn-extra '{"region_name": "us-east-1"}'
airflow connections add 'gcp_default' \
--conn-type 'google_cloud_platform' \
--conn-extra '{"key_path": "/path/to/keyfile.json", "project": "my-project"}'
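Connections can also be expressed as a single URI, either for `--conn-uri` or for an environment variable named `AIRFLOW_CONN_{CONN_ID}` in upper case. The sketch below builds such a URI for the Postgres example. Credentials must be percent-encoded, which is why the illustrative password deliberately contains special characters. Both helper names are hypothetical.

```python
from urllib.parse import quote


def connection_uri(conn_type, login, password, host, port, schema):
    """Assemble a connection URI, percent-encoding the user-supplied parts."""
    return (
        f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(schema, safe='')}"
    )


def connection_env_var(conn_id: str) -> str:
    """Name of the environment variable that defines this connection."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


uri = connection_uri("postgres", "user", "p@ss/word", "db.example.com", 5432, "mydb")
env_name = connection_env_var("postgres_default")
print(f"{env_name}={uri}")
```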
Advanced Usage
Dynamic DAG Generation
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

databases = ['users', 'orders', 'products', 'inventory']

for db in databases:
    dag_id = f'sync_{db}'
    with DAG(
        dag_id=dag_id,
        schedule='@hourly',
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        sync = BashOperator(
            task_id=f'sync_{db}_task',
            bash_command=f'python /scripts/sync.py --database {db}',
        )
    # Register each DAG at module level so the scheduler can discover it
    globals()[dag_id] = dag
Branching and Conditional Logic
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator


def choose_branch(**context):
    # Return the task_id of the branch to follow; the other branch is skipped
    logical_date = context['logical_date']
    if logical_date.weekday() < 5:
        return 'weekday_processing'
    return 'weekend_processing'


# Define these tasks inside a `with DAG(...)` block
branch = BranchPythonOperator(
    task_id='branch_check',
    python_callable=choose_branch,
)
weekday = EmptyOperator(task_id='weekday_processing')
weekend = EmptyOperator(task_id='weekend_processing')
# The join must tolerate one skipped upstream branch
join = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

branch >> [weekday, weekend] >> join
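Because a branch callable is an ordinary function that reads from the context, it can be exercised without a running scheduler. A minimal sketch, assuming the callable only needs `logical_date` and that a plain `datetime` is an acceptable stand-in for the value Airflow would pass:

```python
from datetime import datetime


def choose_branch(**context):
    """Same logic as the branch callable above, repeated to stay self-contained."""
    logical_date = context["logical_date"]
    if logical_date.weekday() < 5:
        return "weekday_processing"
    return "weekend_processing"


# 2024-01-01 was a Monday; 2024-01-06 was a Saturday.
monday_branch = choose_branch(logical_date=datetime(2024, 1, 1))
saturday_branch = choose_branch(logical_date=datetime(2024, 1, 6))
print(monday_branch, saturday_branch)
```

The returned strings must match the `task_id` values of the downstream tasks exactly, so a check like this catches typos before deployment.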
Custom Sensors and Operators
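import os

from airflow.sensors.base import BaseSensorOperator


# Note: Airflow also ships a built-in airflow.sensors.filesystem.FileSensor;
# this custom class only illustrates the BaseSensorOperator pattern.
class FileSensor(BaseSensorOperator):
    # The deprecated apply_defaults decorator is not needed in Airflow 2.x
    def __init__(self, filepath, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Return True once the file exists; otherwise the sensor pokes again
        return os.path.exists(self.filepath)


# Instantiate inside a `with DAG(...)` block
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/incoming/report.csv',
    poke_interval=60,
    timeout=3600,
    mode='reschedule',
)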
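The relationship between `poke_interval` and `timeout` can be sketched as a simple polling loop. This is a plain-Python illustration of the semantics, not Airflow's implementation: a real sensor raises a timeout exception instead of returning `False`, and in `reschedule` mode it releases its worker slot between pokes. The function name `wait_until` is hypothetical.

```python
import time


def wait_until(poke, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Call poke() every poke_interval seconds until it succeeds or timeout passes."""
    deadline = clock() + timeout
    while not poke():
        if clock() >= deadline:
            return False  # a real sensor would fail the task here
        sleep(poke_interval)
    return True


calls = {"count": 0}


def fake_poke():
    """Stand-in condition that becomes true on the third check."""
    calls["count"] += 1
    return calls["count"] >= 3


succeeded = wait_until(fake_poke, poke_interval=0.01, timeout=1.0)
timed_out = wait_until(lambda: False, poke_interval=0.01, timeout=0.05)
print(succeeded, calls["count"], timed_out)
```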
Task Groups
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG('grouped_dag', schedule='@daily', start_date=datetime(2024, 1, 1)) as dag:
    with TaskGroup('extract_group') as extract:
        extract_users = BashOperator(task_id='users', bash_command='echo users')
        extract_orders = BashOperator(task_id='orders', bash_command='echo orders')
    with TaskGroup('transform_group') as transform:
        transform_data = BashOperator(task_id='transform', bash_command='echo transform')

    # Dependencies can be set between whole groups
    extract >> transform
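One detail that is easy to miss: by default a task group prefixes the IDs of its child tasks with the group ID, so the task defined as `users` is addressed as `extract_group.users` in the CLI, in logs, and in branch callables. A small sketch of the resulting names, assuming the default prefixing behaviour is left enabled; `qualified_task_id` is a hypothetical helper, not an Airflow function.

```python
def qualified_task_id(group_id: str, task_id: str) -> str:
    """Build the ID a task gets when its group prefixes child task IDs."""
    return f"{group_id}.{task_id}"


ids = [qualified_task_id("extract_group", name) for name in ("users", "orders")]
ids.append(qualified_task_id("transform_group", "transform"))
print(ids)
```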
Troubleshooting
| Issue | Solution |
|---|---|
| DAG not appearing in UI | Check for Python syntax errors: python /path/to/dag.py. Ensure file is in dags_folder |
| Tasks stuck in queued state | Check executor capacity, pool slots, and worker availability. Verify parallelism settings |
| Database connection errors | Verify sql_alchemy_conn in config. Run airflow db check |
| Scheduler not picking up DAGs | Lower dag_dir_list_interval (new files) or min_file_process_interval (changed files). Check scheduler logs |
| XCom too large | Use external storage (S3/GCS) for large data. XCom is for metadata only |
| Import errors in DAGs | Install the missing provider package: pip install apache-airflow-providers-<name> (for example apache-airflow-providers-postgres) |
| Webserver 502 errors | Increase web_server_worker_timeout. Check memory usage |
| Tasks timing out | Adjust execution_timeout in default_args or per-task |
| Zombie tasks detected | The scheduler auto-cleans these. Check worker health if persistent |
| Permission denied on logs | Ensure AIRFLOW_UID in .env matches the host user (id -u). Fix ownership with sudo chown -R "$(id -u):0" ./logs rather than opening the directory with chmod -R 777 |
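For the "DAG not appearing in UI" case, a quick pre-deployment check is to parse every file in the DAGs folder. The sketch below uses only the standard library and reports syntax errors per file. It does not catch import errors or Airflow-specific problems, and the helper name `find_syntax_errors` and the temporary demo files are assumptions for illustration.

```python
import ast
import tempfile
from pathlib import Path


def find_syntax_errors(dags_folder):
    """Map file name to error message for every unparsable .py file."""
    errors = {}
    for path in sorted(Path(dags_folder).rglob("*.py")):
        try:
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Demo with a throwaway folder containing one valid and one broken file.
with tempfile.TemporaryDirectory() as folder:
    Path(folder, "good_dag.py").write_text("x = 1\n", encoding="utf-8")
    Path(folder, "bad_dag.py").write_text("def broken(:\n", encoding="utf-8")
    errors = find_syntax_errors(folder)

print(errors)
```

Pointing the function at the configured dags_folder gives a fast first filter before looking at scheduler logs.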