From 305df00f425590ad5f1834f78ac1a835e843db1b Mon Sep 17 00:00:00 2001 From: Peter Lehnhardt Date: Fri, 21 Nov 2025 14:12:31 +0100 Subject: [PATCH] Implement Request Timeouts This change implements request timeouts for any request sent to the Kafka cluster. The timeout's duration can be configured via option `requestTimeout` in every client type. Additionally, the retrying logic in the base class is changed so that it retries only if every error in the current error chain is marked as retriable. Otherwise, timeout errors, which should be handled as non-retriable errors, would potentially lead to much longer timeouts due to multiple attempts being made. (But also in general, I would assume that every non-retriable error encountered should block any additional attempt) on-behalf-of: @SAP ospo@sap.com --- src/apis/callbacks.ts | 24 +---- src/clients/base/base.ts | 7 +- src/clients/consumer/consumer.ts | 5 +- src/errors.ts | 4 +- src/network/connection.ts | 51 ++++++---- test/clients/base/base.test.ts | 18 +--- .../consumer-consumer-group-protocol.test.ts | 18 +++- test/clients/consumer/consumer.test.ts | 2 +- test/clients/producer/producer.test.ts | 2 +- test/helpers.ts | 8 +- test/network/connection.test.ts | 94 +++++++++++++++++++ 11 files changed, 165 insertions(+), 68 deletions(-) diff --git a/src/apis/callbacks.ts b/src/apis/callbacks.ts index 513930e..d1bf716 100644 --- a/src/apis/callbacks.ts +++ b/src/apis/callbacks.ts @@ -1,4 +1,4 @@ -import { MultipleErrors, TimeoutError } from '../errors.ts' +import { MultipleErrors } from '../errors.ts' import { PromiseWithResolvers } from '../utils.ts' import { type Callback } from './definitions.ts' @@ -65,25 +65,3 @@ export function runConcurrentCallbacks ( operation(item, operationCallback.bind(null, i++)) } } - -export function createTimeoutCallback ( - callback: Callback, - timeout: number, - errorMessage: string -): Callback { - let timeoutFired = false - const timeoutHandle = setTimeout(() => { - timeoutFired = 
true - callback(new TimeoutError(errorMessage), undefined as unknown as ReturnType) - }, timeout) - - return (error: Error | null, result: ReturnType) => { - /* c8 ignore next 3 - Hard to test */ - if (timeoutFired) { - return - } - - clearTimeout(timeoutHandle) - callback(error, result) - } -} diff --git a/src/clients/base/base.ts b/src/clients/base/base.ts index ba2e4ac..f45204a 100644 --- a/src/clients/base/base.ts +++ b/src/clients/base/base.ts @@ -24,7 +24,7 @@ import { import type { GenericError } from '../../errors.ts' import { MultipleErrors, NetworkError, UnsupportedApiError, UserError } from '../../errors.ts' import { ConnectionPool } from '../../network/connection-pool.ts' -import { type Broker, type Connection, type ConnectionOptions } from '../../network/connection.ts' +import { type Broker, type Connection } from '../../network/connection.ts' import { parseBroker } from '../../network/utils.ts' import { kInstance } from '../../symbols.ts' import { ajv, debugDump, loggers } from '../../utils.ts' @@ -269,7 +269,7 @@ export class Base extends EventEm [kCreateConnectionPool] (): ConnectionPool { const pool = new ConnectionPool(this[kClientId], { ownerId: this[kInstance], - ...(this[kOptions] as ConnectionOptions) + ...this[kOptions] }) this.#forwardEvents(pool, [ @@ -356,7 +356,8 @@ export class Base extends EventEm operation((error, result) => { if (error) { const genericError = error as GenericError - const retriable = genericError.findBy?.('code', NetworkError.code) || genericError.findBy?.('canRetry', true) + // Only retry if all the errors in the chain are retriable + const retriable = !genericError.findBy?.('canRetry', false) errors.push(error) if (attempt < retries && retriable && !shouldSkipRetry?.(error)) { diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts index af8b90d..ceeb701 100644 --- a/src/clients/consumer/consumer.ts +++ b/src/clients/consumer/consumer.ts @@ -2,7 +2,6 @@ import { type ValidateFunction } from 
'ajv' import { type CallbackWithPromise, createPromisifiedCallback, - createTimeoutCallback, kCallbackPromise, runConcurrentCallbacks } from '../../apis/callbacks.ts' @@ -1141,8 +1140,6 @@ export class Consumer callback: Callback diagnostic: Record + timeoutHandle: NodeJS.Timeout | null + timedOut: boolean } export const ConnectionStatuses = { @@ -78,11 +81,11 @@ export const ConnectionStatuses = { ERROR: 'error' } as const -export type ConnectionStatus = keyof typeof ConnectionStatuses -export type ConnectionStatusValue = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses] +export type ConnectionStatus = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses] -export const defaultOptions: ConnectionOptions = { +export const defaultOptions = { connectTimeout: 5000, + requestTimeout: 30000, maxInflights: 5 } @@ -92,7 +95,7 @@ export class Connection extends EventEmitter { #host: string | undefined #port: number | undefined #options: ConnectionOptions - #status: ConnectionStatusValue + #status: ConnectionStatus #instanceId: number #clientId: string | undefined // @ts-ignore This is used just for debugging @@ -143,7 +146,7 @@ export class Connection extends EventEmitter { return this.#instanceId } - get status (): ConnectionStatusValue { + get status (): ConnectionStatus { return this.#status } @@ -187,7 +190,7 @@ export class Connection extends EventEmitter { typeof this.#options.tlsServerName === 'string' ? 
this.#options.tlsServerName : host } - const connectionTimeoutHandler = () => { + const connectingSocketTimeoutHandler = () => { const error = new TimeoutError(`Connection to ${host}:${port} timed out.`) diagnosticContext.error = error this.#socket.destroy() @@ -201,7 +204,7 @@ export class Connection extends EventEmitter { connectionsConnectsChannel.asyncEnd.publish(diagnosticContext) } - const connectionErrorHandler = (error: Error) => { + const connectingSocketErrorHandler = (error: Error) => { this.#onConnectionError(host, port, diagnosticContext, error) } @@ -216,10 +219,10 @@ export class Connection extends EventEmitter { this.#socket.setNoDelay(true) this.#socket.once(this.#options.tls ? 'secureConnect' : 'connect', () => { - this.#socket.removeListener('timeout', connectionTimeoutHandler) - this.#socket.removeListener('error', connectionErrorHandler) + this.#socket.removeListener('timeout', connectingSocketTimeoutHandler) + this.#socket.removeListener('error', connectingSocketErrorHandler) - this.#socket.on('error', this.#onError.bind(this)) + this.#socket.on('error', this.#connectedSocketErrorHandler.bind(this)) this.#socket.on('data', this.#onData.bind(this)) if (this.#handleBackPressure) { this.#socket.on('drain', this.#onDrain.bind(this)) @@ -235,8 +238,8 @@ export class Connection extends EventEmitter { } }) - this.#socket.once('timeout', connectionTimeoutHandler) - this.#socket.once('error', connectionErrorHandler) + this.#socket.once('timeout', connectingSocketTimeoutHandler) + this.#socket.once('error', connectingSocketErrorHandler) } catch (error) { this.#status = ConnectionStatuses.ERROR @@ -364,11 +367,23 @@ export class Connection extends EventEmitter { callback: null as unknown as Callback, // Will be set later hasResponseHeaderTaggedFields, noResponse: payload.context.noResponse ?? 
false, - diagnostic + diagnostic, + timeoutHandle: null, + timedOut: false } this.#requestsQueue.push(fastQueueCallback => { - request.callback = fastQueueCallback + request.callback = (error: Error | null, payload: any) => { + clearTimeout(request.timeoutHandle!) + request.timeoutHandle = null + fastQueueCallback(error, payload) + } + if (!request.noResponse) { + request.timeoutHandle = setTimeout(() => { + request.timedOut = true + request.callback(new TimeoutError('Request timed out'), null) + }, this.#options.requestTimeout) + } if (this.#socketMustBeDrained) { this.#afterDrainRequests.push(request) @@ -616,7 +631,11 @@ export class Connection extends EventEmitter { this.#inflightRequests.delete(correlationId) - const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback } = request + const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback, timedOut } = request + + if (timedOut) { + return + } let deserialized: any let responseError: Error | null = null @@ -704,7 +723,7 @@ export class Connection extends EventEmitter { } } - #onError (error: Error): void { + #connectedSocketErrorHandler (error: Error): void { clearTimeout(this.#reauthenticationTimeout) this.emit('error', new NetworkError('Connection error', { cause: error })) } diff --git a/test/clients/base/base.test.ts b/test/clients/base/base.test.ts index 30f9da2..4b8d460 100644 --- a/test/clients/base/base.test.ts +++ b/test/clients/base/base.test.ts @@ -11,6 +11,7 @@ import { Connection, MultipleErrors, sleep, + TimeoutError, UnsupportedApiError, type ClientDiagnosticEvent, type ClusterMetadata @@ -541,22 +542,13 @@ test('metadata should handle connection failures to non-existent broker', async strictEqual(['MultipleErrors', 'AggregateError'].includes(error.name), true) // Error message should indicate failure - strictEqual(error.message.includes('failed'), true) + strictEqual(error.message, 'Cannot connect to any broker.') // Should contain nested errors 
strictEqual(Array.isArray(error.errors), true) - strictEqual(error.errors.length > 0, true) - - // At least one error should be a network error - const hasNetworkError = error.errors.some( - (err: any) => - err.message.includes('ECONNREFUSED') || - err.message.includes('ETIMEDOUT') || - err.message.includes('getaddrinfo') || - err.message.includes('connect') - ) - - strictEqual(hasNetworkError, true) + strictEqual(error.errors.length, 1) + strictEqual(error.errors[0] instanceof TimeoutError, true) + strictEqual(error.errors[0].message, 'Connection to 192.0.2.1:9092 timed out.') } }) diff --git a/test/clients/consumer/consumer-consumer-group-protocol.test.ts b/test/clients/consumer/consumer-consumer-group-protocol.test.ts index 770c250..8fe45e9 100644 --- a/test/clients/consumer/consumer-consumer-group-protocol.test.ts +++ b/test/clients/consumer/consumer-consumer-group-protocol.test.ts @@ -8,6 +8,7 @@ import { ProduceAcks, ResponseError, sleep, + TimeoutError, UnsupportedApiError, type MessageToProduce, type ProducerOptions @@ -165,11 +166,22 @@ test('#consumerGroupHeartbeat should ignore response when closed ', skipConsumer }) test('#consumerGroupHeartbeat should timeout and schedule another heartbeat', skipConsumerGroupProtocol, async t => { - const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, timeout: 200 }) + const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, requestTimeout: 200 }) const topic = await createTopic(t, true, 1) const stream = await consumer.consume({ topics: [topic] }) - let mockCount = 1 - mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, () => mockCount-- > 0) + mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, ( + __originalSend, + __apiKey, + __apiVersion, + __payload, + __responseParser, + __hasRequestHeaderTaggedFields, + __hasResponseHeaderTaggedFields, + callback + ) => { + callback(new TimeoutError('Request timed out'), 
null) + return false + }) await once(consumer, 'consumer:heartbeat:error') await once(consumer, 'consumer:heartbeat:end') await stream.close() diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts index d71bca3..0efedf4 100644 --- a/test/clients/consumer/consumer.test.ts +++ b/test/clients/consumer/consumer.test.ts @@ -929,7 +929,7 @@ test('fetch should support both promise and callback API', async t => { const topicInfo = metadata.topics.get(topic)! await new Promise((resolve, reject) => { - const consumer = createConsumer(t) + const consumer = createConsumer(t, { requestTimeout: 100000 }) // First test callback API consumer.fetch( diff --git a/test/clients/producer/producer.test.ts b/test/clients/producer/producer.test.ts index e11af3d..f6cf97f 100644 --- a/test/clients/producer/producer.test.ts +++ b/test/clients/producer/producer.test.ts @@ -715,7 +715,7 @@ test('send should handle synchronuous error during payload creation', async t => const testTopic = await createTopic(t) const compression = 'lz4' - const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD') + const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD', { canRetry: false }) t.mock.method(compressionsAlgorithms[compression], 'compressSync', () => { throw expectedError }) diff --git a/test/helpers.ts b/test/helpers.ts index eb97d98..5c34bd8 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -130,7 +130,9 @@ export function mockMethod ( fn?: (original: (...args: any[]) => void, ...args: any[]) => boolean | void ) { if (typeof errorToMock === 'undefined') { - errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')]) + errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')], { + canRetry: false + }) } const original = target[method].bind(target) @@ -259,7 +261,9 @@ export function mockAPI ( fn?: (original: (...args: 
any[]) => void, ...args: any[]) => boolean | void ) { if (typeof errorToMock === 'undefined') { - errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')]) + errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')], { + canRetry: false + }) } const originalGet = pool.get.bind(pool) diff --git a/test/network/connection.test.ts b/test/network/connection.test.ts index 139aa30..4a442ff 100644 --- a/test/network/connection.test.ts +++ b/test/network/connection.test.ts @@ -411,6 +411,100 @@ test('Connection.send should enqueue request and process response', async t => { verifyTracingChannel() }) +test('Connection.send should handle requests with response', async t => { + const { server, port } = await createServer(t) + const connection = new Connection('test-client') + t.after(() => connection.close()) + + // Set up a server that answers any incoming data with a fixed 8-byte frame after one second + server.on('connection', socket => { + socket.on('data', () => { + setTimeout(() => socket.write(Buffer.from([0, 0, 0, 0, 0, 0, 0, 1])), 1000) + }) + }) + + await connection.connect('localhost', port) + + // Create a payload function that returns a writer holding a single int32 + function payloadFn () { + const writer = Writer.create() + writer.appendInt32(42) + return writer + } + + await new Promise((resolve, reject) => { + connection.send( + 0, // apiKey + 0, // apiVersion + payloadFn, + function () { + return 'Success' + }, // Dummy parser + false, // hasRequestHeaderTaggedFields + false, // hasResponseHeaderTaggedFields + (err, returnValue) => { + if (err) { + reject(err) + } else { + resolve(returnValue) + } + } + ) + }) +}) + +test('Connection.send should time out eventually (custom timeout)', async t => { + const { server, port } = await createServer(t) + const customTimeout = 2000 + const connection = new Connection('test-client', { requestTimeout: customTimeout }) + t.after(() => connection.close()) + + // Set up a server that delays its fixed 8-byte reply beyond the request timeout + 
server.on('connection', socket => { + socket.on('data', () => { + setTimeout(() => socket.write(Buffer.from([0, 0, 0, 0, 0, 0, 0, 1])), 10000) + }) + }) + + await connection.connect('localhost', port) + + // Create a payload function that returns a writer holding a single int32 + function payloadFn () { + const writer = Writer.create() + writer.appendInt32(42) + return writer + } + + const startTime = performance.now() + try { + await new Promise((resolve, reject) => { + connection.send( + 0, // apiKey + 0, // apiVersion + payloadFn, + function () { + return 'Success' + }, // Dummy parser + false, // hasRequestHeaderTaggedFields + false, // hasResponseHeaderTaggedFields + (err, returnValue) => { + if (err) { + reject(err) + } else { + resolve(returnValue) + } + } + ) + }) + throw new Error('Expected request to time out') + } catch (error) { + deepStrictEqual((error as Error).message, 'Request timed out') + const timeoutMargin = customTimeout * 0.01 // Allow 1% margin + const timeoutDiff = Math.abs(performance.now() - startTime - customTimeout) + strictEqual(timeoutDiff <= timeoutMargin, true, 'Should time out with custom timeout') + } +}) + test('Connection.send should handle requests with no response', async t => { const { server, port } = await createServer(t) const connection = new Connection('test-client')