Skip to content

Commit e5621f3

Browse files
committed
More cancellation support, better coverage
1 parent 80b3959 commit e5621f3

30 files changed

+956
-45
lines changed

src/main/java/hu/akarnokd/asyncenum/AsyncBlockingLast.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ T blockingGet() {
7474
}
7575
Throwable ex = error;
7676
if (ex != null) {
77-
throw ExceptionHelper.wrapOrThrow(ex);
77+
throw ThrowableHelper.wrapOrThrow(ex);
7878
}
7979
if (hasValue) {
8080
return result;

src/main/java/hu/akarnokd/asyncenum/AsyncCollect.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,5 +99,10 @@ public void accept(Boolean aBoolean, Throwable throwable) {
9999
cf.complete(true);
100100
}
101101
}
102+
103+
@Override
104+
public void cancel() {
105+
source.cancel();
106+
}
102107
}
103108
}

src/main/java/hu/akarnokd/asyncenum/AsyncConcatMap.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package hu.akarnokd.asyncenum;
1818

1919
import java.util.concurrent.*;
20-
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.*;
2121
import java.util.function.*;
2222

2323
final class AsyncConcatMap<T, R> implements AsyncEnumerable<R> {
@@ -47,7 +47,7 @@ static final class ConcatMapEnumerator<T, R>
4747

4848
final AtomicInteger wipInner;
4949

50-
AsyncEnumerator<? extends R> currentSource;
50+
final AtomicReference<AsyncEnumerator<R>> currentSource;
5151

5252
volatile CompletableFuture<Boolean> completable;
5353

@@ -56,6 +56,7 @@ static final class ConcatMapEnumerator<T, R>
5656
ConcatMapEnumerator(AsyncEnumerator<T> source, Function<? super T, ? extends AsyncEnumerable<? extends R>> mapper) {
5757
this.source = source;
5858
this.mapper = mapper;
59+
this.currentSource = new AtomicReference<>();
5960
this.wipMain = new AtomicInteger();
6061
this.wipInner = new AtomicInteger();
6162
}
@@ -64,7 +65,7 @@ static final class ConcatMapEnumerator<T, R>
6465
public CompletionStage<Boolean> moveNext() {
6566
CompletableFuture<Boolean> cf = new CompletableFuture<>();
6667
completable = cf;
67-
if (currentSource == null) {
68+
if (currentSource.getPlain() == null) {
6869
nextMain();
6970
} else {
7071
nextInner();
@@ -80,26 +81,28 @@ public R current() {
8081
@Override
8182
public void accept(Boolean aBoolean, Throwable throwable) {
8283
if (throwable != null) {
84+
source.cancel();
8385
completable.completeExceptionally(throwable);
8486
return;
8587
}
8688
if (aBoolean) {
87-
current = currentSource.current();
89+
current = currentSource.getPlain().current();
8890
completable.complete(true);
8991
} else {
90-
currentSource = null;
9192
nextMain();
9293
}
9394
}
9495

96+
@SuppressWarnings("unchecked")
9597
public void acceptMain(Boolean aBoolean, Throwable throwable) {
9698
if (throwable != null) {
9799
completable.completeExceptionally(throwable);
98100
return;
99101
}
100102
if (aBoolean) {
101-
currentSource = mapper.apply(source.current()).enumerator();
102-
nextInner();
103+
if (AsyncEnumeratorHelper.replace(currentSource, (AsyncEnumerator<R>)mapper.apply(source.current()).enumerator())) {
104+
nextInner();
105+
}
103106
} else {
104107
completable.complete(false);
105108
}
@@ -116,9 +119,14 @@ void nextMain() {
116119
void nextInner() {
117120
if (wipInner.getAndIncrement() == 0) {
118121
do {
119-
currentSource.moveNext().whenComplete(this);
122+
currentSource.getPlain().moveNext().whenComplete(this);
120123
} while (wipInner.decrementAndGet() != 0);
121124
}
122125
}
126+
127+
@Override
128+
public void cancel() {
129+
AsyncEnumeratorHelper.cancel(currentSource);
130+
}
123131
}
124132
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.function.BiConsumer;
22+
23+
final class AsyncDoFinally<T> implements AsyncEnumerable<T> {
24+
25+
final AsyncEnumerable<T> source;
26+
27+
final Runnable onFinally;
28+
29+
AsyncDoFinally(AsyncEnumerable<T> source, Runnable onFinally) {
30+
this.source = source;
31+
this.onFinally = onFinally;
32+
}
33+
34+
@Override
35+
public AsyncEnumerator<T> enumerator() {
36+
return new DoFinallyEnumerator<>(source.enumerator(), onFinally);
37+
}
38+
39+
static final class DoFinallyEnumerator<T>
40+
extends AtomicBoolean
41+
implements AsyncEnumerator<T>, BiConsumer<Boolean, Throwable> {
42+
43+
final AsyncEnumerator<T> source;
44+
45+
final Runnable onFinally;
46+
47+
CompletableFuture<Boolean> completable;
48+
49+
T result;
50+
51+
DoFinallyEnumerator(AsyncEnumerator<T> source, Runnable onFinally) {
52+
this.source = source;
53+
this.onFinally = onFinally;
54+
}
55+
56+
@Override
57+
public CompletionStage<Boolean> moveNext() {
58+
result = null;
59+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
60+
completable = cf;
61+
source.moveNext().whenComplete(this);
62+
return cf;
63+
}
64+
65+
@Override
66+
public T current() {
67+
return result;
68+
}
69+
70+
@Override
71+
public void cancel() {
72+
source.cancel();
73+
runFinally();
74+
}
75+
76+
void runFinally() {
77+
if (compareAndSet(false, true)) {
78+
onFinally.run();
79+
}
80+
}
81+
82+
@Override
83+
public void accept(Boolean aBoolean, Throwable throwable) {
84+
if (throwable != null) {
85+
completable.completeExceptionally(throwable);
86+
runFinally();
87+
return;
88+
}
89+
90+
if (aBoolean) {
91+
result = source.current();
92+
completable.complete(true);
93+
} else {
94+
completable.complete(false);
95+
runFinally();
96+
}
97+
}
98+
}
99+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.*;
20+
import java.util.function.*;
21+
22+
final class AsyncDoOn<T> implements AsyncEnumerable<T> {
23+
24+
final AsyncEnumerable<T> source;
25+
26+
final Consumer<? super T> onNext;
27+
28+
final Consumer<? super Throwable> onError;
29+
30+
final Runnable onComplete;
31+
32+
AsyncDoOn(AsyncEnumerable<T> source, Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete) {
33+
this.source = source;
34+
this.onNext = onNext;
35+
this.onError = onError;
36+
this.onComplete = onComplete;
37+
}
38+
39+
@Override
40+
public AsyncEnumerator<T> enumerator() {
41+
return new DoOnEnumerator<>(source.enumerator(), onNext, onError, onComplete);
42+
}
43+
44+
static final class DoOnEnumerator<T> implements AsyncEnumerator<T>, BiConsumer<Boolean, Throwable> {
45+
46+
final AsyncEnumerator<T> source;
47+
48+
final Consumer<? super T> onNext;
49+
50+
final Consumer<? super Throwable> onError;
51+
52+
final Runnable onComplete;
53+
54+
CompletableFuture<Boolean> completable;
55+
56+
T result;
57+
58+
DoOnEnumerator(AsyncEnumerator<T> source, Consumer<? super T> onNext,
59+
Consumer<? super Throwable> onError, Runnable onComplete) {
60+
this.source = source;
61+
this.onNext = onNext;
62+
this.onError = onError;
63+
this.onComplete = onComplete;
64+
}
65+
66+
@Override
67+
public CompletionStage<Boolean> moveNext() {
68+
result = null;
69+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
70+
completable = cf;
71+
// Note that returning this directly results in CompletionException(Throwable)
72+
// instead of the original failure for some reason.
73+
source.moveNext().whenComplete(this);
74+
return cf;
75+
}
76+
77+
@Override
78+
public T current() {
79+
return result;
80+
}
81+
82+
@Override
83+
public void cancel() {
84+
source.cancel();
85+
}
86+
87+
@Override
88+
public void accept(Boolean aBoolean, Throwable throwable) {
89+
if (throwable != null) {
90+
onError.accept(throwable);
91+
completable.completeExceptionally(throwable);
92+
return;
93+
}
94+
if (aBoolean) {
95+
T r = source.current();
96+
result = r;
97+
onNext.accept(r);
98+
completable.complete(true);
99+
} else {
100+
onComplete.run();
101+
completable.complete(false);
102+
}
103+
}
104+
}
105+
}

src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,25 @@
2020
import java.util.concurrent.*;
2121
import java.util.function.*;
2222

23+
/**
24+
* Represents an possibly asynchronous, cold-deferred, source of zero or more items
25+
* optionally followed by a Throwable.
26+
* @param <T> the value type.
27+
*/
28+
@FunctionalInterface
2329
public interface AsyncEnumerable<T> {
2430

31+
/**
32+
* Returns an AsyncEnumerator that can be iterated over to receive
33+
* the next item, the end-of-sequence indicator or a Throwable.
34+
* @return the new AsyncEnumerator instance
35+
*/
2536
AsyncEnumerator<T> enumerator();
2637

38+
/** Constant and already completed CompletionStage signalling {@code true}. */
2739
CompletionStage<Boolean> TRUE = CompletableFuture.completedStage(true);
2840

41+
/** Constant and already completed CompletionStage signalling {@code false}. */
2942
CompletionStage<Boolean> FALSE = CompletableFuture.completedStage(false);
3043

3144
CompletionStage<Boolean> CANCELLED = CompletableFuture.failedStage(new CancellationException() {
@@ -214,6 +227,22 @@ default AsyncEnumerable<T> mergeWith(AsyncEnumerable<T> other) {
214227
return mergeArray(this, other);
215228
}
216229

230+
default AsyncEnumerable<T> doOnNext(Consumer<? super T> onNext) {
231+
return new AsyncDoOn<>(this, onNext, t -> { }, () -> { });
232+
}
233+
234+
default AsyncEnumerable<T> doOnError(Consumer<? super Throwable> onError) {
235+
return new AsyncDoOn<>(this, t -> { }, onError, () -> { });
236+
}
237+
238+
default AsyncEnumerable<T> doOnComplete(Runnable onComplete) {
239+
return new AsyncDoOn<>(this, t -> { }, t -> { }, onComplete);
240+
}
241+
242+
default AsyncEnumerable<T> doFinally(Runnable onFinally) {
243+
return new AsyncDoFinally<>(this, onFinally);
244+
}
245+
217246
// -------------------------------------------------------------------------------------
218247
// Instance consumers
219248

@@ -226,12 +255,15 @@ default T blockingFirst() {
226255
try {
227256
Boolean result = en.moveNext().toCompletableFuture().get();
228257
if (result) {
229-
// TODO cancel rest
230-
return en.current();
258+
T r = en.current();
259+
en.cancel();
260+
return r;
231261
}
232262
throw new NoSuchElementException();
233-
} catch (InterruptedException | ExecutionException ex) {
263+
} catch (InterruptedException ex) {
234264
throw new RuntimeException(ex);
265+
} catch (ExecutionException ex) {
266+
throw ThrowableHelper.wrapOrThrow(ex.getCause());
235267
}
236268
}
237269

0 commit comments

Comments
 (0)