@@ -66,13 +66,18 @@ import {
6666} from "./cluster" ;
6767import { type ConatSocketServer } from "@cocalc/conat/socket" ;
6868import { throttle } from "lodash" ;
69- import { getLogger } from "@cocalc/conat/client" ;
7069import { reuseInFlight } from "@cocalc/util/reuse-in-flight" ;
7170import { type SysConatServer , sysApiSubject , sysApi } from "./sys" ;
7271import { forkedConatServer } from "./start-server" ;
73- import { stickyChoice } from "./sticky" ;
72+ import {
73+ stickyChoice ,
74+ createStickyRouter ,
75+ getStickyTarget ,
76+ stickyKey ,
77+ } from "./sticky" ;
7478import { EventEmitter } from "events" ;
7579import { Metrics } from "../types" ;
80+ import { getLogger } from "@cocalc/conat/client" ;
7681
7782const logger = getLogger ( "conat:core:server" ) ;
7883
@@ -171,6 +176,8 @@ export class ConatServer extends EventEmitter {
171176 private getUser : UserFunction ;
172177 private isAllowed : AllowFunction ;
173178 public readonly options : Partial < Options > ;
179+
180+ // cluster = true if and only if this is a cluster:
174181 private cluster ?: boolean ;
175182
176183 private sockets : { [ id : string ] : any } = { } ;
@@ -180,6 +187,7 @@ export class ConatServer extends EventEmitter {
180187
181188 private subscriptions : { [ socketId : string ] : Set < string > } = { } ;
182189 public interest : Interest = new Patterns ( ) ;
190+ private stickyClient : Client ;
183191
184192 private clusterStreams ?: ClusterStreams ;
185193 private clusterLinks : {
@@ -277,6 +285,7 @@ export class ConatServer extends EventEmitter {
277285 }
278286 this . initUsage ( ) ;
279287 this . io . on ( "connection" , this . handleSocket ) ;
288+ this . initSticky ( ) ;
280289 this . init ( ) ;
281290 }
282291
@@ -443,15 +452,21 @@ export class ConatServer extends EventEmitter {
443452 // STICKY QUEUE GROUPS
444453 ///////////////////////////////////////
445454
446- private stickyKey = ( { pattern, subject } ) => {
447- return pattern + " " + subject ;
455+ private initSticky = ( ) => {
456+ this . stickyClient = this . client ( {
457+ systemAccountPassword : this . options . systemAccountPassword ,
458+ } ) ;
459+ if ( ! this . cluster ) {
460+ // not a cluster, so we can server as the sticky routing serivce
461+ createStickyRouter ( { client : this . stickyClient } ) ;
462+ }
448463 } ;
449464
450465 private stickyCache : { [ key : string ] : { target : string ; expire : number } } =
451466 { } ;
452467 private updateSticky = ( sticky : StickyUpdate ) => {
453468 // save in the cache
454- this . stickyCache [ this . stickyKey ( sticky ) ] = {
469+ this . stickyCache [ stickyKey ( sticky ) ] = {
455470 target : sticky . target ,
456471 expire : Date . now ( ) + sticky . ttl ,
457472 } ;
@@ -464,18 +479,7 @@ export class ConatServer extends EventEmitter {
464479 pattern : string ;
465480 subject : string ;
466481 } ) : string | undefined => {
467- const key = this . stickyKey ( { pattern, subject } ) ;
468- const x = this . stickyCache [ key ] ;
469- if ( x != null ) {
470- if ( Date . now ( ) >= x . expire ) {
471- // it's in the cache
472- return x . target ;
473- } else {
474- delete this . stickyCache [ key ] ;
475- }
476- }
477- // not in the cache or expired
478- return undefined ;
482+ return getStickyTarget ( { stickyCache : this . stickyCache , pattern, subject } ) ;
479483 } ;
480484
481485 ///////////////////////////////////////
@@ -763,13 +767,27 @@ export class ConatServer extends EventEmitter {
763767 if ( targets . size == 0 ) {
764768 return undefined ;
765769 }
766- return await stickyChoice ( {
767- pattern,
768- subject,
769- targets,
770- updateSticky : this . updateSticky ,
771- getStickyTarget : this . getStickyTarget ,
772- } ) ;
770+ try {
771+ const target = await stickyChoice ( {
772+ client : this . stickyClient ,
773+ pattern,
774+ subject,
775+ targets,
776+ updateSticky : this . updateSticky ,
777+ getStickyTarget : this . getStickyTarget ,
778+ } ) ;
779+ return target ;
780+ } catch ( err ) {
781+ this . log ( "WARNING: sticky router is not working" , err ) ;
782+ // not possible to make assignment, e.g., not able
783+ // to connect to the sticky service. Can and should
784+ // happen in case of a split brain (say). This will
785+ // make it so messages stop being delivered and hopefully
786+ // client gets an error, and keeps retrying until the
787+ // sticky service is back. This is of course the tradeoff
788+ // of using a centralized algorithm for sticky routing.
789+ return undefined ;
790+ }
773791 } ;
774792
775793 stickyClusterLoadBalance = async ( {
0 commit comments