
Prefect Cheat Sheet

Overview

Prefect is a modern workflow orchestration framework that makes it simple to build, observe, and react to data pipelines. Unlike traditional orchestrators that require complex DAG definitions, Prefect lets you add orchestration to existing Python code by simply decorating functions with @flow and @task. It automatically handles retries, caching, logging, state management, and observability.

Prefect uses a hybrid execution model: the Prefect API (Cloud or a self-hosted server) stores metadata and state, while your code runs on your own infrastructure. Task results stay in your environment (in memory or in result storage you configure). The API receives only orchestration metadata, such as run states, parameters, logs, and any artifacts you create. The framework supports dynamic workflows, concurrent execution, parameterized runs, artifacts, and event-driven automation. It integrates with major cloud providers, databases, and data tools through its collections ecosystem.

Installation

# Install Prefect
pip install prefect

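# Install with extras for specific integrations (quoted so shells like zsh don't expand the brackets)
pip install "prefect[aws]"
pip install "prefect[gcp]"
pip install "prefect[azure]"

# Or install integration packages directly
pip install prefect-dbt
pip install prefect-sqlalchemy
pip install prefect-dask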

# Verify installation
prefect version

# Start the local Prefect server
prefect server start

# Or configure Prefect Cloud
prefect cloud login --key YOUR_API_KEY
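You can also check from Python which of the packages above are installed, using the standard library's `importlib.metadata`. This is a small sketch. The helper name `installed_versions` is hypothetical, and it only reads local package metadata without contacting any Prefect API.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, version


def installed_versions(dists: list[str]) -> dict[str, str | None]:
    """Map each distribution name to its installed version, or None if it is missing."""
    found: dict[str, str | None] = {}
    for dist in dists:
        try:
            found[dist] = version(dist)
        except PackageNotFoundError:
            found[dist] = None
    return found


if __name__ == "__main__":
    names = ["prefect", "prefect-aws", "prefect-gcp", "prefect-azure", "prefect-dbt"]
    for name, ver in installed_versions(names).items():
        print(f"{name:20} {ver or 'not installed'}")
```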

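Running the Server with Docker

# Assumes a PostgreSQL database reachable at host "db" (e.g. a container on the same Docker network)
docker run -d \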
    --name prefect-server \
    -p 4200:4200 \
    -e PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@db:5432/prefect" \
    prefecthq/prefect:2-latest \
    prefect server start --host 0.0.0.0
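If the database password contains characters such as `@`, `:` or `/`, percent-encode it before putting it into `PREFECT_API_DATABASE_CONNECTION_URL`. Otherwise the URL is split in the wrong places. Below is a minimal stdlib sketch. The function `build_connection_url` and the example credentials are hypothetical.

```python
from urllib.parse import quote


def build_connection_url(user: str, password: str, host: str, port: int, db: str) -> str:
    """Build a postgresql+asyncpg URL with the user name and password percent-encoded."""
    return (
        f"postgresql+asyncpg://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{db}"
    )


if __name__ == "__main__":
    # Pass the result to docker run via -e PREFECT_API_DATABASE_CONNECTION_URL=...
    print(build_connection_url("prefect", "p@ss:word/1", "db", 5432, "prefect"))
    # postgresql+asyncpg://prefect:p%40ss%3Aword%2F1@db:5432/prefect
```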

Core Concepts

Flows and Tasks

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta

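@task(
    retries=3,                            # up to 3 retries after the first failed attempt
    retry_delay_seconds=60,               # wait 60 seconds between attempts
    cache_key_fn=task_input_hash,         # reuse results for identical inputs...
    cache_expiration=timedelta(hours=1),  # ...for up to one hour
    log_prints=True,                      # send print() output to the Prefect logs
    tags=["extract"],
)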
def extract_data(source: str) -> dict:
    logger = get_run_logger()
    logger.info(f"Extracting from {source}")
    return {"records": [1, 2, 3], "source": source}

@task(retries=2, tags=["transform"])
def transform_data(raw_data: dict) -> dict:
    records = [r * 2 for r in raw_data["records"]]
    return {"records": records, "count": len(records)}

@task(tags=["load"])
def load_data(data: dict) -> None:
    print(f"Loaded {data['count']} records")

@flow(
    name="ETL Pipeline",
    description="Extract, transform, and load data",
    retries=1,
    log_prints=True,
)
def etl_pipeline(source: str = "default"):
    raw = extract_data(source)
    transformed = transform_data(raw)
    load_data(transformed)
    return transformed

# Run locally
if __name__ == "__main__":
    etl_pipeline(source="production_db")
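Here is a mental model for `retries=3` and `retry_delay_seconds=60`. The task gets one initial attempt plus up to three retries, four attempts in total, with a pause between attempts. Only the final failure is raised. The decorator below is a plain-Python sketch of that behaviour, not Prefect's implementation. `with_retries` and `flaky` are hypothetical names.

```python
import time
from functools import wraps


def with_retries(retries: int, retry_delay_seconds: float):
    """Re-run the wrapped function up to `retries` extra times on any exception."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # retries exhausted: surface the last error
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


calls = {"n": 0}


@with_retries(retries=3, retry_delay_seconds=0)
def flaky() -> str:
    """Hypothetical task body: fails on the first two calls, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


if __name__ == "__main__":
    print(flaky(), "after", calls["n"], "attempts")
```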

Concurrent Execution

from prefect import flow, task

@task
def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "rows": partition_id * 100}

@flow
def parallel_processing():
    # Submit tasks concurrently
    futures = [process_partition.submit(i) for i in range(10)]

    # Wait for all to complete
    results = [f.result() for f in futures]
    total_rows = sum(r["rows"] for r in results)
    print(f"Processed {total_rows} total rows")
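The `submit()` / `.result()` pattern uses the same future-based model as the standard library's `concurrent.futures`. Below is a plain-Python analogue of the flow above. It does not use Prefect or its task runners, and the worker count of 4 is an arbitrary choice for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def process_partition(partition_id: int) -> dict:
    return {"partition": partition_id, "rows": partition_id * 100}


def parallel_processing(n_partitions: int = 10) -> int:
    with ThreadPoolExecutor(max_workers=4) as pool:  # 4 threads: arbitrary for this sketch
        # submit() returns a Future immediately, similar to Task.submit()
        futures = [pool.submit(process_partition, i) for i in range(n_partitions)]
        # result() blocks until that future is done; results keep submission order
        results = [f.result() for f in futures]
    return sum(r["rows"] for r in results)


if __name__ == "__main__":
    print(f"Processed {parallel_processing()} total rows")  # Processed 4500 total rows
```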

CLI Commands

| Command | Description |
|---|---|
| `prefect server start` | Start the local Prefect server |
| `prefect cloud login` | Log in to Prefect Cloud |
| `prefect deploy --all` | Create or update all deployments defined in prefect.yaml |
| `prefect deployment run <flow>/<deployment>` | Trigger a deployment run |
| `prefect deployment ls` | List all deployments |
| `prefect flow-run ls` | List recent flow runs |
| `prefect flow-run inspect <id>` | Inspect a specific flow run |
| `prefect work-pool create <name> --type process` | Create a work pool |
| `prefect work-pool ls` | List work pools |
| `prefect worker start --pool <name>` | Start a worker for a pool |
| `prefect block register -m prefect_aws` | Register block types from a module |
| `prefect block ls` | List all blocks |
| `prefect profile create <name>` | Create a new profile |
| `prefect profile use <name>` | Switch the active profile |
| `prefect config set PREFECT_API_URL=<url>` | Set a setting in the active profile |
| `prefect config view` | View the current configuration |
| `prefect version` | Show the Prefect version |

Configuration

prefect.yaml (Deployment Configuration)

# prefect.yaml
name: my-project
prefect-version: 2.19.0

build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.4.0
      image_name: my-registry/my-flows
      tag: latest
      dockerfile: auto

push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.4.0
      image_name: "{{ build_image.image_name }}"
      tag: "{{ build_image.tag }}"

pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /opt/prefect/flows

deployments:
  - name: etl-daily
    entrypoint: flows/etl.py:etl_pipeline
    work_pool:
      name: kubernetes-pool
      job_variables:
        image: "{{ build_image.image }}"
    schedule:
      cron: "0 6 * * *"
      timezone: "America/New_York"
    parameters:
      source: "production_db"
    tags:
      - production
      - etl

  - name: etl-hourly
    entrypoint: flows/etl.py:etl_pipeline
    work_pool:
      name: process-pool
    schedule:
      interval: 3600
    parameters:
      source: "staging_db"
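You can sanity-check the two schedules above by hand. `cron: "0 6 * * *"` fires at minute 0 of hour 6 every day, evaluated in `America/New_York`. `interval: 3600` fires every hour from an anchor time. The sketch below computes upcoming run times on naive wall-clock datetimes. It supports only `*` or a single integer per cron field and combines fields with AND (real cron ORs day-of-month and day-of-week when both are restricted). It also ignores time zones and DST, so treat it as an illustration rather than Prefect's scheduler. `next_cron_run` and `next_interval_runs` are hypothetical names.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def _field_ok(spec: str, value: int) -> bool:
    # Supports only "*" or a single integer such as "0" or "6"
    return spec == "*" or int(spec) == value


def next_cron_run(expr: str, after: datetime) -> datetime:
    """Return the first minute strictly after `after` matching a 5-field cron expression."""
    minute, hour, dom, month, dow = expr.split()
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # search at most about one year ahead
        cron_dow = (candidate.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
        if (
            _field_ok(minute, candidate.minute)
            and _field_ok(hour, candidate.hour)
            and _field_ok(dom, candidate.day)
            and _field_ok(month, candidate.month)
            and _field_ok(dow, cron_dow)
        ):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"no match for {expr!r} within a year")


def next_interval_runs(anchor: datetime, seconds: int, count: int) -> list[datetime]:
    """Return the `count` interval-schedule runs that follow `anchor`."""
    return [anchor + timedelta(seconds=seconds * i) for i in range(1, count + 1)]


if __name__ == "__main__":
    print(next_cron_run("0 6 * * *", datetime(2024, 1, 1, 7, 0)))  # 2024-01-02 06:00:00
```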

Blocks (Reusable Configuration)

from prefect import task
from prefect_aws import AwsCredentials, S3Bucket

# Create and save blocks programmatically
aws_creds = AwsCredentials(
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret",
    region_name="us-east-1",
)
aws_creds.save("production-aws", overwrite=True)

s3_bucket = S3Bucket(
    bucket_name="my-data-lake",
    credentials=aws_creds,
)
s3_bucket.save("data-lake-bucket", overwrite=True)

# Load and use blocks in flows
@task
def upload_to_s3(data: bytes, key: str):
    bucket = S3Bucket.load("data-lake-bucket")
    # upload_from_path() expects a local file path; write_path() uploads raw bytes to the key
    bucket.write_path(key, content=data)
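Conceptually, a block is a typed, named configuration document stored by the Prefect API. `save(name, overwrite=True)` writes it under that name, and `Block.load(name)` reads it back wherever a flow runs. The in-memory toy below illustrates only that save/load-by-name contract. That includes refusing to replace an existing name unless `overwrite=True`. `ToyBlockStore` and `ToyS3Config` are hypothetical and do not reflect Prefect's storage format.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class ToyS3Config:
    bucket_name: str
    region_name: str


class ToyBlockStore:
    """In-memory stand-in for stored block documents, keyed by (type name, block name)."""

    def __init__(self) -> None:
        self._docs: dict[tuple[str, str], dict] = {}

    def save(self, block: object, name: str, overwrite: bool = False) -> None:
        key = (type(block).__name__, name)
        if key in self._docs and not overwrite:
            raise ValueError(f"block {name!r} already exists; pass overwrite=True")
        self._docs[key] = asdict(block)  # store field values, not the live object

    def load(self, block_type: type, name: str):
        # Rebuild a fresh instance from the stored field values
        return block_type(**self._docs[(block_type.__name__, name)])
```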

Advanced Usage

Subflows and Error Handling

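from prefect import flow
from prefect.states import Completed, Failed

# extract_data, transform_data, and load_data are the tasks from the ETL example above

@flow
def validation_subflow(data: dict) -> bool:
    # extract_data returns {"records": [...], "source": ...}, so validate the records list
    return len(data.get("records", [])) > 0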

@flow(name="Robust Pipeline")
def robust_pipeline():
    try:
        raw = extract_data("source")
        is_valid = validation_subflow(raw)

        if not is_valid:
            return Failed(message="Validation failed: empty dataset")

        transformed = transform_data(raw)
        load_data(transformed)
        return Completed(message="Pipeline successful")
    except Exception as e:
        return Failed(message=str(e))
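The pipeline above relies on Prefect's final-state rules, simplified here:
- If a flow returns a state object, that state becomes the run's final state.
- If the flow raises, the run is failed.
- Otherwise, the run completes with the return value.

Below is a stdlib-only model of that decision. The `State` class and `run_flow` helper are hypothetical, and Prefect's real state machine has more state types and metadata.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class State:
    type: str  # e.g. "COMPLETED" or "FAILED"
    message: str = ""
    result: Any = None


def run_flow(fn: Callable[[], Any]) -> State:
    """Decide a run's final state from what the flow function does (simplified)."""
    try:
        returned = fn()
    except Exception as exc:  # an uncaught exception fails the run
        return State("FAILED", message=str(exc))
    if isinstance(returned, State):  # an explicitly returned state wins
        return returned
    return State("COMPLETED", result=returned)  # plain data completes the run
```

Under this model, returning `Failed(message="Validation failed: empty dataset")` ends the run as failed with that message, without raising.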

Artifacts and Result Persistence

from prefect import flow, task
from prefect.artifacts import create_markdown_artifact, create_table_artifact

@task(persist_result=True, result_storage_key="daily_report_{parameters[date]}")
def generate_report(date: str) -> dict:
    report = {"date": date, "revenue": 50000, "orders": 1200}

    create_table_artifact(
        key="daily-summary",
        table=[report],
        description=f"Daily summary for {date}",
    )

    create_markdown_artifact(
        key="report-details",
        markdown=f"## Report for {date}\n- Revenue: ${report['revenue']}\n- Orders: {report['orders']}",
    )

    return report
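`result_storage_key` is a Python format string. Prefect fills it with run context, including the task's `parameters` mapping, so `{parameters[date]}` becomes the value of the `date` argument. Plain `str.format` shows the key that results; the date below is only an example value. The key check assumes Prefect's rule that artifact keys may contain only lowercase letters, numbers, and dashes. `render_result_key` and `is_valid_artifact_key` are hypothetical helpers.

```python
import re

RESULT_KEY_TEMPLATE = "daily_report_{parameters[date]}"
ARTIFACT_KEY_PATTERN = re.compile(r"[a-z0-9-]+")


def render_result_key(template: str, parameters: dict) -> str:
    """Fill a result_storage_key-style template; str.format resolves {parameters[date]}."""
    return template.format(parameters=parameters)


def is_valid_artifact_key(key: str) -> bool:
    """Lowercase letters, digits, and dashes only (assumed artifact key rule)."""
    return ARTIFACT_KEY_PATTERN.fullmatch(key) is not None


if __name__ == "__main__":
    print(render_result_key(RESULT_KEY_TEMPLATE, {"date": "2024-01-31"}))  # daily_report_2024-01-31
    print(is_valid_artifact_key("daily-summary"), is_valid_artifact_key("Daily_Summary"))  # True False
```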

Event-Driven Automations

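from prefect import flow, task
from prefect.events import emit_event

@task
def process_data() -> dict:
    return {"count": 3}  # placeholder result standing in for real processing

@flow
def data_pipeline():
    result = process_data()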

    emit_event(
        event="pipeline.completed",
        resource={"prefect.resource.id": "pipeline.etl"},
        payload={"rows_processed": result["count"]},
    )
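An automation reacts to events like `pipeline.completed` by matching the event name and resource labels, optionally with a trailing `*` wildcard. The matcher below is a hypothetical, stdlib-only illustration of that idea using `fnmatch`, which is more permissive than Prefect's wildcard handling. Prefect's actual trigger schema, with `expect`, `match`, thresholds, and time windows, is richer, and `event_matches` is not a Prefect API.

```python
from __future__ import annotations

from fnmatch import fnmatchcase


def event_matches(event: dict, expect: set[str], match: dict[str, str]) -> bool:
    """True if the event name is expected and every required resource label matches."""
    if not any(fnmatchcase(event["event"], pattern) for pattern in expect):
        return False
    resource = event.get("resource", {})
    return all(
        key in resource and fnmatchcase(resource[key], pattern)
        for key, pattern in match.items()
    )
```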

Work Pools and Infrastructure

# Create different work pool types
prefect work-pool create my-k8s-pool --type kubernetes
prefect work-pool create my-docker-pool --type docker
prefect work-pool create my-process-pool --type process
prefect work-pool create my-ecs-pool --type ecs:push  # push pool (Prefect Cloud only): runs without a worker

# Start workers
prefect worker start --pool my-k8s-pool
prefect worker start --pool my-process-pool --limit 5
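A worker repeatedly polls its work pool for scheduled runs. With `--limit 5`, it never has more than five flow runs in flight. Below is a minimal model of one polling cycle. `poll_once` and the run IDs are hypothetical, and real workers also handle heartbeats, infrastructure submission, and cancellation.

```python
from __future__ import annotations

from collections import deque


def poll_once(scheduled: deque[str], running: set[str], limit: int | None) -> list[str]:
    """Start as many scheduled runs as the concurrency limit allows; return their IDs."""
    capacity = len(scheduled) if limit is None else max(0, limit - len(running))
    started: list[str] = []
    while scheduled and len(started) < capacity:
        run_id = scheduled.popleft()  # oldest scheduled run first
        running.add(run_id)
        started.append(run_id)
    return started
```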

Troubleshooting

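| Issue | Solution |
|---|---|
| Flow run stuck in Scheduled/Late or Pending | Scheduled/Late usually means no worker is polling the work pool: run `prefect worker start --pool <name>`. Pending means a worker picked up the run but the infrastructure has not started; check the worker logs |
| Task retries exhausted | Increase `retries` or `retry_delay_seconds` for transient errors. Check upstream dependencies |
| Cache not working | Verify that `cache_key_fn` returns the same key for the same inputs. Check that `cache_expiration` has not elapsed |
| Server connection refused | Ensure the server is running (`prefect server start`). Check that `PREFECT_API_URL` points to it (`prefect config view`) |
| Deployment not found | Run `prefect deploy` to register it. Check the `entrypoint` path in prefect.yaml and use the `<flow>/<deployment>` name format |
| Worker not picking up runs | Verify that the work pool name matches the deployment's `work_pool`. Check the worker logs |
| Import errors in deployed flows | Ensure all dependencies are installed in the execution environment (Docker image or virtualenv) |
| Results too large for storage | Use `persist_result=True` with `result_storage` pointed at external storage (e.g. an S3Bucket block). Configure `result_serializer` if needed |
| Concurrent task failures | Cap concurrency with tag-based concurrency limits (`prefect concurrency-limit create <tag> <limit>`). Check resource availability |
| Scheduling timezone issues | Set `timezone` explicitly in the schedule config, using IANA timezone names (e.g. America/New_York) |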
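Caching problems usually come down to the cache key. `task_input_hash` derives a key from the task and its inputs. Any input that is not stable, such as timestamps, random IDs, or objects whose string form includes a memory address, produces a new key and a cache miss. Below is a stdlib-only sketch of input-hash caching with an expiration. It assumes keyword arguments that are JSON-serializable or have a stable `str()`. `input_hash` and `cached` are hypothetical, and Prefect's own hashing differs in detail.

```python
from __future__ import annotations

import hashlib
import json
import time
from datetime import timedelta
from functools import wraps


def input_hash(fn_name: str, kwargs: dict) -> str:
    """Same function name + same inputs -> same key; dict ordering is ignored."""
    # default=str: non-JSON values hash by their str(), which may be unstable
    payload = json.dumps({"fn": fn_name, "inputs": kwargs}, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached(expiration: timedelta):
    def decorator(fn):
        store: dict[str, tuple[float, object]] = {}

        @wraps(fn)
        def wrapper(**kwargs):
            key = input_hash(fn.__qualname__, kwargs)
            hit = store.get(key)
            if hit is not None and time.monotonic() - hit[0] < expiration.total_seconds():
                return hit[1]  # fresh cache hit: skip the call
            value = fn(**kwargs)
            store[key] = (time.monotonic(), value)
            return value
        return wrapper
    return decorator


if __name__ == "__main__":
    @cached(expiration=timedelta(hours=1))
    def extract_data(source: str) -> dict:
        print(f"computing {source}")
        return {"records": [1, 2, 3], "source": source}

    extract_data(source="production_db")  # computes
    extract_data(source="production_db")  # served from cache
```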