Celery Cheat Sheet
Overview
Celery is a distributed task queue for Python that enables asynchronous processing of work outside the request-response cycle. It supports scheduling, retries, rate limiting, and multiple message brokers including RabbitMQ and Redis. Celery is widely used in web applications for background job processing, periodic tasks, and distributed computing.
Celery follows a simple producer-consumer model where tasks are sent to a message broker and executed by worker processes. It supports task chaining, grouping, and chords for complex workflows, multiple serialization formats, result storage backends, and real-time monitoring via Flower. Celery integrates seamlessly with Django, Flask, and other Python frameworks.
Installation
# Basic install
pip install celery
# With Redis broker
pip install "celery[redis]"
# With RabbitMQ broker
pip install "celery[librabbitmq]"
# With multiple extras
pip install "celery[redis,auth,msgpack]"
# With monitoring
pip install flower
Core Setup
Basic Celery Application
# celery_app.py
from celery import Celery
app = Celery(
'myproject',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['myproject.tasks']
)
# Configuration
app.conf.update(
result_expires=3600,
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
Django Integration
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
Task Definition
# tasks.py
from celery import shared_task
from celery_app import app
@app.task(bind=True, max_retries=3)
def process_order(self, order_id):
    """Process an order, retrying when the connection fails.

    A ConnectionError raised by do_processing() schedules a retry
    60 seconds later; max_retries=3 caps the number of retries.
    """
    try:
        return do_processing(order_id)
    except ConnectionError as exc:
        # retry() re-queues the task; raising its result keeps the
        # control flow explicit and preserves the original exception.
        raise self.retry(exc=exc, countdown=60)
@shared_task(name='send_email')
def send_email(to, subject, body):
    """Placeholder email task registered under the name 'send_email'.

    The delivery logic belongs where the comment is; the task returns a
    confirmation string naming the recipient.
    """
    # send email logic
    return f"Email sent to {to}"
@app.task(
    bind=True,
    rate_limit='10/m',           # at most 10 runs per minute
    time_limit=300,              # hard limit, in seconds
    soft_time_limit=240,         # soft limit, in seconds, reached before the hard one
    acks_late=True,              # acknowledge after execution, not before
    reject_on_worker_lost=True,  # re-queue if the worker process dies mid-task
)
def heavy_task(self, data):
    """Run process(data) under rate limits, time limits and late acknowledgement."""
    return process(data)
@app.task(ignore_result=True)
def fire_and_forget(message):
    """Log a message; ignore_result=True means no result is stored for the task."""
    log_message(message)
Worker Commands
| Command | Description |
|---|---|
| `celery -A myproject worker -l info` | Start a worker |
| `celery -A myproject worker -c 4` | Worker with 4 concurrent processes |
| `celery -A myproject worker -Q high,default` | Worker consuming specific queues |
| `celery -A myproject worker -P gevent -c 100` | Worker with gevent pool |
| `celery -A myproject worker -P solo` | Single-threaded worker |
| `celery -A myproject worker --autoscale=10,3` | Auto-scale between 3-10 processes |
| `celery -A myproject worker -n worker1@%h` | Named worker |
| `celery -A myproject inspect active` | List active tasks |
| `celery -A myproject inspect reserved` | List reserved tasks |
| `celery -A myproject inspect stats` | Worker statistics |
| `celery -A myproject control shutdown` | Shutdown all workers |
| `celery -A myproject purge` | Purge all pending tasks |
Calling Tasks
# Basic call
result = process_order.delay(order_id=123)
# With options
result = process_order.apply_async(
    args=[123],
    countdown=60,           # delay execution by 60 seconds
    expires=3600,           # expire after 1 hour
    queue='high_priority',  # send to specific queue
    retry=True,
    retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.5,
    },
)
# Get result
print(result.id)               # task ID
print(result.status)           # PENDING, STARTED, SUCCESS, FAILURE
print(result.get(timeout=10))  # block until result
# Check if ready
if result.ready():
    print(result.result)
Workflows
from celery import chain, group, chord, signature
# Chain — sequential execution
workflow = chain(
fetch_data.s(url),
parse_data.s(),
store_data.s()
)
result = workflow.apply_async()
# Group — parallel execution
job = group(
process_item.s(item) for item in items
)
result = job.apply_async()
# Chord — group + callback
callback = summarize.s()
result = chord(
[process_item.s(i) for i in items]
)(callback)
# Chaining with immutable signatures
chain(
fetch.si(url), # .si() = immutable, ignores previous result
process.s(),
notify.si('done')
).apply_async()
Scheduling (Celery Beat)
# Configuration
from celery.schedules import crontab  # needed for the crontab() schedule below
app.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'myproject.tasks.cleanup',
        'schedule': 3600.0,  # interval in seconds
    },
    'report-every-morning': {
        'task': 'myproject.tasks.generate_report',
        'schedule': crontab(hour=7, minute=0),  # every day at 07:00
        'args': ('daily',),
    },
    'check-every-30-seconds': {
        'task': 'myproject.tasks.health_check',
        'schedule': 30.0,  # interval in seconds
    },
}
# Start beat scheduler
celery -A myproject beat -l info
# Start beat with worker (development only)
celery -A myproject worker -B -l info
# Beat with database scheduler (for dynamic schedules)
pip install django-celery-beat
celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Configuration
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
# Serialization
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# Timeouts
task_soft_time_limit = 240
task_time_limit = 300
result_expires = 86400
# Concurrency
worker_concurrency = 4
worker_prefetch_multiplier = 1
# Queues
task_default_queue = 'default'
task_routes = {
'myproject.tasks.urgent_task': {'queue': 'high_priority'},
'myproject.tasks.slow_task': {'queue': 'low_priority'},
}
# Retry
task_acks_late = True
task_reject_on_worker_lost = True
# Rate limiting
task_default_rate_limit = '100/m'
# Task state tracking and events (used by monitoring tools)
task_track_started = True
worker_send_task_events = True
task_send_sent_event = True
Monitoring
# Start Flower web monitor
celery -A myproject flower --port=5555
# Flower with basic auth
celery -A myproject flower --basic_auth=admin:password
# CLI monitoring
celery -A myproject events # real-time event monitor
celery -A myproject inspect active # currently executing tasks
celery -A myproject inspect scheduled # ETA/countdown tasks
celery -A myproject inspect conf # worker configuration
celery -A myproject report # system information
Advanced Usage
Custom Task Base Class
class DatabaseTask(app.Task):
    """Task base class that exposes a lazily created database connection."""

    # Cached connection; stays None until the db property is first read.
    _db = None

    @property
    def db(self):
        """Return the cached connection, calling Database.connect() on first use."""
        if self._db is None:
            self._db = Database.connect()
        return self._db
@app.task(base=DatabaseTask, bind=True)
def query_db(self, sql):
    """Execute sql on the connection exposed by DatabaseTask.db and return the result."""
    return self.db.execute(sql)
Task Routing
app.conf.task_routes = {
'feed.tasks.*': {'queue': 'feeds'},
'web.tasks.*': {'queue': 'web'},
'myproject.tasks.urgent': {
'queue': 'priority',
'routing_key': 'priority.high',
},
}
Troubleshooting
| Issue | Solution |
|---|---|
| Tasks stuck in PENDING | Check broker connection; ensure workers are running and consuming the right queue |
| Worker killed (OOM) | Set worker_max_memory_per_child; reduce worker_concurrency |
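| Tasks execute twice | Ensure tasks are idempotent: task_acks_late with task_reject_on_worker_lost redelivers a task whose worker died mid-run; on Redis, also raise visibility_timeout in broker_transport_options above the longest countdown/ETA or task runtime |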
| Beat schedules not running | Ensure only one beat instance is running; check timezone settings |
| Results not stored | Verify result_backend is configured; check ignore_result on task |
| Connection pool exhausted | Increase broker_pool_limit; reduce prefetch multiplier |
| Serialization errors | Ensure all task arguments are JSON serializable; check accept_content |
| Worker not picking up new tasks | Restart worker after code changes; check queue name matches |