diff --git a/README.md b/README.md index 0405ae4..aa92f9a 100644 --- a/README.md +++ b/README.md @@ -2,105 +2,271 @@ [ØMQ](http://www.zeromq.org/) bindings for node.js. +## Compatibility + +This module maintains full compatibility with ZeroMQ 2, 3 and 4. We test rigorously on Linux and OS X. Windows support +is at the moment a best-effort situation, so volunteers, please raise your hands! We attempt to support a wide range +of Node.js versions (see the [Travis configuration](.travis.yml) for a list of supported versions). + ## Installation ### on Windows: -First install [Visual Studio](https://www.visualstudio.com/) and either -[Node.js](https://nodejs.org/download/) or [io.js](https://iojs.org/dist/latest/). -Ensure you're building zmq from a conservative location on disk, one without -unusual characters or spaces, for example somewhere like: `C:\sources\myproject`. +First install [Visual Studio](https://www.visualstudio.com/) and [Node.js](https://nodejs.org/download/) + +Ensure you're building ZeroMQ from a conservative location on disk, one without unusual characters or spaces, for +example somewhere like: `C:\sources\myproject`. -Installing the ZeroMQ library is optional and not required on Windows. We -recommend running `npm install` and node executable commands from a -[github for windows](https://windows.github.com/) shell or similar environment. +Installing the ZeroMQ library is optional and not required on Windows. -### installing on Unix/POSIX (and osx): +### on Unix/POSIX and OS X: First install `pkg-config` and the [ZeroMQ library](http://www.zeromq.org/intro:get-the-software). -This module is compatible with ZeroMQ versions 2, 3 and 4. The installation -process varies by platform, but headers are mandatory. Most Linux distributions -provide these headers with `-devel` packages like `zeromq-devel` or -`zeromq3-devel`. Homebrew for OS X provides versions 4 and 3 with packages -`zeromq` and `zeromq3`, respectively. A -[Chris Lea PPA](https://launchpad.net/~chris-lea/+archive/ubuntu/zeromq) -is available for Debian-like users who want a version newer than currently -provided by their distribution. Windows is supported but not actively -maintained. +The installation process varies by platform, but headers are mandatory. Most Linux distributions provide these headers +with `-devel` packages like `zeromq-devel` or `zeromq3-devel`. Homebrew for OS X provides versions 3 and 4 with packages +`zeromq3` and `zeromq`, respectively. A [Chris Lea PPA](https://launchpad.net/~chris-lea/+archive/ubuntu/zeromq) is +available for Debian-like users who want a version newer than currently provided by their distribution. -Note: For zap support with versions >=4 you need to have libzmq built and linked -against libsodium. Check the [Travis configuration](.travis.yml) for a list of what is tested -and therefore known to work. +Note: For zap support with versions >=4 you need to have libzmq built and linked against libsodium. Check the +[Travis configuration](.travis.yml) for a list of what is tested and therefore known to work. #### with your platform-specifics taken care of, install and use this module: - $ npm install zmq + $ npm install zmq --save -## Examples +## API -### Push/Pull +Before starting, please make sure you understand the fundamentals of ZeroMQ. The excellent +[ØMQ - The Guide](http://zguide.zeromq.org/page:all) by [Pieter Hintjens](https://github.com/hintjens) is often praised +as a benchmark on how to write great documentation. Please do go through a good part of it before undertaking anything +with ZeroMQ. + +### Constants + +ZeroMQ is full of constants for various that are [well documented](http://api.zeromq.org/). It would be overkill to +document them all here again, so please refer to the official documentation. Whenever you use a constant in node-zmq, +you may refer to it as a string that is either the full constant name (eg: `'ZMQ_MAX_SOCKETS'`), or by a short lowercase +version without the `ZMQ_` prefix (eg: `'max_sockets'`). + +### Importing this module ```js -// producer.js -var zmq = require('zmq') - , sock = zmq.socket('push'); +var zmq = require('zmq'); -sock.bindSync('tcp://127.0.0.1:3000'); -console.log('Producer bound to port 3000'); +console.log('Using libzmq version:', zmq.version); +``` -setInterval(function(){ - console.log('sending work'); - sock.send('some work'); -}, 500); +### Feature support + +To see if a socket type is supported by this version of ZeroMQ: + +```js +var type = 'sub'; +var isSupported = zmq.socketTypeExists(type); // returns a boolean ``` +To see if a socket option is supported by this version of ZeroMQ: + ```js -// worker.js -var zmq = require('zmq') - , sock = zmq.socket('pull'); +var option = 'ZMQ_ROUTER_MANDATORY'; +var isSupported = zmq.socketOptionExists(option); // returns a boolean +``` -sock.connect('tcp://127.0.0.1:3000'); -console.log('Worker connected to port 3000'); +To see if a context option is supported by this version of ZeroMQ: -sock.on('message', function(msg){ - console.log('work: %s', msg.toString()); +```js +var option = 'ZMQ_IO_THREADS'; +var isSupported = zmq.contextOptionExists(option); // returns a boolean +``` + +### Contexts + +**Creating a context** + +```js +var options = { + ZMQ_IO_THREADS: 1, + ZMQ_MAX_SOCKETS: 1024, + /* etc */ +}; + +var ctx = zmq.context(options); +``` + +Creating a custom context is optional, and you can ignore this if you want to use default settings. But if you want to +create your sockets on a custom context, you can use this API. + +You may pass along an options object as illustrated above. Every option, and the object itself are completely optional. +For more about the options and their availability, check the [zmq_ctx_set documentation](http://api.zeromq.org). + +**Getting the default context** + +```js +var ctx = zmq.getDefaultContext(); // always returns the same instance +``` + +**Setting and getting options** + +You can set the options one by one, and read their current values: + +```js +ctx.set('ZMQ_MAX_SOCKETS', 1024); +var maxSockets = ctx.get('max_sockets'); // 1024 +``` + +**Creating a socket** + +You can create a socket from a context: + +```js +var type = 'pub'; // or 'ZMQ_PUB' +var options = { + ZMQ_SNDHWM: 1000 +}; + +var sock = ctx.socket(type, options); +``` + +You may pass along an options object as illustrated above. Every option, and the object itself are completely optional. +For more about the options and their availability, check the [zmq_setsockopt documentation](http://api.zeromq.org). + +### Sockets + +**Creating a socket using the default context** + +You have now seen that you can create a socket from a context. If you're satisfied with using the default context, you +can create a socket more easily: + +```js +var type = 'pub'; // or 'ZMQ_PUB' +var options = { + ZMQ_SNDHWM: 1000 +}; + +var sock = zmq.socket(type, options); +``` + +You may pass along an options object as illustrated above. Every option, and the object itself are completely optional. +For more about the options and their availability, check the [zmq_setsockopt documentation](http://api.zeromq.org). + +**Setting and getting options** + +You can set the options one by one, and read their current values: + +```js +sock.set('ZMQ_SNDHWM', 10); +var sndhwm = sock.get('sndhwm'); // returns 10 +``` + +**Binding and unbinding** + +You can bind sockets asynchronously (advisable): + +```js +var address = 'tcp://0.0.0.0:12345'; + +sock.bind(address, function (error) { + // if there was no error, we are now accepting connections +}); + +sock.unbind(address, function (error) { + // if there was no error, we are no longer accepting connections }); ``` -### Pub/Sub +Or synchronously: ```js -// pubber.js -var zmq = require('zmq') - , sock = zmq.socket('pub'); +var address = 'tcp://0.0.0.0:12345'; -sock.bindSync('tcp://127.0.0.1:3000'); -console.log('Publisher bound to port 3000'); +sock.bindSync(address); -setInterval(function(){ - console.log('sending a multipart message envelope'); - sock.send(['kitty cats', 'meow!']); -}, 500); +// if no error was thrown, we are now accepting connections + +sock.unbindSync(address); + +// if no error was thrown, we are no longer accepting connections ``` +**Connecting and disconnecting** + ```js -// subber.js -var zmq = require('zmq') - , sock = zmq.socket('sub'); +var address = 'tcp://1.2.3.4:12345'; -sock.connect('tcp://127.0.0.1:3000'); -sock.subscribe('kitty cats'); -console.log('Subscriber connected to port 3000'); +sock.connect(address); +sock.disconnect(address); +``` -sock.on('message', function(topic, message) { - console.log('received a message related to:', topic, 'containing message:', message); +**Sending messages** + +The `sock.send(message, flags, callback)` method takes 3 arguments: + +* message: a Buffer, string, or an array of string/Buffer. +* flags: a string or array of strings (optional). +* callback: called once the message is in the send-queue (optional). + +```js +sock.send(['hello', 'world'], function (error) { + // if no error was returned, the multi-part message is now in the send-queue +}); + +sock.send('hello', function (error) { + // if no error was returned, the single-part message is now in the send-queue +}); + +sock.send('hello', ['ZMQ_SNDMORE']); +sock.send('world'); +``` + +**Receiving messages** + +Sockets will emit a `"message"` event when messages come in. + +```js +sock.on('message', function (part1, part2, etc) { + // all parts are individual arguments }); ``` -## Monitoring -You can get socket state changes events by calling to the `monitor` function. -The supported events are (see ZMQ [docs](http://api.zeromq.org/4-2:zmq-socket-monitor) for full description): +When a socket is paused (see the API below), events will not be emitted, but you can still extract messages from the +queue by calling: + +```js +var parts = sock.read(); // returns an array of parts +``` + +**Pausing a socket** + +You can pause a socket's incoming message flow. + +```js +sock.pause(); +// no "message" events will be emitted until sock.resume() is called +sock.resume(); +``` + +**Detaching from the event loop** + +You may temporarily disable polling on a socket and let the Node.js process terminate without closing sockets explicitly +by removing their event loop references. Newly created sockets are already `ref()`-ed. + +```js +// Detach the socket from the main event loop of the node.js runtime. +// Calling this on already detached sockets is a no-op. + +sock.unref(); + +// Attach the socket to the main event loop. +// Calling this on already attached sockets is a no-op. + +sock.ref(); +``` + +**Monitoring a socket** + +You can get socket state changes events by calling the `monitor` method on a socket. The supported events are (see ZMQ +[docs](http://api.zeromq.org/4-2:zmq-socket-monitor) for full description): * connect - `ZMQ_EVENT_CONNECTED` * connect_delay - `ZMQ_EVENT_CONNECT_DELAYED` @@ -121,38 +287,44 @@ All events get 2 arguments: A special `monitor_error` event will be raised when there was an error in the monitoring process, after this event no more monitoring events will be sent, you can try and call `monitor` again to restart the monitoring process. -### monitor(interval, numOfEvents) -Will create an inproc PAIR socket where zmq will publish socket state changes events, the events from this socket will -be read every `interval` (defaults to 10ms). -By default only 1 message will be read every interval, this can be configured by using the `numOfEvents` parameter, -where passing 0 will read all available messages per interval. +```js +// Create an inproc PAIR socket where ZeroMQ will publish socket state changes events, the events from this socket will +// be read every `interval` (defaults to 10ms). By default only 1 message will be read every interval. This can be +// configured by using the `numOfEvents` parameter, where passing 0 will read all available messages per interval. + +var interval = 10; +var numOfEvents = 0; + +sock.monitor(interval, numOfEvents); + +// Stop monitoring -### unmonitor() -Stop the monitoring process +sock.unmonitor(); +``` -### example +A full example: ```js // Create a socket var zmq = require('zmq'); -socket = zmq.socket('req'); +var socket = zmq.socket('req'); // Register to monitoring events -socket.on('connect', function(fd, ep) {console.log('connect, endpoint:', ep);}); -socket.on('connect_delay', function(fd, ep) {console.log('connect_delay, endpoint:', ep);}); -socket.on('connect_retry', function(fd, ep) {console.log('connect_retry, endpoint:', ep);}); -socket.on('listen', function(fd, ep) {console.log('listen, endpoint:', ep);}); -socket.on('bind_error', function(fd, ep) {console.log('bind_error, endpoint:', ep);}); -socket.on('accept', function(fd, ep) {console.log('accept, endpoint:', ep);}); -socket.on('accept_error', function(fd, ep) {console.log('accept_error, endpoint:', ep);}); -socket.on('close', function(fd, ep) {console.log('close, endpoint:', ep);}); -socket.on('close_error', function(fd, ep) {console.log('close_error, endpoint:', ep);}); -socket.on('disconnect', function(fd, ep) {console.log('disconnect, endpoint:', ep);}); - -// Handle monitor error +socket.on('connect', function (fd, ep) { console.log('connect, endpoint:', ep); }); +socket.on('connect_delay', function (fd, ep) { console.log('connect_delay, endpoint:', ep); }); +socket.on('connect_retry', function (fd, ep) { console.log('connect_retry, endpoint:', ep); }); +socket.on('listen', function (fd, ep) { console.log('listen, endpoint:', ep); }); +socket.on('bind_error', function (fd, ep) { console.log('bind_error, endpoint:', ep); }); +socket.on('accept', function (fd, ep) { console.log('accept, endpoint:', ep); }); +socket.on('accept_error', function (fd, ep) { console.log('accept_error, endpoint:', ep); }); +socket.on('close', function (fd, ep) { console.log('close, endpoint:', ep); }); +socket.on('close_error', function (fd, ep) { console.log('close_error, endpoint:', ep); }); +socket.on('disconnect', function (fd, ep) { console.log('disconnect, endpoint:', ep); }); + +// Handle monitor errors socket.on('monitor_error', function(err) { - console.log('Error in monitoring: %s, will restart monitoring in 5 seconds', err); - setTimeout(function() { socket.monitor(500, 0); }, 5000); + console.log('Error in monitoring: %s, will restart monitoring in 5 seconds', err); + setTimeout(function () { socket.monitor(500, 0); }, 5000); }); // Call monitor, check for events every 500ms and get all available events. @@ -161,91 +333,126 @@ socket.monitor(500, 0); socket.connect('tcp://127.0.0.1:1234'); setTimeout(function() { - console.log('Stop the monitoring...'); - socket.unmonitor(); + socket.unmonitor(); }, 20000); +``` + +**Helpers for "sub" sockets** +The way to subscribe to messages from a publisher is a bit archaic by having to go through socket options instead of +being able to call a function, which would feel more natural. For that reason there are the following helper functions: + +```js +sock.subscribe('foo'); +sock.unsubscribe('foo'); ``` -## Detaching from the event loop -You may temporarily disable polling on a specific ZMQ socket and let the node.js -process to terminate without closing sockets explicitly by removing their event loop -references. Newly created sockets are already `ref()`-ed. +Please note that this is equal to: + +```js +sock.set('ZMQ_SUBSCRIBE', 'foo'); +sock.set('ZMQ_UNSUBSCRIBE', 'foo'); +``` + +**Destroying a socket** + +You can destroy a socket by calling its `close()` method. + +```js +sock.close(); +``` -### unref() -Detach the socket from the main event loop of the node.js runtime. -Calling this on already detached sockets is a no-op. +## Usage examples -### ref() -Attach the socket to the main event loop. -Calling this on already attached sockets is a no-op. +### Push/Pull -### Example ```js +// producer.js var zmq = require('zmq'); -socket = zmq.socket('sub'); -socket.bindSync('tcp://127.0.0.1:1234'); -socket.subscribe(''); -socket.on('message', function(msg) { console.log(msg.toString(); }); -// Here blocks indefinitely unless interrupted. -// Let it terminate after 1 second. -setTimeout(function() { socket.unref(); }, 1000); +var sock = zmq.socket('push'); + +sock.bindSync('tcp://127.0.0.1:3000'); +console.log('Producer bound to port 3000'); + +setInterval(function(){ + console.log('sending work'); + sock.send('some work'); +}, 500); ``` -## Running tests +```js +// worker.js +var zmq = require('zmq'); +var sock = zmq.socket('pull'); -#### Install dev deps: -```sh -$ git clone https://github.com/JustinTulloss/zeromq.node.git zmq && cd zmq -$ npm i +sock.connect('tcp://127.0.0.1:3000'); +console.log('Worker connected to port 3000'); + +sock.on('message', function (msg) { + console.log('work: %s', msg.toString()); +}); ``` -#### Build: -```sh -# on unix: -$ make -# building on windows: -> npm i +### Pub/Sub + +```js +// publisher.js +var zmq = require('zmq'); +var sock = zmq.socket('pub'); + +sock.bindSync('tcp://127.0.0.1:3000'); +console.log('Publisher bound to port 3000'); + +setInterval(function () { + console.log('sending a multipart message envelope'); + sock.send(['kitty cats', 'meow!']); +}, 500); ``` -#### Test: -```sh -# on unix: -$ make test -# testing on windows: -> npm t +```js +// subscriber.js +var zmq = require('zmq'); +var sock = zmq.socket('sub'); + +sock.connect('tcp://127.0.0.1:3000'); +sock.subscribe('kitty cats'); +console.log('Subscriber connected to port 3000'); + +sock.on('message', function (topic, message) { + console.log('received a message related to:', topic, 'containing message:', message); +}); ``` + ## Running benchmarks -Benchmarks are available in the `perf` directory, and have been implemented -according to the zmq documentation: +Benchmarks are available in the `perf` directory, and have been implemented according to the ZeroMQ documentation: [How to run performance tests](http://www.zeromq.org/results:perf-howto) In the following examples, the arguments are respectively: -- the host to connect to/bind on -- message size (in bytes) -- message count +* the host to connect to/bind on +* message size (in bytes) +* message count You can run a latency benchmark by running these two commands in two separate shells: ```sh -node ./local_lat.js tcp://127.0.0.1:5555 1 100000 +node ./perf/local_lat.js tcp://127.0.0.1:5555 1 100000 ``` ```sh -node ./remote_lat.js tcp://127.0.0.1:5555 1 100000 +node ./perf/remote_lat.js tcp://127.0.0.1:5555 1 100000 ``` And you can run throughput tests by running these two commands in two separate shells: ```sh -node ./local_thr.js tcp://127.0.0.1:5555 1 100000 +node ./perf/local_thr.js tcp://127.0.0.1:5555 1 100000 ``` ```sh -node ./remote_thr.js tcp://127.0.0.1:5555 1 100000 +node ./perf/remote_thr.js tcp://127.0.0.1:5555 1 100000 ``` Running `make perf` will run the commands listed above. diff --git a/binding.cc b/binding.cc index d4685e2..ff462de 100644 --- a/binding.cc +++ b/binding.cc @@ -55,10 +55,10 @@ }; #endif -#define ZMQ_CAN_DISCONNECT (ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2) || ZMQ_VERSION_MAJOR > 3 -#define ZMQ_CAN_UNBIND (ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2) || ZMQ_VERSION_MAJOR > 3 +#define ZMQ_CAN_DISCONNECT (ZMQ_VERSION >= 30200) +#define ZMQ_CAN_UNBIND (ZMQ_VERSION >= 30200) #define ZMQ_CAN_MONITOR (ZMQ_VERSION > 30201) -#define ZMQ_CAN_SET_CTX (ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2) || ZMQ_VERSION_MAJOR > 3 +#define ZMQ_CAN_SET_CTX (ZMQ_VERSION >= 30200) using namespace v8; using namespace node; @@ -77,7 +77,90 @@ namespace zmq { std::set opts_uint64; std::set opts_binary; - class Socket; + /* + * Helpers for dealing with ØMQ errors. + */ + + static inline const char* + ErrorMessage() { + return zmq_strerror(zmq_errno()); + } + + static inline Local + ExceptionFromError() { + return Nan::Error(ErrorMessage()); + } + + /* + * Manages a reference to a zmq_msg_t instance + * Will return the zmq_msg_t instance by being deferenced + */ + + class MessageReference { + public: + inline MessageReference() { + if (zmq_msg_init(&msg_) < 0) + throw std::runtime_error(ErrorMessage()); + } + + inline ~MessageReference() { + if (zmq_msg_close(&msg_) < 0) + throw std::runtime_error(ErrorMessage()); + } + + inline operator zmq_msg_t*() { + return &msg_; + } + + private: + zmq_msg_t msg_; + }; + + + /* + * An object that creates an empty ØMQ message, which can be used for + * zmq_recv. After the receive call, a Buffer object wrapping the ØMQ + * message can be requested. The reference for the ØMQ message will + * remain while the data is in use by the Buffer. + */ + + class IncomingMessage { + public: + inline IncomingMessage() { + msgref_ = new MessageReference(); + }; + + inline ~IncomingMessage() { + if (buf_.IsEmpty() && msgref_) { + delete msgref_; + msgref_ = NULL; + } else + buf_.Reset(); + }; + + inline operator zmq_msg_t*() { + return *msgref_; + } + + inline Local GetBuffer() { + if (buf_.IsEmpty()) { + Local buf_obj = Nan::NewBuffer((char*)zmq_msg_data(*msgref_), zmq_msg_size(*msgref_), FreeCallback, msgref_).ToLocalChecked(); + if (buf_obj.IsEmpty()) { + return Local(); + } + buf_.Reset(buf_obj); + } + return Nan::New(buf_); + } + + private: + static void FreeCallback(char* data, void* message) { + delete static_cast(message); + } + + Nan::Persistent buf_; + MessageReference* msgref_; + }; class Context : public Nan::ObjectWrap { friend class Socket; @@ -86,14 +169,14 @@ namespace zmq { virtual ~Context(); private: - Context(int io_threads); + Context(); static NAN_METHOD(New); static Context *GetContext(const Nan::FunctionCallbackInfo&); void Close(); static NAN_METHOD(Close); #if ZMQ_CAN_SET_CTX - static NAN_METHOD(GetOpt); - static NAN_METHOD(SetOpt); + static NAN_METHOD(Get); + static NAN_METHOD(Set); #endif void* context_; @@ -151,12 +234,8 @@ namespace zmq { static NAN_METHOD(Disconnect); #endif - class IncomingMessage; - static NAN_METHOD(Recv); - static NAN_METHOD(Readv); - class OutgoingMessage; + static NAN_METHOD(Read); static NAN_METHOD(Send); - static NAN_METHOD(Sendv); void Close(); static NAN_METHOD(Close); @@ -190,21 +269,6 @@ namespace zmq { static NAN_MODULE_INIT(Initialize); - /* - * Helpers for dealing with ØMQ errors. - */ - - static inline const char* - ErrorMessage() { - return zmq_strerror(zmq_errno()); - } - - static inline Local - ExceptionFromError() { - return Nan::Error(ErrorMessage()); - } - - /* * Context methods. */ @@ -216,8 +280,8 @@ namespace zmq { Nan::SetPrototypeMethod(t, "close", Close); #if ZMQ_CAN_SET_CTX - Nan::SetPrototypeMethod(t, "setOpt", SetOpt); - Nan::SetPrototypeMethod(t, "getOpt", GetOpt); + Nan::SetPrototypeMethod(t, "set", Set); + Nan::SetPrototypeMethod(t, "get", Get); #endif Nan::Set(target, Nan::New("Context").ToLocalChecked(), Nan::GetFunction(t).ToLocalChecked()); @@ -230,23 +294,18 @@ namespace zmq { NAN_METHOD(Context::New) { assert(info.IsConstructCall()); - int io_threads = 1; - if (info.Length() == 1) { - if (!info[0]->IsNumber()) { - return Nan::ThrowTypeError("io_threads must be an integer"); - } - io_threads = Nan::To(info[0]).FromJust(); - if (io_threads < 1) { - return Nan::ThrowRangeError("io_threads must be a positive number"); - } - } - Context *context = new Context(io_threads); + Context *context = new Context(); context->Wrap(info.This()); info.GetReturnValue().Set(info.This()); } - Context::Context(int io_threads) : Nan::ObjectWrap() { - context_ = zmq_init(io_threads); + Context::Context() : Nan::ObjectWrap() { +#if ZMQ_VERSION < 30200 + context_ = zmq_init(1); +#else + context_ = zmq_ctx_new(); +#endif + if (!context_) throw std::runtime_error(ErrorMessage()); } @@ -258,18 +317,35 @@ namespace zmq { void Context::Close() { if (context_ != NULL) { - if (zmq_term(context_) < 0) throw std::runtime_error(ErrorMessage()); + int rc; + + while (true) { +#if ZMQ_VERSION < 30200 + rc = zmq_term(context_); +#else + rc = zmq_ctx_destroy(context_); +#endif + + if (rc < 0) { + if (zmq_errno() == EINTR) { + continue; + } + + throw std::runtime_error(ErrorMessage()); + } + break; + } + context_ = NULL; } } NAN_METHOD(Context::Close) { GetContext(info)->Close(); - return; } #if ZMQ_CAN_SET_CTX - NAN_METHOD(Context::SetOpt) { + NAN_METHOD(Context::Set) { if (info.Length() != 2) return Nan::ThrowError("Must pass an option and a value"); if (!info[0]->IsNumber() || !info[1]->IsNumber()) @@ -278,12 +354,13 @@ namespace zmq { int value = Nan::To(info[1]).FromJust(); Context *context = GetContext(info); - if (zmq_ctx_set(context->context_, option, value) < 0) - return Nan::ThrowError(ExceptionFromError()); + while (zmq_ctx_set(context->context_, option, value)) + if (zmq_errno() != EINTR) + return Nan::ThrowError(ExceptionFromError()); return; } - NAN_METHOD(Context::GetOpt) { + NAN_METHOD(Context::Get) { if (info.Length() != 1) return Nan::ThrowError("Must pass an option"); if (!info[0]->IsNumber()) @@ -291,8 +368,18 @@ namespace zmq { int option = Nan::To(info[0]).FromJust(); Context *context = GetContext(info); - int value = zmq_ctx_get(context->context_, option); - info.GetReturnValue().Set(Nan::New(value)); + + int rc; + do { + rc = zmq_ctx_get(context->context_, option); + if (rc < 0) { + if (zmq_errno() != EINTR) + return Nan::ThrowError(ExceptionFromError()); + continue; + } + } while (rc < 0); + + info.GetReturnValue().Set(Nan::New(rc)); } #endif /* @@ -304,8 +391,6 @@ namespace zmq { Local t = Nan::New(New); t->InstanceTemplate()->SetInternalFieldCount(1); - Nan::SetAccessor(t->InstanceTemplate(), - Nan::New("state").ToLocalChecked(), Socket::GetState); Nan::SetAccessor(t->InstanceTemplate(), Nan::New("pending").ToLocalChecked(), GetPending, SetPending); @@ -320,10 +405,8 @@ namespace zmq { Nan::SetPrototypeMethod(t, "setsockopt", SetSockOpt); Nan::SetPrototypeMethod(t, "ref", AttachToEventLoop); Nan::SetPrototypeMethod(t, "unref", DetachFromEventLoop); - Nan::SetPrototypeMethod(t, "recv", Recv); - Nan::SetPrototypeMethod(t, "readv", Readv); + Nan::SetPrototypeMethod(t, "read", Read); Nan::SetPrototypeMethod(t, "send", Send); - Nan::SetPrototypeMethod(t, "sendv", Sendv); Nan::SetPrototypeMethod(t, "close", Close); #if ZMQ_CAN_DISCONNECT @@ -375,7 +458,7 @@ namespace zmq { while (true) { int rc = zmq_poll(&item, 1, 0); if (rc < 0) { - if (zmq_errno()==EINTR) { + if (zmq_errno() == EINTR) { continue; } throw std::runtime_error(ErrorMessage()); @@ -494,7 +577,7 @@ namespace zmq { #else // monitoring on zmq < 4 used zmq_event_t zmq_event_t event; - memcpy (&event, zmq_msg_data (&msg1), sizeof (zmq_event_t)); + memcpy(&event, zmq_msg_data (&msg1), sizeof (zmq_event_t)); event_id = event.event; // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. @@ -548,9 +631,9 @@ namespace zmq { throw std::runtime_error(ErrorMessage()); } - #if ZMQ_CAN_MONITOR - this->monitor_socket_ = NULL; - #endif +#if ZMQ_CAN_MONITOR + this->monitor_socket_ = NULL; +#endif uv_poll_init_socket(uv_default_loop(), poll_handle_, socket); uv_poll_start(poll_handle_, UV_READABLE, Socket::UV_PollCallback); @@ -568,9 +651,9 @@ namespace zmq { #define GET_SOCKET(info) \ Socket* socket = GetSocket(info); \ if (socket->state_ == STATE_CLOSED) \ - return Nan::ThrowTypeError("Socket is closed"); \ + return Nan::ThrowTypeError("Socket is closed"); \ if (socket->state_ == STATE_BUSY) \ - return Nan::ThrowTypeError("Socket is busy"); + return Nan::ThrowTypeError("Socket is busy"); NAN_GETTER(Socket::GetState) { Socket* socket = Nan::ObjectWrap::Unwrap(info.Holder()); @@ -597,7 +680,7 @@ namespace zmq { while (true) { int rc = zmq_getsockopt(socket_, option, &value, &len); if (rc < 0) { - if(zmq_errno()==EINTR) { + if (zmq_errno() == EINTR) { continue; } Nan::ThrowError(ExceptionFromError()); @@ -749,14 +832,12 @@ namespace zmq { UV_BindAsync, (uv_after_work_cb)UV_BindAsyncAfter); socket->state_ = STATE_BUSY; - - return; } void Socket::UV_BindAsync(uv_work_t* req) { BindState* state = static_cast(req->data); if (zmq_bind(state->sock, *state->addr) < 0) - state->error = zmq_errno(); + state->error = zmq_errno(); } void Socket::UV_BindAsyncAfter(uv_work_t* req) { @@ -807,8 +888,6 @@ namespace zmq { } socket->endpoints += 1; - - return; } #if ZMQ_CAN_UNBIND @@ -830,13 +909,12 @@ namespace zmq { UV_UnbindAsync, (uv_after_work_cb)UV_UnbindAsyncAfter); socket->state_ = STATE_BUSY; - return; } void Socket::UV_UnbindAsync(uv_work_t* req) { BindState* state = static_cast(req->data); if (zmq_unbind(state->sock, *state->addr) < 0) - state->error = zmq_errno(); + state->error = zmq_errno(); } void Socket::UV_UnbindAsyncAfter(uv_work_t* req) { @@ -884,8 +962,6 @@ namespace zmq { socket->Unref(); socket->_DetachFromEventLoop(); } - - return; } #endif @@ -904,97 +980,26 @@ namespace zmq { socket->Ref(); socket->_AttachToEventLoop(); } - - return; } #if ZMQ_CAN_DISCONNECT NAN_METHOD(Socket::Disconnect) { - - if (!info[0]->IsString()) { + if (!info[0]->IsString()) return Nan::ThrowTypeError("Address must be a string!"); - } GET_SOCKET(info); Nan::Utf8String address(info[0].As()); if (zmq_disconnect(socket->socket_, *address)) return Nan::ThrowError(ErrorMessage()); + if (--socket->endpoints == 0) { socket->Unref(); socket->_DetachFromEventLoop(); } - - return; } #endif - /* - * An object that creates an empty ØMQ message, which can be used for - * zmq_recv. After the receive call, a Buffer object wrapping the ØMQ - * message can be requested. The reference for the ØMQ message will - * remain while the data is in use by the Buffer. - */ - - class Socket::IncomingMessage { - public: - inline IncomingMessage() { - msgref_ = new MessageReference(); - }; - - inline ~IncomingMessage() { - if (buf_.IsEmpty() && msgref_) { - delete msgref_; - msgref_ = NULL; - } else { - buf_.Reset(); - } - }; - - inline operator zmq_msg_t*() { - return *msgref_; - } - - inline Local GetBuffer() { - if (buf_.IsEmpty()) { - Local buf_obj = Nan::NewBuffer((char*)zmq_msg_data(*msgref_), zmq_msg_size(*msgref_), FreeCallback, msgref_).ToLocalChecked(); - if (buf_obj.IsEmpty()) { - return Local(); - } - buf_.Reset(buf_obj); - } - return Nan::New(buf_); - } - - private: - static void FreeCallback(char* data, void* message) { - delete static_cast(message); - } - - class MessageReference { - public: - inline MessageReference() { - if (zmq_msg_init(&msg_) < 0) - throw std::runtime_error(ErrorMessage()); - } - - inline ~MessageReference() { - if (zmq_msg_close(&msg_) < 0) - throw std::runtime_error(ErrorMessage()); - } - - inline operator zmq_msg_t*() { - return &msg_; - } - - private: - zmq_msg_t msg_; - }; - - Nan::Persistent buf_; - MessageReference* msgref_; - }; - #if ZMQ_CAN_MONITOR NAN_METHOD(Socket::Monitor) { int64_t timer_interval = 10; // default to 10ms interval @@ -1021,8 +1026,8 @@ namespace zmq { Context *context = Nan::ObjectWrap::Unwrap(Nan::New(socket->context_)); sprintf(addr, "%s%d", "inproc://monitor.req.", monitors_count++); - if(zmq_socket_monitor(socket->socket_, addr, ZMQ_EVENT_ALL) != -1) { - socket->monitor_socket_ = zmq_socket (context->context_, ZMQ_PAIR); + if (zmq_socket_monitor(socket->socket_, addr, ZMQ_EVENT_ALL) != -1) { + socket->monitor_socket_ = zmq_socket(context->context_, ZMQ_PAIR); zmq_connect (socket->monitor_socket_, addr); socket->timer_interval_ = timer_interval; socket->num_of_events_ = num_of_events; @@ -1032,8 +1037,6 @@ namespace zmq { uv_timer_init(uv_default_loop(), socket->monitor_handle_); uv_timer_start(socket->monitor_handle_, reinterpret_cast(Socket::UV_MonitorCallback), timer_interval, 0); } - - return; } void @@ -1059,12 +1062,10 @@ namespace zmq { // which might not always be the case Socket* socket = GetSocket(info); socket->Unmonitor(); - return; } - #endif - NAN_METHOD(Socket::Readv) { + NAN_METHOD(Socket::Read) { Socket* socket = GetSocket(info); if (socket->state_ != STATE_READY) return; @@ -1075,7 +1076,13 @@ namespace zmq { int rc = 0; int flags = 0; + +#if ZMQ_VERSION_MAJOR <= 2 int64_t more = 1; +#else + int more = 1; +#endif + size_t more_size = sizeof(more); size_t index = 0; @@ -1083,10 +1090,9 @@ namespace zmq { while (more == 1) { if (checkPollIn) { - while (zmq_getsockopt(socket->socket_, ZMQ_EVENTS, &events, &events_size)) { + while (zmq_getsockopt(socket->socket_, ZMQ_EVENTS, &events, &events_size)) if (zmq_errno() != EINTR) return Nan::ThrowError(ErrorMessage()); - } if ((events & ZMQ_POLLIN) == 0) return; @@ -1095,25 +1101,14 @@ namespace zmq { IncomingMessage part; while (true) { - rc = zmq_msg_init(part); - if (rc != 0) { - if (zmq_errno()==EINTR) { - continue; - } - return Nan::ThrowError(ErrorMessage()); - } - break; - } - - while (true) { - #if ZMQ_VERSION_MAJOR == 2 +#if ZMQ_VERSION_MAJOR == 2 rc = zmq_recv(socket->socket_, part, flags); - #elif ZMQ_VERSION_MAJOR == 3 +#elif ZMQ_VERSION_MAJOR == 3 rc = zmq_recvmsg(socket->socket_, part, flags); - #else +#else rc = zmq_msg_recv(part, socket->socket_, flags); checkPollIn = false; - #endif +#endif if (rc < 0) { if (zmq_errno() == EINTR) @@ -1134,99 +1129,7 @@ namespace zmq { info.GetReturnValue().Set(result); } - NAN_METHOD(Socket::Recv) { - int flags = 0; - int argc = info.Length(); - if (argc == 1) { - if (!info[0]->IsNumber()) - return Nan::ThrowTypeError("Argument should be an integer"); - flags = Nan::To(info[0]).FromJust(); - } else if (argc != 0) { - return Nan::ThrowTypeError("Only one argument at most was expected"); - } - - GET_SOCKET(info); - - IncomingMessage msg; - while (true) { - int rc; - #if ZMQ_VERSION_MAJOR == 2 - rc = zmq_recv(socket->socket_, msg, flags); - #else - rc = zmq_recvmsg(socket->socket_, msg, flags); - #endif - if (rc < 0) { - if (zmq_errno()==EINTR) { - continue; - } - return Nan::ThrowError(ErrorMessage()); - } else { - break; - } - } - info.GetReturnValue().Set(msg.GetBuffer()); - } - - /* - * An object that creates a ØMQ message from the given Buffer Object, - * and manages the reference to it using RAII. A persistent V8 handle - * for the Buffer object will remain while its data is in use by ØMQ. - */ - - class Socket::OutgoingMessage { - public: - inline OutgoingMessage(Local buf) { - bufref_ = new BufferReference(buf); - if (zmq_msg_init_data(&msg_, Buffer::Data(buf), Buffer::Length(buf), - BufferReference::FreeCallback, bufref_) < 0) { - delete bufref_; - throw std::runtime_error(ErrorMessage()); - } - }; - - inline ~OutgoingMessage() { - if (zmq_msg_close(&msg_) < 0) - throw std::runtime_error(ErrorMessage()); - }; - - inline operator zmq_msg_t*() { - return &msg_; - } - - private: - class BufferReference { - public: - inline BufferReference(Local buf) { - loop = uv_default_loop(); - uv_async_init(loop, &async, reinterpret_cast(cleanup)); - async.data = this; - persistent.Reset(buf); - } - - inline ~BufferReference() { - persistent.Reset(); - } - - // Called by zmq when the message has been sent. - // NOTE: May be called from a worker thread. Do not modify V8/Node. - static void FreeCallback(void* data, void* message) { - uv_async_send(&static_cast(message)->async); - } - - static void cleanup(uv_async_t *handle, int status) { - delete static_cast(handle->data); - } - private: - Nan::Persistent persistent; - uv_async_t async; - uv_loop_t *loop; - }; - - zmq_msg_t msg_; - BufferReference* bufref_; - }; - - NAN_METHOD(Socket::Sendv) { + NAN_METHOD(Socket::Send) { Socket* socket = GetSocket(info); if (socket->state_ != STATE_READY) return info.GetReturnValue().Set(false); @@ -1274,14 +1177,14 @@ namespace zmq { while (true) { int rc; - #if ZMQ_VERSION_MAJOR == 2 +#if ZMQ_VERSION_MAJOR == 2 rc = zmq_send(socket->socket_, &msg, flags); - #elif ZMQ_VERSION_MAJOR == 3 +#elif ZMQ_VERSION_MAJOR == 3 rc = zmq_sendmsg(socket->socket_, &msg, flags); - #else +#else rc = zmq_msg_send(&msg, socket->socket_, flags); checkPollOut = false; - #endif +#endif if (rc < 0){ if (zmq_errno() == EINTR) { continue; @@ -1302,68 +1205,8 @@ namespace zmq { return info.GetReturnValue().Set(true); } - // WARNING: the buffer passed here will be kept alive - // until zmq_send completes, possibly on another thread. - // Do not modify or reuse any buffer passed to send. - // This is bad, but allows us to send without copying. - NAN_METHOD(Socket::Send) { - - int argc = info.Length(); - if (argc != 1 && argc != 2) - return Nan::ThrowTypeError("Must pass a Buffer and optionally flags"); - if (!Buffer::HasInstance(info[0])) - return Nan::ThrowTypeError("First argument should be a Buffer"); - int flags = 0; - if (argc == 2) { - if (!info[1]->IsNumber()) - return Nan::ThrowTypeError("Second argument should be an integer"); - flags = Nan::To(info[1]).FromJust(); - } - - GET_SOCKET(info); - -#if 0 // zero-copy version, but doesn't properly pin buffer and so has GC issues - OutgoingMessage msg(info[0].As()); - if (zmq_send(socket->socket_, msg, flags) < 0) - return Nan::ThrowError(ErrorMessage()); -#else // copying version that has no GC issues - zmq_msg_t msg; - Local buf = info[0].As(); - size_t len = Buffer::Length(buf); - int res = zmq_msg_init_size(&msg, len); - if (res != 0) - return Nan::ThrowError(ErrorMessage()); - - char * cp = static_cast(zmq_msg_data(&msg)); - const char * dat = Buffer::Data(buf); - std::copy(dat, dat + len, cp); - while (true) { - int rc; - #if ZMQ_VERSION_MAJOR == 2 - rc = zmq_send(socket->socket_, &msg, flags); - #elif ZMQ_VERSION_MAJOR == 3 - rc = zmq_sendmsg(socket->socket_, &msg, flags); - #else - rc = zmq_msg_send(&msg, socket->socket_, flags); - #endif - if (rc < 0){ - if (zmq_errno()==EINTR) { - continue; - } - return Nan::ThrowError(ErrorMessage()); - } else { - break; - } - } -#endif // zero copy / copying version - - return; - } - - static void - on_uv_close(uv_handle_t *handle) - { + on_uv_close(uv_handle_t *handle) { delete handle; } @@ -1388,37 +1231,25 @@ namespace zmq { NAN_METHOD(Socket::Close) { GET_SOCKET(info); socket->Close(); - return; } - // Make zeromq versions less than 2.1.3 work by defining - // the new constants if they don't already exist - #if (ZMQ_VERSION < 20103) - # define ZMQ_DEALER ZMQ_XREQ - # define ZMQ_ROUTER ZMQ_XREP - #endif - /* * Module functions. */ - static NAN_METHOD(ZmqVersion) { - int major, minor, patch; - zmq_version(&major, &minor, &patch); - + static NAN_METHOD(ZmqVersion) { char version_info[16]; - snprintf(version_info, 16, "%d.%d.%d", major, minor, patch); + snprintf(version_info, 16, "%d.%d.%d", ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH); info.GetReturnValue().Set(Nan::New(version_info).ToLocalChecked()); } #if ZMQ_VERSION_MAJOR >= 4 static NAN_METHOD(ZmqCurveKeypair) { + char public_key[41]; + char secret_key[41]; - char public_key [41]; - char secret_key [41]; - - int rc = zmq_curve_keypair( public_key, secret_key); + int rc = zmq_curve_keypair(public_key, secret_key); if (rc < 0) { return Nan::ThrowError("zmq_curve_keypair operation failed. Method support in libzmq v4+ -with-libsodium."); } @@ -1434,114 +1265,453 @@ namespace zmq { static NAN_MODULE_INIT(Initialize) { Nan::HandleScope scope; - opts_int.insert(14); // ZMQ_FD - opts_int.insert(16); // ZMQ_TYPE - opts_int.insert(17); // ZMQ_LINGER - opts_int.insert(18); // ZMQ_RECONNECT_IVL - opts_int.insert(19); // ZMQ_BACKLOG - opts_int.insert(21); // ZMQ_RECONNECT_IVL_MAX - opts_int.insert(23); // ZMQ_SNDHWM - opts_int.insert(24); // ZMQ_RCVHWM - opts_int.insert(25); // ZMQ_MULTICAST_HOPS - opts_int.insert(27); // ZMQ_RCVTIMEO - opts_int.insert(28); // ZMQ_SNDTIMEO - opts_int.insert(29); // ZMQ_RCVLABEL - opts_int.insert(30); // ZMQ_RCVCMD - opts_int.insert(31); // ZMQ_IPV4ONLY - opts_int.insert(33); // ZMQ_ROUTER_MANDATORY - opts_int.insert(34); // ZMQ_TCP_KEEPALIVE - opts_int.insert(35); // ZMQ_TCP_KEEPALIVE_CNT - opts_int.insert(36); // ZMQ_TCP_KEEPALIVE_IDLE - opts_int.insert(37); // ZMQ_TCP_KEEPALIVE_INTVL - opts_int.insert(39); // ZMQ_DELAY_ATTACH_ON_CONNECT - opts_int.insert(40); // ZMQ_XPUB_VERBOSE - opts_int.insert(41); // ZMQ_ROUTER_RAW - opts_int.insert(42); // ZMQ_IPV6 - - opts_int64.insert(3); // ZMQ_SWAP - opts_int64.insert(8); // ZMQ_RATE - opts_int64.insert(10); // ZMQ_MCAST_LOOP - opts_int64.insert(20); // ZMQ_RECOVERY_IVL_MSEC - opts_int64.insert(22); // ZMQ_MAXMSGSIZE - - opts_uint64.insert(1); // ZMQ_HWM - opts_uint64.insert(4); // ZMQ_AFFINITY - - opts_binary.insert(5); // ZMQ_IDENTITY - opts_binary.insert(6); // ZMQ_SUBSCRIBE - opts_binary.insert(7); // ZMQ_UNSUBSCRIBE - opts_binary.insert(32); // ZMQ_LAST_ENDPOINT - opts_binary.insert(38); // ZMQ_TCP_ACCEPT_FILTER - - // transition types - #if ZMQ_VERSION_MAJOR >= 3 - opts_int.insert(15); // ZMQ_EVENTS 3.x int - opts_int.insert(8); // ZMQ_RATE 3.x int - opts_int.insert(9); // ZMQ_RECOVERY_IVL 3.x int - opts_int.insert(13); // ZMQ_RCVMORE 3.x int - opts_int.insert(11); // ZMQ_SNDBUF 3.x int - opts_int.insert(12); // ZMQ_RCVBUF 3.x int - #else - opts_uint32.insert(15); // ZMQ_EVENTS 2.x uint32_t - opts_int64.insert(8); // ZMQ_RATE 2.x int64_t - opts_int64.insert(9); // ZMQ_RECOVERY_IVL 2.x int64_t - opts_int64.insert(13); // ZMQ_RCVMORE 2.x int64_t - opts_uint64.insert(11); // ZMQ_SNDBUF 2.x uint64_t - opts_uint64.insert(12); // ZMQ_RCVBUF 2.x uint64_t - #endif - - #if ZMQ_VERSION_MAJOR >= 4 - opts_int.insert(43); // ZMQ_MECHANISM - opts_int.insert(44); // ZMQ_PLAIN_SERVER - opts_binary.insert(45); // ZMQ_PLAIN_USERNAME - opts_binary.insert(46); // ZMQ_PLAIN_PASSWORD - opts_int.insert(47); // ZMQ_CURVE_SERVER - opts_binary.insert(48); // ZMQ_CURVE_PUBLICKEY - opts_binary.insert(49); // ZMQ_CURVE_SECRETKEY - opts_binary.insert(50); // ZMQ_CURVE_SERVERKEY - opts_binary.insert(55); // ZMQ_ZAP_DOMAIN - #endif - - NODE_DEFINE_CONSTANT(target, ZMQ_CAN_DISCONNECT); - NODE_DEFINE_CONSTANT(target, ZMQ_CAN_UNBIND); - NODE_DEFINE_CONSTANT(target, ZMQ_CAN_MONITOR); - NODE_DEFINE_CONSTANT(target, ZMQ_CAN_SET_CTX); - NODE_DEFINE_CONSTANT(target, ZMQ_PUB); - NODE_DEFINE_CONSTANT(target, ZMQ_SUB); - #if ZMQ_VERSION_MAJOR >= 3 - NODE_DEFINE_CONSTANT(target, ZMQ_XPUB); - NODE_DEFINE_CONSTANT(target, ZMQ_XSUB); - #endif - NODE_DEFINE_CONSTANT(target, ZMQ_REQ); - NODE_DEFINE_CONSTANT(target, ZMQ_XREQ); - NODE_DEFINE_CONSTANT(target, ZMQ_REP); - NODE_DEFINE_CONSTANT(target, ZMQ_XREP); - NODE_DEFINE_CONSTANT(target, ZMQ_DEALER); - NODE_DEFINE_CONSTANT(target, ZMQ_ROUTER); - NODE_DEFINE_CONSTANT(target, ZMQ_PUSH); - NODE_DEFINE_CONSTANT(target, ZMQ_PULL); - NODE_DEFINE_CONSTANT(target, ZMQ_PAIR); - #if ZMQ_VERSION_MAJOR >= 4 - NODE_DEFINE_CONSTANT(target, ZMQ_STREAM); - #endif - - NODE_DEFINE_CONSTANT(target, ZMQ_POLLIN); - NODE_DEFINE_CONSTANT(target, ZMQ_POLLOUT); - NODE_DEFINE_CONSTANT(target, ZMQ_POLLERR); - - NODE_DEFINE_CONSTANT(target, ZMQ_SNDMORE); - #if ZMQ_VERSION_MAJOR == 2 - NODE_DEFINE_CONSTANT(target, ZMQ_NOBLOCK); - #endif - - NODE_DEFINE_CONSTANT(target, STATE_READY); - NODE_DEFINE_CONSTANT(target, STATE_BUSY); - NODE_DEFINE_CONSTANT(target, STATE_CLOSED); + // empty objects to hold (string -> mixed) values for constants + + Local sendFlags = Nan::New(); + Local types = Nan::New(); + Local options = Nan::New(); + Local ctxOptions = Nan::New(); + + // Helper macros for storing and exposing constants + + #define OPT(type, name) \ + opts_ ## type.insert(name); \ + Nan::Set(options, Nan::New(#name).ToLocalChecked(), Nan::New(name)); + + #define CTX_OPT(name) \ + Nan::Set(ctxOptions, Nan::New(#name).ToLocalChecked(), Nan::New(name)); + + #define SENDFLAG(name) \ + Nan::Set(sendFlags, Nan::New(#name).ToLocalChecked(), Nan::New(name)); + + #define TYPE(name) \ + Nan::Set(types, Nan::New(#name).ToLocalChecked(), Nan::New(name)); + + // Message send flags + + // Message send flags since ZMQ 2.2 + // http://api.zeromq.org/2-2:zmq-send + + #ifdef ZMQ_NOBLOCK + SENDFLAG(ZMQ_NOBLOCK); + #endif + + #ifdef ZMQ_SNDMORE + SENDFLAG(ZMQ_SNDMORE); + #endif + + // Message send flags since ZMQ 3.0 + // http://api.zeromq.org/3-0:zmq-send + + #ifdef ZMQ_DONTWAIT + SENDFLAG(ZMQ_DONTWAIT); + #endif + + #ifdef ZMQ_SNDLABEL + SENDFLAG(ZMQ_SNDLABEL); + #endif + + + // Socket types + + // Socket types since ZMQ 2.2: + // http://api.zeromq.org/2-2:zmq-socket + + #ifdef ZMQ_REQ + TYPE(ZMQ_REQ); + #endif + + #ifdef ZMQ_REP + TYPE(ZMQ_REP); + #endif + + #ifdef ZMQ_DEALER + TYPE(ZMQ_DEALER); + #endif + + #ifdef ZMQ_ROUTER + TYPE(ZMQ_ROUTER); + #endif + + #ifdef ZMQ_PUB + TYPE(ZMQ_PUB); + #endif + + #ifdef ZMQ_SUB + TYPE(ZMQ_SUB); + #endif + + #ifdef ZMQ_PUSH + TYPE(ZMQ_PUSH); + #endif + + #ifdef ZMQ_PULL + TYPE(ZMQ_PULL); + #endif + + #ifdef ZMQ_PAIR + TYPE(ZMQ_PAIR); + #endif + + // Socket types since ZMQ 3.0: + // http://api.zeromq.org/3-0:zmq-socket + + #ifdef ZMQ_XREQ + TYPE(ZMQ_XREQ); + #endif + + #ifdef ZMQ_XREP + TYPE(ZMQ_XREP); + #endif + + #ifdef ZMQ_XPUB + TYPE(ZMQ_XPUB); + #endif + + #ifdef ZMQ_XSUB + TYPE(ZMQ_XSUB); + #endif + + // Socket types since ZMQ 4.0: + // http://api.zeromq.org/4-0:zmq-socket + + #ifdef ZMQ_STREAM + TYPE(ZMQ_STREAM); + #endif + + + // Context options + + // Context options since ZMQ 3.2: + // http://api.zeromq.org/3-2:zmq-ctx-set + + #ifdef ZMQ_IO_THREADS + CTX_OPT(ZMQ_IO_THREADS); + #endif + + #ifdef ZMQ_MAX_SOCKETS + CTX_OPT(ZMQ_MAX_SOCKETS); + #endif + + // Context options since ZMQ 4.0: + // http://api.zeromq.org/4-0:zmq-ctx-set + + #ifdef ZMQ_THREAD_SCHED_POLICY + CTX_OPT(ZMQ_THREAD_SCHED_POLICY); + #endif + + #ifdef ZMQ_THREAD_PRIORITY + CTX_OPT(ZMQ_THREAD_PRIORITY); + #endif + + #ifdef ZMQ_IPV6 + CTX_OPT(ZMQ_IPV6); + #endif + + + // Socket options since ZMQ 2.2: + // http://api.zeromq.org/2-2:zmq-setsockopt + // http://api.zeromq.org/2-2:zmq-getsockopt + + #ifdef ZMQ_TYPE + OPT(int, ZMQ_TYPE); + #endif + + #ifdef ZMQ_RCVMORE + #if ZMQ_VERSION_MAJOR <= 2 + OPT(int64, ZMQ_RCVMORE); + #else + OPT(int, ZMQ_RCVMORE); + #endif + #endif + + #ifdef ZMQ_HWM + OPT(uint64, ZMQ_HWM); + #endif + + #ifdef ZMQ_SWAP + OPT(int64, ZMQ_SWAP); + #endif + + #ifdef ZMQ_AFFINITY + OPT(uint64, ZMQ_AFFINITY); + #endif + + #ifdef ZMQ_IDENTITY + OPT(binary, ZMQ_IDENTITY); + #endif + + #ifdef ZMQ_SUBSCRIBE + OPT(binary, ZMQ_SUBSCRIBE); + #endif + + #ifdef ZMQ_UNSUBSCRIBE + OPT(binary, ZMQ_UNSUBSCRIBE); + #endif + + #ifdef ZMQ_RCVTIMEO + OPT(int, ZMQ_RCVTIMEO); + #endif + + #ifdef ZMQ_SNDTIMEO + OPT(int, ZMQ_SNDTIMEO); + #endif + + #ifdef ZMQ_RATE + #if ZMQ_VERSION_MAJOR <= 2 + OPT(int64, ZMQ_RATE); + #else + OPT(int, ZMQ_RATE); + #endif + #endif + + #ifdef ZMQ_RECOVERY_IVL + #if ZMQ_VERSION_MAJOR <= 2 + OPT(int64, ZMQ_RECOVERY_IVL); + #else + OPT(int, ZMQ_RECOVERY_IVL); + #endif + #endif + + #ifdef ZMQ_RECOVERY_IVL_MSEC + OPT(int64, ZMQ_RECOVERY_IVL_MSEC); + #endif + + #ifdef ZMQ_MCAST_LOOP + OPT(int64, ZMQ_MCAST_LOOP); + #endif + + #ifdef ZMQ_SNDBUF + #if ZMQ_VERSION_MAJOR <= 2 + OPT(uint64, ZMQ_SNDBUF); + #else + OPT(int, ZMQ_SNDBUF); + #endif + #endif + + #ifdef ZMQ_RCVBUF + #if ZMQ_VERSION_MAJOR <= 2 + OPT(uint64, ZMQ_RCVBUF); + #else + OPT(int, ZMQ_RCVBUF); + #endif + #endif + + #ifdef ZMQ_LINGER + OPT(int, ZMQ_LINGER); + #endif + + #ifdef ZMQ_RECONNECT_IVL + OPT(int, ZMQ_RECONNECT_IVL); + #endif + + #ifdef ZMQ_RECONNECT_IVL_MAX + OPT(int, ZMQ_RECONNECT_IVL_MAX); + #endif + + #ifdef ZMQ_BACKLOG + OPT(int, ZMQ_BACKLOG); + #endif + + #ifdef ZMQ_FD + OPT(int, ZMQ_FD); + #endif + + #ifdef ZMQ_EVENTS + #if ZMQ_VERSION_MAJOR <= 2 + OPT(int, ZMQ_EVENTS); + #else + OPT(uint32, ZMQ_EVENTS); + #endif + #endif + + // Socket options since ZMQ 3.0: + // http://api.zeromq.org/3-0:zmq-setsockopt + // http://api.zeromq.org/3-0:zmq-getsockopt + + #ifdef ZMQ_RCVLABEL + OPT(int, ZMQ_RCVLABEL); + #endif + + // Socket options since ZMQ 3.2: + // http://api.zeromq.org/3-2:zmq-setsockopt + // http://api.zeromq.org/3-2:zmq-getsockopt + + #ifdef ZMQ_SNDHWM + OPT(int, ZMQ_SNDHWM); + #endif + + #ifdef ZMQ_RCVHWM + OPT(int, ZMQ_RCVHWM); + #endif + + #ifdef ZMQ_MAXMSGSIZE + OPT(int64, ZMQ_MAXMSGSIZE); + #endif + + #ifdef ZMQ_MULTICAST_HOPS + OPT(int, ZMQ_MULTICAST_HOPS); + #endif + + #ifdef ZMQ_IPV4ONLY + OPT(int, ZMQ_IPV4ONLY); + #endif + + #ifdef ZMQ_DELAY_ATTACH_ON_CONNECT + OPT(int, ZMQ_DELAY_ATTACH_ON_CONNECT); + #endif + + #ifdef ZMQ_LAST_ENDPOINT + OPT(binary, ZMQ_LAST_ENDPOINT); + #endif + + #ifdef ZMQ_ROUTER_MANDATORY + OPT(int, ZMQ_ROUTER_MANDATORY); + #endif + + #ifdef ZMQ_XPUB_VERBOSE + OPT(int, ZMQ_XPUB_VERBOSE); + #endif + + #ifdef ZMQ_TCP_KEEPALIVE + OPT(int, ZMQ_TCP_KEEPALIVE); + #endif + + #ifdef ZMQ_TCP_KEEPALIVE_IDLE + OPT(int, ZMQ_TCP_KEEPALIVE_IDLE); + #endif + + #ifdef ZMQ_TCP_KEEPALIVE_CNT + OPT(int, ZMQ_TCP_KEEPALIVE_CNT); + #endif + + #ifdef ZMQ_TCP_KEEPALIVE_INTVL + OPT(int, ZMQ_TCP_KEEPALIVE_INTVL); + #endif + + #ifdef ZMQ_TCP_ACCEPT_FILTER + OPT(binary, ZMQ_TCP_ACCEPT_FILTER); + #endif + + // Socket options since ZMQ 4.0: + // http://api.zeromq.org/4-0:zmq-setsockopt + // http://api.zeromq.org/4-0:zmq-getsockopt + + #ifdef ZMQ_IPV6 + OPT(int, ZMQ_IPV6); + #endif + + #ifdef ZMQ_IMMEDIATE + OPT(int, ZMQ_IMMEDIATE); + #endif + + #ifdef ZMQ_MECHANISM + OPT(int, ZMQ_MECHANISM); + #endif + + #ifdef ZMQ_ROUTER_RAW + OPT(int, ZMQ_ROUTER_RAW); + #endif + + #ifdef ZMQ_PROBE_ROUTER + OPT(int, ZMQ_PROBE_ROUTER); + #endif + + #ifdef ZMQ_REQ_CORRELATE + OPT(int, ZMQ_REQ_CORRELATE); + #endif + + #ifdef ZMQ_REQ_RELAXED + OPT(int, ZMQ_REQ_RELAXED); + #endif + + #ifdef ZMQ_PLAIN_SERVER + OPT(int, ZMQ_PLAIN_SERVER); + #endif + + #ifdef ZMQ_PLAIN_USERNAME + OPT(binary, ZMQ_PLAIN_USERNAME); + #endif + + #ifdef ZMQ_PLAIN_PASSWORD + OPT(binary, ZMQ_PLAIN_PASSWORD); + #endif + + #ifdef ZMQ_CURVE_SERVER + OPT(int, ZMQ_CURVE_SERVER); + #endif + + #ifdef ZMQ_CURVE_PUBLICKEY + OPT(binary, ZMQ_CURVE_PUBLICKEY); + #endif + + #ifdef ZMQ_CURVE_SECRETKEY + OPT(binary, ZMQ_CURVE_SECRETKEY); + #endif + + #ifdef ZMQ_CURVE_SERVERKEY + OPT(binary, ZMQ_CURVE_SERVERKEY); + #endif + + #ifdef ZMQ_ZAP_DOMAIN + OPT(binary, ZMQ_ZAP_DOMAIN); + #endif + + #ifdef ZMQ_CONFLATE + OPT(int, ZMQ_CONFLATE); + #endif + + // Socket options since ZMQ 4.1: + // http://api.zeromq.org/4-1:zmq-setsockopt + // http://api.zeromq.org/4-1:zmq-getsockopt + + #ifdef ZMQ_CONNECT_RID + OPT(binary, ZMQ_CONNECT_RID); + #endif + + #ifdef ZMQ_GSSAPI_PLAINTEXT + OPT(int, ZMQ_GSSAPI_PLAINTEXT); + #endif + + #ifdef ZMQ_GSSAPI_PRINCIPAL + OPT(binary, ZMQ_GSSAPI_PRINCIPAL); + #endif + + #ifdef ZMQ_GSSAPI_SERVER + OPT(int, ZMQ_GSSAPI_SERVER); + #endif + + #ifdef ZMQ_GSSAPI_SERVICE_PRINCIPAL + OPT(binary, ZMQ_GSSAPI_SERVICE_PRINCIPAL); + #endif + + #ifdef ZMQ_HANDSHAKE_IVL + OPT(int, ZMQ_HANDSHAKE_IVL); + #endif + + #ifdef ZMQ_ROUTER_HANDOVER + OPT(int, ZMQ_ROUTER_HANDOVER); + #endif + + #ifdef ZMQ_TOS + OPT(int, ZMQ_TOS); + #endif + + // TODO: ZMQ_IPC_FILTER_GID + // TODO: ZMQ_IPC_FILTER_PID + // TODO: ZMQ_IPC_FILTER_UID + + // Expose properties and methods + + Nan::Set(target, Nan::New("sendFlags").ToLocalChecked(), sendFlags); + Nan::Set(target, Nan::New("types").ToLocalChecked(), types); + Nan::Set(target, Nan::New("options").ToLocalChecked(), options); + Nan::Set(target, Nan::New("ctxOptions").ToLocalChecked(), ctxOptions); Nan::SetMethod(target, "zmqVersion", ZmqVersion); - #if ZMQ_VERSION_MAJOR >= 4 +#if ZMQ_VERSION_MAJOR >= 4 Nan::SetMethod(target, "zmqCurveKeypair", ZmqCurveKeypair); - #endif +#endif Context::Initialize(target); Socket::Initialize(target); diff --git a/lib/Batch.js b/lib/Batch.js new file mode 100644 index 0000000..4b528db --- /dev/null +++ b/lib/Batch.js @@ -0,0 +1,50 @@ +var ZMQ_SNDMORE = require('bindings')('zmq.node').sendFlags.ZMQ_SNDMORE; + + +/** + * A batch consists of 1 or more message parts with their flags that need to be sent as one unit + */ + +function Batch() { + this.content = []; // buf, flags, buf, flags, ... + this.cbs = []; // callbacks + this.isClosed = false; // true if the last message does not have SNDMORE in its flags, false otherwise + this.next = null; // next batch (for linked list of batches) +} + +module.exports = Batch; + + +Batch.prototype.append = function (buf, flags, cb) { + if (!Buffer.isBuffer(buf)) { + buf = new Buffer(String(buf), 'utf8'); + } + + this.content.push(buf, flags); + + if (cb) { + this.cbs.push(cb); + } + + if ((flags & ZMQ_SNDMORE) === 0) { + this.isClosed = true; + } +}; + +Batch.prototype.invokeError = function (socket, error) { + var returned = false; + for (var i = 0; i < this.cbs.length; i += 1) { + this.cbs[i].call(socket, error); + returned = true; + } + + if (!returned) { + throw error; + } +}; + +Batch.prototype.invokeSent = function (socket) { + for (var i = 0; i < this.cbs.length; i += 1) { + this.cbs[i].call(socket); + } +}; diff --git a/lib/BatchList.js b/lib/BatchList.js new file mode 100644 index 0000000..0772326 --- /dev/null +++ b/lib/BatchList.js @@ -0,0 +1,54 @@ +// linked list for message batches + +var Batch = require('./Batch'); + + +function BatchList() { + this.firstBatch = null; + this.lastBatch = null; + this.length = 0; +} + +module.exports = BatchList; + + +BatchList.prototype.canSend = function () { + return this.firstBatch ? this.firstBatch.isClosed : false; +}; + +BatchList.prototype.append = function (buf, flags, cb) { + var batch = this.lastBatch; + + if (!batch || batch.isClosed) { + batch = new Batch(); + + if (this.lastBatch) { + this.lastBatch.next = batch; + } + + this.lastBatch = batch; + + if (!this.firstBatch) { + this.firstBatch = batch; + } + + this.length += 1; + } + + batch.append(buf, flags, cb); +}; + +BatchList.prototype.fetch = function () { + var batch = this.firstBatch; + if (batch && batch.isClosed) { + this.firstBatch = batch.next; + this.length -= 1; + return batch; + } + return undefined; +}; + +BatchList.prototype.restore = function (batch) { + this.firstBatch = batch; + this.length += 1; +}; diff --git a/lib/context.js b/lib/context.js new file mode 100644 index 0000000..bb824cb --- /dev/null +++ b/lib/context.js @@ -0,0 +1,69 @@ +var binding = require('bindings')('zmq.node'); +var socket = require('./socket'); + + +/** + * Map of context options in both "ZMQ_IO_THREADS" and "io_threads" style. + */ + +var opts = {}; +Object.keys(binding.ctxOptions).forEach(function (name) { + var shortName = name.replace(/^ZMQ_/, '').toLowerCase(); + + opts[name] = opts[shortName] = binding.ctxOptions[name]; +}); + + +function Context() { + this._ctx = new binding.Context(); +} + +Context.prototype.set = function (option, value) { + return this._ctx.set(opts[option], value); +}; + +Context.prototype.get = function (option) { + return this._ctx.get(opts[option]); +}; + +Context.prototype.createSocket = function (type, options) { + return socket.createSocket(type, options, this); +}; + +Context.prototype.socket = Context.prototype.createSocket; + + +exports.createContext = function (options) { + var context = new Context(); + + if (options) { + for (var key in options) { + context.set(key, options[key]); + } + } + + return context; +}; + + +var defaultContext; + +exports.getDefaultContext = function () { + if (!defaultContext) { + defaultContext = new Context(); + + if (process.env.ZMQ_IO_THREADS) { + defaultContext.set('ZMQ_IO_THREADS', parseInt(process.env.ZMQ_IO_THREADS, 10)); + } + } + + return defaultContext; +}; + + +/** + * Expose the Context class and supported options + */ + +exports.Context = Context; +exports.opts = opts; diff --git a/lib/index.js b/lib/index.js index f04529d..de78a80 100644 --- a/lib/index.js +++ b/lib/index.js @@ -2,832 +2,145 @@ * Module dependencies. */ -var EventEmitter = require('events').EventEmitter - , zmq = require('bindings')('zmq.node') - , util = require('util'); +var binding = require('bindings')('zmq.node'); +var context = require('./context'); +var socket = require('./socket'); /** - * Expose bindings as the module. + * Expose bindings. */ -exports = module.exports = zmq; +exports.binding = binding; /** * Expose zmq version. */ -exports.version = zmq.zmqVersion(); +exports.version = binding.zmqVersion(); /** * Expose zmq_curve_keypair */ -exports.curveKeypair = zmq.zmqCurveKeypair; +exports.curveKeypair = binding.zmqCurveKeypair; /** - * Map of socket types. - */ - -var types = exports.types = { - pub: zmq.ZMQ_PUB - , xpub: zmq.ZMQ_XPUB - , sub: zmq.ZMQ_SUB - , xsub: zmq.ZMQ_XSUB - , req: zmq.ZMQ_REQ - , xreq: zmq.ZMQ_XREQ - , rep: zmq.ZMQ_REP - , xrep: zmq.ZMQ_XREP - , push: zmq.ZMQ_PUSH - , pull: zmq.ZMQ_PULL - , dealer: zmq.ZMQ_DEALER - , router: zmq.ZMQ_ROUTER - , pair: zmq.ZMQ_PAIR - , stream: zmq.ZMQ_STREAM -}; - -var longOptions = { - ZMQ_HWM: 1 - , ZMQ_SWAP: 3 - , ZMQ_AFFINITY: 4 - , ZMQ_IDENTITY: 5 - , ZMQ_SUBSCRIBE: 6 - , ZMQ_UNSUBSCRIBE: 7 - , ZMQ_RATE: 8 - , ZMQ_RECOVERY_IVL: 9 - , ZMQ_MCAST_LOOP: 10 - , ZMQ_SNDBUF: 11 - , ZMQ_RCVBUF: 12 - , ZMQ_RCVMORE: 13 - , ZMQ_FD: 14 - , ZMQ_EVENTS: 15 - , ZMQ_TYPE: 16 - , ZMQ_LINGER: 17 - , ZMQ_RECONNECT_IVL: 18 - , ZMQ_BACKLOG: 19 - , ZMQ_RECOVERY_IVL_MSEC: 20 - , ZMQ_RECONNECT_IVL_MAX: 21 - , ZMQ_MAXMSGSIZE: 22 - , ZMQ_SNDHWM: 23 - , ZMQ_RCVHWM: 24 - , ZMQ_MULTICAST_HOPS: 25 - , ZMQ_RCVTIMEO: 27 - , ZMQ_SNDTIMEO: 28 - , ZMQ_IPV4ONLY: 31 - , ZMQ_LAST_ENDPOINT: 32 - , ZMQ_ROUTER_MANDATORY: 33 - , ZMQ_TCP_KEEPALIVE: 34 - , ZMQ_TCP_KEEPALIVE_CNT: 35 - , ZMQ_TCP_KEEPALIVE_IDLE: 36 - , ZMQ_TCP_KEEPALIVE_INTVL: 37 - , ZMQ_TCP_ACCEPT_FILTER: 38 - , ZMQ_DELAY_ATTACH_ON_CONNECT: 39 - , ZMQ_XPUB_VERBOSE: 40 - , ZMQ_ROUTER_RAW: 41 - , ZMQ_IPV6: 42 - , ZMQ_MECHANISM: 43 - , ZMQ_PLAIN_SERVER: 44 - , ZMQ_PLAIN_USERNAME: 45 - , ZMQ_PLAIN_PASSWORD: 46 - , ZMQ_CURVE_SERVER: 47 - , ZMQ_CURVE_PUBLICKEY: 48 - , ZMQ_CURVE_SECRETKEY: 49 - , ZMQ_CURVE_SERVERKEY: 50 - , ZMQ_ZAP_DOMAIN: 55 - , ZMQ_IO_THREADS: 1 - , ZMQ_MAX_SOCKETS: 2 -}; - -Object.keys(longOptions).forEach(function(name){ - Object.defineProperty(zmq, name, { - enumerable: true, - configurable: false, - writable: false, - value: longOptions[name] - }); -}); - -/** - * Map of socket options. - */ - -var opts = exports.options = { - _fd: zmq.ZMQ_FD - , _ioevents: zmq.ZMQ_EVENTS - , _receiveMore: zmq.ZMQ_RCVMORE - , _subscribe: zmq.ZMQ_SUBSCRIBE - , _unsubscribe: zmq.ZMQ_UNSUBSCRIBE - , affinity: zmq.ZMQ_AFFINITY - , backlog: zmq.ZMQ_BACKLOG - , hwm: zmq.ZMQ_HWM - , identity: zmq.ZMQ_IDENTITY - , linger: zmq.ZMQ_LINGER - , mcast_loop: zmq.ZMQ_MCAST_LOOP - , rate: zmq.ZMQ_RATE - , rcvbuf: zmq.ZMQ_RCVBUF - , last_endpoint: zmq.ZMQ_LAST_ENDPOINT - , reconnect_ivl: zmq.ZMQ_RECONNECT_IVL - , recovery_ivl: zmq.ZMQ_RECOVERY_IVL - , sndbuf: zmq.ZMQ_SNDBUF - , swap: zmq.ZMQ_SWAP - , mechanism: zmq.ZMQ_MECHANISM - , plain_server: zmq.ZMQ_PLAIN_SERVER - , plain_username: zmq.ZMQ_PLAIN_USERNAME - , plain_password: zmq.ZMQ_PLAIN_PASSWORD - , curve_server: zmq.ZMQ_CURVE_SERVER - , curve_publickey: zmq.ZMQ_CURVE_PUBLICKEY - , curve_secretkey: zmq.ZMQ_CURVE_SECRETKEY - , curve_serverkey: zmq.ZMQ_CURVE_SERVERKEY - , zap_domain: zmq.ZMQ_ZAP_DOMAIN -}; - -/** - * Monitor events - */ -var events = exports.events = { - 1: "connect" // zmq.ZMQ_EVENT_CONNECTED - , 2: "connect_delay" // zmq.ZMQ_EVENT_CONNECT_DELAYED - , 4: "connect_retry" // zmq.ZMQ_EVENT_CONNECT_RETRIED - , 8: "listen" // zmq.ZMQ_EVENT_LISTENING - , 16: "bind_error" // zmq.ZMQ_EVENT_BIND_FAILED - , 32: "accept" // zmq.ZMQ_EVENT_ACCEPTED - , 64: "accept_error" // zmq.ZMQ_EVENT_ACCEPT_FAILED - , 128: "close" // zmq.ZMQ_EVENT_CLOSED - , 256: "close_error" // zmq.ZMQ_EVENT_CLOSE_FAILED - , 512: "disconnect" // zmq.ZMQ_EVENT_DISCONNECTED -} - -// Context management happens here. We lazily initialize a default context, -// and use that everywhere. Also cleans up on exit. -var ctx; -function defaultContext() { - if (ctx) return ctx; - - var io_threads = 1; - if (process.env.ZMQ_IO_THREADS) { - io_threads = parseInt(process.env.ZMQ_IO_THREADS, 10); - if (!io_threads || io_threads < 1) { - console.warn('Invalid number in ZMQ_IO_THREADS, using 1 IO thread.'); - io_threads = 1; - } - } - - ctx = new zmq.Context(io_threads); - process.on('exit', function(){ - // ctx.close(); - ctx = null; - }); - - return ctx; -}; - -/** - * A batch consists of 1 or more message parts with their flags that need to be sent as one unit - */ - -function OutBatch() { - this.content = []; // buf, flags, buf, flags, ... - this.cbs = []; // callbacks - this.isClosed = false; // true if the last message does not have SNDMORE in its flags, false otherwise - this.next = null; // next batch (for linked list of batches) -} - -OutBatch.prototype.append = function (buf, flags, cb) { - if (!Buffer.isBuffer(buf)) { - buf = new Buffer(String(buf), 'utf8'); - } - - this.content.push(buf, flags); - - if (cb) { - this.cbs.push(cb); - } - - if ((flags & zmq.ZMQ_SNDMORE) === 0) { - this.isClosed = true; - } -}; - -OutBatch.prototype.invokeError = function (socket, error) { - var returned = false; - for (var i = 0; i < this.cbs.length; i += 1) { - this.cbs[i].call(socket, error); - returned = true; - } - - if (!returned) { - throw error; - } -}; - -OutBatch.prototype.invokeSent = function (socket) { - for (var i = 0; i < this.cbs.length; i += 1) { - this.cbs[i].call(socket); - } -}; - - -function BatchList() { - this.firstBatch = null; - this.lastBatch = null; - this.length = 0; -} - -BatchList.prototype.canSend = function () { - return this.firstBatch ? this.firstBatch.isClosed : false; -}; - -BatchList.prototype.append = function (buf, flags, cb) { - var batch = this.lastBatch; - - if (!batch || batch.isClosed) { - batch = new OutBatch(); - - if (this.lastBatch) { - this.lastBatch.next = batch; - } - - this.lastBatch = batch; - - if (!this.firstBatch) { - this.firstBatch = batch; - } - - this.length += 1; - } - - batch.append(buf, flags, cb); -}; - -BatchList.prototype.fetch = function () { - var batch = this.firstBatch; - if (batch && batch.isClosed) { - this.firstBatch = batch.next; - this.length -= 1; - return batch; - } - return undefined; -}; - -BatchList.prototype.restore = function (batch) { - this.firstBatch = batch; - this.length += 1; -}; - - -/** - * Create a new socket of the given `type`. - * - * @constructor - * @param {String|Number} type - * @api public - */ - -var Socket = -exports.Socket = function (type) { - var self = this; - EventEmitter.call(this); - this.type = type; - this._zmq = new zmq.SocketBinding(defaultContext(), types[type]); - this._paused = false; - this._isFlushingReads = false; - this._isFlushingWrites = false; - this._outgoing = new BatchList(); - - this._zmq.onReady = function(readable, writable) { - if (readable) self._flushReads(); - if (writable) self._flushWrites(); - }; -}; - -/** - * Inherit from `EventEmitter.prototype`. - */ - -util.inherits(Socket, EventEmitter); - -/** - * Set socket to pause mode - * no data will be emit until resume() is called - * all send() calls will be queued - * - * @api public - */ - -Socket.prototype.pause = function() { - this._paused = true; -} - -/** - * Set a socket back to normal work mode - * - * @api public - */ - -Socket.prototype.resume = function() { - this._paused = false; - this._flushReads(); - this._flushWrites(); -} - -Socket.prototype.ref = function() { - this._zmq.ref(); -} - -Socket.prototype.unref = function() { - this._zmq.unref(); -} - -Socket.prototype.read = function() { - var message = [], flags; - - if (this._zmq.state !== zmq.STATE_READY) { - return null; - } - - flags = this._zmq.getsockopt(zmq.ZMQ_EVENTS); - - if (flags & zmq.ZMQ_POLLIN) { - do { - message.push(this._zmq.recv()); - } while (this._zmq.getsockopt(zmq.ZMQ_RCVMORE)); - - return message; - } - - return null; -} - - -/** - * Set `opt` to `val`. - * - * @param {String|Number} opt - * @param {Mixed} val - * @return {Socket} for chaining - * @api public - */ - -Socket.prototype.setsockopt = function(opt, val){ - this._zmq.setsockopt(opts[opt] || opt, val); - return this; -}; - -/** - * Get socket `opt`. + * Context creation * - * @param {String|Number} opt - * @return {Mixed} - * @api public + * @return {Context} */ -Socket.prototype.getsockopt = function(opt){ - return this._zmq.getsockopt(opts[opt] || opt); +exports.context = exports.createContext = function (options) { + return context.createContext(options); }; /** - * Socket opt accessors allowing `sock.backlog = val` - * instead of `sock.setsockopt('backlog', val)`. - */ - -Object.keys(opts).forEach(function(name){ - Socket.prototype.__defineGetter__(name, function() { - return this._zmq.getsockopt(opts[name]); - }); - - Socket.prototype.__defineSetter__(name, function(val) { - if ('string' == typeof val) val = new Buffer(val, 'utf8'); - return this._zmq.setsockopt(opts[name], val); - }); -}); - -/** - * Async bind. - * - * Emits the "bind" event. + * Exposing the default context * - * @param {String} addr - * @param {Function} cb - * @return {Socket} for chaining - * @api public + * @return {Context} */ -Socket.prototype.bind = function(addr, cb) { - var self = this; - this._zmq.bind(addr, function(err) { - if (err) { - return cb && cb(err); - } - - self._flushReads(); - self._flushWrites(); - - self.emit('bind', addr); - cb && cb(); - }); - return this; +exports.getDefaultContext = function () { + return context.getDefaultContext(); }; /** - * Sync bind. + * Socket creation, using a default context * - * @param {String} addr - * @return {Socket} for chaining - * @api public - */ - -Socket.prototype.bindSync = function(addr) { - this._zmq.bindSync(addr); - - return this; -}; - -/** - * Async unbind. - * - * Emits the "unbind" event. - * - * @param {String} addr - * @param {Function} cb - * @return {Socket} for chaining - * @api public - */ - -Socket.prototype.unbind = function(addr, cb) { - if (zmq.ZMQ_CAN_UNBIND) { - var self = this; - this._zmq.unbind(addr, function(err) { - if (err) { - return cb && cb(err); - } - self.emit('unbind', addr); - - self._flushReads(); - self._flushWrites(); - cb && cb(); - }); - } else { - cb && cb(); - } - return this; -}; - -/** - * Sync unbind. - * - * @param {String} addr - * @return {Socket} for chaining - * @api public - */ - -Socket.prototype.unbindSync = function(addr) { - if (zmq.ZMQ_CAN_UNBIND) { - this._zmq.unbindSync(addr); - } - return this; -} - -/** - * Connect to `addr`. - * - * @param {String} addr - * @return {Socket} for chaining - * @api public - */ - -Socket.prototype.connect = function(addr) { - this._zmq.connect(addr); - return this; -}; - -/** - * Disconnect from `addr`. - * - * @param {String} addr - * @return {Socket} for chaining - * @api public - */ - -Socket.prototype.disconnect = function(addr) { - if (zmq.ZMQ_CAN_DISCONNECT) { - this._zmq.disconnect(addr); - } - return this; -}; - -/** - * Enable monitoring of a Socket - * - * @param {Number} timer interval in ms > 0 or Undefined for default - * @param {Number} The maximum number of events to read on each interval, default is 1, use 0 for reading all events - * @return {Socket} for chaining - * @api public + * @return {Socket} */ -Socket.prototype.monitor = function(interval, numOfEvents) { - if (zmq.ZMQ_CAN_MONITOR) { - var self = this; - - self._zmq.onMonitorEvent = function(event_id, event_value, event_endpoint_addr) { - self.emit(events[event_id], event_value, event_endpoint_addr); - } - - self._zmq.onMonitorError = function(error) { - self.emit('monitor_error', error); - } - - this._zmq.monitor(interval, numOfEvents); - } else { - throw new Error('Monitoring support disabled check zmq version is > 3.2.1 and recompile this addon'); - } - return this; +exports.socket = exports.createSocket = function (type, options) { + return socket.createSocket(type, options, context.getDefaultContext()); }; /** - * Disable monitoring of a Socket release idle handler - * and close the socket + * Tests if a context option exists in this version of ZMQ * - * @return {Socket} for chaining - * @api public + * @return {Boolean} */ -Socket.prototype.unmonitor = function() { - if (zmq.ZMQ_CAN_MONITOR) { - this._zmq.unmonitor(); - } - return this; +exports.contextOptionExists = function (option) { + return context.opts.hasOwnProperty(option); }; - /** - * Subscribe with the given `filter`. + * Tests if a socket option exists in this version of ZMQ * - * @param {String} filter - * @return {Socket} for chaining - * @api public + * @return {Boolean} */ -Socket.prototype.subscribe = function(filter) { - this._subscribe = filter; - return this; +exports.socketOptionExists = function (option) { + return socket.opts.hasOwnProperty(option); }; /** - * Unsubscribe with the given `filter`. + * Tests if a socket type exists in this version of ZMQ * - * @param {String} filter - * @return {Socket} for chaining - * @api public + * @return {Boolean} */ -Socket.prototype.unsubscribe = function(filter) { - this._unsubscribe = filter; - return this; +exports.socketTypeExists = function (type) { + return socket.types.hasOwnProperty(type); }; - /** - * Send the given `msg`. - * - * @param {String|Buffer|Array} msg - * @param {Number} [flags] - * @param {Function} [cb] - * @return {Socket} for chaining - * @api public + * JS proxy method based on API characteristics of the native zmq_proxy() */ -Socket.prototype.send = function(msg, flags, cb) { - flags = flags | 0; - - if (Array.isArray(msg)) { - for (var i = 0, len = msg.length; i < len; i++) { - var isLast = i === len - 1; - var msgFlags = isLast ? flags : flags | zmq.ZMQ_SNDMORE; - var callback = isLast ? cb : undefined; - - this._outgoing.append(msg[i], msgFlags, callback); - } - } else { - this._outgoing.append(msg, flags, cb); - } - - if (this._outgoing.canSend()) { - this._zmq.pending = true; - this._flushWrites(); - } else { - this._zmq.pending = false; - } - - return this; -}; - - -Socket.prototype._flushRead = function () { - var message = this._zmq.readv(); // can throw - if (!message) { - return false; - } - - // Handle received message immediately to prevent memory leak in driver - if (message.length === 1) { - // hot path - this.emit('message', message[0]); - } else { - this.emit.apply(this, ['message'].concat(message)); - } - return true; -}; - -Socket.prototype._flushWrite = function () { - var batch = this._outgoing.fetch(); - if (!batch) { - this._zmq.pending = false; - return false; - } - - try { - if (this._zmq.sendv(batch.content)) { - this._zmq.pending = this._outgoing.canSend(); - batch.invokeSent(this); - return true; - } - - this._outgoing.restore(batch); - return false; - } catch (sendError) { - this._zmq.pending = this._outgoing.canSend(); - batch.invokeError(this, sendError); // can throw - return false; - } -}; - - -Socket.prototype._flushReads = function() { - if (this._paused || this._isFlushingReads) return; - - this._isFlushingReads = true; - - var received; - - do { - try { - received = this._flushRead(); - } catch (error) { - this._isFlushingReads = false; - this.emit('error', error); // can throw - return; - } - } while (received); - - this._isFlushingReads = false; -}; - -Socket.prototype._flushWrites = function() { - if (this._paused || this._isFlushingWrites) return; - - this._isFlushingWrites = true; - - var sent; - - do { - try { - sent = this._flushWrite(); - } catch (error) { - this._isFlushingWrites = false; - this.emit('error', error); // can throw - return; - } - } while (sent); - - this._isFlushingWrites = false; -}; - -/** - * Close the socket. - * - * @return {Socket} for chaining - * @api public - */ - -Socket.prototype.close = function() { - this._zmq.close(); - return this; -}; - -/** - * Create a `type` socket with the given `options`. - * - * @param {String} type - * @param {Object} options - * @return {Socket} - * @api public - */ - -exports.socket = -exports.createSocket = function(type, options) { - var sock = new Socket(type); - for (var key in options) sock[key] = options[key]; - return sock; -}; - -exports.Context.setMaxThreads = function(value) { - if (!zmq.ZMQ_CAN_SET_CTX) { - throw new Error('Setting of context options disabled, check zmq version is >= 3.2.1 and recompile this addon'); - } - var defaultCtx = defaultContext(); - defaultCtx.setOpt(zmq.ZMQ_IO_THREADS, value); -}; - -exports.Context.getMaxThreads = function() { - if (!zmq.ZMQ_CAN_SET_CTX) { - throw new Error('Getting of context options disabled, check zmq version is >= 3.2.1 and recompile this addon'); - } - var defaultCtx = defaultContext(); - return defaultCtx.getOpt(zmq.ZMQ_IO_THREADS); -}; - -exports.Context.setMaxSockets = function(value) { - if (!zmq.ZMQ_CAN_SET_CTX) { - throw new Error('Setting of context options disabled, check zmq version is >= 3.2.1 and recompile this addon'); - } - var defaultCtx = defaultContext(); - defaultCtx.setOpt(zmq.ZMQ_MAX_SOCKETS, value); -}; - -exports.Context.getMaxSockets = function() { - if (!zmq.ZMQ_CAN_SET_CTX) { - throw new Error('Getting of context options disabled, check zmq version is >= 3.2.1 and recompile this addon'); - } - var defaultCtx = defaultContext(); - return defaultCtx.getOpt(zmq.ZMQ_MAX_SOCKETS); -}; - -/** - * JS based on API characteristics of the native zmq_proxy() - */ - -function proxy (frontend, backend, capture){ - switch(frontend.type+'/'+backend.type){ +exports.proxy = function (frontend, backend, capture) { + switch (frontend.type + '/' + backend.type) { case 'push/pull': case 'pull/push': case 'xpub/xsub': - if(capture){ - - frontend.on('message',function (msg){ + if (capture) { + frontend.on('message', function (msg) { backend.send(msg); }); - backend.on('message',function (msg){ + backend.on('message', function (msg) { frontend.send(msg); //forwarding messages over capture socket capture.send(msg); }); - } else { - //no capture socket provided, just forwarding msgs to respective sockets - frontend.on('message',function (msg){ + frontend.on('message', function (msg) { backend.send(msg); }); - backend.on('message',function (msg){ + backend.on('message', function (msg) { frontend.send(msg); }); - } break; case 'router/dealer': case 'xrep/xreq': - if(capture){ - - //forwarding router/dealer pack signature: id, delimiter, msg - frontend.on('message',function (id,delimiter,msg){ - backend.send([id,delimiter,msg]); + if (capture) { + // forwarding router/dealer pack signature: id, delimiter, msg + frontend.on('message', function (id, delimiter, msg){ + backend.send([id, delimiter, msg]); }); - backend.on('message',function (id,delimiter,msg){ - frontend.send([id,delimiter,msg]); + backend.on('message', function (id, delimiter, msg){ + frontend.send([id, delimiter, msg]); //forwarding message to the capture socket capture.send(msg); }); - } else { - //forwarding router/dealer signatures without capture - frontend.on('message',function (id,delimiter,msg){ - backend.send([id,delimiter,msg]); + frontend.on('message', function (id, delimiter, msg){ + backend.send([id, delimiter, msg]); }); - backend.on('message',function (id,delimiter,msg){ - frontend.send([id,delimiter,msg]); + backend.on('message', function (id, delimiter, msg){ + frontend.send([id, delimiter, msg]); }); - } break; default: - throw new Error('wrong socket order to proxy'); + throw new Error('Wrong socket order to proxy'); } -} - -exports.proxy = proxy; +}; diff --git a/lib/socket.js b/lib/socket.js new file mode 100644 index 0000000..f9435fe --- /dev/null +++ b/lib/socket.js @@ -0,0 +1,540 @@ +var EventEmitter = require('events').EventEmitter; +var util = require('util'); +var binding = require('bindings')('zmq.node'); +var BatchList = require('./BatchList'); +var context = require('./context'); + +/** + * Map of socket options in both "ZMQ_LAST_ENDPOINT" and "last_endpoint" style. + */ + +var opts = {}; +Object.keys(binding.options).forEach(function (name) { + var shortName = name.replace(/^ZMQ_/, '').toLowerCase(); + + opts[name] = opts[shortName] = binding.options[name]; +}); + +/** + * Map of socket types in both "ZMQ_ROUTER" and "router" style + */ + +var types = {}; +Object.keys(binding.types).forEach(function (name) { + var shortName = name.replace(/^ZMQ_/, '').toLowerCase(); + + types[name] = types[shortName] = binding.types[name]; +}); + +/** + * Map of send flags in both "ZMQ_SNDMORE" and "sndmore" style + */ + +var sendFlags = {}; +Object.keys(binding.sendFlags).forEach(function (name) { + var shortName = name.replace(/^ZMQ_/, '').toLowerCase(); + + sendFlags[name] = sendFlags[shortName] = binding.sendFlags[name]; +}); + +function flagsFromArray(flags) { + var result = 0; + for (var i = 0; i < flags.length; i += 1) { + var flag = flags[i]; + if (!sendFlags.hasOwnProperty(flag)) { + throw new Error('Unknown flag name: ' + flag); + } + + result |= sendFlags[flag]; + } + return result; +} + +/** + * Monitor events + */ + +var events = { + 1: 'connect', // ZMQ_EVENT_CONNECTED + 2: 'connect_delay', // ZMQ_EVENT_CONNECT_DELAYED + 4: 'connect_retry', // ZMQ_EVENT_CONNECT_RETRIED + 8: 'listen', // ZMQ_EVENT_LISTENING + 16: 'bind_error', // ZMQ_EVENT_BIND_FAILED + 32: 'accept', // ZMQ_EVENT_ACCEPTED + 64: 'accept_error', // ZMQ_EVENT_ACCEPT_FAILED + 128: 'close', // ZMQ_EVENT_CLOSED + 256: 'close_error', // ZMQ_EVENT_CLOSE_FAILED + 512: 'disconnect' // ZMQ_EVENT_DISCONNECTED +}; + +/** + * Create a new socket of the given `type`. + * + * @constructor + * @param {String|Number} type + */ + +function Socket(type, ctx) { + if (typeof type !== 'string') { + throw new TypeError('Type argument must be a string'); + } + + if (!types.hasOwnProperty(type)) { + throw new Error('Socket type "' + type + '" not supported by ZMQ'); + } + + if (!(ctx instanceof context.Context)) { + throw new TypeError('Context is invalid: ' + ctx); + } + + EventEmitter.call(this); + this.type = type; + this._sock = new binding.SocketBinding(ctx._ctx, types[type]); + this._paused = false; + this._isFlushingReads = false; + this._isFlushingWrites = false; + this._outgoing = new BatchList(); + + var self = this; + + this._sock.onReady = function (readable, writable) { + if (readable) self._flushReads(); + if (writable) self._flushWrites(); + }; +}; + +/** + * Inherit from `EventEmitter.prototype`. + */ + +util.inherits(Socket, EventEmitter); + +/** + * Set socket to pause mode + * no data will be emit until resume() is called + * all send() calls will be queued + * + * @return {Socket} for chaining + */ + +Socket.prototype.pause = function () { + this._paused = true; + return this; +}; + +/** + * Set a socket back to normal work mode + * + * @return {Socket} for chaining + */ + +Socket.prototype.resume = function () { + this._paused = false; + this._flushReads(); + this._flushWrites(); + return this; +}; + +/** + * Manually read from a socket, even when paused + * + * @return {undefined|Buffer[]} An array of message parts, or undefined if there was nothing in the queue + */ + +Socket.prototype.read = function () { + return this._sock.read(); // can throw +}; + +/** + * Ref the socket on the event-loop. A no-op if already ref()ed + * + * @return {Socket} for chaining + */ + +Socket.prototype.ref = function () { + this._sock.ref(); + return this; +}; + +/** + * Unref the socket from the event-loop. A no-op if already unref()ed + * + * @return {Socket} for chaining + */ + +Socket.prototype.unref = function () { + this._sock.unref(); + return this; +}; + + +/** + * Set `opt` to `val`. + * + * @param {String} opt + * @param {Number|String|Buffer} val + * @return {Socket} for chaining + */ + +Socket.prototype.set = function (opt, val) { + if (typeof val === 'string') { + val = new Buffer(val, 'utf8'); + } + + this._sock.setsockopt(opts[opt], val); + return this; +}; + +Socket.prototype.setsockopt = Socket.prototype.set; + +/** + * Get socket `opt`. + * + * @param {String} opt + * @return {Number|String|Buffer} + */ + +Socket.prototype.get = function (opt) { + return this._sock.getsockopt(opts[opt] || opt); +}; + +Socket.prototype.getsockopt = Socket.prototype.get; + +/** + * Async bind. + * + * Emits the "bind" event. + * + * @param {String} addr + * @param {Function} cb + * @return {Socket} for chaining + */ + +Socket.prototype.bind = function (addr, cb) { + var self = this; + this._sock.bind(addr, function (err) { + if (err) { + return cb && cb(err); + } + + self._flushReads(); + self._flushWrites(); + + self.emit('bind', addr); + cb && cb(); + }); + return this; +}; + +/** + * Sync bind. + * + * @param {String} addr + * @return {Socket} for chaining + */ + +Socket.prototype.bindSync = function (addr) { + this._sock.bindSync(addr); + return this; +}; + +/** + * Async unbind. + * + * Emits the "unbind" event. + * + * @param {String} addr + * @param {Function} cb + * @return {Socket} for chaining + */ + +Socket.prototype.unbind = function (addr, cb) { + if (!this._sock.unbind) { + cb && cb(); + return this; + } + + var self = this; + this._sock.unbind(addr, function (err) { + if (err) { + return cb && cb(err); + } + self.emit('unbind', addr); + + self._flushReads(); + self._flushWrites(); + cb && cb(); + }); + return this; +}; + +/** + * Sync unbind. + * + * @param {String} addr + * @return {Socket} for chaining + */ + +Socket.prototype.unbindSync = function (addr) { + if (this._sock.unbindSync) { + this._sock.unbindSync(addr); + } + return this; +} + +/** + * Connect to `addr`. + * + * @param {String} addr + * @return {Socket} for chaining + * @api public + */ + +Socket.prototype.connect = function (addr) { + this._sock.connect(addr); + return this; +}; + +/** + * Disconnect from `addr`. + * + * @param {String} addr + * @return {Socket} for chaining + */ + +Socket.prototype.disconnect = function (addr) { + if (this._sock.disconnect) { + this._sock.disconnect(addr); + } + return this; +}; + +/** + * Enable monitoring of a Socket + * + * @param {Number} timer interval in ms > 0 or Undefined for default + * @param {Number} The maximum number of events to read on each interval, default is 1, use 0 for reading all events + * @return {Socket} for chaining + */ + +Socket.prototype.monitor = function (interval, numOfEvents) { + if (!this._sock.monitor) { + throw new Error('Monitoring support disabled. Ensure ZMQ version is > 3.2.1 and recompile node-binding.'); + } + + var self = this; + + self._sock.onMonitorEvent = function (event_id, event_value, event_endpoint_addr) { + self.emit(events[event_id], event_value, event_endpoint_addr); + } + + self._sock.onMonitorError = function (error) { + self.emit('monitor_error', error); + } + + this._sock.monitor(interval, numOfEvents); + return this; +}; + +/** + * Disable monitoring of a Socket release idle handler + * and close the socket + * + * @return {Socket} for chaining + */ + +Socket.prototype.unmonitor = function () { + if (this._sock.unmonitor) { + this._sock.unmonitor(); + } + return this; +}; + + +/** + * Subscribe with the given `filter`. + * + * @param {String} filter + * @return {Socket} for chaining + */ + +Socket.prototype.subscribe = function (filter) { + this.set('ZMQ_SUBSCRIBE', filter); + return this; +}; + +/** + * Unsubscribe with the given `filter`. + * + * @param {String} filter + * @return {Socket} for chaining + */ + +Socket.prototype.unsubscribe = function (filter) { + this.set('ZMQ_UNSUBSCRIBE', filter); + return this; +}; + +/** + * Send the given `msg`. + * + * @param {String|Buffer|Array} msg + * @param {Number} [flags] + * @param {Function} [cb] + * @return {Socket} for chaining + */ + +Socket.prototype.send = function (msg, flags, cb) { + if (typeof flags === 'function') { + cb = flags; + flags = 0; + } else if (typeof flags === 'string') { + flags = sendFlags[flags] | 0; + } else if (Array.isArray(flags)) { + flags = flagsFromArray(flags); + } else { + flags = flags | 0; + } + + if (Array.isArray(msg)) { + for (var i = 0, len = msg.length; i < len; i++) { + var isLast = i === len - 1; + var msgFlags = isLast ? flags : flags | sendFlags.ZMQ_SNDMORE; + var callback = isLast ? cb : undefined; + + this._outgoing.append(msg[i], msgFlags, callback); + } + } else { + this._outgoing.append(msg, flags, cb); + } + + if (this._outgoing.canSend()) { + this._sock.pending = true; + this._flushWrites(); + } else { + this._sock.pending = false; + } + + return this; +}; + +/** + * Close the socket. + * + * @return {Socket} for chaining + */ + +Socket.prototype.close = function () { + this._sock.close(); + return this; +}; + + +Socket.prototype._flushRead = function () { + var message = this._sock.read(); // can throw + if (!message) { + return false; + } + + // Handle received message immediately to prevent memory leak in driver + if (message.length === 1) { + // hot path + this.emit('message', message[0]); + } else { + this.emit.apply(this, ['message'].concat(message)); + } + return true; +}; + +Socket.prototype._flushWrite = function () { + var batch = this._outgoing.fetch(); + if (!batch) { + this._sock.pending = false; + return false; + } + + try { + if (this._sock.send(batch.content)) { + this._sock.pending = this._outgoing.canSend(); + batch.invokeSent(this); + return true; + } + + this._outgoing.restore(batch); + return false; + } catch (sendError) { + this._sock.pending = this._outgoing.canSend(); + batch.invokeError(this, sendError); // can throw + return false; + } +}; + + +Socket.prototype._flushReads = function () { + if (this._paused || this._isFlushingReads) return; + + this._isFlushingReads = true; + + var received; + + do { + try { + received = this._flushRead(); + } catch (error) { + this._isFlushingReads = false; + this.emit('error', error); // can throw + return; + } + } while (received); + + this._isFlushingReads = false; +}; + +Socket.prototype._flushWrites = function () { + if (this._paused || this._isFlushingWrites) return; + + this._isFlushingWrites = true; + + var sent; + + do { + try { + sent = this._flushWrite(); + } catch (error) { + this._isFlushingWrites = false; + this.emit('error', error); // can throw + return; + } + } while (sent); + + this._isFlushingWrites = false; +}; + +/** + * Create a `type` socket with the given `options`. + * + * @param {String} type + * @param {Object} options + * @return {Socket} + */ + +exports.createSocket = function (type, options, ctx) { + var sock = new Socket(type, ctx); + + if (options) { + for (var key in options) { + sock.set(key, options[key]); + } + } + + return sock; +}; + + +/** + * Expose Socket class and constants + */ + +exports.Socket = Socket; +exports.types = types; +exports.opts = opts; diff --git a/perf/local_lat.js b/perf/local_lat.js index 7ea5598..aa3b804 100644 --- a/perf/local_lat.js +++ b/perf/local_lat.js @@ -1,7 +1,7 @@ -var zmq = require('../'); +var zmq = require('..'); var assert = require('assert'); -if (process.argv.length != 5) { +if (process.argv.length !== 5) { console.log('usage: local_lat '); process.exit(1); } @@ -17,9 +17,9 @@ rep.bindSync(bind_to); rep.on('message', function (data) { assert.equal(data.length, message_size, 'message-size did not match'); rep.send(data); - if (++counter === roundtrip_count){ - setTimeout( function(){ + if (++counter === roundtrip_count) { + setTimeout(function () { rep.close(); - }, 1000); + }, 1000); } -}) +}); diff --git a/perf/local_thr.js b/perf/local_thr.js index 2b7fce6..0d2e3dc 100644 --- a/perf/local_thr.js +++ b/perf/local_thr.js @@ -1,7 +1,7 @@ -var zmq = require('../'); +var zmq = require('..'); var assert = require('assert'); -if (process.argv.length != 5) { +if (process.argv.length !== 5) { console.log('usage: local_thr '); process.exit(1); } @@ -24,11 +24,11 @@ sock.on('message', function (data) { assert.equal(data.length, message_size, 'message-size did not match'); if (++counter === message_count) finish(); -}) +}); function finish(){ var endtime = process.hrtime(timer); - var sec = endtime[0] + (endtime[1]/1000000000); + var sec = endtime[0] + (endtime[1] / 1000000000); var throughput = message_count / sec; var megabits = (throughput * message_size * 8) / 1000000; diff --git a/perf/remote_lat.js b/perf/remote_lat.js index 18baf22..e1b1735 100644 --- a/perf/remote_lat.js +++ b/perf/remote_lat.js @@ -1,7 +1,7 @@ -var zmq = require('../'); +var zmq = require('..'); var assert = require('assert'); -if (process.argv.length != 5) { +if (process.argv.length !== 5) { console.log('usage: remote_lat '); process.exit(1); } diff --git a/perf/remote_thr.js b/perf/remote_thr.js index b44fe5f..6e554b3 100644 --- a/perf/remote_thr.js +++ b/perf/remote_thr.js @@ -1,37 +1,32 @@ -var zmq = require('../') -var assert = require('assert') +var zmq = require('..'); +var assert = require('assert'); -if (process.argv.length != 5) { - console.log('usage: remote_thr ') - process.exit(1) +if (process.argv.length !== 5) { + console.log('usage: remote_thr '); + process.exit(1); } -var connect_to = process.argv[2] -var message_size = Number(process.argv[3]) -var message_count = Number(process.argv[4]) -var message = new Buffer(message_size) -message.fill('h') +var connect_to = process.argv[2]; +var message_size = Number(process.argv[3]); +var message_count = Number(process.argv[4]); +var message = new Buffer(message_size); +message.fill('h'); -var counter = 0 +var counter = 0; -var sock = zmq.socket('push') +var sock = zmq.socket('push'); //sock.setsockopt(zmq.ZMQ_SNDHWM, message_count); -sock.connect(connect_to) +sock.connect(connect_to); -function send(){ +function send() { for (var i = 0; i < message_count; i++) { - sock.send(message) + sock.send(message); } // all messages may not be received by local_thr if closed immediately setTimeout(function () { - sock.close() + sock.close(); }, 1000); } -// because of what seems to be a bug in node-zmq, we would lose messages -// if we start sending immediately after calling connect(), so to make this -// benchmark behave well, we wait a bit... - -setTimeout(send, 1000); - +send(); diff --git a/test/context.js b/test/context.js index 09eda65..1dae0c6 100644 --- a/test/context.js +++ b/test/context.js @@ -1,31 +1,28 @@ -var zmq = require('..') - , should = require('should') - , semver = require('semver') +var zmq = require('..'); +var should = require('should'); +var semver = require('semver'); -describe('context', function() { +describe('context', function () { + if (!semver.gte(zmq.version, '3.2.0')) { + return console.warn('Test requires libzmq >= 3.2.0'); + } - it('should support setting max io threads', function(done) { - // 3.2 and above. - if (!semver.gte(zmq.version, '3.2.0')) { - done(); - return console.warn('Test requires libzmq >= 3.2.0'); - } - zmq.Context.setMaxThreads(3); - zmq.Context.getMaxThreads().should.equal(3); - zmq.Context.setMaxThreads(1); + it('should support setting max io threads', function (done) { + var ctx = zmq.getDefaultContext(); + + ctx.set('ZMQ_IO_THREADS', 3); + ctx.get('ZMQ_IO_THREADS').should.equal(3); + ctx.set('ZMQ_IO_THREADS', 1); done(); }); - it('should support setting max number of sockets', function(done) { - // 3.2 and above. - if (!semver.gte(zmq.version, '3.2.0')) { - done(); - return console.warn('Test requires libzmq >= 3.2.0'); - } - var currMaxSockets = zmq.Context.getMaxSockets(); - zmq.Context.setMaxSockets(256); - zmq.Context.getMaxSockets().should.equal(256); - zmq.Context.setMaxSockets(currMaxSockets); + it('should support setting max number of sockets', function (done) { + var ctx = zmq.getDefaultContext(); + + var currMaxSockets = ctx.get('ZMQ_MAX_SOCKETS'); + ctx.set('ZMQ_MAX_SOCKETS', 256); + ctx.get('ZMQ_MAX_SOCKETS').should.equal(256); + ctx.set('ZMQ_MAX_SOCKETS', currMaxSockets); done(); }); diff --git a/test/exports.js b/test/exports.js index 6c75885..8de76a1 100644 --- a/test/exports.js +++ b/test/exports.js @@ -3,15 +3,15 @@ var zmq = require('..') , should = require('should') , semver = require('semver'); -describe('exports', function(){ - it('should export a valid version', function(){ +describe('exports', function () { + it('should export a valid version', function () { semver.valid(zmq.version).should.be.ok; }); - it('should generate valid curve keypair', function(done) { + it('should generate valid curve keypair', function (done) { try { var rep = zmq.socket('rep'); - rep.curve_server = 0; + rep.set('curve_server', 0); } catch(e) { console.log("libsodium seems to be missing (skipping curve test)"); done(); @@ -27,111 +27,10 @@ describe('exports', function(){ done(); }); - it('should export socket types and options', function(){ - // All versions. - var constants = [ - 'PUB', - 'SUB', - 'REQ', - 'XREQ', - 'REP', - 'XREP', - 'DEALER', - 'ROUTER', - 'PUSH', - 'PULL', - 'PAIR', - 'AFFINITY', - 'IDENTITY', - 'SUBSCRIBE', - 'UNSUBSCRIBE', - 'RCVTIMEO', - 'SNDTIMEO', - 'RATE', - 'RECOVERY_IVL', - 'SNDBUF', - 'RCVBUF', - 'RCVMORE', - 'FD', - 'EVENTS', - 'TYPE', - 'LINGER', - 'RECONNECT_IVL', - 'RECONNECT_IVL_MAX', - 'BACKLOG', - 'POLLIN', - 'POLLOUT', - 'POLLERR', - 'SNDMORE' - ]; - - // 2.x only. - if (semver.satisfies(zmq.version, '2.x')) { - constants.concat([ - 'HWM', - 'SWAP', - 'MCAST_LOOP', - 'ZMQ_RECOVERY_IVL_MSEC', - 'NOBLOCK' - ]); - } - - // 3.0 and above. - if (semver.gte(zmq.version, '3.0.0')) { - constants.concat([ - 'XPUB', - 'XSUB', - 'SNDHWM', - 'RCVHWM', - 'MAXMSGSIZE', - 'ZMQ_MULTICAST_HOPS', - 'TCP_KEEPALIVE', - 'TCP_KEEPALIVE_CNT', - 'TCP_KEEPALIVE_IDLE', - 'TCP_KEEPALIVE_INTVL' - ]); - } - - // 3.2 and above. - if (semver.gte(zmq.version, '3.2.0')) { - constants.concat([ - 'IPV4ONLY', - 'DELAY_ATTACH_ON_CONNECT', - 'ROUTER_MANDATORY', - 'XPUB_VERBOSE', - 'TCP_KEEPALIVE', - 'TCP_KEEPALIVE_IDLE', - 'TCP_KEEPALIVE_CNT', - 'TCP_KEEPALIVE_INTVL', - 'TCP_ACCEPT_FILTER', - 'LAST_ENDPOINT' - ]); - } - - // 3.3 and above. - if (semver.gte(zmq.version, '3.3.0')) { - constants.concat([ - 'ROUTER_RAW' - ]); - } - - constants.forEach(function(typeOrProp){ - zmq['ZMQ_' + typeOrProp].should.be.a.Number; - }); - }); - - it('should export states', function(){ - ['STATE_READY', 'STATE_BUSY', 'STATE_CLOSED'].forEach(function(state){ - zmq[state].should.be.a.Number; - }); - }); - - it('should export constructors', function(){ - zmq.Context.should.be.a.Function; - zmq.Socket.should.be.a.Function; - }); - - it('should export methods', function(){ + it('should export methods', function () { zmq.socket.should.be.a.Function; + zmq.createSocket.should.be.a.Function; + zmq.context.should.be.a.Function; + zmq.createContext.should.be.a.Function; }); }); diff --git a/test/gc.js b/test/gc.js index 0b2e94c..5d99c6d 100644 --- a/test/gc.js +++ b/test/gc.js @@ -1,10 +1,9 @@ +var zmq = require('..'); +var should = require('should'); -var zmq = require('..') - , should = require('should'); - -it('should cooperate with gc', function(done){ - var a = zmq.socket('dealer') - , b = zmq.socket('dealer'); +it('should cooperate with gc', function (done) { + var a = zmq.socket('dealer'); + var b = zmq.socket('dealer'); /** * We create 2 dealer sockets. @@ -15,7 +14,7 @@ it('should cooperate with gc', function(done){ * If a message is delivered, than everything is ok. Otherwise the guard * timeout will make the test fail. */ - a.on('message', function(msg){ + a.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('hello'); this.close(); @@ -26,7 +25,7 @@ it('should cooperate with gc', function(done){ var bound = false; - a.bind('tcp://127.0.0.1:5555', function(e){ + a.bind('tcp://127.0.0.1:5555', function (e) { if (e) { clearInterval(interval); done(e); @@ -35,7 +34,7 @@ it('should cooperate with gc', function(done){ } }); - var interval = setInterval(function(){ + var interval = setInterval(function () { gc(); if (bound) { clearInterval(interval); @@ -45,7 +44,7 @@ it('should cooperate with gc', function(done){ }, 100); // guard against hanging - var timeout = setTimeout(function(){ + var timeout = setTimeout(function () { clearInterval(interval); done(new Error('timeout of 5000ms exceeded (bound: ' + bound + ')')); }, 15000); diff --git a/test/socket.error-callback.js b/test/socket.error-callback.js index 8e09558..5576675 100644 --- a/test/socket.error-callback.js +++ b/test/socket.error-callback.js @@ -1,20 +1,18 @@ -var zmq = require('..') - , should = require('should') - , semver = require('semver'); +var zmq = require('..'); +var should = require('should'); +var semver = require('semver'); -var version = semver.gte(zmq.version, '3.2.0'); - -describe('socket.error-callback', function(){ - var sock; - - if (!version) { +describe('socket.error-callback', function () { + if (!semver.gte(zmq.version, '3.2.0')) { return console.warn("ZMQ_ROUTER_MANDATORY requires libzmq 3.2.0, skipping test"); } + var sock; + it('should create a socket and set ZMQ_ROUTER_MANDATORY', function () { sock = zmq.socket('router'); - sock.setsockopt(zmq.ZMQ_ROUTER_MANDATORY, 1); + sock.set('ZMQ_ROUTER_MANDATORY', 1); }); it('should callback with error when not connected', function (done) { diff --git a/test/socket.events.js b/test/socket.events.js index bfe56b0..e226c8a 100644 --- a/test/socket.events.js +++ b/test/socket.events.js @@ -1,13 +1,13 @@ var zmq = require('..') , should = require('should'); -describe('socket.events', function(){ +describe('socket.events', function () { - it('should support events', function(done){ - var rep = zmq.socket('rep') - , req = zmq.socket('req'); + it('should support events', function (done) { + var rep = zmq.socket('rep'); + var req = zmq.socket('req'); - rep.on('message', function(msg){ + rep.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('hello'); rep.send('world'); @@ -17,9 +17,9 @@ describe('socket.events', function(){ if (error) throw error; }); - rep.on('bind', function(){ + rep.on('bind', function () { req.connect('inproc://stuffevents'); - req.on('message', function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('world'); req.close(); diff --git a/test/socket.js b/test/socket.js index c583b86..7491347 100644 --- a/test/socket.js +++ b/test/socket.js @@ -1,50 +1,44 @@ +var zmq = require('..'); +var should = require('should'); +var semver = require('semver'); -var zmq = require('..') - , should = require('should') - , semver = require('semver'); - -describe('socket', function(){ +describe('socket', function () { var sock; - it('should alias socket', function(){ + it('should alias socket', function () { zmq.createSocket.should.equal(zmq.socket); }); - it('should include type and close', function(){ + it('should include type and close', function () { sock = zmq.socket('req'); sock.type.should.equal('req'); sock.close.should.be.a.Function; }); - it('should use socketopt', function(){ - sock.getsockopt(zmq.ZMQ_BACKLOG).should.not.equal(75); - sock.setsockopt(zmq.ZMQ_BACKLOG, 75).should.equal(sock); - sock.getsockopt(zmq.ZMQ_BACKLOG).should.equal(75); - sock.setsockopt(zmq.ZMQ_BACKLOG, 100); + it('should use socketopt', function () { + sock.get('ZMQ_BACKLOG').should.not.equal(75); + sock.set('ZMQ_BACKLOG', 75).should.equal(sock); + sock.get('ZMQ_BACKLOG').should.equal(75); + sock.set('ZMQ_BACKLOG', 100); }); - it('should use socketopt with sugar', function(){ - sock.getsockopt('backlog').should.not.equal(75); - sock.setsockopt('backlog', 75).should.equal(sock); - sock.getsockopt('backlog').should.equal(75); - - sock.backlog.should.be.a.Number; - sock.backlog.should.not.equal(50); - sock.backlog = 50; - sock.backlog.should.equal(50); + it('should use socketopt with sugar', function () { + sock.get('backlog').should.not.equal(75); + sock.set('backlog', 75).should.equal(sock); + sock.get('backlog').should.equal(75); }); - it('should close', function(){ + it('should close', function () { sock.close(); }); - it('should support options', function(){ + it('should support options', function () { sock = zmq.socket('req', { backlog: 30 }); - sock.backlog.should.equal(30); + sock.get('backlog').should.equal(30); sock.close(); }); - it('should throw a javascript error if it hits the system file descriptor limit', function() { + it('should throw a javascript error if it hits the system file descriptor limit', function () { var i, socks = [], numSocks = 10000; function hitlimit() { for (i = 0; i < numSocks; i++) { diff --git a/test/socket.messages.js b/test/socket.messages.js index 69e8210..7205e9a 100644 --- a/test/socket.messages.js +++ b/test/socket.messages.js @@ -2,18 +2,18 @@ var zmq = require('..') , should = require('should') , semver = require('semver'); -describe('socket.messages', function(){ +describe('socket.messages', function () { var push, pull; - beforeEach(function(){ + beforeEach(function () { push = zmq.socket('push'); pull = zmq.socket('pull'); }); - it('should support messages', function(done){ + it('should support messages', function (done) { var n = 0; - pull.on('message', function(msg){ + pull.on('message', function (msg) { msg = msg.toString(); switch (n++) { case 0: @@ -40,8 +40,8 @@ describe('socket.messages', function(){ }); }); - it('should support multipart messages', function(done){ - pull.on('message', function(msg1, msg2, msg3){ + it('should support multipart messages', function (done) { + pull.on('message', function (msg1, msg2, msg3) { msg1.toString().should.equal('string'); msg2.toString().should.equal('15.99'); msg3.toString().should.equal('buffer'); @@ -57,8 +57,8 @@ describe('socket.messages', function(){ }); }); - it('should support sndmore', function(done){ - pull.on('message', function(a, b, c, d, e){ + it('should support sndmore', function (done) { + pull.on('message', function (a, b, c, d, e) { a.toString().should.equal('tobi'); b.toString().should.equal('loki'); c.toString().should.equal('jane'); @@ -72,16 +72,16 @@ describe('socket.messages', function(){ pull.bind('inproc://stuff_sss', function (error) { if (error) throw error; push.connect('inproc://stuff_sss'); - push.send(['tobi', 'loki'], zmq.ZMQ_SNDMORE); - push.send(['jane', 'luna'], zmq.ZMQ_SNDMORE); + push.send(['tobi', 'loki'], 'ZMQ_SNDMORE'); + push.send(['jane', 'luna'], 'ZMQ_SNDMORE'); push.send('manny'); }); }); - it('should handle late connect', function(done){ + it('should handle late connect', function (done) { var n = 0; - pull.on('message', function(msg){ + pull.on('message', function (msg) { msg = msg.toString(); switch (n++) { case 0: @@ -100,11 +100,11 @@ describe('socket.messages', function(){ }); if (semver.satisfies(zmq.version, '>=3.x')) { - push.setsockopt(zmq.ZMQ_SNDHWM, 1); - pull.setsockopt(zmq.ZMQ_RCVHWM, 1); + push.set('ZMQ_SNDHWM', 1); + pull.set('ZMQ_RCVHWM', 1); } else if (semver.satisfies(zmq.version, '2.x')) { - push.setsockopt(zmq.ZMQ_HWM, 1); - pull.setsockopt(zmq.ZMQ_HWM, 1); + push.set('ZMQ_HWM', 1); + pull.set('ZMQ_HWM', 1); } push.bind('tcp://127.0.0.1:12345', function (error) { @@ -116,7 +116,7 @@ describe('socket.messages', function(){ }); }); - it('should call send() callbacks', function(done){ + it('should call send() callbacks', function (done) { var received = 0; var callbacks = 0; diff --git a/test/socket.monitor.js b/test/socket.monitor.js index 8324cbf..f245e3d 100644 --- a/test/socket.monitor.js +++ b/test/socket.monitor.js @@ -2,26 +2,26 @@ var zmq = require('..') , should = require('should') , semver = require('semver'); -describe('socket.monitor', function() { +describe('socket.monitor', function () { if (!zmq.ZMQ_CAN_MONITOR) { console.log("monitoring not enabled skipping test"); return; } - it('should be able to monitor the socket', function(done) { + it('should be able to monitor the socket', function (done) { var rep = zmq.socket('rep') , req = zmq.socket('req') , events = []; - rep.on('message', function(msg){ + rep.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('hello'); rep.send('world'); }); var testedEvents = ['listen', 'accept', 'disconnect', 'close']; - testedEvents.forEach(function(e) { - rep.on(e, function(event_value, event_endpoint_addr) { + testedEvents.forEach(function (e) { + rep.on(e, function (event_value, event_endpoint_addr) { // Test the endpoint addr arg event_endpoint_addr.toString().should.equal('tcp://127.0.0.1:5423'); @@ -45,10 +45,10 @@ describe('socket.monitor', function() { if (error) throw error; }); - rep.on('bind', function(){ + rep.on('bind', function () { req.connect('tcp://127.0.0.1:5423'); req.send('hello'); - req.on('message', function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('world'); req.close(); @@ -56,9 +56,9 @@ describe('socket.monitor', function() { }); }); - it('should use default interval and numOfEvents', function(done) { + it('should use default interval and numOfEvents', function (done) { var req = zmq.socket('req'); - req.setsockopt(zmq.ZMQ_RECONNECT_IVL, 5); // We want a quick connect retry from zmq + req.set('ZMQ_RECONNECT_IVL', 5); // We want a quick connect retry from zmq // We will try to connect to a non-existing server, zmq will issue events: "connect_retry", "close", "connect_retry" // The connect_retry will be issued immediately after the close event, so we will measure the time between the close @@ -66,11 +66,11 @@ describe('socket.monitor', function() { // the monitor socket). var closeTime; - req.on('close', function() { + req.on('close', function () { closeTime = Date.now(); }); - req.on('connect_retry', function() { + req.on('connect_retry', function () { var diff = Date.now() - closeTime; req.unmonitor(); req.close(); @@ -82,15 +82,15 @@ describe('socket.monitor', function() { req.connect('tcp://127.0.0.1:5423'); }); - it('should read multiple events on monitor interval', function(done) { + it('should read multiple events on monitor interval', function (done) { var req = zmq.socket('req'); - req.setsockopt(zmq.ZMQ_RECONNECT_IVL, 5); + req.set('ZMQ_RECONNECT_IVL', 5); var closeTime; - req.on('close', function() { + req.on('close', function () { closeTime = Date.now(); }); - req.on('connect_retry', function() { + req.on('connect_retry', function () { var diff = Date.now() - closeTime; req.unmonitor(); req.close(); diff --git a/test/socket.pair.js b/test/socket.pair.js index 294b2fa..a0076e1 100644 --- a/test/socket.pair.js +++ b/test/socket.pair.js @@ -1,9 +1,9 @@ var zmq = require('..') , should = require('should'); -describe('socket.pair', function(){ +describe('socket.pair', function () { - it('should support pair-pair', function (done){ + it('should support pair-pair', function (done) { var pairB = zmq.socket('pair') , pairC = zmq.socket('pair'); diff --git a/test/socket.pub-sub.js b/test/socket.pub-sub.js index b50c22e..13c274a 100644 --- a/test/socket.pub-sub.js +++ b/test/socket.pub-sub.js @@ -2,19 +2,19 @@ var zmq = require('..') , should = require('should') , semver = require('semver'); -describe('socket.pub-sub', function(){ +describe('socket.pub-sub', function () { var pub, sub; - beforeEach(function() { + beforeEach(function () { pub = zmq.socket('pub'); sub = zmq.socket('sub'); }); - it('should support pub-sub', function(done){ + it('should support pub-sub', function (done) { var n = 0; sub.subscribe(''); - sub.on('message', function(msg){ + sub.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); switch (n++) { case 0: @@ -47,7 +47,7 @@ describe('socket.pub-sub', function(){ // established before we start the send. This fixes the observed // hang. - setTimeout(function() { + setTimeout(function () { pub.send('foo'); pub.send('bar'); pub.send('baz'); @@ -55,13 +55,13 @@ describe('socket.pub-sub', function(){ }); }); - it('should support pub-sub filter', function(done){ + it('should support pub-sub filter', function (done) { var n = 0; sub.subscribe('js'); sub.subscribe('luna'); - sub.on('message', function(msg){ + sub.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); switch (n++) { case 0: @@ -82,7 +82,7 @@ describe('socket.pub-sub', function(){ // See comments on pub-sub test. - setTimeout(function() { + setTimeout(function () { pub.send('js is cool'); pub.send('ruby is meh'); pub.send('py is pretty cool'); diff --git a/test/socket.push-pull.js b/test/socket.push-pull.js index 8eeb66f..e39d0d3 100644 --- a/test/socket.push-pull.js +++ b/test/socket.push-pull.js @@ -2,14 +2,14 @@ var zmq = require('..') , should = require('should') , semver = require('semver'); -describe('socket.push-pull', function(){ +describe('socket.push-pull', function () { - it('should support push-pull', function(done){ + it('should support push-pull', function (done) { var push = zmq.socket('push') , pull = zmq.socket('pull'); var n = 0; - pull.on('message', function(msg){ + pull.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); switch (n++) { case 0: @@ -40,13 +40,13 @@ describe('socket.push-pull', function(){ }); - it('should not emit messages after pause()', function(done){ + it('should not emit messages after pause()', function (done) { var push = zmq.socket('push') , pull = zmq.socket('pull'); var n = 0; - pull.on('message', function(msg){ + pull.on('message', function (msg) { if(n++ === 0) { msg.toString().should.equal('foo'); } @@ -67,14 +67,14 @@ describe('socket.push-pull', function(){ push.send('baz'); }); - setTimeout(function (){ + setTimeout(function () { pull.close(); push.close(); done(); }, 100); }); - it('should be able to read messages after pause()', function(done){ + it('should be able to read messages after pause()', function (done) { var push = zmq.socket('push') , pull = zmq.socket('pull'); @@ -86,16 +86,16 @@ describe('socket.push-pull', function(){ push.connect(addr); pull.pause() - messages.forEach(function(message){ + messages.forEach(function (message) { push.send(message); }); - messages.forEach(function(message){ + messages.forEach(function (message) { pull.read().toString().should.eql(message); }); }); - setTimeout(function (){ + setTimeout(function () { pull.close(); push.close(); done(); @@ -103,17 +103,17 @@ describe('socket.push-pull', function(){ }); - it('should emit messages after resume()', function(done){ + it('should emit messages after resume()', function (done) { var push = zmq.socket('push') , pull = zmq.socket('pull'); var n = 0; - function checkNoMessages(msg){ + function checkNoMessages(msg) { should.not.exist(msg); } - function checkMessages(msg){ + function checkMessages(msg) { msg.should.be.an.instanceof(Buffer); switch (n++) { case 0: @@ -144,7 +144,7 @@ describe('socket.push-pull', function(){ push.send('bar'); push.send('baz'); - setTimeout(function (){ + setTimeout(function () { pull.removeListener('message', checkNoMessages) pull.on('message', checkMessages) pull.resume() diff --git a/test/socket.req-rep.js b/test/socket.req-rep.js index 93ce601..14df03b 100644 --- a/test/socket.req-rep.js +++ b/test/socket.req-rep.js @@ -1,12 +1,12 @@ var zmq = require('..') , should = require('should'); -describe('socket.req-rep', function(){ - it('should support req-rep', function(done){ +describe('socket.req-rep', function () { + it('should support req-rep', function (done) { var rep = zmq.socket('rep') , req = zmq.socket('req'); - rep.on('message', function(msg){ + rep.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('hello'); rep.send('world'); @@ -16,7 +16,7 @@ describe('socket.req-rep', function(){ if (error) throw error; req.connect('inproc://stuffreqrep'); req.send('hello'); - req.on('message', function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('world'); rep.close(); @@ -26,15 +26,15 @@ describe('socket.req-rep', function(){ }); }); - it('should support multiple', function(done){ + it('should support multiple', function (done) { var n = 5; for (var i = 0; i < n; i++) { - (function(n){ + (function (n) { var rep = zmq.socket('rep') , req = zmq.socket('req'); - rep.on('message', function(msg){ + rep.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('hello'); rep.send('world'); @@ -44,7 +44,7 @@ describe('socket.req-rep', function(){ if (error) throw error; req.connect('inproc://' + n); req.send('hello'); - req.on('message', function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('world'); req.close(); diff --git a/test/socket.router.js b/test/socket.router.js index 45feee2..514a1d3 100644 --- a/test/socket.router.js +++ b/test/socket.router.js @@ -2,8 +2,8 @@ var zmq = require('..') , should = require('should') , semver = require('semver'); -describe('socket.router', function(){ - it('should handle the unroutable', function(done){ +describe('socket.router', function () { + it('should handle the unroutable', function (done) { var complete = 0; if (!semver.gte(zmq.version, '3.2.0')) { @@ -30,7 +30,7 @@ describe('socket.router', function(){ // should emit an error event on unroutable msgs if mandatory = 1 and error handler is set - (function(){ + (function () { var sock = zmq.socket('router'); sock.on('error', function (err) { sock.close(); @@ -38,17 +38,17 @@ describe('socket.router', function(){ if (++complete === 2) done(); }); - sock.setsockopt(zmq.ZMQ_ROUTER_MANDATORY, 1); + sock.set('ZMQ_ROUTER_MANDATORY', 1); sock.send([envelope, '']); })(); // should throw an error on unroutable msgs if mandatory = 1 and no error handler is set - (function(){ + (function () { var sock = zmq.socket('router'); - sock.setsockopt(zmq.ZMQ_ROUTER_MANDATORY, 1); + sock.set('ZMQ_ROUTER_MANDATORY', 1); try { sock.send([envelope, '']); @@ -73,10 +73,10 @@ describe('socket.router', function(){ // should silently ignore unroutable msgs if mandatory = 0 - (function(){ + (function () { var sock = zmq.socket('router'); - (function(){ + (function () { sock.send([envelope, '']); sock.close(); }).should.not.throw; diff --git a/test/socket.stream.js b/test/socket.stream.js index 4efe0b0..c68b5fa 100644 --- a/test/socket.stream.js +++ b/test/socket.stream.js @@ -1,75 +1,67 @@ -var zmq = require('..') - , http = require('http') - , should = require('should') - , semver = require('semver'); +var zmq = require('..'); +var http = require('http'); +var should = require('should'); +var semver = require('semver'); -describe('socket.stream', function(){ +describe('socket.stream', function () { + // socket stream type API after libzmq4+, target > 4.0.0 + // v4.1.3's stream socket is a little buggy, let's exclude it for now - it('should support a stream socket type', function (done){ - - // socket stream type API after libzmq4+, target > 4.0.0 - // v4.1.3's stream socket is a little buggy, let's exclude it for now - if (semver.gte(zmq.version, '4.0.0') && semver.lt(zmq.version, '4.1.3')) { - - var stream = zmq.socket('stream'); - stream.on('message', function (id,msg){ + it('should support a stream socket type', function (done) { + if (semver.lt(zmq.version, '4.0.0') || zmq.version !== '4.1.3') { + done(); + return console.warn('Stream socket type requires libzmq v4+ (skipping also on 4.1.3)'); + } - msg.should.be.an.instanceof(Buffer); + var stream = zmq.socket('stream'); + stream.on('message', function (id, msg) { + msg.should.be.an.instanceof(Buffer); - var raw_header = String(msg).split('\r\n'); - var method = raw_header[0].split(' ')[0]; - method.should.equal('GET'); + var raw_header = msg.toString().split('\r\n'); + var method = raw_header[0].split(' ')[0]; + method.should.equal('GET'); - //finding an HTTP GET method, prepare HTTP response for TCP socket - var httpProtocolString = 'HTTP/1.0 200 OK\r\n' //status code - + 'Content-Type: text/html\r\n' //headers - + '\r\n' - + '' //response body - + '' //make it xml, json, html or something else - + '' - + '' - + '' - +'

derpin over protocols

' - + '' - +'' + //finding an HTTP GET method, prepare HTTP response for TCP socket + var httpProtocolString = 'HTTP/1.0 200 OK\r\n' //status code + + 'Content-Type: text/html\r\n' //headers + + '\r\n' + + '' //response body + + '' //make it xml, json, html or something else + + '' + + '' + + '' + +'

derpin over protocols

' + + '' + +'' - //zmq streaming prefixed by envelope's routing identifier - stream.send([id,httpProtocolString]); - }); + //zmq streaming prefixed by envelope's routing identifier + stream.send([id,httpProtocolString]); + }); - var addr = '127.0.0.1:5513'; - stream.bind('tcp://'+addr, function (error) { - if (error) throw error; - //send non-peer request to zmq, like an http GET method with URI path - http.get('http://'+addr+'/aRandomRequestPath', function (httpMsg){ + var addr = '127.0.0.1:5513'; + stream.bind('tcp://' + addr, function (error) { + if (error) throw error; + //send non-peer request to zmq, like an http GET method with URI path + http.get('http://' + addr + '/aRandomRequestPath', function (httpMsg) { - //msg should now be a node readable stream as the good lord intended - if (semver.gte(process.versions.node, '0.11.0')){ - httpMsg.socket._readableState.reading.should.be.false - } else { - if(semver.gte(process.versions.node, '0.10.0')){ - httpMsg.socket._readableState.reading.should.be.true - } - } + //msg should now be a node readable stream as the good lord intended + if (semver.gte(process.versions.node, '0.11.0')) { + httpMsg.socket._readableState.reading.should.be.false + } else { + httpMsg.socket._readableState.reading.should.be.true + } - //conventional node streams emit data events to process zmq stream response - httpMsg.on('data',function (msg){ - msg.should.be.an.instanceof(Buffer); - String(msg).should.equal('' - +'' - +'

derpin over protocols

' - +'' - +''); - done(); - }); + //conventional node streams emit data events to process zmq stream response + httpMsg.on('data', function (msg) { + msg.should.be.an.instanceof(Buffer); + msg.toString().should.equal('' + +'' + +'

derpin over protocols

' + +'' + +''); + done(); }); }); - - } else { - - done(); - return console.warn('stream socket type in libzmq v4+'); - - } + }); }); }); diff --git a/test/socket.unbind.js b/test/socket.unbind.js index 4d7b627..e37e518 100644 --- a/test/socket.unbind.js +++ b/test/socket.unbind.js @@ -2,9 +2,9 @@ var zmq = require('..') , should = require('should') , semver = require('semver'); -describe('socket.unbind', function(){ +describe('socket.unbind', function () { - it('should be able to unbind', function(done){ + it('should be able to unbind', function (done) { if (!zmq.ZMQ_CAN_UNBIND) { done(); return; @@ -25,7 +25,7 @@ describe('socket.unbind', function(){ }); }); - a.on('unbind', function(addr) { + a.on('unbind', function (addr) { if (addr === 'tcp://127.0.0.1:5420') { b.send('Error from b.'); c.send('Messsage from c.'); @@ -35,7 +35,7 @@ describe('socket.unbind', function(){ } }); - a.on('message', function(msg) { + a.on('message', function (msg) { message_count++; if (msg.toString() === 'Hello from b.') { a.unbind('tcp://127.0.0.1:5420', function (error) { diff --git a/test/socket.zap.js b/test/socket.zap.js index 4148f30..aaba5d5 100644 --- a/test/socket.zap.js +++ b/test/socket.zap.js @@ -1,66 +1,63 @@ -var zmq = require('..') - , should = require('should') - , semver = require('semver'); +var zmq = require('..'); +var should = require('should'); -describe('socket.zap', function(){ +describe('socket.zap', function () { + if (!zmq.socketOptionExists('ZMQ_CURVE_SERVER')) { + return console.log('ZMQ_CURVE_SERVER does not exist in this version of ZMQ (skipping curve test)'); + } - var zap = require('./zap') - , zapSocket, rep, req, count = 0; + var zap = require('./zap'); + var zapSocket, rep, req, count = 0; - beforeEach(function(){ + beforeEach(function () { count++; zapSocket = zap.start(count); rep = zmq.socket('rep'); req = zmq.socket('req'); }); - afterEach(function(){ + afterEach(function () { req.close(); rep.close(); zapSocket.close(); }); - it('should support curve', function(done){ + it('should support curve', function (done) { var port = 'tcp://127.0.0.1:12347'; - if (!semver.gte(zmq.version, '4.0.0')) { - done(); - return; - } try { - rep.curve_server = 0; + rep.set('ZMQ_CURVE_SERVER', 0); } catch(e) { - console.log("libsodium seems to be missing (skipping curve test)"); - done(); - return; + console.log('libsodium seems to be missing (skipping curve test)'); + return done(); } - var serverPublicKey = new Buffer('7f188e5244b02bf497b86de417515cf4d4053ce4eb977aee91a55354655ec33a', 'hex') - , serverPrivateKey = new Buffer('1f5d3873472f95e11f4723d858aaf0919ab1fb402cb3097742c606e61dd0d7d8', 'hex') - , clientPublicKey = new Buffer('ea1cc8bd7c8af65497d43fc21dbec6560c5e7b61bcfdcbd2b0dfacf0b4c38d45', 'hex') - , clientPrivateKey = new Buffer('83f99afacfab052406e5f421612568034e85f4c8182a1c92671e83dca669d31d', 'hex'); + var serverPublicKey = new Buffer('7f188e5244b02bf497b86de417515cf4d4053ce4eb977aee91a55354655ec33a', 'hex'); + var serverPrivateKey = new Buffer('1f5d3873472f95e11f4723d858aaf0919ab1fb402cb3097742c606e61dd0d7d8', 'hex'); + var clientPublicKey = new Buffer('ea1cc8bd7c8af65497d43fc21dbec6560c5e7b61bcfdcbd2b0dfacf0b4c38d45', 'hex'); + var clientPrivateKey = new Buffer('83f99afacfab052406e5f421612568034e85f4c8182a1c92671e83dca669d31d', 'hex'); - rep.on('message', function(msg){ + rep.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('hello'); rep.send('world'); }); - rep.zap_domain = "test"; - rep.curve_server = 1; - rep.curve_secretkey = serverPrivateKey; - rep.mechanism.should.eql(2); + rep.zap_domain = 'test'; + rep.set('curve_server', 1); + rep.set('curve_secretkey', serverPrivateKey); + rep.get('mechanism').should.eql(2); rep.bind(port, function (error) { if (error) throw error; - req.curve_serverkey = serverPublicKey; - req.curve_publickey = clientPublicKey; - req.curve_secretkey = clientPrivateKey; - req.mechanism.should.eql(2); + req.set('curve_serverkey', serverPublicKey); + req.set('curve_publickey', clientPublicKey); + req.set('curve_secretkey', clientPrivateKey); + req.get('mechanism').should.eql(2); req.connect(port); req.send('hello'); - req.on('message', function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('world'); done(); @@ -69,28 +66,24 @@ describe('socket.zap', function(){ }); - it('should support null', function(done){ + it('should support null', function (done) { var port = 'tcp://127.0.0.1:12345'; - if (!semver.gte(zmq.version, '4.0.0')) { - done(); - return; - } - rep.on('message', function(msg){ + rep.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('hello'); rep.send('world'); }); - rep.zap_domain = "test"; - rep.mechanism.should.eql(0); + rep.set('zap_domain', 'test'); + rep.get('mechanism').should.eql(0); rep.bind(port, function (error) { if (error) throw error; - req.mechanism.should.eql(0); + req.get('mechanism').should.eql(0); req.connect(port); req.send('hello'); - req.on('message', function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('world'); done(); @@ -98,32 +91,28 @@ describe('socket.zap', function(){ }); }); - it('should support plain', function(done){ + it('should support plain', function (done) { var port = 'tcp://127.0.0.1:12346'; - if (!semver.gte(zmq.version, '4.0.0')) { - done(); - return; - } - rep.on('message', function(msg){ + rep.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('hello'); rep.send('world'); }); - rep.zap_domain = "test"; - rep.plain_server = 1; - rep.mechanism.should.eql(1); + rep.set('zap_domain', 'test'); + rep.set('plain_server', 1); + rep.get('mechanism').should.eql(1); rep.bind(port, function (error) { if (error) throw error; - req.plain_username = "user"; - req.plain_password = "pass"; - req.mechanism.should.eql(1); + req.set('plain_username', 'user'); + req.set('plain_password', 'pass'); + req.get('mechanism').should.eql(1); req.connect(port); req.send('hello'); - req.on('message', function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('world'); done(); diff --git a/test/zap.js b/test/zap.js index 238fee9..baa859d 100644 --- a/test/zap.js +++ b/test/zap.js @@ -1,25 +1,30 @@ // This is mainly for testing that the security mechanisms themselves are working -// not the ZAP protocol itself. As long as the request is valid, this will +// not the ZAP protocol itself. As long as the request is valid, this will // authenticate it. var zmq = require('../'); -module.exports.start = function(count) { +exports.start = function (count) { var zap = zmq.socket('router'); - zap.on('message', function() { + zap.on('message', function () { var data = Array.prototype.slice.call(arguments); - - if (!data || !data.length) throw new Error("Invalid ZAP request"); - - var returnPath = [], - frame = data.shift(); - while (frame && (frame.length != 0)) { + + if (!data || !data.length) { + throw new Error("Invalid ZAP request"); + } + + var returnPath = []; + var frame = data.shift(); + + while (frame && (frame.length !== 0)) { returnPath.push(frame); frame = data.shift(); } returnPath.push(frame); - if (data.length < 6) throw new Error("Invalid ZAP request"); + if (data.length < 6) { + throw new Error('Invalid ZAP request'); + } var zapReq = { version: data.shift(), @@ -34,13 +39,13 @@ module.exports.start = function(count) { zap.send(returnPath.concat([ zapReq.version, zapReq.requestId, - new Buffer("200", "utf8"), - new Buffer("OK", "utf8"), + new Buffer('200', 'utf8'), + new Buffer('OK', 'utf8'), new Buffer(0), new Buffer(0) ])); }); - - zap.bindSync("inproc://zeromq.zap.01."+count); + + zap.bindSync('inproc://zeromq.zap.01.' + count); return zap; -} +}; diff --git a/test/zmq_proxy.js b/test/zmq_proxy.js index 438814e..e2e4e43 100644 --- a/test/zmq_proxy.js +++ b/test/zmq_proxy.js @@ -2,7 +2,7 @@ var zmq = require('..') , should = require('should'); -describe('proxy', function() { +describe('proxy', function () { it('should be a function off the module namespace', function (done) { zmq.proxy.should.be.a.Function; done(); diff --git a/test/zmq_proxy.push-pull.js b/test/zmq_proxy.push-pull.js index c26942d..c07b3b4 100644 --- a/test/zmq_proxy.push-pull.js +++ b/test/zmq_proxy.push-pull.js @@ -1,15 +1,15 @@ -var zmq = require('..') - , should = require('should') - , semver = require('semver'); +var zmq = require('..'); +var should = require('should'); +var semver = require('semver'); -var addr = 'tcp://127.0.0.1' - , frontendAddr = addr+':5501' - , backendAddr = addr+':5502' - , captureAddr = addr+':5503'; +var addr = 'tcp://127.0.0.1'; +var frontendAddr = addr + ':5501'; +var backendAddr = addr + ':5502'; +var captureAddr = addr + ':5503'; -describe('proxy.push-pull', function() { +describe('proxy.push-pull', function () { - it('should proxy push-pull connected to pull-push',function (done) { + it('should proxy push-pull connected to pull-push', function (done) { var frontend = zmq.socket('pull'); var backend = zmq.socket('push'); @@ -23,7 +23,7 @@ describe('proxy.push-pull', function() { push.connect(frontendAddr); pull.connect(backendAddr); - pull.on('message',function (msg) { + pull.on('message', function (msg) { frontend.close(); backend.close(); @@ -35,15 +35,14 @@ describe('proxy.push-pull', function() { done(); }); - setTimeout(function() { + setTimeout(function () { push.send('foo'); - }, 100.0); - - zmq.proxy(frontend,backend); + }, 100); + zmq.proxy(frontend, backend); }); - it('should proxy pull-push connected to push-pull with capture',function (done) { + it('should proxy pull-push connected to push-pull with capture', function (done) { var frontend = zmq.socket('push'); var backend = zmq.socket('pull'); @@ -62,17 +61,17 @@ describe('proxy.push-pull', function() { push.connect(backendAddr); capSub.connect(captureAddr); - pull.on('message',function (msg) { + pull.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('foo'); }); capSub.subscribe(''); - capSub.on('message',function (msg) { + capSub.on('message', function (msg) { capture.close(); capSub.close(); - setTimeout(function() { + setTimeout(function () { frontend.close(); backend.close(); push.close(); @@ -81,12 +80,12 @@ describe('proxy.push-pull', function() { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('foo'); done(); - },100.0); + }, 100); }); - setTimeout(function() { + setTimeout(function () { push.send('foo'); - }, 100.0); + }, 100); zmq.proxy(frontend,backend,capture); diff --git a/test/zmq_proxy.router-dealer.js b/test/zmq_proxy.router-dealer.js index b412503..def81a6 100644 --- a/test/zmq_proxy.router-dealer.js +++ b/test/zmq_proxy.router-dealer.js @@ -1,15 +1,15 @@ -var zmq = require('..') - , should = require('should') - , semver = require('semver'); +var zmq = require('..'); +var should = require('should'); +var semver = require('semver'); -var addr = 'tcp://127.0.0.1' - , frontendAddr = addr+':5504' - , backendAddr = addr+':5505' - , captureAddr = addr+':5506'; +var addr = 'tcp://127.0.0.1'; +var frontendAddr = addr + ':5504'; +var backendAddr = addr + ':5505'; +var captureAddr = addr + ':5506'; -describe('proxy.router-dealer', function() { +describe('proxy.router-dealer', function () { - it('should proxy req-rep connected over router-dealer', function (done){ + it('should proxy req-rep connected over router-dealer', function (done) { var frontend = zmq.socket('router'); var backend = zmq.socket('dealer'); @@ -23,7 +23,7 @@ describe('proxy.router-dealer', function() { req.connect(frontendAddr); rep.connect(backendAddr); - req.on('message',function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('foo bar'); frontend.close(); @@ -34,18 +34,18 @@ describe('proxy.router-dealer', function() { }); rep.on('message', function (msg) { - rep.send(msg+' bar'); + rep.send(msg + ' bar'); }); - setTimeout(function() { + setTimeout(function () { req.send('foo'); - }, 100.0); + }, 100); zmq.proxy(frontend,backend); }); - it('should proxy rep-req connections with capture', function (done){ + it('should proxy rep-req connections with capture', function (done) { var frontend = zmq.socket('router'); var backend = zmq.socket('dealer'); @@ -65,7 +65,7 @@ describe('proxy.router-dealer', function() { capSub.connect(captureAddr); capSub.subscribe(''); - req.on('message',function (msg) { + req.on('message', function (msg) { req.close(); rep.close(); }); @@ -74,21 +74,21 @@ describe('proxy.router-dealer', function() { rep.send(msg+' bar'); }); - capSub.on('message',function (msg) { + capSub.on('message', function (msg) { backend.close(); frontend.close(); capture.close(); capSub.close(); - setTimeout(function() { + setTimeout(function () { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('foo bar'); done(); - },100.0) + }, 100); }); - setTimeout(function() { + setTimeout(function () { req.send('foo'); - },200.0) + }, 200); zmq.proxy(frontend,backend,capture); diff --git a/test/zmq_proxy.xpub-xsub.js b/test/zmq_proxy.xpub-xsub.js index 01034a5..276f173 100644 --- a/test/zmq_proxy.xpub-xsub.js +++ b/test/zmq_proxy.xpub-xsub.js @@ -9,7 +9,7 @@ var addr = 'tcp://127.0.0.1' var version = semver.gte(zmq.version, '3.1.0'); -describe('proxy.xpub-xsub', function() { +describe('proxy.xpub-xsub', function () { it('should proxy pub-sub connected to xpub-xsub', function (done) { if (!version) { @@ -24,8 +24,7 @@ describe('proxy.xpub-xsub', function() { var pub = zmq.socket('pub'); sub.subscribe(''); - sub.on('message',function (msg) { - + sub.on('message', function (msg) { frontend.close(); backend.close(); sub.close(); @@ -45,12 +44,11 @@ describe('proxy.xpub-xsub', function() { sub.connect(frontendAddr); pub.connect(backendAddr); - setTimeout(function() { + setTimeout(function () { pub.send('foo'); - }, 200.0); + }, 200); zmq.proxy(frontend, backend); - }); }); }); @@ -72,7 +70,6 @@ describe('proxy.xpub-xsub', function() { sub.subscribe(''); sub.on('message', function (msg) { - sub.close(); pub.close(); backend.close(); @@ -83,16 +80,15 @@ describe('proxy.xpub-xsub', function() { }); capSub.subscribe(''); - capSub.on('message',function (msg) { - + capSub.on('message', function (msg) { capture.close(); capSub.close(); - setTimeout(function(){ + setTimeout(function () { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('foo'); done(); - },100.0); + }, 100); }); capture.bind(captureAddr, function (error) { @@ -108,7 +104,7 @@ describe('proxy.xpub-xsub', function() { setTimeout(function () { pub.send('foo'); - }, 200.0); + }, 200); zmq.proxy(frontend,backend,capture); }); @@ -134,24 +130,20 @@ describe('proxy.xpub-xsub', function() { sub.connect(frontendAddr); pub.connect(backendAddr); - try{ - + try { zmq.proxy(backend,frontend); - - } catch(e){ - - e.message.should.equal('wrong socket order to proxy'); - - } finally{ + } catch (e) { + e.message.should.equal('Wrong socket order to proxy'); + } finally { frontend.close(); backend.close(); pub.close(); sub.close(); //allow time for TCP sockets to close - setTimeout(function(){ + setTimeout(function () { done(); - },200) + }, 200); } }) }); diff --git a/test/zmq_proxy.xrep-xreq.js b/test/zmq_proxy.xrep-xreq.js index c0115a0..210ea95 100644 --- a/test/zmq_proxy.xrep-xreq.js +++ b/test/zmq_proxy.xrep-xreq.js @@ -1,16 +1,16 @@ var zmq = require('..') - , should = require('should') - , semver = require('semver'); +var should = require('should'); +var semver = require('semver'); -var addr = 'tcp://127.0.0.1' - , frontendAddr = addr+':5510' - , backendAddr = addr+':5511' - , captureAddr = addr+':5512'; +var addr = 'tcp://127.0.0.1'; +var frontendAddr = addr + ':5510'; +var backendAddr = addr + ':5511'; +var captureAddr = addr + ':5512'; //since its for libzmq2, we target versions < 3.0.0 var version = semver.lte(zmq.version, '3.0.0'); -describe('proxy.xrep-xreq', function() { +describe('proxy.xrep-xreq', function () { it('should proxy req-rep connected to xrep-xreq', function (done) { if (!version) { console.warn('Test requires libzmq v2 (skipping)'); @@ -29,7 +29,7 @@ describe('proxy.xrep-xreq', function() { req.connect(frontendAddr); rep.connect(backendAddr); - req.on('message',function(msg){ + req.on('message', function (msg) { msg.should.be.an.instanceof(Buffer); msg.toString().should.equal('foo bar'); frontend.close(); @@ -40,13 +40,13 @@ describe('proxy.xrep-xreq', function() { }); rep.on('message', function (msg) { - rep.send(msg+' bar'); + rep.send(msg + ' bar'); }); - setTimeout(function() { + setTimeout(function () { req.send('foo'); - }, 100.0); + }, 100); - zmq.proxy(frontend,backend); + zmq.proxy(frontend, backend); }); });