Installation
# Docker (quickest)
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=secret \
rabbitmq:3-management
# Docker Compose
services:
rabbitmq:
image: rabbitmq:3-management
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
- "5671:5671" # AMQPS (TLS)
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:
# Ubuntu / Debian
sudo apt install rabbitmq-server
sudo systemctl enable --now rabbitmq-server
# macOS
brew install rabbitmq
brew services start rabbitmq
# Enable management plugin
rabbitmq-plugins enable rabbitmq_management
Management UI: http://localhost:15672
Configuration
# Networking
listeners.tcp.default = 5672
management.listener.port = 15672
# Authentication
default_user = admin
default_pass = secret
default_vhost = /
# Resource limits
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB
# Queue defaults
queue_master_locator = min-masters
# Logging
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
# TLS
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ca.pem
ssl_options.certfile = /etc/rabbitmq/cert.pem
ssl_options.keyfile = /etc/rabbitmq/key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false
Environment Variables
RABBITMQ_NODENAME=rabbit@myhost
RABBITMQ_NODE_PORT=5672
RABBITMQ_LOG_BASE=/var/log/rabbitmq
RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia
RABBITMQ_ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
Core Commands
rabbitmqctl
| Command | Description |
|---|---|
rabbitmqctl status | Show broker status |
rabbitmqctl cluster_status | Show cluster info |
rabbitmqctl list_queues | List all queues |
rabbitmqctl list_queues name messages consumers | Queue with stats |
rabbitmqctl list_exchanges | List exchanges |
rabbitmqctl list_bindings | List bindings |
rabbitmqctl list_connections | List connections |
rabbitmqctl list_channels | List channels |
rabbitmqctl list_consumers | List consumers |
rabbitmqctl add_user alice secret | Create user |
rabbitmqctl set_user_tags alice administrator | Set user role |
rabbitmqctl set_permissions -p / alice ".*" ".*" ".*" | Full permissions |
rabbitmqctl delete_user alice | Delete user |
rabbitmqctl add_vhost myapp | Create virtual host |
rabbitmqctl delete_vhost myapp | Delete virtual host |
rabbitmqctl purge_queue my-queue | Empty a queue |
rabbitmqctl delete_queue my-queue | Delete a queue |
rabbitmqctl stop_app | Stop the app (keep node) |
rabbitmqctl start_app | Start the app |
rabbitmqctl reset | Reset node to default |
rabbitmq-plugins
| Command | Description |
|---|---|
rabbitmq-plugins list | List all plugins |
rabbitmq-plugins enable rabbitmq_management | Enable management |
rabbitmq-plugins enable rabbitmq_shovel | Enable shovel |
rabbitmq-plugins enable rabbitmq_federation | Enable federation |
rabbitmq-plugins enable rabbitmq_delayed_message_exchange | Delayed messages |
rabbitmq-plugins disable rabbitmq_management | Disable plugin |
HTTP Management API
| Endpoint | Description |
|---|---|
GET /api/overview | Broker overview stats |
GET /api/queues | List all queues |
GET /api/queues/%2F/my-queue | Specific queue (%2F = /) |
DELETE /api/queues/%2F/my-queue/contents | Purge queue |
GET /api/exchanges | List exchanges |
GET /api/bindings | List bindings |
GET /api/connections | List connections |
GET /api/nodes | Cluster node info |
POST /api/exchanges/%2F/my-exchange/publish | Publish test message |
# Example: list queues via curl
curl -u admin:secret http://localhost:15672/api/queues | jq '.[].name'
# Purge a queue
curl -u admin:secret -X DELETE \
http://localhost:15672/api/queues/%2F/my-queue/contents
Advanced Usage
Exchange Types and Routing
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# --- DIRECT EXCHANGE ---
# Routes to queues where routing_key == binding_key
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_declare(queue='error_logs')
channel.queue_bind(exchange='direct_logs', queue='error_logs', routing_key='error')
channel.basic_publish(
exchange='direct_logs',
routing_key='error',
body='This is an error'
)
# --- TOPIC EXCHANGE ---
# Routing keys with wildcards: * (one word), # (zero or more words)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_declare(queue='payment_errors')
channel.queue_bind(
exchange='topic_logs',
queue='payment_errors',
routing_key='payment.*.error' # matches payment.uk.error, payment.us.error
)
channel.queue_bind(
exchange='topic_logs',
queue='payment_errors',
routing_key='payment.#' # matches all payment.* events
)
# --- FANOUT EXCHANGE ---
# Broadcasts to ALL bound queues, routing key ignored
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')
channel.queue_declare(queue='service_a')
channel.queue_declare(queue='service_b')
channel.queue_bind(exchange='broadcast', queue='service_a')
channel.queue_bind(exchange='broadcast', queue='service_b')
channel.basic_publish(exchange='broadcast', routing_key='', body='Hello all')
# --- HEADERS EXCHANGE ---
# Routes based on message headers, not routing key
channel.exchange_declare(exchange='headers_ex', exchange_type='headers')
channel.queue_declare(queue='pdf_reports')
channel.queue_bind(
exchange='headers_ex',
queue='pdf_reports',
routing_key='',
arguments={'format': 'pdf', 'x-match': 'all'} # all headers must match
)
channel.basic_publish(
exchange='headers_ex',
routing_key='',
properties=pika.BasicProperties(headers={'format': 'pdf', 'type': 'report'}),
body='Report data'
)
Message Acknowledgments
# Manual acknowledgment (safe — message not lost on crash)
def callback(ch, method, properties, body):
try:
process(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
# Requeue on failure
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_qos(prefetch_count=1) # one message at a time per consumer
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
# Reject without requeue (dead-lettered to the DLX if the queue has one configured; otherwise the message is discarded)
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
Dead Letter Exchange (DLX)
# 1. Create the dead letter exchange
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters')
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='dead')
# 2. Attach DLX to main queue
channel.queue_declare(
queue='orders',
arguments={
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'dead',
'x-message-ttl': 30000, # auto-expire after 30s → DLX
'x-max-length': 10000, # max queue length → DLX on overflow
}
)
Priority Queues
# Declare queue with max priority level
channel.queue_declare(
queue='priority_tasks',
arguments={'x-max-priority': 10}
)
# Publish with priority (higher = processed first)
channel.basic_publish(
exchange='',
routing_key='priority_tasks',
body='High priority job',
properties=pika.BasicProperties(priority=9)
)
channel.basic_publish(
exchange='',
routing_key='priority_tasks',
body='Low priority job',
properties=pika.BasicProperties(priority=1)
)
Publisher Confirms
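# Enable publisher confirms for at-least-once delivery.
# Note: pika raises UnroutableError only for messages published with mandatory=True;
# a negative acknowledgment from the broker raises pika.exceptions.NackError.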
channel.confirm_delivery()
try:
channel.basic_publish(
exchange='',
routing_key='my-queue',
body='Important message',
properties=pika.BasicProperties(delivery_mode=2) # persistent
)
print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
print("Message returned — no queue matched routing key")
Clustering
# Node 1 (already running)
rabbitmqctl status
# Node 2 — join the cluster
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# Verify
rabbitmqctl cluster_status
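# Set HA policy (mirror classic queues to all nodes).
# Classic queue mirroring is deprecated and removed in RabbitMQ 4.0 — prefer quorum queues for new deployments.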
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# Set HA policy (mirror to exactly 2 nodes)
rabbitmqctl set_policy ha-two "^ha\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
# Remove a node from the cluster (run on a remaining node; the node being removed must be stopped)
rabbitmqctl forget_cluster_node rabbit@node3
Quorum Queues (Preferred over Classic Mirrored)
# Quorum queues replace mirrored queues for HA
channel.queue_declare(
queue='quorum_orders',
arguments={
'x-queue-type': 'quorum',
'x-delivery-limit': 5, # max re-deliveries before DLX
},
durable=True
)
Delayed Messages
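# Enable the plugin first (it is a community plugin: download the matching .ez release into the plugins directory before enabling)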
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Declare a delayed exchange
channel.exchange_declare(
exchange='delayed',
exchange_type='x-delayed-message',
arguments={'x-delayed-type': 'direct'}
)
# Publish with delay (ms)
channel.basic_publish(
exchange='delayed',
routing_key='my-queue',
body='Send me in 10 seconds',
properties=pika.BasicProperties(headers={'x-delay': 10000})
)
Common Workflows
Work Queue (Task Distribution)
# Producer
channel.queue_declare(queue='tasks', durable=True)
channel.basic_publish(
exchange='',
routing_key='tasks',
body='{"job": "resize_image", "id": 42}',
properties=pika.BasicProperties(delivery_mode=2) # persistent
)
# Consumer — fair dispatch + manual ack
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
RPC Pattern
# Client side
import uuid
corr_id = str(uuid.uuid4())
reply_queue = channel.queue_declare(queue='', exclusive=True).method.queue
channel.basic_publish(
exchange='', routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=reply_queue,
correlation_id=corr_id
),
body='{"method": "add", "args": [30, 12]}'
)
# Server side
def on_request(ch, method, props, body):
result = process_rpc(json.loads(body))
ch.basic_publish(
exchange='', routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=json.dumps(result)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
Monitor Queue Depth
# Watch queue depth every 2 seconds
watch -n 2 'rabbitmqctl list_queues name messages consumers'
# Via API
curl -s -u admin:secret http://localhost:15672/api/queues | \
jq '.[] | {name: .name, messages: .messages, consumers: .consumers}'
Tips and Best Practices
- Always declare queues as `durable=True` and publish with `delivery_mode=2` (persistent) for messages that must survive broker restarts.
- Use manual acknowledgments in consumers — auto-ack can lose messages if the consumer crashes between delivery and processing.
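- `basic_qos(prefetch_count=1)` ensures fair dispatch — without a prefetch limit, RabbitMQ pushes messages to consumers round-robin regardless of how many each still has unacknowledged, so a slow consumer can build up a backlog while others sit idle.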
- Prefer quorum queues over classic mirrored queues for new HA deployments — they use Raft consensus and are more reliable (classic queue mirroring is deprecated and removed in RabbitMQ 4.0).
- Dead letter exchanges are your safety net — always attach a DLX to production queues so rejected/expired messages are preserved, not silently dropped.
- Publisher confirms give at-least-once delivery semantics — use them for critical messages where loss is unacceptable.
- Virtual hosts provide namespace isolation — use a separate vhost per application or environment (dev/staging/prod).
- Monitor `messages_ready` and `messages_unacknowledged` — a growing unacknowledged count indicates slow or stuck consumers.
- Lazy queues (`x-queue-mode: lazy`) page messages to disk immediately, reducing RAM usage for very deep queues.
- Bind multiple routing keys to a single queue on a topic exchange instead of creating many queues with overlapping consumers.
- The management UI at `:15672` is invaluable for debugging — you can inspect queue contents, publish test messages, and trace message flow.
- Connection pooling — reuse connections and channels; creating a new TCP connection per message is expensive.