diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index d7ce00f38a..b06b0d5079 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -4,12 +4,16 @@ import RedisClient, { RedisClientOptions, RedisClientType } from '.'; import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, TimeoutError, WatchError } from '../errors'; import { defineScript } from '../lua-script'; import { spy, stub } from 'sinon'; -import { once } from 'node:events'; +import EventEmitter, { once } from 'node:events'; import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec'; import { RESP_TYPES } from '../RESP/decoder'; import { BlobStringReply, NumberReply } from '../RESP/types'; import { SortedSetMember } from '../commands/generic-transformers'; import { CommandParser } from './parser'; +import { RedisSocketOptions } from './socket'; +import { getFreePortNumber } from '@redis/test-utils/lib/proxy/redis-proxy'; +import { createClient } from '../../'; +import net from 'node:net' export const SQUARE_SCRIPT = defineScript({ SCRIPT: @@ -1008,6 +1012,95 @@ describe('Client', () => { } }, GLOBAL.SERVERS.OPEN); }); + + describe("socket errors during handshake", () => { + + it("should successfully connect when server accepts connection immediately", async () => { + const { log, client, teardown } = await setup({}, 0); + await client.connect(); + assert.deepEqual(["connect", "ready"], log); + teardown(); + }); + + it("should reconnect after multiple connection drops during handshake", async () => { + const { log, client, teardown } = await setup({}, 2); + await client.connect(); + assert.deepEqual( + [ + "connect", + "error", + "reconnecting", + "connect", + "error", + "reconnecting", + "connect", + "ready", + ], + log, + ); + teardown(); + }); + + //utils + + async function setup( + socketOptions: Partial, + dropCount: number, + ) { + const port = await getFreePortNumber(); + const server = setupMockServer(dropCount); + const options = { + ...{ + socket: { + host: "localhost", + port, + }, + ...socketOptions, + }, + }; + const client = createClient(options); + const log = setupLog(client); + await once(server.listen(port), "listening"); + return { + log, + client, + server, + teardown: async function () { + client.destroy(); + server.close(); + }, + }; + } + + function setupLog(client: EventEmitter): string[] { + const log: string[] = []; + client.on("connect", () => log.push("connect")); + client.on("ready", () => log.push("ready")); + client.on("reconnecting", () => log.push("reconnecting")); + client.on("error", () => log.push("error")); + return log; + } + + // Create a TCP server that accepts connections but immediately drops them times + // This simulates what happens when Docker container is stopped: + // - TCP connection succeeds (OS accepts it) + // - But socket is immediately destroyed, causing ECONNRESET during handshake + function setupMockServer(dropImmediately: number) { + const server = net.createServer(async (socket) => { + // socket.on("data", (data) => + // console.log(data.toString().replaceAll("\r\n", " ")), + // ); + if (dropImmediately > 0) { + dropImmediately--; + socket.destroy(); + } + socket.write("+OK\r\n+OK\r\n"); + // socket.write("*7\r\n$6\r\nserver\r\n$5\r\nredis\r\n$7\r\nversion\r\n$5\r\n7.2.0\r\n$5\r\nproto\r\n:3\r\n$2\r\nid\r\n:1\r\n") + }); + return server; + } + + }); }); /** diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index c5569e8654..ab8e9992b6 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -220,6 +220,15 @@ export default class RedisSocket extends EventEmitter { try { await this.#initiator(); + + // Check if socket was closed/destroyed during initiator execution + if (!this.#socket || this.#socket.destroyed || !this.#socket.readable || !this.#socket.writable) { + const retryIn = this.#shouldReconnect(retries++, new SocketClosedUnexpectedlyError()); + if (typeof retryIn !== 'number') { throw retryIn; } + await setTimeout(retryIn); + this.emit('reconnecting'); + continue; + } } catch (err) { this.#socket.destroy(); this.#socket = undefined; @@ -312,6 +321,7 @@ export default class RedisSocket extends EventEmitter { }); } + write(iterable: Iterable>) { if (!this.#socket) return;