@@ -20,6 +20,16 @@ export default class ClusterSubscriberGroup {
   // Simple [min, max] slot ranges aren't enough because you can migrate single slots
   private subscriberToSlotsIndex: Map<string, number[]> = new Map();
   private channels: Map<number, Array<string | Buffer>> = new Map();
+  private failedAttemptsByNode: Map<string, number> = new Map();
+
+  // Only latest pending reset kept; throttled by refreshSlotsCache's isRefreshing + backoff delay
+  private isResetting = false;
+  private pendingReset: { slots: string[][]; nodes: any[] } | null = null;
+
+  // Retry strategy
+  private static readonly MAX_RETRY_ATTEMPTS = 10;
+  private static readonly MAX_BACKOFF_MS = 2000;
+  private static readonly BASE_BACKOFF_MS = 100;

   /**
    * Register callbacks
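The new `isResetting`/`pendingReset` fields implement a coalescing pattern: while a reset is in flight, only the most recent reset request is remembered and replayed once the current one finishes. A minimal standalone sketch of that pattern is below; the class and method names are illustrative only, not part of the ioredis API.

```ts
// Illustrative sketch of the reset-coalescing pattern (names are hypothetical).
class ResetCoalescer {
  private running = false;
  private pending: (() => Promise<void>) | null = null;

  async run(task: () => Promise<void>): Promise<void> {
    if (this.running) {
      // Overwrite any older pending task; only the latest request matters.
      this.pending = task;
      return;
    }
    this.running = true;
    try {
      await task();
    } finally {
      this.running = false;
      if (this.pending) {
        const next = this.pending;
        this.pending = null;
        await this.run(next);
      }
    }
  }
}
```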
@@ -111,9 +121,14 @@ export default class ClusterSubscriberGroup {
     for (const s of this.shardedSubscribers.values()) {
       if (!s.isStarted()) {
         startPromises.push(
-          s.start().catch((err) => {
-            this.subscriberGroupEmitter.emit("subscriberConnectFailed", err);
-          })
+          s
+            .start()
+            .then(() => {
+              this.handleSubscriberConnectSucceeded(s.getNodeKey());
+            })
+            .catch((err) => {
+              this.handleSubscriberConnectFailed(err, s.getNodeKey());
+            })
         );
       }
     }
@@ -127,83 +142,99 @@ export default class ClusterSubscriberGroup {
     clusterSlots: string[][],
     clusterNodes: any[]
   ): Promise<void> {
-    const hasTopologyChanged = this._refreshSlots(clusterSlots);
-    const hasFailedSubscribers = this.hasUnhealthySubscribers();
-
-    if (!hasTopologyChanged && !hasFailedSubscribers) {
-      debug(
-        "No topology change detected or failed subscribers. Skipping reset."
-      );
+    if (this.isResetting) {
+      this.pendingReset = { slots: clusterSlots, nodes: clusterNodes };
       return;
     }

-    // For each of the sharded subscribers
-    for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
-      if (
-        // If the subscriber is still responsible for a slot range and is running then keep it
-        this.subscriberToSlotsIndex.has(nodeKey) &&
-        shardedSubscriber.isStarted()
-      ) {
-        debug("Skipping deleting subscriber for %s", nodeKey);
-        continue;
+    this.isResetting = true;
+
+    try {
+      const hasTopologyChanged = this._refreshSlots(clusterSlots);
+      const hasFailedSubscribers = this.hasUnhealthySubscribers();
+
+      if (!hasTopologyChanged && !hasFailedSubscribers) {
+        debug(
+          "No topology change detected or failed subscribers. Skipping reset."
+        );
+        return;
       }

-      debug("Removing subscriber for %s", nodeKey);
-      // Otherwise stop the subscriber and remove it
-      shardedSubscriber.stop();
-      this.shardedSubscribers.delete(nodeKey);
+      // For each of the sharded subscribers
+      for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
+        if (
+          // If the subscriber is still responsible for a slot range and is running then keep it
+          this.subscriberToSlotsIndex.has(nodeKey) &&
+          shardedSubscriber.isStarted()
+        ) {
+          debug("Skipping deleting subscriber for %s", nodeKey);
+          continue;
+        }

-      this.subscriberGroupEmitter.emit("-subscriber");
-    }
+        debug("Removing subscriber for %s", nodeKey);
+        // Otherwise stop the subscriber and remove it
+        shardedSubscriber.stop();
+        this.shardedSubscribers.delete(nodeKey);

-    const startPromises = [];
-    // For each node in slots cache
-    for (const [nodeKey, _] of this.subscriberToSlotsIndex) {
-      // If we already have a subscriber for this node then keep it
-      if (this.shardedSubscribers.has(nodeKey)) {
-        debug("Skipping creating new subscriber for %s", nodeKey);
-        continue;
+        this.subscriberGroupEmitter.emit("-subscriber");
       }

-      debug("Creating new subscriber for %s", nodeKey);
-      // Otherwise create a new subscriber
-      const redis = clusterNodes.find((node) => {
-        return getNodeKey(node.options) === nodeKey;
-      });
+      const startPromises = [];
+      // For each node in slots cache
+      for (const [nodeKey, _] of this.subscriberToSlotsIndex) {
+        // If we already have a subscriber for this node then keep it
+        if (this.shardedSubscribers.has(nodeKey)) {
+          debug("Skipping creating new subscriber for %s", nodeKey);
+          continue;
+        }

-      if (!redis) {
-        debug("Failed to find node for key %s", nodeKey);
-        continue;
-      }
+        debug("Creating new subscriber for %s", nodeKey);
+        // Otherwise create a new subscriber
+        const redis = clusterNodes.find((node) => {
+          return getNodeKey(node.options) === nodeKey;
+        });

-      const sub = new ShardedSubscriber(
-        this.subscriberGroupEmitter,
-        redis.options
-      );
+        if (!redis) {
+          debug("Failed to find node for key %s", nodeKey);
+          continue;
+        }

-      this.shardedSubscribers.set(nodeKey, sub);
+        const sub = new ShardedSubscriber(
+          this.subscriberGroupEmitter,
+          redis.options
+        );

-      startPromises.push(
-        sub.start().catch((err) => {
-          this.subscriberGroupEmitter.emit("subscriberConnectFailed", err);
-        })
-      );
+        this.shardedSubscribers.set(nodeKey, sub);

-      this.subscriberGroupEmitter.emit("+subscriber");
-    }
+        startPromises.push(
+          sub
+            .start()
+            .then(() => {
+              this.handleSubscriberConnectSucceeded(nodeKey);
+            })
+            .catch((error) => {
+              this.handleSubscriberConnectFailed(error, nodeKey);
+            })
+        );

-    // It's vital to await the start promises before resubscribing
-    // Otherwise we might try to resubscribe to a subscriber that is not yet connected
-    // This can cause a race condition
-    try {
+        this.subscriberGroupEmitter.emit("+subscriber");
+      }
+
+      // It's vital to await the start promises before resubscribing
+      // Otherwise we might try to resubscribe to a subscriber that is not yet connected
+      // This can cause a race condition
       await Promise.all(startPromises);
-    } catch (err) {
-      debug("Error while starting subscribers: %s", err);
-      this.subscriberGroupEmitter.emit("error", err);
-    }

-    this._resubscribe();
-    this.subscriberGroupEmitter.emit("subscribersReady");
+      this._resubscribe();
+      this.subscriberGroupEmitter.emit("subscribersReady");
+    } finally {
+      this.isResetting = false;
+      if (this.pendingReset) {
+        const { slots, nodes } = this.pendingReset;
+        this.pendingReset = null;
+        await this.reset(slots, nodes);
+      }
+    }
   }

   /**
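The `subscriberConnectFailed` event now carries a `{ delay, error }` payload instead of a bare error. The consumer side is not part of this diff; the sketch below shows one way a listener might use that payload to schedule a delayed topology refresh, assuming the cluster exposes a `refreshSlotsCache` method as mentioned in the field comment above (hypothetical wiring, not the actual ioredis internals).

```ts
import { EventEmitter } from "events";

// Hypothetical consumer of the { delay, error } payload emitted above.
// refreshSlotsCache and its internal isRefreshing throttling are assumptions
// taken from the field comment; the real wiring is not shown in this PR hunk.
function scheduleSlotRefreshOnFailure(
  emitter: EventEmitter,
  cluster: { refreshSlotsCache: () => void }
): void {
  emitter.on(
    "subscriberConnectFailed",
    ({ delay, error }: { delay: number; error: Error }) => {
      console.warn(`subscriber connect failed: ${error.message}`);
      // Wait out the backoff before asking for a topology refresh.
      setTimeout(() => cluster.refreshSlotsCache(), delay);
    }
  );
}
```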
@@ -310,4 +341,48 @@ export default class ClusterSubscriberGroup {

     return hasFailedSubscribers || hasMissingSubscribers;
   }
+
+  /**
+   * Handles failed subscriber connections by emitting an event to refresh the slots cache
+   * after a backoff period.
+   *
+   * @param error
+   * @param nodeKey
+   */
+  private handleSubscriberConnectFailed = (error: Error, nodeKey: string) => {
+    const currentAttempts = this.failedAttemptsByNode.get(nodeKey) || 0;
+    const failedAttempts = currentAttempts + 1;
+    this.failedAttemptsByNode.set(nodeKey, failedAttempts);
+
+    const attempts = Math.min(
+      failedAttempts,
+      ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS
+    );
+    const backoff = Math.min(
+      ClusterSubscriberGroup.BASE_BACKOFF_MS * 2 ** attempts,
+      ClusterSubscriberGroup.MAX_BACKOFF_MS
+    );
+    const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5));
+    const delay = Math.max(0, backoff + jitter);
+
+    debug(
+      "Failed to connect subscriber for %s. Refreshing slots in %dms",
+      nodeKey,
+      delay
+    );
+
+    this.subscriberGroupEmitter.emit("subscriberConnectFailed", {
+      delay,
+      error,
+    });
+  };
+
+  /**
+   * Handles successful subscriber connections by resetting the failed attempts counter.
+   *
+   * @param nodeKey
+   */
+  private handleSubscriberConnectSucceeded = (nodeKey: string) => {
+    this.failedAttemptsByNode.delete(nodeKey);
+  };
 }
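For reference, here is a standalone replica of the backoff formula introduced above, useful for eyeballing the delays it produces. The constants mirror the new class fields; the helper itself is illustrative and not exported by ioredis.

```ts
// Standalone replica of the retry backoff used by handleSubscriberConnectFailed.
const BASE_BACKOFF_MS = 100;
const MAX_BACKOFF_MS = 2000;
const MAX_RETRY_ATTEMPTS = 10;

function backoffDelay(failedAttempts: number): number {
  const attempts = Math.min(failedAttempts, MAX_RETRY_ATTEMPTS);
  const backoff = Math.min(BASE_BACKOFF_MS * 2 ** attempts, MAX_BACKOFF_MS);
  // Jitter is +/- 25% of the backoff, so retries from different nodes spread out.
  const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5));
  return Math.max(0, backoff + jitter);
}

// Expected ranges: attempt 1 -> ~200ms +/- 50, attempt 2 -> ~400ms +/- 100,
// attempt 3 -> ~800ms +/- 200, attempt 4 -> ~1600ms +/- 400,
// attempt 5 and beyond -> capped at ~2000ms +/- 500.
for (let attempt = 1; attempt <= 6; attempt++) {
  console.log(attempt, backoffDelay(attempt));
}
```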