 import { Debug } from "../utils";
-import ClusterSubscriber from "./ClusterSubscriber";
-import Cluster from "./index";
-import ConnectionPool from "./ConnectionPool";
 import { getNodeKey } from "./util";
 import * as calculateSlot from "cluster-key-slot";
+import ShardedSubscriber from "./ShardedSubscriber";
+import * as EventEmitter from "events";
 const debug = Debug("cluster:subscriberGroup");

 /**
- * Redis differs between "normal" and sharded PubSub. If using the "normal" PubSub feature, exactly one
- * ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus forwards m
- * messages between shards. However, this has scalability limitations, which is the reason why the sharded
- * PubSub feature was added to Redis. With sharded PubSub, each shard is responsible for its own messages.
- * Given that, we need at least one ClusterSubscriber per master endpoint/node.
+ * Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature,
+ * exactly one subscriber exists per cluster instance because the Redis cluster bus forwards
+ * messages between shards. Sharded PubSub removes this limitation by making each shard
+ * responsible for its own messages.
 *
- * This class leverages the previously exising ClusterSubscriber by adding support for multiple such subscribers
- * in alignment to the master nodes of the cluster. The ClusterSubscriber class was extended in a non-breaking way
- * to support this feature.
+ * This class coordinates one ShardedSubscriber per master node in the cluster, providing
+ * sharded PubSub support while keeping the public API backward compatible.
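+ *
+ * Illustrative usage from the cluster client (a sketch only: it assumes the cluster
+ * exposes `ssubscribe` and emits `smessage` events for sharded messages; host, port
+ * and channel names below are made up):
+ *
+ *   const cluster = new Cluster([{ host: "127.0.0.1", port: 30001 }]);
+ *   cluster.on("smessage", (channel, message) => console.log(channel, message));
+ *   cluster.ssubscribe("news:{tenant-1}");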
  */
 export default class ClusterSubscriberGroup {
-  private shardedSubscribers: Map<string, ClusterSubscriber> = new Map();
+  private shardedSubscribers: Map<string, ShardedSubscriber> = new Map();
   private clusterSlots: string[][] = [];
-  //Simple [min, max] slot ranges aren't enough because you can migrate single slots
+  // Simple [min, max] slot ranges aren't enough because you can migrate single slots
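+  // The index below maps a master node key to the explicit list of slots it serves,
+  // e.g. "127.0.0.1:30001" => [0, 1, ..., 5460] (illustrative node key and slot numbers)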
   private subscriberToSlotsIndex: Map<string, number[]> = new Map();
   private channels: Map<number, Array<string | Buffer>> = new Map();

@@ -29,32 +26,14 @@ export default class ClusterSubscriberGroup {
    *
-   * @param cluster
+   * @param subscriberGroupEmitter
    */
-  constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) {
-    cluster.on("+node", (redis) => {
-      this._addSubscriber(redis);
-    });
-
-    cluster.on("-node", (redis) => {
-      this._removeSubscriber(redis);
-    });
-
-    cluster.on("refresh", () => {
-      this._refreshSlots(cluster);
-    });
-
-    cluster.on("forceRefresh", () => {
-      refreshSlotsCacheCallback();
-    });
-  }
+  constructor(private readonly subscriberGroupEmitter: EventEmitter) {}
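+  // Illustrative wiring (hypothetical call site; the actual integration lives in the
+  // Cluster implementation):
+  //   const group = new ClusterSubscriberGroup(emitter);
+  //   // after every slots refresh:
+  //   await group.reset(cluster.slots, cluster.nodes("master"));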

   /**
    * Get the responsible subscriber.
    *
-   * Returns null if no subscriber was found
-   *
+   * Returns undefined if no subscriber was found.
+   *
    * @param slot
    */
-  getResponsibleSubscriber(slot: number): ClusterSubscriber {
+  getResponsibleSubscriber(slot: number): ShardedSubscriber | undefined {
     const nodeKey = this.clusterSlots[slot][0];
     return this.shardedSubscribers.get(nodeKey);
   }
@@ -67,10 +46,12 @@ export default class ClusterSubscriberGroup {
   addChannels(channels: (string | Buffer)[]): number {
     const slot = calculateSlot(channels[0]);

-    //Check if the all channels belong to the same slot and otherwise reject the operation
-    channels.forEach((c: string) => {
-      if (calculateSlot(c) != slot) return -1;
-    });
+    // Check that all channels belong to the same slot and otherwise reject the operation
+    for (const c of channels) {
+      if (calculateSlot(c) !== slot) {
+        return -1;
+      }
+    }
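+    // All channels passed in one call must hash to the same slot, which callers can
+    // ensure with a hash tag, e.g. "news:{tenant-1}:a" and "news:{tenant-1}:b" (example names).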

     const currChannels = this.channels.get(slot);

@@ -93,10 +74,12 @@ export default class ClusterSubscriberGroup {
   removeChannels(channels: (string | Buffer)[]): number {
     const slot = calculateSlot(channels[0]);

-    //Check if the all channels belong to the same slot and otherwise reject the operation
-    channels.forEach((c: string) => {
-      if (calculateSlot(c) != slot) return -1;
-    });
+    // Check that all channels belong to the same slot and otherwise reject the operation
+    for (const c of channels) {
+      if (calculateSlot(c) !== slot) {
+        return -1;
+      }
+    }

     const slotChannels = this.channels.get(slot);

@@ -124,96 +107,123 @@ export default class ClusterSubscriberGroup {
    * Start all not yet started subscribers
    */
   start() {
+    const startPromises = [];
     for (const s of this.shardedSubscribers.values()) {
       if (!s.isStarted()) {
-        s.start();
+        startPromises.push(s.start());
       }
     }
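+    // Assumption: ShardedSubscriber.start() returns a promise that settles once the
+    // subscriber is connected, so callers can await the whole group coming up.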
+    return Promise.all(startPromises);
   }

   /**
-   * Add a subscriber to the group of subscribers
-   *
-   * @param redis
+   * Resets the subscriber group by stopping subscribers that are no longer needed
+   * and starting new ones for newly discovered master nodes.
    */
-  private _addSubscriber(redis: any): ClusterSubscriber {
-    const pool: ConnectionPool = new ConnectionPool(redis.options);
+  public async reset(
+    clusterSlots: string[][],
+    clusterNodes: any[]
+  ): Promise<void> {
+    // Update the slots cache; if nothing changed there is nothing to reset
+    if (!this._refreshSlots(clusterSlots)) {
+      return;
+    }

-    if (pool.addMasterNode(redis)) {
-      const sub = new ClusterSubscriber(pool, this.cluster, true);
-      const nodeKey = getNodeKey(redis.options);
-      this.shardedSubscribers.set(nodeKey, sub);
-      sub.start();
+    // For each of the sharded subscribers
+    for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
+      if (
+        // If the subscriber is still responsible for a slot range and is running then keep it
+        this.subscriberToSlotsIndex.has(nodeKey) &&
+        shardedSubscriber.isStarted()
+      ) {
+        continue;
+      }

-      // We need to attempt to resubscribe them in case the new node serves their slot
-      this._resubscribe();
-      this.cluster.emit("+subscriber");
-      return sub;
+      // Otherwise stop the subscriber and remove it
+      shardedSubscriber.stop();
+      this.shardedSubscribers.delete(nodeKey);
+
+      this.subscriberGroupEmitter.emit("-subscriber");
     }

-    return null;
-  }
+    const startPromises = [];
+    // For each node in the slots cache
+    for (const [nodeKey, _] of this.subscriberToSlotsIndex) {
+      // If we already have a subscriber for this node then keep it
+      if (this.shardedSubscribers.has(nodeKey)) {
+        continue;
+      }

-  /**
-   * Removes a subscriber from the group
-   * @param redis
-   */
-  private _removeSubscriber(redis: any): Map<string, ClusterSubscriber> {
-    const nodeKey = getNodeKey(redis.options);
-    const sub = this.shardedSubscribers.get(nodeKey);
+      // Otherwise create a new subscriber
+      const redis = clusterNodes.find((node) => {
+        return getNodeKey(node.options) === nodeKey;
+      });

-    if (sub) {
-      sub.stop();
-      this.shardedSubscribers.delete(nodeKey);
+      if (!redis) {
+        debug("Failed to find node for key %s", nodeKey);
+        continue;
+      }

-      // Even though the subscriber to this node is going down, we might have another subscriber
-      // handling the same slots, so we need to attempt to subscribe the orphaned channels
-      this._resubscribe();
-      this.cluster.emit("-subscriber");
+      const sub = new ShardedSubscriber(
+        this.subscriberGroupEmitter,
+        redis.options
+      );
+
+      this.shardedSubscribers.set(nodeKey, sub);
+
+      startPromises.push(sub.start());
+
+      this.subscriberGroupEmitter.emit("+subscriber");
     }

-    return this.shardedSubscribers;
+    // It's vital to await the start promises before resubscribing.
+    // Otherwise we might try to resubscribe on a subscriber that is not yet
+    // connected, which can cause a race condition.
+    try {
+      await Promise.all(startPromises);
+    } catch (err) {
+      debug("Error while starting subscribers: %s", err);
+      this.subscriberGroupEmitter.emit("error", err);
+    }
+
+    this._resubscribe();
+    this.subscriberGroupEmitter.emit("subscribersReady");
   }

   /**
    * Refreshes the subscriber-related slot ranges
    *
    * Returns false if no refresh was needed
    *
-   * @param cluster
+   * @param targetSlots
    */
-  private _refreshSlots(cluster: Cluster): boolean {
+  private _refreshSlots(targetSlots: string[][]): boolean {
     //If there was an actual change, then reassign the slot ranges
-    if (this._slotsAreEqual(cluster.slots)) {
+    if (this._slotsAreEqual(targetSlots)) {
       debug(
         "Nothing to refresh because the new cluster map is equal to the previous one."
       );
-    } else {
-      debug("Refreshing the slots of the subscriber group.");
-
-      //Rebuild the slots index
-      this.subscriberToSlotsIndex = new Map();

-      for (let slot = 0; slot < cluster.slots.length; slot++) {
-        const node: string = cluster.slots[slot][0];
+      return false;
+    }

-        if (!this.subscriberToSlotsIndex.has(node)) {
-          this.subscriberToSlotsIndex.set(node, []);
-        }
-        this.subscriberToSlotsIndex.get(node).push(Number(slot));
-      }
+    debug("Refreshing the slots of the subscriber group.");

-      //Update the subscribers from the index
-      this._resubscribe();
+    // Rebuild the slots index
+    this.subscriberToSlotsIndex = new Map();

-      //Update the cached slots map
-      this.clusterSlots = JSON.parse(JSON.stringify(cluster.slots));
+    for (let slot = 0; slot < targetSlots.length; slot++) {
+      const node: string = targetSlots[slot][0];
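+      // targetSlots[slot] lists the node keys serving this slot; index 0 is assumed
+      // to be the master, which is the node a sharded subscriber connects to.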

-      this.cluster.emit("subscribersReady");
-      return true;
+      if (!this.subscriberToSlotsIndex.has(node)) {
+        this.subscriberToSlotsIndex.set(node, []);
+      }
+      this.subscriberToSlotsIndex.get(node).push(Number(slot));
     }

-    return false;
+    // Update the cached slots map
+    this.clusterSlots = JSON.parse(JSON.stringify(targetSlots));
+
+    return true;
   }

   /**
@@ -224,12 +234,9 @@ export default class ClusterSubscriberGroup {
   private _resubscribe() {
     if (this.shardedSubscribers) {
       this.shardedSubscribers.forEach(
-        (s: ClusterSubscriber, nodeKey: string) => {
+        (s: ShardedSubscriber, nodeKey: string) => {
           const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey);
           if (subscriberSlots) {
-            //More for debugging purposes
-            s.associateSlotRange(subscriberSlots);
-
             //Resubscribe on the underlying connection
             subscriberSlots.forEach((ss) => {
               //Might return null if being disconnected
@@ -238,12 +245,10 @@ export default class ClusterSubscriberGroup {

               if (channels && channels.length > 0) {
                 //Try to subscribe now
-                if (redis) {
-                  redis.ssubscribe(channels);
-
-                  //If the instance isn't ready yet, then register the re-subscription for later
-                  redis.on("ready", () => {
-                    redis.ssubscribe(channels);
+                if (redis && redis.status !== "end") {
+                  redis.ssubscribe(channels).catch((err) => {
+                    // TODO: Should we emit an error event here?
+                    debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
                   });
                 }
               }
@@ -261,7 +266,10 @@ export default class ClusterSubscriberGroup {
    * @private
    */
   private _slotsAreEqual(other: string[][]) {
-    if (this.clusterSlots === undefined) return false;
-    else return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
+    if (this.clusterSlots === undefined) {
+      return false;
+    } else {
+      return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
+    }
   }
 }