Skip to content

Commit f047804

Browse files
committed
Add Flowables.zipLatest() operator.
1 parent e4b2c5b commit f047804

File tree

8 files changed

+1421
-7
lines changed

8 files changed

+1421
-7
lines changed

README.md

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ RxJava 2.x implementation of extra sources, operators and components and ports o
1313

1414
```
1515
dependencies {
16-
compile "com.github.akarnokd:rxjava2-extensions:0.17.2"
16+
compile "com.github.akarnokd:rxjava2-extensions:0.17.3"
1717
}
1818
```
1919

@@ -45,7 +45,7 @@ Maven search:
4545
- [debounceFirst()](#flowabletransformersdebouncefirst), [switchFlatMap()](#flowabletransformersswitchflatmap), [flatMapSync()](#flowabletransformersflatmapsync),
4646
- [flatMapAsync()](#flowabletransformersflatmapasync), [switchIfEmpty()](#flowabletransformersswitchifempty--switchifemptyarray),
4747
- [expand()](#flowabletransformersexpand), [mapAsync()](#flowabletransformersmapasync), [filterAsync()](#flowabletransformerfilterasync),
48-
- [refCount()](#flowabletransformersrefcount)
48+
- [refCount()](#flowabletransformersrefcount), [zipLatest()](#flowablesziplatest)
4949
- [Custom parallel operators and transformers](#custom-parallel-operators-and-transformers)
5050
- [sumX()](#paralleltransformerssumx)
5151
- [Special Publisher implementations](#special-publisher-implementations)
@@ -1044,6 +1044,46 @@ Thread.sleep(1200);
10441044
assertFalse(pp.hasSubscribers());
10451045
```
10461046
1047+
### Flowables.zipLatest()
1048+
1049+
Zips the latest values from multiple sources and calls a combiner function for them.
1050+
If one of the sources is faster then the others, its unconsumed values will be overwritten by newer
1051+
values.
1052+
Unlike `combineLatest`, source items are participating in the combination at most once; i.e., the
1053+
operator emits only if all sources have produced an item.
1054+
The emission speed of this operator is determined by the slowest emitting source and the speed of the downstream consumer.
1055+
There are several overloads available: methods taking 2-4 sources and the respective combiner functions, a method taking a
1056+
varargs of sources and a method taking an `Iterable` of source `Publisher`s.
1057+
The operator supports combining and scheduling the emission of the result via a custom `Scheduler`, thus
1058+
allows avoiding the buffering effects of the `observeOn` operator.
1059+
The operator terminates if any of the sources runs out of items and terminates by itself.
1060+
The operator works with asynchronous sources the best; synchronous sources may get consumed fully
1061+
in order they appear among the parameters and possibly never emit more than one combined result even
1062+
if the last source has more than one item.
1063+
1064+
```java
1065+
TestScheduler scheduler = new TestScheduler();
1066+
1067+
TestSubscriber<String> ts = Flowables.zipLatest(toString,
1068+
Flowable.intervalRange(1, 6, 99, 100, TimeUnit.MILLISECONDS, scheduler),
1069+
Flowable.intervalRange(4, 3, 200, 200, TimeUnit.MILLISECONDS, scheduler)
1070+
)
1071+
.test();
1072+
1073+
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
1074+
1075+
ts.assertValue("[2, 4]");
1076+
1077+
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
1078+
1079+
ts.assertValues("[2, 4]", "[4, 5]");
1080+
1081+
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
1082+
1083+
ts.assertResult("[2, 4]", "[4, 5]", "[6, 6]");
1084+
```
1085+
1086+
10471087
10481088
## Custom parallel operators and transformers
10491089

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.17.2
1+
version=0.17.3

src/main/java/hu/akarnokd/rxjava2/basetypes/Nono.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,7 @@ public final Nono retry(long times) {
10621062
}
10631063

10641064
/**
1065-
* Retry a failed Nono if the predicate return true
1065+
* Retry a failed Nono if the predicate return true.
10661066
* @param predicate the predicate receiving the failure Throwable and
10671067
* returns true to trigger a retry.
10681068
* @return the new Nono instance

src/main/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsync.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ static final class FilterAsyncSubscriber<T>
100100
volatile boolean cancelled;
101101

102102
Boolean innerResult;
103-
103+
104104
long emitted;
105105

106106
volatile int state;

0 commit comments

Comments
 (0)