Skip to content

Commit db5272a

Browse files
committed
patch: delay setImmediate until after timeouts
1 parent a67fe1d commit db5272a

File tree

4 files changed

+252
-1
lines changed

4 files changed

+252
-1
lines changed

packages/next/errors.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -913,5 +913,6 @@
913913
"912": "Unexpected stream chunk while in Before stage",
914914
"913": "getFlightStream should always receive a ReadableStream when using the edge runtime",
915915
"914": "nodeStreamFromReadableStream cannot be used in the edge runtime",
916-
"915": "createNodeStreamFromChunks cannot be used in the edge runtime"
916+
"915": "createNodeStreamFromChunks cannot be used in the edge runtime",
917+
"916": "The \"callback\" argument must be of type function. Received undefined"
917918
}

packages/next/src/server/app-render/app-render-render-utils.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import { InvariantError } from '../../shared/lib/invariant-error'
2+
import {
3+
startBufferingImmediates,
4+
stopBufferingImmediates,
5+
} from './buffered-set-immediate.external'
26

37
/**
48
* This is a utility function to make scheduling sequential tasks that run back to back easier.
@@ -16,13 +20,15 @@ export function scheduleInSequentialTasks<R>(
1620
return new Promise((resolve, reject) => {
1721
let pendingResult: R | Promise<R>
1822
setTimeout(() => {
23+
startBufferingImmediates()
1924
try {
2025
pendingResult = render()
2126
} catch (err) {
2227
reject(err)
2328
}
2429
}, 0)
2530
setTimeout(() => {
31+
stopBufferingImmediates()
2632
followup()
2733
resolve(pendingResult)
2834
}, 0)
@@ -48,6 +54,7 @@ export function pipelineInSequentialTasks<A, B, C>(
4854
return new Promise((resolve, reject) => {
4955
let oneResult: A
5056
setTimeout(() => {
57+
startBufferingImmediates()
5158
try {
5259
oneResult = one()
5360
} catch (err) {
@@ -85,6 +92,7 @@ export function pipelineInSequentialTasks<A, B, C>(
8592

8693
// We wait a task before resolving/rejecting
8794
const fourId = setTimeout(() => {
95+
stopBufferingImmediates()
8896
resolve(threeResult)
8997
}, 0)
9098
})
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
import { promisify } from 'node:util'
2+
3+
let isEnabled = false
4+
const bufferedImmediatesQueue: QueueItem[] = []
5+
6+
const originalSetImmediate = globalThis.setImmediate
7+
const originalClearImmediate = globalThis.clearImmediate
8+
9+
export function install() {
10+
globalThis.setImmediate =
11+
// Workaround for missing __promisify__ which is not a real property
12+
patchedSetImmediate as unknown as typeof setImmediate
13+
globalThis.clearImmediate = patchedClearImmediate
14+
}
15+
16+
export function startBufferingImmediates() {
17+
isEnabled = true
18+
}
19+
20+
export function stopBufferingImmediates() {
21+
if (!isEnabled) {
22+
return
23+
}
24+
isEnabled = false
25+
26+
// Now, we actually schedule the immediates that we queued for later
27+
scheduleBufferedImmediates()
28+
}
29+
30+
function scheduleBufferedImmediates() {
31+
for (const queueItem of bufferedImmediatesQueue) {
32+
if (queueItem.isCleared) {
33+
continue
34+
}
35+
const { immediateObject, callback, args, hasRef } = queueItem
36+
const nativeImmediateObject = args
37+
? originalSetImmediate(callback, ...args)
38+
: originalSetImmediate(callback)
39+
40+
// Mirror unref() calls
41+
if (!hasRef) {
42+
nativeImmediateObject.unref()
43+
}
44+
45+
// Now that we're no longer buffering the immediate,
46+
// make the BufferedImmediate proxy calls to the native object instead
47+
immediateObject[INTERNALS].queueItem = null
48+
immediateObject[INTERNALS].nativeImmediate = nativeImmediateObject
49+
clearQueueItem(queueItem)
50+
}
51+
bufferedImmediatesQueue.length = 0
52+
}
53+
54+
type QueueItem = ActiveQueueItem | ClearedQueueItem
55+
type ActiveQueueItem = {
56+
isCleared: false
57+
callback: (...args: any[]) => any
58+
args: any[] | null
59+
hasRef: boolean
60+
immediateObject: BufferedImmediate
61+
}
62+
type ClearedQueueItem = {
63+
isCleared: true
64+
callback: null
65+
args: null
66+
hasRef: null
67+
immediateObject: null
68+
}
69+
70+
function clearQueueItem(originalQueueItem: QueueItem) {
71+
const queueItem = originalQueueItem as ClearedQueueItem
72+
queueItem.isCleared = true
73+
queueItem.callback = null
74+
queueItem.args = null
75+
queueItem.hasRef = null
76+
queueItem.immediateObject = null
77+
}
78+
79+
//========================================================
80+
81+
function patchedSetImmediate<TArgs extends any[]>(
82+
callback: (...args: TArgs) => void,
83+
...args: TArgs
84+
): NodeJS.Immediate
85+
function patchedSetImmediate(callback: (args: void) => void): NodeJS.Immediate
86+
function patchedSetImmediate(): NodeJS.Immediate {
87+
if (!isEnabled) {
88+
return originalSetImmediate.apply(
89+
null,
90+
// @ts-expect-error: this is valid, but typescript doesn't get it
91+
arguments
92+
)
93+
}
94+
95+
if (arguments.length === 0) {
96+
const error = new TypeError(
97+
'The "callback" argument must be of type function. Received undefined'
98+
)
99+
;(error as any).code = 'ERR_INVALID_ARG_TYPE'
100+
throw error
101+
}
102+
103+
const callback: (...args: any[]) => any = arguments[0]
104+
let args: any[] | null =
105+
arguments.length > 1 ? Array.prototype.slice.call(arguments, 1) : null
106+
107+
const immediateObject = new BufferedImmediate()
108+
109+
const queueItem: ActiveQueueItem = {
110+
isCleared: false,
111+
callback,
112+
args,
113+
hasRef: true,
114+
immediateObject,
115+
}
116+
bufferedImmediatesQueue.push(queueItem)
117+
118+
immediateObject[INTERNALS].queueItem = queueItem
119+
120+
return immediateObject
121+
}
122+
123+
function patchedSetImmediatePromisify<T = void>(
124+
value: T,
125+
options?: import('node:timers').TimerOptions
126+
): Promise<T> {
127+
if (!isEnabled) {
128+
const originalPromisify: (typeof setImmediate)['__promisify__'] =
129+
// @ts-expect-error: the types for `promisify.custom` are strange
130+
originalSetImmediate[promisify.custom]
131+
return originalPromisify(value, options)
132+
}
133+
134+
return new Promise<T>((resolve, reject) => {
135+
// The abort signal makes the promise reject.
136+
// If it is already aborted, we reject immediately.
137+
const signal = options?.signal
138+
if (signal && signal.aborted) {
139+
return reject(signal.reason)
140+
}
141+
142+
const immediate = patchedSetImmediate(resolve, value)
143+
if (options?.ref === false) {
144+
immediate.unref()
145+
}
146+
147+
if (signal) {
148+
signal.addEventListener(
149+
'abort',
150+
() => {
151+
patchedClearImmediate(immediate)
152+
reject(signal.reason)
153+
},
154+
{ once: true }
155+
)
156+
}
157+
})
158+
}
159+
160+
patchedSetImmediate[promisify.custom] = patchedSetImmediatePromisify
161+
162+
const patchedClearImmediate = (
163+
immediateObject: NodeJS.Immediate | undefined
164+
) => {
165+
if (immediateObject && INTERNALS in immediateObject) {
166+
;(immediateObject as BufferedImmediate)[Symbol.dispose]()
167+
} else {
168+
originalClearImmediate(immediateObject)
169+
}
170+
}
171+
172+
//========================================================
173+
174+
const INTERNALS: unique symbol = Symbol.for('next.Immediate.internals')
175+
176+
type QueuedImmediateInternals =
177+
| {
178+
queueItem: ActiveQueueItem | null
179+
nativeImmediate: null
180+
}
181+
| {
182+
queueItem: null
183+
nativeImmediate: NodeJS.Immediate
184+
}
185+
186+
/** Makes sure that we're implementing all the public `Immediate` methods */
187+
interface NativeImmediate extends NodeJS.Immediate {}
188+
189+
/** Implements a shim for the native `Immediate` class returned by `setImmediate` */
190+
class BufferedImmediate implements NativeImmediate {
191+
[INTERNALS]: QueuedImmediateInternals = {
192+
queueItem: null,
193+
nativeImmediate: null,
194+
}
195+
hasRef() {
196+
const internals = this[INTERNALS]
197+
if (internals.queueItem) {
198+
return internals.queueItem.hasRef
199+
} else if (internals.nativeImmediate) {
200+
return internals.nativeImmediate.hasRef()
201+
} else {
202+
return false
203+
}
204+
}
205+
ref() {
206+
const internals = this[INTERNALS]
207+
if (internals.queueItem) {
208+
internals.queueItem.hasRef = true
209+
} else if (internals.nativeImmediate) {
210+
internals.nativeImmediate.ref()
211+
}
212+
return this
213+
}
214+
unref() {
215+
const internals = this[INTERNALS]
216+
if (internals.queueItem) {
217+
internals.queueItem.hasRef = false
218+
} else if (internals.nativeImmediate) {
219+
internals.nativeImmediate.unref()
220+
}
221+
return this
222+
}
223+
224+
// TODO: is this just a noop marker?
225+
_onImmediate() {}
226+
227+
[Symbol.dispose]() {
228+
// This is equivalent to `clearImmediate`.
229+
const internals = this[INTERNALS]
230+
if (internals.queueItem) {
231+
// this is still queued. drop it.
232+
const queueItem = internals.queueItem
233+
internals.queueItem = null
234+
clearQueueItem(queueItem)
235+
} else if (internals.nativeImmediate) {
236+
// If we executed the queue, and we have a native immediate.
237+
originalClearImmediate(internals.nativeImmediate)
238+
}
239+
}
240+
}

packages/next/src/server/node-environment.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ import './node-environment-extensions/random'
1717
import './node-environment-extensions/date'
1818
import './node-environment-extensions/web-crypto'
1919
import './node-environment-extensions/node-crypto'
20+
import { install } from './app-render/buffered-set-immediate.external'
21+
install()

0 commit comments

Comments
 (0)