Celery Cheat Sheet

Overview

Celery is a distributed task queue for Python that enables asynchronous processing of work outside the request-response cycle. It supports scheduling, retries, rate limiting, and multiple message brokers including RabbitMQ and Redis. Celery is widely used in web applications for background job processing, periodic tasks, and distributed computing.

Celery follows a producer-consumer model: client code sends task messages to a message broker, and worker processes consume and execute them. It supports chains, groups, and chords for composing workflows, several serialization formats, pluggable result backends, and real-time monitoring via Flower. It integrates with Django, Flask, and other Python frameworks.
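
The producer-consumer model described above can be illustrated without Celery at all. The following is a minimal standard-library sketch, not Celery's implementation: an in-process `queue.Queue` stands in for the broker, a thread stands in for a worker process, and the names `broker`, `results`, `worker`, and `add` are made up for this example.

```python
import queue
import threading

# Stand-in for the message broker: a thread-safe in-process queue.
broker = queue.Queue()
# Stand-in for the result backend: task id -> return value.
results = {}


def worker():
    """Consume task messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, func, args = message
        results[task_id] = func(*args)


def add(x, y):
    return x + y


consumer = threading.Thread(target=worker)
consumer.start()

# Producer side: enqueue work and return immediately.
broker.put(("task-1", add, (2, 3)))
broker.put(("task-2", add, (10, 20)))
broker.put(None)  # tell the worker to stop
consumer.join()

print(results)
```

In Celery the queue lives in an external broker such as Redis or RabbitMQ, and the message carries a task name plus serialized arguments rather than a function object.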

Installation

# Basic install
pip install celery

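# With Redis broker (quotes keep shells such as zsh from expanding the brackets)
pip install "celery[redis]"

# With the optional librabbitmq C client (RabbitMQ itself works with the basic install)
pip install "celery[librabbitmq]"

# With multiple extras
pip install "celery[redis,auth,msgpack]"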

# With monitoring
pip install flower

Core Setup

Basic Celery Application

# celery_app.py
from celery import Celery

app = Celery(
    'myproject',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['myproject.tasks']
)

# Configuration
app.conf.update(
    result_expires=3600,
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)
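
Because the configuration above restricts task messages to JSON, arguments have to survive JSON encoding. The sketch below is a strict standard-library check plus a hypothetical `to_json_safe` helper (not part of Celery) for converting a few common types before calling a task. Celery's own encoder may accept some extra types, so treat this as a conservative pre-flight check rather than a description of what Celery rejects.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal


def is_json_serializable(value):
    """Return True if the standard json module can encode the value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


def to_json_safe(value):
    """Hypothetical helper: convert common non-JSON types to plain ones."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    if isinstance(value, dict):
        return {str(k): to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(v) for v in value]
    return value


payload = {
    "when": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "total": Decimal("9.99"),
}
print(is_json_serializable(payload))
print(to_json_safe(payload))
```

Passing identifiers (for example an order ID) instead of whole objects sidesteps most serialization problems and keeps messages small.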

Django Integration

# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

Task Definition

# tasks.py
# do_processing, process, and log_message are placeholders for application code.
from celery import shared_task
from celery_app import app

@app.task(bind=True, max_retries=3)
def process_order(self, order_id):
    try:
        result = do_processing(order_id)
        return result
    except ConnectionError as exc:
        raise self.retry(exc=exc, countdown=60)

@shared_task(name='send_email')
def send_email(to, subject, body):
    # send email logic
    return f"Email sent to {to}"

@app.task(
    bind=True,
    rate_limit='10/m',
    time_limit=300,
    soft_time_limit=240,
    acks_late=True,
    reject_on_worker_lost=True,
)
def heavy_task(self, data):
    return process(data)

@app.task(ignore_result=True)
def fire_and_forget(message):
    log_message(message)
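
The `process_order` task above retries with a fixed 60-second countdown. One common variation is to grow the delay with each attempt. The helper below is a plain-Python sketch of capped exponential backoff; `backoff_countdown` and its defaults are assumptions made for this example, not a Celery API.

```python
def backoff_countdown(retries, base=60, cap=3600):
    """Seconds to wait before the next attempt: base * 2**retries, capped."""
    return min(cap, base * (2 ** retries))


# Delay before each of the first four retries.
for attempt in range(4):
    print(attempt, backoff_countdown(attempt))
```

Inside a bound task this could be used as `raise self.retry(exc=exc, countdown=backoff_countdown(self.request.retries))`. Celery also offers task options such as `autoretry_for` and `retry_backoff` that cover the same ground declaratively.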

Worker Commands

| Command | Description |
| --- | --- |
| celery -A myproject worker -l info | Start a worker |
| celery -A myproject worker -c 4 | Worker with 4 concurrent processes |
| celery -A myproject worker -Q high,default | Worker consuming specific queues |
| celery -A myproject worker -P gevent -c 100 | Worker with gevent pool |
| celery -A myproject worker -P solo | Single-threaded worker |
| celery -A myproject worker --autoscale=10,3 | Auto-scale between 3-10 processes |
| celery -A myproject worker -n worker1@%h | Named worker |
| celery -A myproject inspect active | List active tasks |
| celery -A myproject inspect reserved | List reserved tasks |
| celery -A myproject inspect stats | Worker statistics |
| celery -A myproject control shutdown | Shutdown all workers |
| celery -A myproject purge | Purge all pending tasks |

Calling Tasks

# Basic call
result = process_order.delay(order_id=123)

# With options
result = process_order.apply_async(
    args=[123],
    countdown=60,           # delay execution by 60 seconds
    expires=3600,           # expire after 1 hour
    queue='high_priority',  # send to specific queue
    retry=True,             # retry publishing the message if the broker connection fails
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.5,
    },
)

# Get result
print(result.id)        # task ID
print(result.status)    # PENDING, STARTED (needs task_track_started), RETRY, SUCCESS, FAILURE, REVOKED
print(result.get(timeout=10))  # block until result

# Check if ready
if result.ready():
    print(result.result)
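
The `countdown` and `expires` options above are both given in seconds relative to the moment the task is sent. The sketch below shows the arithmetic with the standard library only. The function names are invented for this example, and the fixed `sent_at` timestamp is an assumption so the output is reproducible.

```python
from datetime import datetime, timedelta, timezone


def eta_from_countdown(sent_at, countdown):
    """Earliest time a worker may start the task."""
    return sent_at + timedelta(seconds=countdown)


def expiry_from_seconds(sent_at, expires):
    """Time after which the task should no longer be executed."""
    return sent_at + timedelta(seconds=expires)


sent_at = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
eta = eta_from_countdown(sent_at, 60)
deadline = expiry_from_seconds(sent_at, 3600)

print(eta.isoformat())
print(deadline.isoformat())
```

With the values used above, the task becomes eligible one minute after sending and is discarded if no worker has picked it up within an hour.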

Workflows

from celery import chain, group, chord

# Chain — sequential execution
workflow = chain(
    fetch_data.s(url),
    parse_data.s(),
    store_data.s()
)
result = workflow.apply_async()

# Group — parallel execution
job = group(
    process_item.s(item) for item in items
)
result = job.apply_async()

# Chord — group + callback
callback = summarize.s()
result = chord(
    [process_item.s(i) for i in items]
)(callback)

# Chaining with immutable signatures
chain(
    fetch.si(url),       # .si() = immutable, ignores previous result
    process.s(),
    notify.si('done')
).apply_async()
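
The data flow of the primitives above can be mimicked with ordinary function calls. This is a sequential plain-Python sketch of the semantics only; the helpers `run_chain`, `run_group`, `run_chord`, and `immutable` are made up for illustration, and real Celery dispatches each step to workers and runs group members in parallel.

```python
from functools import reduce


def run_chain(first_args, *funcs):
    """Call funcs in order, feeding each return value to the next one."""
    first, *rest = funcs
    return reduce(lambda acc, f: f(acc), rest, first(*first_args))


def run_group(func, items):
    """Apply func to every item and collect the results in order."""
    return [func(item) for item in items]


def run_chord(func, items, callback):
    """Run a group, then pass the list of results to the callback."""
    return callback(run_group(func, items))


def immutable(func, *args):
    """Like .si(): ignore the previous result and use fixed arguments."""
    return lambda _previous: func(*args)


def fetch(n):
    return list(range(n))


def double_all(xs):
    return [x * 2 for x in xs]


def square(x):
    return x * x


print(run_chain((3,), fetch, double_all, sum))
print(run_group(square, [1, 2, 3]))
print(run_chord(square, [1, 2, 3], sum))
print(run_chain((3,), fetch, immutable(str.upper, "done")))
```

The last call shows why `.si()` matters: the step after `fetch` receives its own fixed argument rather than the list that `fetch` returned.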

Scheduling (Celery Beat)

# Configuration
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'myproject.tasks.cleanup',
        'schedule': 3600.0,
    },
    'report-every-morning': {
        'task': 'myproject.tasks.generate_report',
        'schedule': crontab(hour=7, minute=0),
        'args': ('daily',),
    },
    'check-every-30-seconds': {
        'task': 'myproject.tasks.health_check',
        'schedule': 30.0,
    },
}

# Start beat scheduler
celery -A myproject beat -l info

# Start beat with worker (development only)
celery -A myproject worker -B -l info

# Beat with database scheduler (for dynamic schedules)
pip install django-celery-beat
celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Configuration

# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# Serialization
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

# Timeouts
task_soft_time_limit = 240
task_time_limit = 300
result_expires = 86400

# Concurrency
worker_concurrency = 4
worker_prefetch_multiplier = 1

# Queues
task_default_queue = 'default'
task_routes = {
    'myproject.tasks.urgent_task': {'queue': 'high_priority'},
    'myproject.tasks.slow_task': {'queue': 'low_priority'},
}

# Acknowledgement and redelivery
task_acks_late = True
task_reject_on_worker_lost = True

# Rate limiting
task_default_rate_limit = '100/m'

# State tracking and events (consumed by monitoring tools such as Flower)
task_track_started = True
worker_send_task_events = True
task_send_sent_event = True
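
The two concurrency settings above interact: the prefetch multiplier is applied per concurrent process. The arithmetic is sketched below with a hypothetical helper, `prefetch_limit`, which is not a Celery function.

```python
def prefetch_limit(concurrency, prefetch_multiplier):
    """Unacknowledged messages one worker may hold at a time."""
    return concurrency * prefetch_multiplier


print(prefetch_limit(4, 1))  # the values configured above
print(prefetch_limit(4, 4))  # same concurrency with a multiplier of 4
```

A multiplier of 1 suits long-running tasks, since no process sits on a backlog that an idle worker could have taken. Larger values mainly help throughput for many short tasks.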

Monitoring

# Start Flower web monitor
celery -A myproject flower --port=5555

# Flower with basic auth
celery -A myproject flower --basic_auth=admin:password

# CLI monitoring
celery -A myproject events          # real-time event monitor
celery -A myproject inspect active  # currently executing tasks
celery -A myproject inspect scheduled  # ETA/countdown tasks
celery -A myproject inspect conf    # worker configuration
celery -A myproject report          # system information

Advanced Usage

Custom Task Base Class

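# Database is a placeholder for an application-specific connection class.
# A task class is instantiated once per worker process, so the connection is reused across calls.
class DatabaseTask(app.Task):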
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db

@app.task(base=DatabaseTask, bind=True)
def query_db(self, sql):
    return self.db.execute(sql)

Task Routing

app.conf.task_routes = {
    'feed.tasks.*': {'queue': 'feeds'},
    'web.tasks.*': {'queue': 'web'},
    'myproject.tasks.urgent': {
        'queue': 'priority',
        'routing_key': 'priority.high',
    },
}
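
The glob patterns in `task_routes` map task names to queues. The sketch below approximates that lookup with `fnmatch` from the standard library. It is an illustration of the matching idea under simplified assumptions (first matching pattern wins, queue name only), not Celery's router, and `ROUTES` and `queue_for` are names invented here.

```python
from fnmatch import fnmatchcase

# Pattern -> queue name, mirroring the routes above.
ROUTES = {
    'feed.tasks.*': 'feeds',
    'web.tasks.*': 'web',
    'myproject.tasks.urgent': 'priority',
}


def queue_for(task_name, routes=ROUTES, default='default'):
    """Return the queue of the first pattern that matches the task name."""
    for pattern, queue_name in routes.items():
        if fnmatchcase(task_name, pattern):
            return queue_name
    return default


print(queue_for('feed.tasks.import_feed'))
print(queue_for('myproject.tasks.urgent'))
print(queue_for('billing.tasks.invoice'))
```

Tasks that match no pattern fall through to the default queue, which corresponds to `task_default_queue` in the configuration.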

Troubleshooting

| Issue | Solution |
| --- | --- |
| Tasks stuck in PENDING | Check broker connection; ensure workers are running and consuming the right queue |
| Worker killed (OOM) | Set worker_max_memory_per_child; reduce worker_concurrency |
| Tasks execute twice | Make tasks idempotent; with the Redis broker, raise visibility_timeout in broker_transport_options above the longest countdown/ETA and task runtime; note that task_acks_late with reject_on_worker_lost causes redelivery after a worker crash |
| Beat schedules not running | Ensure only one beat instance is running; check timezone settings |
| Results not stored | Verify result_backend is configured; check ignore_result on task |
| Connection pool exhausted | Increase broker_pool_limit; reduce prefetch multiplier |
| Serialization errors | Ensure all task arguments are JSON serializable; check accept_content |
| Worker not picking up new tasks | Restart worker after code changes; check queue name matches |
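
Several of the fixes above come down to making tasks idempotent, so that a redelivered message does no harm. A minimal sketch of the idea follows. The in-memory `processed` set is a stand-in for a durable store (a database unique constraint or similar), and `charge_order` is a hypothetical task body.

```python
processed = set()  # stand-in for a durable store shared by all workers
charges = []       # side effects we want to happen at most once


def charge_order(order_id, amount):
    """Apply the charge at most once per order_id."""
    if order_id in processed:
        return False  # duplicate delivery: skip the side effect
    charges.append((order_id, amount))
    processed.add(order_id)
    return True


print(charge_order(123, 50))  # first delivery
print(charge_order(123, 50))  # redelivery of the same message
print(charges)
```

In a real deployment the check and the write need to be atomic across workers, which an in-process set cannot provide.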