Skip to content

Commit 468c3c8

Browse files
feat: implement utility methods from Socket.IO v4
Reference: https://socket.io/docs/v3/migrating-from-3-x-to-4-0/#Additional-utility-methods
1 parent fc19812 commit 468c3c8

File tree

5 files changed

+375
-73
lines changed

5 files changed

+375
-73
lines changed

lib/index.ts

Lines changed: 154 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ enum RequestType {
1717
REMOTE_JOIN = 2,
1818
REMOTE_LEAVE = 3,
1919
REMOTE_DISCONNECT = 4,
20+
REMOTE_FETCH = 5,
2021
}
2122

2223
interface Request {
@@ -222,6 +223,14 @@ export class RedisAdapter extends Adapter {
222223
break;
223224

224225
case RequestType.REMOTE_JOIN:
226+
if (request.opts) {
227+
const opts = {
228+
rooms: new Set<Room>(request.opts.rooms),
229+
except: new Set<Room>(request.opts.except),
230+
};
231+
return super.addSockets(opts, request.rooms);
232+
}
233+
225234
socket = this.nsp.sockets.get(request.sid);
226235
if (!socket) {
227236
return;
@@ -237,6 +246,14 @@ export class RedisAdapter extends Adapter {
237246
break;
238247

239248
case RequestType.REMOTE_LEAVE:
249+
if (request.opts) {
250+
const opts = {
251+
rooms: new Set<Room>(request.opts.rooms),
252+
except: new Set<Room>(request.opts.except),
253+
};
254+
return super.delSockets(opts, request.rooms);
255+
}
256+
240257
socket = this.nsp.sockets.get(request.sid);
241258
if (!socket) {
242259
return;
@@ -252,6 +269,14 @@ export class RedisAdapter extends Adapter {
252269
break;
253270

254271
case RequestType.REMOTE_DISCONNECT:
272+
if (request.opts) {
273+
const opts = {
274+
rooms: new Set<Room>(request.opts.rooms),
275+
except: new Set<Room>(request.opts.except),
276+
};
277+
return super.disconnectSockets(opts, request.close);
278+
}
279+
255280
socket = this.nsp.sockets.get(request.sid);
256281
if (!socket) {
257282
return;
@@ -266,6 +291,30 @@ export class RedisAdapter extends Adapter {
266291
this.pubClient.publish(this.responseChannel, response);
267292
break;
268293

294+
case RequestType.REMOTE_FETCH:
295+
if (this.requests.has(request.requestId)) {
296+
return;
297+
}
298+
299+
const opts = {
300+
rooms: new Set<Room>(request.opts.rooms),
301+
except: new Set<Room>(request.opts.except),
302+
};
303+
const localSockets = await super.fetchSockets(opts);
304+
305+
response = JSON.stringify({
306+
requestId: request.requestId,
307+
sockets: localSockets.map((socket) => ({
308+
id: socket.id,
309+
handshake: socket.handshake,
310+
rooms: [...socket.rooms],
311+
data: socket.data,
312+
})),
313+
});
314+
315+
this.pubClient.publish(this.responseChannel, response);
316+
break;
317+
269318
default:
270319
debug("ignoring unknown request type: %s", request.type);
271320
}
@@ -299,12 +348,17 @@ export class RedisAdapter extends Adapter {
299348

300349
switch (request.type) {
301350
case RequestType.SOCKETS:
351+
case RequestType.REMOTE_FETCH:
302352
request.msgCount++;
303353

304354
// ignore if response does not contain 'sockets' key
305355
if (!response.sockets || !Array.isArray(response.sockets)) return;
306356

307-
response.sockets.forEach((s) => request.sockets.add(s));
357+
if (request.type === RequestType.SOCKETS) {
358+
response.sockets.forEach((s) => request.sockets.add(s));
359+
} else {
360+
response.sockets.forEach((s) => request.sockets.push(s));
361+
}
308362

309363
if (request.msgCount === request.numSub) {
310364
clearTimeout(request.timeout);
@@ -587,6 +641,105 @@ export class RedisAdapter extends Adapter {
587641
});
588642
}
589643

644+
public async fetchSockets(opts: BroadcastOptions): Promise<any[]> {
645+
const localSockets = await super.fetchSockets(opts);
646+
647+
if (opts.flags?.local) {
648+
return localSockets;
649+
}
650+
651+
const numSub = await this.getNumSub();
652+
debug('waiting for %d responses to "fetchSockets" request', numSub);
653+
654+
if (numSub <= 1) {
655+
return localSockets;
656+
}
657+
658+
const requestId = uid2(6);
659+
660+
const request = JSON.stringify({
661+
requestId,
662+
type: RequestType.REMOTE_FETCH,
663+
opts: {
664+
rooms: [...opts.rooms],
665+
except: [...opts.except],
666+
},
667+
});
668+
669+
return new Promise((resolve, reject) => {
670+
const timeout = setTimeout(() => {
671+
if (this.requests.has(requestId)) {
672+
reject(
673+
new Error("timeout reached while waiting for fetchSockets response")
674+
);
675+
this.requests.delete(requestId);
676+
}
677+
}, this.requestsTimeout);
678+
679+
this.requests.set(requestId, {
680+
type: RequestType.REMOTE_FETCH,
681+
numSub,
682+
resolve,
683+
timeout,
684+
msgCount: 1,
685+
sockets: localSockets,
686+
});
687+
688+
this.pubClient.publish(this.requestChannel, request);
689+
});
690+
}
691+
692+
public addSockets(opts: BroadcastOptions, rooms: Room[]) {
693+
if (opts.flags?.local) {
694+
return super.addSockets(opts, rooms);
695+
}
696+
697+
const request = JSON.stringify({
698+
type: RequestType.REMOTE_JOIN,
699+
opts: {
700+
rooms: [...opts.rooms],
701+
except: [...opts.except],
702+
},
703+
rooms: [...rooms],
704+
});
705+
706+
this.pubClient.publish(this.requestChannel, request);
707+
}
708+
709+
public delSockets(opts: BroadcastOptions, rooms: Room[]) {
710+
if (opts.flags?.local) {
711+
return super.delSockets(opts, rooms);
712+
}
713+
714+
const request = JSON.stringify({
715+
type: RequestType.REMOTE_LEAVE,
716+
opts: {
717+
rooms: [...opts.rooms],
718+
except: [...opts.except],
719+
},
720+
rooms: [...rooms],
721+
});
722+
723+
this.pubClient.publish(this.requestChannel, request);
724+
}
725+
726+
public disconnectSockets(opts: BroadcastOptions, close: boolean) {
727+
if (opts.flags?.local) {
728+
return super.disconnectSockets(opts, close);
729+
}
730+
731+
const request = JSON.stringify({
732+
type: RequestType.REMOTE_DISCONNECT,
733+
opts: {
734+
rooms: [...opts.rooms],
735+
except: [...opts.except],
736+
},
737+
close,
738+
});
739+
740+
this.pubClient.publish(this.requestChannel, request);
741+
}
742+
590743
/**
591744
* Get the number of subscribers of the request channel
592745
*

0 commit comments

Comments
 (0)