Prefect Cheat Sheet
Overview
Prefect is a modern workflow orchestration framework that makes it simple to build, observe, and react to data pipelines. Unlike traditional orchestrators that require complex DAG definitions, Prefect lets you add orchestration to existing Python code by simply decorating functions with @flow and @task. It automatically handles retries, caching, logging, state management, and observability.
Prefect uses a hybrid execution model where the Prefect API (Cloud or self-hosted server) stores metadata and state while the actual code runs in your own infrastructure. This means your data never passes through Prefect’s servers. The framework supports dynamic workflows, concurrent execution, parameterized runs, artifacts, and event-driven automation. Prefect integrates with major cloud providers, databases, and data tools through its collections ecosystem.
Installation
# Install Prefect
pip install prefect
# Install with extras for specific integrations
pip install "prefect[aws]"
pip install "prefect[gcp]"
pip install "prefect[azure]"
pip install prefect-dbt
pip install prefect-sqlalchemy
pip install prefect-dask
# Verify installation
prefect version
# Start the local Prefect server
prefect server start
# Or configure Prefect Cloud
prefect cloud login --key YOUR_API_KEY
Docker Deployment
docker run -d \
  --name prefect-server \
  -p 4200:4200 \
  -e PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@db:5432/prefect" \
  prefecthq/prefect:2-latest \
  prefect server start --host 0.0.0.0
Core Concepts
Flows and Tasks
from datetime import timedelta

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    log_prints=True,
    tags=["extract"],
)
def extract_data(source: str) -> dict:
    logger = get_run_logger()
    logger.info(f"Extracting from {source}")
    return {"records": [1, 2, 3], "source": source}


@task(retries=2, tags=["transform"])
def transform_data(raw_data: dict) -> dict:
    records = [r * 2 for r in raw_data["records"]]
    return {"records": records, "count": len(records)}


@task(tags=["load"])
def load_data(data: dict) -> None:
    print(f"Loaded {data['count']} records")


@flow(
    name="ETL Pipeline",
    description="Extract, transform, and load data",
    retries=1,
    log_prints=True,
)
def etl_pipeline(source: str = "default"):
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed)
    return transformed


# Run locally
if __name__ == "__main__":
    etl_pipeline(source="production_db")
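
The `retry_delay_seconds=60` setting above waits a fixed minute between attempts. As a rough sketch, the helper below builds a list of increasing delays instead. The name `backoff_delays` is hypothetical and not part of Prefect, and the sketch assumes your Prefect version accepts a list of delays for `retry_delay_seconds`.

```python
def backoff_delays(base_seconds: float, retries: int, factor: float = 2.0) -> list[float]:
    """Return one delay per retry, growing geometrically from base_seconds."""
    if retries < 0:
        raise ValueError("retries must be non-negative")
    return [base_seconds * factor**attempt for attempt in range(retries)]


# Delays for three retries starting at 10 seconds
delays = backoff_delays(10, 3)
print(delays)  # [10.0, 20.0, 40.0]
```

The resulting list could then be passed as `@task(retries=3, retry_delay_seconds=backoff_delays(10, 3))`. Prefect also provides `prefect.tasks.exponential_backoff`, which may be the better choice if it is available in your version.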
Concurrent Execution
from prefect import flow, task


@task
def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "rows": partition_id * 100}


@flow(log_prints=True)
def parallel_processing():
    # Submit tasks concurrently; each call returns a future immediately
    futures = [process_partition.submit(i) for i in range(10)]
    # result() blocks until the corresponding task run has finished
    results = [f.result() for f in futures]
    total_rows = sum(r["rows"] for r in results)
    print(f"Processed {total_rows} total rows")
CLI Commands
| Command | Description |
|---|---|
| prefect server start | Start the local Prefect server |
| prefect cloud login | Log in to Prefect Cloud |
| prefect deploy --all | Deploy all flows defined in prefect.yaml |
| prefect deployment run <flow>/<deployment> | Trigger a deployment run |
| prefect deployment ls | List all deployments |
| prefect flow-run ls | List recent flow runs |
| prefect flow-run inspect <id> | Inspect a specific flow run |
| prefect work-pool create <name> --type process | Create a work pool |
| prefect work-pool ls | List work pools |
| prefect worker start --pool <name> | Start a worker for a pool |
| prefect block register -m prefect_aws | Register block types from a module |
| prefect block ls | List all blocks |
| prefect profile create <name> | Create a new profile |
| prefect profile use <name> | Switch active profile |
| prefect config set PREFECT_API_URL=<url> | Set configuration |
| prefect config view | View current configuration |
| prefect version | Show Prefect version |
Configuration
prefect.yaml (Deployment Configuration)
# prefect.yaml
name: my-project
prefect-version: 2.19.0

build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.4.0
      image_name: my-registry/my-flows
      tag: latest
      dockerfile: auto

push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: "{{ build_image.image_name }}"
      tag: "{{ build_image.tag }}"

pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /opt/prefect/flows

deployments:
  - name: etl-daily
    entrypoint: flows/etl.py:etl_pipeline
    work_pool:
      name: kubernetes-pool
    schedule:
      cron: "0 6 * * *"
      timezone: "America/New_York"
    parameters:
      source: "production_db"
    tags:
      - production
      - etl
  - name: etl-hourly
    entrypoint: flows/etl.py:etl_pipeline
    work_pool:
      name: process-pool
    schedule:
      interval: 3600
    parameters:
      source: "staging_db"
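
The `etl-daily` schedule runs at 06:00 in `America/New_York`, so its UTC time shifts with daylight saving. The sketch below uses only the standard library to show that shift. The helper name `local_run_in_utc` is hypothetical, and the code illustrates IANA time zone behavior in general rather than Prefect's scheduler internals.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def local_run_in_utc(year: int, month: int, day: int, hour: int, tz_name: str) -> datetime:
    """Convert a local wall-clock run time in an IANA zone to UTC."""
    local = datetime(year, month, day, hour, tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)


# The same 06:00 local run lands on different UTC hours in winter and summer
winter = local_run_in_utc(2024, 1, 15, 6, "America/New_York")
summer = local_run_in_utc(2024, 7, 15, 6, "America/New_York")
print(winter.hour, summer.hour)  # 11 10
```

An invalid zone name raises `ZoneInfoNotFoundError`, which makes this a cheap way to check a `timezone` value before deploying.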
Blocks (Reusable Configuration)
from prefect import task
from prefect_aws import S3Bucket, AwsCredentials

# Create and save blocks programmatically
# (placeholder values; never commit real credentials)
aws_creds = AwsCredentials(
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret",
    region_name="us-east-1",
)
aws_creds.save("production-aws", overwrite=True)

s3_bucket = S3Bucket(
    bucket_name="my-data-lake",
    credentials=aws_creds,
)
s3_bucket.save("data-lake-bucket", overwrite=True)


# Load and use blocks in flows
@task
def upload_to_s3(data: bytes, key: str):
    bucket = S3Bucket.load("data-lake-bucket")
    # write_path uploads in-memory bytes; upload_from_path expects a local file path
    bucket.write_path(key, data)
Advanced Usage
Subflows and Error Handling
from prefect import flow, task
from prefect.states import Failed, Completed

# extract_data, transform_data, and load_data are the tasks
# defined in the Flows and Tasks section


@flow
def validation_subflow(data: dict) -> bool:
    if data.get("count", 0) == 0:
        return False
    return True


@flow(name="Robust Pipeline")
def robust_pipeline():
    try:
        raw = extract_data("source")
        is_valid = validation_subflow(raw)
        if not is_valid:
            return Failed(message="Validation failed: empty dataset")
        transformed = transform_data(raw)
        load_data(transformed)
        return Completed(message="Pipeline successful")
    except Exception as e:
        return Failed(message=str(e))
Artifacts and Result Persistence
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact, create_table_artifact


@task(persist_result=True, result_storage_key="daily_report_{parameters[date]}")
def generate_report(date: str) -> dict:
    report = {"date": date, "revenue": 50000, "orders": 1200}
    create_table_artifact(
        key="daily-summary",
        table=[report],
        description=f"Daily summary for {date}",
    )
    create_markdown_artifact(
        key="report-details",
        markdown=f"## Report for {date}\n- Revenue: ${report['revenue']}\n- Orders: {report['orders']}",
    )
    return report
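
The `result_storage_key` above is a template, not a literal key. As a sketch of how such a template resolves, the snippet below fills it with Python's standard `str.format`. It assumes Prefect applies `str.format`-style substitution with a `parameters` mapping; the helper `resolve_storage_key` is hypothetical and only mimics that step.

```python
def resolve_storage_key(template: str, parameters: dict) -> str:
    """Fill a storage-key template using str.format-style field lookups."""
    # {parameters[date]} looks up the string key "date" in the mapping
    return template.format(parameters=parameters)


template = "daily_report_{parameters[date]}"
key = resolve_storage_key(template, {"date": "2024-01-15"})
print(key)  # daily_report_2024-01-15
```

Because the key is derived from the `date` parameter, two runs with the same date would write to the same storage location.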
Event-Driven Automations
from prefect import flow, task
from prefect.events import emit_event


@task
def process_data() -> dict:
    # Placeholder task; replace with real processing logic
    return {"count": 0}


@flow
def data_pipeline():
    result = process_data()
    emit_event(
        event="pipeline.completed",
        resource={"prefect.resource.id": "pipeline.etl"},
        payload={"rows_processed": result["count"]},
    )
Work Pools and Infrastructure
# Create different work pool types
prefect work-pool create my-k8s-pool --type kubernetes
prefect work-pool create my-docker-pool --type docker
prefect work-pool create my-process-pool --type process
prefect work-pool create my-ecs-pool --type ecs:push
# Start workers
prefect worker start --pool my-k8s-pool
prefect worker start --pool my-process-pool --limit 5
Troubleshooting
| Issue | Solution |
|---|---|
| Flow run stuck in Pending | Check that a worker is running for the work pool. Run prefect worker start --pool <name> |
| Task retries exhausted | Increase retry_delay_seconds for transient errors. Check upstream dependencies |
| Cache not working | Verify cache_key_fn returns consistent keys. Check cache_expiration duration |
| Server connection refused | Ensure server is running: prefect server start. Check PREFECT_API_URL config |
| Deployment not found | Run prefect deploy to register. Check prefect.yaml entrypoint path |
| Worker not picking up runs | Verify work pool name matches deployment config. Check worker logs |
| Import errors in deployed flows | Ensure all dependencies are in the execution environment (Docker image or virtualenv) |
| Results too large for storage | Use persist_result=True with external storage. Configure result serializers |
| Concurrent task failures | Reduce concurrency with task_runner limits. Check resource availability |
| Scheduling timezone issues | Set timezone explicitly in schedule config. Use IANA timezone names |
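
For the "Cache not working" row, one common cause is a `cache_key_fn` whose output varies between runs for the same inputs. The sketch below shows a deterministic key function. It assumes the `(context, parameters)` call signature Prefect documents for `cache_key_fn`; the name `stable_cache_key` is hypothetical.

```python
import hashlib
import json
from typing import Any, Optional


def stable_cache_key(context: Any, parameters: dict) -> Optional[str]:
    """Hash task parameters into a key that does not depend on dict ordering."""
    try:
        # sort_keys makes the serialized form independent of insertion order
        payload = json.dumps(parameters, sort_keys=True)
    except (TypeError, ValueError):
        # Parameters that cannot be serialized get no key, so no caching
        return None
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = stable_cache_key(None, {"source": "db", "limit": 10})
key_b = stable_cache_key(None, {"limit": 10, "source": "db"})
print(key_a == key_b)  # True
```

It could be attached with `@task(cache_key_fn=stable_cache_key)`. The `context` argument is unused here, so the key depends only on the task's inputs.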