Implement pub/sub in Node.js with Bee-queue

Anthony Ng
4 min read · Sep 28, 2020


In distributed computing, an event-driven or pub/sub design is a sensible architectural option for asynchronous communication. There are several ways to implement pub/sub in Node.js. The built-in event emitter in Node is a low-cost way to implement a first-come, first-served queue in memory. RabbitMQ and Kafka handle messaging in more sophisticated enterprise environments. This article discusses a lightweight solution for building a pub/sub architecture with flexible retry functionality and disaster recovery.

There are hundreds of reasons for a process to fail in a distributed system: network errors, unavailable remote systems, unfulfilled business requirements, and integration problems, to name a few. Fault tolerance is a vital feature for pub/sub, because a job may fail on the first attempt but succeed later, once the underlying problem is gone.

Depending on the use case, a published message or event might carry mission-critical information such as a customer order, a payment transaction, a pick-up notification, or on-boarding information. Any missed event or message can lead to customer dissatisfaction and financial loss.

You might be looking for a solution that is more robust than an in-memory queue but less complicated than a full-blown messaging platform. If so, this is the right post for you. I will show you a 3-minute approach to setting up a job queue with fault tolerance and disaster recovery capability.

Bee-queue (npm/GitHub) is a lightweight, fast and robust Redis-backed job queue that combines the benefits of Kue and Bull, and it works well as a pub/sub implementation.

This demo is structured around three files: queue.js, job.js, and app.js. queue.js acts as the subscriber: it drains the jobs from the queue and handles them. job.js acts as the publisher: it specifies the job data and the fault tolerance method (the retry strategy, to be specific). app.js is the entry point that creates the jobs and queues them up.

Before diving into the code, I highly recommend running the Quick Start to get a sense of the behaviour of the job queue. Node 12 or above (for ES2020) and Docker are needed to run this demo.

Now, let’s see the subscriber code.

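// queue.js (the subscriber)
import Queue from 'bee-queue'
// ...
const queue = new Queue('my-awesome-queue', {
  // required for delayed retries (fixed or exponential backoff)
  activateDelayedJobs: true
})

queue.process((job, done) => {
  const k = generateRandomNumber()

  if (k > threshold) {
    // simulated failure: report the error through the callback
    return done(new Error('// whatever message'))
  }
  // success: pass the result as the second argument
  return done(null, k)
})

export default queue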

The subscriber (queue.js) defines the queue and the job-handling logic. By default, the queue retries a failed job immediately. In practice, a fault tolerance strategy is more likely to retry at a regular interval, or even at an exponentially growing interval. The property activateDelayedJobs has to be set to true to enable this feature. Read here to see all the queue properties.

queue.process takes a callback function that holds the job-processing logic. To simulate random failures, the handler compares a random number against a constant threshold. On success, it returns done(null, result), which completes the job. On failure, it returns done(err), which sends the job back to the queue for another attempt as long as it has retries left.
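The callback contract can be tried out without Redis. The sketch below is only an illustration: makeHandler, runOnce, and the injectable random argument are hypothetical names that are not part of bee-queue or of the demo, and the numbers are arbitrary. It shows the same (job, done) shape, with the random source passed in so that both paths can be triggered on purpose.

```javascript
// Hypothetical factory, not part of bee-queue: returns a handler with the
// same (job, done) shape that the demo passes to queue.process.
function makeHandler(threshold, random = Math.random) {
  return (job, done) => {
    const k = random()
    if (k > threshold) {
      // Failure path: report the error through the first argument.
      return done(new Error(`job ${job.id} drew ${k}, above ${threshold}`))
    }
    // Success path: no error, and k becomes the job result.
    return done(null, k)
  }
}

// Stand-in for the queue: call the handler once and log what it reports.
function runOnce(handler, job) {
  handler(job, (err, result) => {
    if (err) console.log(`failed: ${err.message}`)
    else console.log(`succeeded: ${result}`)
  })
}

runOnce(makeHandler(0.5, () => 0.2), { id: 'a' }) // succeeded: 0.2
runOnce(makeHandler(0.5, () => 0.9), { id: 'b' }) // failed: job b drew 0.9, above 0.5
```

Injecting the random source is a design choice made for this sketch; it makes the handler easy to unit test, whereas the demo itself calls its own random number helper directly.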

Next, let's look at the publisher code.

// job.js (publisher)
import queue from './queue'

const retryCount = 20

const lineUp = async (payload) => {
  const job = await queue
    .createJob(payload)
    .backoff('fixed', 1000) // wait 1000 ms before each retry
    .retries(retryCount)
    .save()

  job.on('succeeded', (result) => { /* follow up the success */ })
  job.on('retrying', (err) => { /* follow up the failure */ })

  return job
}

export default lineUp

The publisher does two things.

  1. Generate a generic job with a retry setting (and expose it as a reusable function). The function createJob takes a parameter payload, which becomes available to the handler as job.data. The function backoff defines the wait strategy through its first parameter: the value fixed uses a regular interval, while the value exponential doubles the wait time on every retry. The second parameter specifies the initial wait time in milliseconds.
  2. Specify the handling of events. In our case, the succeeded and retrying events are listened to so that progress can be printed to the console.
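To see how the two strategies differ in practice, here is a small sketch that lists the wait before each retry. It is an illustration of the behaviour described above, not bee-queue's internal code; backoffSchedule is a hypothetical helper, and it assumes the first retry waits for the initial delay and that exponential doubles from there.

```javascript
// Illustrative only: computes the wait before each retry for the two
// strategies described above. This is not bee-queue's internal code.
function backoffSchedule(strategy, initialDelayMs, retries) {
  if (strategy !== 'fixed' && strategy !== 'exponential') {
    throw new Error(`unknown strategy: ${strategy}`)
  }
  const delays = []
  let delay = initialDelayMs
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(delay)
    // Assumption: exponential doubles the wait after every retry.
    if (strategy === 'exponential') delay *= 2
  }
  return delays
}

console.log(backoffSchedule('fixed', 1000, 4))       // [ 1000, 1000, 1000, 1000 ]
console.log(backoffSchedule('exponential', 1000, 4)) // [ 1000, 2000, 4000, 8000 ]
```

Under these assumptions, the demo's 20 retries with a fixed 1000 ms delay add up to 20 seconds of waiting. The same 20 retries with exponential would total 1000 × (2^20 − 1) ms, roughly 12 days, so an exponential strategy usually pairs better with a smaller retry count.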

app.js serves as the entry point to create jobs once the queue is ready.

// app.js
import lineUp from './job'
import queue from './queue'

queue.on('ready', () => {
  // lineUp is async, so each call returns a promise for the saved job
  const job1 = lineUp('Job 1')
  const job2 = lineUp('Job 2')
})

Disaster Recovery

Redis tracks the state of the jobs in the queue, so the queue can recover from an application crash. Once the application restarts, bee-queue resumes the jobs, and hence the events/messages are protected from loss. If you are concerned about disaster recovery for Redis itself, explore Redis's persistence strategies at this link.

You may find the source code of this demo here.

I hope you like this demo. Happy coding!
