|
1 | 1 | 'use strict'; |
2 | 2 |
|
3 | | -define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { |
| 3 | +define(['ably', 'shared_helper', 'async', 'chai', 'interception_proxy_client'], function ( |
| 4 | + Ably, |
| 5 | + Helper, |
| 6 | + async, |
| 7 | + chai, |
| 8 | + interceptionProxyClient, |
| 9 | +) { |
4 | 10 | var expect = chai.expect; |
5 | 11 | var createPM = Ably.protocolMessageFromDeserialized; |
6 | 12 |
|
@@ -185,119 +191,123 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async |
185 | 191 | * @spec RTN19a2 |
186 | 192 | */ |
187 | 193 | it('connectionQueuing', function (done) { |
188 | | - var helper = this.test.helper, |
189 | | - realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), |
190 | | - channel = realtime.channels.get('connectionQueuing'), |
191 | | - connectionManager = realtime.connection.connectionManager; |
192 | | - |
193 | | - realtime.connection.once('connected', function () { |
194 | | - helper.recordPrivateApi('read.connectionManager.activeProtocol.transport'); |
195 | | - var transport = connectionManager.activeProtocol.transport; |
196 | | - Helper.whenPromiseSettles(channel.attach(), function (err) { |
197 | | - if (err) { |
198 | | - helper.closeAndFinish(done, realtime, err); |
199 | | - return; |
200 | | - } |
| 194 | + interceptionProxyClient.intercept(done, (done, interceptionContext) => { |
| 195 | + var helper = this.test.helper, |
| 196 | + realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), |
| 197 | + channel = realtime.channels.get('connectionQueuing'), |
| 198 | + connectionManager = realtime.connection.connectionManager; |
201 | 199 |
|
202 | | - let transportSendCallback; |
| 200 | + realtime.connection.once('connected', function () { |
| 201 | + Helper.whenPromiseSettles(channel.attach(), function (err) { |
| 202 | + if (err) { |
| 203 | + helper.closeAndFinish(done, realtime, err); |
| 204 | + return; |
| 205 | + } |
203 | 206 |
|
204 | | - helper.recordPrivateApi('replace.transport.send'); |
205 | | - /* Sabotage sending the message */ |
206 | | - transport.send = function (msg) { |
207 | | - if (msg.action == 15) { |
208 | | - expect(msg.msgSerial).to.equal(0, 'Expect msgSerial to be 0'); |
| 207 | + let transportSendCallback; |
209 | 208 |
|
210 | | - if (!transportSendCallback) { |
211 | | - done(new Error('transport.send override called before transportSendCallback populated')); |
212 | | - } |
| 209 | + /* Sabotage sending the message */ |
| 210 | + interceptionContext.transformClientMessage = (msg) => { |
| 211 | + if (msg.deserialized.action == 15) { |
| 212 | + expect(msg.deserialized.msgSerial).to.equal(0, 'Expect msgSerial to be 0'); |
213 | 213 |
|
214 | | - transportSendCallback(null); |
215 | | - } |
216 | | - }; |
| 214 | + if (!transportSendCallback) { |
| 215 | + done(new Error('transport.send override called before transportSendCallback populated')); |
| 216 | + } |
217 | 217 |
|
218 | | - let publishCallback; |
| 218 | + transportSendCallback(null); |
| 219 | + } |
| 220 | + }; |
219 | 221 |
|
220 | | - async.series( |
221 | | - [ |
222 | | - function (cb) { |
223 | | - transportSendCallback = cb; |
| 222 | + let publishCallback; |
224 | 223 |
|
225 | | - /* Sabotaged publish */ |
226 | | - Helper.whenPromiseSettles(channel.publish('first', null), function (err) { |
227 | | - if (!publishCallback) { |
228 | | - done(new Error('publish completed before publishCallback populated')); |
229 | | - } |
230 | | - publishCallback(err); |
231 | | - }); |
232 | | - }, |
| 224 | + async.series( |
| 225 | + [ |
| 226 | + function (cb) { |
| 227 | + transportSendCallback = cb; |
233 | 228 |
|
234 | | - // We wait for transport.send to recieve the message that we just |
235 | | - // published before we proceed to disconnecting the transport, to |
236 | | - // make sure that the message got marked as `sendAttempted`. |
| 229 | + /* Sabotaged publish */ |
| 230 | + Helper.whenPromiseSettles(channel.publish('first', null), function (err) { |
| 231 | + if (!publishCallback) { |
| 232 | + done(new Error('publish completed before publishCallback populated')); |
| 233 | + } |
| 234 | + publishCallback(err); |
| 235 | + }); |
| 236 | + }, |
237 | 237 |
|
238 | | - function (cb) { |
239 | | - async.parallel( |
240 | | - [ |
241 | | - function (cb) { |
242 | | - publishCallback = function (err) { |
243 | | - try { |
244 | | - expect(!err, 'Check publish happened (eventually) without err').to.be.ok; |
245 | | - } catch (err) { |
246 | | - cb(err); |
247 | | - return; |
248 | | - } |
249 | | - cb(); |
250 | | - }; |
251 | | - }, |
252 | | - function (cb) { |
253 | | - /* After the disconnect, on reconnect, spy on transport.send again */ |
254 | | - helper.recordPrivateApi('listen.connectionManager.transport.pending'); |
255 | | - connectionManager.once('transport.pending', function (transport) { |
256 | | - var oldSend = transport.send; |
| 238 | + // We wait for transport.send to recieve the message that we just |
| 239 | + // published before we proceed to disconnecting the transport, to |
| 240 | + // make sure that the message got marked as `sendAttempted`. |
257 | 241 |
|
258 | | - helper.recordPrivateApi('replace.transport.send'); |
259 | | - transport.send = function (msg, msgCb) { |
260 | | - if (msg.action === 15) { |
261 | | - if (msg.messages[0].name === 'first') { |
262 | | - try { |
263 | | - expect(msg.msgSerial).to.equal(0, 'Expect msgSerial of original message to still be 0'); |
264 | | - expect(msg.messages.length).to.equal( |
265 | | - 1, |
266 | | - 'Expect second message to not have been merged with the attempted message', |
267 | | - ); |
268 | | - } catch (err) { |
269 | | - cb(err); |
270 | | - return; |
271 | | - } |
272 | | - } else if (msg.messages[0].name === 'second') { |
273 | | - try { |
274 | | - expect(msg.msgSerial).to.equal(1, 'Expect msgSerial of new message to be 1'); |
275 | | - } catch (err) { |
276 | | - cb(err); |
277 | | - return; |
278 | | - } |
279 | | - cb(); |
280 | | - } |
| 242 | + function (cb) { |
| 243 | + async.parallel( |
| 244 | + [ |
| 245 | + function (cb) { |
| 246 | + publishCallback = function (err) { |
| 247 | + try { |
| 248 | + expect(!err, 'Check publish happened (eventually) without err').to.be.ok; |
| 249 | + } catch (err) { |
| 250 | + cb(err); |
| 251 | + return; |
281 | 252 | } |
282 | | - helper.recordPrivateApi('call.transport.send'); |
283 | | - oldSend.call(transport, msg, msgCb); |
| 253 | + cb(); |
284 | 254 | }; |
285 | | - channel.publish('second', null); |
286 | | - }); |
| 255 | + }, |
| 256 | + function (cb) { |
| 257 | + /* After the disconnect, on reconnect, spy on transport.send again */ |
| 258 | + helper.recordPrivateApi('listen.connectionManager.transport.pending'); |
| 259 | + connectionManager.once('transport.pending', function (transport) { |
| 260 | + // TODO does the identity of this transport matter, and can we replace the `transport.pending` check with something external too (e.g. detecting a new connection)? perhaps let's have an EventEmitter interface on the interception context that says when there's a new connection or something |
| 261 | + interceptionContext.transformClientMessage = function (msg) { |
| 262 | + if (msg.deserialized.action === 15) { |
| 263 | + if (msg.deserialized.messages[0].name === 'first') { |
| 264 | + try { |
| 265 | + expect(msg.deserialized.msgSerial).to.equal( |
| 266 | + 0, |
| 267 | + 'Expect msgSerial of original message to still be 0', |
| 268 | + ); |
| 269 | + expect(msg.deserialized.messages.length).to.equal( |
| 270 | + 1, |
| 271 | + 'Expect second message to not have been merged with the attempted message', |
| 272 | + ); |
| 273 | + } catch (err) { |
| 274 | + cb(err); |
| 275 | + return msg.deserialized; |
| 276 | + } |
| 277 | + } else if (msg.deserialized.messages[0].name === 'second') { |
| 278 | + try { |
| 279 | + expect(msg.deserialized.msgSerial).to.equal( |
| 280 | + 1, |
| 281 | + 'Expect msgSerial of new message to be 1', |
| 282 | + ); |
| 283 | + } catch (err) { |
| 284 | + cb(err); |
| 285 | + return msg.deserialized; |
| 286 | + } |
| 287 | + cb(); |
| 288 | + } |
| 289 | + } |
| 290 | + |
| 291 | + // preserve the message |
| 292 | + return msg.deserialized; |
| 293 | + }; |
| 294 | + channel.publish('second', null); |
| 295 | + }); |
287 | 296 |
|
288 | | - /* Disconnect the transport (will automatically reconnect and resume) () */ |
289 | | - helper.recordPrivateApi('call.connectionManager.disconnectAllTransports'); |
290 | | - connectionManager.disconnectAllTransports(); |
291 | | - }, |
292 | | - ], |
293 | | - cb, |
294 | | - ); |
| 297 | + /* Disconnect the transport (will automatically reconnect and resume) () */ |
| 298 | + helper.recordPrivateApi('call.connectionManager.disconnectAllTransports'); |
| 299 | + connectionManager.disconnectAllTransports(); |
| 300 | + }, |
| 301 | + ], |
| 302 | + cb, |
| 303 | + ); |
| 304 | + }, |
| 305 | + ], |
| 306 | + function (err) { |
| 307 | + helper.closeAndFinish(done, realtime, err); |
295 | 308 | }, |
296 | | - ], |
297 | | - function (err) { |
298 | | - helper.closeAndFinish(done, realtime, err); |
299 | | - }, |
300 | | - ); |
| 309 | + ); |
| 310 | + }); |
301 | 311 | }); |
302 | 312 | }); |
303 | 313 | }); |
|
0 commit comments