Skip to content

Commit 48c58e0

Browse files
committed
+ publish operator
1 parent 5905f4d commit 48c58e0

File tree

7 files changed

+597
-6
lines changed

7 files changed

+597
-6
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ buildscript {
88

99
dependencies {
1010
classpath 'gradle.plugin.nl.javadude.gradle.plugins:license-gradle-plugin:0.13.1'
11-
classpath "me.champeau.gradle:jmh-gradle-plugin:0.4.4"
11+
classpath "me.champeau.gradle:jmh-gradle-plugin:0.4.5"
1212
}
1313
}
1414

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.assyncenum;
18+
19+
import hu.akarnokd.asyncenum.*;
20+
import org.openjdk.jmh.annotations.*;
21+
import org.openjdk.jmh.infra.Blackhole;
22+
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.function.BiConsumer;
26+
27+
@BenchmarkMode(Mode.Throughput)
28+
@Warmup(iterations = 5)
29+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
30+
@OutputTimeUnit(TimeUnit.SECONDS)
31+
@Fork(value = 1)
32+
@State(Scope.Thread)
33+
public class AsyncRangePerf extends AtomicInteger implements BiConsumer<Boolean, Throwable> {
34+
35+
@Param({"1", "10", "100", "1000", "10000", "100000", "1000000"})
36+
int count;
37+
38+
AsyncEnumerable<Integer> range;
39+
40+
AsyncEnumerator<Integer> en;
41+
42+
Blackhole bh;
43+
44+
@Setup
45+
public void setup(Blackhole bh) {
46+
range = AsyncEnumerable.range(1, count);
47+
this.bh = bh;
48+
}
49+
50+
@Benchmark
51+
public void range() {
52+
en = range.enumerator();
53+
consume();
54+
}
55+
56+
void consume() {
57+
if (getAndIncrement() == 0) {
58+
do {
59+
en.moveNext().whenComplete(this);
60+
} while (decrementAndGet() != 0);
61+
}
62+
}
63+
64+
@Override
65+
public void accept(Boolean aBoolean, Throwable throwable) {
66+
if (throwable != null) {
67+
bh.consume(throwable);
68+
return;
69+
}
70+
71+
if (aBoolean) {
72+
bh.consume(en.current());
73+
consume();
74+
} else {
75+
bh.consume(false);
76+
}
77+
}
78+
}

src/main/java/hu/akarnokd/asyncenum/AsyncBlockingIterable.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public Iterator<T> iterator() {
3232
return new BlockingIterator<>(source.enumerator());
3333
}
3434

35-
static final class BlockingIterator<T> implements Iterator<T> {
35+
static final class BlockingIterator<T> implements Iterator<T>, Runnable {
3636

3737
final AsyncEnumerator<T> source;
3838

@@ -79,5 +79,10 @@ public T next() {
7979
}
8080
throw new NoSuchElementException();
8181
}
82+
83+
@Override
84+
public void run() {
85+
source.cancel();
86+
}
8287
}
8388
}

src/main/java/hu/akarnokd/asyncenum/AsyncEnumerable.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,10 @@ default AsyncEnumerable<T> takeLast(int n) {
394394
return new AsyncTakeLast<>(this, n);
395395
}
396396

397+
default <R> AsyncEnumerable<R> publish(Function<? super AsyncEnumerable<T>, ? extends AsyncEnumerable<R>> handler) {
398+
return new AsyncPublish<>(this, handler);
399+
}
400+
397401
// -------------------------------------------------------------------------------------
398402
// Instance consumers
399403

@@ -416,7 +420,8 @@ default Iterable<T> blockingIterable() {
416420
}
417421

418422
default Stream<T> blockingStream() {
419-
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(blockingIterable().iterator(), 0), false);
423+
Iterator<T> it = blockingIterable().iterator();
424+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false).onClose((Runnable)it);
420425
}
421426

422427
default Optional<T> blockingFirstOptional() {

0 commit comments

Comments
 (0)