From e0a77be6998b95c7d2aa75c596c1b1c0377fb30c Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Wed, 26 Nov 2025 15:40:58 +0200 Subject: [PATCH 1/5] feat(cluster): add slotsRefreshOnDisconnect option and refactor sharded pub/sub --- lib/cluster/ClusterOptions.ts | 11 ++ lib/cluster/ClusterSubscriberGroup.ts | 222 ++++++++++++----------- lib/cluster/ShardedSubscriber.ts | 111 ++++++++++++ lib/cluster/index.ts | 105 +++++++++-- test/cluster/cluster_subscriber_group.ts | 2 +- test/scenario/sharded-pub-sub.test.ts | 206 ++++++++++++--------- 6 files changed, 454 insertions(+), 203 deletions(-) create mode 100644 lib/cluster/ShardedSubscriber.ts diff --git a/lib/cluster/ClusterOptions.ts b/lib/cluster/ClusterOptions.ts index 051cb5ba..c3347976 100644 --- a/lib/cluster/ClusterOptions.ts +++ b/lib/cluster/ClusterOptions.ts @@ -120,6 +120,16 @@ export interface IClusterOptions { */ slotsRefreshInterval?: number; + /** + * Whether to refresh the cluster slot cache when a node connection is closed. + * + * Useful when slotsRefreshInterval is disabled (-1) but you still want + * topology to be updated promptly on node disconnect/failover. + * + * @default false + */ + slotsRefreshOnDisconnect?: boolean; + /** * Use sharded subscribers instead of a single subscriber. * @@ -211,6 +221,7 @@ export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = { retryDelayOnTryAgain: 100, slotsRefreshTimeout: 1000, slotsRefreshInterval: 5000, + slotsRefreshOnDisconnect: false, useSRVRecords: false, resolveSrv: resolveSrv, dnsLookup: lookup, diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index c0ff76da..a0f58274 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -1,26 +1,23 @@ import { Debug } from "../utils"; -import ClusterSubscriber from "./ClusterSubscriber"; -import Cluster from "./index"; -import ConnectionPool from "./ConnectionPool"; import { getNodeKey } from "./util"; import * as calculateSlot from "cluster-key-slot"; +import ShardedSubscriber from "./ShardedSubscriber"; +import * as EventEmitter from "events"; const debug = Debug("cluster:subscriberGroup"); /** - * Redis differs between "normal" and sharded PubSub. If using the "normal" PubSub feature, exactly one - * ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus forwards m - * messages between shards. However, this has scalability limitations, which is the reason why the sharded - * PubSub feature was added to Redis. With sharded PubSub, each shard is responsible for its own messages. - * Given that, we need at least one ClusterSubscriber per master endpoint/node. + * Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature, + * exactly one subscriber exists per cluster instance because the Redis cluster bus forwards + * messages between shards. Sharded PubSub removes this limitation by making each shard + * responsible for its own messages. * - * This class leverages the previously exising ClusterSubscriber by adding support for multiple such subscribers - * in alignment to the master nodes of the cluster. The ClusterSubscriber class was extended in a non-breaking way - * to support this feature. + * This class coordinates one ShardedSubscriber per master node in the cluster, providing + * sharded PubSub support while keeping the public API backward compatible. 
 */
 export default class ClusterSubscriberGroup {
-  private shardedSubscribers: Map<string, ClusterSubscriber> = new Map();
+  private shardedSubscribers: Map<string, ShardedSubscriber> = new Map();
   private clusterSlots: string[][] = [];
-  //Simple [min, max] slot ranges aren't enough because you can migrate single slots
+  // Simple [min, max] slot ranges aren't enough because you can migrate single slots
   private subscriberToSlotsIndex: Map<string, number[]> = new Map();
   private channels: Map<number, (string | Buffer)[]> = new Map();
 
@@ -29,32 +26,14 @@ export default class ClusterSubscriberGroup {
    *
    * @param cluster
    */
-  constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) {
-    cluster.on("+node", (redis) => {
-      this._addSubscriber(redis);
-    });
-
-    cluster.on("-node", (redis) => {
-      this._removeSubscriber(redis);
-    });
-
-    cluster.on("refresh", () => {
-      this._refreshSlots(cluster);
-    });
-
-    cluster.on("forceRefresh", () => {
-      refreshSlotsCacheCallback();
-    });
-  }
+  constructor(private readonly subscriberGroupEmitter: EventEmitter) {}
 
   /**
    * Get the responsible subscriber.
    *
-   * Returns null if no subscriber was found
-   *
    * @param slot
    */
-  getResponsibleSubscriber(slot: number): ClusterSubscriber {
+  getResponsibleSubscriber(slot: number): ShardedSubscriber | undefined {
     const nodeKey = this.clusterSlots[slot][0];
     return this.shardedSubscribers.get(nodeKey);
   }
@@ -67,10 +46,12 @@ export default class ClusterSubscriberGroup {
   addChannels(channels: (string | Buffer)[]): number {
     const slot = calculateSlot(channels[0]);
 
-    //Check if the all channels belong to the same slot and otherwise reject the operation
-    channels.forEach((c: string) => {
-      if (calculateSlot(c) != slot) return -1;
-    });
+    // Check if all the channels belong to the same slot and otherwise reject the operation
+    for (const c of channels) {
+      if (calculateSlot(c) !== slot) {
+        return -1;
+      }
+    }
 
     const currChannels = this.channels.get(slot);
 
@@ -93,10 +74,12 @@ export default class ClusterSubscriberGroup {
   removeChannels(channels: (string | Buffer)[]): number {
     const slot = calculateSlot(channels[0]);
 
-    //Check if the all channels belong to the same slot and otherwise reject the operation
-    channels.forEach((c: string) => {
-      if (calculateSlot(c) != slot) return -1;
-    });
+    // Check if all the channels belong to the same slot and otherwise reject the operation
+    for (const c of channels) {
+      if (calculateSlot(c) !== slot) {
+        return -1;
+      }
+    }
 
     const slotChannels = this.channels.get(slot);
 
@@ -124,55 +107,86 @@ export default class ClusterSubscriberGroup {
    * Start all not yet started subscribers
    */
   start() {
+    const startPromises = [];
     for (const s of this.shardedSubscribers.values()) {
       if (!s.isStarted()) {
-        s.start();
+        startPromises.push(s.start());
       }
     }
+    return Promise.all(startPromises);
   }
 
   /**
-   * Add a subscriber to the group of subscribers
-   *
-   * @param redis
+   * Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones.
   */
-  private _addSubscriber(redis: any): ClusterSubscriber {
-    const pool: ConnectionPool = new ConnectionPool(redis.options);
+  public async reset(
+    clusterSlots: string[][],
+    clusterNodes: any[]
+  ): Promise<void> {
+    // Update the slots cache and continue if there was a change
+    if (!this._refreshSlots(clusterSlots)) {
+      return;
+    }
 
-    if (pool.addMasterNode(redis)) {
-      const sub = new ClusterSubscriber(pool, this.cluster, true);
-      const nodeKey = getNodeKey(redis.options);
-      this.shardedSubscribers.set(nodeKey, sub);
-      sub.start();
+    // For each of the sharded subscribers
+    for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
+      if (
+        // If the subscriber is still responsible for a slot range and is running then keep it
+        this.subscriberToSlotsIndex.has(nodeKey) &&
+        shardedSubscriber.isStarted()
+      ) {
+        continue;
+      }
 
-      // We need to attempt to resubscribe them in case the new node serves their slot
-      this._resubscribe();
-      this.cluster.emit("+subscriber");
-      return sub;
+      // Otherwise stop the subscriber and remove it
+      shardedSubscriber.stop();
+      this.shardedSubscribers.delete(nodeKey);
+
+      this.subscriberGroupEmitter.emit("-subscriber");
     }
 
-    return null;
-  }
+    const startPromises = [];
+    // For each node in slots cache
+    for (const [nodeKey, _] of this.subscriberToSlotsIndex) {
+      // If we already have a subscriber for this node then keep it
+      if (this.shardedSubscribers.has(nodeKey)) {
+        continue;
+      }
 
-  /**
-   * Removes a subscriber from the group
-   * @param redis
-   */
-  private _removeSubscriber(redis: any): Map<string, ClusterSubscriber> {
-    const nodeKey = getNodeKey(redis.options);
-    const sub = this.shardedSubscribers.get(nodeKey);
+      // Otherwise create a new subscriber
+      const redis = clusterNodes.find((node) => {
+        return getNodeKey(node.options) === nodeKey;
+      });
 
-    if (sub) {
-      sub.stop();
-      this.shardedSubscribers.delete(nodeKey);
+      if (!redis) {
+        debug("Failed to find node for key %s", nodeKey);
+        continue;
+      }
 
-      // Even though the subscriber to this node is going down, we might have another subscriber
-      // handling the same slots, so we need to attempt to subscribe the orphaned channels
-      this._resubscribe();
-      this.cluster.emit("-subscriber");
+      const sub = new ShardedSubscriber(
+        this.subscriberGroupEmitter,
+        redis.options
+      );
+
+      this.shardedSubscribers.set(nodeKey, sub);
+
+      startPromises.push(sub.start());
+
+      this.subscriberGroupEmitter.emit("+subscriber");
     }
 
-    return this.shardedSubscribers;
+    // It's vital to await the start promises before resubscribing
+    // Otherwise we might try to resubscribe to a subscriber that is not yet connected
+    // This can cause a race condition
+    try {
+      await Promise.all(startPromises);
+    } catch (err) {
+      debug("Error while starting subscribers: %s", err);
+      this.subscriberGroupEmitter.emit("error", err);
+    }
+
+    this._resubscribe();
+    this.subscriberGroupEmitter.emit("subscribersReady");
   }
 
   /**
@@ -180,40 +194,36 @@ export default class ClusterSubscriberGroup {
    *
    * Returns false if no refresh was needed
    *
-   * @param cluster
+   * @param targetSlots
    */
-  private _refreshSlots(cluster: Cluster): boolean {
+  private _refreshSlots(targetSlots: string[][]): boolean {
     //If there was an actual change, then reassign the slot ranges
-    if (this._slotsAreEqual(cluster.slots)) {
+    if (this._slotsAreEqual(targetSlots)) {
       debug(
         "Nothing to refresh because the new cluster map is equal to the previous one."
); - } else { - debug("Refreshing the slots of the subscriber group."); - - //Rebuild the slots index - this.subscriberToSlotsIndex = new Map(); - for (let slot = 0; slot < cluster.slots.length; slot++) { - const node: string = cluster.slots[slot][0]; + return false; + } - if (!this.subscriberToSlotsIndex.has(node)) { - this.subscriberToSlotsIndex.set(node, []); - } - this.subscriberToSlotsIndex.get(node).push(Number(slot)); - } + debug("Refreshing the slots of the subscriber group."); - //Update the subscribers from the index - this._resubscribe(); + //Rebuild the slots index + this.subscriberToSlotsIndex = new Map(); - //Update the cached slots map - this.clusterSlots = JSON.parse(JSON.stringify(cluster.slots)); + for (let slot = 0; slot < targetSlots.length; slot++) { + const node: string = targetSlots[slot][0]; - this.cluster.emit("subscribersReady"); - return true; + if (!this.subscriberToSlotsIndex.has(node)) { + this.subscriberToSlotsIndex.set(node, []); + } + this.subscriberToSlotsIndex.get(node).push(Number(slot)); } - return false; + //Update the cached slots map + this.clusterSlots = JSON.parse(JSON.stringify(targetSlots)); + + return true; } /** @@ -224,12 +234,9 @@ export default class ClusterSubscriberGroup { private _resubscribe() { if (this.shardedSubscribers) { this.shardedSubscribers.forEach( - (s: ClusterSubscriber, nodeKey: string) => { + (s: ShardedSubscriber, nodeKey: string) => { const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey); if (subscriberSlots) { - //More for debugging purposes - s.associateSlotRange(subscriberSlots); - //Resubscribe on the underlying connection subscriberSlots.forEach((ss) => { //Might return null if being disconnected @@ -238,12 +245,10 @@ export default class ClusterSubscriberGroup { if (channels && channels.length > 0) { //Try to subscribe now - if (redis) { - redis.ssubscribe(channels); - - //If the instance isn't ready yet, then register the re-subscription for later - redis.on("ready", () => { - redis.ssubscribe(channels); + if (redis && redis.status !== "end") { + redis.ssubscribe(channels).catch((err) => { + // TODO: Should we emit an error event here? 
+                  debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
                  });
                }
              }
@@ -261,7 +266,10 @@ export default class ClusterSubscriberGroup {
    * @private
    */
   private _slotsAreEqual(other: string[][]) {
-    if (this.clusterSlots === undefined) return false;
-    else return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
+    if (this.clusterSlots === undefined) {
+      return false;
+    } else {
+      return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
+    }
   }
 }
diff --git a/lib/cluster/ShardedSubscriber.ts b/lib/cluster/ShardedSubscriber.ts
new file mode 100644
index 00000000..ae7e4b88
--- /dev/null
+++ b/lib/cluster/ShardedSubscriber.ts
@@ -0,0 +1,111 @@
+import EventEmitter = require("events");
+import Redis from "../redis";
+import { getConnectionName, getNodeKey, IRedisOptions } from "./util";
+import { Debug } from "../utils";
+const debug = Debug("cluster:subscriberGroup:shardedSubscriber");
+
+export default class ShardedSubscriber {
+  private readonly nodeKey: string;
+  private started = false;
+  private instance: any = null;
+
+  // Store listener references for cleanup
+  private readonly onEnd: () => void;
+  private readonly onError: (error: Error) => void;
+  private readonly onMoved: () => void;
+  private readonly messageListeners: Map<string, (...args: any[]) => void> =
+    new Map();
+
+  constructor(private readonly emitter: EventEmitter, options: IRedisOptions) {
+    this.instance = new Redis({
+      port: options.port,
+      host: options.host,
+      username: options.username,
+      password: options.password,
+      enableReadyCheck: false,
+      connectionName: getConnectionName("ssubscriber", options.connectionName),
+      lazyConnect: true,
+      tls: options.tls,
+      /**
+       * Disable auto reconnection for subscribers.
+       * The ClusterSubscriberGroup will handle the reconnection.
+       */
+      retryStrategy: null,
+    });
+
+    this.nodeKey = getNodeKey(options);
+
+    // Define listeners as instance methods so we can remove them later
+    this.onEnd = () => {
+      this.started = false;
+      this.emitter.emit("-node", this.instance, this.nodeKey);
+    };
+
+    this.onError = (error: Error) => {
+      this.emitter.emit("nodeError", error, this.nodeKey);
+    };
+
+    this.onMoved = () => {
+      this.emitter.emit("moved");
+    };
+
+    // Register listeners
+    this.instance.once("end", this.onEnd);
+    this.instance.on("error", this.onError);
+    this.instance.on("moved", this.onMoved);
+
+    for (const event of ["smessage", "smessageBuffer"]) {
+      const listener = (...args: any[]) => {
+        this.emitter.emit(event, ...args);
+      };
+      this.messageListeners.set(event, listener);
+      this.instance.on(event, listener);
+    }
+  }
+
+  async start(): Promise<void> {
+    if (this.started) {
+      debug("already started %s", this.nodeKey);
+      return;
+    }
+
+    try {
+      await this.instance.connect();
+      debug("started %s", this.nodeKey);
+      this.started = true;
+    } catch (err) {
+      debug("failed to start %s: %s", this.nodeKey, err);
+      this.started = false;
+      throw err; // Re-throw so caller knows it failed
+    }
+  }
+
+  stop(): void {
+    this.started = false;
+
+    if (this.instance) {
+      // Remove all listeners before disconnecting
+      this.instance.off("end", this.onEnd);
+      this.instance.off("error", this.onError);
+      this.instance.off("moved", this.onMoved);
+
+      for (const [event, listener] of this.messageListeners) {
+        this.instance.off(event, listener);
+      }
+      this.messageListeners.clear();
+
+      this.instance.disconnect();
+      this.instance = null;
+    }
+
+    debug("stopped %s", this.nodeKey);
+  }
+
+  isStarted(): boolean {
+    return this.started;
+  }
+
+  getInstance(): any {
+    return this.instance;
+  }
+}
diff --git 
a/lib/cluster/index.ts b/lib/cluster/index.ts index 5957f049..ab2b3ccd 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -12,7 +12,6 @@ import { groupSrvRecords, weightSrvRecords, getConnectionName, - getNodeKey, } from "./util"; import ClusterSubscriber from "./ClusterSubscriber"; import DelayQueue from "./DelayQueue"; @@ -36,6 +35,7 @@ import Commander from "../commander"; import Deque = require("denque"); import { Pipeline } from ".."; import ClusterSubscriberGroup from "./ClusterSubscriberGroup"; +import ShardedSubscriber from "./ShardedSubscriber"; const debug = Debug("cluster"); @@ -79,6 +79,7 @@ class Cluster extends EventEmitter { private _readyDelayedCallbacks: CallbackFunction[] = []; public _addedScriptHashes: { [key: string]: any } = {}; public _addedScriptHashesCleanInterval: NodeJS.Timeout; + private subscriberGroupEmitter: EventEmitter | null; /** * Every time Cluster#connect() is called, this value will be @@ -109,11 +110,9 @@ class Cluster extends EventEmitter { this.startupNodes = startupNodes; this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options); - if (this.options.shardedSubscribers == true) - this.shardedSubscribers = new ClusterSubscriberGroup( - this, - this.refreshSlotsCache.bind(this) - ); + if (this.options.shardedSubscribers) { + this.createShardedSubscriberGroup(); + } // validate options if ( @@ -131,6 +130,10 @@ class Cluster extends EventEmitter { this.connectionPool.on("-node", (redis, key) => { this.emit("-node", redis); + + if (this.options.slotsRefreshOnDisconnect) { + this.refreshSlotsCache(); + } }); this.connectionPool.on("+node", (redis) => { this.emit("+node", redis); @@ -243,6 +246,15 @@ class Cluster extends EventEmitter { } this.connectionPool.reset(nodes); + if (this.options.shardedSubscribers) { + this.shardedSubscribers + .reset(this.slots, this.connectionPool.getNodes("all")) + .catch((err) => { + // TODO should we emit an error event here? + debug("Error while starting subscribers: %s", err); + }); + } + function readyHandler() { this.setStatus("ready"); this.retryAttempts = 0; @@ -298,7 +310,10 @@ class Cluster extends EventEmitter { this.subscriber.start(); if (this.options.shardedSubscribers) { - this.shardedSubscribers.start(); + this.shardedSubscribers.start().catch((err) => { + // TODO should we emit an error event here? 
+ debug("Error while starting subscribers: %s", err); + }); } }) .catch((err) => { @@ -347,6 +362,9 @@ class Cluster extends EventEmitter { retryDelay ); } else { + if (this.options.shardedSubscribers) { + this.subscriberGroupEmitter?.removeAllListeners(); + } this.setStatus("end"); this.flushQueue(new Error("None of startup nodes is available")); } @@ -411,6 +429,10 @@ class Cluster extends EventEmitter { this.subscriber.stop(); + if (this.options.shardedSubscribers) { + this.shardedSubscribers.stop(); + } + const Promise = PromiseContainer.get(); if (status === "wait") { const ret = asCallback(Promise.resolve<"OK">("OK"), callback); @@ -703,22 +725,32 @@ class Cluster extends EventEmitter { _this.options.shardedSubscribers == true && (command.name == "ssubscribe" || command.name == "sunsubscribe") ) { - const sub: ClusterSubscriber = + const sub: ShardedSubscriber = _this.shardedSubscribers.getResponsibleSubscriber(targetSlot); + + if (!sub) { + command.reject( + new AbortError(`No sharded subscriber for slot: ${targetSlot}`) + ); + return; + } + let status = -1; - if (command.name == "ssubscribe") + if (command.name == "ssubscribe") { status = _this.shardedSubscribers.addChannels(command.getKeys()); - if (command.name == "sunsubscribe") + } + if (command.name == "sunsubscribe") { status = _this.shardedSubscribers.removeChannels( command.getKeys() ); + } if (status !== -1) { redis = sub.getInstance(); } else { command.reject( new AbortError( - "Can't add or remove the given channels. Are they in the same slot?" + "Possible CROSSSLOT error: All channels must hash to the same slot" ) ); } @@ -939,6 +971,15 @@ class Cluster extends EventEmitter { } this.connectionPool.reset(nodes); + + if (this.options.shardedSubscribers) { + this.shardedSubscribers + .reset(this.slots, this.connectionPool.getNodes("all")) + .catch((err) => { + // TODO should we emit an error event here? 
+            debug("Error while starting subscribers: %s", err);
+          });
+      }
       callback();
     }, this.options.slotsRefreshTimeout)
   );
@@ -1088,6 +1129,48 @@ class Cluster extends EventEmitter {
       });
     });
   }
+
+  private createShardedSubscriberGroup() {
+    this.subscriberGroupEmitter = new EventEmitter();
+
+    this.shardedSubscribers = new ClusterSubscriberGroup(
+      this.subscriberGroupEmitter
+    );
+
+    this.subscriberGroupEmitter.on("-node", (redis, nodeKey) => {
+      this.emit("-node", redis, nodeKey);
+
+      if (this.options.slotsRefreshOnDisconnect) {
+        this.refreshSlotsCache();
+      }
+    });
+
+    this.subscriberGroupEmitter.on("moved", () => {
+      this.refreshSlotsCache();
+    });
+
+    this.subscriberGroupEmitter.on("-subscriber", () => {
+      this.emit("-subscriber");
+    });
+
+    this.subscriberGroupEmitter.on("+subscriber", () => {
+      this.emit("+subscriber");
+    });
+
+    this.subscriberGroupEmitter.on("nodeError", (error, nodeKey) => {
+      this.emit("nodeError", error, nodeKey);
+    });
+
+    this.subscriberGroupEmitter.on("subscribersReady", () => {
+      this.emit("subscribersReady");
+    });
+
+    for (const event of ["smessage", "smessageBuffer"]) {
+      this.subscriberGroupEmitter.on(event, (arg1, arg2, arg3) => {
+        this.emit(event, arg1, arg2, arg3);
+      });
+    }
+  }
 }
 
 Object.getOwnPropertyNames(Commander.prototype).forEach((name) => {
diff --git a/test/cluster/cluster_subscriber_group.ts b/test/cluster/cluster_subscriber_group.ts
index 9e9a3a82..d2776d29 100644
--- a/test/cluster/cluster_subscriber_group.ts
+++ b/test/cluster/cluster_subscriber_group.ts
@@ -89,7 +89,7 @@ describe("cluster:ClusterSubscriberGroup", () => {
       expect(
         err
           .toString()
-          .includes("CROSSSLOT Keys in request don't hash to the same slot")
+          .includes("CROSSSLOT")
       ).to.be.true;
     });
 
diff --git a/test/scenario/sharded-pub-sub.test.ts b/test/scenario/sharded-pub-sub.test.ts
index 0a308f97..f433d341 100644
--- a/test/scenario/sharded-pub-sub.test.ts
+++ b/test/scenario/sharded-pub-sub.test.ts
@@ -3,7 +3,6 @@ import {
   createClusterTestClient,
   getConfig,
   wait,
-  waitClientReady,
 } from "./utils/test.util";
 import { FaultInjectorClient } from "./utils/fault-injector";
 
@@ -24,25 +23,41 @@ describe("Sharded Pub/Sub E2E", () => {
   });
 
   describe("Single Subscriber", () => {
-    let subscriber: Cluster;
-    let publisher: Cluster;
-    let messageTracker: MessageTracker;
+    let cleanup: (() => Promise<void>) | null = null;
 
-    beforeEach(async () => {
-      messageTracker = new MessageTracker(CHANNELS);
-      subscriber = createClusterTestClient(config.clientConfig, {
-        shardedSubscribers: true,
-      });
-      publisher = createClusterTestClient(config.clientConfig, {
+    const setup = async (subscriberOverrides = {}, publisherOverrides = {}) => {
+      const messageTracker = new MessageTracker(CHANNELS);
+      const subscriber = createClusterTestClient(config.clientConfig, {
         shardedSubscribers: true,
+        ...subscriberOverrides,
       });
-    });
+      const publisher = createClusterTestClient(
+        config.clientConfig,
+        publisherOverrides
+      );
+
+      // Return cleanup function along with the resources
+      cleanup = async () => {
+        await Promise.all([subscriber.quit(), publisher.quit()]);
+      };
+
+      return { subscriber, publisher, messageTracker };
+    };
 
    afterEach(async () => {
-      await Promise.all([subscriber.quit(), publisher.quit()]);
+      if (cleanup) {
+        try {
+          await cleanup();
+        } catch {
+        } finally {
+          cleanup = null;
+        }
+      }
    });
 
    it("should receive messages published to multiple channels", async () => {
+      const { subscriber, publisher, messageTracker } = await setup();
+
      for (const channel of CHANNELS) {
        await subscriber.ssubscribe(channel);
      }
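
The flow these tests exercise looks roughly as follows when used directly against a cluster. A minimal sketch, with a placeholder endpoint, and assuming the client exposes an spublish method (sharded messages must be sent with SPUBLISH; publisher.call("SPUBLISH", channel, message) is a fallback where no such method is generated):

    import { Cluster } from "ioredis";

    // Placeholder seed node; substitute your own cluster endpoint(s).
    const nodes = [{ host: "127.0.0.1", port: 7000 }];

    // One dedicated subscriber connection per master shard.
    const subscriber = new Cluster(nodes, { shardedSubscribers: true });
    const publisher = new Cluster(nodes);

    async function main() {
      // SSUBSCRIBE is routed to the ShardedSubscriber owning the channel's slot.
      await subscriber.ssubscribe("orders:{eu}");

      subscriber.on("smessage", (channel, message) => {
        console.log("received %s on %s", message, channel);
      });

      // SPUBLISH (assumed available) fans out only within the shard that owns
      // the slot; unlike PUBLISH, it is not forwarded over the cluster bus.
      await publisher.spublish("orders:{eu}", "hello");
    }

    main().catch(console.error);

Channels sharing a hash tag, such as {eu} above, map to the same slot; that is why a multi-channel ssubscribe must stay within one slot, which the CROSSSLOT guard in this patch enforces.
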
@@ -71,87 +86,110 @@ describe("Sharded Pub/Sub E2E", () => { } }); - it("should resume publishing and receiving after failover", async () => { - for (const channel of CHANNELS) { - await subscriber.ssubscribe(channel); - } + [ + { + name: "slotsRefreshInterval: -1", + subscriberOverrides: { + slotsRefreshInterval: -1, + slotsRefreshOnDisconnect: true, + shardedSubscribers: true, + }, + }, + { + name: "slotsRefreshInterval: default", + subscriberOverrides: { + shardedSubscribers: true, + }, + }, + ].map((testCase) => { + it(`should resume publishing and receiving after failover - ${testCase.name}`, async () => { + const { subscriber, publisher, messageTracker } = await setup( + testCase.subscriberOverrides + ); - subscriber.on("smessage", (channelName, _) => { - messageTracker.incrementReceived(channelName); - }); + for (const channel of CHANNELS) { + await subscriber.ssubscribe(channel); + } + + subscriber.on("smessage", (channelName, _) => { + messageTracker.incrementReceived(channelName); + }); - // Trigger failover twice - for (let i = 0; i < 2; i++) { - // Start publishing messages - const { controller: publishAbort, result: publishResult } = - TestCommandRunner.publishMessagesUntilAbortSignal( + // Trigger failover twice + for (let i = 0; i < 2; i++) { + // Start publishing messages + const { controller: publishAbort, result: publishResult } = + TestCommandRunner.publishMessagesUntilAbortSignal( + publisher, + CHANNELS, + messageTracker + ); + + // Trigger failover during publishing + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + bdb_id: config.clientConfig.bdbId.toString(), + cluster_index: 0, + }, + }); + + // Wait for failover to complete + await faultInjectorClient.waitForAction(failoverActionId); + + publishAbort.abort(); + await publishResult; + + for (const channel of CHANNELS) { + const sent = messageTracker.getChannelStats(channel)!.sent; + const received = messageTracker.getChannelStats(channel)!.received; + + assert.ok( + received <= sent, + `Channel ${channel}: received (${received}) should be <= sent (${sent})` + ); + } + + // Wait for 3 seconds before resuming publishing + await wait(5_000); + + messageTracker.reset(); + + const { + controller: afterFailoverController, + result: afterFailoverResult, + } = TestCommandRunner.publishMessagesUntilAbortSignal( publisher, CHANNELS, messageTracker ); - // Trigger failover during publishing - const { action_id: failoverActionId } = - await faultInjectorClient.triggerAction({ - type: "failover", - parameters: { - bdb_id: config.clientConfig.bdbId.toString(), - cluster_index: 0, - }, - }); - - // Wait for failover to complete - await faultInjectorClient.waitForAction(failoverActionId); - - publishAbort.abort(); - await publishResult; - - for (const channel of CHANNELS) { - const sent = messageTracker.getChannelStats(channel)!.sent; - const received = messageTracker.getChannelStats(channel)!.received; - - assert.ok( - received <= sent, - `Channel ${channel}: received (${received}) should be <= sent (${sent})` - ); + await wait(10_000); + afterFailoverController.abort(); + await afterFailoverResult; + + for (const channel of CHANNELS) { + const sent = messageTracker.getChannelStats(channel)!.sent; + const received = messageTracker.getChannelStats(channel)!.received; + assert.ok(sent > 0, `Channel ${channel} should have sent messages`); + assert.ok( + received > 0, + `Channel ${channel} should have received messages` + ); + assert.strictEqual( + 
messageTracker.getChannelStats(channel)!.received, + messageTracker.getChannelStats(channel)!.sent, + `Channel ${channel} received (${received}) should equal sent (${sent}) once resumed after failover` + ); + } } - - // Wait for 2 seconds before resuming publishing - await wait(2_000); - - messageTracker.reset(); - - const { - controller: afterFailoverController, - result: afterFailoverResult, - } = TestCommandRunner.publishMessagesUntilAbortSignal( - publisher, - CHANNELS, - messageTracker - ); - - await wait(10_000); - afterFailoverController.abort(); - await afterFailoverResult; - - for (const channel of CHANNELS) { - const sent = messageTracker.getChannelStats(channel)!.sent; - const received = messageTracker.getChannelStats(channel)!.received; - assert.ok(sent > 0, `Channel ${channel} should have sent messages`); - assert.ok( - received > 0, - `Channel ${channel} should have received messages` - ); - assert.strictEqual( - messageTracker.getChannelStats(channel)!.received, - messageTracker.getChannelStats(channel)!.sent, - `Channel ${channel} received (${received}) should equal sent (${sent}) once resumed after failover` - ); - } - } + }); }); it("should NOT receive messages after sunsubscribe", async () => { + const { subscriber, publisher, messageTracker } = await setup(); + for (const channel of CHANNELS) { await subscriber.ssubscribe(channel); } @@ -353,8 +391,8 @@ describe("Sharded Pub/Sub E2E", () => { ); } - // Wait for 2 seconds before resuming publishing - await wait(2_000); + // Wait for 5 seconds before resuming publishing + await wait(5_000); messageTracker1.reset(); messageTracker2.reset(); From 9d1beb0240e777a647339a39b1e1332325330fc0 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Wed, 26 Nov 2025 15:50:57 +0200 Subject: [PATCH 2/5] chore(ci): removed node 8 and added 22 --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d5ff146b..ed8106e8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -28,7 +28,7 @@ jobs: strategy: fail-fast: false matrix: - node: [8.x, 10.x, 12.x, 14.x, 16.x, 20.x] + node: [10.x, 12.x, 14.x, 16.x, 20.x, 22.x] # Steps represent a sequence of tasks that will be executed as part of the job steps: From d72b9c5ed77e1e6fbc2c48a19b2d75285ca48497 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 27 Nov 2025 16:45:26 +0200 Subject: [PATCH 3/5] fix(cluster): always refresh slots on sharded subscriber disconnect --- lib/cluster/ClusterOptions.ts | 11 -- lib/cluster/index.ts | 8 +- test/scenario/sharded-pub-sub.test.ts | 213 ++++++++++++-------------- 3 files changed, 102 insertions(+), 130 deletions(-) diff --git a/lib/cluster/ClusterOptions.ts b/lib/cluster/ClusterOptions.ts index c3347976..051cb5ba 100644 --- a/lib/cluster/ClusterOptions.ts +++ b/lib/cluster/ClusterOptions.ts @@ -120,16 +120,6 @@ export interface IClusterOptions { */ slotsRefreshInterval?: number; - /** - * Whether to refresh the cluster slot cache when a node connection is closed. - * - * Useful when slotsRefreshInterval is disabled (-1) but you still want - * topology to be updated promptly on node disconnect/failover. - * - * @default false - */ - slotsRefreshOnDisconnect?: boolean; - /** * Use sharded subscribers instead of a single subscriber. 
* @@ -221,7 +211,6 @@ export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = { retryDelayOnTryAgain: 100, slotsRefreshTimeout: 1000, slotsRefreshInterval: 5000, - slotsRefreshOnDisconnect: false, useSRVRecords: false, resolveSrv: resolveSrv, dnsLookup: lookup, diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index ab2b3ccd..95f7aa7e 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -130,10 +130,6 @@ class Cluster extends EventEmitter { this.connectionPool.on("-node", (redis, key) => { this.emit("-node", redis); - - if (this.options.slotsRefreshOnDisconnect) { - this.refreshSlotsCache(); - } }); this.connectionPool.on("+node", (redis) => { this.emit("+node", redis); @@ -1140,9 +1136,7 @@ class Cluster extends EventEmitter { this.subscriberGroupEmitter.on("-node", (redis, nodeKey) => { this.emit("-node", redis, nodeKey); - if (this.options.slotsRefreshOnDisconnect) { - this.refreshSlotsCache(); - } + this.refreshSlotsCache(); }); this.subscriberGroupEmitter.on("moved", () => { diff --git a/test/scenario/sharded-pub-sub.test.ts b/test/scenario/sharded-pub-sub.test.ts index f433d341..78476aac 100644 --- a/test/scenario/sharded-pub-sub.test.ts +++ b/test/scenario/sharded-pub-sub.test.ts @@ -1,9 +1,5 @@ import type { TestConfig } from "./utils/test.util"; -import { - createClusterTestClient, - getConfig, - wait, -} from "./utils/test.util"; +import { createClusterTestClient, getConfig, wait } from "./utils/test.util"; import { FaultInjectorClient } from "./utils/fault-injector"; import { TestCommandRunner } from "./utils/command-runner"; @@ -11,6 +7,7 @@ import { CHANNELS, CHANNELS_BY_SLOT } from "./utils/test.util"; import { MessageTracker } from "./utils/message-tracker"; import { Cluster } from "../../lib"; import { assert } from "chai"; +import { IClusterOptions } from "../../lib/cluster/ClusterOptions"; describe("Sharded Pub/Sub E2E", () => { let faultInjectorClient: FaultInjectorClient; @@ -25,7 +22,10 @@ describe("Sharded Pub/Sub E2E", () => { describe("Single Subscriber", () => { let cleanup: (() => Promise) | null = null; - const setup = async (subscriberOverrides = {}, publisherOverrides = {}) => { + const setup = async ( + subscriberOverrides: Partial = {}, + publisherOverrides: Partial = {} + ) => { const messageTracker = new MessageTracker(CHANNELS); const subscriber = createClusterTestClient(config.clientConfig, { shardedSubscribers: true, @@ -86,105 +86,93 @@ describe("Sharded Pub/Sub E2E", () => { } }); - [ - { - name: "slotsRefreshInterval: -1", - subscriberOverrides: { - slotsRefreshInterval: -1, - slotsRefreshOnDisconnect: true, - shardedSubscribers: true, - }, - }, - { - name: "slotsRefreshInterval: default", - subscriberOverrides: { - shardedSubscribers: true, - }, - }, - ].map((testCase) => { - it(`should resume publishing and receiving after failover - ${testCase.name}`, async () => { - const { subscriber, publisher, messageTracker } = await setup( - testCase.subscriberOverrides - ); + it("should resume publishing and receiving after failover", async () => { + const { subscriber, publisher, messageTracker } = await setup({ + slotsRefreshInterval: -1, + }); - for (const channel of CHANNELS) { - await subscriber.ssubscribe(channel); - } + for (const channel of CHANNELS) { + await subscriber.ssubscribe(channel); + } - subscriber.on("smessage", (channelName, _) => { - messageTracker.incrementReceived(channelName); - }); + subscriber.on("smessage", (channelName, _) => { + messageTracker.incrementReceived(channelName); + }); - // Trigger failover twice - 
for (let i = 0; i < 2; i++) { - // Start publishing messages - const { controller: publishAbort, result: publishResult } = - TestCommandRunner.publishMessagesUntilAbortSignal( - publisher, - CHANNELS, - messageTracker - ); - - // Trigger failover during publishing - const { action_id: failoverActionId } = - await faultInjectorClient.triggerAction({ - type: "failover", - parameters: { - bdb_id: config.clientConfig.bdbId.toString(), - cluster_index: 0, - }, - }); - - // Wait for failover to complete - await faultInjectorClient.waitForAction(failoverActionId); - - publishAbort.abort(); - await publishResult; - - for (const channel of CHANNELS) { - const sent = messageTracker.getChannelStats(channel)!.sent; - const received = messageTracker.getChannelStats(channel)!.received; - - assert.ok( - received <= sent, - `Channel ${channel}: received (${received}) should be <= sent (${sent})` - ); - } - - // Wait for 3 seconds before resuming publishing - await wait(5_000); - - messageTracker.reset(); - - const { - controller: afterFailoverController, - result: afterFailoverResult, - } = TestCommandRunner.publishMessagesUntilAbortSignal( + // Trigger failover twice + for (let i = 0; i < 2; i++) { + // Start publishing messages + const { controller: publishAbort, result: publishResult } = + TestCommandRunner.publishMessagesUntilAbortSignal( publisher, CHANNELS, messageTracker ); - await wait(10_000); - afterFailoverController.abort(); - await afterFailoverResult; - - for (const channel of CHANNELS) { - const sent = messageTracker.getChannelStats(channel)!.sent; - const received = messageTracker.getChannelStats(channel)!.received; - assert.ok(sent > 0, `Channel ${channel} should have sent messages`); - assert.ok( - received > 0, - `Channel ${channel} should have received messages` - ); - assert.strictEqual( - messageTracker.getChannelStats(channel)!.received, - messageTracker.getChannelStats(channel)!.sent, - `Channel ${channel} received (${received}) should equal sent (${sent}) once resumed after failover` - ); - } + // Trigger failover during publishing + const { action_id: failoverActionId } = + await faultInjectorClient.triggerAction({ + type: "failover", + parameters: { + bdb_id: config.clientConfig.bdbId.toString(), + cluster_index: 0, + }, + }); + + // Wait for failover to complete + await faultInjectorClient.waitForAction(failoverActionId); + + publishAbort.abort(); + await publishResult; + + const totalSent = CHANNELS.reduce( + (acc, channel) => acc + messageTracker.getChannelStats(channel)!.sent, + 0 + ); + const totalReceived = CHANNELS.reduce( + (acc, channel) => + acc + messageTracker.getChannelStats(channel)!.received, + 0 + ); + + assert.ok( + totalReceived <= totalSent, + `Total received (${totalReceived}) should be <= total sent (${totalSent})` + ); + + // Wait for 2 seconds before resuming publishing + await wait(2_000); + + messageTracker.reset(); + + const { + controller: afterFailoverController, + result: afterFailoverResult, + } = TestCommandRunner.publishMessagesUntilAbortSignal( + publisher, + CHANNELS, + messageTracker + ); + + await wait(10_000); + afterFailoverController.abort(); + await afterFailoverResult; + + for (const channel of CHANNELS) { + const sent = messageTracker.getChannelStats(channel)!.sent; + const received = messageTracker.getChannelStats(channel)!.received; + assert.ok(sent > 0, `Channel ${channel} should have sent messages`); + assert.ok( + received > 0, + `Channel ${channel} should have received messages` + ); + assert.strictEqual( + 
messageTracker.getChannelStats(channel)!.received, + messageTracker.getChannelStats(channel)!.sent, + `Channel ${channel} received (${received}) should equal sent (${sent}) once resumed after failover` + ); } - }); + } }); it("should NOT receive messages after sunsubscribe", async () => { @@ -375,24 +363,25 @@ describe("Sharded Pub/Sub E2E", () => { publishAbort.abort(); await publishResult; - for (const channel of CHANNELS) { - const sent = messageTracker1.getChannelStats(channel)!.sent; - const received1 = messageTracker1.getChannelStats(channel)!.received; - - const received2 = messageTracker2.getChannelStats(channel)!.received; + const totalSent = CHANNELS.reduce( + (acc, channel) => acc + messageTracker1.getChannelStats(channel)!.sent, + 0 + ); + const totalReceived = CHANNELS.reduce( + (acc, channel) => + acc + + messageTracker1.getChannelStats(channel)!.received + + messageTracker2.getChannelStats(channel)!.received, + 0 + ); - assert.ok( - received1 <= sent, - `Channel ${channel}: received (${received1}) should be <= sent (${sent})` - ); - assert.ok( - received2 <= sent, - `Channel ${channel}: received2 (${received2}) should be <= sent (${sent})` - ); - } + assert.ok( + totalReceived <= totalSent * 2, + `Total received (${totalReceived}) should be <= total sent (${totalSent})` + ); - // Wait for 5 seconds before resuming publishing - await wait(5_000); + // Wait for 2 seconds before resuming publishing + await wait(2_000); messageTracker1.reset(); messageTracker2.reset(); From d542b492d1c1640d66952d15bf7282506a485eec Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Fri, 28 Nov 2025 17:19:29 +0200 Subject: [PATCH 4/5] fix: detect and recover unhealthy sharded subscribers --- lib/cluster/ClusterSubscriberGroup.ts | 46 ++++++++++++++++++++++++--- lib/cluster/ShardedSubscriber.ts | 10 +++++- lib/cluster/index.ts | 6 ++++ 3 files changed, 57 insertions(+), 5 deletions(-) diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index a0f58274..d99d528e 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -110,7 +110,11 @@ export default class ClusterSubscriberGroup { const startPromises = []; for (const s of this.shardedSubscribers.values()) { if (!s.isStarted()) { - startPromises.push(s.start()); + startPromises.push( + s.start().catch((err) => { + this.subscriberGroupEmitter.emit("subscriberConnectFailed", err); + }) + ); } } return Promise.all(startPromises); @@ -123,8 +127,13 @@ export default class ClusterSubscriberGroup { clusterSlots: string[][], clusterNodes: any[] ): Promise { - // Update the slots cache and continue if there was a change - if (!this._refreshSlots(clusterSlots)) { + const hasTopologyChanged = this._refreshSlots(clusterSlots); + const hasFailedSubscribers = this.hasUnhealthySubscribers(); + + if (!hasTopologyChanged && !hasFailedSubscribers) { + debug( + "No topology change detected or failed subscribers. Skipping reset." 
+ ); return; } @@ -135,9 +144,11 @@ export default class ClusterSubscriberGroup { this.subscriberToSlotsIndex.has(nodeKey) && shardedSubscriber.isStarted() ) { + debug("Skipping deleting subscriber for %s", nodeKey); continue; } + debug("Removing subscriber for %s", nodeKey); // Otherwise stop the subscriber and remove it shardedSubscriber.stop(); this.shardedSubscribers.delete(nodeKey); @@ -150,9 +161,11 @@ export default class ClusterSubscriberGroup { for (const [nodeKey, _] of this.subscriberToSlotsIndex) { // If we already have a subscriber for this node then keep it if (this.shardedSubscribers.has(nodeKey)) { + debug("Skipping creating new subscriber for %s", nodeKey); continue; } + debug("Creating new subscriber for %s", nodeKey); // Otherwise create a new subscriber const redis = clusterNodes.find((node) => { return getNodeKey(node.options) === nodeKey; @@ -170,7 +183,11 @@ export default class ClusterSubscriberGroup { this.shardedSubscribers.set(nodeKey, sub); - startPromises.push(sub.start()); + startPromises.push( + sub.start().catch((err) => { + this.subscriberGroupEmitter.emit("subscriberConnectFailed", err); + }) + ); this.subscriberGroupEmitter.emit("+subscriber"); } @@ -272,4 +289,25 @@ export default class ClusterSubscriberGroup { return JSON.stringify(this.clusterSlots) === JSON.stringify(other); } } + + /** + * Checks if any subscribers are in an unhealthy state. + * + * A subscriber is considered unhealthy if: + * - It exists but is not started (failed/disconnected) + * - It's missing entirely for a node that should have one + * + * @returns true if any subscribers need to be recreated + */ + private hasUnhealthySubscribers(): boolean { + const hasFailedSubscribers = Array.from( + this.shardedSubscribers.values() + ).some((sub) => !sub.isStarted()); + + const hasMissingSubscribers = Array.from( + this.subscriberToSlotsIndex.keys() + ).some((nodeKey) => !this.shardedSubscribers.has(nodeKey)); + + return hasFailedSubscribers || hasMissingSubscribers; + } } diff --git a/lib/cluster/ShardedSubscriber.ts b/lib/cluster/ShardedSubscriber.ts index ae7e4b88..8f961094 100644 --- a/lib/cluster/ShardedSubscriber.ts +++ b/lib/cluster/ShardedSubscriber.ts @@ -101,8 +101,16 @@ export default class ShardedSubscriber { debug("stopped %s", this.nodeKey); } + /** + * Checks if the subscriber is started and NOT explicitly disconnected. 
+ */ isStarted(): boolean { - return this.started; + const status = this.instance?.status; + + const isDisconnected = + status === "end" || status === "close" || !this.instance; + + return this.started && !isDisconnected; } getInstance(): any { diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 95f7aa7e..14131c27 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -1139,6 +1139,12 @@ class Cluster extends EventEmitter { this.refreshSlotsCache(); }); + this.subscriberGroupEmitter.on("subscriberConnectFailed", (err) => { + this.emit("error", err); + + this.refreshSlotsCache(); + }); + this.subscriberGroupEmitter.on("moved", () => { this.refreshSlotsCache(); }); From c9cb3ed46f3c0e0b430d0f578743e3e5dfb8b657 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Mon, 1 Dec 2025 17:48:41 +0200 Subject: [PATCH 5/5] fix: add exponential backoff for sharded subscriber reconnection --- lib/cluster/ClusterSubscriberGroup.ts | 201 ++++++++++++++++++-------- lib/cluster/ShardedSubscriber.ts | 46 +++--- lib/cluster/index.ts | 15 +- 3 files changed, 167 insertions(+), 95 deletions(-) diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index d99d528e..53323ed4 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -20,6 +20,16 @@ export default class ClusterSubscriberGroup { // Simple [min, max] slot ranges aren't enough because you can migrate single slots private subscriberToSlotsIndex: Map = new Map(); private channels: Map> = new Map(); + private failedAttemptsByNode: Map = new Map(); + + // Only latest pending reset kept; throttled by refreshSlotsCache's isRefreshing + backoff delay + private isResetting = false; + private pendingReset: { slots: string[][]; nodes: any[] } | null = null; + + // Retry strategy + private static readonly MAX_RETRY_ATTEMPTS = 10; + private static readonly MAX_BACKOFF_MS = 2000; + private static readonly BASE_BACKOFF_MS = 100; /** * Register callbacks @@ -111,9 +121,14 @@ export default class ClusterSubscriberGroup { for (const s of this.shardedSubscribers.values()) { if (!s.isStarted()) { startPromises.push( - s.start().catch((err) => { - this.subscriberGroupEmitter.emit("subscriberConnectFailed", err); - }) + s + .start() + .then(() => { + this.handleSubscriberConnectSucceeded(s.getNodeKey()); + }) + .catch((err) => { + this.handleSubscriberConnectFailed(err, s.getNodeKey()); + }) ); } } @@ -127,83 +142,99 @@ export default class ClusterSubscriberGroup { clusterSlots: string[][], clusterNodes: any[] ): Promise { - const hasTopologyChanged = this._refreshSlots(clusterSlots); - const hasFailedSubscribers = this.hasUnhealthySubscribers(); - - if (!hasTopologyChanged && !hasFailedSubscribers) { - debug( - "No topology change detected or failed subscribers. Skipping reset." 
- ); + if (this.isResetting) { + this.pendingReset = { slots: clusterSlots, nodes: clusterNodes }; return; } - // For each of the sharded subscribers - for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) { - if ( - // If the subscriber is still responsible for a slot range and is running then keep it - this.subscriberToSlotsIndex.has(nodeKey) && - shardedSubscriber.isStarted() - ) { - debug("Skipping deleting subscriber for %s", nodeKey); - continue; + this.isResetting = true; + + try { + const hasTopologyChanged = this._refreshSlots(clusterSlots); + const hasFailedSubscribers = this.hasUnhealthySubscribers(); + + if (!hasTopologyChanged && !hasFailedSubscribers) { + debug( + "No topology change detected or failed subscribers. Skipping reset." + ); + return; } - debug("Removing subscriber for %s", nodeKey); - // Otherwise stop the subscriber and remove it - shardedSubscriber.stop(); - this.shardedSubscribers.delete(nodeKey); + // For each of the sharded subscribers + for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) { + if ( + // If the subscriber is still responsible for a slot range and is running then keep it + this.subscriberToSlotsIndex.has(nodeKey) && + shardedSubscriber.isStarted() + ) { + debug("Skipping deleting subscriber for %s", nodeKey); + continue; + } - this.subscriberGroupEmitter.emit("-subscriber"); - } + debug("Removing subscriber for %s", nodeKey); + // Otherwise stop the subscriber and remove it + shardedSubscriber.stop(); + this.shardedSubscribers.delete(nodeKey); - const startPromises = []; - // For each node in slots cache - for (const [nodeKey, _] of this.subscriberToSlotsIndex) { - // If we already have a subscriber for this node then keep it - if (this.shardedSubscribers.has(nodeKey)) { - debug("Skipping creating new subscriber for %s", nodeKey); - continue; + this.subscriberGroupEmitter.emit("-subscriber"); } - debug("Creating new subscriber for %s", nodeKey); - // Otherwise create a new subscriber - const redis = clusterNodes.find((node) => { - return getNodeKey(node.options) === nodeKey; - }); + const startPromises = []; + // For each node in slots cache + for (const [nodeKey, _] of this.subscriberToSlotsIndex) { + // If we already have a subscriber for this node then keep it + if (this.shardedSubscribers.has(nodeKey)) { + debug("Skipping creating new subscriber for %s", nodeKey); + continue; + } - if (!redis) { - debug("Failed to find node for key %s", nodeKey); - continue; - } + debug("Creating new subscriber for %s", nodeKey); + // Otherwise create a new subscriber + const redis = clusterNodes.find((node) => { + return getNodeKey(node.options) === nodeKey; + }); - const sub = new ShardedSubscriber( - this.subscriberGroupEmitter, - redis.options - ); + if (!redis) { + debug("Failed to find node for key %s", nodeKey); + continue; + } - this.shardedSubscribers.set(nodeKey, sub); + const sub = new ShardedSubscriber( + this.subscriberGroupEmitter, + redis.options + ); - startPromises.push( - sub.start().catch((err) => { - this.subscriberGroupEmitter.emit("subscriberConnectFailed", err); - }) - ); + this.shardedSubscribers.set(nodeKey, sub); - this.subscriberGroupEmitter.emit("+subscriber"); - } + startPromises.push( + sub + .start() + .then(() => { + this.handleSubscriberConnectSucceeded(nodeKey); + }) + .catch((error) => { + this.handleSubscriberConnectFailed(error, nodeKey); + }) + ); - // It's vital to await the start promises before resubscribing - // Otherwise we might try to resubscribe to a subscriber that is not yet 
connected - // This can cause a race condition - try { + this.subscriberGroupEmitter.emit("+subscriber"); + } + + // It's vital to await the start promises before resubscribing + // Otherwise we might try to resubscribe to a subscriber that is not yet connected + // This can cause a race condition await Promise.all(startPromises); - } catch (err) { - debug("Error while starting subscribers: %s", err); - this.subscriberGroupEmitter.emit("error", err); - } - this._resubscribe(); - this.subscriberGroupEmitter.emit("subscribersReady"); + this._resubscribe(); + this.subscriberGroupEmitter.emit("subscribersReady"); + } finally { + this.isResetting = false; + if (this.pendingReset) { + const { slots, nodes } = this.pendingReset; + this.pendingReset = null; + await this.reset(slots, nodes); + } + } } /** @@ -310,4 +341,48 @@ export default class ClusterSubscriberGroup { return hasFailedSubscribers || hasMissingSubscribers; } + + /** + * Handles failed subscriber connections by emitting an event to refresh the slots cache + * after a backoff period. + * + * @param error + * @param nodeKey + */ + private handleSubscriberConnectFailed = (error: Error, nodeKey: string) => { + const currentAttempts = this.failedAttemptsByNode.get(nodeKey) || 0; + const failedAttempts = currentAttempts + 1; + this.failedAttemptsByNode.set(nodeKey, failedAttempts); + + const attempts = Math.min( + failedAttempts, + ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS + ); + const backoff = Math.min( + ClusterSubscriberGroup.BASE_BACKOFF_MS * 2 ** attempts, + ClusterSubscriberGroup.MAX_BACKOFF_MS + ); + const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5)); + const delay = Math.max(0, backoff + jitter); + + debug( + "Failed to connect subscriber for %s. Refreshing slots in %dms", + nodeKey, + delay + ); + + this.subscriberGroupEmitter.emit("subscriberConnectFailed", { + delay, + error, + }); + }; + + /** + * Handles successful subscriber connections by resetting the failed attempts counter. 
+ * + * @param nodeKey + */ + private handleSubscriberConnectSucceeded = (nodeKey: string) => { + this.failedAttemptsByNode.delete(nodeKey); + }; } diff --git a/lib/cluster/ShardedSubscriber.ts b/lib/cluster/ShardedSubscriber.ts index 8f961094..8b39d0d4 100644 --- a/lib/cluster/ShardedSubscriber.ts +++ b/lib/cluster/ShardedSubscriber.ts @@ -10,9 +10,6 @@ export default class ShardedSubscriber { private instance: any = null; // Store listener references for cleanup - private readonly onEnd: () => void; - private readonly onError: (error: Error) => void; - private readonly onMoved: () => void; private readonly messageListeners: Map void> = new Map(); @@ -35,20 +32,6 @@ export default class ShardedSubscriber { this.nodeKey = getNodeKey(options); - // Define listeners as instance methods so we can remove them later - this.onEnd = () => { - this.started = false; - this.emitter.emit("-node", this.instance, this.nodeKey); - }; - - this.onError = (error: Error) => { - this.emitter.emit("nodeError", error, this.nodeKey); - }; - - this.onMoved = () => { - this.emitter.emit("moved"); - }; - // Register listeners this.instance.once("end", this.onEnd); this.instance.on("error", this.onError); @@ -63,6 +46,19 @@ export default class ShardedSubscriber { } } + private onEnd = () => { + this.started = false; + this.emitter.emit("-node", this.instance, this.nodeKey); + }; + + private onError = (error: Error) => { + this.emitter.emit("nodeError", error, this.nodeKey); + }; + + private onMoved = () => { + this.emitter.emit("moved"); + }; + async start(): Promise { if (this.started) { debug("already started %s", this.nodeKey); @@ -84,17 +80,9 @@ export default class ShardedSubscriber { this.started = false; if (this.instance) { - // Remove all listeners before disconnecting - this.instance.off("end", this.onEnd); - this.instance.off("error", this.onError); - this.instance.off("moved", this.onMoved); - - for (const [event, listener] of this.messageListeners) { - this.instance.off(event, listener); - } - this.messageListeners.clear(); - this.instance.disconnect(); + this.instance.removeAllListeners(); + this.messageListeners.clear(); this.instance = null; } @@ -116,4 +104,8 @@ export default class ShardedSubscriber { getInstance(): any { return this.instance; } + + getNodeKey(): string { + return this.nodeKey; + } } diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 14131c27..fa5b3f76 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -1139,11 +1139,16 @@ class Cluster extends EventEmitter { this.refreshSlotsCache(); }); - this.subscriberGroupEmitter.on("subscriberConnectFailed", (err) => { - this.emit("error", err); - - this.refreshSlotsCache(); - }); + this.subscriberGroupEmitter.on( + "subscriberConnectFailed", + ({ delay, error }) => { + this.emit("error", error); + + setTimeout(() => { + this.refreshSlotsCache(); + }, delay); + } + ); this.subscriberGroupEmitter.on("moved", () => { this.refreshSlotsCache();
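
To make the retry cadence introduced by this patch concrete, the delay computation from handleSubscriberConnectFailed can be exercised in isolation. A minimal sketch; the constants and the math are copied from ClusterSubscriberGroup above, and the loop at the end is purely illustrative:

    // Copied from ClusterSubscriberGroup's retry strategy.
    const MAX_RETRY_ATTEMPTS = 10;
    const MAX_BACKOFF_MS = 2000;
    const BASE_BACKOFF_MS = 100;

    function reconnectDelay(failedAttempts: number): number {
      const attempts = Math.min(failedAttempts, MAX_RETRY_ATTEMPTS);
      // Exponential backoff: 100ms * 2^attempts, capped at 2000ms.
      const backoff = Math.min(BASE_BACKOFF_MS * 2 ** attempts, MAX_BACKOFF_MS);
      // Uniform jitter in [-25%, +25%] of the backoff.
      const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5));
      return Math.max(0, backoff + jitter);
    }

    // Pre-jitter: attempt 1 -> 200ms, 2 -> 400ms, 3 -> 800ms, 4 -> 1600ms, 5+ -> 2000ms.
    for (let attempt = 1; attempt <= 6; attempt++) {
      console.log(`attempt ${attempt}: ${reconnectDelay(attempt)}ms`);
    }

Pre-jitter, the delays double from 200 ms up to the 2000 ms cap, so together with the isResetting/pendingReset guard above, a flapping node coalesces slot refreshes instead of triggering a refresh storm.
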