Skip to content

Commit bfe4aa0

Browse files
committed
Fix repeatCallable fusion, request accounting, coverage
1 parent e8593e3 commit bfe4aa0

File tree

3 files changed

+194
-2
lines changed

3 files changed

+194
-2
lines changed

src/main/java/hu/akarnokd/rxjava2/operators/FlowableRepeatCallable.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ void slowpath(long r) {
138138
if (r == 0L) {
139139
break;
140140
}
141+
e = 0L;
141142
}
142143
}
143144
}
@@ -149,7 +150,10 @@ public void cancel() {
149150

150151
@Override
151152
public int requestFusion(int mode) {
152-
return mode & SYNC;
153+
if ((mode & BOUNDARY) == 0) {
154+
return mode & SYNC;
155+
}
156+
return NONE;
153157
}
154158

155159
@Override
@@ -254,6 +258,7 @@ void slowpath(long r) {
254258
if (r == 0L) {
255259
break;
256260
}
261+
e = 0L;
257262
}
258263
}
259264
}
@@ -265,7 +270,10 @@ public void cancel() {
265270

266271
@Override
267272
public int requestFusion(int mode) {
268-
return mode & SYNC;
273+
if ((mode & BOUNDARY) == 0) {
274+
return mode & SYNC;
275+
}
276+
return NONE;
269277
}
270278

271279
@Override

src/main/java/hu/akarnokd/rxjava2/operators/FlowableRepeatScalar.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ void slowpath(long r) {
112112
if (r == 0L) {
113113
break;
114114
}
115+
e = 0L;
115116
}
116117
}
117118
}
@@ -207,6 +208,7 @@ void slowpath(long r) {
207208
if (r == 0L) {
208209
break;
209210
}
211+
e = 0L;
210212
}
211213
}
212214
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright 2016-2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.rxjava2.operators;
18+
19+
import java.io.IOException;
20+
import java.util.concurrent.Callable;
21+
22+
import org.junit.Test;
23+
24+
import io.reactivex.Flowable;
25+
import io.reactivex.internal.functions.Functions;
26+
import io.reactivex.subscribers.TestSubscriber;
27+
28+
public class FlowableRepeatCallableTest {
29+
30+
@Test
31+
public void fastPathCrash() {
32+
Flowables.repeatCallable(new Callable<Object>() {
33+
@Override
34+
public Object call() throws Exception {
35+
throw new IOException();
36+
}
37+
})
38+
.test()
39+
.assertFailure(IOException.class);
40+
}
41+
42+
@Test
43+
public void slowPathCrash() {
44+
Flowables.repeatCallable(new Callable<Object>() {
45+
@Override
46+
public Object call() throws Exception {
47+
throw new IOException();
48+
}
49+
})
50+
.test(5)
51+
.assertFailure(IOException.class);
52+
}
53+
54+
@Test
55+
public void requestLimited() {
56+
Flowables.repeatCallable(new Callable<Object>() {
57+
@Override
58+
public Object call() throws Exception {
59+
return 1;
60+
}
61+
})
62+
.test(5)
63+
.assertValues(1, 1, 1, 1, 1)
64+
.assertNoErrors()
65+
.assertNotComplete();
66+
}
67+
68+
@Test
69+
public void take() {
70+
TestSubscriber<Object> ts = new TestSubscriber<Object>(2) {
71+
@Override
72+
public void onNext(Object t) {
73+
super.onNext(t);
74+
cancel();
75+
onComplete();
76+
}
77+
};
78+
79+
Flowables.repeatCallable(new Callable<Object>() {
80+
@Override
81+
public Object call() throws Exception {
82+
return 1;
83+
}
84+
})
85+
.subscribe(ts);
86+
87+
ts.assertResult(1);
88+
}
89+
90+
@Test
91+
public void fused() {
92+
Flowables.repeatCallable(new Callable<Object>() {
93+
@Override
94+
public Object call() throws Exception {
95+
return 1;
96+
}
97+
})
98+
.concatMap(Functions.justFunction(Flowable.just(2)))
99+
.take(5)
100+
.test()
101+
.assertResult(2, 2, 2, 2, 2);
102+
}
103+
104+
@Test
105+
public void fastPathCrashConditional() {
106+
Flowables.repeatCallable(new Callable<Object>() {
107+
@Override
108+
public Object call() throws Exception {
109+
throw new IOException();
110+
}
111+
})
112+
.filter(Functions.alwaysTrue())
113+
.test()
114+
.assertFailure(IOException.class);
115+
}
116+
117+
@Test
118+
public void slowPathCrashConditional() {
119+
Flowables.repeatCallable(new Callable<Object>() {
120+
@Override
121+
public Object call() throws Exception {
122+
throw new IOException();
123+
}
124+
})
125+
.filter(Functions.alwaysTrue())
126+
.test(5)
127+
.assertFailure(IOException.class);
128+
}
129+
130+
@Test
131+
public void requestLimitedConditional() {
132+
Flowables.repeatCallable(new Callable<Object>() {
133+
@Override
134+
public Object call() throws Exception {
135+
return 1;
136+
}
137+
})
138+
.filter(Functions.alwaysTrue())
139+
.test(5)
140+
.assertValues(1, 1, 1, 1, 1)
141+
.assertNoErrors()
142+
.assertNotComplete();
143+
}
144+
145+
@Test
146+
public void takeConditional() {
147+
TestSubscriber<Object> ts = new TestSubscriber<Object>(2) {
148+
@Override
149+
public void onNext(Object t) {
150+
super.onNext(t);
151+
cancel();
152+
onComplete();
153+
}
154+
};
155+
156+
Flowables.repeatCallable(new Callable<Object>() {
157+
@Override
158+
public Object call() throws Exception {
159+
return 1;
160+
}
161+
})
162+
.filter(Functions.alwaysTrue())
163+
.subscribe(ts);
164+
165+
ts.assertResult(1);
166+
}
167+
168+
@Test
169+
public void fusedConditional() {
170+
Flowables.repeatCallable(new Callable<Object>() {
171+
@Override
172+
public Object call() throws Exception {
173+
return 1;
174+
}
175+
})
176+
.filter(Functions.alwaysTrue())
177+
.concatMap(Functions.justFunction(Flowable.just(2)))
178+
.take(5)
179+
.test()
180+
.assertResult(2, 2, 2, 2, 2);
181+
}
182+
}

0 commit comments

Comments
 (0)