Dagster Cheat Sheet
Overview
Dagster is a modern data orchestration platform that introduces the concept of software-defined assets, where pipelines are built around the data assets they produce rather than the tasks they execute. This asset-centric approach provides automatic data lineage, observability, and a clear mapping between code and the data artifacts in your warehouse, lake, or file system.
Unlike traditional task-based orchestrators, Dagster treats data as a first-class citizen. Each asset declaration can specify its upstream dependencies, metadata, partitioning scheme, and freshness policy. Dagster provides a rich local development experience through the Dagster UI (the web server, formerly called Dagit), built-in testing utilities, and type checking for inputs and outputs. It integrates with dbt, Spark, Pandas, Snowflake, and many other data tools through its resource and I/O manager system.
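Dagster infers an asset's upstream dependencies from its function parameter names. The stdlib-only sketch below is a conceptual model of that idea, not Dagster's implementation. It maps each function's parameter names to other functions in the list, then derives a materialization order with `graphlib.TopologicalSorter` (Python 3.9+). The function names mirror the assets defined later in this cheat sheet. The helpers `dependency_graph` and `materialize_all` are hypothetical.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set


def raw_orders():
    return [{"order_id": 1, "quantity": 2, "unit_price": 5.0}]


def enriched_orders(raw_orders):
    return [dict(row, order_total=row["quantity"] * row["unit_price"]) for row in raw_orders]


def daily_revenue(enriched_orders):
    return sum(row["order_total"] for row in enriched_orders)


def dependency_graph(funcs: List[Callable]) -> Dict[str, Set[str]]:
    """Map each asset name to the asset names it depends on (its parameter names)."""
    names = {f.__name__ for f in funcs}
    return {
        f.__name__: {p for p in inspect.signature(f).parameters if p in names}
        for f in funcs
    }


def materialize_all(funcs: List[Callable]) -> Dict[str, object]:
    """Run every asset once, upstream first, passing outputs in by parameter name."""
    by_name = {f.__name__: f for f in funcs}
    graph = dependency_graph(funcs)
    results: Dict[str, object] = {}
    for name in TopologicalSorter(graph).static_order():
        upstream = {dep: results[dep] for dep in graph[name]}
        results[name] = by_name[name](**upstream)
    return results


# The input order does not matter; the graph decides what runs first
print(materialize_all([daily_revenue, raw_orders, enriched_orders])["daily_revenue"])  # 10.0
```

The lineage comes from the function signatures alone. That is why renaming a parameter in a real Dagster asset changes which upstream asset it reads.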
Installation
# Install Dagster and the web UI
pip install dagster dagster-webserver
# Install common integrations
pip install dagster-dbt dagster-snowflake dagster-aws dagster-gcp
pip install dagster-pandas dagster-pyspark dagster-duckdb
# Create a new project from scaffold
dagster project scaffold --name my_dagster_project
cd my_dagster_project
pip install -e ".[dev]"
# Start the development server
dagster dev
# Or start with a specific module
dagster dev -m my_dagster_project
Docker Setup
# Dockerfile for Dagster
cat <<'EOF' > Dockerfile
FROM python:3.11-slim
WORKDIR /opt/dagster/app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
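# The gRPC code server below listens on 4000; the Dagster UI (port 3000) typically runs in its own container
EXPOSE 4000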
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "my_project"]
EOF
Core Concepts
Software-Defined Assets
import os
import pandas as pd
import sqlalchemy
from dagster import AssetExecutionContext, asset
# Hypothetical source database; SOURCE_DB_URL and the SQLite fallback are placeholders
connection = sqlalchemy.create_engine(os.getenv("SOURCE_DB_URL", "sqlite:///source.db"))
@asset(
    description="Raw orders loaded from source database",
    group_name="staging",
    compute_kind="pandas",
)
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    context.log.info("Loading raw orders")
    df = pd.read_sql("SELECT * FROM orders", connection)
    # Metadata is recorded with this materialization and shown in the UI
    context.add_output_metadata({
        "num_rows": len(df),
        "columns": list(df.columns),
    })
    return df
# The parameter name raw_orders declares the dependency on the raw_orders asset
@asset(
    description="Cleaned and enriched orders",
    group_name="intermediate",
)
def enriched_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    # .copy() so adding a column does not trigger pandas' SettingWithCopyWarning
    df = raw_orders.dropna(subset=["order_id"]).copy()
    df["order_total"] = df["quantity"] * df["unit_price"]
    return df
@asset(
    description="Daily revenue summary",
    group_name="marts",
)
def daily_revenue(enriched_orders: pd.DataFrame) -> pd.DataFrame:
    return enriched_orders.groupby("order_date").agg(
        total_revenue=("order_total", "sum"),
        order_count=("order_id", "nunique"),
    ).reset_index()
Resources and I/O Managers
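import pandas as pd
import sqlalchemy
from dagster import Definitions, IOManager, StringSource, io_manager
# Stores each asset as a table named after its asset key. Works with any SQLAlchemy
# URL; for Snowflake this assumes the snowflake-sqlalchemy dialect is installed.
class SnowflakeIOManager(IOManager):
    def __init__(self, connection_string: str):
        self.engine = sqlalchemy.create_engine(connection_string)
    def handle_output(self, context, obj: pd.DataFrame):
        # Called after an asset computes: persist its output as a table
        table_name = context.asset_key.to_python_identifier()
        obj.to_sql(table_name, self.engine, if_exists="replace", index=False)
    def load_input(self, context) -> pd.DataFrame:
        # Called before a downstream asset runs: load the upstream asset's table
        table_name = context.upstream_output.asset_key.to_python_identifier()
        return pd.read_sql(f"SELECT * FROM {table_name}", self.engine)
# StringSource accepts a literal string or {"env": "VAR_NAME"}, so the
# environment variable below is resolved when the resource is loaded
@io_manager(config_schema={"connection_string": StringSource})
def snowflake_io_manager(context):
    conn = context.resource_config["connection_string"]
    return SnowflakeIOManager(conn)
# raw_orders, enriched_orders and daily_revenue are the assets defined above
defs = Definitions(
    assets=[raw_orders, enriched_orders, daily_revenue],
    resources={
        "io_manager": snowflake_io_manager.configured({
            "connection_string": {"env": "SNOWFLAKE_CONN"}
        }),
    },
)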
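An I/O manager separates concerns: asset functions only return and receive values, and a separate object decides where those values are stored. Below is a stdlib-only toy model of that contract. It assumes nothing about Dagster internals. `InMemoryIOManager` and `run_pipeline` are hypothetical names, and the method names only echo `handle_output`/`load_input` from the example above.

```python
from typing import Any, Callable, Dict, List, Tuple


class InMemoryIOManager:
    """Toy storage backend keyed by asset name (stands in for a warehouse table)."""

    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}

    def handle_output(self, asset_name: str, obj: Any) -> None:
        self.store[asset_name] = obj

    def load_input(self, upstream_name: str) -> Any:
        return self.store[upstream_name]


def run_pipeline(steps: List[Tuple[str, Callable[..., Any], List[str]]],
                 manager: InMemoryIOManager) -> None:
    """Run (name, fn, upstream_names) steps in order; all data flows through manager."""
    for name, fn, upstream in steps:
        inputs = [manager.load_input(u) for u in upstream]
        manager.handle_output(name, fn(*inputs))


manager = InMemoryIOManager()
run_pipeline(
    [
        ("raw_orders", lambda: [3, None, 4], []),
        ("enriched_orders", lambda rows: [r for r in rows if r is not None], ["raw_orders"]),
        ("daily_revenue", lambda rows: sum(rows), ["enriched_orders"]),
    ],
    manager,
)
print(manager.store["daily_revenue"])  # 7
```

You could replace `InMemoryIOManager` with a class that writes to a database. That would change where the data lives without touching the step functions. The `io_manager` resource key in `Definitions` gives assets the same kind of separation.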
CLI Commands
| Command | Description |
|---|---|
| dagster dev | Start the local development server (Dagster UI + daemon) |
| dagster dev -m my_module | Start with a specific Python module |
| dagster dev -f my_file.py | Start with a specific file |
| dagster asset materialize --select raw_orders | Materialize a specific asset |
| dagster asset materialize --select "*" | Materialize all assets |
| dagster asset list | List all defined assets |
| dagster asset wipe --all | Delete all asset materializations from the event log (pass asset keys instead of --all to wipe specific assets) |
| dagster job execute -j my_job | Execute a job |
| dagster job list | List all jobs |
| dagster schedule list | List all schedules |
| dagster schedule start my_schedule | Start a schedule |
| dagster sensor list | List all sensors |
| dagster project scaffold --name proj | Create a new project |
| dagster instance info | Show instance configuration |
| dagster debug export <run_id> output.file | Export debug info for a run |
Configuration
dagster.yaml (Instance Configuration)
# dagster.yaml
storage:
  postgres:
    postgres_db:
      username:
        env: DAGSTER_PG_USER
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432
run_launcher:
  module: dagster_k8s
  class: K8sRunLauncher
  config:
    # ConfigMap that holds this dagster.yaml for launched run pods (example name)
    instance_config_map: dagster-instance
    service_account_name: dagster
    job_namespace: dagster
    image_pull_policy: Always
compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: dagster-logs
    prefix: compute-logs
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
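Entries written as `env: VAR_NAME` are read from environment variables when the instance loads, which keeps credentials out of the file. The sketch below is a hypothetical resolver (`resolve_env`) that mimics that substitution on an already-parsed config dict. It skips YAML parsing to stay stdlib-only, and it is not how Dagster implements the feature.

```python
import os
from typing import Any, Mapping


def resolve_env(node: Any, environ: Mapping[str, str] = os.environ) -> Any:
    """Recursively replace {"env": "NAME"} mappings with environ["NAME"]."""
    if isinstance(node, dict):
        if set(node) == {"env"}:
            name = node["env"]
            if name not in environ:
                raise KeyError(f"environment variable {name!r} is not set")
            return environ[name]
        return {key: resolve_env(value, environ) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_env(item, environ) for item in node]
    return node  # plain values such as port: 5432 pass through unchanged


config = {"postgres": {"postgres_db": {"username": {"env": "DAGSTER_PG_USER"}, "port": 5432}}}
print(resolve_env(config, {"DAGSTER_PG_USER": "dagster"}))
# {'postgres': {'postgres_db': {'username': 'dagster', 'port': 5432}}}
```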
Partitioned Assets
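import pandas as pd
import sqlalchemy
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
# One partition per day, keyed "YYYY-MM-DD", starting 2024-01-01
daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
@asset(partitions_def=daily_partitions)
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key  # e.g. "2024-01-15"
    context.log.info(f"Processing events for {partition_date}")
    # connection is the (hypothetical) SQLAlchemy engine from the assets example;
    # the key is bound as a query parameter rather than formatted into the SQL string
    query = sqlalchemy.text("SELECT * FROM events WHERE date = :partition_date")
    return pd.read_sql(query, connection, params={"partition_date": partition_date})
# Same partitions_def, so each daily_metrics partition reads the matching daily_events partition
@asset(partitions_def=daily_partitions)
def daily_metrics(daily_events: pd.DataFrame) -> pd.DataFrame:
    return daily_events.groupby("event_type").size().reset_index(name="count")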
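`DailyPartitionsDefinition` produces one key per day in `YYYY-MM-DD` form. By default, only days that have fully elapsed get a partition. The stdlib sketch below reproduces that key list for a given "now". It assumes UTC and the default end offset of zero. Dagster computes these keys itself; `daily_partition_keys` is a hypothetical helper.

```python
from datetime import date, datetime, timedelta, timezone
from typing import List


def daily_partition_keys(start_date: str, now: datetime) -> List[str]:
    """Keys for every complete UTC day from start_date up to (not including) today."""
    start = date.fromisoformat(start_date)
    today = now.astimezone(timezone.utc).date()
    return [
        (start + timedelta(days=i)).isoformat()
        for i in range((today - start).days)  # empty if today <= start
    ]


keys = daily_partition_keys("2024-01-01", datetime(2024, 1, 5, 10, tzinfo=timezone.utc))
print(keys)  # ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04']
```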
Advanced Usage
Sensors
import os
from dagster import RunRequest, SensorEvaluationContext, sensor
# my_materialize_job is assumed to be defined elsewhere, with an op named
# "process" that accepts a "files" config field
@sensor(job=my_materialize_job, minimum_interval_seconds=30)
def new_file_sensor(context: SensorEvaluationContext):
    directory = "/data/incoming"
    # The cursor stores the newest modification time seen so far
    last_mtime = float(context.cursor) if context.cursor else 0.0
    new_files = []
    max_mtime = last_mtime
    for f in os.listdir(directory):
        fpath = os.path.join(directory, f)
        if not os.path.isfile(fpath):
            continue  # skip subdirectories
        mtime = os.path.getmtime(fpath)
        if mtime > last_mtime:
            new_files.append(f)
            max_mtime = max(max_mtime, mtime)
    if new_files:
        context.update_cursor(str(max_mtime))
        # Dagster skips a RunRequest whose run_key has already been used by this sensor
        yield RunRequest(
            run_key=f"files-{max_mtime}",
            run_config={"ops": {"process": {"config": {"files": new_files}}}},
        )
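Sensor logic is ordinary Python, so you can pull the cursor bookkeeping into a pure function and unit-test it without a running daemon. The sketch below does that. `scan_new_files` is a hypothetical helper that mirrors the loop in the sensor above and returns the new cursor alongside the new files. It sorts the directory listing so results are deterministic.

```python
import os
import tempfile
from typing import List, Optional, Tuple


def scan_new_files(directory: str, cursor: Optional[str]) -> Tuple[List[str], Optional[str]]:
    """Return (files modified after the cursor, updated cursor string)."""
    last_mtime = float(cursor) if cursor else 0.0
    new_files: List[str] = []
    max_mtime = last_mtime
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue
        mtime = os.path.getmtime(path)
        if mtime > last_mtime:
            new_files.append(name)
            max_mtime = max(max_mtime, mtime)
    # Keep the old cursor when nothing new was found
    return new_files, (str(max_mtime) if new_files else cursor)


# Usage: two files with fixed modification times, then a second scan with the cursor
with tempfile.TemporaryDirectory() as tmp:
    for name, mtime in [("a.csv", 100.0), ("b.csv", 200.0)]:
        path = os.path.join(tmp, name)
        open(path, "w").close()
        os.utime(path, (mtime, mtime))  # set access and modification time
    first_files, first_cursor = scan_new_files(tmp, None)
    second_files, second_cursor = scan_new_files(tmp, first_cursor)

print(first_files, first_cursor)    # ['a.csv', 'b.csv'] 200.0
print(second_files, second_cursor)  # [] 200.0
```

Like the sensor, the sketch uses a strict `>` comparison. A file that arrives later but carries exactly the cursor's modification time would therefore be skipped.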
Schedules
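from dagster import (AssetSelection, DefaultScheduleStatus, ScheduleDefinition,
                     build_schedule_from_partitioned_job, define_asset_job)
# Example jobs built from the assets above (job names are illustrative)
my_job = define_asset_job("my_job", selection=AssetSelection.groups("marts"))
daily_partitioned_job = define_asset_job(
    "daily_partitioned_job",
    selection=["daily_events", "daily_metrics"],
    partitions_def=daily_partitions,  # from the partitioned-assets example
)
# Simple cron schedule: top of every hour, running as soon as it is deployed
hourly_schedule = ScheduleDefinition(
    job=my_job,
    cron_schedule="0 * * * *",
    default_status=DefaultScheduleStatus.RUNNING,
)
# Partition-based schedule: cadence is derived from the job's partitions (daily here)
daily_schedule = build_schedule_from_partitioned_job(
    job=daily_partitioned_job,
)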
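For a daily partitioned job, the schedule from `build_schedule_from_partitioned_job` fires once per day, at midnight by default. Each tick targets the partition for the day that just ended, not the current day. The stdlib sketch below shows that mapping, assuming UTC and default offsets. `partition_for_tick` is a hypothetical helper, not a Dagster function.

```python
from datetime import datetime, timedelta, timezone


def partition_for_tick(tick: datetime) -> str:
    """Daily partition key targeted by a schedule tick: the last complete UTC day."""
    tick_date = tick.astimezone(timezone.utc).date()
    return (tick_date - timedelta(days=1)).isoformat()


tick = datetime(2024, 1, 5, 0, 0, tzinfo=timezone.utc)
print(partition_for_tick(tick))  # 2024-01-04
```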
Freshness Policies and Auto-Materialize
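import pandas as pd
from dagster import AutoMaterializePolicy, FreshnessPolicy, asset
# Legacy API: newer Dagster releases deprecate these policies in favor of
# automation_condition=AutomationCondition.eager(); check the API for your version.
# Auto-materialization is evaluated by the dagster-daemon.
@asset(
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),  # data at most 60 minutes behind
    auto_materialize_policy=AutoMaterializePolicy.eager(),  # rematerialize when upstream updates
)
def fresh_metrics(enriched_orders: pd.DataFrame) -> pd.DataFrame:
    return enriched_orders.describe()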
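`maximum_lag_minutes=60` asks that the asset's data be no more than an hour behind. For an asset with no upstream dependencies, that roughly reduces to "materialized within the last 60 minutes", which the stdlib sketch below checks. This is a simplification: `is_fresh` is a hypothetical helper, and Dagster's own evaluation also accounts for the data times of upstream assets.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_fresh(last_materialized: Optional[datetime], now: datetime,
             maximum_lag_minutes: float = 60) -> bool:
    """Simplified check: was the asset materialized within the allowed lag?"""
    if last_materialized is None:
        return False  # never materialized counts as stale
    return now - last_materialized <= timedelta(minutes=maximum_lag_minutes)


now = datetime(2024, 1, 5, 12, 0, tzinfo=timezone.utc)
print(is_fresh(now - timedelta(minutes=45), now))  # True
print(is_fresh(now - timedelta(minutes=90), now))  # False
```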
dbt Integration
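from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
my_dbt_project = DbtProject(project_dir="/path/to/dbt_project")
# During local development (dagster dev), generate manifest.json when the code loads
my_dbt_project.prepare_if_dev()
# Each dbt model, seed, and snapshot in the manifest becomes a Dagster asset
@dbt_assets(manifest=my_dbt_project.manifest_path)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run `dbt build` and stream each result back as a Dagster event
    yield from dbt.cli(["build"], context=context).stream()
defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=my_dbt_project)},
)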
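`@dbt_assets` reads `manifest.json`, where each node lists its upstream nodes under `depends_on.nodes`. The stdlib sketch below extracts model-to-model dependencies from a manifest-shaped dict. The tiny inline manifest is made up for illustration; real manifests contain many more fields, and dbt sources live under a separate `sources` key. `model_dependencies` is a hypothetical helper.

```python
import json
from typing import Dict, List

# A made-up, heavily trimmed manifest.json; real files have many more keys
MANIFEST_JSON = """
{
  "nodes": {
    "model.shop.stg_orders": {"resource_type": "model", "name": "stg_orders",
                              "depends_on": {"nodes": ["source.shop.raw.orders"]}},
    "model.shop.orders": {"resource_type": "model", "name": "orders",
                          "depends_on": {"nodes": ["model.shop.stg_orders"]}},
    "test.shop.not_null_orders_id": {"resource_type": "test", "name": "not_null_orders_id",
                                     "depends_on": {"nodes": ["model.shop.orders"]}}
  }
}
"""


def model_dependencies(manifest: dict) -> Dict[str, List[str]]:
    """Map each model name to the names of the models it depends on."""
    nodes = manifest["nodes"]
    return {
        node["name"]: sorted(
            nodes[dep]["name"]
            for dep in node["depends_on"]["nodes"]
            if dep in nodes and nodes[dep]["resource_type"] == "model"
        )
        for node in nodes.values()
        if node["resource_type"] == "model"
    }


print(model_dependencies(json.loads(MANIFEST_JSON)))
# {'stg_orders': [], 'orders': ['stg_orders']}
```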
Troubleshooting
| Issue | Solution |
|---|---|
| dagster dev fails to start | Check for import errors in your code module: run python -c "import my_module" to surface them |
| Assets not appearing in UI | Ensure the assets are passed to Definitions and that the Definitions object is importable from the loaded module (e.g., exported in __init__.py) |
| Partition backfill slow | Raise max_concurrent_runs in the run coordinator config so more partition runs execute in parallel |
| I/O manager errors | Verify resource configuration. Check connection credentials and permissions |
| Sensor not triggering | Sensors are stopped by default: start them with dagster sensor start or in the UI. Check minimum_interval_seconds and status with dagster sensor list; inspect or reset the cursor with dagster sensor cursor |
| Schedule not running | Ensure dagster-daemon is running (dagster dev starts it) and the schedule is started. Check dagster schedule list for status |
| Out of memory on large assets | Process data in chunks or partitions, or offload computation to Spark (e.g., dagster-pyspark). Raise memory limits for run workers |
| Type check failures | Verify asset return types match downstream input expectations |
| Database migration needed | Run dagster instance migrate after upgrading Dagster versions |
| Run queue backed up | Increase max_concurrent_runs or check for stuck runs in the UI |
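The `python -c "import my_module"` check from the first row can also be scripted so that the full traceback of a failing import is captured. The stdlib sketch below does that. `check_import` is a hypothetical helper; replace `"json"` with your code location's module name.

```python
import importlib
import traceback
from typing import Optional


def check_import(module_name: str) -> Optional[str]:
    """Import module_name; return None on success or the formatted traceback on failure."""
    try:
        importlib.import_module(module_name)
    except Exception:  # ImportError, SyntaxError, or any error raised at import time
        return traceback.format_exc()
    return None


print(check_import("json"))  # None
```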