@@ -23,6 +23,10 @@ module.exports = adapter;
2323var requestTypes = {
2424 clients : 0 ,
2525 clientRooms : 1 ,
26+ allRooms : 2 ,
27+ remoteJoin : 3 ,
28+ remoteLeave : 4 ,
29+ customRequest : 5 ,
2630} ;
2731
2832/**
@@ -86,6 +90,7 @@ function adapter(uri, opts) {
8690 this . requestChannel = prefix + '-request#' + this . nsp . name + '#' ;
8791 this . responseChannel = prefix + '-response#' + this . nsp . name + '#' ;
8892 this . requests = { } ;
93+ this . customHook = function ( ) { return null ; }
8994
9095 if ( String . prototype . startsWith ) {
9196 this . channelMatches = function ( messageChannel , subscribedChannel ) {
@@ -212,6 +217,59 @@ function adapter(uri, opts) {
212217 } ) ;
213218 break ;
214219
220+ case requestTypes . allRooms :
221+
222+ var response = JSON . stringify ( {
223+ requestid : request . requestid ,
224+ rooms : Object . keys ( this . rooms )
225+ } ) ;
226+
227+ pub . publish ( self . responseChannel , response ) ;
228+ break ;
229+
230+ case requestTypes . remoteJoin :
231+
232+ var socket = this . nsp . connected [ request . sid ] ;
233+ if ( ! socket ) { return ; }
234+
235+ function sendAck ( ) {
236+ var response = JSON . stringify ( {
237+ requestid : request . requestid
238+ } ) ;
239+
240+ pub . publish ( self . responseChannel , response ) ;
241+ }
242+
243+ socket . join ( request . room , sendAck ) ;
244+ break ;
245+
246+ case requestTypes . remoteLeave :
247+
248+ var socket = this . nsp . connected [ request . sid ] ;
249+ if ( ! socket ) { return ; }
250+
251+ function sendAck ( ) {
252+ var response = JSON . stringify ( {
253+ requestid : request . requestid
254+ } ) ;
255+
256+ pub . publish ( self . responseChannel , response ) ;
257+ }
258+
259+ socket . leave ( request . room , sendAck ) ;
260+ break ;
261+
262+ case requestTypes . customRequest :
263+ var data = this . customHook ( request . data ) ;
264+
265+ var response = JSON . stringify ( {
266+ requestid : request . requestid ,
267+ data : data
268+ } ) ;
269+
270+ pub . publish ( self . responseChannel , response ) ;
271+ break ;
272+
215273 default :
216274 debug ( 'ignoring unknown request type: %s' , request . type ) ;
217275 }
@@ -268,6 +326,42 @@ function adapter(uri, opts) {
268326 delete self . requests [ request . requestid ] ;
269327 break ;
270328
329+ case requestTypes . allRooms :
330+ request . msgCount ++ ;
331+
332+ // ignore if response does not contain 'rooms' key
333+ if ( ! response . rooms || ! Array . isArray ( response . rooms ) ) return ;
334+
335+ for ( var i = 0 ; i < response . rooms . length ; i ++ ) {
336+ request . rooms [ response . rooms [ i ] ] = true ;
337+ }
338+
339+ if ( request . msgCount === request . numsub ) {
340+ clearTimeout ( request . timeout ) ;
341+ if ( request . callback ) process . nextTick ( request . callback . bind ( null , null , Object . keys ( request . rooms ) ) ) ;
342+ delete self . requests [ request . requestid ] ;
343+ }
344+ break ;
345+
346+ case requestTypes . remoteJoin :
347+ case requestTypes . remoteLeave :
348+ clearTimeout ( request . timeout ) ;
349+ if ( request . callback ) process . nextTick ( request . callback . bind ( null , null ) ) ;
350+ delete self . requests [ request . requestid ] ;
351+ break ;
352+
353+ case requestTypes . customRequest :
354+ request . msgCount ++ ;
355+
356+ request . replies . push ( response . data ) ;
357+
358+ if ( request . msgCount === request . numsub ) {
359+ clearTimeout ( request . timeout ) ;
360+ if ( request . callback ) process . nextTick ( request . callback . bind ( null , null , request . replies ) ) ;
361+ delete self . requests [ request . requestid ] ;
362+ }
363+ break ;
364+
271365 default :
272366 debug ( 'ignoring unknown request type: %s' , request . type ) ;
273367 }
@@ -489,6 +583,190 @@ function adapter(uri, opts) {
489583 pub . publish ( self . requestChannel , request ) ;
490584 } ;
491585
586+ /**
587+ * Gets the list of all rooms (accross every node)
588+ *
589+ * @param {Function } callback
590+ * @api public
591+ */
592+
593+ Redis . prototype . allRooms = function ( fn ) {
594+
595+ var self = this ;
596+ var requestid = uid2 ( 6 ) ;
597+
598+ pub . send_command ( 'pubsub' , [ 'numsub' , self . requestChannel ] , function ( err , numsub ) {
599+ if ( err ) {
600+ self . emit ( 'error' , err ) ;
601+ if ( fn ) fn ( err ) ;
602+ return ;
603+ }
604+
605+ numsub = parseInt ( numsub [ 1 ] , 10 ) ;
606+
607+ var request = JSON . stringify ( {
608+ requestid : requestid ,
609+ type : requestTypes . allRooms
610+ } ) ;
611+
612+ // if there is no response for x second, return result
613+ var timeout = setTimeout ( function ( ) {
614+ var request = self . requests [ requestid ] ;
615+ if ( fn ) process . nextTick ( fn . bind ( null , new Error ( 'timeout reached while waiting for allRooms response' ) , Object . keys ( request . rooms ) ) ) ;
616+ delete self . requests [ requestid ] ;
617+ } , self . requestsTimeout ) ;
618+
619+ self . requests [ requestid ] = {
620+ type : requestTypes . allRooms ,
621+ numsub : numsub ,
622+ msgCount : 0 ,
623+ rooms : { } ,
624+ callback : fn ,
625+ timeout : timeout
626+ } ;
627+
628+ pub . publish ( self . requestChannel , request ) ;
629+ } ) ;
630+ } ;
631+
632+ /**
633+ * Makes the socket with the given id join the room
634+ *
635+ * @param {String } socket id
636+ * @param {String } room name
637+ * @param {Function } callback
638+ * @api public
639+ */
640+
641+ Redis . prototype . remoteJoin = function ( id , room , fn ) {
642+
643+ var self = this ;
644+ var requestid = uid2 ( 6 ) ;
645+
646+ var socket = this . nsp . connected [ id ] ;
647+ if ( socket ) {
648+ socket . join ( room ) ;
649+ if ( fn ) process . nextTick ( fn . bind ( null , null ) ) ;
650+ return ;
651+ }
652+
653+ var request = JSON . stringify ( {
654+ requestid : requestid ,
655+ type : requestTypes . remoteJoin ,
656+ sid : id ,
657+ room : room
658+ } ) ;
659+
660+ // if there is no response for x second, return result
661+ var timeout = setTimeout ( function ( ) {
662+ if ( fn ) process . nextTick ( fn . bind ( null , new Error ( 'timeout reached while waiting for remoteJoin response' ) ) ) ;
663+ delete self . requests [ requestid ] ;
664+ } , self . requestsTimeout ) ;
665+
666+ self . requests [ requestid ] = {
667+ type : requestTypes . remoteJoin ,
668+ callback : fn ,
669+ timeout : timeout
670+ } ;
671+
672+ pub . publish ( self . requestChannel , request ) ;
673+ } ;
674+
675+ /**
676+ * Makes the socket with the given id leave the room
677+ *
678+ * @param {String } socket id
679+ * @param {String } room name
680+ * @param {Function } callback
681+ * @api public
682+ */
683+
684+ Redis . prototype . remoteLeave = function ( id , room , fn ) {
685+
686+ var self = this ;
687+ var requestid = uid2 ( 6 ) ;
688+
689+ var socket = this . nsp . connected [ id ] ;
690+ if ( socket ) {
691+ socket . leave ( room ) ;
692+ if ( fn ) process . nextTick ( fn . bind ( null , null ) ) ;
693+ return ;
694+ }
695+
696+ var request = JSON . stringify ( {
697+ requestid : requestid ,
698+ type : requestTypes . remoteLeave ,
699+ sid : id ,
700+ room : room
701+ } ) ;
702+
703+ // if there is no response for x second, return result
704+ var timeout = setTimeout ( function ( ) {
705+ if ( fn ) process . nextTick ( fn . bind ( null , new Error ( 'timeout reached while waiting for remoteLeave response' ) ) ) ;
706+ delete self . requests [ requestid ] ;
707+ } , self . requestsTimeout ) ;
708+
709+ self . requests [ requestid ] = {
710+ type : requestTypes . remoteLeave ,
711+ callback : fn ,
712+ timeout : timeout
713+ } ;
714+
715+ pub . publish ( self . requestChannel , request ) ;
716+ } ;
717+
718+ /**
719+ * Sends a new custom request to other nodes
720+ *
721+ * @param {Object } data (no binary)
722+ * @param {Function } callback
723+ * @api public
724+ */
725+
726+ Redis . prototype . customRequest = function ( data , fn ) {
727+ if ( typeof data === 'function' ) {
728+ fn = data ;
729+ data = null ;
730+ }
731+
732+ var self = this ;
733+ var requestid = uid2 ( 6 ) ;
734+
735+ pub . send_command ( 'pubsub' , [ 'numsub' , self . requestChannel ] , function ( err , numsub ) {
736+ if ( err ) {
737+ self . emit ( 'error' , err ) ;
738+ if ( fn ) fn ( err ) ;
739+ return ;
740+ }
741+
742+ numsub = parseInt ( numsub [ 1 ] , 10 ) ;
743+
744+ var request = JSON . stringify ( {
745+ requestid : requestid ,
746+ type : requestTypes . customRequest ,
747+ data : data
748+ } ) ;
749+
750+ // if there is no response for x second, return result
751+ var timeout = setTimeout ( function ( ) {
752+ var request = self . requests [ requestid ] ;
753+ if ( fn ) process . nextTick ( fn . bind ( null , new Error ( 'timeout reached while waiting for customRequest response' ) , request . replies ) ) ;
754+ delete self . requests [ requestid ] ;
755+ } , self . requestsTimeout ) ;
756+
757+ self . requests [ requestid ] = {
758+ type : requestTypes . customRequest ,
759+ numsub : numsub ,
760+ msgCount : 0 ,
761+ replies : [ ] ,
762+ callback : fn ,
763+ timeout : timeout
764+ } ;
765+
766+ pub . publish ( self . requestChannel , request ) ;
767+ } ) ;
768+ } ;
769+
492770 Redis . uid = uid ;
493771 Redis . pubClient = pub ;
494772 Redis . subClient = sub ;
0 commit comments