A lightweight message queue for Node.js that requires no dedicated queue server. Just a Redis server.
tl;dr: If you run a Redis server and currently use Amazon SQS or a similar message queue you might as well use this fast little replacement. Using a shared Redis server multiple Node.js processes can send / receive messages.
- Lightweight: Just Redis and ~500 lines of javascript.
- Speed: Send and receive 1000+ messages per second on an average machine. It's just Redis.
- Guaranteed delivery of a message to exactly one recipient within a messages visibility timeout.
- Received messages that are not deleted will reappear after the visibility timeout.
- Test coverage
- A message is deleted by the message id. The message id is returned by the
sendMessageandreceiveMessagemethod. - Messages stay in the queue unless deleted.
- Optional RESTful interface via rest-rsmq
Note: RSMQ uses the Redis EVAL command (LUA scripts) so the minimum Redis version is 2.6+.
- After creating a queue you can send messages to that queue.
- The messages will be handled in a FIFO (first in first out) manner unless specified with a delay.
- Every message has a unique
idthat you can use to delete the message. - The
sendMessagemethod will return theidfor a sent message. - The
receiveMessagemethod will return anidalong with the message and some stats. - Should you not delete the message it will be eligible to be received again after the visibility timeout is reached.
- Please have a look at the
createQueueandreceiveMessagemethods described below for optional parameters like visibility timeout and delay.
npm install rsmq
To keep the core of RSMQ small additional functionality is available as modules:
- rsmq-worker Helper to implement a worker with RSMQ.
- rest-rsmq A RESTful interface for RSMQ.
- rsmq-cli A command-line interface / Terminal client for RSMQ.
- rsmq-promise Promise interface for RSMQ
The simplicity of RSMQ is useful in other languages. Here is a list of implementations in other languages:
Note: Should you plan to port RSQM to another language please make sure to have tests to insure compatibility with all RSMQ clients. And of course: let me know so i can mention your port here.
RedisSMQ = require("rsmq");
rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );Parameters for RedisSMQ via an options object:
host(String): optional (Default: "127.0.0.1") The Redis serverport(Number): optional (Default: 6379) The Redis portoptions(Object): optional (Default: {}) The Redis https://github.com/NodeRedis/node_redis#options-object-propertiesoptionsobject.client(RedisClient): optional A existing redis client instance.hostandserverwill be ignored.ns(String): optional (Default: "rsmq") The namespace prefix used for all keys created by RSMQ
Please look at the Methods section for optional parameters when creating a queue.
rsmq.createQueue({qname:"myqueue"}, function (err, resp) {
if (resp===1) {
console.log("queue created")
}
});rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}
});rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) {
if (resp.id) {
console.log("Message received.", resp)
}
else {
console.log("No messages for me...")
}
});rsmq.deleteMessage({qname:"myqueue", id:"dhoiwpiirm15ce77305a5c3a3b0f230c6e20f09b55"}, function (err, resp) {
if (resp===1) {
console.log("Message deleted.")
}
else {
console.log("Message not found.")
}
});rsmq.listQueues( function (err, queues) {
if( err ){
console.error( err )
return
}
console.log("Active queues: " + queues.join( "," ) )
});Change the visibility timer of a single message.
The time when the message will be visible again is calculated from the current time (now) + vt.
Parameters:
qname(String): The Queue name.id(String): The message id.vt(Number): The length of time, in seconds, that this message will not be visible. Allowed values: 0-9999999 (around 115 days)
Returns:
1if successful,0if the message was not found.
Create a new queue.
Parameters:
qname(String): The Queue name. Maximum 160 characters; alphanumeric characters, hyphens (-), and underscores (_) are allowed.vt(Number): optional (Default: 30) The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)delay(Number): optional (Default: 0) The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)maxsize(Number): optional (Default: 65536) The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)
Returns:
1
Parameters:
qname(String): The Queue name.id(String): message id to delete.
Returns:
1if successful,0if the message was not found.
Deletes a queue and all messages.
Parameters:
qname(String): The Queue name.
Returns:
1
Get queue attributes, counter and stats
Parameters:
qname(String): The Queue name.
Returns an object:
vt: The visibility timeout for the queue in secondsdelay: The delay for new messages in secondsmaxsize: The maximum size of a message in bytestotalrecv: Total number of messages received from the queuetotalsent: Total number of messages sent to the queuecreated: Timestamp (epoch in seconds) when the queue was createdmodified: Timestamp (epoch in seconds) when the queue was last modified withsetQueueAttributesmsgs: Current number of messages in the queuehiddenmsgs: Current number of hidden / not visible messages. A message can be hidden while "in flight" due to avtparameter or when sent with adelay
List all queues
Returns an array:
["qname1", "qname2"]
Receive the next message from the queue and delete it.
Important: This method deletes the message it receives right away. There is no way to receive the message again if something goes wrong while working on the message.
Parameters:
qname(String): The Queue name.
Returns an object:
message: The message's contents.id: The internal message id.sent: Timestamp of when this message was sent / created.fr: Timestamp of when this message was first received.rc: Number of times this message was received.
Note: Will return an empty object if no message is there
Receive the next message from the queue.
Parameters:
qname(String): The Queue name.vt(Number): optional (Default: queue settings) The length of time, in seconds, that the received message will be invisible to others. Allowed values: 0-9999999 (around 115 days)
Returns an object:
message: The message's contents.id: The internal message id.sent: Timestamp of when this message was sent / created.fr: Timestamp of when this message was first received.rc: Number of times this message was received.
Note: Will return an empty object if no message is there
Sends a new message.
Parameters:
qname(String)message(String)delay(Number): optional (Default: queue settings) The time in seconds that the delivery of the message will be delayed. Allowed values: 0-9999999 (around 115 days)
Returns:
id: The internal message id.
Sets queue parameters.
Parameters:
qname(String): The Queue name.vt(Number): optional * The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)delay(Number): optional The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)maxsize(Number): optional The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)
Note: At least one attribute (vt, delay, maxsize) must be supplied. Only attributes that are supplied will be modified.
Returns an object:
vt: The visibility timeout for the queue in secondsdelay: The delay for new messages in secondsmaxsize: The maximum size of a message in bytestotalrecv: Total number of messages received from the queuetotalsent: Total number of messages sent to the queuecreated: Timestamp (epoch in seconds) when the queue was createdmodified: Timestamp (epoch in seconds) when the queue was last modified withsetQueueAttributesmsgs: Current number of messages in the queuehiddenmsgs: Current number of hidden / not visible messages. A message can be hidden while "in flight" due to avtparameter or when sent with adelay
Disconnect the redis client. This is only useful if you are using rsmq within a script and want node to be able to exit.
see the CHANGELOG
| Name | Description |
|---|---|
| node-cache | Simple and fast Node.js internal caching. Node internal in memory cache like memcached. |
| redis-tagging | A Node.js helper library to make tagging of items in any legacy database (SQL or NoSQL) easy and fast. |
| redis-sessions | An advanced session store for Node.js and Redis |
| rsmq-worker | Helper to implement a worker based on RSMQ (Redis Simple Message Queue). |
| redis-notifications | A Redis based notification engine. It implements the rsmq-worker to safely create notifications and recurring reports. |
| task-queue-worker | A powerful tool for background processing of tasks that are run by making standard http requests. |
| obj-schema | Simple module to validate an object by a predefined schema |
| connect-redis-sessions | A connect or express middleware to use redis sessions that lets you handle multiple sessions per user_id. |
| systemhealth | Node module to run simple custom checks for your machine or it's connections. It will use redis-heartbeat to send the current state to Redis. |
| soyer | Soyer is small lib for serverside use of Google Closure Templates with node.js. |
| grunt-soy-compile | Compile Goggle Closure Templates (SOY) templates including the handling of XLIFF language files. |
| backlunr | A solution to bring Backbone Collections together with the browser fulltext search engine Lunr.js |
Please see the LICENSE.md file.

