Skip to content

Commit 94de390

Browse files
committed
More operators, prepare release 0.3.0
1 parent 9ca58db commit 94de390

21 files changed

+1367
-2
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Prototype Java 9 library based on the asynchronous enumerable concept (where mov
99
### Gradle
1010

1111
```groovy
12-
compile "com.github.akarnokd:async-enumerable:0.2.0"
12+
compile "com.github.akarnokd:async-enumerable:0.3.0"
1313
```
1414

1515
### Getting started

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version=0.2.0
1+
version=0.3.0
22
org.gradle.jvmargs=-XX:+IgnoreUnrecognizedVMOptions --permit-illegal-access --show-version
33

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.CompletionStage;
20+
21+
final class AsyncDoOnCancel<T> implements AsyncEnumerable<T> {
22+
23+
final AsyncEnumerable<T> source;
24+
25+
final Runnable onCancel;
26+
27+
AsyncDoOnCancel(AsyncEnumerable<T> source, Runnable onCancel) {
28+
this.source = source;
29+
this.onCancel = onCancel;
30+
}
31+
32+
@Override
33+
public AsyncEnumerator<T> enumerator() {
34+
return new DoOnCancelEnumerator<>(source.enumerator(), onCancel);
35+
}
36+
37+
static final class DoOnCancelEnumerator<T> implements AsyncEnumerator<T> {
38+
39+
final AsyncEnumerator<T> source;
40+
41+
final Runnable onCancel;
42+
43+
DoOnCancelEnumerator(AsyncEnumerator<T> source, Runnable onCancel) {
44+
this.source = source;
45+
this.onCancel = onCancel;
46+
}
47+
48+
@Override
49+
public CompletionStage<Boolean> moveNext() {
50+
return source.moveNext();
51+
}
52+
53+
@Override
54+
public T current() {
55+
return source.current();
56+
}
57+
58+
@Override
59+
public void cancel() {
60+
onCancel.run();
61+
source.cancel();
62+
}
63+
}
64+
}

src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,18 @@ static AsyncEnumerable<Long> interval(long initialDelay, long period, TimeUnit u
120120
return new AsyncInterval(initialDelay, period, unit, executor);
121121
}
122122

123+
static <T> AsyncEnumerable<T> fromCallable(Callable<? extends T> callable) {
124+
return new AsyncFromCallable<>(callable);
125+
}
126+
127+
static <T> AsyncEnumerable<T> repeatItem(T item) {
128+
return new AsyncRepeatItem<>(item);
129+
}
130+
131+
static <T> AsyncEnumerable<T> repeatCallable(Callable<? extends T> callable) {
132+
return new AsyncRepeatCallable<>(callable);
133+
}
134+
123135
// -------------------------------------------------------------------------------------
124136
// Instance transformations
125137

@@ -238,6 +250,54 @@ default AsyncEnumerable<T> doFinally(Runnable onFinally) {
238250
return new AsyncDoFinally<>(this, onFinally);
239251
}
240252

253+
default AsyncEnumerable<T> ignoreElements() {
254+
return new AsyncIgnoreElements<>(this);
255+
}
256+
257+
default AsyncEnumerable<T> doOnCancel(Runnable onCancel) {
258+
return new AsyncDoOnCancel<>(this, onCancel);
259+
}
260+
261+
default AsyncEnumerable<T> repeat(long times) {
262+
return repeat(times, () -> false);
263+
}
264+
265+
default AsyncEnumerable<T> repeat(BooleanSupplier stop) {
266+
return repeat(Long.MAX_VALUE, stop);
267+
}
268+
269+
default AsyncEnumerable<T> repeat(long times, BooleanSupplier stop) {
270+
return new AsyncRepeat<>(this, times, stop);
271+
}
272+
273+
default AsyncEnumerable<T> retry(long times) {
274+
return retry(times, e -> true);
275+
}
276+
277+
default AsyncEnumerable<T> retry(Predicate<? super Throwable> predicate) {
278+
return retry(Long.MAX_VALUE, predicate);
279+
}
280+
281+
default AsyncEnumerable<T> retry(long times, Predicate<? super Throwable> predicate) {
282+
return new AsyncRetry<>(this, times, predicate);
283+
}
284+
285+
default AsyncEnumerable<T> repeatWhen(Supplier<? extends CompletionStage<Boolean>> completer) {
286+
return repeatWhen(() -> null, s -> completer.get());
287+
}
288+
289+
default <S> AsyncEnumerable<T> repeatWhen(Supplier<S> stateSupplier, Function<? super S, ? extends CompletionStage<Boolean>> completer) {
290+
return new AsyncRepeatWhen<>(this, stateSupplier, completer);
291+
}
292+
293+
default <S> AsyncEnumerable<T> retryWhen(Function<? super Throwable, ? extends CompletionStage<Boolean>> completer) {
294+
return retryWhen(() -> null, (s, e) -> completer.apply(e));
295+
}
296+
297+
default <S> AsyncEnumerable<T> retryWhen(Supplier<S> stateSupplier, BiFunction<? super S, ? super Throwable, ? extends CompletionStage<Boolean>> completer) {
298+
return new AsyncRetryWhen<T, S>(this, stateSupplier, completer);
299+
}
300+
241301
// -------------------------------------------------------------------------------------
242302
// Instance consumers
243303

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.*;
20+
21+
final class AsyncFromCallable<T> implements AsyncEnumerable<T> {
22+
23+
final Callable<? extends T> callable;
24+
25+
AsyncFromCallable(Callable<? extends T> callable) {
26+
this.callable = callable;
27+
}
28+
29+
@Override
30+
public AsyncEnumerator<T> enumerator() {
31+
return new FromCallableEnumerator<>(callable);
32+
}
33+
34+
static final class FromCallableEnumerator<T> implements AsyncEnumerator<T> {
35+
36+
final Callable<? extends T> callable;
37+
38+
T result;
39+
40+
boolean once;
41+
42+
FromCallableEnumerator(Callable<? extends T> callable) {
43+
this.callable = callable;
44+
}
45+
46+
@Override
47+
public CompletionStage<Boolean> moveNext() {
48+
if (once) {
49+
result = null;
50+
return FALSE;
51+
}
52+
once = true;
53+
try {
54+
result = callable.call();
55+
} catch (Exception ex) {
56+
return CompletableFuture.failedStage(ex);
57+
}
58+
return TRUE;
59+
}
60+
61+
@Override
62+
public T current() {
63+
return result;
64+
}
65+
}
66+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.asyncenum;
18+
19+
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.function.BiConsumer;
22+
23+
final class AsyncIgnoreElements<T> implements AsyncEnumerable<T> {
24+
25+
final AsyncEnumerable<T> source;
26+
27+
AsyncIgnoreElements(AsyncEnumerable<T> source) {
28+
this.source = source;
29+
}
30+
31+
@Override
32+
public AsyncEnumerator<T> enumerator() {
33+
return new IgnoreElementsEnumerator<>(source.enumerator());
34+
}
35+
36+
static final class IgnoreElementsEnumerator<T>
37+
extends AtomicInteger
38+
implements AsyncEnumerator<T>, BiConsumer<Boolean, Throwable> {
39+
40+
final AsyncEnumerator<T> source;
41+
42+
CompletableFuture<Boolean> completable;
43+
44+
IgnoreElementsEnumerator(AsyncEnumerator<T> source) {
45+
this.source = source;
46+
}
47+
48+
@Override
49+
public CompletionStage<Boolean> moveNext() {
50+
CompletableFuture<Boolean> cf = new CompletableFuture<>();
51+
completable = cf;
52+
nextSource();
53+
return cf;
54+
}
55+
56+
void nextSource() {
57+
if (getAndIncrement() == 0) {
58+
do {
59+
source.moveNext().whenComplete(this);
60+
} while (decrementAndGet() != 0);
61+
}
62+
}
63+
64+
@Override
65+
public T current() {
66+
return null; // elements are ignored
67+
}
68+
69+
@Override
70+
public void cancel() {
71+
source.cancel();
72+
}
73+
74+
@Override
75+
public void accept(Boolean aBoolean, Throwable throwable) {
76+
if (throwable != null) {
77+
completable.completeExceptionally(throwable);
78+
return;
79+
}
80+
if (aBoolean) {
81+
nextSource();
82+
} else {
83+
completable.complete(false);
84+
}
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)