@@ -174,30 +174,25 @@ export default class RedisCommandsQueue {
174174 } ) ;
175175 }
176176
177- #setupPubSubHandler( command : Exclude < PubSubCommand , undefined > ) {
177+ #setupPubSubHandler( ) {
178178 // RESP3 uses `onPush` to handle PubSub, so no need to modify `onReply`
179179 if ( this . #respVersion !== 2 ) return ;
180180
181- // overriding `resolve` instead of using `.then` to make sure it'll be called before processing the next reply
182- const { resolve } = command ;
183- command . resolve = ( ) => {
184- this . decoder . onReply = ( reply => {
185- if ( Array . isArray ( reply ) ) {
186- if ( this . #onPush( reply ) ) return ;
187-
188- if ( PONG . equals ( reply [ 0 ] as Buffer ) ) {
189- const { resolve, typeMapping } = this . #waitingForReply. shift ( ) ! ,
190- buffer = ( ( reply [ 1 ] as Buffer ) . length === 0 ? reply [ 0 ] : reply [ 1 ] ) as Buffer ;
191- resolve ( typeMapping ?. [ RESP_TYPES . SIMPLE_STRING ] === Buffer ? buffer : buffer . toString ( ) ) ;
192- return ;
193- }
181+ this . decoder . onReply = ( reply => {
182+ if ( Array . isArray ( reply ) ) {
183+ if ( this . #onPush( reply ) ) return ;
184+
185+ if ( PONG . equals ( reply [ 0 ] as Buffer ) ) {
186+ const { resolve, typeMapping } = this . #waitingForReply. shift ( ) ! ,
187+ buffer = ( ( reply [ 1 ] as Buffer ) . length === 0 ? reply [ 0 ] : reply [ 1 ] ) as Buffer ;
188+ resolve ( typeMapping ?. [ RESP_TYPES . SIMPLE_STRING ] === Buffer ? buffer : buffer . toString ( ) ) ;
189+ return ;
194190 }
195-
196- return this . #onReply( reply ) ;
197- } ) as Decoder [ 'onReply' ] ;
198- this . decoder . getTypeMapping = ( ) => RESP2_PUSH_TYPE_MAPPING ;
199- resolve ( ) ;
200- } ;
191+ }
192+
193+ return this . #onReply( reply ) ;
194+ } ) as Decoder [ 'onReply' ] ;
195+ this . decoder . getTypeMapping = ( ) => RESP2_PUSH_TYPE_MAPPING ;
201196 }
202197
203198 subscribe < T extends boolean > (
@@ -209,7 +204,7 @@ export default class RedisCommandsQueue {
209204 const command = this . #pubSub. subscribe ( type , channels , listener , returnBuffers ) ;
210205 if ( ! command ) return ;
211206
212- this . #setupPubSubHandler( command ) ;
207+ this . #setupPubSubHandler( ) ;
213208 return this . #addPubSubCommand( command ) ;
214209 }
215210
@@ -246,8 +241,7 @@ export default class RedisCommandsQueue {
246241 const commands = this . #pubSub. resubscribe ( ) ;
247242 if ( ! commands . length ) return ;
248243
249- // using last command becasue of asap
250- this . #setupPubSubHandler( commands [ commands . length - 1 ] ) ;
244+ this . #setupPubSubHandler( ) ;
251245 return Promise . all (
252246 commands . map ( command => this . #addPubSubCommand( command , true ) )
253247 ) ;
@@ -261,15 +255,15 @@ export default class RedisCommandsQueue {
261255 const command = this . #pubSub. extendChannelListeners ( type , channel , listeners ) ;
262256 if ( ! command ) return ;
263257
264- this . #setupPubSubHandler( command ) ;
258+ this . #setupPubSubHandler( ) ;
265259 return this . #addPubSubCommand( command ) ;
266260 }
267261
268262 extendPubSubListeners ( type : PubSubType , listeners : PubSubTypeListeners ) {
269263 const command = this . #pubSub. extendTypeListeners ( type , listeners ) ;
270264 if ( ! command ) return ;
271265
272- this . #setupPubSubHandler( command ) ;
266+ this . #setupPubSubHandler( ) ;
273267 return this . #addPubSubCommand( command ) ;
274268 }
275269
0 commit comments