
Apache Airflow Cheat Sheet

Overview

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. Originally developed at Airbnb in 2014, it later became an Apache top-level project and is now one of the most widely used tools for data pipeline orchestration. Airflow represents workflows as Directed Acyclic Graphs (DAGs) defined in Python, giving engineers full programmatic control over pipeline logic, dependencies, and scheduling.

Airflow’s architecture consists of a scheduler that triggers tasks, an executor that runs them (local, Celery, or Kubernetes), a metadata database that stores state, and a web UI for monitoring. It supports a rich ecosystem of provider packages for connecting to cloud services, databases, APIs, and more. With features like dynamic DAG generation, XCom for inter-task communication, and a powerful plugin system, Airflow handles everything from simple cron-like schedules to complex multi-stage data pipelines with conditional branching and retry logic.
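Under the default trigger rule (all_success), a task becomes runnable only once all of its upstream tasks have succeeded. This is why the graph must be acyclic. The pure-Python sketch below uses the standard library's graphlib (not Airflow) to show that ordering on a hypothetical four-task pipeline. Airflow's real scheduler additionally tracks task state, retries, pools, and trigger rules.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key maps a task to the set of its upstream tasks.
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}


def run_in_waves(graph):
    """Group tasks into waves; every task in a wave has all its upstreams done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams have finished
        waves.append(ready)
        sorter.done(*ready)  # mark them finished, unlocking their downstreams
    return waves


print(run_in_waves(pipeline))
# [['extract'], ['transform', 'validate'], ['load']]

try:
    run_in_waves({"a": {"b"}, "b": {"a"}})  # a cycle: not a valid DAG
except CycleError as err:
    print("not a DAG:", err.args[0])
```

Tasks in the same wave (transform and validate here) have no dependency on each other, so an executor with spare slots can run them in parallel.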

Installation

Install via pip

# Set Airflow home directory
export AIRFLOW_HOME=~/airflow

# Install Apache Airflow with constraints
AIRFLOW_VERSION=2.9.0
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Create or upgrade the metadata database (replaces the deprecated `airflow db init`)
airflow db migrate

# Create an admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

# Start the webserver and scheduler
airflow webserver --port 8080 &
airflow scheduler &
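The constraints file pins Airflow's dependencies to the versions tested for that release. There is one file per Python minor version, which is why the shell snippet reduces `python3 --version` to major.minor. If you provision environments from Python instead of shell, the same URL could be assembled as follows. The function name is illustrative, not part of Airflow.

```python
import sys

AIRFLOW_VERSION = "2.9.0"


def constraint_url(airflow_version: str, python_version: str) -> str:
    """Build the constraints URL used by the pip install command above."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Same "major.minor" string the shell pipeline extracts from `python3 --version`.
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
print(constraint_url(AIRFLOW_VERSION, python_version))
```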

Install via Docker Compose

# Download the official docker-compose file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'

# Create required directories
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

# Initialize Airflow
docker compose up airflow-init

# Start all services
docker compose up -d

Install with Helm on Kubernetes

# Add the Airflow Helm chart repo
helm repo add apache-airflow https://airflow.apache.org
helm repo update

# Install Airflow
helm install airflow apache-airflow/airflow \
    --namespace airflow \
    --create-namespace \
    --set executor=KubernetesExecutor

Core Concepts

Basic DAG Definition

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['alerts@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1),
}

with DAG(
    dag_id='etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule='0 6 * * *',  # Daily at 06:00 in default_timezone (UTC by default)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'production'],
    max_active_runs=1,
) as dag:

    extract = BashOperator(
        task_id='extract_data',
        bash_command='python /scripts/extract.py',
    )

    transform = PythonOperator(
        task_id='transform_data',
        python_callable=lambda: print("Transforming..."),
    )

    load = BashOperator(
        task_id='load_data',
        bash_command='python /scripts/load.py',
    )

    extract >> transform >> load
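The `>>` in `extract >> transform >> load` is Python's right-shift operator, which Airflow operators overload to declare dependencies. `a >> b` is equivalent to `a.set_downstream(b)`. The toy class below is not Airflow code. It only sketches why chaining works: `__rshift__` returns its right operand, so the next `>>` applies to that task. It also shows how `__rrshift__` lets a plain list appear on the left, as in `branch >> [a, b] >> join`.

```python
class Task:
    """Toy stand-in for an Airflow operator; it only records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # a >> b or a >> [b, c]: make each right-hand task downstream of self.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.add(t.task_id)
        return other  # returning the right operand is what makes chaining work

    def __rrshift__(self, other):
        # [a, b] >> c: lists have no >>, so Python calls c.__rrshift__([a, b]).
        for t in other:
            t >> self
        return self


extract, transform, load = Task("extract"), Task("transform"), Task("load")
extract >> transform >> load  # evaluated as (extract >> transform) >> load

branch, weekday, weekend, join = (Task(n) for n in ["branch", "weekday", "weekend", "join"])
branch >> [weekday, weekend] >> join

print(sorted(branch.downstream))  # ['weekday', 'weekend']
```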

TaskFlow API (Airflow 2.x)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow'],
)
def etl_taskflow():

    @task()
    def extract():
        return {"data": [1, 2, 3]}

    @task()
    def transform(raw_data: dict):
        return {"transformed": [x * 2 for x in raw_data["data"]]}

    @task()
    def load(transformed_data: dict):
        print(f"Loading: {transformed_data}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)

etl_taskflow()
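Inside the decorated DAG function, calling `extract()` does not execute the task. It returns a reference to the task's future output, and passing that reference to `transform(raw)` makes transform a downstream task. At run time, each return value is stored as an XCom under the key `return_value`, and the consuming task pulls it. The toy in-memory store below only mimics that hand-off and is not Airflow's implementation. By default, Airflow persists XComs in the metadata database.

```python
from typing import Any, Callable, Dict, Tuple

# Toy XCom table keyed by (task_id, key).
xcom_store: Dict[Tuple[str, str], Any] = {}


def run_task(task_id: str, fn: Callable[..., Any], *upstream_ids: str) -> None:
    """Pull upstream return values, call fn, and store its result as 'return_value'."""
    args = [xcom_store[(uid, "return_value")] for uid in upstream_ids]
    xcom_store[(task_id, "return_value")] = fn(*args)


run_task("extract", lambda: {"data": [1, 2, 3]})
run_task("transform", lambda raw: {"transformed": [x * 2 for x in raw["data"]]}, "extract")
run_task("load", lambda t: print(f"Loading: {t}"), "transform")
# Loading: {'transformed': [2, 4, 6]}
```

Because every hand-off goes through this store, return values must be serializable and small. The troubleshooting entry on oversized XComs below follows from that.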

CLI Commands

Command | Description
airflow dags list | List all registered DAGs
airflow dags trigger <dag_id> | Trigger a DAG run
airflow dags pause <dag_id> | Pause a DAG
airflow dags unpause <dag_id> | Unpause a DAG
airflow dags test <dag_id> <date> | Execute one DAG run locally for debugging, without the scheduler
airflow tasks list <dag_id> | List tasks in a DAG
airflow tasks test <dag_id> <task_id> <date> | Run a single task instance without checking dependencies or recording its state in the DB
airflow tasks run <dag_id> <task_id> <date> | Run a single task instance
airflow dags backfill <dag_id> -s <start> -e <end> | Backfill DAG runs for a date range
airflow db check | Check database connectivity
airflow db migrate | Create or upgrade the metadata database schema (replaces the deprecated db init and db upgrade)
airflow db reset | Drop and recreate the metadata database (destroys all existing metadata)
airflow connections list | List all configured connections
airflow connections add <conn_id> --conn-uri <uri> | Add a connection
airflow variables set <key> <value> | Set an Airflow variable
airflow variables get <key> | Get an Airflow variable
airflow pools list | List all resource pools
airflow pools set <name> <slots> <description> | Create or update a pool
airflow config list | List the current configuration
airflow info | Show Airflow environment info

Configuration

airflow.cfg Key Settings

[core]
dags_folder = /opt/airflow/dags
executor = CeleryExecutor
load_examples = False
default_timezone = UTC
max_active_runs_per_dag = 16
parallelism = 32
dag_file_processor_timeout = 150

[webserver]
web_server_port = 8080
expose_config = False

[scheduler]
min_file_process_interval = 30
dag_dir_list_interval = 300
parsing_processes = 4

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow

[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://my-bucket/airflow/logs
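Every option in airflow.cfg can also be set through an environment variable named AIRFLOW__{SECTION}__{KEY} (upper case, double underscores), which takes precedence over the file. The official docker-compose.yaml, for example, sets AIRFLOW__CORE__EXECUTOR this way. The sketch below uses configparser to model a simplified version of that lookup. The helper names are hypothetical. Airflow also supports _CMD and _SECRET variants and built-in defaults, which are not modelled here.

```python
import configparser
import os

CFG_TEXT = """
[core]
executor = CeleryExecutor
parallelism = 32

[celery]
broker_url = redis://redis:6379/0
"""


def env_var_name(section: str, key: str) -> str:
    """Environment variable that overrides [section] key."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def effective_value(parser, section, key, environ=os.environ):
    """Simplified precedence: environment variable first, then the config file."""
    return environ.get(env_var_name(section, key), parser.get(section, key))


parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

print(env_var_name("core", "executor"))  # AIRFLOW__CORE__EXECUTOR
print(effective_value(parser, "core", "executor",
                      {"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"}))  # LocalExecutor
```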

Connection Configuration

# Add connections via CLI
airflow connections add 'postgres_default' \
    --conn-type 'postgres' \
    --conn-host 'db.example.com' \
    --conn-port 5432 \
    --conn-login 'user' \
    --conn-password 'pass' \
    --conn-schema 'mydb'

airflow connections add 'aws_default' \
    --conn-type 'aws' \
    --conn-extra '{"region_name": "us-east-1"}'

airflow connections add 'gcp_default' \
    --conn-type 'google_cloud_platform' \
    --conn-extra '{"key_path": "/path/to/keyfile.json", "project": "my-project"}'
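Connections can also be supplied as URIs, either with `--conn-uri` or through an environment variable named AIRFLOW_CONN_{CONN_ID} (the conn_id in upper case). This is convenient in containers. Special characters in the login or password must be percent-encoded. The helper below is a hypothetical, simplified sketch of assembling such a URI, and it handles only flat extras. It writes underscores in the connection type as hyphens, because underscores are not valid in a URI scheme; Airflow's parser maps them back. The password value is made up.

```python
from urllib.parse import quote, urlencode


def env_var_for(conn_id: str) -> str:
    """Environment variable name for a connection: AIRFLOW_CONN_<CONN_ID>."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def connection_uri(conn_type, host="", login=None, password=None,
                   port=None, schema=None, extra=None):
    """Hypothetical helper: assemble a connection URI with flat extras only."""
    scheme = conn_type.replace("_", "-")  # URI schemes may not contain underscores
    auth = ""
    if login is not None:
        auth = quote(login, safe="")
        if password is not None:
            auth += ":" + quote(password, safe="")  # encodes '@', '/', ':' and friends
        auth += "@"
    netloc = auth + host + (f":{port}" if port is not None else "")
    path = "/" + quote(schema, safe="") if schema else ""
    query = "?" + urlencode(extra) if extra else ""
    return f"{scheme}://{netloc}{path}{query}"


uri = connection_uri("postgres", "db.example.com", login="user",
                     password="p@ss/word", port=5432, schema="mydb")
print(f"{env_var_for('postgres_default')}='{uri}'")
# AIRFLOW_CONN_POSTGRES_DEFAULT='postgres://user:p%40ss%2Fword@db.example.com:5432/mydb'
```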

Advanced Usage

Dynamic DAG Generation

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

databases = ['users', 'orders', 'products', 'inventory']

for db in databases:
    dag_id = f'sync_{db}'

    with DAG(
        dag_id=dag_id,
        schedule='@hourly',
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        sync = BashOperator(
            task_id=f'sync_{db}_task',
            bash_command=f'python /scripts/sync.py --database {db}',
        )

    globals()[dag_id] = dag
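The f-strings above are evaluated on each loop iteration, so every DAG gets its own database name. If the loop builds Python callables instead (for example, for a PythonOperator), a closure such as `lambda: sync(db)` looks `db` up only when it runs. By then the loop has finished, and every task would see the last value. The plain-Python sketch below uses a hypothetical sync function to show the pitfall and one fix with functools.partial. Passing the value through PythonOperator's op_kwargs avoids the problem as well.

```python
from functools import partial

databases = ['users', 'orders', 'products', 'inventory']


def sync(database):
    """Hypothetical task body."""
    return f"syncing {database}"


late_bound, bound_now = [], []
for db in databases:
    late_bound.append(lambda: sync(db))   # bug: reads `db` when called, not now
    bound_now.append(partial(sync, db))   # fix: captures the current value

print([f() for f in late_bound])  # every call sees 'inventory'
print([f() for f in bound_now])   # one call per database, as intended
```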

Branching and Conditional Logic

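from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch(**context):
    # Return the task_id to follow; the other branch is marked as skipped
    logical_date = context['logical_date']
    if logical_date.weekday() < 5:  # Monday=0 ... Friday=4
        return 'weekday_processing'
    return 'weekend_processing'

with DAG(
    dag_id='branching_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    branch = BranchPythonOperator(
        task_id='branch_check',
        python_callable=choose_branch,
    )

    weekday = EmptyOperator(task_id='weekday_processing')
    weekend = EmptyOperator(task_id='weekend_processing')
    # Runs when one branch succeeded and the other was skipped
    join = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

    branch >> [weekday, weekend] >> join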
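The join task needs a non-default trigger rule because the branch operator marks the branch it did not choose as skipped. Under the default all_success rule, a skipped upstream causes join to be skipped too. none_failed_min_one_success accepts skipped upstreams as long as none failed and at least one succeeded. The functions below are a simplified sketch of those two rules, not Airflow's dependency checker.

```python
from enum import Enum


class State(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"


def all_success(upstream):
    """Default rule: every upstream task succeeded."""
    return all(s is State.SUCCESS for s in upstream)


def none_failed_min_one_success(upstream):
    """No upstream failed (skips are fine) and at least one succeeded."""
    no_failures = not any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream)
    return no_failures and any(s is State.SUCCESS for s in upstream)


# On a weekday the branch runs weekday_processing and skips weekend_processing.
weekday_run = [State.SUCCESS, State.SKIPPED]
print(all_success(weekday_run))                  # False: join would be skipped
print(none_failed_min_one_success(weekday_run))  # True: join runs
```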

Custom Sensors and Operators

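import os

from airflow.sensors.base import BaseSensorOperator


class LocalFileSensor(BaseSensorOperator):
    """Succeeds once a file exists on the worker's local filesystem.

    Airflow already ships airflow.sensors.filesystem.FileSensor; this class
    only illustrates the custom-sensor pattern. The apply_defaults decorator
    is deprecated and unnecessary in Airflow 2.x.
    """

    def __init__(self, *, filepath, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Called repeatedly until it returns True or the timeout expires
        return os.path.exists(self.filepath)


wait_for_file = LocalFileSensor(
    task_id='wait_for_file',
    filepath='/data/incoming/report.csv',
    poke_interval=60,   # seconds between checks
    timeout=3600,       # fail after one hour without the file
    mode='reschedule',  # release the worker slot between checks
)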
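The sensor calls poke() every poke_interval seconds and fails once more than timeout seconds have passed without success. With mode='reschedule', the task gives up its worker slot between pokes instead of sleeping in it. The loop below is a simplified, pure-Python model of 'poke' mode. It uses an injected clock so it runs instantly. The class and function names are hypothetical, and Airflow's real implementation (with options such as soft_fail and exponential_backoff) differs in detail.

```python
class SensorTimeout(Exception):
    """Stand-in for Airflow's AirflowSensorTimeout."""


def run_sensor(poke, poke_interval, timeout, clock, sleep):
    """Call poke() every poke_interval seconds until it is True or timeout passes."""
    started = clock()
    while True:
        if poke():
            return True
        if clock() - started > timeout:
            raise SensorTimeout(f"condition not met after {timeout}s")
        sleep(poke_interval)


class FakeClock:
    """Deterministic clock: sleep() just advances the current time."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
arrives_at = 180  # hypothetical: the file appears three minutes in
result = run_sensor(lambda: clock.now >= arrives_at, poke_interval=60,
                    timeout=3600, clock=clock.time, sleep=clock.sleep)
print(result, clock.now)  # True 180.0
```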

Task Groups

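from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG('grouped_dag', schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False) as dag: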
    with TaskGroup('extract_group') as extract:
        extract_users = BashOperator(task_id='users', bash_command='echo users')
        extract_orders = BashOperator(task_id='orders', bash_command='echo orders')

    with TaskGroup('transform_group') as transform:
        transform_data = BashOperator(task_id='transform', bash_command='echo transform')

    extract >> transform
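By default, tasks inside a TaskGroup get IDs prefixed with the group ID, so the users task above becomes extract_group.users. Writing `extract >> transform` between two groups links the first group's leaf tasks (no downstream inside the group) to the second group's root tasks (no upstream inside it). The pure-Python sketch below models that expansion for the two groups above. The helper names are hypothetical, and this is not Airflow's implementation.

```python
from itertools import product


def qualified(group_id, task_id):
    # With the default prefix_group_id=True, task IDs carry the group ID.
    return f"{group_id}.{task_id}"


def roots_and_leaves(tasks, edges):
    """Roots have no upstream inside the group; leaves have no downstream inside it."""
    has_upstream = {b for _, b in edges}
    has_downstream = {a for a, _ in edges}
    roots = [t for t in tasks if t not in has_upstream]
    leaves = [t for t in tasks if t not in has_downstream]
    return roots, leaves


def link_groups(up_id, up_tasks, up_edges, down_id, down_tasks, down_edges):
    """Sketch of up_group >> down_group: every leaf of up feeds every root of down."""
    _, leaves = roots_and_leaves(up_tasks, up_edges)
    roots, _ = roots_and_leaves(down_tasks, down_edges)
    return {(qualified(up_id, a), qualified(down_id, b)) for a, b in product(leaves, roots)}


edges = link_groups("extract_group", ["users", "orders"], [],
                    "transform_group", ["transform"], [])
print(sorted(edges))
# [('extract_group.orders', 'transform_group.transform'), ('extract_group.users', 'transform_group.transform')]
```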

Troubleshooting

Issue | Solution
DAG not appearing in UI | Check for Python errors by running python /path/to/dag.py, and make sure the file is in dags_folder
Tasks stuck in queued state | Check executor capacity, pool slots, and worker availability; verify the parallelism settings
Database connection errors | Verify sql_alchemy_conn (in the [database] section since Airflow 2.3) and run airflow db check
Scheduler not picking up DAGs | Lower dag_dir_list_interval (how often new files are discovered) or min_file_process_interval (how often existing files are re-parsed), and check the scheduler logs
XCom too large | Store large data externally (e.g. S3 or GCS) and pass only references; XCom is meant for small metadata
Import errors in DAGs | Install the missing provider package: pip install apache-airflow-providers-<name>
Webserver 502 errors | Increase web_server_worker_timeout and check memory usage
Tasks timing out | Adjust execution_timeout in default_args or on individual tasks
Zombie tasks detected | The scheduler cleans these up automatically; if they persist, check worker health
Permission denied on logs | Ensure AIRFLOW_UID in .env matches your host user, then fix ownership with sudo chown -R "$(id -u)":0 ./logs instead of chmod -R 777
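For the "DAG not appearing in UI" case, you can check the whole dags_folder at once by parsing every file. The hypothetical helper below uses the standard library's ast module, so it catches syntax errors only. Missing modules or providers surface only when a file is actually executed, as the python /path/to/dag.py check does.

```python
import ast
import pathlib
import tempfile


def find_syntax_errors(dags_folder):
    """Return {path: message} for .py files under dags_folder that fail to parse."""
    errors = {}
    for path in sorted(pathlib.Path(dags_folder).rglob("*.py")):
        try:
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:  # includes IndentationError
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Usage against a throwaway folder with one valid and one broken file.
with tempfile.TemporaryDirectory() as folder:
    pathlib.Path(folder, "good_dag.py").write_text("x = 1\n")
    pathlib.Path(folder, "bad_dag.py").write_text("def broken(:\n    pass\n")
    problems = find_syntax_errors(folder)

for path, message in problems.items():
    print(path, message)
```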