Skip to content

Commit d542b49

Browse files
committed
fix: detect and recover unhealthy sharded subscribers
1 parent d72b9c5 commit d542b49

File tree

3 files changed

+57
-5
lines changed

3 files changed

+57
-5
lines changed

lib/cluster/ClusterSubscriberGroup.ts

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ export default class ClusterSubscriberGroup {
110110
const startPromises = [];
111111
for (const s of this.shardedSubscribers.values()) {
112112
if (!s.isStarted()) {
113-
startPromises.push(s.start());
113+
startPromises.push(
114+
s.start().catch((err) => {
115+
this.subscriberGroupEmitter.emit("subscriberConnectFailed", err);
116+
})
117+
);
114118
}
115119
}
116120
return Promise.all(startPromises);
@@ -123,8 +127,13 @@ export default class ClusterSubscriberGroup {
123127
clusterSlots: string[][],
124128
clusterNodes: any[]
125129
): Promise<void> {
126-
// Update the slots cache and continue if there was a change
127-
if (!this._refreshSlots(clusterSlots)) {
130+
const hasTopologyChanged = this._refreshSlots(clusterSlots);
131+
const hasFailedSubscribers = this.hasUnhealthySubscribers();
132+
133+
if (!hasTopologyChanged && !hasFailedSubscribers) {
134+
debug(
135+
"No topology change detected or failed subscribers. Skipping reset."
136+
);
128137
return;
129138
}
130139

@@ -135,9 +144,11 @@ export default class ClusterSubscriberGroup {
135144
this.subscriberToSlotsIndex.has(nodeKey) &&
136145
shardedSubscriber.isStarted()
137146
) {
147+
debug("Skipping deleting subscriber for %s", nodeKey);
138148
continue;
139149
}
140150

151+
debug("Removing subscriber for %s", nodeKey);
141152
// Otherwise stop the subscriber and remove it
142153
shardedSubscriber.stop();
143154
this.shardedSubscribers.delete(nodeKey);
@@ -150,9 +161,11 @@ export default class ClusterSubscriberGroup {
150161
for (const [nodeKey, _] of this.subscriberToSlotsIndex) {
151162
// If we already have a subscriber for this node then keep it
152163
if (this.shardedSubscribers.has(nodeKey)) {
164+
debug("Skipping creating new subscriber for %s", nodeKey);
153165
continue;
154166
}
155167

168+
debug("Creating new subscriber for %s", nodeKey);
156169
// Otherwise create a new subscriber
157170
const redis = clusterNodes.find((node) => {
158171
return getNodeKey(node.options) === nodeKey;
@@ -170,7 +183,11 @@ export default class ClusterSubscriberGroup {
170183

171184
this.shardedSubscribers.set(nodeKey, sub);
172185

173-
startPromises.push(sub.start());
186+
startPromises.push(
187+
sub.start().catch((err) => {
188+
this.subscriberGroupEmitter.emit("subscriberConnectFailed", err);
189+
})
190+
);
174191

175192
this.subscriberGroupEmitter.emit("+subscriber");
176193
}
@@ -272,4 +289,25 @@ export default class ClusterSubscriberGroup {
272289
return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
273290
}
274291
}
292+
293+
/**
294+
* Checks if any subscribers are in an unhealthy state.
295+
*
296+
* A subscriber is considered unhealthy if:
297+
* - It exists in `shardedSubscribers` but `isStarted()` reports false (failed/disconnected)
298+
* - A node listed in `subscriberToSlotsIndex` has no entry in `shardedSubscribers`
299+
*
300+
* @returns true if any subscribers need to be recreated
301+
*/
302+
private hasUnhealthySubscribers(): boolean {
303+
const hasFailedSubscribers = Array.from(
304+
this.shardedSubscribers.values()
305+
).some((sub) => !sub.isStarted());
306+
307+
const hasMissingSubscribers = Array.from(
308+
this.subscriberToSlotsIndex.keys()
309+
).some((nodeKey) => !this.shardedSubscribers.has(nodeKey));
310+
311+
return hasFailedSubscribers || hasMissingSubscribers;
312+
}
275313
}

lib/cluster/ShardedSubscriber.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,16 @@ export default class ShardedSubscriber {
101101
debug("stopped %s", this.nodeKey);
102102
}
103103

104+
/**
105+
* Returns true only if the subscriber is flagged as started AND its underlying instance exists with a status other than "end" or "close".
106+
*/
104107
isStarted(): boolean {
105-
return this.started;
108+
const status = this.instance?.status;
109+
110+
const isDisconnected =
111+
status === "end" || status === "close" || !this.instance;
112+
113+
return this.started && !isDisconnected;
106114
}
107115

108116
getInstance(): any {

lib/cluster/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,12 @@ class Cluster extends EventEmitter {
11391139
this.refreshSlotsCache();
11401140
});
11411141

1142+
this.subscriberGroupEmitter.on("subscriberConnectFailed", (err) => {
1143+
this.emit("error", err);
1144+
1145+
this.refreshSlotsCache();
1146+
});
1147+
11421148
this.subscriberGroupEmitter.on("moved", () => {
11431149
this.refreshSlotsCache();
11441150
});

0 commit comments

Comments
 (0)