Redis Streams Cheat Sheet
Overview
Redis Streams is a data structure introduced in Redis 5.0 that models a log-like append-only data structure with consumer groups. It combines the simplicity and performance of Redis with features inspired by Apache Kafka, providing an efficient way to implement event sourcing, message queues, and real-time data pipelines.
Redis Streams supports consumer groups for parallel processing, message acknowledgment for reliable delivery, range queries for historical data access, and automatic ID generation with time-based ordering. Streams are memory-efficient and support trimming to cap memory usage, making them suitable for high-throughput event streaming within Redis deployments.
Installation
# Redis 5.0+ is required for Streams
# Docker
docker run -d --name redis -p 6379:6379 redis:7
# Ubuntu/Debian
sudo apt-get install redis-server
# macOS
brew install redis
brew services start redis
# Verify Streams support
redis-cli INFO server | grep redis_version
Core Stream Commands
Adding Messages
# Add a message with auto-generated ID
redis-cli XADD mystream '*' sensor_id 1234 temperature 23.5 humidity 60
# Add with specific ID (timestamp-sequence)
redis-cli XADD mystream '1716000000000-0' event_type login user_id 42
# Add with maxlen trim (keep at most 1000 entries)
redis-cli XADD mystream MAXLEN '~' 1000 '*' key value
# Add with MINID trim (remove entries older than ID)
redis-cli XADD mystream MINID '~' 1716000000000-0 '*' key value
# NOMKSTREAM — don't create stream if it doesn't exist
redis-cli XADD mystream NOMKSTREAM '*' key value
Reading Messages
# Read from beginning
redis-cli XRANGE mystream - +
# Read last 10 messages
redis-cli XRANGE mystream - + COUNT 10
# Read from specific ID forward
redis-cli XRANGE mystream 1716000000000-0 +
# Reverse read (newest first)
redis-cli XREVRANGE mystream + - COUNT 5
# Read new messages (blocking)
redis-cli XREAD COUNT 10 BLOCK 5000 STREAMS mystream '$'
# Read from multiple streams
redis-cli XREAD COUNT 10 BLOCK 0 STREAMS stream1 stream2 '$' '$'
# Read from specific position
redis-cli XREAD COUNT 10 STREAMS mystream 1716000000000-0
Stream Info
# Stream metadata
redis-cli XINFO STREAM mystream
# Full stream info with entries
redis-cli XINFO STREAM mystream FULL COUNT 10
# List consumer groups
redis-cli XINFO GROUPS mystream
# List consumers in a group
redis-cli XINFO CONSUMERS mystream mygroup
# Stream length
redis-cli XLEN mystream
Consumer Groups
Create and Manage Groups
# Create consumer group (read from beginning)
redis-cli XGROUP CREATE mystream mygroup 0
# Create consumer group (read only new messages)
redis-cli XGROUP CREATE mystream mygroup '$'
# Create stream and group together
redis-cli XGROUP CREATE mystream mygroup '$' MKSTREAM
# Set group last-delivered ID
redis-cli XGROUP SETID mystream mygroup 0
# Delete a consumer from group
redis-cli XGROUP DELCONSUMER mystream mygroup consumer-1
# Delete a consumer group
redis-cli XGROUP DESTROY mystream mygroup
Reading with Consumer Groups
# Read as consumer in group (get pending messages)
redis-cli XREADGROUP GROUP mygroup consumer-1 COUNT 10 BLOCK 5000 STREAMS mystream '>'
# Read only unacknowledged messages for this consumer
redis-cli XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0
# Acknowledge message processing
redis-cli XACK mystream mygroup 1716000000000-0
# Acknowledge multiple messages
redis-cli XACK mystream mygroup 1716000000000-0 1716000000001-0 1716000000002-0
Pending Messages
# List pending messages summary
redis-cli XPENDING mystream mygroup
# List pending messages with details
redis-cli XPENDING mystream mygroup - + 10
# List pending for specific consumer
redis-cli XPENDING mystream mygroup - + 10 consumer-1
# Claim idle messages (reassign to another consumer)
redis-cli XCLAIM mystream mygroup consumer-2 60000 1716000000000-0
# Auto-claim (claim oldest idle messages)
redis-cli XAUTOCLAIM mystream mygroup consumer-2 60000 0-0 COUNT 10
Command Reference
| Command | Description |
|---|---|
XADD | Append message to stream |
XREAD | Read messages from streams |
XRANGE | Read range of messages |
XREVRANGE | Read range in reverse |
XLEN | Get stream length |
XINFO | Get stream/group/consumer info |
XDEL | Delete specific messages |
XTRIM | Trim stream to max length |
XGROUP CREATE | Create consumer group |
XREADGROUP | Read as consumer group member |
XACK | Acknowledge message |
XPENDING | List pending messages |
XCLAIM | Claim pending messages |
XAUTOCLAIM | Auto-claim idle messages |
Programming Examples
Python (redis-py)
import redis
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Producer
def produce():
for i in range(100):
r.xadd('events', {
'event_type': 'order_created',
'order_id': str(i),
'amount': str(i * 10.5),
'timestamp': str(time.time()),
}, maxlen=10000)
# Consumer with group
def consume():
group = 'processors'
consumer = 'worker-1'
# Create group if not exists
try:
r.xgroup_create('events', group, id='0', mkstream=True)
except redis.exceptions.ResponseError:
pass # Group already exists
while True:
messages = r.xreadgroup(
groupname=group,
consumername=consumer,
streams={'events': '>'},
count=10,
block=5000,
)
for stream, entries in messages:
for msg_id, data in entries:
print(f"Processing {msg_id}: {data}")
# Process message...
r.xack('events', group, msg_id)
Node.js (ioredis)
const Redis = require('ioredis');
const redis = new Redis();
// Producer
async function produce(data) {
const id = await redis.xadd('events', '*',
'event_type', data.type,
'payload', JSON.stringify(data.payload)
);
console.log(`Added message ${id}`);
}
// Consumer group
async function consume() {
const group = 'processors';
const consumer = 'worker-1';
try {
await redis.xgroup('CREATE', 'events', group, '0', 'MKSTREAM');
} catch (e) {
// Group exists
}
while (true) {
const results = await redis.xreadgroup(
'GROUP', group, consumer,
'COUNT', 10,
'BLOCK', 5000,
'STREAMS', 'events', '>'
);
if (results) {
for (const [stream, messages] of results) {
for (const [id, fields] of messages) {
console.log(`Processing ${id}:`, fields);
await redis.xack('events', group, id);
}
}
}
}
}
Configuration
Memory Management
# Trim stream to max entries
redis-cli XTRIM mystream MAXLEN 100000
# Approximate trim (faster, ~100 extra entries)
redis-cli XTRIM mystream MAXLEN '~' 100000
# Trim by minimum ID
redis-cli XTRIM mystream MINID '~' 1716000000000-0
# Auto-trim on XADD
redis-cli XADD mystream MAXLEN '~' 50000 '*' key value
Redis Configuration
# redis.conf settings for streams
# Max memory policy (noeviction recommended for streams)
maxmemory-policy noeviction
# Stream node max bytes (default 4096)
stream-node-max-bytes 4096
# Stream node max entries (default 100)
stream-node-max-entries 100
Advanced Patterns
Dead Letter Queue
def process_with_dlq(r, stream, group, consumer, max_retries=3):
# Check for messages pending too long
pending = r.xpending_range(stream, group, '-', '+', 100)
for msg in pending:
if msg['times_delivered'] > max_retries:
# Move to dead letter stream
data = r.xrange(stream, msg['message_id'], msg['message_id'])
if data:
r.xadd(f'{stream}:dlq', data[0][1])
r.xack(stream, group, msg['message_id'])
Exactly-Once Processing (Idempotency)
def idempotent_process(r, stream, group, consumer):
messages = r.xreadgroup(group, consumer, {stream: '>'}, count=1, block=5000)
for s, entries in messages:
for msg_id, data in entries:
# Use message ID as idempotency key
if not r.sismember('processed_ids', msg_id):
process(data)
r.sadd('processed_ids', msg_id)
r.xack(stream, group, msg_id)
Monitoring
# Stream statistics
redis-cli XINFO STREAM mystream
# Memory usage
redis-cli MEMORY USAGE mystream
# Monitor stream commands in real-time
redis-cli MONITOR | grep -i 'xadd\|xread\|xack'
# Key metrics to watch:
# - Stream length (XLEN)
# - Pending entry count (XPENDING summary)
# - Consumer lag (last-delivered-id vs stream last ID)
# - Memory usage per stream
Troubleshooting
| Issue | Solution |
|---|---|
| Messages not delivered | Verify consumer is reading with > for new messages |
| Pending messages growing | Ensure consumers call XACK after processing |
| Memory growing unbounded | Use MAXLEN on XADD or periodic XTRIM |
| Consumer group not created | Use MKSTREAM flag; ensure stream exists first |
| Slow consumers blocking others | Add more consumers to the group; increase COUNT per read |
| Messages processing twice | Implement idempotency; ensure XACK runs after processing |
| XCLAIM not working | Check minimum idle time parameter; verify message IDs exist |
| Stream too large | Set aggressive MAXLEN; archive old data to external storage |