Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 49 additions & 19 deletions lib/engine_socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function markEndTime(ee, context, startedAt) {
}

function isResponseRequired(spec) {
return (spec.emit && spec.emit.response && spec.emit.response.channel);
return (spec.emit && spec.emit.response && (spec.emit.response.channel || (_.isArray(spec.emit.response) && !_.isEmpty(spec.emit.response))));
}

function isAcknowledgeRequired(spec) {
Expand Down Expand Up @@ -193,32 +193,62 @@ SocketIoEngine.prototype.step = function (requestSpec, ee) {
};

if (isResponseRequired(requestSpec)) {
let response = {
channel: template(requestSpec.emit.response.channel, context),
data: template(requestSpec.emit.response.data, context),
capture: template(requestSpec.emit.response.capture, context),
match: template(requestSpec.emit.response.match, context)
};
// Listen for the socket.io response on the specified channel
let done = false;
socketio.on(response.channel, function receive(data) {
done = true;
processResponse(ee, data, response, context, function(err) {
if (!err) {
markEndTime(ee, context, startedAt);
}
// Stop listening on the response channel
socketio.off(response.channel);
return endCallback(err, context);
let requiredResponses = requestSpec.emit.response;
if (!_.isArray(requiredResponses)) {
requiredResponses = [requestSpec.emit.response];
}

// Group required responses by channel in order to keep open channels with multiple messages
const channelsResponses = _.groupBy(requiredResponses, 'channel');

let requiredResponsesCount = 0;
let currentResponsesCount = 0;
let requiredResponsesCountsByChannel = {};
_.forEach(channelsResponses, function(channelResponses, channel) {
requiredResponsesCountsByChannel[channel] = 0;
// Expect that messages should be sent in the same order
const responses = [];
_.forEach(channelResponses, function(channelResponse, idx) {
requiredResponsesCountsByChannel[channel]++;
requiredResponsesCount++;
responses.push({
channel: template(channelResponse.channel, context),
data: template(channelResponse.data, context),
capture: template(channelResponse.capture, context),
match: template(channelResponse.match, context)
});
});

let index = -1;
// Listen for the socket.io response on the specified channel
socketio.on(channel, function receive(data) {
index++;
currentResponsesCount++;
processResponse(ee, data, responses[index], context, function(err) {
if (!err) {
markEndTime(ee, context, startedAt);
}
// Stop listening on the response channel only if all messages were received or if there is an error
if (requiredResponsesCountsByChannel[channel] - 1 === index || err) {
socketio.off(channel);
}

// Call the callback function if all channel messages were received or if there is an error
if (requiredResponsesCount === currentResponsesCount || err) {
callback(err, context);
}
});
});
});

// Send the data on the specified socket.io channel
socketio.emit(outgoing.channel, outgoing.data);
// If we don't get a response within the timeout, fire an error
let waitTime = self.config.timeout || 10;
waitTime *= 1000;
setTimeout(function responseTimeout() {
if (!done) {
// Check if all messages have been received
if (requiredResponsesCount !== currentResponsesCount) {
let err = 'response timeout';
ee.emit('error', err);
return callback(err, context);
Expand Down
1 change: 1 addition & 0 deletions test/scripts/hello_socketio.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
{"emit": { "channel": "echo", "data": "ping", "acknowledge": {"match": {"value": "pong"}} }},
{"think": 1},
{"emit": { "channel": "echo", "data": "ping", "acknowledge": {"match": {"json": "$.1.answer", "value": 42}} }},
{"emit": { "channel": "echo", "data": {"key": "{{ rand }}", "sendAck": true}, "response": [{ "channel": "ack", "match": {"json": "$.key", "value": "{{ rand }}" } }, { "channel": "echoed", "match": {"json": "$.key", "value": "{{ rand }}" } }]}},
{"think": 1}
]
}
Expand Down
6 changes: 5 additions & 1 deletion test/targets/simple_socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ function createServer() {
ws.on('echo', function incoming(message, cb) {
MESSAGE_COUNT++;
if (message === 'ping') {
cb("pong", {answer: 42});
cb('pong', {answer: 42});
}
if (message.sendAck === true) {
console.log('Socket.io sending message ack: ', message);
ws.emit('ack', message);
}
debug('Socket.io echoing message: %s', message);
ws.emit('echoed', message);
Expand Down