From 4f96d4d414aca875817cf2e370dd22adba24e18c Mon Sep 17 00:00:00 2001 From: User Date: Fri, 26 Sep 2025 10:37:52 -0400 Subject: [PATCH] fix(redis-adapter): support async parser --- lib/index.ts | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/lib/index.ts b/lib/index.ts index 57a0f62..032127a 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -199,7 +199,7 @@ export class RedisAdapter extends Adapter { * * @private */ - private onmessage(pattern, channel, msg) { + private async onmessage(pattern, channel, msg) { channel = channel.toString(); const channelMatches = channel.startsWith(this.channel); @@ -212,7 +212,7 @@ export class RedisAdapter extends Adapter { return debug("ignore unknown room %s", room); } - const args = this.parser.decode(msg); + const args = await this.parser.decode(msg); const [uid, packet, opts] = args; if (this.uid === uid) return debug("ignore same uid"); @@ -257,7 +257,7 @@ export class RedisAdapter extends Adapter { if (msg[0] === 0x7b) { request = JSON.parse(msg.toString()); } else { - request = this.parser.decode(msg); + request = await this.parser.decode(msg); } } catch (err) { debug("ignoring malformed request"); @@ -450,17 +450,14 @@ export class RedisAdapter extends Adapter { }) ); }, - (arg) => { + async (arg) => { debug("received acknowledgement with value %j", arg); - - this.publishResponse( - request, - this.parser.encode({ - type: RequestType.BROADCAST_ACK, - requestId: request.requestId, - packet: arg, - }) - ); + const encodedData = await this.parser.encode({ + type: RequestType.BROADCAST_ACK, + requestId: request.requestId, + packet: arg, + }); + this.publishResponse(request, encodedData); } ); break; @@ -490,7 +487,7 @@ export class RedisAdapter extends Adapter { * * @private */ - private onresponse(channel, msg) { + private async onresponse(channel, msg) { let response; try { @@ -498,7 +495,7 @@ export class RedisAdapter extends Adapter { if (msg[0] === 0x7b) { response = JSON.parse(msg.toString()); } else { - response = this.parser.decode(msg); + response = await this.parser.decode(msg); } } catch (err) { debug("ignoring malformed response"); @@ -616,7 +613,7 @@ export class RedisAdapter extends Adapter { * * @public */ - public broadcast(packet: any, opts: BroadcastOptions) { + public async broadcast(packet: any, opts: BroadcastOptions) { packet.nsp = this.nsp.name; const onlyLocal = opts && opts.flags && opts.flags.local; @@ -627,7 +624,7 @@ export class RedisAdapter extends Adapter { except: [...new Set(opts.except)], flags: opts.flags, }; - const msg = this.parser.encode([this.uid, packet, rawOpts]); + const msg = await this.parser.encode([this.uid, packet, rawOpts]); let channel = this.channel; if (opts.rooms && opts.rooms.size === 1) { channel += opts.rooms.keys().next().value + "#"; @@ -638,7 +635,7 @@ export class RedisAdapter extends Adapter { super.broadcast(packet, opts); } - public broadcastWithAck( + public async broadcastWithAck( packet: any, opts: BroadcastOptions, clientCountCallback: (clientCount: number) => void, @@ -657,7 +654,7 @@ export class RedisAdapter extends Adapter { flags: opts.flags, }; - const request = this.parser.encode({ + const request = await this.parser.encode({ uid: this.uid, requestId, type: RequestType.BROADCAST,