@@ -744,12 +744,17 @@ export class ConatServer extends EventEmitter {
744744 }
745745 // send to exactly one in each queue group
746746 for ( const queue in g ) {
747- const target = this . loadBalance ( {
748- pattern,
749- subject,
750- queue,
751- targets : g [ queue ] ,
752- } ) ;
747+ let target ;
748+ const targets = g [ queue ] ;
749+ if ( queue == STICKY_QUEUE_GROUP ) {
750+ target = await this . stickyLoadBalance ( {
751+ pattern,
752+ subject,
753+ targets,
754+ } ) ;
755+ } else {
756+ target = this . loadBalance ( targets ) ;
757+ }
753758 if ( target !== undefined ) {
754759 this . io . to ( target ) . emit ( pattern , { subject, data } ) ;
755760 if ( ! isSilentPattern ( pattern ) ) {
@@ -773,12 +778,17 @@ export class ConatServer extends EventEmitter {
773778 for ( const pattern in clusterInterest ) {
774779 const g = clusterInterest [ pattern ] ;
775780 for ( const queue in g ) {
776- const t = this . clusterLoadBalance ( {
777- pattern,
778- subject,
779- queue,
780- targets : g [ queue ] ,
781- } ) ;
781+ let t ;
782+ const targets = g [ queue ] ;
783+ if ( queue == STICKY_QUEUE_GROUP ) {
784+ t = await this . stickyClusterLoadBalance ( {
785+ pattern,
786+ subject,
787+ targets,
788+ } ) ;
789+ } else {
790+ t = this . clusterLoadBalance ( targets ) ;
791+ }
782792 if ( t !== undefined ) {
783793 const { id, target } = t ;
784794 if ( id == this . id ) {
@@ -824,78 +834,74 @@ export class ConatServer extends EventEmitter {
824834 link ?. client . conn . emit ( "publish" , data1 ) ;
825835 }
826836
827- //
828- // TODO: Supercluster routing. NOT IMPLEMENTED YET
829- //
830- // // if no matches in local cluster, try the supercluster (if there is one)
831- // if (count == 0) {
832- // // nothing in this cluster, so try other clusters
833- // for (const clusterName in this.clusterLinks) {
834- // if (clusterName == this.clusterName) continue;
835- // const links = this.clusterLinks[clusterName];
836- // for (const id in links) {
837- // const link = links[id];
838- // const count2 = link.publish({ subject, data, queueGroups });
839- // if (count2 > 0) {
840- // count += count2;
841- // // once we publish to any other cluster, we are done.
842- // break;
843- // }
844- // }
845- // }
846- // }
847-
848837 return count ;
849838 } ;
850839
851840 ///////////////////////////////////////
852841 // WHO GETS PUBLISHED MESSAGE:
853842 ///////////////////////////////////////
854- private loadBalance = ( {
843+ private loadBalance = ( targets : Set < string > ) : string | undefined => {
844+ if ( targets . size == 0 ) {
845+ return undefined ;
846+ }
847+ return randomChoice ( targets ) ;
848+ } ;
849+
850+ clusterLoadBalance = ( targets0 : {
851+ [ id : string ] : Set < string > ;
852+ } ) : { id : string ; target : string } | undefined => {
853+ const targets = new Set < string > ( ) ;
854+ for ( const id in targets0 ) {
855+ for ( const target of targets0 [ id ] ) {
856+ targets . add ( JSON . stringify ( { id, target } ) ) ;
857+ }
858+ }
859+ const x = this . loadBalance ( targets ) ;
860+ if ( ! x ) {
861+ return undefined ;
862+ }
863+ return JSON . parse ( x ) ;
864+ } ;
865+
866+ // sticky versions
867+
868+ private stickyLoadBalance = async ( {
855869 pattern,
856870 subject,
857- queue,
858871 targets,
859872 } : {
860873 pattern : string ;
861874 subject : string ;
862- queue : string ;
863875 targets : Set < string > ;
864- } ) : string | undefined => {
876+ } ) : Promise < string | undefined > => {
865877 if ( targets . size == 0 ) {
866878 return undefined ;
867879 }
868- if ( queue == STICKY_QUEUE_GROUP ) {
869- return stickyChoice ( {
870- pattern,
871- subject,
872- targets,
873- updateSticky : this . updateSticky ,
874- getStickyTarget : this . getStickyTarget ,
875- } ) ;
876- } else {
877- return randomChoice ( targets ) ;
878- }
880+ return stickyChoice ( {
881+ pattern,
882+ subject,
883+ targets,
884+ updateSticky : this . updateSticky ,
885+ getStickyTarget : this . getStickyTarget ,
886+ } ) ;
879887 } ;
880888
881- clusterLoadBalance = ( {
889+ stickyClusterLoadBalance = async ( {
882890 pattern,
883891 subject,
884- queue,
885892 targets : targets0 ,
886893 } : {
887894 pattern : string ;
888895 subject : string ;
889- queue : string ;
890896 targets : { [ id : string ] : Set < string > } ;
891- } ) : { id : string ; target : string } | undefined => {
897+ } ) : Promise < { id : string ; target : string } | undefined > => {
892898 const targets = new Set < string > ( ) ;
893899 for ( const id in targets0 ) {
894900 for ( const target of targets0 [ id ] ) {
895901 targets . add ( JSON . stringify ( { id, target } ) ) ;
896902 }
897903 }
898- const x = this . loadBalance ( { pattern, subject, queue , targets } ) ;
904+ const x = await this . stickyLoadBalance ( { pattern, subject, targets } ) ;
899905 if ( ! x ) {
900906 return undefined ;
901907 }
0 commit comments