2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -28,7 +28,7 @@ jobs:
strategy:
fail-fast: false
matrix:
node: [8.x, 10.x, 12.x, 14.x, 16.x, 20.x]
node: [10.x, 12.x, 14.x, 16.x, 20.x, 22.x]

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
222 changes: 115 additions & 107 deletions lib/cluster/ClusterSubscriberGroup.ts
@@ -1,26 +1,23 @@
import { Debug } from "../utils";
import ClusterSubscriber from "./ClusterSubscriber";
import Cluster from "./index";
import ConnectionPool from "./ConnectionPool";
import { getNodeKey } from "./util";
import * as calculateSlot from "cluster-key-slot";
import ShardedSubscriber from "./ShardedSubscriber";
import * as EventEmitter from "events";
const debug = Debug("cluster:subscriberGroup");

/**
* Redis differs between "normal" and sharded PubSub. If using the "normal" PubSub feature, exactly one
* ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus forwards
* messages between shards. However, this has scalability limitations, which is the reason why the sharded
* PubSub feature was added to Redis. With sharded PubSub, each shard is responsible for its own messages.
* Given that, we need at least one ClusterSubscriber per master endpoint/node.
* Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature,
* exactly one subscriber exists per cluster instance because the Redis cluster bus forwards
* messages between shards. Sharded PubSub removes this limitation by making each shard
* responsible for its own messages.
*
* This class leverages the previously existing ClusterSubscriber by adding support for multiple such subscribers
* in alignment with the master nodes of the cluster. The ClusterSubscriber class was extended in a non-breaking way
* to support this feature.
* This class coordinates one ShardedSubscriber per master node in the cluster, providing
* sharded PubSub support while keeping the public API backward compatible.
*/
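
As background for the class described above, here is a minimal usage sketch of sharded PubSub from the application's side. It is not part of this diff: the host and port are placeholders, and the "smessage" event name is an assumption mirroring Redis's SSUBSCRIBE push type; only ssubscribe itself appears in the code below.

import { Cluster } from "ioredis";

const cluster = new Cluster([{ host: "127.0.0.1", port: 7000 }]);

async function listen(): Promise<void> {
  // ssubscribe routes the subscription to the shard that owns the channel's slot.
  await cluster.ssubscribe("orders:{eu}");

  // Assumption: sharded messages are surfaced via an "smessage" event.
  cluster.on("smessage", (channel: string, message: string) => {
    console.log(`received "${message}" on ${channel}`);
  });
}

listen().catch((err) => console.error(err));

The hash tag in "orders:{eu}" also matters for the slot checks further down: every channel passed to a single ssubscribe call must hash to the same slot.
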
export default class ClusterSubscriberGroup {
private shardedSubscribers: Map<string, ClusterSubscriber> = new Map();
private shardedSubscribers: Map<string, ShardedSubscriber> = new Map();
private clusterSlots: string[][] = [];
//Simple [min, max] slot ranges aren't enough because you can migrate single slots
// Simple [min, max] slot ranges aren't enough because you can migrate single slots
private subscriberToSlotsIndex: Map<string, number[]> = new Map();
private channels: Map<number, Array<string | Buffer>> = new Map();

@@ -29,32 +26,14 @@
*
* @param cluster
*/
constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) {
cluster.on("+node", (redis) => {
this._addSubscriber(redis);
});

cluster.on("-node", (redis) => {
this._removeSubscriber(redis);
});

cluster.on("refresh", () => {
this._refreshSlots(cluster);
});

cluster.on("forceRefresh", () => {
refreshSlotsCacheCallback();
});
}
constructor(private readonly subscriberGroupEmitter: EventEmitter) {}

/**
* Get the responsible subscriber.
*
* Returns undefined if no subscriber was found
*
* @param slot
*/
getResponsibleSubscriber(slot: number): ClusterSubscriber {
getResponsibleSubscriber(slot: number): ShardedSubscriber | undefined {
const nodeKey = this.clusterSlots[slot][0];
return this.shardedSubscribers.get(nodeKey);
}
@@ -67,10 +46,12 @@
addChannels(channels: (string | Buffer)[]): number {
const slot = calculateSlot(channels[0]);

//Check if the all channels belong to the same slot and otherwise reject the operation
channels.forEach((c: string) => {
if (calculateSlot(c) != slot) return -1;
});
// Check that all channels belong to the same slot and otherwise reject the operation
for (const c of channels) {
if (calculateSlot(c) !== slot) {
return -1;
}
}

const currChannels = this.channels.get(slot);
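
The same-slot guard a few lines above (and its twin in removeChannels below) exists because one sharded subscription is served entirely by the shard that owns the channels' slot. A small illustrative sketch, reusing the cluster-key-slot dependency this file already imports; the channel names are made up:

import * as calculateSlot from "cluster-key-slot";

// Only the part inside the first {...} is hashed, so both channels land in
// the same slot and can be handled by a single sharded subscriber.
const channels = ["chat:{room1}:messages", "chat:{room1}:presence"];
const slots = channels.map((c) => calculateSlot(c));

console.log(new Set(slots).size === 1); // true, so addChannels would accept this batch

// Without hash tags the names would usually hash to different slots, and
// addChannels/removeChannels would reject the batch by returning -1.
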

@@ -93,10 +74,12 @@
removeChannels(channels: (string | Buffer)[]): number {
const slot = calculateSlot(channels[0]);

//Check if the all channels belong to the same slot and otherwise reject the operation
channels.forEach((c: string) => {
if (calculateSlot(c) != slot) return -1;
});
// Check that all channels belong to the same slot and otherwise reject the operation
for (const c of channels) {
if (calculateSlot(c) !== slot) {
return -1;
}
}

const slotChannels = this.channels.get(slot);

@@ -124,96 +107,123 @@
* Start all subscribers that have not been started yet
*/
start() {
const startPromises = [];
for (const s of this.shardedSubscribers.values()) {
if (!s.isStarted()) {
s.start();
startPromises.push(s.start());
}
}
return Promise.all(startPromises);
}

/**
* Add a subscriber to the group of subscribers
*
* @param redis
* Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones.
*/
private _addSubscriber(redis: any): ClusterSubscriber {
const pool: ConnectionPool = new ConnectionPool(redis.options);
public async reset(
clusterSlots: string[][],
clusterNodes: any[]
): Promise<void> {
// Refresh the slots cache and bail out early if nothing changed
if (!this._refreshSlots(clusterSlots)) {
return;
}

if (pool.addMasterNode(redis)) {
const sub = new ClusterSubscriber(pool, this.cluster, true);
const nodeKey = getNodeKey(redis.options);
this.shardedSubscribers.set(nodeKey, sub);
sub.start();
// For each of the sharded subscribers
for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
if (
// If the subscriber is still responsible for a slot range and is running then keep it
this.subscriberToSlotsIndex.has(nodeKey) &&
shardedSubscriber.isStarted()
) {
continue;
}

// We need to attempt to resubscribe them in case the new node serves their slot
this._resubscribe();
this.cluster.emit("+subscriber");
return sub;
// Otherwise stop the subscriber and remove it
shardedSubscriber.stop();
this.shardedSubscribers.delete(nodeKey);

this.subscriberGroupEmitter.emit("-subscriber");
}

return null;
}
const startPromises = [];
// For each node in the slots cache
for (const nodeKey of this.subscriberToSlotsIndex.keys()) {

// If we already have a subscriber for this node then keep it
if (this.shardedSubscribers.has(nodeKey)) {
continue;
}

/**
* Removes a subscriber from the group
* @param redis
*/
private _removeSubscriber(redis: any): Map<string, ClusterSubscriber> {
const nodeKey = getNodeKey(redis.options);
const sub = this.shardedSubscribers.get(nodeKey);
// Otherwise create a new subscriber
const redis = clusterNodes.find((node) => {
return getNodeKey(node.options) === nodeKey;
});

if (sub) {
sub.stop();
this.shardedSubscribers.delete(nodeKey);
if (!redis) {
debug("Failed to find node for key %s", nodeKey);
continue;
}

// Even though the subscriber to this node is going down, we might have another subscriber
// handling the same slots, so we need to attempt to subscribe the orphaned channels
this._resubscribe();
this.cluster.emit("-subscriber");
const sub = new ShardedSubscriber(
this.subscriberGroupEmitter,
redis.options
);

this.shardedSubscribers.set(nodeKey, sub);

startPromises.push(sub.start());

this.subscriberGroupEmitter.emit("+subscriber");
}

return this.shardedSubscribers;
// It's vital to await the start promises before resubscribing
// Otherwise we might try to resubscribe to a subscriber that is not yet connected
// This can cause a race condition
try {
await Promise.all(startPromises);
} catch (err) {
debug("Error while starting subscribers: %s", err);
this.subscriberGroupEmitter.emit("error", err);
}

this._resubscribe();
this.subscriberGroupEmitter.emit("subscribersReady");
}
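
For orientation: with the constructor change above, the group no longer wires itself to Cluster events; the owning Cluster is expected to hand it an emitter and call reset() whenever the topology changes. The wiring below is a hedged sketch only, since the actual call site (presumably in lib/cluster/index.ts) is outside this diff:

import { Cluster } from "ioredis";
import * as EventEmitter from "events";
import ClusterSubscriberGroup from "./ClusterSubscriberGroup";

declare const cluster: Cluster; // stands in for the Cluster instance that owns the group

const groupEmitter = new EventEmitter();
const subscriberGroup = new ClusterSubscriberGroup(groupEmitter);

// On every topology refresh, pass the fresh slot map plus the current master
// nodes; reset() then decides which ShardedSubscribers to keep, stop, or create.
cluster.on("refresh", () => {
  subscriberGroup
    .reset(cluster.slots, cluster.nodes("master"))
    .catch((err) => console.error("subscriber group reset failed", err));
});
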

/**
* Refreshes the subscriber-related slot ranges
*
* Returns false if no refresh was needed
*
* @param cluster
* @param targetSlots
*/
private _refreshSlots(cluster: Cluster): boolean {
private _refreshSlots(targetSlots: string[][]): boolean {
//If there was an actual change, then reassign the slot ranges
if (this._slotsAreEqual(cluster.slots)) {
if (this._slotsAreEqual(targetSlots)) {
debug(
"Nothing to refresh because the new cluster map is equal to the previous one."
);
} else {
debug("Refreshing the slots of the subscriber group.");

//Rebuild the slots index
this.subscriberToSlotsIndex = new Map();

for (let slot = 0; slot < cluster.slots.length; slot++) {
const node: string = cluster.slots[slot][0];
return false;
}

if (!this.subscriberToSlotsIndex.has(node)) {
this.subscriberToSlotsIndex.set(node, []);
}
this.subscriberToSlotsIndex.get(node).push(Number(slot));
}
debug("Refreshing the slots of the subscriber group.");

//Update the subscribers from the index
this._resubscribe();
//Rebuild the slots index
this.subscriberToSlotsIndex = new Map();

//Update the cached slots map
this.clusterSlots = JSON.parse(JSON.stringify(cluster.slots));
for (let slot = 0; slot < targetSlots.length; slot++) {
const node: string = targetSlots[slot][0];

this.cluster.emit("subscribersReady");
return true;
if (!this.subscriberToSlotsIndex.has(node)) {
this.subscriberToSlotsIndex.set(node, []);
}
this.subscriberToSlotsIndex.get(node).push(Number(slot));
}

return false;
//Update the cached slots map
this.clusterSlots = JSON.parse(JSON.stringify(targetSlots));

return true;
}
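
For reference, the slot map consumed here has the shape of ioredis's cluster.slots: an array indexed by slot number whose entries list node keys as "host:port" strings, master first. A tiny hedged example (addresses made up, slot count abridged) of how subscriberToSlotsIndex ends up grouping slots per master:

// Abridged targetSlots: index = slot number, first entry = master node key.
const targetSlots: string[][] = [
  ["127.0.0.1:7000", "127.0.0.1:7003"], // slot 0
  ["127.0.0.1:7000", "127.0.0.1:7003"], // slot 1
  ["127.0.0.1:7001", "127.0.0.1:7004"], // slot 2
];

// After _refreshSlots(targetSlots), the index maps each master to its slots:
//   "127.0.0.1:7000" -> [0, 1]
//   "127.0.0.1:7001" -> [2]
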

/**
@@ -224,12 +234,9 @@
private _resubscribe() {
if (this.shardedSubscribers) {
this.shardedSubscribers.forEach(
(s: ClusterSubscriber, nodeKey: string) => {
(s: ShardedSubscriber, nodeKey: string) => {
const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey);
if (subscriberSlots) {
//More for debugging purposes
s.associateSlotRange(subscriberSlots);

//Resubscribe on the underlying connection
subscriberSlots.forEach((ss) => {
//Might return null if being disconnected
@@ -238,12 +245,10 @@

if (channels && channels.length > 0) {
//Try to subscribe now
if (redis) {
redis.ssubscribe(channels);

//If the instance isn't ready yet, then register the re-subscription for later
redis.on("ready", () => {
redis.ssubscribe(channels);
if (redis && redis.status !== "end") {
redis.ssubscribe(channels).catch((err) => {
// TODO: Should we emit an error event here?
debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
});
}
}
@@ -261,7 +266,10 @@
* @private
*/
private _slotsAreEqual(other: string[][]) {
if (this.clusterSlots === undefined) return false;
else return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
if (this.clusterSlots === undefined) {
return false;
} else {
return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
}
}
}