Skip to content

Commit 84df52d

Browse files
committed
Fix ending a stream while updating
1 parent cbb5aec commit 84df52d

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ logs
66
# Dependency directory
77
node_modules
88
bower_components
9-
9+
.idea

lib/index.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ function trueFn() { return true; }
1010

1111
// Globals
1212
var toUpdate = [];
13+
var toEnd = [];
1314
var inStream;
1415
var order = [];
1516
var orderNextIdx = -1;
@@ -696,6 +697,9 @@ function flushUpdate() {
696697
var nextUpdateFn = stream.updaters.shift();
697698
if (nextUpdateFn && stream.shouldUpdate) nextUpdateFn(stream);
698699
}
700+
while (toEnd.length > 0) {
701+
toEnd.shift()(true);
702+
}
699703
flushingUpdateQueue = false;
700704
}
701705

@@ -715,6 +719,8 @@ function updateStreamValue(n, s) {
715719
flushingStreamValue = false;
716720
} else if (inStream === s) {
717721
markListeners(s, s.listeners);
722+
} else if (inStream.end === s) {
723+
toEnd.push(inStream.end)
718724
} else {
719725
updateLaterUsing(function(s) { updateStreamValue(n, s); }, s);
720726
}

test/delayed-stream-end.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
var assert = require('assert');
2+
var R = require('ramda');
3+
4+
var flyd = require('../lib');
5+
6+
function once(s) {
7+
return flyd.combine(function(s, self) {
8+
self(s.val)
9+
self.end(true)
10+
}, [s])
11+
}
12+
function withLatestFrom() {
13+
var streams = arguments
14+
return flyd.combine(function() {
15+
var self = arguments[arguments.length - 2]
16+
var result = []
17+
for (var i = 0; i < streams.length; ++i) {
18+
if (!streams[i].hasVal) return
19+
result.push(streams[i].val)
20+
}
21+
self(result)
22+
}, streams)
23+
}
24+
25+
describe('ending a stream', function() {
26+
it('delays ending the current stream until dependents have been updated', function() {
27+
var stream = flyd.stream(1)
28+
function doubled() { return stream.map(R.multiply(2)) }
29+
var count = 0;
30+
stream
31+
.map(function() {
32+
withLatestFrom(doubled(), doubled())
33+
.pipe(once)
34+
.map(function() {
35+
count++
36+
})
37+
})
38+
assert.equal(count, 1)
39+
})
40+
})

0 commit comments

Comments
 (0)