From 6e791f39736521e1dc306c4b1c30811a9a53c231 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Tue, 30 Jan 2024 22:48:27 -0500 Subject: [PATCH 1/2] Fix ending a stream while updating --- .gitignore | 2 +- lib/index.js | 6 ++++++ test/delayed-stream-end.js | 40 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 test/delayed-stream-end.js diff --git a/.gitignore b/.gitignore index f2e1cb0..3d69158 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,4 @@ logs # Dependency directory node_modules bower_components - +.idea diff --git a/lib/index.js b/lib/index.js index bedbbb8..5e6afdd 100644 --- a/lib/index.js +++ b/lib/index.js @@ -10,6 +10,7 @@ function trueFn() { return true; } // Globals var toUpdate = []; +var toEnd = []; var inStream; var order = []; var orderNextIdx = -1; @@ -696,6 +697,9 @@ function flushUpdate() { var nextUpdateFn = stream.updaters.shift(); if (nextUpdateFn && stream.shouldUpdate) nextUpdateFn(stream); } + while (toEnd.length > 0) { + toEnd.shift()(true); + } flushingUpdateQueue = false; } @@ -715,6 +719,8 @@ function updateStreamValue(n, s) { flushingStreamValue = false; } else if (inStream === s) { markListeners(s, s.listeners); + } else if (inStream.end === s) { + toEnd.push(inStream.end); } else { updateLaterUsing(function(s) { updateStreamValue(n, s); }, s); } diff --git a/test/delayed-stream-end.js b/test/delayed-stream-end.js new file mode 100644 index 0000000..297caf8 --- /dev/null +++ b/test/delayed-stream-end.js @@ -0,0 +1,40 @@ +var assert = require('assert'); +var R = require('ramda'); + +var flyd = require('../lib'); + +function once(s) { + return flyd.combine(function(s, self) { + self(s.val) + self.end(true) + }, [s]) +} +function withLatestFrom() { + var streams = arguments + return flyd.combine(function() { + var self = arguments[arguments.length - 2] + var result = [] + for (var i = 0; i < streams.length; ++i) { + if (!streams[i].hasVal) return + result.push(streams[i].val) + } + self(result) + }, streams) +} + +describe('ending a stream', function() { + it('delays ending the current stream until dependents have been updated', function() { + var stream = flyd.stream(1) + function doubled() { return stream.map(R.multiply(2)) } + var count = 0; + stream + .map(function() { + withLatestFrom(doubled(), doubled()) + .pipe(once) + .map(function() { + count++ + }) + }) + assert.equal(count, 1) + }) +}) From 7e5a6b6c554028a5ea4a2989701f5fcccd5f62aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Mon, 5 Feb 2024 15:10:54 -0500 Subject: [PATCH 2/2] simplify test --- test/delayed-stream-end.js | 60 +++++++++++++++----------------------- 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/test/delayed-stream-end.js b/test/delayed-stream-end.js index 297caf8..671ebad 100644 --- a/test/delayed-stream-end.js +++ b/test/delayed-stream-end.js @@ -1,40 +1,26 @@ -var assert = require('assert'); -var R = require('ramda'); +var assert = require("assert"); +var R = require("ramda"); -var flyd = require('../lib'); +var flyd = require("../lib"); -function once(s) { - return flyd.combine(function(s, self) { - self(s.val) - self.end(true) - }, [s]) -} -function withLatestFrom() { - var streams = arguments - return flyd.combine(function() { - var self = arguments[arguments.length - 2] - var result = [] - for (var i = 0; i < streams.length; ++i) { - if (!streams[i].hasVal) return - result.push(streams[i].val) - } - self(result) - }, streams) -} - -describe('ending a stream', function() { - it('delays ending the current stream until dependents have been updated', function() { - var stream = flyd.stream(1) - function doubled() { return stream.map(R.multiply(2)) } +describe("ending a stream", function () { + it("delays ending the current stream until dependents have been updated", function () { + var stream = flyd.stream(1); var count = 0; - stream - .map(function() { - withLatestFrom(doubled(), doubled()) - .pipe(once) - .map(function() { - count++ - }) - }) - assert.equal(count, 1) - }) -}) + stream.map(function () { + var s1 = stream.map(R.add(1)); + var s2 = stream.map(R.add(2)); + flyd + .combine( + function (s1, s2, self) { + self(s1() + s2()); + self.end(true); + }, + [s1, s2], + ) + // was not called prior to #229 + .map(function () { count++ }); + }); + assert.equal(count, 1); + }); +});