Skip to content

Commit a33bb93

Browse files
committed
Fix MulticastProcessor lockstep request accounting
1 parent 3591489 commit a33bb93

File tree

2 files changed

+50
-15
lines changed

2 files changed

+50
-15
lines changed

src/main/java/hu/akarnokd/rxjava2/processors/MulticastProcessor.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ public final class MulticastProcessor<T> extends FlowableProcessor<T> {
5757

5858
int consumed;
5959

60-
long emitted;
61-
6260
int fusionMode;
6361

6462
@SuppressWarnings("rawtypes")
@@ -360,27 +358,30 @@ void drain() {
360358
AtomicReference<MulticastSubscription<T>[]> subs = subscribers;
361359
int c = consumed;
362360
int lim = limit;
363-
long e = emitted;
364361
SimpleQueue<T> q = queue;
365362
int fm = fusionMode;
366363

367364
outer:
368365
for (;;) {
369366

370367
MulticastSubscription<T>[] as = subs.get();
368+
int n = as.length;
371369

372-
long r = Long.MAX_VALUE;
370+
if (n != 0) {
371+
long r = -1L;
373372

374-
for (MulticastSubscription<T> a : as) {
375-
long ra = a.get();
376-
if (ra >= 0L) {
377-
r = Math.min(r, ra);
373+
for (MulticastSubscription<T> a : as) {
374+
long ra = a.get();
375+
if (ra >= 0L) {
376+
if (r == -1L) {
377+
r = ra - a.emitted;
378+
} else {
379+
r = Math.min(r, ra - a.emitted);
380+
}
381+
}
378382
}
379-
}
380-
381-
if (as.length != 0) {
382383

383-
while (e != r) {
384+
while (r > 0L) {
384385
MulticastSubscription<T>[] bs = subs.get();
385386

386387
if (bs == TERMINATED) {
@@ -430,7 +431,7 @@ void drain() {
430431
inner.onNext(v);
431432
}
432433

433-
e++;
434+
r--;
434435

435436
if (fm != QueueSubscription.SYNC) {
436437
if (++c == lim) {
@@ -440,7 +441,7 @@ void drain() {
440441
}
441442
}
442443

443-
if (e == r) {
444+
if (r == 0) {
444445
MulticastSubscription<T>[] bs = subs.get();
445446

446447
if (bs == TERMINATED) {
@@ -471,7 +472,6 @@ void drain() {
471472
int w = wip.get();
472473
if (w == missed) {
473474
consumed = c;
474-
emitted = e;
475475
missed = wip.addAndGet(-missed);
476476
if (missed == 0) {
477477
break;
@@ -490,6 +490,8 @@ static final class MulticastSubscription<T> extends AtomicLong implements Subscr
490490

491491
final MulticastProcessor<T> parent;
492492

493+
long emitted;
494+
493495
MulticastSubscription(Subscriber<? super T> actual, MulticastProcessor<T> parent) {
494496
this.actual = actual;
495497
this.parent = parent;
@@ -524,6 +526,7 @@ public void cancel() {
524526

525527
void onNext(T t) {
526528
if (get() != Long.MIN_VALUE) {
529+
emitted++;
527530
actual.onNext(t);
528531
}
529532
}

src/test/java/hu/akarnokd/rxjava2/processors/MulticastProcessorTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,4 +495,36 @@ public Integer apply(Integer v) throws Exception {
495495

496496
mp.test().assertFailure(IOException.class);
497497
}
498+
499+
@Test
500+
public void lockstep() {
501+
MulticastProcessor<Integer> mp = MulticastProcessor.create();
502+
503+
TestSubscriber<Integer> ts1 = mp.test();
504+
mp.start();
505+
506+
mp.onNext(1);
507+
mp.onNext(2);
508+
509+
ts1.assertValues(1, 2);
510+
511+
TestSubscriber<Integer> ts2 = mp.test(0);
512+
513+
ts2.assertEmpty();
514+
515+
mp.onNext(3);
516+
517+
ts1.assertValues(1, 2);
518+
ts2.assertEmpty();
519+
520+
mp.onComplete();
521+
522+
ts1.assertValues(1, 2);
523+
ts2.assertEmpty();
524+
525+
ts2.request(1);
526+
527+
ts1.assertResult(1, 2, 3);
528+
ts2.assertResult(3);
529+
}
498530
}

0 commit comments

Comments
 (0)