Pular para o conteúdo

RabbitMQ

Installation

# Docker (quickest)
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=secret \
  rabbitmq:3-management

# Docker Compose
services:
  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # Management UI
      - "5671:5671"    # AMQPS (TLS)
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
volumes:
  rabbitmq_data:

# Ubuntu / Debian
sudo apt install rabbitmq-server
sudo systemctl enable --now rabbitmq-server

# macOS
brew install rabbitmq
brew services start rabbitmq

# Enable management plugin
rabbitmq-plugins enable rabbitmq_management

Management UI: http://localhost:15672

Configuration

rabbitmq.conf (modern format)

# Networking
listeners.tcp.default = 5672
management.listener.port = 15672

# Authentication
default_user = admin
default_pass = secret
default_vhost = /

# Resource limits
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB

# Queue defaults
queue_master_locator = min-masters

# Logging
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log

# TLS
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ca.pem
ssl_options.certfile   = /etc/rabbitmq/cert.pem
ssl_options.keyfile    = /etc/rabbitmq/key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = false

Environment Variables

RABBITMQ_NODENAME=rabbit@myhost
RABBITMQ_NODE_PORT=5672
RABBITMQ_LOG_BASE=/var/log/rabbitmq
RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia
RABBITMQ_ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins

Core Commands

rabbitmqctl

CommandDescription
rabbitmqctl statusShow broker status
rabbitmqctl cluster_statusShow cluster info
rabbitmqctl list_queuesList all queues
rabbitmqctl list_queues name messages consumersQueue with stats
rabbitmqctl list_exchangesList exchanges
rabbitmqctl list_bindingsList bindings
rabbitmqctl list_connectionsList connections
rabbitmqctl list_channelsList channels
rabbitmqctl list_consumersList consumers
rabbitmqctl add_user alice secretCreate user
rabbitmqctl set_user_tags alice administratorSet user role
rabbitmqctl set_permissions -p / alice ".*" ".*" ".*"Full permissions
rabbitmqctl delete_user aliceDelete user
rabbitmqctl add_vhost myappCreate virtual host
rabbitmqctl delete_vhost myappDelete virtual host
rabbitmqctl purge_queue my-queueEmpty a queue
rabbitmqctl delete_queue my-queueDelete a queue
rabbitmqctl stop_appStop the app (keep node)
rabbitmqctl start_appStart the app
rabbitmqctl resetReset node to default

rabbitmq-plugins

CommandDescription
rabbitmq-plugins listList all plugins
rabbitmq-plugins enable rabbitmq_managementEnable management
rabbitmq-plugins enable rabbitmq_shovelEnable shovel
rabbitmq-plugins enable rabbitmq_federationEnable federation
rabbitmq-plugins enable rabbitmq_delayed_message_exchangeDelayed messages
rabbitmq-plugins disable rabbitmq_managementDisable plugin

HTTP Management API

EndpointDescription
GET /api/overviewBroker overview stats
GET /api/queuesList all queues
GET /api/queues/%2F/my-queueSpecific queue (%2F = /)
DELETE /api/queues/%2F/my-queue/contentsPurge queue
GET /api/exchangesList exchanges
GET /api/bindingsList bindings
GET /api/connectionsList connections
GET /api/nodesCluster node info
POST /api/exchanges/%2F/my-exchange/publishPublish test message
# Example: list queues via curl
curl -u admin:secret http://localhost:15672/api/queues | jq '.[].name'

# Purge a queue
curl -u admin:secret -X DELETE \
  http://localhost:15672/api/queues/%2F/my-queue/contents

Advanced Usage

Exchange Types and Routing

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# --- DIRECT EXCHANGE ---
# Routes to queues where routing_key == binding_key
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_declare(queue='error_logs')
channel.queue_bind(exchange='direct_logs', queue='error_logs', routing_key='error')

channel.basic_publish(
    exchange='direct_logs',
    routing_key='error',
    body='This is an error'
)

# --- TOPIC EXCHANGE ---
# Routing keys with wildcards: * (one word), # (zero or more words)
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_declare(queue='payment_errors')
channel.queue_bind(
    exchange='topic_logs',
    queue='payment_errors',
    routing_key='payment.*.error'   # matches payment.uk.error, payment.us.error
)
channel.queue_bind(
    exchange='topic_logs',
    queue='payment_errors',
    routing_key='payment.#'         # matches all payment.* events
)

# --- FANOUT EXCHANGE ---
# Broadcasts to ALL bound queues, routing key ignored
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')
channel.queue_declare(queue='service_a')
channel.queue_declare(queue='service_b')
channel.queue_bind(exchange='broadcast', queue='service_a')
channel.queue_bind(exchange='broadcast', queue='service_b')

channel.basic_publish(exchange='broadcast', routing_key='', body='Hello all')

# --- HEADERS EXCHANGE ---
# Routes based on message headers, not routing key
channel.exchange_declare(exchange='headers_ex', exchange_type='headers')
channel.queue_declare(queue='pdf_reports')
channel.queue_bind(
    exchange='headers_ex',
    queue='pdf_reports',
    routing_key='',
    arguments={'format': 'pdf', 'x-match': 'all'}  # all headers must match
)

channel.basic_publish(
    exchange='headers_ex',
    routing_key='',
    properties=pika.BasicProperties(headers={'format': 'pdf', 'type': 'report'}),
    body='Report data'
)

Message Acknowledgments

# Manual acknowledgment (safe — message not lost on crash)
def callback(ch, method, properties, body):
    try:
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Requeue on failure
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_qos(prefetch_count=1)  # one message at a time per consumer
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

# Reject without requeue (sends to DLX)
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

Dead Letter Exchange (DLX)

# 1. Create the dead letter exchange
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters')
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='dead')

# 2. Attach DLX to main queue
channel.queue_declare(
    queue='orders',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dead',
        'x-message-ttl': 30000,          # auto-expire after 30s → DLX
        'x-max-length': 10000,           # max queue length → DLX on overflow
    }
)

Priority Queues

# Declare queue with max priority level
channel.queue_declare(
    queue='priority_tasks',
    arguments={'x-max-priority': 10}
)

# Publish with priority (higher = processed first)
channel.basic_publish(
    exchange='',
    routing_key='priority_tasks',
    body='High priority job',
    properties=pika.BasicProperties(priority=9)
)

channel.basic_publish(
    exchange='',
    routing_key='priority_tasks',
    body='Low priority job',
    properties=pika.BasicProperties(priority=1)
)

Publisher Confirms

# Enable publisher confirms for at-least-once delivery
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='',
        routing_key='my-queue',
        body='Important message',
        properties=pika.BasicProperties(delivery_mode=2)  # persistent
    )
    print("Message confirmed by broker")
except pika.exceptions.UnroutableError:
    print("Message returned — no queue matched routing key")

Clustering

# Node 1 (already running)
rabbitmqctl status

# Node 2 — join the cluster
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# Verify
rabbitmqctl cluster_status

# Set HA policy (mirror queues to all nodes)
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

# Set HA policy (mirror to exactly 2 nodes)
rabbitmqctl set_policy ha-two "^ha\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

# Remove a node from cluster
rabbitmqctl forget_cluster_node rabbit@node3

Quorum Queues (Preferred over Classic Mirrored)

# Quorum queues replace mirrored queues for HA
channel.queue_declare(
    queue='quorum_orders',
    arguments={
        'x-queue-type': 'quorum',
        'x-delivery-limit': 5,          # max re-deliveries before DLX
    },
    durable=True
)

Delayed Messages

# Enable the plugin first
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Declare a delayed exchange
channel.exchange_declare(
    exchange='delayed',
    exchange_type='x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)

# Publish with delay (ms)
channel.basic_publish(
    exchange='delayed',
    routing_key='my-queue',
    body='Send me in 10 seconds',
    properties=pika.BasicProperties(headers={'x-delay': 10000})
)

Common Workflows

Work Queue (Task Distribution)

# Producer
channel.queue_declare(queue='tasks', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='{"job": "resize_image", "id": 42}',
    properties=pika.BasicProperties(delivery_mode=2)  # persistent
)

# Consumer — fair dispatch + manual ack
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

RPC Pattern

# Client side
import uuid

corr_id = str(uuid.uuid4())
reply_queue = channel.queue_declare(queue='', exclusive=True).method.queue

channel.basic_publish(
    exchange='', routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=reply_queue,
        correlation_id=corr_id
    ),
    body='{"method": "add", "args": [30, 12]}'
)

# Server side
def on_request(ch, method, props, body):
    result = process_rpc(json.loads(body))
    ch.basic_publish(
        exchange='', routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=json.dumps(result)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

Monitor Queue Depth

# Watch queue depth every 2 seconds
watch -n 2 'rabbitmqctl list_queues name messages consumers'

# Via API
curl -s -u admin:secret http://localhost:15672/api/queues | \
  jq '.[] | {name: .name, messages: .messages, consumers: .consumers}'

Tips and Best Practices

  • Always declare queues as durable=True and publish with delivery_mode=2 (persistent) for messages that must survive broker restarts.
  • Use manual acknowledgments in consumers — auto-ack can lose messages if the consumer crashes between delivery and processing.
  • basic_qos(prefetch_count=1) ensures fair dispatch — without it, RabbitMQ sends all pending messages to the first free consumer.
  • Quorum queues over classic mirrored queues for new HA deployments — they use Raft consensus and are more reliable.
  • Dead letter exchanges are your safety net — always attach a DLX to production queues so rejected/expired messages are preserved, not silently dropped.
  • Publisher confirms give at-least-once delivery semantics — use them for critical messages where loss is unacceptable.
  • Virtual hosts provide namespace isolation — use a separate vhost per application or environment (dev/staging/prod).
  • Monitor messages_ready and messages_unacknowledged — a growing unacknowledged count indicates slow or stuck consumers.
  • Lazy queues (x-queue-mode: lazy) page messages to disk immediately, reducing RAM usage for very deep queues.
  • Bind multiple routing keys to a single queue on a topic exchange instead of creating many queues with overlapping consumers.
  • The management UI at :15672 is invaluable for debugging — you can inspect queue contents, publish test messages, and trace message flow.
  • Connection pooling — reuse connections and channels; creating a new TCP connection per message is expensive.