Skip to content

Commit 0020c2b

Browse files
committed
Upgrade to RxJava 2.0.4, +bufferSplit, minor fixes
1 parent 0694236 commit 0020c2b

File tree

6 files changed

+176
-19
lines changed

6 files changed

+176
-19
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ dependencies {
4949
signature 'org.codehaus.mojo.signature:java16:1.1@signature'
5050

5151
compile "org.reactivestreams:reactive-streams:1.0.0"
52-
compile "io.reactivex.rxjava2:rxjava:2.0.3"
52+
compile "io.reactivex.rxjava2:rxjava:2.0.4"
5353

5454
testCompile group: 'junit', name: 'junit', version: '4.12'
5555

56-
testCompile 'org.mockito:mockito-core:2.1.0'
56+
testCompile 'org.mockito:mockito-core:2.5.0'
5757
}
5858

5959
task sourcesJar(type: Jar, dependsOn: classes) {

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.14.2
1+
version=0.14.3

src/main/java/hu/akarnokd/rxjava2/operators/FlowableBufferPredicate.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,28 @@
4040
*/
4141
final class FlowableBufferPredicate<T, C extends Collection<? super T>> extends Flowable<C> implements FlowableTransformer<T, C> {
4242

43+
enum Mode {
44+
/** The item triggering the new buffer will be part of the new buffer. */
45+
BEFORE,
46+
/** The item triggering the new buffer will be part of the old buffer. */
47+
AFTER,
48+
/** The item won't be part of any buffers. */
49+
SPLIT
50+
}
51+
4352
final Publisher<T> source;
4453

4554
final Predicate<? super T> predicate;
4655

47-
final boolean cutAfter;
56+
final Mode mode;
4857

4958
final Callable<C> bufferSupplier;
5059

51-
FlowableBufferPredicate(Publisher<T> source, Predicate<? super T> predicate, boolean cutAfter,
60+
FlowableBufferPredicate(Publisher<T> source, Predicate<? super T> predicate, Mode mode,
5261
Callable<C> bufferSupplier) {
5362
this.source = source;
5463
this.predicate = predicate;
55-
this.cutAfter = cutAfter;
64+
this.mode = mode;
5665
this.bufferSupplier = bufferSupplier;
5766
}
5867

@@ -68,12 +77,12 @@ protected void subscribeActual(Subscriber<? super C> s) {
6877
return;
6978
}
7079

71-
source.subscribe(new BufferPredicateSubscriber<T, C>(s, buffer, predicate, cutAfter, bufferSupplier));
80+
source.subscribe(new BufferPredicateSubscriber<T, C>(s, buffer, predicate, mode, bufferSupplier));
7281
}
7382

7483
@Override
7584
public Publisher<C> apply(Flowable<T> upstream) {
76-
return new FlowableBufferPredicate<T, C>(upstream, predicate, cutAfter, bufferSupplier);
85+
return new FlowableBufferPredicate<T, C>(upstream, predicate, mode, bufferSupplier);
7786
}
7887

7988
static final class BufferPredicateSubscriber<T, C extends Collection<? super T>>
@@ -83,7 +92,7 @@ static final class BufferPredicateSubscriber<T, C extends Collection<? super T>>
8392

8493
final Predicate<? super T> predicate;
8594

86-
final boolean cutAfter;
95+
final Mode mode;
8796

8897
final Callable<C> bufferSupplier;
8998

@@ -95,11 +104,11 @@ static final class BufferPredicateSubscriber<T, C extends Collection<? super T>>
95104

96105
BufferPredicateSubscriber(Subscriber<? super C> actual,
97106
C buffer,
98-
Predicate<? super T> predicate, boolean cutAfter,
107+
Predicate<? super T> predicate, Mode mode,
99108
Callable<C> bufferSupplier) {
100109
this.actual = actual;
101110
this.predicate = predicate;
102-
this.cutAfter = cutAfter;
111+
this.mode = mode;
103112
this.buffer = buffer;
104113
this.bufferSupplier = bufferSupplier;
105114
}
@@ -136,7 +145,8 @@ public boolean tryOnNext(T t) {
136145
return true;
137146
}
138147

139-
if (cutAfter) {
148+
switch (mode) {
149+
case AFTER: {
140150
buf.add(t);
141151
if (b) {
142152
actual.onNext(buf);
@@ -155,7 +165,9 @@ public boolean tryOnNext(T t) {
155165
count++;
156166
return false;
157167
}
158-
} else {
168+
break;
169+
}
170+
case BEFORE: {
159171
if (b) {
160172
buf.add(t);
161173
count++;
@@ -175,6 +187,27 @@ public boolean tryOnNext(T t) {
175187
buffer = buf;
176188
count = 1;
177189
}
190+
break;
191+
}
192+
default:
193+
if (b) {
194+
actual.onNext(buf);
195+
196+
try {
197+
buffer = bufferSupplier.call();
198+
} catch (Throwable ex) {
199+
Exceptions.throwIfFatal(ex);
200+
s.cancel();
201+
onError(ex);
202+
return true;
203+
}
204+
205+
count = 0;
206+
} else {
207+
buf.add(t);
208+
count++;
209+
return false;
210+
}
178211
}
179212
}
180213
return true;

src/main/java/hu/akarnokd/rxjava2/operators/FlowableTransformers.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,14 @@ public static <T> FlowableTransformer<T, List<T>> bufferWhile(Predicate<? super
148148
@SchedulerSupport(SchedulerSupport.NONE)
149149
@BackpressureSupport(BackpressureKind.FULL)
150150
public static <T, C extends Collection<? super T>> FlowableTransformer<T, C> bufferWhile(Predicate<? super T> predicate, Callable<C> bufferSupplier) {
151-
return new FlowableBufferPredicate<T, C>(null, predicate, false, bufferSupplier);
151+
return new FlowableBufferPredicate<T, C>(null, predicate, FlowableBufferPredicate.Mode.BEFORE, bufferSupplier);
152152
}
153153

154154
/**
155155
* Buffers elements into a List until the given predicate returns true at which
156156
* point a new empty buffer is started.
157157
* @param <T> the source value type
158-
* @param predicate the predicate receiving the current itam and if returns true,
158+
* @param predicate the predicate receiving the current item and if returns true,
159159
* the current buffer is emitted and a fresh empty buffer is created
160160
* @return the new FlowableTransformer instance
161161
*
@@ -173,7 +173,7 @@ public static <T> FlowableTransformer<T, List<T>> bufferUntil(Predicate<? super
173173
* point a new empty custom collection is started.
174174
* @param <T> the source value type
175175
* @param <C> the collection type
176-
* @param predicate the predicate receiving the current itam and if returns true,
176+
* @param predicate the predicate receiving the current item and if returns true,
177177
* the current collection is emitted and a fresh empty collection is created
178178
* @param bufferSupplier the callable that returns a fresh collection
179179
* @return the new Flowable instance
@@ -183,7 +183,42 @@ public static <T> FlowableTransformer<T, List<T>> bufferUntil(Predicate<? super
183183
@SchedulerSupport(SchedulerSupport.NONE)
184184
@BackpressureSupport(BackpressureKind.FULL)
185185
public static <T, C extends Collection<? super T>> FlowableTransformer<T, C> bufferUntil(Predicate<? super T> predicate, Callable<C> bufferSupplier) {
186-
return new FlowableBufferPredicate<T, C>(null, predicate, true, bufferSupplier);
186+
return new FlowableBufferPredicate<T, C>(null, predicate, FlowableBufferPredicate.Mode.AFTER, bufferSupplier);
187+
}
188+
189+
/**
190+
* Buffers elements into a List until the given predicate returns true at which
191+
* point a new empty buffer is started; the particular item will be dropped.
192+
* @param <T> the source value type
193+
* @param predicate the predicate receiving the current item and if returns true,
194+
* the current buffer is emitted and a fresh empty buffer is created
195+
* @return the new FlowableTransformer instance
196+
*
197+
* @since 0.14.3
198+
*/
199+
@SchedulerSupport(SchedulerSupport.NONE)
200+
@BackpressureSupport(BackpressureKind.FULL)
201+
public static <T> FlowableTransformer<T, List<T>> bufferSplit(Predicate<? super T> predicate) {
202+
return bufferSplit(predicate, Functions.<T>createArrayList(16));
203+
}
204+
205+
206+
/**
207+
* Buffers elements into a custom collection until the given predicate returns true at which
208+
* point a new empty custom collection is started; the particular item will be dropped.
209+
* @param <T> the source value type
210+
* @param <C> the collection type
211+
* @param predicate the predicate receiving the current item and if returns true,
212+
* the current collection is emitted and a fresh empty collection is created
213+
* @param bufferSupplier the callable that returns a fresh collection
214+
* @return the new Flowable instance
215+
*
216+
* @since 0.14.3
217+
*/
218+
@SchedulerSupport(SchedulerSupport.NONE)
219+
@BackpressureSupport(BackpressureKind.FULL)
220+
public static <T, C extends Collection<? super T>> FlowableTransformer<T, C> bufferSplit(Predicate<? super T> predicate, Callable<C> bufferSupplier) {
221+
return new FlowableBufferPredicate<T, C>(null, predicate, FlowableBufferPredicate.Mode.SPLIT, bufferSupplier);
187222
}
188223

189224
/**

src/main/java/hu/akarnokd/rxjava2/operators/Flowables.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(Iterabl
271271
@SchedulerSupport(SchedulerSupport.NONE)
272272
public static <T> Flowable<T> repeat(T item) {
273273
ObjectHelper.requireNonNull(item, "item is null");
274-
return new FlowableRepeatScalar<T>(item);
274+
return RxJavaPlugins.onAssembly(new FlowableRepeatScalar<T>(item));
275275
}
276276

277277
/**
@@ -286,6 +286,6 @@ public static <T> Flowable<T> repeat(T item) {
286286
@SchedulerSupport(SchedulerSupport.NONE)
287287
public static <T> Flowable<T> repeatCallable(Callable<T> callable) {
288288
ObjectHelper.requireNonNull(callable, "callable is null");
289-
return new FlowableRepeatCallable<T>(callable);
289+
return RxJavaPlugins.onAssembly(new FlowableRepeatCallable<T>(callable));
290290
}
291291
}

src/test/java/hu/akarnokd/rxjava2/operators/FlowableBufferPredicateTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,29 @@ public List<Integer> call() throws Exception {
332332
.assertFailure(IllegalArgumentException.class, Arrays.asList(1, 2));
333333
}
334334

335+
@SuppressWarnings("unchecked")
336+
@Test
337+
public void bufferSupplierCrash3() {
338+
Flowable.just(1, 2, -1, 3, 4, 5, -1, -1, 6)
339+
.compose(FlowableTransformers.bufferSplit(new Predicate<Integer>() {
340+
@Override
341+
public boolean test(Integer v) throws Exception {
342+
return v == -1;
343+
}
344+
}, new Callable<List<Integer>>() {
345+
int c;
346+
@Override
347+
public List<Integer> call() throws Exception {
348+
if (c++ == 1) {
349+
throw new IllegalArgumentException();
350+
}
351+
return new ArrayList<Integer>();
352+
}
353+
}))
354+
.test()
355+
.assertFailure(IllegalArgumentException.class, Arrays.asList(1, 2));
356+
}
357+
335358
@SuppressWarnings("unchecked")
336359
@Test
337360
public void doubleError() {
@@ -359,4 +382,70 @@ public boolean test(Integer v) throws Exception {
359382
RxJavaPlugins.reset();
360383
}
361384
}
385+
386+
@SuppressWarnings("unchecked")
387+
@Test
388+
public void splitNormal() {
389+
Flowable.just(1, 2, -1, 3, 4, 5, -1, -1, 6)
390+
.compose(FlowableTransformers.bufferSplit(new Predicate<Integer>() {
391+
@Override
392+
public boolean test(Integer v) throws Exception {
393+
return v == -1;
394+
}
395+
}))
396+
.test()
397+
.assertResult(
398+
Arrays.asList(1, 2),
399+
Arrays.asList(3, 4, 5),
400+
Arrays.<Integer>asList(),
401+
Arrays.asList(6)
402+
);
403+
}
404+
405+
@SuppressWarnings("unchecked")
406+
@Test
407+
public void splitNormalHidden() {
408+
Flowable.just(1, 2, -1, 3, 4, 5, -1, -1, 6).hide()
409+
.compose(FlowableTransformers.bufferSplit(new Predicate<Integer>() {
410+
@Override
411+
public boolean test(Integer v) throws Exception {
412+
return v == -1;
413+
}
414+
}))
415+
.test()
416+
.assertResult(
417+
Arrays.asList(1, 2),
418+
Arrays.asList(3, 4, 5),
419+
Arrays.<Integer>asList(),
420+
Arrays.asList(6)
421+
);
422+
}
423+
424+
@SuppressWarnings("unchecked")
425+
@Test
426+
public void splitNormalBackpressured() {
427+
Flowable.just(1, 2, -1, 3, 4, 5, -1, -1, 6)
428+
.compose(FlowableTransformers.bufferSplit(new Predicate<Integer>() {
429+
@Override
430+
public boolean test(Integer v) throws Exception {
431+
return v == -1;
432+
}
433+
}))
434+
.test(0)
435+
.assertNoValues()
436+
.requestMore(1)
437+
.assertValue(Arrays.asList(1, 2))
438+
.requestMore(2)
439+
.assertValues(Arrays.asList(1, 2),
440+
Arrays.asList(3, 4, 5),
441+
Arrays.<Integer>asList())
442+
.requestMore(1)
443+
.assertResult(
444+
Arrays.asList(1, 2),
445+
Arrays.asList(3, 4, 5),
446+
Arrays.<Integer>asList(),
447+
Arrays.asList(6)
448+
);
449+
}
450+
362451
}

0 commit comments

Comments
 (0)