Skip to content

Commit 65e242b

Browse files
committed
Add FlowableTransformers.coalesce
1 parent f047804 commit 65e242b

File tree

4 files changed

+523
-1
lines changed

4 files changed

+523
-1
lines changed

README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ Maven search:
4545
- [debounceFirst()](#flowabletransformersdebouncefirst), [switchFlatMap()](#flowabletransformersswitchflatmap), [flatMapSync()](#flowabletransformersflatmapsync),
4646
- [flatMapAsync()](#flowabletransformersflatmapasync), [switchIfEmpty()](#flowabletransformersswitchifempty--switchifemptyarray),
4747
- [expand()](#flowabletransformersexpand), [mapAsync()](#flowabletransformersmapasync), [filterAsync()](#flowabletransformerfilterasync),
48-
- [refCount()](#flowabletransformersrefcount), [zipLatest()](#flowablesziplatest)
48+
- [refCount()](#flowabletransformersrefcount), [zipLatest()](#flowablesziplatest), [coalesce()](#flowabletransformerscoalesce)
4949
- [Custom parallel operators and transformers](#custom-parallel-operators-and-transformers)
5050
- [sumX()](#paralleltransformerssumx)
5151
- [Special Publisher implementations](#special-publisher-implementations)
@@ -1083,7 +1083,22 @@ scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
10831083
ts.assertResult("[2, 4]", "[4, 5]", "[6, 6]");
10841084
```
10851085
1086+
### FlowableTransformers.coalesce()
10861087
1088+
Coalesces items from upstream into a container via a consumer and emits the container if
1089+
there is a downstream demand, otherwise it keeps coalescing into the same container. Note
1090+
that the operator keeps an internal unbounded buffer to collect up upstream values before
1091+
the coalescing happens and thus a computational heavy upstream hogging the emission thread
1092+
may lead to excessive memory usage. It is recommended to use `observeOn` in this case.
1093+
1094+
```java
1095+
Flowable.range(1, 5)
1096+
.compose(FlowableTransformers.coalesce(ArrayList::new, (a, b) -> a.add(b))
1097+
.test(1)
1098+
.assertValue(Arrays.asList(1))
1099+
.requestMore(1)
1100+
.assertResult(Arrays.asList(1), Arrays.asList(2, 3, 4, 5));
1101+
```
10871102
10881103
## Custom parallel operators and transformers
10891104
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
/*
2+
* Copyright 2016-2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.rxjava2.operators;
18+
19+
import java.util.concurrent.Callable;
20+
import java.util.concurrent.atomic.*;
21+
22+
import org.reactivestreams.*;
23+
24+
import io.reactivex.*;
25+
import io.reactivex.exceptions.Exceptions;
26+
import io.reactivex.functions.BiConsumer;
27+
import io.reactivex.internal.fuseable.SimplePlainQueue;
28+
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
29+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
30+
import io.reactivex.internal.util.BackpressureHelper;
31+
32+
/**
33+
* Coalesces items into a container if the downstream is not ready to receive items.
34+
*
35+
* @param <T> the upstream element type
36+
* @param <R> the container type emitted to downstream
37+
*
38+
* @since 0.17.3
39+
*/
40+
final class FlowableCoalesce<T, R> extends Flowable<R> implements FlowableTransformer<T, R> {
41+
42+
final Publisher<T> source;
43+
44+
final Callable<R> containerSupplier;
45+
46+
final BiConsumer<R, T> coalescer;
47+
48+
final int bufferSize;
49+
50+
FlowableCoalesce(Publisher<T> source, Callable<R> containerSupplier, BiConsumer<R, T> coalescer, int bufferSize) {
51+
this.source = source;
52+
this.containerSupplier = containerSupplier;
53+
this.coalescer = coalescer;
54+
this.bufferSize = bufferSize;
55+
}
56+
57+
@Override
58+
public Publisher<R> apply(Flowable<T> upstream) {
59+
return new FlowableCoalesce<T, R>(upstream, containerSupplier, coalescer, bufferSize);
60+
}
61+
62+
@Override
63+
protected void subscribeActual(Subscriber<? super R> s) {
64+
source.subscribe(new CoalesceSubscriber<T, R>(s, containerSupplier, coalescer, bufferSize));
65+
}
66+
67+
static final class CoalesceSubscriber<T, R> extends AtomicInteger
68+
implements FlowableSubscriber<T>, Subscription {
69+
70+
private static final long serialVersionUID = -6157179110480235565L;
71+
72+
final Subscriber<? super R> actual;
73+
74+
final Callable<R> containerSupplier;
75+
76+
final BiConsumer<R, T> coalescer;
77+
78+
final AtomicLong requested;
79+
80+
final int bufferSize;
81+
82+
volatile SimplePlainQueue<T> queue;
83+
84+
Subscription upstream;
85+
86+
R container;
87+
88+
volatile boolean done;
89+
90+
volatile boolean cancelled;
91+
92+
Throwable error;
93+
94+
long emitted;
95+
96+
CoalesceSubscriber(Subscriber<? super R> actual, Callable<R> containerSupplier,
97+
BiConsumer<R, T> coalescer, int bufferSize) {
98+
this.actual = actual;
99+
this.containerSupplier = containerSupplier;
100+
this.coalescer = coalescer;
101+
this.requested = new AtomicLong();
102+
this.bufferSize = bufferSize;
103+
}
104+
105+
@Override
106+
public void onSubscribe(Subscription s) {
107+
if (SubscriptionHelper.validate(upstream, s)) {
108+
upstream = s;
109+
actual.onSubscribe(this);
110+
111+
s.request(Long.MAX_VALUE);
112+
}
113+
}
114+
115+
@Override
116+
public void onNext(T t) {
117+
if (get() == 0 && compareAndSet(0, 1)) {
118+
SimplePlainQueue<T> q = queue;
119+
if (q == null || q.isEmpty()) {
120+
R c = container;
121+
try {
122+
if (c == null) {
123+
c = containerSupplier.call();
124+
container = c;
125+
}
126+
coalescer.accept(c, t);
127+
} catch (Throwable ex) {
128+
Exceptions.throwIfFatal(ex);
129+
upstream.cancel();
130+
container = null;
131+
actual.onError(ex);
132+
return;
133+
}
134+
long r = requested.get();
135+
long e = emitted;
136+
if (e != r) {
137+
container = null;
138+
actual.onNext(c);
139+
emitted = e + 1;
140+
}
141+
if (decrementAndGet() == 0) {
142+
return;
143+
}
144+
}
145+
} else {
146+
SimplePlainQueue<T> q = queue;
147+
if (q == null) {
148+
q = new SpscLinkedArrayQueue<T>(bufferSize);
149+
queue = q;
150+
}
151+
q.offer(t);
152+
if (getAndIncrement() != 0) {
153+
return;
154+
}
155+
}
156+
drainLoop();
157+
}
158+
159+
@Override
160+
public void onError(Throwable t) {
161+
error = t;
162+
done = true;
163+
drain();
164+
}
165+
166+
@Override
167+
public void onComplete() {
168+
done = true;
169+
drain();
170+
}
171+
172+
@Override
173+
public void request(long n) {
174+
if (SubscriptionHelper.validate(n)) {
175+
BackpressureHelper.add(requested, n);
176+
drain();
177+
}
178+
}
179+
180+
@Override
181+
public void cancel() {
182+
cancelled = true;
183+
upstream.cancel();
184+
if (getAndIncrement() == 0) {
185+
container = null;
186+
}
187+
}
188+
189+
void drain() {
190+
if (getAndIncrement() != 0) {
191+
return;
192+
}
193+
194+
drainLoop();
195+
}
196+
197+
void drainLoop() {
198+
int missed = 1;
199+
long e = emitted;
200+
R c = container;
201+
Subscriber<? super R> a = actual;
202+
203+
for (;;) {
204+
if (cancelled) {
205+
container = null;
206+
return;
207+
}
208+
boolean d = done;
209+
SimplePlainQueue<T> q = queue;
210+
boolean empty = q == null || q.isEmpty();
211+
212+
if (!empty) {
213+
try {
214+
if (c == null) {
215+
c = containerSupplier.call();
216+
container = c;
217+
}
218+
219+
for (;;) {
220+
T v = q.poll();
221+
if (v == null) {
222+
break;
223+
}
224+
coalescer.accept(c, v);
225+
}
226+
} catch (Throwable ex) {
227+
Exceptions.throwIfFatal(ex);
228+
container = null;
229+
a.onError(ex);
230+
return;
231+
}
232+
}
233+
234+
if (c != null && e != requested.get()) {
235+
a.onNext(c);
236+
c = null;
237+
container = null;
238+
e++;
239+
}
240+
241+
if (d && c == null) {
242+
Throwable ex = error;
243+
container = null;
244+
if (ex != null) {
245+
a.onError(ex);
246+
} else {
247+
a.onComplete();
248+
}
249+
return;
250+
}
251+
252+
emitted = e;
253+
missed = addAndGet(-missed);
254+
if (missed == 0) {
255+
break;
256+
}
257+
}
258+
}
259+
}
260+
}

src/main/java/hu/akarnokd/rxjava2/operators/FlowableTransformers.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1123,4 +1123,36 @@ public static <T> FlowableTransformer<T, T> refCount(int subscriberCount, long t
11231123
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
11241124
return new FlowableRefCountTimeout<T>(null, subscriberCount, timeout, unit, scheduler);
11251125
}
1126+
1127+
/**
1128+
* Coalesces items from upstream into a container via a consumer and emits the container if
1129+
* there is a downstream demand, otherwise it keeps coalescing into the same container.
1130+
* @param <T> the upstream value type
1131+
* @param <R> the container and result type
1132+
* @param containerSupplier the function called and should return a fresh container to coalesce into
1133+
* @param coalescer the consumer receiving the current container and upstream item to handle
1134+
* @return the new FlowableTransformer instance
1135+
* @since 0.17.3
1136+
*/
1137+
public static <T, R> FlowableTransformer<T, R> coalesce(Callable<R> containerSupplier, BiConsumer<R, T> coalescer) {
1138+
return coalesce(containerSupplier, coalescer, Flowable.bufferSize());
1139+
}
1140+
1141+
/**
1142+
* Coalesces items from upstream into a container via a consumer and emits the container if
1143+
* there is a downstream demand, otherwise it keeps coalescing into the same container.
1144+
* @param <T> the upstream value type
1145+
* @param <R> the container and result type
1146+
* @param containerSupplier the function called and should return a fresh container to coalesce into
1147+
* @param coalescer the consumer receiving the current container and upstream item to handle
1148+
* @param bufferSize the island size of the internal unbounded buffer
1149+
* @return the new FlowableTransformer instance
1150+
* @since 0.17.3
1151+
*/
1152+
public static <T, R> FlowableTransformer<T, R> coalesce(Callable<R> containerSupplier, BiConsumer<R, T> coalescer, int bufferSize) {
1153+
ObjectHelper.requireNonNull(containerSupplier, "containerSupplier is null");
1154+
ObjectHelper.requireNonNull(coalescer, "coalescer is null");
1155+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
1156+
return new FlowableCoalesce<T, R>(null, containerSupplier, coalescer, bufferSize);
1157+
}
11261158
}

0 commit comments

Comments
 (0)