11import uid2 = require( "uid2" ) ;
22import msgpack = require( "notepack.io" ) ;
33import { Adapter , BroadcastOptions , Room } from "socket.io-adapter" ;
4+ import { parseNumSubResponse , sumValues } from "./util" ;
45
56const debug = require ( "debug" ) ( "socket.io-redis" ) ;
67
@@ -688,7 +689,7 @@ export class RedisAdapter extends Adapter {
688689 */
689690 public async allRooms ( ) : Promise < Set < Room > > {
690691 const localRooms = new Set ( this . rooms . keys ( ) ) ;
691- const numSub = await this . getNumSub ( ) ;
692+ const numSub = await this . serverCount ( ) ;
692693 debug ( 'waiting for %d responses to "allRooms" request' , numSub ) ;
693694
694695 if ( numSub <= 1 ) {
@@ -732,7 +733,7 @@ export class RedisAdapter extends Adapter {
732733 return localSockets ;
733734 }
734735
735- const numSub = await this . getNumSub ( ) ;
736+ const numSub = await this . serverCount ( ) ;
736737 debug ( 'waiting for %d responses to "fetchSockets" request' , numSub ) ;
737738
738739 if ( numSub <= 1 ) {
@@ -849,7 +850,7 @@ export class RedisAdapter extends Adapter {
849850
850851 private async serverSideEmitWithAck ( packet : any [ ] ) {
851852 const ack = packet . pop ( ) ;
852- const numSub = ( await this . getNumSub ( ) ) - 1 ; // ignore self
853+ const numSub = ( await this . serverCount ( ) ) - 1 ; // ignore self
853854
854855 debug ( 'waiting for %d responses to "serverSideEmit" request' , numSub ) ;
855856
@@ -889,13 +890,7 @@ export class RedisAdapter extends Adapter {
889890 this . pubClient . publish ( this . requestChannel , request ) ;
890891 }
891892
892- /**
893- * Get the number of subscribers of the request channel
894- *
895- * @private
896- */
897-
898- private getNumSub ( ) : Promise < number > {
893+ override serverCount ( ) : Promise < number > {
899894 if (
900895 this . pubClient . constructor . name === "Cluster" ||
901896 this . pubClient . isCluster
@@ -904,39 +899,27 @@ export class RedisAdapter extends Adapter {
904899 const nodes = this . pubClient . nodes ( ) ;
905900 return Promise . all (
906901 nodes . map ( ( node ) =>
907- node . send_command ( "pubsub" , [ "numsub" , this . requestChannel ] )
902+ node
903+ . send_command ( "pubsub" , [ "numsub" , this . requestChannel ] )
904+ . then ( parseNumSubResponse )
908905 )
909- ) . then ( ( values ) => {
910- let numSub = 0 ;
911- values . forEach ( ( value ) => {
912- numSub += parseInt ( value [ 1 ] , 10 ) ;
913- } ) ;
914- return numSub ;
915- } ) ;
906+ ) . then ( sumValues ) ;
916907 } else if ( typeof this . pubClient . pSubscribe === "function" ) {
917908 // node-redis client
918909 const isCluster = Array . isArray ( this . pubClient . masters ) ;
919910 if ( isCluster ) {
920911 const nodes = this . pubClient . masters ;
921912 return Promise . all (
922913 nodes . map ( ( node ) => {
923- return node . client . sendCommand ( [
924- "pubsub" ,
925- "numsub" ,
926- this . requestChannel ,
927- ] ) ;
914+ return node . client
915+ . sendCommand ( [ "pubsub" , "numsub" , this . requestChannel ] )
916+ . then ( parseNumSubResponse ) ;
928917 } )
929- ) . then ( ( values ) => {
930- let numSub = 0 ;
931- values . map ( ( value ) => {
932- numSub += parseInt ( value [ 1 ] , 10 ) ;
933- } ) ;
934- return numSub ;
935- } ) ;
918+ ) . then ( sumValues ) ;
936919 } else {
937920 return this . pubClient
938921 . sendCommand ( [ "pubsub" , "numsub" , this . requestChannel ] )
939- . then ( ( res ) => parseInt ( res [ 1 ] , 10 ) ) ;
922+ . then ( parseNumSubResponse ) ;
940923 }
941924 } else {
942925 // ioredis or node-redis v3 client
@@ -946,17 +929,13 @@ export class RedisAdapter extends Adapter {
946929 [ "numsub" , this . requestChannel ] ,
947930 ( err , numSub ) => {
948931 if ( err ) return reject ( err ) ;
949- resolve ( parseInt ( numSub [ 1 ] , 10 ) ) ;
932+ resolve ( parseNumSubResponse ( numSub ) ) ;
950933 }
951934 ) ;
952935 } ) ;
953936 }
954937 }
955938
956- serverCount ( ) : Promise < number > {
957- return this . getNumSub ( ) ;
958- }
959-
960939 close ( ) : Promise < void > | void {
961940 const isRedisV4 = typeof this . pubClient . pSubscribe === "function" ;
962941 if ( isRedisV4 ) {
0 commit comments