Overview
ZeroMQ (also written as ØMQ or 0MQ) is a high-performance asynchronous messaging library that provides message queuing without requiring a dedicated message broker. It gives your applications a concurrency framework with sockets that carry atomic messages across various transports including in-process, inter-process, TCP, and multicast.
Unlike traditional message brokers, ZeroMQ is a library embedded in your application. It handles I/O asynchronously in background threads, provides built-in patterns like pub-sub, request-reply, and pipeline, and scales from simple in-process communication to complex distributed architectures. ZeroMQ sockets handle reconnection, queuing, and message framing automatically.
Installation
Linux
# Ubuntu/Debian
sudo apt-get install libzmq3-dev
# CentOS/RHEL
sudo yum install zeromq-devel
# From source
wget https://github.com/zeromq/libzmq/releases/download/v4.3.5/zeromq-4.3.5.tar.gz
tar xzf zeromq-4.3.5.tar.gz
cd zeromq-4.3.5
./configure && make && sudo make install
sudo ldconfig
Language Bindings
# Python
pip install pyzmq
# Node.js
npm install zeromq
# Go
go get github.com/pebbe/zmq4
# Rust
cargo add zmq
# Ruby
gem install ffi-rzmq
Socket Types and Patterns
Request-Reply Pattern
| Socket | Role | Description |
|---|---|---|
| REQ | Client | Sends request, waits for reply |
| REP | Server | Receives request, sends reply |
| DEALER | Async client | Asynchronous request distribution |
| ROUTER | Async server | Routes replies to correct client |
Publish-Subscribe Pattern
| Socket | Role | Description |
|---|---|---|
| PUB | Publisher | Sends messages to all subscribers |
| SUB | Subscriber | Receives filtered messages |
| XPUB | Extended pub | Receives subscription notifications |
| XSUB | Extended sub | Sends subscriptions upstream |
Pipeline Pattern
| Socket | Role | Description |
|---|---|---|
| PUSH | Producer | Distributes work to workers |
| PULL | Worker | Receives work items |
Exclusive Pair
| Socket | Role | Description |
|---|---|---|
| PAIR | Either end | One-to-one bidirectional |
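PAIR is the one socket type in the tables above that has no example under Core Usage. A minimal sketch, assuming pyzmq and two threads that share one context over the inproc transport; the function name `pair_demo` and the endpoint `inproc://pair-demo` are made up for illustration:

```python
import threading
import zmq


def pair_demo():
    """Send one message each way between two threads over a PAIR link."""
    ctx = zmq.Context()
    parent = ctx.socket(zmq.PAIR)
    parent.bind("inproc://pair-demo")  # hypothetical endpoint name

    def child():
        # inproc endpoints are only visible inside the same context
        sock = ctx.socket(zmq.PAIR)
        sock.connect("inproc://pair-demo")
        text = sock.recv_string()
        sock.send_string(text.upper())
        sock.close()

    thread = threading.Thread(target=child)
    thread.start()
    parent.send_string("ready")  # blocks until the child has connected
    reply = parent.recv_string()
    thread.join()
    parent.close()
    ctx.term()
    return reply
```

PAIR is mainly useful for signalling between threads like this; it does not reconnect automatically the way the other socket types do, so it is rarely a good fit for network links.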
Core Usage (Python)
Request-Reply
# Server (REP)
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
    message = socket.recv_string()
    print(f"Received: {message}")
    socket.send_string(f"Reply to: {message}")
# Client (REQ)
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.send_string("Hello")
reply = socket.recv_string()
print(f"Got: {reply}")
Publish-Subscribe
# Publisher
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
while True:
    socket.send_string("weather NYC 72")
    socket.send_string("sports NBA score-update")
    time.sleep(1)
# Subscriber
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, "weather")
while True:
    message = socket.recv_string()
    print(f"Received: {message}")
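A PUB socket silently drops anything sent before a subscription has reached it, so the first messages after startup are often lost. One possible way around this, sketched here under the assumption that swapping PUB for XPUB is acceptable, is to wait for the subscription notification before publishing. The function name and the 5-second timeouts are illustrative choices, not part of the original examples:

```python
import zmq


def publish_after_subscribe():
    """Publish only once the XPUB socket has seen the subscription."""
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.XPUB)
    publisher.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging
    port = publisher.bind_to_random_port("tcp://127.0.0.1")

    subscriber = ctx.socket(zmq.SUB)
    subscriber.setsockopt(zmq.RCVTIMEO, 5000)
    subscriber.setsockopt_string(zmq.SUBSCRIBE, "weather")
    subscriber.connect(f"tcp://127.0.0.1:{port}")

    # XPUB delivers each subscription as a message: b"\x01" + topic prefix
    event = publisher.recv()

    publisher.send_string("sports NBA score-update")  # filtered out
    publisher.send_string("weather NYC 72")
    received = subscriber.recv_string()

    subscriber.close(linger=0)
    publisher.close(linger=0)
    ctx.term()
    return event, received
```

With several subscribers, the publisher would have to count notifications or use a separate synchronization channel; this sketch covers only the single-subscriber case.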
Push-Pull Pipeline
# Ventilator (PUSH)
import zmq
context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")
for task_id in range(100):
    sender.send_json({"task": task_id, "workload": task_id * 10})
# Worker (PULL -> PUSH)
import zmq
context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
while True:
    work = receiver.recv_json()
    result = work["workload"] * 2  # process
    sender.send_json({"task": work["task"], "result": result})
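The worker above pushes its results to port 5558, but no collector for them is shown. The sketch below assumes a PULL "sink" plays that role and runs the whole pipeline inside one process over inproc, so that it can be tried without starting several scripts. The endpoint names, the `stop` event, and the polling interval are assumptions for illustration:

```python
import threading
import zmq


def run_pipeline(num_tasks=20, num_workers=3):
    """Ventilator -> workers -> sink, all in one process over inproc."""
    ctx = zmq.Context()
    ventilator = ctx.socket(zmq.PUSH)
    ventilator.bind("inproc://tasks")
    sink = ctx.socket(zmq.PULL)
    sink.bind("inproc://results")
    stop = threading.Event()

    def worker():
        receiver = ctx.socket(zmq.PULL)
        receiver.connect("inproc://tasks")
        sender = ctx.socket(zmq.PUSH)
        sender.connect("inproc://results")
        while not stop.is_set():
            if receiver.poll(100):  # wait up to 100 ms for a task
                work = receiver.recv_json()
                result = work["workload"] * 2
                sender.send_json({"task": work["task"], "result": result})
        receiver.close(linger=0)
        sender.close(linger=0)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()

    for task_id in range(num_tasks):
        ventilator.send_json({"task": task_id, "workload": task_id * 10})

    # The sink collects exactly one result per task
    results = {}
    while len(results) < num_tasks:
        message = sink.recv_json()
        results[message["task"]] = message["result"]

    stop.set()
    for thread in threads:
        thread.join()
    ventilator.close(linger=0)
    sink.close(linger=0)
    ctx.term()
    return results
```

How the tasks are split between workers is not deterministic: PUSH distributes round-robin among the peers connected at send time, so a worker that connects late may receive little or nothing.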
Configuration
Socket Options
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
# High water mark (message queue limit)
socket.setsockopt(zmq.SNDHWM, 1000)
socket.setsockopt(zmq.RCVHWM, 1000)
# Linger period on close (ms, -1=infinite, 0=discard)
socket.setsockopt(zmq.LINGER, 0)
# Reconnect interval (ms)
socket.setsockopt(zmq.RECONNECT_IVL, 100)
socket.setsockopt(zmq.RECONNECT_IVL_MAX, 30000)
# Heartbeat (for detecting dead peers)
socket.setsockopt(zmq.HEARTBEAT_IVL, 1000)
socket.setsockopt(zmq.HEARTBEAT_TIMEOUT, 5000)
# Send/receive timeout (ms, -1=infinite)
socket.setsockopt(zmq.SNDTIMEO, 5000)
socket.setsockopt(zmq.RCVTIMEO, 5000)
# TCP keepalive
socket.setsockopt(zmq.TCP_KEEPALIVE, 1)
socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 300)
Transport Protocols
| Protocol | URI | Description |
|---|---|---|
| TCP | tcp://host:port | Network communication |
| IPC | ipc:///tmp/mysock | Inter-process (Unix) |
| Inproc | inproc://name | In-process (threads) |
| PGM | pgm://interface;group:port | Reliable multicast |
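Because the transport is selected by the endpoint string alone, the same socket code can run over different transports. A small sketch of that idea, assuming pyzmq; the function `round_trip` and the endpoint name are hypothetical, and only inproc and TCP are exercised so that the example stays platform independent:

```python
import zmq


def round_trip(transport):
    """Run one REQ/REP exchange over the given transport."""
    ctx = zmq.Context()
    server = ctx.socket(zmq.REP)
    client = ctx.socket(zmq.REQ)

    if transport == "inproc":
        endpoint = "inproc://transport-demo"
        server.bind(endpoint)
    elif transport == "tcp":
        # Let the OS pick a free port instead of hard-coding one
        port = server.bind_to_random_port("tcp://127.0.0.1")
        endpoint = f"tcp://127.0.0.1:{port}"
    else:
        raise ValueError(f"unsupported transport: {transport}")

    client.connect(endpoint)
    client.send(b"ping")
    request = server.recv()
    server.send(request + b"-ack")
    reply = client.recv()

    client.close(linger=0)
    server.close(linger=0)
    ctx.term()
    return reply
```

Both sockets live in one context here because inproc endpoints are only reachable from sockets created on the same context.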
Advanced Usage
ROUTER-DEALER Async Pattern
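# Async server with ROUTER
import zmq
def process(message):
    # Hypothetical placeholder; replace with real request handling
    return message
context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5555")
while True:
    # ROUTER prepends the sender's identity frame. REQ clients add the empty
    # delimiter frame; a DEALER client must send it explicitly.
    identity, empty, message = router.recv_multipart()
    reply = process(message)
    router.send_multipart([identity, b"", reply])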
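The server above has no matching client. The sketch below pairs it with a DEALER socket that queues several requests before reading any reply, which a REQ socket cannot do. It assumes the DEALER adds the empty delimiter frame itself, so the three-frame unpacking on the ROUTER side still works. The identity `client-1` and the upper-casing "handler" are illustrative:

```python
import zmq


def dealer_router_demo(requests=(b"a", b"b", b"c")):
    """Send several requests up front, then collect the replies."""
    ctx = zmq.Context()
    router = ctx.socket(zmq.ROUTER)
    port = router.bind_to_random_port("tcp://127.0.0.1")

    dealer = ctx.socket(zmq.DEALER)
    dealer.setsockopt(zmq.IDENTITY, b"client-1")  # must be set before connect
    dealer.connect(f"tcp://127.0.0.1:{port}")

    # DEALER does not enforce send/recv alternation
    for payload in requests:
        dealer.send_multipart([b"", payload])

    # Server side: echo each payload back in upper case
    for _ in requests:
        identity, empty, message = router.recv_multipart()
        router.send_multipart([identity, b"", message.upper()])

    replies = []
    for _ in requests:
        empty, reply = dealer.recv_multipart()
        replies.append(reply)

    dealer.close(linger=0)
    router.close(linger=0)
    ctx.term()
    return replies
```

Both ends run in one thread only to keep the sketch short; this works because ZeroMQ performs the actual I/O in background threads.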
Poller for Multiple Sockets
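import zmq
# receiver (PULL) and subscriber (SUB) are sockets created and connected earlier
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)
while True:
    # poll() returns (socket, event) pairs; the timeout is in milliseconds
    socks = dict(poller.poll(timeout=1000))
    if receiver in socks:
        msg = receiver.recv_json()
    if subscriber in socks:
        # Assumes the publisher sends two-frame messages: [topic, data]
        topic, data = subscriber.recv_multipart()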
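The polling loop above relies on sockets defined elsewhere. A self-contained variant is sketched below, assuming a PUSH peer and a PUB peer created in the same process on random local ports. The publisher keeps sending inside the loop because PUB drops messages until the subscription has arrived; `max_rounds` and the message contents are illustrative:

```python
import zmq


def poll_two_sockets(max_rounds=50):
    """Poll a PULL and a SUB socket until each has delivered a message."""
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.PULL)
    task_port = receiver.bind_to_random_port("tcp://127.0.0.1")
    publisher = ctx.socket(zmq.PUB)
    pub_port = publisher.bind_to_random_port("tcp://127.0.0.1")

    subscriber = ctx.socket(zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, b"weather")
    subscriber.connect(f"tcp://127.0.0.1:{pub_port}")
    pusher = ctx.socket(zmq.PUSH)
    pusher.connect(f"tcp://127.0.0.1:{task_port}")
    pusher.send_json({"task": 1})

    poller = zmq.Poller()
    poller.register(receiver, zmq.POLLIN)
    poller.register(subscriber, zmq.POLLIN)

    seen = {}
    for _ in range(max_rounds):
        # Dropped by PUB until the subscription has been received
        publisher.send_multipart([b"weather", b"NYC 72"])
        socks = dict(poller.poll(timeout=100))
        if receiver in socks:
            seen["task"] = receiver.recv_json()
        if subscriber in socks:
            topic, data = subscriber.recv_multipart()
            seen["weather"] = (topic, data)
        if len(seen) == 2:
            break

    for sock in (receiver, publisher, subscriber, pusher):
        sock.close(linger=0)
    ctx.term()
    return seen
```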
Proxy (Built-in Broker)
import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")
# Built-in proxy (blocks forever)
zmq.proxy(frontend, backend)
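Since `zmq.proxy` never returns on its own, a program that needs to shut down usually runs it in a separate thread and stops it by terminating the context. The sketch below shows one way this could look, assuming a REQ client in front, a REP worker behind, and random local ports; the thread layout and all names are illustrative rather than prescribed by the library:

```python
import threading
import zmq


def proxy_demo():
    """REQ client -> ROUTER/DEALER proxy -> REP worker, then clean shutdown."""
    ctx = zmq.Context()
    ports = {}
    bound = threading.Event()

    def run_proxy():
        frontend = ctx.socket(zmq.ROUTER)
        backend = ctx.socket(zmq.DEALER)
        ports["frontend"] = frontend.bind_to_random_port("tcp://127.0.0.1")
        ports["backend"] = backend.bind_to_random_port("tcp://127.0.0.1")
        bound.set()
        try:
            zmq.proxy(frontend, backend)  # blocks until the context terminates
        except zmq.ContextTerminated:
            pass
        finally:
            frontend.close(linger=0)
            backend.close(linger=0)

    def run_worker():
        worker = ctx.socket(zmq.REP)
        worker.connect(f"tcp://127.0.0.1:{ports['backend']}")
        try:
            while True:
                worker.send(b"echo:" + worker.recv())
        except zmq.ContextTerminated:
            pass
        finally:
            worker.close(linger=0)

    proxy_thread = threading.Thread(target=run_proxy)
    proxy_thread.start()
    bound.wait()  # ports are known from here on
    worker_thread = threading.Thread(target=run_worker)
    worker_thread.start()

    client = ctx.socket(zmq.REQ)
    client.connect(f"tcp://127.0.0.1:{ports['frontend']}")
    client.send(b"hello")
    reply = client.recv()
    client.close(linger=0)

    # term() makes blocking calls in the other threads raise ContextTerminated
    # and returns once every socket has been closed
    ctx.term()
    proxy_thread.join()
    worker_thread.join()
    return reply
```

Each socket is created, used, and closed in a single thread, because ZeroMQ sockets are not thread-safe; only the context is shared.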
CURVE Security (Encryption + Auth)
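import zmq.auth
from zmq.auth.thread import ThreadAuthenticator
context = zmq.Context()
# The authenticator answers authentication requests in a background thread
auth = ThreadAuthenticator(context)
auth.start()
# location is a directory holding the public certificates of allowed clients
auth.configure_curve(domain='*', location='/path/to/authorized_keys')
server = context.socket(zmq.REP)
# load_certificate returns (public_key, secret_key) from the secret certificate
server_public, server_secret = zmq.auth.load_certificate('/path/to/server.key_secret')
server.curve_secretkey = server_secret
server.curve_publickey = server_public
server.curve_server = True  # must be set before bind
server.bind("tcp://*:5555")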
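The snippet above covers only the server. A CURVE client additionally needs the server's public key, set through `curve_serverkey`. The sketch below shows both ends in one process under some loud assumptions: libzmq was built with CURVE support, the keys are throwaway pairs from `zmq.curve_keypair()` rather than certificates on disk, and no authenticator is running, so any client holding the server's public key is accepted:

```python
import zmq


def curve_round_trip():
    """One encrypted REQ/REP exchange using in-memory CURVE key pairs."""
    # Throwaway keys for illustration; real deployments load certificates
    server_public, server_secret = zmq.curve_keypair()
    client_public, client_secret = zmq.curve_keypair()

    ctx = zmq.Context()
    server = ctx.socket(zmq.REP)
    server.curve_secretkey = server_secret
    server.curve_publickey = server_public
    server.curve_server = True
    server.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging
    port = server.bind_to_random_port("tcp://127.0.0.1")

    client = ctx.socket(zmq.REQ)
    client.curve_secretkey = client_secret
    client.curve_publickey = client_public
    client.curve_serverkey = server_public  # identifies the expected server
    client.setsockopt(zmq.RCVTIMEO, 5000)
    client.connect(f"tcp://127.0.0.1:{port}")

    client.send(b"ping")
    request = server.recv()
    server.send(b"pong")
    reply = client.recv()

    client.close(linger=0)
    server.close(linger=0)
    ctx.term()
    return request, reply
```

A client configured with the wrong server key does not get an error from `connect`; the handshake fails in the background and the receive times out, which is why the timeouts are set here.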
Troubleshooting
| Issue | Solution |
|---|---|
| Subscriber receives nothing | Check that the SUB socket sets a SUBSCRIBE filter. PUB drops messages sent before a subscription arrives, so delay or synchronize the publisher's first send |
| Messages dropped silently | Check HWM settings; increase SNDHWM/RCVHWM or slow down the producer |
| Socket hangs on close | Set LINGER to 0 before closing (unsent messages are discarded), then call context.term() |
| "Address already in use" | Previous socket not properly closed; set LINGER=0 and ensure socket.close() is called |
| Memory growing | Messages are queuing because of a slow consumer; tune HWM and add back-pressure |
| REQ-REP lockstep violation | REQ/REP must alternate send/recv; use DEALER/ROUTER for async |
| Multipart message issues | All parts must be sent with the SNDMORE flag except the last one |
| Context won’t terminate | Lingering sockets prevent termination; close all sockets with LINGER=0 |
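Two of the rows above can be reproduced in a few lines, which may help when recognizing them in real code. This is a sketch assuming pyzmq and inproc endpoints with made-up names: the first function triggers the REQ lockstep error on purpose, the second sends a two-part message using SNDMORE.

```python
import zmq


def lockstep_violation():
    """Return the errno raised by a second send on a REQ socket."""
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind("inproc://lockstep")
    req = ctx.socket(zmq.REQ)
    req.connect("inproc://lockstep")
    req.send(b"first")
    try:
        req.send(b"second")  # no recv() in between: not allowed on REQ
    except zmq.ZMQError as exc:
        error = exc.errno
    else:
        error = None
    req.close(linger=0)
    rep.close(linger=0)
    ctx.term()
    return error


def multipart_demo():
    """Send two frames as one message and receive them together."""
    ctx = zmq.Context()
    receiver = ctx.socket(zmq.PAIR)
    receiver.bind("inproc://multipart")
    sender = ctx.socket(zmq.PAIR)
    sender.connect("inproc://multipart")
    sender.send(b"header", zmq.SNDMORE)  # more frames follow
    sender.send(b"body")                 # last frame: no flag
    parts = receiver.recv_multipart()
    sender.close(linger=0)
    receiver.close(linger=0)
    ctx.term()
    return parts
```

`lockstep_violation()` should return `zmq.EFSM`, the error code ZeroMQ uses for an operation that is invalid in the socket's current state.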