Bull Cheat Sheet
Overview
Bull is a Redis-backed queue for Node.js that handles distributed jobs and messages reliably. BullMQ is its next-generation rewrite, written in TypeScript, with a revised architecture and additional features such as flows (parent-child jobs). Both libraries support priorities, delays, rate limiting, retries, and concurrency control.
Bull and BullMQ are widely used for background processing in Node.js applications, including email sending, image processing, data pipelines, and webhook delivery. They use Redis for persistence and event delivery, support multiple named queues, emit real-time events, and can be monitored with Bull Board, a separately installed dashboard for inspecting job states and debugging failures.
Installation
# BullMQ (recommended for new projects)
npm install bullmq
# Bull (legacy, in maintenance mode)
npm install bull
# Dashboard
npm install @bull-board/express @bull-board/api
# CLI tools
npm install -g bullmq-cli
Core Usage (BullMQ)
Queue and Producer
import { Queue } from 'bullmq';
const connection = { host: 'localhost', port: 6379 };
const emailQueue = new Queue('email', { connection });
// Add a job
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Hello and welcome.',
});
// Add with options
await emailQueue.add('send-report', { reportId: 123 }, {
  delay: 60000, // wait 60 seconds before the first attempt
  attempts: 3, // up to 3 attempts in total (first try + 2 retries)
  backoff: {
    type: 'exponential',
    delay: 5000, // base delay in ms; grows with each retry
  },
  priority: 1, // lower number = higher priority
  removeOnComplete: 100, // keep only the last 100 completed jobs
  removeOnFail: 500, // keep only the last 500 failed jobs
});
// Bulk add
await emailQueue.addBulk([
  { name: 'send', data: { to: 'a@test.com' } },
  { name: 'send', data: { to: 'b@test.com' } },
  { name: 'send', data: { to: 'c@test.com' } },
]);
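To make the `attempts` and `backoff` options above concrete, the sketch below lists the wait before each retry. It assumes the built-in exponential strategy waits `delay * 2^(attemptsMade - 1)` milliseconds, which is an assumption about BullMQ's behavior that may differ between versions. `retryDelays` is a hypothetical helper, not part of the library.

```typescript
// Hypothetical helper, not part of BullMQ: lists the wait before each retry.
// Assumption: exponential backoff waits baseDelay * 2^(attemptsMade - 1) ms.
function retryDelays(attempts: number, baseDelay: number): number[] {
  const delays: number[] = [];
  // `attempts` counts the first try, so there are at most attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(baseDelay * Math.pow(2, attemptsMade - 1)));
  }
  return delays;
}

console.log(retryDelays(3, 5000)); // [ 5000, 10000 ]
console.log(retryDelays(5, 1000)); // [ 1000, 2000, 4000, 8000 ]
```

Under this assumption, the `send-report` job above would be retried roughly 5 and then 10 seconds after each failure before it is marked as failed.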
Worker (Consumer)
import { Worker } from 'bullmq';
const worker = new Worker('email', async (job) => {
  console.log(`Processing ${job.name} - ${job.id}`);
  console.log('Data:', job.data);
  // Update progress
  await job.updateProgress(50);
  // Do work (sendEmail is your own application function)
  const result = await sendEmail(job.data);
  await job.updateProgress(100);
  return result;
}, {
  connection, // same connection settings as the producer
  concurrency: 5,
  limiter: {
    max: 10,
    duration: 1000, // max 10 jobs per second
  },
});
// Event handlers
worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result: ${result}`);
});
worker.on('failed', (job, err) => {
  console.error(`Job ${job?.id} failed: ${err.message}`);
});
worker.on('error', (err) => {
  console.error('Worker error:', err);
});
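The `limiter` option above caps how many jobs may start per time window. As a rough illustration, the sketch below estimates the earliest moment the last job of a backlog can start. It assumes a simple fixed-window model (at most `max` job starts per `duration` ms) and ignores processing time and concurrency. `earliestLastStartMs` is a hypothetical helper, and BullMQ's actual limiter may behave differently at window boundaries.

```typescript
// Hypothetical helper: fixed-window model of `limiter: { max, duration }`.
function earliestLastStartMs(jobCount: number, max: number, duration: number): number {
  if (max <= 0 || duration < 0) {
    throw new RangeError('max must be positive and duration non-negative');
  }
  if (jobCount <= 0) return 0;
  // Jobs are admitted in groups of `max`; find the 0-based window of the last job.
  const lastWindow = Math.ceil(jobCount / max) - 1;
  return lastWindow * duration;
}

console.log(earliestLastStartMs(10, 10, 1000)); // 0
console.log(earliestLastStartMs(25, 10, 1000)); // 2000
```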
Job Events and Lifecycle
import { QueueEvents } from 'bullmq';
const queueEvents = new QueueEvents('email', { connection });
queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed:`, returnvalue);
});
queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.error(`Job ${jobId} failed:`, failedReason);
});
queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress:`, data);
});
queueEvents.on('waiting', ({ jobId }) => {
  console.log(`Job ${jobId} is waiting`);
});
Job States
| State | Description |
|---|---|
| waiting | Job is waiting to be processed |
| active | Job is being processed by a worker |
| completed | Job finished successfully |
| failed | Job failed after all retries |
| delayed | Job is delayed and waiting for its time |
| paused | Queue is paused |
| prioritized | Job waiting with priority ordering |
| waiting-children | Flow job waiting for child jobs |
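The states in the table can be written as a TypeScript union, which is handy when summarizing a counts object keyed by state. This is a minimal sketch: `JobState`, `isFinished`, and `pendingTotal` are hypothetical names, not BullMQ exports, and the counts object is assumed to look like `{ waiting: 3, active: 1, ... }`.

```typescript
// The states from the table as a union type (hypothetical, not a BullMQ export).
type JobState =
  | 'waiting' | 'active' | 'completed' | 'failed'
  | 'delayed' | 'paused' | 'prioritized' | 'waiting-children';

// A job in one of these states will not run again unless it is explicitly retried.
function isFinished(state: JobState): boolean {
  return state === 'completed' || state === 'failed';
}

// Totals the jobs that still have work ahead of them.
function pendingTotal(counts: Partial<Record<JobState, number>>): number {
  let total = 0;
  for (const [state, count] of Object.entries(counts)) {
    if (!isFinished(state as JobState)) total += count ?? 0;
  }
  return total;
}

console.log(pendingTotal({ waiting: 3, active: 1, completed: 40, failed: 2, delayed: 5 })); // 9
```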
Scheduling and Repeatable Jobs
// Repeatable job (cron)
await emailQueue.add('daily-digest', { type: 'digest' }, {
  repeat: {
    pattern: '0 8 * * *', // every day at 8am
    tz: 'America/New_York',
  },
});
// Repeatable job (interval)
await emailQueue.add('health-check', {}, {
  repeat: {
    every: 30000, // every 30 seconds
    limit: 100, // max 100 repetitions
  },
});
// List repeatable jobs
const repeatable = await emailQueue.getRepeatableJobs();
console.log(repeatable);
// Remove repeatable job
await emailQueue.removeRepeatableByKey(repeatable[0].key);
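For readers less familiar with cron syntax, the sketch below names the fields of a standard five-field pattern such as the `'0 8 * * *'` used above. `describeCron` is a hypothetical helper that only splits the pattern and does not validate field values. BullMQ's own parser may accept additional forms, such as a leading seconds field, which this sketch does not cover.

```typescript
// Hypothetical helper: names the fields of a five-field cron pattern.
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

function describeCron(pattern: string): CronFields {
  const parts = pattern.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  // The length check above guarantees exactly five entries.
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    parts as [string, string, string, string, string];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

console.log(describeCron('0 8 * * *'));
// { minute: '0', hour: '8', dayOfMonth: '*', month: '*', dayOfWeek: '*' }
```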
Queue Management
// Pause and resume
await emailQueue.pause();
await emailQueue.resume();
// Get job counts
const counts = await emailQueue.getJobCounts(
  'waiting', 'active', 'completed', 'failed', 'delayed'
);
console.log(counts);
// Get specific jobs
const failed = await emailQueue.getFailed(0, 10);
const waiting = await emailQueue.getWaiting(0, 10);
const active = await emailQueue.getActive(0, 10);
// Retry all failed jobs
const failedJobs = await emailQueue.getFailed(0, 100);
for (const job of failedJobs) {
  await job.retry();
}
// Clean old jobs
await emailQueue.clean(3600000, 100, 'completed'); // older than 1h
await emailQueue.clean(86400000, 100, 'failed'); // older than 1d
// Drain queue (remove all waiting jobs)
await emailQueue.drain();
// Obliterate (irreversibly remove the queue and all of its jobs)
await emailQueue.obliterate();
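The "retry all failed jobs" loop above only covers the first page of results. A minimal sketch of a paged version follows. It is written against two small structural interfaces (`RetryableJob` and `FailedJobSource`) that are assumptions modeled on the calls in the snippet above, not BullMQ's own typings, and `retryAllFailed` is a hypothetical helper. It also assumes that `end` is an inclusive index.

```typescript
// Minimal structural types covering only the calls used below (assumptions).
interface RetryableJob {
  retry(): Promise<void>;
}
interface FailedJobSource {
  getFailed(start: number, end: number): Promise<RetryableJob[]>;
}

// Hypothetical helper: retries failed jobs batch by batch until none remain.
async function retryAllFailed(queue: FailedJobSource, batchSize = 100): Promise<number> {
  let retried = 0;
  for (;;) {
    // Always read from index 0: a retried job leaves the failed set,
    // so the next batch shifts to the front.
    const batch = await queue.getFailed(0, batchSize - 1);
    if (batch.length === 0) return retried;
    for (const job of batch) {
      await job.retry();
      retried++;
    }
  }
}
```

With a real queue this would be called as `await retryAllFailed(emailQueue)`, provided the queue's `getFailed` and `retry` signatures match the assumed interfaces. Jobs that fail again while the loop is running will be picked up again, so a cap on the total may be worth adding for jobs that fail deterministically.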
Flows (Parent-Child Jobs)
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer({ connection });
const flow = await flowProducer.add({
  name: 'build-report',
  queueName: 'reports',
  data: { reportId: 1 },
  children: [
    {
      name: 'fetch-data',
      queueName: 'data',
      data: { source: 'database' },
    },
    {
      name: 'fetch-data',
      queueName: 'data',
      data: { source: 'api' },
    },
  ],
});
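In a flow, the parent stays in `waiting-children` until its children have finished, so children always complete before their parent. The sketch below makes that ordering explicit with a post-order traversal over a tree shaped like the object passed to `flowProducer.add()`. `FlowNode` and `completionOrder` are hypothetical names, and the order among siblings is only one possibility, since siblings may run in parallel.

```typescript
// Shape mirrors the flow object above (hypothetical type, not a BullMQ export).
interface FlowNode {
  name: string;
  queueName: string;
  data?: unknown;
  children?: FlowNode[];
}

// Lists jobs so that every child appears before its parent (post-order).
function completionOrder(node: FlowNode): string[] {
  const order: string[] = [];
  for (const child of node.children ?? []) {
    order.push(...completionOrder(child));
  }
  order.push(`${node.queueName}:${node.name}`);
  return order;
}

const reportFlow: FlowNode = {
  name: 'build-report',
  queueName: 'reports',
  data: { reportId: 1 },
  children: [
    { name: 'fetch-data', queueName: 'data', data: { source: 'database' } },
    { name: 'fetch-data', queueName: 'data', data: { source: 'api' } },
  ],
};

console.log(completionOrder(reportFlow));
// [ 'data:fetch-data', 'data:fetch-data', 'reports:build-report' ]
```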
Dashboard (Bull Board)
import express from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
// emailQueue and reportQueue are BullMQ Queue instances created elsewhere
createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(reportQueue),
  ],
  serverAdapter,
});
const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000);
Configuration
Connection Options
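const connection = {
  host: 'localhost',
  port: 6379,
  password: 'mypassword',
  db: 0,
  maxRetriesPerRequest: null, // required for connections used by Workers
  enableReadyCheck: false,
  tls: {
    rejectUnauthorized: false, // disables certificate verification; avoid in production
  },
};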
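Deployments often supply Redis settings as a single URL rather than separate fields. The sketch below converts such a URL into an object with the same fields as the connection above. `connectionFromUrl` and `RedisConnection` are hypothetical names, and the example URL is made up. It assumes the conventional `redis://` and `rediss://` (TLS) schemes, with the database index as the path.

```typescript
// Hypothetical helper: builds a connection object from a Redis URL.
interface RedisConnection {
  host: string;
  port: number;
  password?: string;
  db: number;
  maxRetriesPerRequest: null;
  tls?: Record<string, never>;
}

function connectionFromUrl(redisUrl: string): RedisConnection {
  const url = new URL(redisUrl);
  const conn: RedisConnection = {
    host: url.hostname,
    port: url.port ? Number(url.port) : 6379, // default Redis port
    db: url.pathname.length > 1 ? Number(url.pathname.slice(1)) : 0,
    maxRetriesPerRequest: null,
  };
  // URL components are percent-encoded, so decode the password.
  if (url.password) conn.password = decodeURIComponent(url.password);
  // "rediss" is the conventional scheme for TLS; {} keeps default verification.
  if (url.protocol === 'rediss:') conn.tls = {};
  return conn;
}

console.log(connectionFromUrl('rediss://:s3cret@redis.example.com:6380/2'));
```

Unlike the hand-written example above, this sketch leaves certificate verification enabled when TLS is used.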
Worker Options
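const worker = new Worker('queue', processor, {
  connection,
  concurrency: 10,
  maxStalledCount: 1, // times a job may stall before it is failed
  stalledInterval: 30000, // ms between stalled-job checks
  lockDuration: 30000, // ms a job lock is held before it expires
  lockRenewTime: 15000, // ms between lock renewals
  drainDelay: 5, // seconds to wait for new jobs when the queue is empty
  settings: {
    // Applies to jobs added with backoff: { type: 'custom' }
    backoffStrategy: (attemptsMade) => {
      return Math.min(attemptsMade * 1000, 30000);
    },
  },
});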
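Because a job lock has to be renewed before it expires, `lockRenewTime` should stay below `lockDuration`. A minimal sketch of a sanity check for these two settings follows. `checkLockTimings` is a hypothetical helper, not a BullMQ function, and the single rule it enforces is the only one assumed here.

```typescript
// Hypothetical helper: flags lock settings that would let a lock expire
// before the worker gets a chance to renew it.
interface LockTimings {
  lockDuration: number; // ms a job lock is held before it expires
  lockRenewTime: number; // ms between lock renewals
}

function checkLockTimings(timings: LockTimings): string[] {
  const warnings: string[] = [];
  if (timings.lockDuration <= 0 || timings.lockRenewTime <= 0) {
    warnings.push('lockDuration and lockRenewTime must be positive');
  }
  if (timings.lockRenewTime >= timings.lockDuration) {
    warnings.push('lockRenewTime should be shorter than lockDuration');
  }
  return warnings;
}

console.log(checkLockTimings({ lockDuration: 30000, lockRenewTime: 15000 })); // []
console.log(checkLockTimings({ lockDuration: 30000, lockRenewTime: 30000 }));
// [ 'lockRenewTime should be shorter than lockDuration' ]
```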
Advanced Usage
Sandboxed Processors
// processor.js (separate file)
module.exports = async function (job) {
  // Runs outside the main event loop (child process or worker thread)
  return heavyComputation(job.data); // heavyComputation is your own function
};
// main.js
const worker = new Worker('heavy', './processor.js', {
  connection,
  concurrency: 4,
  useWorkerThreads: true, // use worker threads instead of child processes
});
Troubleshooting
| Issue | Solution |
|---|---|
| Jobs not processing | Check Redis connection; ensure worker is running and queue names match |
| Jobs stuck in active | Increase lockDuration; check for unhandled promise rejections |
| Memory leak in Redis | Set removeOnComplete and removeOnFail; run queue.clean() regularly |
| Duplicate job processing | Usually a job was marked as stalled while still running: avoid blocking the event loop, increase lockDuration, and ensure lockRenewTime < lockDuration |
| Rate limiting not working | Set limiter on the worker, not the queue; check max and duration values |
| Repeatable jobs duplicating | Ensure only one producer adds the repeatable; use removeRepeatableByKey |
| Connection timeout | Set maxRetriesPerRequest: null; increase Redis timeout settings |
| Progress not updating | Use job.updateProgress() not job.progress(); listen on QueueEvents |