@@ -105,6 +105,227 @@ describe( 'Crossed Wires', () => {
105105 notes . map ( note => note . data . content ) ,
106106 )
107107 } )
108+
109+ it ( 'ignores ccid after receiving a 409 for it' , async ( ) => {
110+ /**
111+ * Scenario:
112+ *
113+ * Client 1 sends a change (ccid x) "AC" => "ACD" : "=2\t+D"
114+ * Client 2 sends a change (ccid y) "AC" => "ABC" : "=1\t+B\t=1"
115+ *
116+ * Server accepts ccid x as is and broadcasts back to clients
117+ *
118+ * c:{ccids:[y],v:"=1\t+B\t=1"}
119+ *
120+ * Server accepts ccid y, server sees that the change needs to be modified because of x:
121+ *
122+ * c:{ccids:[x],v:"=3\t+D"}
123+ *
124+ * Client 1 and Client 2 should now have Ghosts that match.
125+ */
126+
127+ // Two clients that need indexes downloaded
128+ const bucketX = createBucket ( ) ;
129+ bucketX . id = 'x' ;
130+ const bucketY = createBucket ( ) ;
131+ bucketY . id = 'y' ;
132+ const clients = [ bucketX , bucketY ] ;
133+
134+ const responses = await Promise . all ( [
135+ waitForClient ( bucketX , ( ) => bucketX . channel . handleMessage ( 'auth:user' ) ) ,
136+ waitForClient ( bucketY , ( ) => bucketY . channel . handleMessage ( 'auth:user' ) ) ,
137+ ] ) ;
138+
139+ deepEqual (
140+ Array ( 2 ) . fill ( 'i:1:::10' ) ,
141+ responses
142+ ) ;
143+
144+ const cvs = await Promise . all ( clients . map ( client => {
145+ const indexed = new Promise ( resolve => {
146+ client . once ( 'index' , resolve ) ;
147+ } ) ;
148+ client . channel . handleMessage ( 'i:' + JSON . stringify ( {
149+ index : [ {
150+ id : 'note-id' ,
151+ v : 1 ,
152+ d : { content : 'AC' }
153+ } ] ,
154+ current : 'cv-1' ,
155+ } ) ) ;
156+ return indexed ;
157+ } ) ) ;
158+
159+ deepEqual ( Array ( 2 ) . fill ( 'cv-1' ) , cvs ) ;
160+
161+ deepEqual (
162+ Array ( 2 ) . fill ( { data : { content : 'AC' } , id : 'note-id' } ) ,
163+ await Promise . all ( clients . map ( client => client . get ( 'note-id' ) ) ) ,
164+ ) ;
165+
166+ const [ changeY , changeX ] = ( await Promise . all ( [
167+ waitForClient ( bucketY , ( ) => bucketY . update ( 'note-id' , { content : 'ABC' } ) ) ,
168+ waitForClient ( bucketX , ( ) => bucketX . update ( 'note-id' , { content : 'ACD' } ) ) ,
169+ ] ) ) . map ( msg => JSON . parse ( parseMessage ( msg ) . data ) ) ;
170+
171+ equal ( '=1\t+B\t=1' , changeY . v . content . v ) ;
172+ equal ( '=2\t+D' , changeX . v . content . v ) ;
173+
174+ /**
175+ * At this point, both clients have sent a change and are waiting for the
176+ * server to respond. Their `localQueue`s should have a `.sent['note-id']`.
177+ *
178+ * If a client were to update `note-id` at this moment, since it is waiting
179+ * for the sent change to be acknowledged by the server it will indicate
180+ * that with a `localQueue.queues['note-id']`.
181+ */
182+ const [ serverChange1 ] = [
183+ [ { cv : 'cv-2' , ccids : [ changeY . ccid ] , sv : 1 , ev : 2 , id : 'note-id' , o : 'M' , v : { content : {
184+ o : 'd' , v : '=1\t+B\t=1'
185+ } } } ] ,
186+ // This ccid/change is modified by the server, see: '=3\t+D' vs '=2\t+D'
187+ [ { cv : 'cv-3' , ccids : [ changeX . ccid ] , sv : 1 , ev : 2 , id : 'note-id' , o : 'M' , v : { content : {
188+ o : 'd' , v : '=3\t+D'
189+ } } } ] ,
190+ ] ;
191+
192+ const notes = await Promise . all ( [
193+ new Promise ( ( resolve , reject ) => {
194+ bucketY . channel . on ( 'acknowledge' , ( ) => {
195+ setTimeout ( ( ) => resolve ( bucketY . get ( 'note-id' ) ) , 10 ) ;
196+ } ) ;
197+
198+ bucketY . channel . on ( 'send' , ( data ) => {
199+ reject ( new Error ( 'should not send more things' ) ) ;
200+ } ) ;
201+
202+ bucketY . channel . handleMessage ( 'c:' + JSON . stringify ( [ {
203+ id : 'note-id' ,
204+ error : 409 ,
205+ ccids : serverChange1 [ 0 ] . ccids ,
206+ } ] ) ) ;
207+
208+ bucketY . channel . handleMessage ( 'c:' + JSON . stringify ( serverChange1 ) ) ;
209+ } ) ,
210+ new Promise ( resolve => {
211+ bucketX . once ( 'update' , ( ) => resolve ( bucketX . get ( 'note-id' ) ) ) ;
212+ bucketX . channel . handleMessage ( 'c:' + JSON . stringify ( serverChange1 ) ) ;
213+ } )
214+ ] ) ;
215+
216+ deepEqual (
217+ [ 'ABC' , 'ABCD' ] ,
218+ notes . map ( note => note . data . content ) ,
219+ )
220+ } )
221+
222+ it ( 'ignores inbound changes after they have already been applied' , async ( ) => {
223+ /**
224+ * Scenario:
225+ *
226+ * Client 1 sends a change (ccid x) "AC" => "ACD" : "=2\t+D"
227+ * Client 2 sends a change (ccid y) "AC" => "ABC" : "=1\t+B\t=1"
228+ *
229+ * Server accepts ccid x as is and broadcasts back to clients
230+ *
231+ * c:{ccids:[y],v:"=1\t+B\t=1"}
232+ *
233+ * Server accepts ccid y, server sees that the change needs to be modified because of x:
234+ *
235+ * c:{ccids:[x],v:"=3\t+D"}
236+ *
237+ * Client 1 and Client 2 should now have Ghosts that match.
238+ */
239+
240+ // Two clients that need indexes downloaded
241+ const bucketX = createBucket ( ) ;
242+ bucketX . id = 'x' ;
243+ const bucketY = createBucket ( ) ;
244+ bucketY . id = 'y' ;
245+ const clients = [ bucketX , bucketY ] ;
246+
247+ const responses = await Promise . all ( [
248+ waitForClient ( bucketX , ( ) => bucketX . channel . handleMessage ( 'auth:user' ) ) ,
249+ waitForClient ( bucketY , ( ) => bucketY . channel . handleMessage ( 'auth:user' ) ) ,
250+ ] ) ;
251+
252+ deepEqual (
253+ Array ( 2 ) . fill ( 'i:1:::10' ) ,
254+ responses
255+ ) ;
256+
257+ const cvs = await Promise . all ( clients . map ( client => {
258+ const indexed = new Promise ( resolve => {
259+ client . once ( 'index' , resolve ) ;
260+ } ) ;
261+ client . channel . handleMessage ( 'i:' + JSON . stringify ( {
262+ index : [ {
263+ id : 'note-id' ,
264+ v : 1 ,
265+ d : { content : 'AC' }
266+ } ] ,
267+ current : 'cv-1' ,
268+ } ) ) ;
269+ return indexed ;
270+ } ) ) ;
271+
272+ deepEqual ( Array ( 2 ) . fill ( 'cv-1' ) , cvs ) ;
273+
274+ deepEqual (
275+ Array ( 2 ) . fill ( { data : { content : 'AC' } , id : 'note-id' } ) ,
276+ await Promise . all ( clients . map ( client => client . get ( 'note-id' ) ) ) ,
277+ ) ;
278+
279+ const [ changeY , changeX ] = ( await Promise . all ( [
280+ waitForClient ( bucketY , ( ) => bucketY . update ( 'note-id' , { content : 'ABC' } ) ) ,
281+ waitForClient ( bucketX , ( ) => bucketX . update ( 'note-id' , { content : 'ACD' } ) ) ,
282+ ] ) ) . map ( msg => JSON . parse ( parseMessage ( msg ) . data ) ) ;
283+
284+ equal ( '=1\t+B\t=1' , changeY . v . content . v ) ;
285+ equal ( '=2\t+D' , changeX . v . content . v ) ;
286+
287+ /**
288+ * At this point, both clients have sent a change and are waiting for the
289+ * server to respond. Their `localQueue`s should have a `.sent['note-id']`.
290+ *
291+ * If a client were to update `note-id` at this moment, since it is waiting
292+ * for the sent change to be acknowledged by the server it will indicate
293+ * that with a `localQueue.queues['note-id']`.
294+ */
295+ const [ serverChange1 ] = [
296+ [ { cv : 'cv-2' , ccids : [ changeY . ccid ] , sv : 1 , ev : 2 , id : 'note-id' , o : 'M' , v : { content : {
297+ o : 'd' , v : '=1\t+B\t=1'
298+ } } } ] ,
299+ // This ccid/change is modified by the server, see: '=3\t+D' vs '=2\t+D'
300+ [ { cv : 'cv-3' , ccids : [ changeX . ccid ] , sv : 1 , ev : 2 , id : 'note-id' , o : 'M' , v : { content : {
301+ o : 'd' , v : '=3\t+D'
302+ } } } ] ,
303+ ] ;
304+
305+ const notes = await Promise . all ( [
306+ new Promise ( ( resolve , reject ) => {
307+ bucketY . channel . on ( 'acknowledge' , ( ) => {
308+ setTimeout ( ( ) => resolve ( bucketY . get ( 'note-id' ) ) , 10 ) ;
309+ } ) ;
310+
311+ bucketY . channel . on ( 'send' , ( data ) => {
312+ reject ( new Error ( 'should not send more things' ) ) ;
313+ } ) ;
314+
315+ bucketY . channel . handleMessage ( 'c:' + JSON . stringify ( serverChange1 ) ) ;
316+ bucketY . channel . handleMessage ( 'c:' + JSON . stringify ( serverChange1 ) ) ;
317+ } ) ,
318+ new Promise ( resolve => {
319+ bucketX . once ( 'update' , ( ) => resolve ( bucketX . get ( 'note-id' ) ) ) ;
320+ bucketX . channel . handleMessage ( 'c:' + JSON . stringify ( serverChange1 ) ) ;
321+ } )
322+ ] ) ;
323+
324+ deepEqual (
325+ [ 'ABC' , 'ABCD' ] ,
326+ notes . map ( note => note . data . content ) ,
327+ )
328+ } )
108329} ) ;
109330
110331function waitForClient ( client , action ) {
0 commit comments