Node.js Background Jobs

For long running tasks, CPU intensive tasks, scheduled tasks and tasks that need to be retried, we use Bull background job queue. It is a popular and actively maintained background job queue for Node.js with Redis backend. We use Arena UI dashboard to inspect Bull jobs.

Job Processing

Job processor is defined by passing queue.process a function or a path to a file where the processing function is defined. In case a path is provided, the job will be processed in a sandboxed child process, which allows us to run CPU intensive or blocking code. Also, if the child process crashes, it won’t affect the processing of other jobs. Therefore we recommend to always use a processor file approach.

Error Handling

Listen for failed and error events on the queue. Then log the errors and also send the exception to the error tracker.

Usage

bull-utils.js

import Queue from 'bull'

const redis = { host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }

const handleError = (error) => captureException(error)
const handleFailed = (job, error) => captureException(error)

const createQueue = (name) => {
  const queue = new Queue(name, { redis })
  queue.on('error', handleError)
  queue.on('failed', handleFailed)
  return queue
}

export const createQueueWithProcessor = (name, processor, concurrency = 1) => {
  const queue = createQueue(name)
  queue.process(concurrency, processor)
  return queue
}

users/jobs/registerUser/enqueueJob.js

import { createQueueWithProcessor } from '~/bull-utils'

export const queue = createQueueWithProcessor('registerUser', path.resolve(__dirname, 'processor.js'))

export default async (user) => {
  queue.add({ userId: user.id })
}

users/jobs/registerUser/processor.js

export default async (job) => {
  // Register user in some remote service
}

users/jobs/index.js

export { default as enqueueRegisterUserJob } from './registerUser/enqueueJob'

users/createUser.js

import { enqueueRegisterUserJob } from './jobs'

export default async (name) => {
  const user = await User.forge({ name }).save()
  await enqueueRegisterUserJob(user)
}