Zum Inhalt springen

ZeroMQ Cheat Sheet

Overview

ZeroMQ (also written as ØMQ or 0MQ) is a high-performance asynchronous messaging library that provides message queuing without requiring a dedicated message broker. It gives your applications a concurrency framework with sockets that carry atomic messages across various transports including in-process, inter-process, TCP, and multicast.

Unlike traditional message brokers, ZeroMQ is a library embedded in your application. It handles I/O asynchronously in background threads, provides built-in patterns like pub-sub, request-reply, and pipeline, and scales from simple in-process communication to complex distributed architectures. ZeroMQ sockets handle reconnection, queuing, and message framing automatically.

Installation

Linux

# Ubuntu/Debian
sudo apt-get install libzmq3-dev

# CentOS/RHEL
sudo yum install zeromq-devel

# From source
wget https://github.com/zeromq/libzmq/releases/download/v4.3.5/zeromq-4.3.5.tar.gz
tar xzf zeromq-4.3.5.tar.gz
cd zeromq-4.3.5
./configure && make && sudo make install
sudo ldconfig

Language Bindings

# Python
pip install pyzmq

# Node.js
npm install zeromq

# Go
go get github.com/pebbe/zmq4

# Rust
cargo add zmq

# Ruby
gem install ffi-rzmq

Socket Types and Patterns

Request-Reply Pattern

SocketRoleDescription
REQClientSends request, waits for reply
REPServerReceives request, sends reply
DEALERAsync clientAsynchronous request distribution
ROUTERAsync serverRoutes replies to correct client

Publish-Subscribe Pattern

SocketRoleDescription
PUBPublisherSends messages to all subscribers
SUBSubscriberReceives filtered messages
XPUBExtended pubReceives subscription notifications
XSUBExtended subSends subscriptions upstream

Pipeline Pattern

SocketRoleDescription
PUSHProducerDistributes work to workers
PULLWorkerReceives work items

Exclusive Pair

SocketRoleDescription
PAIREither endOne-to-one bidirectional

Core Usage (Python)

Request-Reply

# Server (REP)
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    message = socket.recv_string()
    print(f"Received: {message}")
    socket.send_string(f"Reply to: {message}")
# Client (REQ)
import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

socket.send_string("Hello")
reply = socket.recv_string()
print(f"Got: {reply}")

Publish-Subscribe

# Publisher
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

while True:
    socket.send_string("weather NYC 72")
    socket.send_string("sports NBA score-update")
    time.sleep(1)
# Subscriber
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, "weather")

while True:
    message = socket.recv_string()
    print(f"Received: {message}")

Push-Pull Pipeline

# Ventilator (PUSH)
import zmq

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

for task_id in range(100):
    sender.send_json({"task": task_id, "workload": task_id * 10})
# Worker (PULL -> PUSH)
import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

while True:
    work = receiver.recv_json()
    result = work["workload"] * 2  # process
    sender.send_json({"task": work["task"], "result": result})

Configuration

Socket Options

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)

# High water mark (message queue limit)
socket.setsockopt(zmq.SNDHWM, 1000)
socket.setsockopt(zmq.RCVHWM, 1000)

# Linger period on close (ms, -1=infinite, 0=discard)
socket.setsockopt(zmq.LINGER, 0)

# Reconnect interval (ms)
socket.setsockopt(zmq.RECONNECT_IVL, 100)
socket.setsockopt(zmq.RECONNECT_IVL_MAX, 30000)

# Heartbeat (for detecting dead peers)
socket.setsockopt(zmq.HEARTBEAT_IVL, 1000)
socket.setsockopt(zmq.HEARTBEAT_TIMEOUT, 5000)

# Send/receive timeout (ms, -1=infinite)
socket.setsockopt(zmq.SNDTIMEO, 5000)
socket.setsockopt(zmq.RCVTIMEO, 5000)

# TCP keepalive
socket.setsockopt(zmq.TCP_KEEPALIVE, 1)
socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 300)

Transport Protocols

ProtocolURIDescription
TCPtcp://host:portNetwork communication
IPCipc:///tmp/mysockInter-process (Unix)
Inprocinproc://nameIn-process (threads)
PGMpgm://interface;group:portReliable multicast

Advanced Usage

ROUTER-DEALER Async Pattern

# Async server with ROUTER
import zmq

context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5555")

while True:
    identity, empty, message = router.recv_multipart()
    reply = process(message)
    router.send_multipart([identity, b"", reply])

Poller for Multiple Sockets

import zmq

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    socks = dict(poller.poll(timeout=1000))

    if receiver in socks:
        msg = receiver.recv_json()

    if subscriber in socks:
        topic, data = subscriber.recv_multipart()

Proxy (Built-in Broker)

import zmq

context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")

# Built-in proxy (blocks forever)
zmq.proxy(frontend, backend)

CURVE Security (Encryption + Auth)

import zmq.auth
from zmq.auth.thread import ThreadAuthenticator

context = zmq.Context()
auth = ThreadAuthenticator(context)
auth.start()
auth.configure_curve(domain='*', location='/path/to/authorized_keys')

server = context.socket(zmq.REP)
server_public, server_secret = zmq.auth.load_certificate('/path/to/server.key_secret')
server.curve_secretkey = server_secret
server.curve_publickey = server_public
server.curve_server = True
server.bind("tcp://*:5555")

Troubleshooting

IssueSolution
Subscriber receives nothingSubscribers need time to connect; add a brief sleep after connecting before sending
Messages dropped silentlyCheck HWM settings; increase SNDHWM/RCVHWM or slow down producer
Socket hangs on closeSet LINGER to 0 before closing; call context.term()
”Address already in use”Previous socket not properly closed; set LINGER=0 and ensure socket.close()
Memory growingMessages queuing due to slow consumer; tune HWM and add back-pressure
REQ-REP lockstep violationREQ/REP must alternate send/recv; use DEALER/ROUTER for async
Multipart message issuesAll parts must be sent with SNDMORE flag except the last one
Context won’t terminateLingering sockets prevent termination; close all sockets with LINGER=0