Skip to content

Commit 454dbe5

Browse files
committed
patch: delay setImmediate until after timeouts
1 parent a67fe1d commit 454dbe5

File tree

4 files changed

+253
-1
lines changed

4 files changed

+253
-1
lines changed

packages/next/errors.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -913,5 +913,6 @@
913913
"912": "Unexpected stream chunk while in Before stage",
914914
"913": "getFlightStream should always receive a ReadableStream when using the edge runtime",
915915
"914": "nodeStreamFromReadableStream cannot be used in the edge runtime",
916-
"915": "createNodeStreamFromChunks cannot be used in the edge runtime"
916+
"915": "createNodeStreamFromChunks cannot be used in the edge runtime",
917+
"916": "The \"callback\" argument must be of type function. Received %s"
917918
}

packages/next/src/server/app-render/app-render-render-utils.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import { InvariantError } from '../../shared/lib/invariant-error'
2+
import {
3+
startBufferingImmediates,
4+
stopBufferingImmediates,
5+
} from './buffered-set-immediate.external'
26

37
/**
48
* This is a utility function to make scheduling sequential tasks that run back to back easier.
@@ -16,13 +20,15 @@ export function scheduleInSequentialTasks<R>(
1620
return new Promise((resolve, reject) => {
1721
let pendingResult: R | Promise<R>
1822
setTimeout(() => {
23+
startBufferingImmediates()
1924
try {
2025
pendingResult = render()
2126
} catch (err) {
2227
reject(err)
2328
}
2429
}, 0)
2530
setTimeout(() => {
31+
stopBufferingImmediates()
2632
followup()
2733
resolve(pendingResult)
2834
}, 0)
@@ -48,6 +54,7 @@ export function pipelineInSequentialTasks<A, B, C>(
4854
return new Promise((resolve, reject) => {
4955
let oneResult: A
5056
setTimeout(() => {
57+
startBufferingImmediates()
5158
try {
5259
oneResult = one()
5360
} catch (err) {
@@ -85,6 +92,7 @@ export function pipelineInSequentialTasks<A, B, C>(
8592

8693
// We wait a task before resolving/rejecting
8794
const fourId = setTimeout(() => {
95+
stopBufferingImmediates()
8896
resolve(threeResult)
8997
}, 0)
9098
})
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
import { promisify } from 'node:util'
2+
3+
let isEnabled = false
4+
const bufferedImmediatesQueue: QueueItem[] = []
5+
6+
const originalSetImmediate = globalThis.setImmediate
7+
const originalClearImmediate = globalThis.clearImmediate
8+
9+
export function install() {
10+
globalThis.setImmediate =
11+
// Workaround for missing __promisify__ which is not a real property
12+
patchedSetImmediate as unknown as typeof setImmediate
13+
globalThis.clearImmediate = patchedClearImmediate
14+
}
15+
16+
export function startBufferingImmediates() {
17+
isEnabled = true
18+
}
19+
20+
export function stopBufferingImmediates() {
21+
if (!isEnabled) {
22+
return
23+
}
24+
isEnabled = false
25+
26+
// Now, we actually schedule the immediates that we queued for later
27+
scheduleBufferedImmediates()
28+
}
29+
30+
function scheduleBufferedImmediates() {
31+
for (const queueItem of bufferedImmediatesQueue) {
32+
if (queueItem.isCleared) {
33+
continue
34+
}
35+
const { immediateObject, callback, args, hasRef } = queueItem
36+
const nativeImmediateObject = args
37+
? originalSetImmediate(callback, ...args)
38+
: originalSetImmediate(callback)
39+
40+
// Mirror unref() calls
41+
if (!hasRef) {
42+
nativeImmediateObject.unref()
43+
}
44+
45+
// Now that we're no longer buffering the immediate,
46+
// make the BufferedImmediate proxy calls to the native object instead
47+
immediateObject[INTERNALS].queueItem = null
48+
immediateObject[INTERNALS].nativeImmediate = nativeImmediateObject
49+
clearQueueItem(queueItem)
50+
}
51+
bufferedImmediatesQueue.length = 0
52+
}
53+
54+
type QueueItem = ActiveQueueItem | ClearedQueueItem
55+
type ActiveQueueItem = {
56+
isCleared: false
57+
callback: (...args: any[]) => any
58+
args: any[] | null
59+
hasRef: boolean
60+
immediateObject: BufferedImmediate
61+
}
62+
type ClearedQueueItem = {
63+
isCleared: true
64+
callback: null
65+
args: null
66+
hasRef: null
67+
immediateObject: null
68+
}
69+
70+
function clearQueueItem(originalQueueItem: QueueItem) {
71+
const queueItem = originalQueueItem as ClearedQueueItem
72+
queueItem.isCleared = true
73+
queueItem.callback = null
74+
queueItem.args = null
75+
queueItem.hasRef = null
76+
queueItem.immediateObject = null
77+
}
78+
79+
//========================================================
80+
81+
function patchedSetImmediate<TArgs extends any[]>(
82+
callback: (...args: TArgs) => void,
83+
...args: TArgs
84+
): NodeJS.Immediate
85+
function patchedSetImmediate(callback: (args: void) => void): NodeJS.Immediate
86+
function patchedSetImmediate(): NodeJS.Immediate {
87+
if (!isEnabled) {
88+
return originalSetImmediate.apply(
89+
null,
90+
// @ts-expect-error: this is valid, but typescript doesn't get it
91+
arguments
92+
)
93+
}
94+
95+
if (arguments.length === 0 || typeof arguments[0] !== 'function') {
96+
// Replicate the error that setImmediate throws
97+
const error = new TypeError(
98+
`The "callback" argument must be of type function. Received ${typeof arguments[0]}`
99+
)
100+
;(error as any).code = 'ERR_INVALID_ARG_TYPE'
101+
throw error
102+
}
103+
104+
const callback: (...args: any[]) => any = arguments[0]
105+
let args: any[] | null =
106+
arguments.length > 1 ? Array.prototype.slice.call(arguments, 1) : null
107+
108+
const immediateObject = new BufferedImmediate()
109+
110+
const queueItem: ActiveQueueItem = {
111+
isCleared: false,
112+
callback,
113+
args,
114+
hasRef: true,
115+
immediateObject,
116+
}
117+
bufferedImmediatesQueue.push(queueItem)
118+
119+
immediateObject[INTERNALS].queueItem = queueItem
120+
121+
return immediateObject
122+
}
123+
124+
function patchedSetImmediatePromisify<T = void>(
125+
value: T,
126+
options?: import('node:timers').TimerOptions
127+
): Promise<T> {
128+
if (!isEnabled) {
129+
const originalPromisify: (typeof setImmediate)['__promisify__'] =
130+
// @ts-expect-error: the types for `promisify.custom` are strange
131+
originalSetImmediate[promisify.custom]
132+
return originalPromisify(value, options)
133+
}
134+
135+
return new Promise<T>((resolve, reject) => {
136+
// The abort signal makes the promise reject.
137+
// If it is already aborted, we reject immediately.
138+
const signal = options?.signal
139+
if (signal && signal.aborted) {
140+
return reject(signal.reason)
141+
}
142+
143+
const immediate = patchedSetImmediate(resolve, value)
144+
if (options?.ref === false) {
145+
immediate.unref()
146+
}
147+
148+
if (signal) {
149+
signal.addEventListener(
150+
'abort',
151+
() => {
152+
patchedClearImmediate(immediate)
153+
reject(signal.reason)
154+
},
155+
{ once: true }
156+
)
157+
}
158+
})
159+
}
160+
161+
patchedSetImmediate[promisify.custom] = patchedSetImmediatePromisify
162+
163+
const patchedClearImmediate = (
164+
immediateObject: NodeJS.Immediate | undefined
165+
) => {
166+
if (immediateObject && INTERNALS in immediateObject) {
167+
;(immediateObject as BufferedImmediate)[Symbol.dispose]()
168+
} else {
169+
originalClearImmediate(immediateObject)
170+
}
171+
}
172+
173+
//========================================================
174+
175+
const INTERNALS: unique symbol = Symbol.for('next.Immediate.internals')
176+
177+
type QueuedImmediateInternals =
178+
| {
179+
queueItem: ActiveQueueItem | null
180+
nativeImmediate: null
181+
}
182+
| {
183+
queueItem: null
184+
nativeImmediate: NodeJS.Immediate
185+
}
186+
187+
/** Makes sure that we're implementing all the public `Immediate` methods */
188+
interface NativeImmediate extends NodeJS.Immediate {}
189+
190+
/** Implements a shim for the native `Immediate` class returned by `setImmediate` */
191+
class BufferedImmediate implements NativeImmediate {
192+
[INTERNALS]: QueuedImmediateInternals = {
193+
queueItem: null,
194+
nativeImmediate: null,
195+
}
196+
hasRef() {
197+
const internals = this[INTERNALS]
198+
if (internals.queueItem) {
199+
return internals.queueItem.hasRef
200+
} else if (internals.nativeImmediate) {
201+
return internals.nativeImmediate.hasRef()
202+
} else {
203+
return false
204+
}
205+
}
206+
ref() {
207+
const internals = this[INTERNALS]
208+
if (internals.queueItem) {
209+
internals.queueItem.hasRef = true
210+
} else if (internals.nativeImmediate) {
211+
internals.nativeImmediate.ref()
212+
}
213+
return this
214+
}
215+
unref() {
216+
const internals = this[INTERNALS]
217+
if (internals.queueItem) {
218+
internals.queueItem.hasRef = false
219+
} else if (internals.nativeImmediate) {
220+
internals.nativeImmediate.unref()
221+
}
222+
return this
223+
}
224+
225+
// TODO: is this just a noop marker?
226+
_onImmediate() {}
227+
228+
[Symbol.dispose]() {
229+
// This is equivalent to `clearImmediate`.
230+
const internals = this[INTERNALS]
231+
if (internals.queueItem) {
232+
// this is still queued. drop it.
233+
const queueItem = internals.queueItem
234+
internals.queueItem = null
235+
clearQueueItem(queueItem)
236+
} else if (internals.nativeImmediate) {
237+
// If we executed the queue, and we have a native immediate.
238+
originalClearImmediate(internals.nativeImmediate)
239+
}
240+
}
241+
}

packages/next/src/server/node-environment.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ import './node-environment-extensions/random'
1717
import './node-environment-extensions/date'
1818
import './node-environment-extensions/web-crypto'
1919
import './node-environment-extensions/node-crypto'
20+
import { install } from './app-render/buffered-set-immediate.external'
21+
install()

0 commit comments

Comments
 (0)