Skip to content

Commit c118c46

Browse files
author
Matt Willian
committed
feat: improve redis cluster handling
1 parent 215d0a7 commit c118c46

File tree

2 files changed

+90
-41
lines changed

2 files changed

+90
-41
lines changed

lib/queue-factory.ts

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Redis, Cluster, RedisOptions } from "ioredis";
2+
import { ConnectionOptions as TlsConnectionOptions } from "tls";
23

34
import { QueueType, getQueueType, redisOptsFromUrl } from "./utils";
45
import { Queue } from "bullmq";
@@ -16,6 +17,13 @@ const maxTime = 40000;
1617
// We keep a redis client that we can reuse for all the queues.
1718
let redisClients: Record<string, Redis | Cluster> = {} as any;
1819

20+
type NestedRedisOptions = Partial<RedisOptions> & {
21+
tls?: TlsConnectionOptions;
22+
};
23+
type RedisOptionsWithNested = RedisOptions & {
24+
redisOptions?: NestedRedisOptions;
25+
};
26+
1927
export interface FoundQueue {
2028
prefix: string;
2129
name: string;
@@ -167,21 +175,52 @@ export function getRedisClient(
167175

168176
if (!redisClients[key]) {
169177
if (clusterNodes && clusterNodes.length) {
170-
const { username, password } = redisOptsFromUrl(clusterNodes[0]);
178+
const { username: nodeUsername, password: nodePassword } =
179+
redisOptsFromUrl(clusterNodes[0]);
180+
const {
181+
username: redisOptsUsername,
182+
password: redisOptsPassword,
183+
tls: redisOptsTls,
184+
redisOptions: suppliedRedisOptionsRaw,
185+
...clusterLevelOpts
186+
} = redisOpts as RedisOptionsWithNested;
187+
const suppliedRedisOptions: NestedRedisOptions =
188+
suppliedRedisOptionsRaw ?? {};
189+
const baseRedisOptions: NestedRedisOptions = {
190+
...suppliedRedisOptions,
191+
};
192+
delete baseRedisOptions.username;
193+
delete baseRedisOptions.password;
194+
delete baseRedisOptions.tls;
195+
196+
const finalUsername =
197+
redisOptsUsername ??
198+
suppliedRedisOptions.username ??
199+
nodeUsername;
200+
if (finalUsername !== undefined) {
201+
baseRedisOptions.username = finalUsername;
202+
}
203+
204+
const finalPassword =
205+
redisOptsPassword ??
206+
suppliedRedisOptions.password ??
207+
nodePassword;
208+
if (finalPassword !== undefined) {
209+
baseRedisOptions.password = finalPassword;
210+
}
211+
212+
const mergedTls = mergeTlsConfigs(
213+
getClusterTlsFromEnv(),
214+
suppliedRedisOptions.tls,
215+
redisOptsTls
216+
);
217+
if (mergedTls) {
218+
baseRedisOptions.tls = mergedTls;
219+
}
220+
171221
redisClients[key] = new Redis.Cluster(clusterNodes, {
172-
...redisOpts,
173-
redisOptions: {
174-
username,
175-
password,
176-
tls: process.env.REDIS_CLUSTER_TLS
177-
? {
178-
cert: Buffer.from(
179-
process.env.REDIS_CLUSTER_TLS ?? "",
180-
"base64"
181-
).toString("ascii"),
182-
}
183-
: undefined,
184-
},
222+
...clusterLevelOpts,
223+
redisOptions: baseRedisOptions,
185224
});
186225
} else {
187226
redisClients[key] = new Redis(redisOpts);
@@ -300,3 +339,25 @@ export function createQueue(
300339
);
301340
}
302341
}
342+
343+
function getClusterTlsFromEnv(): TlsConnectionOptions | undefined {
344+
if (!process.env.REDIS_CLUSTER_TLS) {
345+
return undefined;
346+
}
347+
return {
348+
cert: Buffer.from(
349+
process.env.REDIS_CLUSTER_TLS ?? "",
350+
"base64"
351+
).toString("ascii"),
352+
};
353+
}
354+
355+
function mergeTlsConfigs(
356+
...configs: Array<TlsConnectionOptions | undefined>
357+
): TlsConnectionOptions | undefined {
358+
const valid = configs.filter(Boolean) as TlsConnectionOptions[];
359+
if (!valid.length) {
360+
return undefined;
361+
}
362+
return Object.assign({}, ...valid);
363+
}

lib/socket.ts

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { RedisOptions } from "ioredis";
2-
import { pick } from "lodash";
32
import { getCache, updateQueuesCache, queueKey } from "./queues-cache";
43
import { WebSocketClient } from "./ws-autoreconnect";
54
import {
@@ -15,13 +14,8 @@ const { version } = require(`${__dirname}/../package.json`);
1514

1615
const chalk = require("chalk");
1716

18-
export interface Connection {
19-
port?: number;
20-
host?: string;
21-
password?: string;
22-
db?: number;
17+
export interface Connection extends Partial<RedisOptions> {
2318
uri?: string;
24-
tls?: object;
2519
}
2620

2721
export const Socket = (
@@ -249,31 +243,25 @@ export const Socket = (
249243
}
250244
};
251245

252-
function redisOptsFromConnection(connection: Connection): RedisOptions {
253-
let opts: RedisOptions = {
254-
...pick(connection, [
255-
"host",
256-
"port",
257-
"username",
258-
"password",
259-
"family",
260-
"sentinelPassword",
261-
"db",
262-
"tls",
263-
"sentinels",
264-
"name",
265-
]),
246+
function redisOptsFromConnection(
247+
connection: Connection = {}
248+
): RedisOptions {
249+
let opts: (RedisOptions & { uri?: string }) = {
250+
...connection,
266251
};
267252

268253
if (connection.uri) {
269254
opts = { ...opts, ...redisOptsFromUrl(connection.uri) };
270255
}
256+
delete opts.uri;
271257

272-
opts.retryStrategy = function (times: number) {
273-
times = times % 8;
274-
const delay = Math.round(Math.pow(2, times + 8));
275-
console.log(chalk.yellow("Redis: ") + `Reconnecting in ${delay} ms`);
276-
return delay;
277-
};
258+
if (!opts.retryStrategy) {
259+
opts.retryStrategy = function (times: number) {
260+
times = times % 8;
261+
const delay = Math.round(Math.pow(2, times + 8));
262+
console.log(chalk.yellow("Redis: ") + `Reconnecting in ${delay} ms`);
263+
return delay;
264+
};
265+
}
278266
return opts;
279267
}

0 commit comments

Comments
 (0)