Zum Inhalt springen

Bull Cheat Sheet

Overview

Bull is a premium Redis-based queue for Node.js that handles distributed jobs and messages reliably. BullMQ is the next-generation rewrite with TypeScript support, improved architecture, and additional features. Both libraries provide robust job processing with priorities, delays, rate limiting, retries, and concurrency control.

Bull and BullMQ are widely used for background processing in Node.js applications including email sending, image processing, data pipelines, and webhook delivery. They leverage Redis for persistence and pub/sub, support multiple named queues, provide real-time events, and include a dashboard (Bull Board) for monitoring job states and debugging failures.

Installation

# BullMQ (recommended for new projects)
npm install bullmq

# Bull (legacy, still maintained)
npm install bull

# Dashboard
npm install @bull-board/express @bull-board/api

# CLI tools
npm install -g bullmq-cli

Core Usage (BullMQ)

Queue and Producer

import { Queue } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };

const emailQueue = new Queue('email', { connection });

// Add a job
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Hello and welcome.',
});

// Add with options
await emailQueue.add('send-report', { reportId: 123 }, {
  delay: 60000,             // delay 60 seconds
  attempts: 3,              // retry 3 times
  backoff: {
    type: 'exponential',
    delay: 5000,
  },
  priority: 1,              // higher priority (lower number)
  removeOnComplete: 100,    // keep last 100 completed
  removeOnFail: 500,        // keep last 500 failed
});

// Bulk add
await emailQueue.addBulk([
  { name: 'send', data: { to: 'a@test.com' } },
  { name: 'send', data: { to: 'b@test.com' } },
  { name: 'send', data: { to: 'c@test.com' } },
]);

Worker (Consumer)

import { Worker } from 'bullmq';

const worker = new Worker('email', async (job) => {
  console.log(`Processing ${job.name} - ${job.id}`);
  console.log('Data:', job.data);

  // Update progress
  await job.updateProgress(50);

  // Do work
  const result = await sendEmail(job.data);

  await job.updateProgress(100);
  return result;
}, {
  connection,
  concurrency: 5,
  limiter: {
    max: 10,
    duration: 1000,  // max 10 jobs per second
  },
});

// Event handlers
worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result: ${result}`);
});

worker.on('failed', (job, err) => {
  console.error(`Job ${job?.id} failed: ${err.message}`);
});

worker.on('error', (err) => {
  console.error('Worker error:', err);
});

Job Events and Lifecycle

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('email', { connection });

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed:`, returnvalue);
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed:`, failedReason);
});

queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress:`, data);
});

queueEvents.on('waiting', ({ jobId }) => {
  console.log(`Job ${jobId} is waiting`);
});

Job States

StateDescription
waitingJob is waiting to be processed
activeJob is being processed by a worker
completedJob finished successfully
failedJob failed after all retries
delayedJob is delayed and waiting for its time
pausedQueue is paused
prioritizedJob waiting with priority ordering
waiting-childrenFlow job waiting for child jobs

Scheduling and Repeatable Jobs

// Repeatable job (cron)
await emailQueue.add('daily-digest', { type: 'digest' }, {
  repeat: {
    pattern: '0 8 * * *',  // every day at 8am
    tz: 'America/New_York',
  },
});

// Repeatable job (interval)
await emailQueue.add('health-check', {}, {
  repeat: {
    every: 30000,  // every 30 seconds
    limit: 100,    // max 100 repetitions
  },
});

// List repeatable jobs
const repeatable = await emailQueue.getRepeatableJobs();
console.log(repeatable);

// Remove repeatable job
await emailQueue.removeRepeatableByKey(repeatable[0].key);

Queue Management

// Pause and resume
await emailQueue.pause();
await emailQueue.resume();

// Get job counts
const counts = await emailQueue.getJobCounts(
  'waiting', 'active', 'completed', 'failed', 'delayed'
);
console.log(counts);

// Get specific jobs
const failed = await emailQueue.getFailed(0, 10);
const waiting = await emailQueue.getWaiting(0, 10);
const active = await emailQueue.getActive(0, 10);

// Retry all failed jobs
const failedJobs = await emailQueue.getFailed(0, 100);
for (const job of failedJobs) {
  await job.retry();
}

// Clean old jobs
await emailQueue.clean(3600000, 100, 'completed');  // older than 1h
await emailQueue.clean(86400000, 100, 'failed');     // older than 1d

// Drain queue (remove all waiting jobs)
await emailQueue.drain();

// Obliterate (remove everything)
await emailQueue.obliterate();

Flows (Parent-Child Jobs)

import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({ connection });

const flow = await flowProducer.add({
  name: 'build-report',
  queueName: 'reports',
  data: { reportId: 1 },
  children: [
    {
      name: 'fetch-data',
      queueName: 'data',
      data: { source: 'database' },
    },
    {
      name: 'fetch-data',
      queueName: 'data',
      data: { source: 'api' },
    },
  ],
});

Dashboard (Bull Board)

import express from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(reportQueue),
  ],
  serverAdapter,
});

const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000);

Configuration

Connection Options

const connection = {
  host: 'localhost',
  port: 6379,
  password: 'mypassword',
  db: 0,
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
  tls: {
    rejectUnauthorized: false,
  },
};

Worker Options

const worker = new Worker('queue', processor, {
  connection,
  concurrency: 10,
  maxStalledCount: 1,
  stalledInterval: 30000,
  lockDuration: 30000,
  lockRenewTime: 15000,
  drainDelay: 5,
  settings: {
    backoffStrategy: (attemptsMade) => {
      return Math.min(attemptsMade * 1000, 30000);
    },
  },
});

Advanced Usage

Sandboxed Processors

// processor.js (separate file)
module.exports = async function (job) {
  // Runs in a separate child process
  return heavyComputation(job.data);
};

// main.js
const worker = new Worker('heavy', './processor.js', {
  connection,
  concurrency: 4,
  useWorkerThreads: true,  // use worker threads instead of child processes
});

Troubleshooting

IssueSolution
Jobs not processingCheck Redis connection; ensure worker is running and queue names match
Jobs stuck in activeIncrease lockDuration; check for unhandled promise rejections
Memory leak in RedisSet removeOnComplete and removeOnFail; run queue.clean() regularly
Duplicate job processingEnable stalledInterval detection; ensure lockRenewTime < lockDuration
Rate limiting not workingSet limiter on the worker, not the queue; check max and duration values
Repeatable jobs duplicatingEnsure only one producer adds the repeatable; use removeRepeatableByKey
Connection timeoutSet maxRetriesPerRequest: null; increase Redis timeout settings
Progress not updatingUse job.updateProgress() not job.progress(); listen on QueueEvents