Salta ai contenuti

Redis Streams Cheat Sheet

Overview

Redis Streams is a data structure introduced in Redis 5.0 that models a log-like append-only data structure with consumer groups. It combines the simplicity and performance of Redis with features inspired by Apache Kafka, providing an efficient way to implement event sourcing, message queues, and real-time data pipelines.

Redis Streams supports consumer groups for parallel processing, message acknowledgment for reliable delivery, range queries for historical data access, and automatic ID generation with time-based ordering. Streams are memory-efficient and support trimming to cap memory usage, making them suitable for high-throughput event streaming within Redis deployments.

Installation

# Redis 5.0+ is required for Streams
# Docker
docker run -d --name redis -p 6379:6379 redis:7

# Ubuntu/Debian
sudo apt-get install redis-server

# macOS
brew install redis
brew services start redis

# Verify Streams support
redis-cli INFO server | grep redis_version

Core Stream Commands

Adding Messages

# Add a message with auto-generated ID
redis-cli XADD mystream '*' sensor_id 1234 temperature 23.5 humidity 60

# Add with specific ID (timestamp-sequence)
redis-cli XADD mystream '1716000000000-0' event_type login user_id 42

# Add with maxlen trim (keep at most 1000 entries)
redis-cli XADD mystream MAXLEN '~' 1000 '*' key value

# Add with MINID trim (remove entries older than ID)
redis-cli XADD mystream MINID '~' 1716000000000-0 '*' key value

# NOMKSTREAM — don't create stream if it doesn't exist
redis-cli XADD mystream NOMKSTREAM '*' key value

Reading Messages

# Read from beginning
redis-cli XRANGE mystream - +

# Read last 10 messages
redis-cli XRANGE mystream - + COUNT 10

# Read from specific ID forward
redis-cli XRANGE mystream 1716000000000-0 +

# Reverse read (newest first)
redis-cli XREVRANGE mystream + - COUNT 5

# Read new messages (blocking)
redis-cli XREAD COUNT 10 BLOCK 5000 STREAMS mystream '$'

# Read from multiple streams
redis-cli XREAD COUNT 10 BLOCK 0 STREAMS stream1 stream2 '$' '$'

# Read from specific position
redis-cli XREAD COUNT 10 STREAMS mystream 1716000000000-0

Stream Info

# Stream metadata
redis-cli XINFO STREAM mystream

# Full stream info with entries
redis-cli XINFO STREAM mystream FULL COUNT 10

# List consumer groups
redis-cli XINFO GROUPS mystream

# List consumers in a group
redis-cli XINFO CONSUMERS mystream mygroup

# Stream length
redis-cli XLEN mystream

Consumer Groups

Create and Manage Groups

# Create consumer group (read from beginning)
redis-cli XGROUP CREATE mystream mygroup 0

# Create consumer group (read only new messages)
redis-cli XGROUP CREATE mystream mygroup '$'

# Create stream and group together
redis-cli XGROUP CREATE mystream mygroup '$' MKSTREAM

# Set group last-delivered ID
redis-cli XGROUP SETID mystream mygroup 0

# Delete a consumer from group
redis-cli XGROUP DELCONSUMER mystream mygroup consumer-1

# Delete a consumer group
redis-cli XGROUP DESTROY mystream mygroup

Reading with Consumer Groups

# Read as consumer in group (get pending messages)
redis-cli XREADGROUP GROUP mygroup consumer-1 COUNT 10 BLOCK 5000 STREAMS mystream '>'

# Read only unacknowledged messages for this consumer
redis-cli XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0

# Acknowledge message processing
redis-cli XACK mystream mygroup 1716000000000-0

# Acknowledge multiple messages
redis-cli XACK mystream mygroup 1716000000000-0 1716000000001-0 1716000000002-0

Pending Messages

# List pending messages summary
redis-cli XPENDING mystream mygroup

# List pending messages with details
redis-cli XPENDING mystream mygroup - + 10

# List pending for specific consumer
redis-cli XPENDING mystream mygroup - + 10 consumer-1

# Claim idle messages (reassign to another consumer)
redis-cli XCLAIM mystream mygroup consumer-2 60000 1716000000000-0

# Auto-claim (claim oldest idle messages)
redis-cli XAUTOCLAIM mystream mygroup consumer-2 60000 0-0 COUNT 10

Command Reference

CommandDescription
XADDAppend message to stream
XREADRead messages from streams
XRANGERead range of messages
XREVRANGERead range in reverse
XLENGet stream length
XINFOGet stream/group/consumer info
XDELDelete specific messages
XTRIMTrim stream to max length
XGROUP CREATECreate consumer group
XREADGROUPRead as consumer group member
XACKAcknowledge message
XPENDINGList pending messages
XCLAIMClaim pending messages
XAUTOCLAIMAuto-claim idle messages

Programming Examples

Python (redis-py)

import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Producer
def produce():
    for i in range(100):
        r.xadd('events', {
            'event_type': 'order_created',
            'order_id': str(i),
            'amount': str(i * 10.5),
            'timestamp': str(time.time()),
        }, maxlen=10000)

# Consumer with group
def consume():
    group = 'processors'
    consumer = 'worker-1'

    # Create group if not exists
    try:
        r.xgroup_create('events', group, id='0', mkstream=True)
    except redis.exceptions.ResponseError:
        pass  # Group already exists

    while True:
        messages = r.xreadgroup(
            groupname=group,
            consumername=consumer,
            streams={'events': '>'},
            count=10,
            block=5000,
        )

        for stream, entries in messages:
            for msg_id, data in entries:
                print(f"Processing {msg_id}: {data}")
                # Process message...
                r.xack('events', group, msg_id)

Node.js (ioredis)

const Redis = require('ioredis');
const redis = new Redis();

// Producer
async function produce(data) {
  const id = await redis.xadd('events', '*',
    'event_type', data.type,
    'payload', JSON.stringify(data.payload)
  );
  console.log(`Added message ${id}`);
}

// Consumer group
async function consume() {
  const group = 'processors';
  const consumer = 'worker-1';

  try {
    await redis.xgroup('CREATE', 'events', group, '0', 'MKSTREAM');
  } catch (e) {
    // Group exists
  }

  while (true) {
    const results = await redis.xreadgroup(
      'GROUP', group, consumer,
      'COUNT', 10,
      'BLOCK', 5000,
      'STREAMS', 'events', '>'
    );

    if (results) {
      for (const [stream, messages] of results) {
        for (const [id, fields] of messages) {
          console.log(`Processing ${id}:`, fields);
          await redis.xack('events', group, id);
        }
      }
    }
  }
}

Configuration

Memory Management

# Trim stream to max entries
redis-cli XTRIM mystream MAXLEN 100000

# Approximate trim (faster, ~100 extra entries)
redis-cli XTRIM mystream MAXLEN '~' 100000

# Trim by minimum ID
redis-cli XTRIM mystream MINID '~' 1716000000000-0

# Auto-trim on XADD
redis-cli XADD mystream MAXLEN '~' 50000 '*' key value

Redis Configuration

# redis.conf settings for streams
# Max memory policy (noeviction recommended for streams)
maxmemory-policy noeviction

# Stream node max bytes (default 4096)
stream-node-max-bytes 4096

# Stream node max entries (default 100)
stream-node-max-entries 100

Advanced Patterns

Dead Letter Queue

def process_with_dlq(r, stream, group, consumer, max_retries=3):
    # Check for messages pending too long
    pending = r.xpending_range(stream, group, '-', '+', 100)

    for msg in pending:
        if msg['times_delivered'] > max_retries:
            # Move to dead letter stream
            data = r.xrange(stream, msg['message_id'], msg['message_id'])
            if data:
                r.xadd(f'{stream}:dlq', data[0][1])
            r.xack(stream, group, msg['message_id'])

Exactly-Once Processing (Idempotency)

def idempotent_process(r, stream, group, consumer):
    messages = r.xreadgroup(group, consumer, {stream: '>'}, count=1, block=5000)

    for s, entries in messages:
        for msg_id, data in entries:
            # Use message ID as idempotency key
            if not r.sismember('processed_ids', msg_id):
                process(data)
                r.sadd('processed_ids', msg_id)
            r.xack(stream, group, msg_id)

Monitoring

# Stream statistics
redis-cli XINFO STREAM mystream

# Memory usage
redis-cli MEMORY USAGE mystream

# Monitor stream commands in real-time
redis-cli MONITOR | grep -i 'xadd\|xread\|xack'

# Key metrics to watch:
# - Stream length (XLEN)
# - Pending entry count (XPENDING summary)
# - Consumer lag (last-delivered-id vs stream last ID)
# - Memory usage per stream

Troubleshooting

IssueSolution
Messages not deliveredVerify consumer is reading with > for new messages
Pending messages growingEnsure consumers call XACK after processing
Memory growing unboundedUse MAXLEN on XADD or periodic XTRIM
Consumer group not createdUse MKSTREAM flag; ensure stream exists first
Slow consumers blocking othersAdd more consumers to the group; increase COUNT per read
Messages processing twiceImplement idempotency; ensure XACK runs after processing
XCLAIM not workingCheck minimum idle time parameter; verify message IDs exist
Stream too largeSet aggressive MAXLEN; archive old data to external storage