Skip to content

Commit de3fb2b

Browse files
target sender
1 parent 2d84306 commit de3fb2b

File tree

1 file changed

+42
-16
lines changed

1 file changed

+42
-16
lines changed

lib/index.ts

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
ClusterMessage,
66
ServerId,
77
ClusterResponse,
8+
MessageType,
89
} from "socket.io-adapter";
910
import debugModule from "debug";
1011

@@ -32,13 +33,15 @@ export class NodeClusterAdapter extends ClusterAdapterWithHeartbeat {
3233
constructor(nsp: any, opts: ClusterAdapterOptions = {}) {
3334
super(nsp, opts);
3435
process.on("message", (message: any) => {
35-
if (message?.source !== MESSAGE_SOURCE) {
36-
debug("ignore unknown source");
36+
const isValidSource = message?.source === MESSAGE_SOURCE;
37+
if (!isValidSource) {
38+
debug("[%s] ignore unknown source", this.uid);
39+
return;
3740
}
3841

39-
// note: this check should be done in the onMessage handler
42+
// note: this check should be done in the onMessage() handler
4043
if (message.nsp !== this.nsp.name) {
41-
debug("ignore other namespace");
44+
debug("[%s] ignore other namespace", this.uid);
4245
return;
4346
}
4447

@@ -49,42 +52,65 @@ export class NodeClusterAdapter extends ClusterAdapterWithHeartbeat {
4952
this.init();
5053
}
5154

52-
override doPublish(message: ClusterMessage & { source: string }) {
55+
protected override doPublish(message: ClusterMessage & { source: string }) {
5356
message.source = MESSAGE_SOURCE;
5457

5558
process.send(message, null, {}, ignoreError);
5659

5760
return Promise.resolve(""); // connection state recovery is not supported
5861
}
5962

60-
override doPublishResponse(
63+
protected override doPublishResponse(
6164
requesterUid: ServerId,
62-
response: ClusterResponse & { source: string },
65+
response: ClusterResponse & { source: string; requesterUid: string },
6366
) {
6467
response.source = MESSAGE_SOURCE;
68+
response.requesterUid = requesterUid;
6569

6670
process.send(response, null, {}, ignoreError);
6771

6872
return Promise.resolve();
6973
}
7074
}
7175

76+
const UIDS = Symbol("uids");
77+
7278
export function setupPrimary() {
7379
cluster.on("message", (worker, message) => {
7480
const isValidSource = message?.source === MESSAGE_SOURCE;
7581
if (!isValidSource) {
7682
return;
7783
}
7884

79-
const emitterIdAsString = String(worker.id);
80-
// emit to all workers but the requester
81-
for (const workerId in cluster.workers) {
82-
if (
83-
hasOwnProperty.call(cluster.workers, workerId) &&
84-
workerId !== emitterIdAsString
85-
) {
86-
cluster.workers[workerId].send(message, null, ignoreError);
87-
}
85+
// store the requester's uids (one per namespace) so that the response can be sent specifically to them
86+
worker[UIDS] = worker[UIDS] || new Set();
87+
worker[UIDS].add(message.uid);
88+
89+
switch (message.type) {
90+
case MessageType.FETCH_SOCKETS_RESPONSE:
91+
case MessageType.SERVER_SIDE_EMIT_RESPONSE:
92+
const requesterUid = message.requesterUid;
93+
for (const workerId in cluster.workers) {
94+
if (
95+
hasOwnProperty.call(cluster.workers, workerId) &&
96+
cluster.workers[workerId][UIDS]?.has(requesterUid)
97+
) {
98+
cluster.workers[workerId].send(message, null, ignoreError);
99+
break;
100+
}
101+
}
102+
break;
103+
default:
104+
const emitterIdAsString = String(worker.id);
105+
// emit to all workers but the requester
106+
for (const workerId in cluster.workers) {
107+
if (
108+
hasOwnProperty.call(cluster.workers, workerId) &&
109+
workerId !== emitterIdAsString
110+
) {
111+
cluster.workers[workerId].send(message, null, ignoreError);
112+
}
113+
}
88114
}
89115
});
90116
}

0 commit comments

Comments
 (0)