Skip to content

Commit 305df00

Browse files
committed
Implement Request Timeouts
This change implements request timeouts for any request sent to the Kafka cluster. The timeout's duration can be configured via option `requestTimeout` in every client type. Additionally, the retrying logic in the base class is changed so that it retries only if every error in the current error chain is marked as retriable. Otherwise, timeout errors, which should be handled as non-retriable errors, would potentially lead to much longer timeouts due to multiple attempts being made. (But also in general, I would assume that every non-retriable error encountered should block any additional attempt) on-behalf-of: @SAP ospo@sap.com
1 parent 12cb67e commit 305df00

File tree

11 files changed

+165
-68
lines changed

11 files changed

+165
-68
lines changed

src/apis/callbacks.ts

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { MultipleErrors, TimeoutError } from '../errors.ts'
1+
import { MultipleErrors } from '../errors.ts'
22
import { PromiseWithResolvers } from '../utils.ts'
33
import { type Callback } from './definitions.ts'
44

@@ -65,25 +65,3 @@ export function runConcurrentCallbacks<ReturnType> (
6565
operation(item, operationCallback.bind(null, i++))
6666
}
6767
}
68-
69-
export function createTimeoutCallback<ReturnType> (
70-
callback: Callback<ReturnType>,
71-
timeout: number,
72-
errorMessage: string
73-
): Callback<ReturnType> {
74-
let timeoutFired = false
75-
const timeoutHandle = setTimeout(() => {
76-
timeoutFired = true
77-
callback(new TimeoutError(errorMessage), undefined as unknown as ReturnType)
78-
}, timeout)
79-
80-
return (error: Error | null, result: ReturnType) => {
81-
/* c8 ignore next 3 - Hard to test */
82-
if (timeoutFired) {
83-
return
84-
}
85-
86-
clearTimeout(timeoutHandle)
87-
callback(error, result)
88-
}
89-
}

src/clients/base/base.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
import type { GenericError } from '../../errors.ts'
2525
import { MultipleErrors, NetworkError, UnsupportedApiError, UserError } from '../../errors.ts'
2626
import { ConnectionPool } from '../../network/connection-pool.ts'
27-
import { type Broker, type Connection, type ConnectionOptions } from '../../network/connection.ts'
27+
import { type Broker, type Connection } from '../../network/connection.ts'
2828
import { parseBroker } from '../../network/utils.ts'
2929
import { kInstance } from '../../symbols.ts'
3030
import { ajv, debugDump, loggers } from '../../utils.ts'
@@ -269,7 +269,7 @@ export class Base<OptionsType extends BaseOptions = BaseOptions> extends EventEm
269269
[kCreateConnectionPool] (): ConnectionPool {
270270
const pool = new ConnectionPool(this[kClientId], {
271271
ownerId: this[kInstance],
272-
...(this[kOptions] as ConnectionOptions)
272+
...this[kOptions]
273273
})
274274

275275
this.#forwardEvents(pool, [
@@ -356,7 +356,8 @@ export class Base<OptionsType extends BaseOptions = BaseOptions> extends EventEm
356356
operation((error, result) => {
357357
if (error) {
358358
const genericError = error as GenericError
359-
const retriable = genericError.findBy?.('code', NetworkError.code) || genericError.findBy?.('canRetry', true)
359+
// Only retry if all the errors in the chain are retriable
360+
const retriable = !genericError.findBy?.('canRetry', false)
360361
errors.push(error)
361362

362363
if (attempt < retries && retriable && !shouldSkipRetry?.(error)) {

src/clients/consumer/consumer.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { type ValidateFunction } from 'ajv'
22
import {
33
type CallbackWithPromise,
44
createPromisifiedCallback,
5-
createTimeoutCallback,
65
kCallbackPromise,
76
runConcurrentCallbacks
87
} from '../../apis/callbacks.ts'
@@ -1141,8 +1140,6 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
11411140
return
11421141
}
11431142

1144-
const timeoutCallback = createTimeoutCallback(groupCallback, this[kOptions].timeout!, 'Heartbeat timeout.')
1145-
11461143
api(
11471144
connection,
11481145
this.groupId,
@@ -1154,7 +1151,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
11541151
this.topics.current,
11551152
this.#groupRemoteAssignor,
11561153
this.#assignments,
1157-
timeoutCallback
1154+
groupCallback
11581155
)
11591156
})
11601157
},

src/errors.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ export class NetworkError extends GenericError {
139139
static code: ErrorCode = 'PLT_KFK_NETWORK'
140140

141141
constructor (message: string, properties: ErrorProperties = {}) {
142-
super(NetworkError.code, message, properties)
142+
super(NetworkError.code, message, { canRetry: true, ...properties })
143143
}
144144
}
145145

@@ -199,7 +199,7 @@ export class TimeoutError extends GenericError {
199199
static code: ErrorCode = 'PLT_KFK_TIMEOUT'
200200

201201
constructor (message: string, properties: ErrorProperties = {}) {
202-
super(NetworkError.code, message, properties)
202+
super(NetworkError.code, message, { canRetry: false, ...properties })
203203
}
204204
}
205205

src/network/connection.ts

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export interface SASLOptions {
4848

4949
export interface ConnectionOptions {
5050
connectTimeout?: number
51+
requestTimeout?: number
5152
maxInflights?: number
5253
tls?: TLSConnectionOptions
5354
tlsServerName?: string | boolean
@@ -66,6 +67,8 @@ export interface Request {
6667
parser: ResponseParser<unknown>
6768
callback: Callback<any>
6869
diagnostic: Record<string, unknown>
70+
timeoutHandle: NodeJS.Timeout | null
71+
timedOut: boolean
6972
}
7073

7174
export const ConnectionStatuses = {
@@ -78,11 +81,11 @@ export const ConnectionStatuses = {
7881
ERROR: 'error'
7982
} as const
8083

81-
export type ConnectionStatus = keyof typeof ConnectionStatuses
82-
export type ConnectionStatusValue = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses]
84+
export type ConnectionStatus = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses]
8385

84-
export const defaultOptions: ConnectionOptions = {
86+
export const defaultOptions = {
8587
connectTimeout: 5000,
88+
requestTimeout: 30000,
8689
maxInflights: 5
8790
}
8891

@@ -92,7 +95,7 @@ export class Connection extends EventEmitter {
9295
#host: string | undefined
9396
#port: number | undefined
9497
#options: ConnectionOptions
95-
#status: ConnectionStatusValue
98+
#status: ConnectionStatus
9699
#instanceId: number
97100
#clientId: string | undefined
98101
// @ts-ignore This is used just for debugging
@@ -143,7 +146,7 @@ export class Connection extends EventEmitter {
143146
return this.#instanceId
144147
}
145148

146-
get status (): ConnectionStatusValue {
149+
get status (): ConnectionStatus {
147150
return this.#status
148151
}
149152

@@ -187,7 +190,7 @@ export class Connection extends EventEmitter {
187190
typeof this.#options.tlsServerName === 'string' ? this.#options.tlsServerName : host
188191
}
189192

190-
const connectionTimeoutHandler = () => {
193+
const connectingSocketTimeoutHandler = () => {
191194
const error = new TimeoutError(`Connection to ${host}:${port} timed out.`)
192195
diagnosticContext.error = error
193196
this.#socket.destroy()
@@ -201,7 +204,7 @@ export class Connection extends EventEmitter {
201204
connectionsConnectsChannel.asyncEnd.publish(diagnosticContext)
202205
}
203206

204-
const connectionErrorHandler = (error: Error) => {
207+
const connectingSocketErrorHandler = (error: Error) => {
205208
this.#onConnectionError(host, port, diagnosticContext, error)
206209
}
207210

@@ -216,10 +219,10 @@ export class Connection extends EventEmitter {
216219
this.#socket.setNoDelay(true)
217220

218221
this.#socket.once(this.#options.tls ? 'secureConnect' : 'connect', () => {
219-
this.#socket.removeListener('timeout', connectionTimeoutHandler)
220-
this.#socket.removeListener('error', connectionErrorHandler)
222+
this.#socket.removeListener('timeout', connectingSocketTimeoutHandler)
223+
this.#socket.removeListener('error', connectingSocketErrorHandler)
221224

222-
this.#socket.on('error', this.#onError.bind(this))
225+
this.#socket.on('error', this.#connectedSocketErrorHandler.bind(this))
223226
this.#socket.on('data', this.#onData.bind(this))
224227
if (this.#handleBackPressure) {
225228
this.#socket.on('drain', this.#onDrain.bind(this))
@@ -235,8 +238,8 @@ export class Connection extends EventEmitter {
235238
}
236239
})
237240

238-
this.#socket.once('timeout', connectionTimeoutHandler)
239-
this.#socket.once('error', connectionErrorHandler)
241+
this.#socket.once('timeout', connectingSocketTimeoutHandler)
242+
this.#socket.once('error', connectingSocketErrorHandler)
240243
} catch (error) {
241244
this.#status = ConnectionStatuses.ERROR
242245

@@ -364,11 +367,23 @@ export class Connection extends EventEmitter {
364367
callback: null as unknown as Callback<any>, // Will be set later
365368
hasResponseHeaderTaggedFields,
366369
noResponse: payload.context.noResponse ?? false,
367-
diagnostic
370+
diagnostic,
371+
timeoutHandle: null,
372+
timedOut: false
368373
}
369374

370375
this.#requestsQueue.push(fastQueueCallback => {
371-
request.callback = fastQueueCallback
376+
request.callback = (error: Error | null, payload: any) => {
377+
clearTimeout(request.timeoutHandle!)
378+
request.timeoutHandle = null
379+
fastQueueCallback(error, payload)
380+
}
381+
if (!request.noResponse) {
382+
request.timeoutHandle = setTimeout(() => {
383+
request.timedOut = true
384+
request.callback(new TimeoutError('Request timed out'), null)
385+
}, this.#options.requestTimeout)
386+
}
372387

373388
if (this.#socketMustBeDrained) {
374389
this.#afterDrainRequests.push(request)
@@ -616,7 +631,11 @@ export class Connection extends EventEmitter {
616631

617632
this.#inflightRequests.delete(correlationId)
618633

619-
const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback } = request
634+
const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback, timedOut } = request
635+
636+
if (timedOut) {
637+
return
638+
}
620639

621640
let deserialized: any
622641
let responseError: Error | null = null
@@ -704,7 +723,7 @@ export class Connection extends EventEmitter {
704723
}
705724
}
706725

707-
#onError (error: Error): void {
726+
#connectedSocketErrorHandler (error: Error): void {
708727
clearTimeout(this.#reauthenticationTimeout)
709728
this.emit('error', new NetworkError('Connection error', { cause: error }))
710729
}

test/clients/base/base.test.ts

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
Connection,
1212
MultipleErrors,
1313
sleep,
14+
TimeoutError,
1415
UnsupportedApiError,
1516
type ClientDiagnosticEvent,
1617
type ClusterMetadata
@@ -541,22 +542,13 @@ test('metadata should handle connection failures to non-existent broker', async
541542
strictEqual(['MultipleErrors', 'AggregateError'].includes(error.name), true)
542543

543544
// Error message should indicate failure
544-
strictEqual(error.message.includes('failed'), true)
545+
strictEqual(error.message, 'Cannot connect to any broker.')
545546

546547
// Should contain nested errors
547548
strictEqual(Array.isArray(error.errors), true)
548-
strictEqual(error.errors.length > 0, true)
549-
550-
// At least one error should be a network error
551-
const hasNetworkError = error.errors.some(
552-
(err: any) =>
553-
err.message.includes('ECONNREFUSED') ||
554-
err.message.includes('ETIMEDOUT') ||
555-
err.message.includes('getaddrinfo') ||
556-
err.message.includes('connect')
557-
)
558-
559-
strictEqual(hasNetworkError, true)
549+
strictEqual(error.errors.length, 1)
550+
strictEqual(error.errors[0] instanceof TimeoutError, true)
551+
strictEqual(error.errors[0].message, 'Connection to 192.0.2.1:9092 timed out.')
560552
}
561553
})
562554

test/clients/consumer/consumer-consumer-group-protocol.test.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
ProduceAcks,
99
ResponseError,
1010
sleep,
11+
TimeoutError,
1112
UnsupportedApiError,
1213
type MessageToProduce,
1314
type ProducerOptions
@@ -165,11 +166,22 @@ test('#consumerGroupHeartbeat should ignore response when closed ', skipConsumer
165166
})
166167

167168
test('#consumerGroupHeartbeat should timeout and schedule another heartbeat', skipConsumerGroupProtocol, async t => {
168-
const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, timeout: 200 })
169+
const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, requestTimeout: 200 })
169170
const topic = await createTopic(t, true, 1)
170171
const stream = await consumer.consume({ topics: [topic] })
171-
let mockCount = 1
172-
mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, () => mockCount-- > 0)
172+
mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, (
173+
__originalSend,
174+
__apiKey,
175+
__apiVersion,
176+
__payload,
177+
__responseParser,
178+
__hasRequestHeaderTaggedFields,
179+
__hasResponseHeaderTaggedFields,
180+
callback
181+
) => {
182+
callback(new TimeoutError('Request timed out'), null)
183+
return false
184+
})
173185
await once(consumer, 'consumer:heartbeat:error')
174186
await once(consumer, 'consumer:heartbeat:end')
175187
await stream.close()

test/clients/consumer/consumer.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ test('fetch should support both promise and callback API', async t => {
929929
const topicInfo = metadata.topics.get(topic)!
930930

931931
await new Promise<void>((resolve, reject) => {
932-
const consumer = createConsumer(t)
932+
const consumer = createConsumer(t, { requestTimeout: 100000 })
933933

934934
// First test callback API
935935
consumer.fetch(

test/clients/producer/producer.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@ test('send should handle synchronuous error during payload creation', async t =>
715715
const testTopic = await createTopic(t)
716716

717717
const compression = 'lz4'
718-
const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD')
718+
const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD', { canRetry: false })
719719
t.mock.method(compressionsAlgorithms[compression], 'compressSync', () => {
720720
throw expectedError
721721
})

test/helpers.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ export function mockMethod (
130130
fn?: (original: (...args: any[]) => void, ...args: any[]) => boolean | void
131131
) {
132132
if (typeof errorToMock === 'undefined') {
133-
errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')])
133+
errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')], {
134+
canRetry: false
135+
})
134136
}
135137

136138
const original = target[method].bind(target)
@@ -259,7 +261,9 @@ export function mockAPI (
259261
fn?: (original: (...args: any[]) => void, ...args: any[]) => boolean | void
260262
) {
261263
if (typeof errorToMock === 'undefined') {
262-
errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')])
264+
errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')], {
265+
canRetry: false
266+
})
263267
}
264268

265269
const originalGet = pool.get.bind(pool)

0 commit comments

Comments
 (0)