-
Notifications
You must be signed in to change notification settings - Fork 20
Implement Request Timeouts #172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,7 @@ export interface SASLOptions { | |
|
|
||
| export interface ConnectionOptions { | ||
| connectTimeout?: number | ||
| requestTimeout?: number | ||
| maxInflights?: number | ||
| tls?: TLSConnectionOptions | ||
| tlsServerName?: string | boolean | ||
|
|
@@ -66,6 +67,8 @@ export interface Request { | |
| parser: ResponseParser<unknown> | ||
| callback: Callback<any> | ||
| diagnostic: Record<string, unknown> | ||
| timeoutHandle: NodeJS.Timeout | null | ||
| timedOut: boolean | ||
| } | ||
|
|
||
| export const ConnectionStatuses = { | ||
|
|
@@ -78,11 +81,11 @@ export const ConnectionStatuses = { | |
| ERROR: 'error' | ||
| } as const | ||
|
|
||
| export type ConnectionStatus = keyof typeof ConnectionStatuses | ||
| export type ConnectionStatusValue = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses] | ||
| export type ConnectionStatus = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses] | ||
|
|
||
| export const defaultOptions: ConnectionOptions = { | ||
| export const defaultOptions = { | ||
| connectTimeout: 5000, | ||
| requestTimeout: 30000, | ||
| maxInflights: 5 | ||
| } | ||
|
|
||
|
|
@@ -92,7 +95,7 @@ export class Connection extends EventEmitter { | |
| #host: string | undefined | ||
| #port: number | undefined | ||
| #options: ConnectionOptions | ||
| #status: ConnectionStatusValue | ||
| #status: ConnectionStatus | ||
| #instanceId: number | ||
| #clientId: string | undefined | ||
| // @ts-ignore This is used just for debugging | ||
|
|
@@ -143,7 +146,7 @@ export class Connection extends EventEmitter { | |
| return this.#instanceId | ||
| } | ||
|
|
||
| get status (): ConnectionStatusValue { | ||
| get status (): ConnectionStatus { | ||
| return this.#status | ||
| } | ||
|
|
||
|
|
@@ -187,7 +190,7 @@ export class Connection extends EventEmitter { | |
| typeof this.#options.tlsServerName === 'string' ? this.#options.tlsServerName : host | ||
| } | ||
|
|
||
| const connectionTimeoutHandler = () => { | ||
| const connectingSocketTimeoutHandler = () => { | ||
| const error = new TimeoutError(`Connection to ${host}:${port} timed out.`) | ||
| diagnosticContext.error = error | ||
| this.#socket.destroy() | ||
|
|
@@ -201,7 +204,7 @@ export class Connection extends EventEmitter { | |
| connectionsConnectsChannel.asyncEnd.publish(diagnosticContext) | ||
| } | ||
|
|
||
| const connectionErrorHandler = (error: Error) => { | ||
| const connectingSocketErrorHandler = (error: Error) => { | ||
| this.#onConnectionError(host, port, diagnosticContext, error) | ||
| } | ||
|
|
||
|
|
@@ -216,10 +219,10 @@ export class Connection extends EventEmitter { | |
| this.#socket.setNoDelay(true) | ||
|
|
||
| this.#socket.once(this.#options.tls ? 'secureConnect' : 'connect', () => { | ||
| this.#socket.removeListener('timeout', connectionTimeoutHandler) | ||
| this.#socket.removeListener('error', connectionErrorHandler) | ||
| this.#socket.removeListener('timeout', connectingSocketTimeoutHandler) | ||
| this.#socket.removeListener('error', connectingSocketErrorHandler) | ||
|
|
||
| this.#socket.on('error', this.#onError.bind(this)) | ||
| this.#socket.on('error', this.#connectedSocketErrorHandler.bind(this)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert this. |
||
| this.#socket.on('data', this.#onData.bind(this)) | ||
| if (this.#handleBackPressure) { | ||
| this.#socket.on('drain', this.#onDrain.bind(this)) | ||
|
|
@@ -235,8 +238,8 @@ export class Connection extends EventEmitter { | |
| } | ||
| }) | ||
|
|
||
| this.#socket.once('timeout', connectionTimeoutHandler) | ||
| this.#socket.once('error', connectionErrorHandler) | ||
| this.#socket.once('timeout', connectingSocketTimeoutHandler) | ||
| this.#socket.once('error', connectingSocketErrorHandler) | ||
| } catch (error) { | ||
| this.#status = ConnectionStatuses.ERROR | ||
|
|
||
|
|
@@ -364,11 +367,23 @@ export class Connection extends EventEmitter { | |
| callback: null as unknown as Callback<any>, // Will be set later | ||
| hasResponseHeaderTaggedFields, | ||
| noResponse: payload.context.noResponse ?? false, | ||
| diagnostic | ||
| diagnostic, | ||
| timeoutHandle: null, | ||
| timedOut: false | ||
| } | ||
|
|
||
| this.#requestsQueue.push(fastQueueCallback => { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than creating two functions here, I would use And Similarly for the setTimeout below: |
||
| request.callback = fastQueueCallback | ||
| request.callback = (error: Error | null, payload: any) => { | ||
| clearTimeout(request.timeoutHandle!) | ||
| request.timeoutHandle = null | ||
| fastQueueCallback(error, payload) | ||
| } | ||
| if (!request.noResponse) { | ||
| request.timeoutHandle = setTimeout(() => { | ||
| request.timedOut = true | ||
| request.callback(new TimeoutError('Request timed out'), null) | ||
| }, this.#options.requestTimeout) | ||
| } | ||
|
|
||
| if (this.#socketMustBeDrained) { | ||
| this.#afterDrainRequests.push(request) | ||
|
|
@@ -616,7 +631,11 @@ export class Connection extends EventEmitter { | |
|
|
||
| this.#inflightRequests.delete(correlationId) | ||
|
|
||
| const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback } = request | ||
| const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback, timedOut } = request | ||
|
|
||
| if (timedOut) { | ||
| return | ||
| } | ||
|
|
||
| let deserialized: any | ||
| let responseError: Error | null = null | ||
|
|
@@ -704,7 +723,7 @@ export class Connection extends EventEmitter { | |
| } | ||
| } | ||
|
|
||
| #onError (error: Error): void { | ||
| #connectedSocketErrorHandler (error: Error): void { | ||
| clearTimeout(this.#reauthenticationTimeout) | ||
| this.emit('error', new NetworkError('Connection error', { cause: error })) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ import { | |
| ProduceAcks, | ||
| ResponseError, | ||
| sleep, | ||
| TimeoutError, | ||
| UnsupportedApiError, | ||
| type MessageToProduce, | ||
| type ProducerOptions | ||
|
|
@@ -165,11 +166,22 @@ test('#consumerGroupHeartbeat should ignore response when closed ', skipConsumer | |
| }) | ||
|
|
||
| test('#consumerGroupHeartbeat should timeout and schedule another heartbeat', skipConsumerGroupProtocol, async t => { | ||
| const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, timeout: 200 }) | ||
| const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, requestTimeout: 200 }) | ||
| const topic = await createTopic(t, true, 1) | ||
| const stream = await consumer.consume({ topics: [topic] }) | ||
| let mockCount = 1 | ||
| mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, () => mockCount-- > 0) | ||
| mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, ( | ||
| __originalSend, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please only use a single _ in this call. |
||
| __apiKey, | ||
| __apiVersion, | ||
| __payload, | ||
| __responseParser, | ||
| __hasRequestHeaderTaggedFields, | ||
| __hasResponseHeaderTaggedFields, | ||
| callback | ||
| ) => { | ||
| callback(new TimeoutError('Request timed out'), null) | ||
| return false | ||
| }) | ||
| await once(consumer, 'consumer:heartbeat:error') | ||
| await once(consumer, 'consumer:heartbeat:end') | ||
| await stream.close() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change (and the types change above)?