Skip to content

Commit 5905f4d

Browse files
committed
Even more operators, cancellation fixes
- blockingFirstOptional - blockingLastOptional - collectWith - distinct - distinctUntilChanged - generate - reduce - reduceWith - skipLast - takeLast
1 parent a43aae0 commit 5905f4d

28 files changed

+1878
-17
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.*;
20+
import java.util.concurrent.ExecutionException;
21+
22+
final class AsyncBlockingFirst {
23+
24+
private AsyncBlockingFirst() {
25+
throw new IllegalStateException("No instances!");
26+
}
27+
28+
public static <T> T blockingFirst(AsyncEnumerator<T> source) {
29+
try {
30+
Boolean result = source.moveNext().toCompletableFuture().get();
31+
if (result) {
32+
T r = source.current();
33+
source.cancel();
34+
return r;
35+
}
36+
throw new NoSuchElementException();
37+
} catch (InterruptedException ex) {
38+
source.cancel();
39+
throw new RuntimeException(ex);
40+
} catch (ExecutionException ex) {
41+
throw ThrowableHelper.wrapOrThrow(ex.getCause());
42+
}
43+
}
44+
45+
public static <T> Optional<T> blockingFirstOptional(AsyncEnumerator<T> source) {
46+
try {
47+
Boolean result = source.moveNext().toCompletableFuture().get();
48+
if (result) {
49+
T r = source.current();
50+
source.cancel();
51+
return Optional.ofNullable(r);
52+
}
53+
return Optional.empty();
54+
} catch (InterruptedException ex) {
55+
source.cancel();
56+
throw new RuntimeException(ex);
57+
} catch (ExecutionException ex) {
58+
throw ThrowableHelper.wrapOrThrow(ex.getCause());
59+
}
60+
}
61+
}

src/main/java/hu/akarnokd/asyncenum/AsyncBlockingLast.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package hu.akarnokd.asyncenum;
1818

19-
import java.util.NoSuchElementException;
19+
import java.util.*;
2020
import java.util.concurrent.CountDownLatch;
2121
import java.util.concurrent.atomic.AtomicInteger;
2222
import java.util.function.BiConsumer;
@@ -69,6 +69,7 @@ T blockingGet() {
6969
try {
7070
await();
7171
} catch (InterruptedException ex) {
72+
source.cancel();
7273
throw new RuntimeException(ex);
7374
}
7475
}
@@ -81,4 +82,24 @@ T blockingGet() {
8182
}
8283
throw new NoSuchElementException();
8384
}
85+
86+
87+
Optional<T> blockingGetOptional() {
88+
if (getCount() != 0) {
89+
try {
90+
await();
91+
} catch (InterruptedException ex) {
92+
source.cancel();
93+
throw new RuntimeException(ex);
94+
}
95+
}
96+
Throwable ex = error;
97+
if (ex != null) {
98+
throw ThrowableHelper.wrapOrThrow(ex);
99+
}
100+
if (hasValue) {
101+
return Optional.ofNullable(result);
102+
}
103+
return Optional.empty();
104+
}
84105
}

src/main/java/hu/akarnokd/asyncenum/AsyncCollect.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ static final class CollectEnumerator<T, C> extends AtomicInteger
5252

5353
CompletableFuture<Boolean> cf;
5454

55+
volatile boolean cancelled;
56+
5557
CollectEnumerator(AsyncEnumerator<T> source, BiConsumer<C, T> collector, C collection) {
5658
this.source = source;
5759
this.collector = collector;
@@ -77,6 +79,9 @@ public C current() {
7779
void collectSource() {
7880
if (getAndIncrement() == 0) {
7981
do {
82+
if (cancelled) {
83+
return;
84+
}
8085
source.moveNext().whenComplete(this);
8186
} while (decrementAndGet() != 0);
8287
}
@@ -102,6 +107,7 @@ public void accept(Boolean aBoolean, Throwable throwable) {
102107

103108
@Override
104109
public void cancel() {
110+
cancelled = true;
105111
source.cancel();
106112
}
107113
}

src/main/java/hu/akarnokd/asyncenum/AsyncCollectWith.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ static final class CollectWithEnumerator<T, A, R> extends AtomicInteger
5656

5757
CompletableFuture<Boolean> cf;
5858

59+
volatile boolean cancelled;
60+
5961
CollectWithEnumerator(AsyncEnumerator<T> source, A collection, BiConsumer<A, T> accumulator, Function<A, R> finisher) {
6062
this.source = source;
6163
this.collection = collection;
@@ -82,6 +84,9 @@ public R current() {
8284
void collectSource() {
8385
if (getAndIncrement() == 0) {
8486
do {
87+
if (cancelled) {
88+
return;
89+
}
8590
source.moveNext().whenComplete(this);
8691
} while (decrementAndGet() != 0);
8792
}
@@ -107,6 +112,7 @@ public void accept(Boolean aBoolean, Throwable throwable) {
107112

108113
@Override
109114
public void cancel() {
115+
cancelled = true;
110116
source.cancel();
111117
}
112118
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.*;
20+
import java.util.concurrent.*;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
import java.util.function.*;
23+
24+
final class AsyncDistinct<T, K, C extends Collection<? super K>> implements AsyncEnumerable<T> {
25+
26+
final AsyncEnumerable<T> source;
27+
28+
final Function<? super T, ? extends K> keySelector;
29+
30+
final Supplier<C> setSupplier;
31+
32+
AsyncDistinct(AsyncEnumerable<T> source, Function<? super T, ? extends K> keySelector, Supplier<C> setSupplier) {
33+
this.source = source;
34+
this.keySelector = keySelector;
35+
this.setSupplier = setSupplier;
36+
}
37+
38+
@Override
39+
public AsyncEnumerator<T> enumerator() {
40+
return new DistinctEnumerator<>(source.enumerator(), keySelector, setSupplier.get());
41+
}
42+
43+
static final class DistinctEnumerator<T, K>
44+
extends AtomicInteger
45+
implements AsyncEnumerator<T>, BiConsumer<Boolean, Throwable> {
46+
47+
final AsyncEnumerator<T> source;
48+
49+
final Function<? super T, ? extends K> keySelector;
50+
51+
final Collection<? super K> set;
52+
53+
CompletableFuture<Boolean> completable;
54+
55+
T result;
56+
57+
volatile boolean cancelled;
58+
59+
DistinctEnumerator(AsyncEnumerator<T> source, Function<? super T, ? extends K> keySelector, Collection<? super K> set) {
60+
this.source = source;
61+
this.keySelector = keySelector;
62+
this.set = set;
63+
}
64+
65+
@Override
66+
public CompletionStage<Boolean> moveNext() {
67+
result = null;
68+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
69+
completable = cf;
70+
nextSource();
71+
return cf;
72+
}
73+
74+
void nextSource() {
75+
if (getAndIncrement() == 0) {
76+
do {
77+
if (cancelled) {
78+
return;
79+
}
80+
source.moveNext().whenComplete(this);
81+
} while (decrementAndGet() != 0);
82+
}
83+
}
84+
85+
@Override
86+
public T current() {
87+
return result;
88+
}
89+
90+
@Override
91+
public void cancel() {
92+
cancelled = true;
93+
source.cancel();
94+
}
95+
96+
@Override
97+
public void accept(Boolean aBoolean, Throwable throwable) {
98+
if (throwable != null) {
99+
set.clear();
100+
completable.completeExceptionally(throwable);
101+
return;
102+
}
103+
104+
if (aBoolean) {
105+
T v = source.current();
106+
if (set.add(keySelector.apply(v))) {
107+
result = v;
108+
completable.complete(true);
109+
} else {
110+
nextSource();
111+
}
112+
} else {
113+
set.clear();
114+
completable.complete(false);
115+
}
116+
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)