Skip to content

Commit 9808ba5

Browse files
committed
add cache() operator
1 parent f518abd commit 9808ba5

File tree

11 files changed

+743
-2
lines changed

11 files changed

+743
-2
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ RxJava 2.x implementation of extra sources, operators and components and ports o
1313

1414
```
1515
dependencies {
16-
compile "com.github.akarnokd:rxjava2-extensions:0.14.0"
16+
compile "com.github.akarnokd:rxjava2-extensions:0.14.1"
1717
}
1818
```
1919

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.14.0
1+
version=0.14.1

src/main/java/hu/akarnokd/rxjava2/basetypes/Nono.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,6 +1107,17 @@ public final Nono takeUntil(Publisher<?> other) {
11071107
return onAssembly(new NonoTakeUntil(this, other));
11081108
}
11091109

1110+
/**
1111+
* Caches the terminal event of the upstream Nono
1112+
* and relays/replays it to Subscribers.
1113+
* @return the new Nono instance
1114+
*
1115+
* @since 0.14.1
1116+
*/
1117+
public final Nono cache() {
1118+
return onAssembly(new NonoCache(this));
1119+
}
1120+
11101121
// -----------------------------------------------------------
11111122
// Consumers and subscribers (leave)
11121123
// -----------------------------------------------------------
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright 2016 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.rxjava2.basetypes;
18+
19+
import java.util.concurrent.atomic.*;
20+
21+
import org.reactivestreams.*;
22+
23+
/**
24+
* Cache the terminal event of the upstream source.
25+
*
26+
* @since 0.14.1
27+
*/
28+
final class NonoCache extends Nono implements Subscriber<Void> {
29+
30+
static final CacheSubscription[] EMPTY = new CacheSubscription[0];
31+
32+
static final CacheSubscription[] TERMINATED = new CacheSubscription[0];
33+
34+
final Nono source;
35+
36+
final AtomicBoolean once;
37+
38+
final AtomicReference<CacheSubscription[]> subscribers;
39+
40+
Throwable error;
41+
42+
NonoCache(Nono source) {
43+
this.source = source;
44+
this.once = new AtomicBoolean();
45+
this.subscribers = new AtomicReference<CacheSubscription[]>(EMPTY);
46+
}
47+
48+
@Override
49+
protected void subscribeActual(Subscriber<? super Void> s) {
50+
CacheSubscription inner = new CacheSubscription(s);
51+
s.onSubscribe(inner);
52+
53+
if (add(inner)) {
54+
if (inner.get() != 0) {
55+
remove(inner);
56+
}
57+
if (once.compareAndSet(false, true)) {
58+
source.subscribe(this);
59+
}
60+
} else {
61+
if (inner.get() == 0) {
62+
Throwable ex = error;
63+
if (ex != null) {
64+
inner.actual.onError(ex);
65+
} else {
66+
inner.actual.onComplete();
67+
}
68+
}
69+
}
70+
}
71+
72+
boolean add(CacheSubscription inner) {
73+
for (;;) {
74+
CacheSubscription[] a = subscribers.get();
75+
if (a == TERMINATED) {
76+
return false;
77+
}
78+
int n = a.length;
79+
80+
CacheSubscription[] b = new CacheSubscription[n + 1];
81+
System.arraycopy(a, 0, b, 0, n);
82+
b[n] = inner;
83+
if (subscribers.compareAndSet(a, b)) {
84+
return true;
85+
}
86+
}
87+
}
88+
89+
void remove(CacheSubscription inner) {
90+
for (;;) {
91+
CacheSubscription[] a = subscribers.get();
92+
int n = a.length;
93+
if (n == 0) {
94+
break;
95+
}
96+
97+
int j = -1;
98+
99+
for (int i = 0; i < n; i++) {
100+
if (a[i] == inner) {
101+
j = i;
102+
break;
103+
}
104+
}
105+
106+
if (j < 0) {
107+
break;
108+
}
109+
110+
CacheSubscription[] b;
111+
if (n == 1) {
112+
b = EMPTY;
113+
} else {
114+
b = new CacheSubscription[n - 1];
115+
System.arraycopy(a, 0, b, 0, j);
116+
System.arraycopy(a, j + 1, b, j, n - j - 1);
117+
}
118+
if (subscribers.compareAndSet(a, b)) {
119+
break;
120+
}
121+
}
122+
}
123+
124+
@Override
125+
public void onSubscribe(Subscription s) {
126+
// not used
127+
}
128+
129+
@Override
130+
public void onNext(Void t) {
131+
// not called
132+
}
133+
134+
@Override
135+
public void onError(Throwable t) {
136+
error = t;
137+
for (CacheSubscription inner : subscribers.getAndSet(TERMINATED)) {
138+
if (inner.get() == 0) {
139+
inner.actual.onError(t);
140+
}
141+
}
142+
}
143+
144+
@Override
145+
public void onComplete() {
146+
for (CacheSubscription inner : subscribers.getAndSet(TERMINATED)) {
147+
if (inner.get() == 0) {
148+
inner.actual.onComplete();
149+
}
150+
}
151+
}
152+
153+
final class CacheSubscription extends BasicNonoIntQueueSubscription {
154+
155+
private static final long serialVersionUID = -5746624477415417500L;
156+
157+
final Subscriber<? super Void> actual;
158+
159+
CacheSubscription(Subscriber<? super Void> actual) {
160+
this.actual = actual;
161+
}
162+
163+
@Override
164+
public void cancel() {
165+
if (compareAndSet(0, 1)) {
166+
remove(this);
167+
}
168+
}
169+
}
170+
}

src/main/java/hu/akarnokd/rxjava2/basetypes/Perhaps.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,6 +1359,17 @@ public final Maybe<T> toMaybe() {
13591359
return RxJavaPlugins.onAssembly(new PerhapsToMaybe<T>(this));
13601360
}
13611361

1362+
/**
1363+
* Caches the value or error event of the upstream Perhaps
1364+
* and relays/replays it to Subscribers.
1365+
* @return the new Perhaps instance
1366+
*
1367+
* @since 0.14.1
1368+
*/
1369+
public final Perhaps<T> cache() {
1370+
return onAssembly(new PerhapsCache<T>(this));
1371+
}
1372+
13621373
// ----------------------------------------------------
13631374
// Consumers (leave)
13641375
// ----------------------------------------------------

0 commit comments

Comments
 (0)