Skip to content

Commit 452b2c7

Browse files
committed
+ groupBy operator
1 parent 94de390 commit 452b2c7

File tree

4 files changed

+531
-0
lines changed

4 files changed

+531
-0
lines changed

src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,14 @@ default <S> AsyncEnumerable<T> retryWhen(Supplier<S> stateSupplier, BiFunction<?
298298
return new AsyncRetryWhen<T, S>(this, stateSupplier, completer);
299299
}
300300

301+
default <K> AsyncEnumerable<GroupedAsyncEnumerable<T, K>> groupBy(Function<? super T, ? extends K> keySelector) {
302+
return groupBy(keySelector, v -> v);
303+
}
304+
305+
default <K, V> AsyncEnumerable<GroupedAsyncEnumerable<V, K>> groupBy(Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector) {
306+
return new AsyncGroupBy<>(this, keySelector, valueSelector);
307+
}
308+
301309
// -------------------------------------------------------------------------------------
302310
// Instance consumers
303311

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.*;
21+
import java.util.function.*;
22+
23+
final class AsyncGroupBy<T, K, V> implements AsyncEnumerable<GroupedAsyncEnumerable<V, K>> {
24+
25+
final AsyncEnumerable<T> source;
26+
27+
final Function<? super T, ? extends K> keySelector;
28+
29+
final Function<? super T, ? extends V> valueSelector;
30+
31+
AsyncGroupBy(AsyncEnumerable<T> source, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector) {
32+
this.source = source;
33+
this.keySelector = keySelector;
34+
this.valueSelector = valueSelector;
35+
}
36+
37+
@Override
38+
public AsyncEnumerator<GroupedAsyncEnumerable<V, K>> enumerator() {
39+
return new GroupByEnumerator<>(source.enumerator(), keySelector, valueSelector);
40+
}
41+
42+
static final class GroupByEnumerator<T, K, V>
43+
implements AsyncEnumerator<GroupedAsyncEnumerable<V, K>>,
44+
BiConsumer<Boolean, Throwable> {
45+
46+
final AsyncEnumerator<T> source;
47+
48+
final Function<? super T, ? extends K> keySelector;
49+
50+
final Function<? super T, ? extends V> valueSelector;
51+
52+
final AtomicInteger sourceWip;
53+
54+
final AtomicInteger wip;
55+
56+
final AtomicInteger dispatchWip;
57+
58+
final ConcurrentMap<K, GroupedEnumerator<T, K, V>> groups;
59+
60+
final AtomicBoolean cancelled;
61+
62+
final AtomicInteger active;
63+
64+
volatile CompletableFuture<Boolean> completable;
65+
66+
volatile GroupedAsyncEnumerable<V, K> current;
67+
volatile boolean done;
68+
volatile Throwable error;
69+
70+
GroupByEnumerator(AsyncEnumerator<T> source, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector) {
71+
this.source = source;
72+
this.keySelector = keySelector;
73+
this.valueSelector = valueSelector;
74+
this.sourceWip = new AtomicInteger();
75+
this.wip = new AtomicInteger();
76+
this.dispatchWip = new AtomicInteger();
77+
this.groups = new ConcurrentHashMap<>();
78+
this.cancelled = new AtomicBoolean();
79+
this.active = new AtomicInteger(1);
80+
}
81+
82+
@Override
83+
public CompletionStage<Boolean> moveNext() {
84+
current = null;
85+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
86+
completable = cf;
87+
consumersReady();
88+
drain();
89+
return cf;
90+
}
91+
92+
@Override
93+
public GroupedAsyncEnumerable<V, K> current() {
94+
return current;
95+
}
96+
97+
@Override
98+
public void cancel() {
99+
if (cancelled.compareAndSet(false, true)) {
100+
if (active.decrementAndGet() == 0) {
101+
source.cancel();
102+
} else {
103+
consumersReady();
104+
}
105+
}
106+
}
107+
108+
void remove(K key) {
109+
groups.remove(key);
110+
if (active.decrementAndGet() == 0) {
111+
source.cancel();
112+
} else {
113+
consumersReady();
114+
}
115+
}
116+
117+
@Override
118+
public void accept(Boolean aBoolean, Throwable throwable) {
119+
if (throwable != null) {
120+
for (GroupedEnumerator<T, K, V> gr : groups.values()) {
121+
gr.error = throwable;
122+
gr.done = true;
123+
gr.drain();
124+
}
125+
groups.clear();
126+
error = throwable;
127+
drain();
128+
} else
129+
if (aBoolean) {
130+
T v = source.current();
131+
K key = keySelector.apply(v);
132+
V value = valueSelector.apply(v);
133+
134+
boolean isNew = false;
135+
GroupedEnumerator<T, K, V> gr = groups.get(key);
136+
if (gr == null) {
137+
if (cancelled.get()) {
138+
consumersReady();
139+
return;
140+
}
141+
142+
active.getAndIncrement();
143+
144+
gr = new GroupedEnumerator<>(key, this);
145+
groups.put(key, gr);
146+
isNew = true;
147+
}
148+
149+
if (isNew) {
150+
current = gr;
151+
drain();
152+
}
153+
gr.result = value;
154+
gr.hasValue = true;
155+
gr.drain();
156+
} else {
157+
for (GroupedEnumerator<T, K, V> gr : groups.values()) {
158+
gr.done = true;
159+
gr.drain();
160+
}
161+
groups.clear();
162+
done = true;
163+
drain();
164+
}
165+
}
166+
167+
void nextSource() {
168+
if (sourceWip.getAndIncrement() == 0) {
169+
do {
170+
source.moveNext().whenComplete(this);
171+
} while (sourceWip.decrementAndGet() != 0);
172+
}
173+
}
174+
175+
void consumersReady() {
176+
if (dispatchWip.getAndIncrement() == 0) {
177+
do {
178+
if (completable != null || cancelled.get()) {
179+
int s = 0;
180+
int r = 0;
181+
for (GroupedEnumerator<T, K, V> gr : groups.values()) {
182+
if (gr.nonFirst && gr.completable != null) {
183+
r++;
184+
}
185+
s++;
186+
}
187+
if (s == r) {
188+
nextSource();
189+
}
190+
}
191+
} while (dispatchWip.decrementAndGet() != 0);
192+
}
193+
}
194+
195+
void drain() {
196+
if (wip.getAndIncrement() == 0) {
197+
do {
198+
CompletableFuture<Boolean> cf = completable;
199+
if (cf != null) {
200+
completable = null;
201+
Throwable ex = error;
202+
if (ex != null) {
203+
cf.completeExceptionally(ex);
204+
return;
205+
}
206+
if (done) {
207+
cf.complete(false);
208+
} else {
209+
cf.complete(true);
210+
}
211+
}
212+
} while (wip.decrementAndGet() != 0);
213+
}
214+
}
215+
216+
static final class GroupedEnumerator<T, K, V> implements GroupedAsyncEnumerable<V, K>, AsyncEnumerator<V> {
217+
218+
final K key;
219+
220+
final GroupByEnumerator<T, K, V> parent;
221+
222+
final AtomicBoolean once;
223+
224+
final AtomicBoolean cancelled;
225+
226+
final AtomicInteger wip;
227+
228+
volatile CompletableFuture<Boolean> completable;
229+
230+
V result;
231+
volatile boolean hasValue;
232+
volatile boolean done;
233+
Throwable error;
234+
235+
volatile boolean nonFirst;
236+
237+
GroupedEnumerator(K key, GroupByEnumerator<T, K, V> parent) {
238+
this.key = key;
239+
this.parent = parent;
240+
this.once = new AtomicBoolean();
241+
this.wip = new AtomicInteger();
242+
this.cancelled = new AtomicBoolean();
243+
}
244+
245+
@Override
246+
public K key() {
247+
return key;
248+
}
249+
250+
@Override
251+
public AsyncEnumerator<V> enumerator() {
252+
if (once.compareAndSet(false, true)) {
253+
return this;
254+
}
255+
return new AsyncError<V>(new IllegalStateException("Only one AsyncEnumerator allowed"));
256+
}
257+
258+
@Override
259+
public CompletionStage<Boolean> moveNext() {
260+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
261+
completable = cf;
262+
if (nonFirst) {
263+
result = null;
264+
hasValue = false;
265+
parent.consumersReady();
266+
} else {
267+
nonFirst = true;
268+
}
269+
drain();
270+
return cf;
271+
}
272+
273+
@Override
274+
public V current() {
275+
return result;
276+
}
277+
278+
void drain() {
279+
if (wip.getAndIncrement() == 0) {
280+
do {
281+
CompletableFuture<Boolean> cf = completable;
282+
if (cf != null) {
283+
if (done) {
284+
Throwable ex = error;
285+
if (ex == null) {
286+
cf.complete(false);
287+
} else {
288+
cf.completeExceptionally(ex);
289+
}
290+
return;
291+
}
292+
293+
if (hasValue) {
294+
completable = null;
295+
cf.complete(true);
296+
}
297+
}
298+
} while (wip.decrementAndGet() != 0);
299+
}
300+
}
301+
302+
@Override
303+
public void cancel() {
304+
if (cancelled.compareAndSet(false, true)) {
305+
parent.remove(key);
306+
}
307+
}
308+
}
309+
}
310+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
public interface GroupedAsyncEnumerable<T, K> extends AsyncEnumerable<T> {
20+
21+
K key();
22+
}

0 commit comments

Comments
 (0)