Commit d72b9c5
fix(cluster): always refresh slots on sharded subscriber disconnect
1 parent: 9d1beb0

File tree: 3 files changed, +102 −130 lines changed

lib/cluster/ClusterOptions.ts

Lines changed: 0 additions & 11 deletions
@@ -120,16 +120,6 @@ export interface IClusterOptions {
    */
   slotsRefreshInterval?: number;

-  /**
-   * Whether to refresh the cluster slot cache when a node connection is closed.
-   *
-   * Useful when slotsRefreshInterval is disabled (-1) but you still want
-   * topology to be updated promptly on node disconnect/failover.
-   *
-   * @default false
-   */
-  slotsRefreshOnDisconnect?: boolean;
-
   /**
    * Use sharded subscribers instead of a single subscriber.
    *
@@ -221,7 +211,6 @@ export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = {
   retryDelayOnTryAgain: 100,
   slotsRefreshTimeout: 1000,
   slotsRefreshInterval: 5000,
-  slotsRefreshOnDisconnect: false,
   useSRVRecords: false,
   resolveSrv: resolveSrv,
   dnsLookup: lookup,
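
With the option gone, callers no longer need to opt in: a sharded-subscriber disconnect always refreshes the slot cache. A minimal usage sketch, assuming the published ioredis package; the endpoints and option values are illustrative and do not come from this commit:

import { Cluster } from "ioredis";

// Endpoints and option values here are examples only.
const subscriber = new Cluster([{ host: "127.0.0.1", port: 7000 }], {
  shardedSubscribers: true, // one subscriber connection per shard
  slotsRefreshInterval: -1, // periodic refresh disabled; topology is still
                            // refreshed when a sharded subscriber disconnects
});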

lib/cluster/index.ts

Lines changed: 1 addition & 7 deletions
@@ -130,10 +130,6 @@ class Cluster extends EventEmitter {

     this.connectionPool.on("-node", (redis, key) => {
       this.emit("-node", redis);
-
-      if (this.options.slotsRefreshOnDisconnect) {
-        this.refreshSlotsCache();
-      }
     });
     this.connectionPool.on("+node", (redis) => {
       this.emit("+node", redis);
@@ -1140,9 +1136,7 @@
     this.subscriberGroupEmitter.on("-node", (redis, nodeKey) => {
       this.emit("-node", redis, nodeKey);

-      if (this.options.slotsRefreshOnDisconnect) {
-        this.refreshSlotsCache();
-      }
+      this.refreshSlotsCache();
     });

     this.subscriberGroupEmitter.on("moved", () => {
test/scenario/sharded-pub-sub.test.ts

Lines changed: 101 additions & 112 deletions
@@ -1,16 +1,13 @@
 import type { TestConfig } from "./utils/test.util";
-import {
-  createClusterTestClient,
-  getConfig,
-  wait,
-} from "./utils/test.util";
+import { createClusterTestClient, getConfig, wait } from "./utils/test.util";

 import { FaultInjectorClient } from "./utils/fault-injector";
 import { TestCommandRunner } from "./utils/command-runner";
 import { CHANNELS, CHANNELS_BY_SLOT } from "./utils/test.util";
 import { MessageTracker } from "./utils/message-tracker";
 import { Cluster } from "../../lib";
 import { assert } from "chai";
+import { IClusterOptions } from "../../lib/cluster/ClusterOptions";

 describe("Sharded Pub/Sub E2E", () => {
   let faultInjectorClient: FaultInjectorClient;
@@ -25,7 +22,10 @@ describe("Sharded Pub/Sub E2E", () => {
   describe("Single Subscriber", () => {
     let cleanup: (() => Promise<void>) | null = null;

-    const setup = async (subscriberOverrides = {}, publisherOverrides = {}) => {
+    const setup = async (
+      subscriberOverrides: Partial<IClusterOptions> = {},
+      publisherOverrides: Partial<IClusterOptions> = {}
+    ) => {
       const messageTracker = new MessageTracker(CHANNELS);
       const subscriber = createClusterTestClient(config.clientConfig, {
         shardedSubscribers: true,
@@ -86,105 +86,93 @@ describe("Sharded Pub/Sub E2E", () => {
       }
     });

-    [
-      {
-        name: "slotsRefreshInterval: -1",
-        subscriberOverrides: {
-          slotsRefreshInterval: -1,
-          slotsRefreshOnDisconnect: true,
-          shardedSubscribers: true,
-        },
-      },
-      {
-        name: "slotsRefreshInterval: default",
-        subscriberOverrides: {
-          shardedSubscribers: true,
-        },
-      },
-    ].map((testCase) => {
-      it(`should resume publishing and receiving after failover - ${testCase.name}`, async () => {
-        const { subscriber, publisher, messageTracker } = await setup(
-          testCase.subscriberOverrides
-        );
+    it("should resume publishing and receiving after failover", async () => {
+      const { subscriber, publisher, messageTracker } = await setup({
+        slotsRefreshInterval: -1,
+      });

-        for (const channel of CHANNELS) {
-          await subscriber.ssubscribe(channel);
-        }
+      for (const channel of CHANNELS) {
+        await subscriber.ssubscribe(channel);
+      }

-        subscriber.on("smessage", (channelName, _) => {
-          messageTracker.incrementReceived(channelName);
-        });
+      subscriber.on("smessage", (channelName, _) => {
+        messageTracker.incrementReceived(channelName);
+      });

-        // Trigger failover twice
-        for (let i = 0; i < 2; i++) {
-          // Start publishing messages
-          const { controller: publishAbort, result: publishResult } =
-            TestCommandRunner.publishMessagesUntilAbortSignal(
-              publisher,
-              CHANNELS,
-              messageTracker
-            );
-
-          // Trigger failover during publishing
-          const { action_id: failoverActionId } =
-            await faultInjectorClient.triggerAction({
-              type: "failover",
-              parameters: {
-                bdb_id: config.clientConfig.bdbId.toString(),
-                cluster_index: 0,
-              },
-            });
-
-          // Wait for failover to complete
-          await faultInjectorClient.waitForAction(failoverActionId);
-
-          publishAbort.abort();
-          await publishResult;
-
-          for (const channel of CHANNELS) {
-            const sent = messageTracker.getChannelStats(channel)!.sent;
-            const received = messageTracker.getChannelStats(channel)!.received;
-
-            assert.ok(
-              received <= sent,
-              `Channel ${channel}: received (${received}) should be <= sent (${sent})`
-            );
-          }
-
-          // Wait for 3 seconds before resuming publishing
-          await wait(5_000);
-
-          messageTracker.reset();
-
-          const {
-            controller: afterFailoverController,
-            result: afterFailoverResult,
-          } = TestCommandRunner.publishMessagesUntilAbortSignal(
+      // Trigger failover twice
+      for (let i = 0; i < 2; i++) {
+        // Start publishing messages
+        const { controller: publishAbort, result: publishResult } =
+          TestCommandRunner.publishMessagesUntilAbortSignal(
             publisher,
             CHANNELS,
             messageTracker
           );

-          await wait(10_000);
-          afterFailoverController.abort();
-          await afterFailoverResult;
-
-          for (const channel of CHANNELS) {
-            const sent = messageTracker.getChannelStats(channel)!.sent;
-            const received = messageTracker.getChannelStats(channel)!.received;
-            assert.ok(sent > 0, `Channel ${channel} should have sent messages`);
-            assert.ok(
-              received > 0,
-              `Channel ${channel} should have received messages`
-            );
-            assert.strictEqual(
-              messageTracker.getChannelStats(channel)!.received,
-              messageTracker.getChannelStats(channel)!.sent,
-              `Channel ${channel} received (${received}) should equal sent (${sent}) once resumed after failover`
-            );
-          }
+        // Trigger failover during publishing
+        const { action_id: failoverActionId } =
+          await faultInjectorClient.triggerAction({
+            type: "failover",
+            parameters: {
+              bdb_id: config.clientConfig.bdbId.toString(),
+              cluster_index: 0,
+            },
+          });
+
+        // Wait for failover to complete
+        await faultInjectorClient.waitForAction(failoverActionId);
+
+        publishAbort.abort();
+        await publishResult;
+
+        const totalSent = CHANNELS.reduce(
+          (acc, channel) => acc + messageTracker.getChannelStats(channel)!.sent,
+          0
+        );
+        const totalReceived = CHANNELS.reduce(
+          (acc, channel) =>
+            acc + messageTracker.getChannelStats(channel)!.received,
+          0
+        );
+
+        assert.ok(
+          totalReceived <= totalSent,
+          `Total received (${totalReceived}) should be <= total sent (${totalSent})`
+        );
+
+        // Wait for 2 seconds before resuming publishing
+        await wait(2_000);
+
+        messageTracker.reset();
+
+        const {
+          controller: afterFailoverController,
+          result: afterFailoverResult,
+        } = TestCommandRunner.publishMessagesUntilAbortSignal(
+          publisher,
+          CHANNELS,
+          messageTracker
+        );
+
+        await wait(10_000);
+        afterFailoverController.abort();
+        await afterFailoverResult;
+
+        for (const channel of CHANNELS) {
+          const sent = messageTracker.getChannelStats(channel)!.sent;
+          const received = messageTracker.getChannelStats(channel)!.received;
+          assert.ok(sent > 0, `Channel ${channel} should have sent messages`);
+          assert.ok(
+            received > 0,
+            `Channel ${channel} should have received messages`
+          );
+          assert.strictEqual(
+            messageTracker.getChannelStats(channel)!.received,
+            messageTracker.getChannelStats(channel)!.sent,
+            `Channel ${channel} received (${received}) should equal sent (${sent}) once resumed after failover`
+          );
         }
-      });
+      }
     });

     it("should NOT receive messages after sunsubscribe", async () => {
@@ -375,24 +363,25 @@ describe("Sharded Pub/Sub E2E", () => {
       publishAbort.abort();
       await publishResult;

-      for (const channel of CHANNELS) {
-        const sent = messageTracker1.getChannelStats(channel)!.sent;
-        const received1 = messageTracker1.getChannelStats(channel)!.received;
-
-        const received2 = messageTracker2.getChannelStats(channel)!.received;
+      const totalSent = CHANNELS.reduce(
+        (acc, channel) => acc + messageTracker1.getChannelStats(channel)!.sent,
+        0
+      );
+      const totalReceived = CHANNELS.reduce(
+        (acc, channel) =>
+          acc +
+          messageTracker1.getChannelStats(channel)!.received +
+          messageTracker2.getChannelStats(channel)!.received,
+        0
+      );

-        assert.ok(
-          received1 <= sent,
-          `Channel ${channel}: received (${received1}) should be <= sent (${sent})`
-        );
-        assert.ok(
-          received2 <= sent,
-          `Channel ${channel}: received2 (${received2}) should be <= sent (${sent})`
-        );
-      }
+      assert.ok(
+        totalReceived <= totalSent * 2,
+        `Total received (${totalReceived}) should be <= total sent (${totalSent})`
+      );

-      // Wait for 5 seconds before resuming publishing
-      await wait(5_000);
+      // Wait for 2 seconds before resuming publishing
+      await wait(2_000);

       messageTracker1.reset();
       messageTracker2.reset();
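During an in-flight failover the test now asserts on aggregate totals rather than per-channel counts, which tolerates per-channel skew while the slot cache catches up. A standalone sketch of that aggregation; the stats shape is inferred from how the test calls getChannelStats(...)!.sent/.received, so it is an assumption, not the actual MessageTracker API:

// Assumed minimal shape, inferred from the test's usage of MessageTracker.
interface ChannelStats {
  sent: number;
  received: number;
}

// Sum sent/received across channels, mirroring the reduce calls in the test.
function totals(
  channels: string[],
  statsFor: (channel: string) => ChannelStats
): ChannelStats {
  return channels.reduce(
    (acc, channel) => {
      const stats = statsFor(channel);
      return {
        sent: acc.sent + stats.sent,
        received: acc.received + stats.received,
      };
    },
    { sent: 0, received: 0 }
  );
}

// Usage (illustrative): during failover only the weak aggregate bound holds.
// const { sent, received } = totals(CHANNELS, (c) => tracker.getChannelStats(c)!);
// assert.ok(received <= sent);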