Skip to content

Commit 3738fcb

Browse files
committed
stream: add kStreamBase marker for internal pipe optimization
Signed-off-by: Mert Can Altin <mertgold60@gmail.com>
1 parent 546b596 commit 3738fcb

9 files changed

Lines changed: 338 additions & 23 deletions

File tree

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
'use strict';
2+
const common = require('../common.js');
3+
const fsp = require('fs/promises');
4+
const path = require('path');
5+
const os = require('os');
6+
const { pipeline } = require('stream/promises');
7+
const {
8+
ReadableStream,
9+
WritableStream,
10+
} = require('node:stream/web');
11+
12+
const bench = common.createBenchmark(main, {
13+
type: [
14+
'node-streams',
15+
'webstream-js',
16+
'webstream-file-read',
17+
],
18+
size: [1024, 16384, 65536],
19+
n: [1e4, 1e5],
20+
});
21+
22+
async function main({ type, size, n }) {
23+
const chunk = Buffer.alloc(size, 'x');
24+
const totalBytes = size * n;
25+
26+
switch (type) {
27+
case 'node-streams': {
28+
// Baseline: Node.js streams
29+
let received = 0;
30+
const readable = new (require('stream').Readable)({
31+
read() {
32+
for (let i = 0; i < 100 && received < n; i++) {
33+
this.push(chunk);
34+
received++;
35+
}
36+
if (received >= n) this.push(null);
37+
},
38+
});
39+
40+
const writable = new (require('stream').Writable)({
41+
write(data, enc, cb) { cb(); },
42+
});
43+
44+
bench.start();
45+
await pipeline(readable, writable);
46+
bench.end(totalBytes);
47+
break;
48+
}
49+
50+
case 'webstream-js': {
51+
// Web streams with pure JS source/sink
52+
let sent = 0;
53+
const rs = new ReadableStream({
54+
pull(controller) {
55+
if (sent++ < n) {
56+
controller.enqueue(chunk);
57+
} else {
58+
controller.close();
59+
}
60+
},
61+
});
62+
63+
const ws = new WritableStream({
64+
write() {},
65+
close() { bench.end(totalBytes); },
66+
});
67+
68+
bench.start();
69+
await rs.pipeTo(ws);
70+
break;
71+
}
72+
73+
case 'webstream-file-read': {
74+
// Create a temporary file with test data
75+
const tmpDir = os.tmpdir();
76+
const tmpFile = path.join(tmpDir, `bench-webstream-${process.pid}.tmp`);
77+
78+
// Write test data to file
79+
const fd = await fsp.open(tmpFile, 'w');
80+
for (let i = 0; i < n; i++) {
81+
await fd.write(chunk);
82+
}
83+
await fd.close();
84+
85+
// Read using readableWebStream
86+
const readFd = await fsp.open(tmpFile, 'r');
87+
const rs = readFd.readableWebStream({ type: 'bytes' });
88+
89+
const ws = new WritableStream({
90+
write() {},
91+
close() {
92+
bench.end(totalBytes);
93+
// Cleanup
94+
readFd.close().then(() => fsp.unlink(tmpFile));
95+
},
96+
});
97+
98+
bench.start();
99+
await rs.pipeTo(ws);
100+
break;
101+
}
102+
103+
default:
104+
throw new Error(`Unknown type: ${type}`);
105+
}
106+
}

lib/internal/test_runner/reporter/junit.js

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ const {
55
ArrayPrototypeMap,
66
ArrayPrototypePush,
77
ArrayPrototypeSome,
8-
Date,
9-
DateNow,
10-
DatePrototypeToISOString,
118
NumberPrototypeToFixed,
129
ObjectEntries,
1310
RegExpPrototypeSymbolReplace,
@@ -115,11 +112,6 @@ module.exports = async function* junitReporter(source) {
115112
currentTest.attrs.tests = nonCommentChildren.length;
116113
currentTest.attrs.failures = ArrayPrototypeFilter(currentTest.children, isFailure).length;
117114
currentTest.attrs.skipped = ArrayPrototypeFilter(currentTest.children, isSkipped).length;
118-
// A suite's `test:start` is emitted lazily (when its first subtest
119-
// reports), so derive the start time from the end minus the measured
120-
// duration rather than stamping the (late) test:start moment.
121-
currentTest.attrs.timestamp =
122-
DatePrototypeToISOString(new Date(DateNow() - event.data.details.duration_ms));
123115
currentTest.attrs.hostname = HOSTNAME;
124116
} else {
125117
currentTest.tag = 'testcase';

lib/internal/webstreams/adapters.js

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ const {
3535
ByteLengthQueuingStrategy,
3636
} = require('internal/webstreams/queuingstrategies');
3737

38+
const {
39+
kStreamBase,
40+
} = require('internal/webstreams/util');
41+
3842
const {
3943
Writable,
4044
Readable,
@@ -1004,7 +1008,7 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
10041008
return promise.promise;
10051009
}
10061010

1007-
return new WritableStream({
1011+
const stream = new WritableStream({
10081012
write(chunk, controller) {
10091013
current = current !== undefined ?
10101014
PromisePrototypeThen(
@@ -1025,6 +1029,10 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
10251029
return promise.promise;
10261030
},
10271031
}, strategy);
1032+
1033+
stream[kStreamBase] = streamBase;
1034+
1035+
return stream;
10281036
}
10291037

10301038
/**
@@ -1075,7 +1083,7 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
10751083
}
10761084
};
10771085

1078-
return new ReadableStream({
1086+
const stream = new ReadableStream({
10791087
start(c) { controller = c; },
10801088

10811089
pull() {
@@ -1098,6 +1106,10 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO
10981106
return promise.promise;
10991107
},
11001108
}, strategy);
1109+
1110+
stream[kStreamBase] = streamBase;
1111+
1112+
return stream;
11011113
}
11021114

11031115
module.exports = {

lib/internal/webstreams/readablestream.js

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const {
3232

3333
const {
3434
AbortError,
35+
ErrnoException,
3536
codes: {
3637
ERR_ARG_NOT_ITERABLE,
3738
ERR_ILLEGAL_CONSTRUCTOR,
@@ -51,6 +52,13 @@ const {
5152
markPromiseAsHandled,
5253
} = internalBinding('util');
5354

55+
const {
56+
kReadBytesOrError,
57+
streamBaseState,
58+
} = internalBinding('stream_wrap');
59+
60+
const { UV_EOF } = internalBinding('uv');
61+
5462
const {
5563
isArrayBufferView,
5664
isDataView,
@@ -123,6 +131,7 @@ const {
123131
resetQueue,
124132
resolvedRecord,
125133
setPromiseHandled,
134+
kStreamBase,
126135
} = require('internal/webstreams/util');
127136

128137
const {
@@ -134,6 +143,7 @@ const {
134143
isWritableStreamDefaultWriter,
135144

136145
writableStreamAbort,
146+
writableStreamClose,
137147
writableStreamCloseQueuedOrInFlight,
138148
writableStreamDefaultWriterCloseWithErrorPropagation,
139149
writableStreamDefaultWriterRelease,
@@ -1462,6 +1472,83 @@ function readableStreamPipeTo(
14621472
preventCancel,
14631473
signal) {
14641474

1475+
const sourceStreamBase = source?.[kStreamBase];
1476+
const destStreamBase = dest?.[kStreamBase];
1477+
1478+
if (sourceStreamBase !== undefined &&
1479+
destStreamBase !== undefined &&
1480+
signal === undefined &&
1481+
!preventClose &&
1482+
!preventAbort &&
1483+
!preventCancel) {
1484+
// Native C++ StreamPipe path for internal-to-internal piping.
1485+
// Ref: https://github.com/nodejs/performance/issues/134
1486+
const promise = PromiseWithResolvers();
1487+
1488+
source[kState].disturbed = true;
1489+
1490+
let pipeError = null;
1491+
let isComplete = false;
1492+
const originalSourceOnread = sourceStreamBase.onread;
1493+
1494+
sourceStreamBase.onread = (arrayBuffer) => {
1495+
const nread = streamBaseState[kReadBytesOrError];
1496+
if (nread < 0 && nread !== UV_EOF) {
1497+
pipeError = new ErrnoException(nread, 'read');
1498+
}
1499+
if (originalSourceOnread) {
1500+
return originalSourceOnread(arrayBuffer);
1501+
}
1502+
};
1503+
1504+
function finalize(error) {
1505+
if (isComplete) return;
1506+
isComplete = true;
1507+
sourceStreamBase.onread = originalSourceOnread;
1508+
1509+
if (error) {
1510+
if (source[kState].state === 'readable') {
1511+
readableStreamError(source, error);
1512+
}
1513+
if (dest[kState].state === 'writable') {
1514+
writableStreamAbort(dest, error);
1515+
}
1516+
promise.reject(error);
1517+
} else {
1518+
if (source[kState].state === 'readable') {
1519+
readableStreamClose(source);
1520+
}
1521+
if (dest[kState].state === 'writable' &&
1522+
!writableStreamCloseQueuedOrInFlight(dest)) {
1523+
PromisePrototypeThen(
1524+
writableStreamClose(dest),
1525+
() => promise.resolve(),
1526+
(err) => promise.reject(err),
1527+
);
1528+
} else {
1529+
promise.resolve();
1530+
}
1531+
}
1532+
}
1533+
1534+
try {
1535+
const pipe = new StreamPipe(sourceStreamBase, destStreamBase);
1536+
pipe.onunpipe = () => {
1537+
if (pipeError) {
1538+
finalize(pipeError);
1539+
}
1540+
};
1541+
pipe.oncomplete = () => finalize(pipeError);
1542+
pipe.start();
1543+
} catch (error) {
1544+
sourceStreamBase.onread = originalSourceOnread;
1545+
promise.reject(error);
1546+
}
1547+
1548+
return promise.promise;
1549+
}
1550+
1551+
// Use JS-based piping
14651552
let reader;
14661553
let writer;
14671554
let disposable;

lib/internal/webstreams/util.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const {
4747

4848
const kState = Symbol('kState');
4949
const kType = Symbol('kType');
50+
const kStreamBase = Symbol('kStreamBase');
5051

5152
const AsyncIterator = {
5253
__proto__: AsyncIteratorPrototype,
@@ -280,4 +281,5 @@ module.exports = {
280281
resetQueue,
281282
resolvedRecord,
282283
setPromiseHandled,
284+
kStreamBase,
283285
};

test/common/assertSnapshot.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ function replaceJunitDuration(str) {
226226
.replaceAll(/time="[0-9.]+"/g, 'time="*"')
227227
.replaceAll(/duration_ms [0-9.]+/g, 'duration_ms *')
228228
.replaceAll(`hostname="${hostname()}"`, 'hostname="HOSTNAME"')
229-
.replaceAll(/timestamp="[^"]*"/g, 'timestamp="*"')
230229
.replaceAll(/file="[^"]*"/g, 'file="*"');
231230
}
232231

test/fixtures/test-runner/output/junit_reporter.snapshot

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ true !== false
128128
<testcase name="immediate throw - passes but warns" time="*" classname="test" file="*"/>
129129
<testcase name="immediate reject - passes but warns" time="*" classname="test" file="*"/>
130130
<testcase name="immediate resolve pass" time="*" classname="test" file="*"/>
131-
<testsuite name="subtest sync throw fail" time="*" disabled="0" errors="0" tests="1" failures="1" skipped="0" timestamp="*" hostname="HOSTNAME">
131+
<testsuite name="subtest sync throw fail" time="*" disabled="0" errors="0" tests="1" failures="1" skipped="0" hostname="HOSTNAME">
132132
<testcase name="+sync throw fail" time="*" classname="test" file="*" failure="thrown from subtest sync throw fail">
133133
<failure type="testCodeFailure" message="thrown from subtest sync throw fail">
134134
Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fail
@@ -151,15 +151,15 @@ Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fail
151151
[Error [ERR_TEST_FAILURE]: Symbol(thrown symbol from sync throw non-error fail)] { code: 'ERR_TEST_FAILURE', failureType: 'testCodeFailure', cause: Symbol(thrown symbol from sync throw non-error fail) }
152152
</failure>
153153
</testcase>
154-
<testsuite name="level 0a" time="*" disabled="0" errors="0" tests="4" failures="0" skipped="0" timestamp="*" hostname="HOSTNAME">
154+
<testsuite name="level 0a" time="*" disabled="0" errors="0" tests="4" failures="0" skipped="0" hostname="HOSTNAME">
155155
<testcase name="level 1a" time="*" classname="test" file="*"/>
156156
<testcase name="level 1b" time="*" classname="test" file="*"/>
157157
<testcase name="level 1c" time="*" classname="test" file="*"/>
158158
<testcase name="level 1d" time="*" classname="test" file="*"/>
159159
</testsuite>
160-
<testsuite name="top level" time="*" disabled="0" errors="0" tests="2" failures="0" skipped="0" timestamp="*" hostname="HOSTNAME">
160+
<testsuite name="top level" time="*" disabled="0" errors="0" tests="2" failures="0" skipped="0" hostname="HOSTNAME">
161161
<testcase name="+long running" time="*" classname="test" file="*"/>
162-
<testsuite name="+short running" time="*" disabled="0" errors="0" tests="1" failures="0" skipped="0" timestamp="*" hostname="HOSTNAME">
162+
<testsuite name="+short running" time="*" disabled="0" errors="0" tests="1" failures="0" skipped="0" hostname="HOSTNAME">
163163
<testcase name="++short running" time="*" classname="test" file="*"/>
164164
</testsuite>
165165
</testsuite>
@@ -266,7 +266,7 @@ Error [ERR_TEST_FAILURE]: thrown from callback async throw
266266
</failure>
267267
</testcase>
268268
<testcase name="callback async throw after done" time="*" classname="test" file="*"/>
269-
<testsuite name="only is set on subtests but not in only mode" time="*" disabled="0" errors="0" tests="3" failures="0" skipped="0" timestamp="*" hostname="HOSTNAME">
269+
<testsuite name="only is set on subtests but not in only mode" time="*" disabled="0" errors="0" tests="3" failures="0" skipped="0" hostname="HOSTNAME">
270270
<testcase name="running subtest 1" time="*" classname="test" file="*"/>
271271
<testcase name="running subtest 3" time="*" classname="test" file="*"/>
272272
<testcase name="running subtest 4" time="*" classname="test" file="*"/>
@@ -288,7 +288,7 @@ Error [ERR_TEST_FAILURE]: thrown from callback async throw
288288
}
289289
</failure>
290290
</testcase>
291-
<testsuite name="subtest sync throw fails" time="*" disabled="0" errors="0" tests="2" failures="2" skipped="0" timestamp="*" hostname="HOSTNAME">
291+
<testsuite name="subtest sync throw fails" time="*" disabled="0" errors="0" tests="2" failures="2" skipped="0" hostname="HOSTNAME">
292292
<testcase name="sync throw fails at first" time="*" classname="test" file="*" failure="thrown from subtest sync throw fails at first">
293293
<failure type="testCodeFailure" message="thrown from subtest sync throw fails at first">
294294
Error [ERR_TEST_FAILURE]: thrown from subtest sync throw fails at first

test/parallel/test-runner-reporters.js

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,12 +200,6 @@ describe('node:test reporters', { concurrency: true }, () => {
200200
assert.strictEqual(child.stdout.toString(), '');
201201
const fileContents = fs.readFileSync(file, 'utf8');
202202
assert.match(fileContents, /<testsuite .*name="nested".*tests="2".*failures="1".*skipped="0".*>/);
203-
// The exact timestamp format is intentionally not pinned here (still under
204-
// discussion); assert only that the value is present and a real date.
205-
const { 1: timestamp } = fileContents.match(/<testsuite [^>]*timestamp="([^"]+)"/) ?? [];
206-
assert.ok(timestamp, 'testsuite should have a timestamp attribute');
207-
assert.match(timestamp, /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/);
208-
assert.ok(!Number.isNaN(Date.parse(timestamp)), `expected a valid date, got ${timestamp}`);
209203
assert.match(fileContents, /<testcase .*name="failing".*>\s*<failure .*type="testCodeFailure".*message="error".*>/);
210204
assert.match(fileContents, /<testcase .*name="ok".*classname="test".*\/>/);
211205
assert.match(fileContents, /<testcase .*name="top level".*classname="test".*\/>/);

0 commit comments

Comments
 (0)