
Dagster Cheat Sheet

Overview

Dagster is a modern data orchestration platform that introduces the concept of software-defined assets, where pipelines are built around the data assets they produce rather than the tasks they execute. This asset-centric approach provides automatic data lineage, observability, and a clear mapping between code and the data artifacts in your warehouse, lake, or file system.

Unlike traditional task-based orchestrators, Dagster treats data as a first-class citizen. Each asset declaration can carry its dependencies, metadata, partitioning scheme, and freshness policy. Dagster provides a rich local development experience through its web UI (formerly called Dagit, now shipped as dagster-webserver), built-in testing utilities, and type checking for pipeline inputs and outputs. It integrates with dbt, Spark, Pandas, Snowflake, and many other data tools through its resource and I/O manager system.

Installation

# Install Dagster and the web UI
pip install dagster dagster-webserver

# Install common integrations
pip install dagster-dbt dagster-snowflake dagster-aws dagster-gcp
pip install dagster-pandas dagster-pyspark dagster-duckdb

# Create a new project from scaffold
dagster project scaffold --name my_dagster_project
cd my_dagster_project
pip install -e ".[dev]"

# Start the development server
dagster dev

# Or start with a specific module
dagster dev -m my_dagster_project

Docker Setup

# Dockerfile for a Dagster code location (serves user code over gRPC on port 4000)
cat <<'EOF' > Dockerfile
FROM python:3.11-slim
WORKDIR /opt/dagster/app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 4000
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "my_project"]
EOF

Core Concepts

Software-Defined Assets

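import os

import pandas as pd
import sqlalchemy
from dagster import asset, AssetExecutionContext

# Assumption: ORDERS_DB_URL is a hypothetical environment variable holding a
# SQLAlchemy connection URL for the source database.
connection = sqlalchemy.create_engine(os.environ["ORDERS_DB_URL"])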

@asset(
    description="Raw orders loaded from source database",
    group_name="staging",
    compute_kind="pandas",
)
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    context.log.info("Loading raw orders")
    df = pd.read_sql("SELECT * FROM orders", connection)
    context.add_output_metadata({
        "num_rows": len(df),
        "columns": list(df.columns),
    })
    return df

@asset(
    description="Cleaned and enriched orders",
    group_name="intermediate",
)
def enriched_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    # copy() keeps the column assignment below from raising SettingWithCopyWarning
    df = raw_orders.dropna(subset=["order_id"]).copy()
    df["order_total"] = df["quantity"] * df["unit_price"]
    return df

@asset(
    description="Daily revenue summary",
    group_name="marts",
)
def daily_revenue(enriched_orders: pd.DataFrame) -> pd.DataFrame:
    return enriched_orders.groupby("order_date").agg(
        total_revenue=("order_total", "sum"),
        order_count=("order_id", "nunique"),
    ).reset_index()
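
The three assets above form a chain because each function names its upstream asset as a parameter. As a rough illustration of that idea only, here is a toy in plain Python that reads dependencies from function signatures and sorts them topologically. It is not Dagster's implementation; `infer_order` and the stub functions are hypothetical names used for this sketch.

```python
import inspect
from graphlib import TopologicalSorter


def infer_order(functions):
    """Return asset names in an order where every upstream asset comes first."""
    by_name = {fn.__name__: fn for fn in functions}
    graph = {}
    for name, fn in by_name.items():
        params = inspect.signature(fn).parameters
        # Only parameters that match another asset's name count as dependencies;
        # anything else (such as `context`) is ignored.
        graph[name] = {p for p in params if p in by_name}
    return list(TopologicalSorter(graph).static_order())


# Stub functions that mirror the signatures of the assets above.
def raw_orders(context): ...
def enriched_orders(raw_orders): ...
def daily_revenue(enriched_orders): ...


order = infer_order([daily_revenue, enriched_orders, raw_orders])
print(order)
```

The input list is deliberately given in reverse to show that the order comes from the parameter names and not from the order of declaration.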

Resources and I/O Managers

import pandas as pd
import sqlalchemy
from dagster import Definitions, IOManager, StringSource, io_manager

class SnowflakeIOManager(IOManager):
    """Stores each asset as a table named after its asset key."""

    def __init__(self, connection_string):
        self.engine = sqlalchemy.create_engine(connection_string)

    def handle_output(self, context, obj):
        # Write the DataFrame returned by the asset, replacing any existing table
        table_name = context.asset_key.to_python_identifier()
        obj.to_sql(table_name, self.engine, if_exists="replace", index=False)

    def load_input(self, context):
        # Read back the table written by the upstream asset
        table_name = context.upstream_output.asset_key.to_python_identifier()
        return pd.read_sql(f"SELECT * FROM {table_name}", self.engine)

# StringSource accepts either a literal string or {"env": "VAR_NAME"}
@io_manager(config_schema={"connection_string": StringSource})
def snowflake_io_manager(context):
    conn = context.resource_config["connection_string"]
    return SnowflakeIOManager(conn)

defs = Definitions(
    assets=[raw_orders, enriched_orders, daily_revenue],
    resources={
        "io_manager": snowflake_io_manager.configured({
            "connection_string": {"env": "SNOWFLAKE_CONN"}
        }),
    },
)
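
The `load_input` method above interpolates a table name directly into a SQL string. A minimal sketch of a guard follows, assuming table names should always be plain identifiers such as the result of `to_python_identifier()`. The helper `safe_table_name` and its pattern are hypothetical and not part of Dagster.

```python
import re

# Letters, digits, and underscores only, and not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def safe_table_name(name: str) -> str:
    """Return `name` unchanged if it is a plain identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe table name: {name!r}")
    return name


print(safe_table_name("raw_orders"))
```

An I/O manager could pass the table name through such a check before building the query, so that an unexpected asset key fails loudly instead of producing malformed SQL.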

CLI Commands

| Command | Description |
| --- | --- |
| `dagster dev` | Start development server (web UI + daemon) |
| `dagster dev -m my_module` | Start with a specific Python module |
| `dagster dev -f my_file.py` | Start with a specific file |
| `dagster asset materialize --select raw_orders` | Materialize a specific asset |
| `dagster asset materialize --select "*"` | Materialize all assets |
| `dagster asset list` | List all defined assets |
| `dagster asset wipe --all` | Delete all asset materializations from the event log (pass asset keys instead of `--all` to wipe specific assets) |
| `dagster job execute -j my_job` | Execute a job |
| `dagster job list` | List all jobs |
| `dagster schedule list` | List all schedules |
| `dagster schedule start my_schedule` | Start a schedule |
| `dagster sensor list` | List all sensors |
| `dagster project scaffold --name proj` | Create new project |
| `dagster instance info` | Show instance configuration |
| `dagster debug export <run_id> output.file` | Export debug info for a run |

Configuration

dagster.yaml (Instance Configuration)

# dagster.yaml
storage:
  postgres:
    postgres_db:
      username:
        env: DAGSTER_PG_USER
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432

run_launcher:
  module: dagster_k8s
  class: K8sRunLauncher
  config:
    service_account_name: dagster
    job_namespace: dagster
    image_pull_policy: Always

compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: dagster-logs
    prefix: compute-logs

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
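
The instance configuration above reads its Postgres credentials from environment variables. As a small sketch, a preflight check could report which of them are unset before the instance starts. The helper `missing_env_vars` is a hypothetical name; the variable names are the ones used in the YAML above.

```python
import os

REQUIRED_VARS = [
    "DAGSTER_PG_USER",
    "DAGSTER_PG_PASSWORD",
    "DAGSTER_PG_HOST",
    "DAGSTER_PG_DB",
]


def missing_env_vars(required, environ=None):
    """Return the names in `required` that are unset or empty in `environ`."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
```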

Partitioned Assets

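import pandas as pd
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily_partitions)
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    # For daily partitions the key is a date string such as "2024-01-01"
    partition_date = context.partition_key
    context.log.info(f"Processing events for {partition_date}")
    query = f"SELECT * FROM events WHERE date = '{partition_date}'"
    # `connection` is assumed to be a SQLAlchemy engine or DB-API connection defined elsewhere
    return pd.read_sql(query, connection)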

@asset(partitions_def=daily_partitions)
def daily_metrics(daily_events: pd.DataFrame) -> pd.DataFrame:
    return daily_events.groupby("event_type").size().reset_index(name="count")
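
The query above matches a `date` column against the partition key. When the source table stores timestamps instead, a half-open range per partition is a common alternative. A minimal sketch follows, assuming daily partition keys in `YYYY-MM-DD` form as in the `start_date` above. `daily_window` is a hypothetical helper, not a Dagster API.

```python
from datetime import date, timedelta


def daily_window(partition_key: str) -> tuple[date, date]:
    """Return the [start, end) date range covered by a daily partition key."""
    start = date.fromisoformat(partition_key)
    return start, start + timedelta(days=1)


start, end = daily_window("2024-01-31")
print(start.isoformat(), end.isoformat())  # 2024-01-31 2024-02-01
```

The two bounds can then be used in a filter of the form `ts >= start AND ts < end`, which avoids double-counting rows that fall exactly on midnight.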

Advanced Usage

Sensors

from dagster import sensor, RunRequest, SensorEvaluationContext
import os

# my_materialize_job is assumed to be a job defined elsewhere that contains an op named "process"
@sensor(job=my_materialize_job, minimum_interval_seconds=30)
def new_file_sensor(context: SensorEvaluationContext):
    directory = "/data/incoming"
    last_mtime = float(context.cursor) if context.cursor else 0

    new_files = []
    max_mtime = last_mtime
    for f in os.listdir(directory):
        fpath = os.path.join(directory, f)
        mtime = os.path.getmtime(fpath)
        if mtime > last_mtime:
            new_files.append(f)
            max_mtime = max(max_mtime, mtime)

    if new_files:
        context.update_cursor(str(max_mtime))
        yield RunRequest(
            run_key=f"files-{max_mtime}",
            run_config={"ops": {"process": {"config": {"files": new_files}}}},
        )
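
The directory scan in the sensor above can be pulled out into a plain function so that the cursor logic is testable without a Dagster instance. This is a sketch under that assumption. `scan_new_files` is a hypothetical helper, and unlike the sensor body it skips subdirectories and sorts the file names.

```python
import os
import tempfile


def scan_new_files(directory: str, last_mtime: float):
    """Return (new_file_names, max_mtime) for files modified after last_mtime."""
    new_files = []
    max_mtime = last_mtime
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue  # ignore subdirectories
        mtime = os.path.getmtime(path)
        if mtime > last_mtime:
            new_files.append(name)
            max_mtime = max(max_mtime, mtime)
    return new_files, max_mtime


# Usage with a throwaway directory and fixed modification times.
with tempfile.TemporaryDirectory() as tmp:
    for name, mtime in [("a.csv", 100.0), ("b.csv", 200.0)]:
        path = os.path.join(tmp, name)
        with open(path, "w"):
            pass
        os.utime(path, (mtime, mtime))
    first = scan_new_files(tmp, 0.0)     # nothing seen yet
    second = scan_new_files(tmp, 100.0)  # cursor already at a.csv
    third = scan_new_files(tmp, 200.0)   # cursor at the newest file

print(first, second, third)
```

Inside the sensor, the function would be called with `float(context.cursor)` (or `0` when no cursor is set), and the returned `max_mtime` would be stored with `context.update_cursor`.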

Schedules

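from dagster import DefaultScheduleStatus, ScheduleDefinition, build_schedule_from_partitioned_job

# my_job and daily_partitioned_job are assumed to be defined elsewhere

# Simple cron schedule (minute 0 of every hour)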
hourly_schedule = ScheduleDefinition(
    job=my_job,
    cron_schedule="0 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)

# Partition-based schedule
daily_schedule = build_schedule_from_partitioned_job(
    job=daily_partitioned_job,
)

Freshness Policies and Auto-Materialize

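import pandas as pd
from dagster import asset, FreshnessPolicy, AutoMaterializePolicy

# Note: this is the older asset-level API. Recent Dagster releases deprecate it in
# favor of automation conditions and a reworked freshness API, so check the docs
# for the version you run.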

@asset(
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def fresh_metrics(enriched_orders: pd.DataFrame) -> pd.DataFrame:
    return enriched_orders.describe()
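
To make `maximum_lag_minutes=60` concrete, here is a deliberately simplified reading in plain Python: an asset counts as stale once its last materialization is older than the allowed lag. Dagster's own evaluation is more involved, so treat this only as an illustration. `is_stale` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_materialized: datetime, now: datetime, maximum_lag_minutes: float) -> bool:
    """True if the last materialization is older than the allowed lag."""
    return now - last_materialized > timedelta(minutes=maximum_lag_minutes)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_stale(datetime(2024, 1, 1, 11, 30, tzinfo=timezone.utc), now, 60))  # False
print(is_stale(datetime(2024, 1, 1, 10, 30, tzinfo=timezone.utc), now, 60))  # True
```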

dbt Integration

from dagster import Definitions
from dagster_dbt import DbtCliResource, dbt_assets, DbtProject

my_dbt_project = DbtProject(project_dir="/path/to/dbt_project")

@dbt_assets(manifest=my_dbt_project.manifest_path)
def my_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=my_dbt_project)},
)

Troubleshooting

| Issue | Solution |
| --- | --- |
| `dagster dev` fails to start | Check for import errors in your code module. Run `python -c "import my_module"` to test |
| Assets not appearing in UI | Ensure assets are included in `Definitions`. Check `__init__.py` exports |
| Partition backfill slow | Raise `max_concurrent_runs` in the run coordinator config |
| I/O manager errors | Verify resource configuration. Check connection credentials and permissions |
| Sensor not triggering | Check `minimum_interval_seconds`. Verify the sensor is running with `dagster sensor list` |
| Schedule not running | Ensure `dagster-daemon` is running. Check `dagster schedule list` for status |
| Out of memory on large assets | Use chunked processing or switch to PySpark. Configure resource limits |
| Type check failures | Verify asset return types match downstream input expectations |
| Database migration needed | Run `dagster instance migrate` after upgrading Dagster versions |
| Run queue backed up | Increase `max_concurrent_runs` or check for stuck runs in the UI |
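
The troubleshooting table suggests chunked processing for assets that run out of memory. A minimal sketch of the idea in plain Python follows: consume any iterable in fixed-size batches so that only one batch is held in memory at a time. `iter_chunks` is a hypothetical helper, and the chunk size is arbitrary.

```python
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield successive lists of at most chunk_size items from rows."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


# Aggregate incrementally instead of loading everything at once.
total = 0
for chunk in iter_chunks(range(10), 4):
    total += sum(chunk)
print(total)  # 45
```

With pandas, the same pattern is available through the `chunksize` argument of `pd.read_sql`, which returns an iterator of DataFrames instead of a single one.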