RabbitMQ Tutorial: Work Queues

This tutorial is based on the official RabbitMQ documentation.

Prerequisites: RabbitMQ is installed and running on localhost:5672.

Work Queues

Targets:

1. In the first tutorial we wrote programs to send and receive messages from a named queue.
2. In this one we will create a work queue that will be used to distribute time-consuming tasks among multiple workers.

When should I use it?

Use a work queue when you want to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later.

Preparation

3. We’ll take the number of dots in the message string as its complexity; every dot will account for one second of “work”. For example, a fake task described by Hello... will take three seconds.
4. So we create new_task.js to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue.
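As a minimal sketch of the command-line part, the message text can be assembled from the arguments like this. The helper name buildTaskMessage is made up for illustration; the default text matches the one used in the final new_task.js.

```javascript
// Sketch: build the task text from command-line arguments.
// `buildTaskMessage` is a hypothetical helper, not part of amqplib.
function buildTaskMessage(args) {
  // Join all arguments with spaces; fall back to a default when none are given.
  return args.join(' ') || 'Hello World!';
}

// process.argv holds [node, script, ...userArgs], so skip the first two entries.
const message = buildTaskMessage(process.argv.slice(2));
console.log(" [x] Would send '%s'", message);
```

Running the script with the arguments First message. would produce the text "First message.", which counts as one second of work.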

Steps:

2. Rename receive.js to worker.js and change it to fake a second of work for every dot in the message body.
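The faked work could look roughly like the sketch below. The names workSeconds and fakeWork are hypothetical, and the schedule parameter is only an assumption added so the delay can be checked without actually waiting.

```javascript
// Count the dots in a message body; each dot stands for one second of work.
function workSeconds(body) {
  return body.split('.').length - 1;
}

// Pretend to be busy, then call onDone. `schedule` defaults to setTimeout
// and is a parameter only so the delay can be inspected without waiting.
function fakeWork(body, onDone, schedule = setTimeout) {
  const secs = workSeconds(body);
  schedule(onDone, secs * 1000);
  return secs;
}

// One dot, so "Done" is printed after about one second.
fakeWork('Hi.', () => console.log(' [x] Done'));
```

In the real worker the body would come from msg.content.toString() inside the consume callback.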

3. Run the scripts in three or more terminals: two or more run workers (worker.js), and one sends tasks with new_task.js.

4. By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
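To see what round-robin means for the order of delivery, here is a toy model in plain JavaScript. It only illustrates the pattern described above and is not how the broker itself is implemented; roundRobin is a made-up name.

```javascript
// Toy model of round-robin dispatch: message i goes to consumer i mod n.
function roundRobin(messages, consumerCount) {
  const assigned = Array.from({ length: consumerCount }, () => []);
  messages.forEach((msg, i) => assigned[i % consumerCount].push(msg));
  return assigned;
}

const result = roundRobin(['m1', 'm2', 'm3', 'm4', 'm5', 'm6'], 3);
// Prints: m1,m4 | m2,m5 | m3,m6
console.log(result.map((list) => list.join(',')).join(' | '));
```

Each of the three consumers ends up with the same number of messages, regardless of how long each message takes to process.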

Message acknowledgment

• If you kill a worker that consumes without acknowledgments, we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled. We don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
• In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.
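A consumer callback that acknowledges only after the work has finished might be sketched as follows. Here ch is assumed to be an amqplib channel, while makeHandler and doWork are hypothetical names standing in for your own code.

```javascript
// Sketch of a consumer callback that acks only once the work is done.
// `ch` is assumed to be an amqplib channel; `doWork(body, done)` is a
// hypothetical function standing in for the real task.
function makeHandler(ch, doWork) {
  return function onMessage(msg) {
    const body = msg.content.toString();
    doWork(body, function done() {
      // Tell RabbitMQ the message was fully processed and may be deleted.
      ch.ack(msg);
    });
  };
}

// Intended use with a real channel (noAck: false turns manual acks on):
//   ch.consume('task_queue', makeHandler(ch, doWork), { noAck: false });
```

If the worker dies before done is called, no ack is sent, so RabbitMQ can redeliver the message to another worker.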

Message durability

• When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable.

• The durable option (passed to assertQueue) needs to be applied in both the producer and consumer code.

• At this point we’re sure that the task_queue queue won’t be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent by using the persistent option that Channel.sendToQueue takes.
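Both settings together can be sketched as a small helper. ch is assumed to be an amqplib channel, and publishTask is a hypothetical name used only for this example.

```javascript
// Sketch: declare a durable queue and publish a persistent message.
// `ch` is assumed to be an amqplib channel; `publishTask` is a hypothetical helper.
function publishTask(ch, queue, text) {
  // durable: true asks RabbitMQ to keep the queue across broker restarts.
  ch.assertQueue(queue, { durable: true });
  // persistent: true marks the message itself as persistent.
  return ch.sendToQueue(queue, Buffer.from(text), { persistent: true });
}

// Intended use with a real channel:
//   publishTask(ch, 'task_queue', 'Hello...');
```

The worker would call assertQueue with the same durable: true option before consuming.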

Fair dispatch

• You might have noticed that the dispatching still doesn’t work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn’t know anything about that and will still dispatch messages evenly.

• This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn’t look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

• In order to defeat that we can use the prefetch method with the value of 1. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don’t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
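The difference between the two strategies can be illustrated with a toy model. This is a simplified simulation under the assumption that tasks are handed out in order and that "fair" means the next task goes to whichever worker becomes free first; the function names are made up, and the real setting is simply ch.prefetch(1) in the worker.

```javascript
// Toy model comparing the two dispatch strategies. Durations are in seconds.

// Round-robin: task i always goes to worker i mod n, busy or not.
function roundRobinLoad(durations, workers) {
  const busy = new Array(workers).fill(0);
  durations.forEach((d, i) => { busy[i % workers] += d; });
  return busy;
}

// With prefetch(1) a worker gets its next message only after acking the
// previous one, modelled here as "give the task to whoever is free first".
function fairLoad(durations, workers) {
  const busy = new Array(workers).fill(0);
  for (const d of durations) {
    const next = busy.indexOf(Math.min(...busy));
    busy[next] += d;
  }
  return busy;
}

const tasks = [5, 1, 5, 1, 5, 1]; // odd messages heavy, even messages light
console.log(roundRobinLoad(tasks, 2).join(' vs ')); // prints "15 vs 3"
console.log(fairLoad(tasks, 2).join(' vs '));       // prints "11 vs 7"
```

With round-robin one worker is busy for 15 seconds while the other works for only 3; in the fair model the load is split 11 to 7.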

Putting it all together

Final code of our new_task.js script:

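```javascript
#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  if (err) throw err;
  conn.createChannel(function(err, ch) {
    if (err) throw err;
    var q = 'task_queue';
    // Message text comes from the command line, with a default fallback
    var msg = process.argv.slice(2).join(' ') || "Hello World!";

    // Durable queue plus persistent message, so tasks survive a broker restart
    ch.assertQueue(q, {durable: true});
    ch.sendToQueue(q, Buffer.from(msg), {persistent: true});
    console.log(" [x] Sent '%s'", msg);
  });
  // Give the message time to be sent before closing the connection
  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});
```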

And our worker.js:

```javascript
#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  if (err) throw err;
  conn.createChannel(function(err, ch) {
    if (err) throw err;
    var q = 'task_queue';

    ch.assertQueue(q, {durable: true});
    // Take at most one unacknowledged message at a time
    ch.prefetch(1);
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      // One second of fake work per dot in the message body
      var secs = msg.content.toString().split('.').length - 1;

      console.log(" [x] Received %s", msg.content.toString());
      setTimeout(function() {
        console.log(" [x] Done");
        // Acknowledge only after the work has finished
        ch.ack(msg);
      }, secs * 1000);
    }, {noAck: false});
  });
});
```