Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 1 addition & 23 deletions src/apis/callbacks.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MultipleErrors, TimeoutError } from '../errors.ts'
import { MultipleErrors } from '../errors.ts'
import { PromiseWithResolvers } from '../utils.ts'
import { type Callback } from './definitions.ts'

Expand Down Expand Up @@ -65,25 +65,3 @@ export function runConcurrentCallbacks<ReturnType> (
operation(item, operationCallback.bind(null, i++))
}
}

export function createTimeoutCallback<ReturnType> (
callback: Callback<ReturnType>,
timeout: number,
errorMessage: string
): Callback<ReturnType> {
let timeoutFired = false
const timeoutHandle = setTimeout(() => {
timeoutFired = true
callback(new TimeoutError(errorMessage), undefined as unknown as ReturnType)
}, timeout)

return (error: Error | null, result: ReturnType) => {
/* c8 ignore next 3 - Hard to test */
if (timeoutFired) {
return
}

clearTimeout(timeoutHandle)
callback(error, result)
}
}
7 changes: 4 additions & 3 deletions src/clients/base/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
import type { GenericError } from '../../errors.ts'
import { MultipleErrors, NetworkError, UnsupportedApiError, UserError } from '../../errors.ts'
import { ConnectionPool } from '../../network/connection-pool.ts'
import { type Broker, type Connection, type ConnectionOptions } from '../../network/connection.ts'
import { type Broker, type Connection } from '../../network/connection.ts'
import { parseBroker } from '../../network/utils.ts'
import { kInstance } from '../../symbols.ts'
import { ajv, debugDump, loggers } from '../../utils.ts'
Expand Down Expand Up @@ -269,7 +269,7 @@ export class Base<OptionsType extends BaseOptions = BaseOptions> extends EventEm
[kCreateConnectionPool] (): ConnectionPool {
const pool = new ConnectionPool(this[kClientId], {
ownerId: this[kInstance],
...(this[kOptions] as ConnectionOptions)
...this[kOptions]
})

this.#forwardEvents(pool, [
Expand Down Expand Up @@ -356,7 +356,8 @@ export class Base<OptionsType extends BaseOptions = BaseOptions> extends EventEm
operation((error, result) => {
if (error) {
const genericError = error as GenericError
const retriable = genericError.findBy?.('code', NetworkError.code) || genericError.findBy?.('canRetry', true)
// Only retry if all the errors in the chain are retriable
const retriable = !genericError.findBy?.('canRetry', false)
errors.push(error)

if (attempt < retries && retriable && !shouldSkipRetry?.(error)) {
Expand Down
5 changes: 1 addition & 4 deletions src/clients/consumer/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { type ValidateFunction } from 'ajv'
import {
type CallbackWithPromise,
createPromisifiedCallback,
createTimeoutCallback,
kCallbackPromise,
runConcurrentCallbacks
} from '../../apis/callbacks.ts'
Expand Down Expand Up @@ -1141,8 +1140,6 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
return
}

const timeoutCallback = createTimeoutCallback(groupCallback, this[kOptions].timeout!, 'Heartbeat timeout.')

api(
connection,
this.groupId,
Expand All @@ -1154,7 +1151,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
this.topics.current,
this.#groupRemoteAssignor,
this.#assignments,
timeoutCallback
groupCallback
)
})
},
Expand Down
4 changes: 2 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export class NetworkError extends GenericError {
static code: ErrorCode = 'PLT_KFK_NETWORK'

constructor (message: string, properties: ErrorProperties = {}) {
super(NetworkError.code, message, properties)
super(NetworkError.code, message, { canRetry: true, ...properties })
}
}

Expand Down Expand Up @@ -199,7 +199,7 @@ export class TimeoutError extends GenericError {
static code: ErrorCode = 'PLT_KFK_TIMEOUT'

constructor (message: string, properties: ErrorProperties = {}) {
super(NetworkError.code, message, properties)
super(NetworkError.code, message, { canRetry: false, ...properties })
}
}

Expand Down
51 changes: 35 additions & 16 deletions src/network/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface SASLOptions {

export interface ConnectionOptions {
connectTimeout?: number
requestTimeout?: number
maxInflights?: number
tls?: TLSConnectionOptions
tlsServerName?: string | boolean
Expand All @@ -66,6 +67,8 @@ export interface Request {
parser: ResponseParser<unknown>
callback: Callback<any>
diagnostic: Record<string, unknown>
timeoutHandle: NodeJS.Timeout | null
timedOut: boolean
}

export const ConnectionStatuses = {
Expand All @@ -78,11 +81,11 @@ export const ConnectionStatuses = {
ERROR: 'error'
} as const

export type ConnectionStatus = keyof typeof ConnectionStatuses
export type ConnectionStatusValue = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses]
export type ConnectionStatus = (typeof ConnectionStatuses)[keyof typeof ConnectionStatuses]

export const defaultOptions: ConnectionOptions = {
export const defaultOptions = {
connectTimeout: 5000,
requestTimeout: 30000,
maxInflights: 5
}

Expand All @@ -92,7 +95,7 @@ export class Connection extends EventEmitter {
#host: string | undefined
#port: number | undefined
#options: ConnectionOptions
#status: ConnectionStatusValue
#status: ConnectionStatus
#instanceId: number
#clientId: string | undefined
// @ts-ignore This is used just for debugging
Expand Down Expand Up @@ -143,7 +146,7 @@ export class Connection extends EventEmitter {
return this.#instanceId
}

get status (): ConnectionStatusValue {
get status (): ConnectionStatus {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change (and the types change above)?

return this.#status
}

Expand Down Expand Up @@ -187,7 +190,7 @@ export class Connection extends EventEmitter {
typeof this.#options.tlsServerName === 'string' ? this.#options.tlsServerName : host
}

const connectionTimeoutHandler = () => {
const connectingSocketTimeoutHandler = () => {
const error = new TimeoutError(`Connection to ${host}:${port} timed out.`)
diagnosticContext.error = error
this.#socket.destroy()
Expand All @@ -201,7 +204,7 @@ export class Connection extends EventEmitter {
connectionsConnectsChannel.asyncEnd.publish(diagnosticContext)
}

const connectionErrorHandler = (error: Error) => {
const connectingSocketErrorHandler = (error: Error) => {
this.#onConnectionError(host, port, diagnosticContext, error)
}

Expand All @@ -216,10 +219,10 @@ export class Connection extends EventEmitter {
this.#socket.setNoDelay(true)

this.#socket.once(this.#options.tls ? 'secureConnect' : 'connect', () => {
this.#socket.removeListener('timeout', connectionTimeoutHandler)
this.#socket.removeListener('error', connectionErrorHandler)
this.#socket.removeListener('timeout', connectingSocketTimeoutHandler)
this.#socket.removeListener('error', connectingSocketErrorHandler)

this.#socket.on('error', this.#onError.bind(this))
this.#socket.on('error', this.#connectedSocketErrorHandler.bind(this))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this.

this.#socket.on('data', this.#onData.bind(this))
if (this.#handleBackPressure) {
this.#socket.on('drain', this.#onDrain.bind(this))
Expand All @@ -235,8 +238,8 @@ export class Connection extends EventEmitter {
}
})

this.#socket.once('timeout', connectionTimeoutHandler)
this.#socket.once('error', connectionErrorHandler)
this.#socket.once('timeout', connectingSocketTimeoutHandler)
this.#socket.once('error', connectingSocketErrorHandler)
} catch (error) {
this.#status = ConnectionStatuses.ERROR

Expand Down Expand Up @@ -364,11 +367,23 @@ export class Connection extends EventEmitter {
callback: null as unknown as Callback<any>, // Will be set later
hasResponseHeaderTaggedFields,
noResponse: payload.context.noResponse ?? false,
diagnostic
diagnostic,
timeoutHandle: null,
timedOut: false
}

this.#requestsQueue.push(fastQueueCallback => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than creating two functions here, I would use .bind.

this.#requestsQueue.push(fastQueueCallback => {
      request.callback = fastQueueCallback

      if (this.#socketMustBeDrained) {
        this.#afterDrainRequests.push(request)
        return false
      }

      return this.#sendRequest(request)
    }, this.#onResponse.bind(this, request, callback))

And #onResponse (or similar) you clear the timeout and then invoke callback.

Similarly for the setTimeout below:

if (!request.noResponse) {
        request.timeoutHandle = setTimeout(this.#onRequestTimeout.bind(this, request), this.#options.requestTimeout)
}

request.callback = fastQueueCallback
request.callback = (error: Error | null, payload: any) => {
clearTimeout(request.timeoutHandle!)
request.timeoutHandle = null
fastQueueCallback(error, payload)
}
if (!request.noResponse) {
request.timeoutHandle = setTimeout(() => {
request.timedOut = true
request.callback(new TimeoutError('Request timed out'), null)
}, this.#options.requestTimeout)
}

if (this.#socketMustBeDrained) {
this.#afterDrainRequests.push(request)
Expand Down Expand Up @@ -616,7 +631,11 @@ export class Connection extends EventEmitter {

this.#inflightRequests.delete(correlationId)

const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback } = request
const { apiKey, apiVersion, hasResponseHeaderTaggedFields, parser, callback, timedOut } = request

if (timedOut) {
return
}

let deserialized: any
let responseError: Error | null = null
Expand Down Expand Up @@ -704,7 +723,7 @@ export class Connection extends EventEmitter {
}
}

#onError (error: Error): void {
#connectedSocketErrorHandler (error: Error): void {
clearTimeout(this.#reauthenticationTimeout)
this.emit('error', new NetworkError('Connection error', { cause: error }))
}
Expand Down
18 changes: 5 additions & 13 deletions test/clients/base/base.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
Connection,
MultipleErrors,
sleep,
TimeoutError,
UnsupportedApiError,
type ClientDiagnosticEvent,
type ClusterMetadata
Expand Down Expand Up @@ -541,22 +542,13 @@ test('metadata should handle connection failures to non-existent broker', async
strictEqual(['MultipleErrors', 'AggregateError'].includes(error.name), true)

// Error message should indicate failure
strictEqual(error.message.includes('failed'), true)
strictEqual(error.message, 'Cannot connect to any broker.')

// Should contain nested errors
strictEqual(Array.isArray(error.errors), true)
strictEqual(error.errors.length > 0, true)

// At least one error should be a network error
const hasNetworkError = error.errors.some(
(err: any) =>
err.message.includes('ECONNREFUSED') ||
err.message.includes('ETIMEDOUT') ||
err.message.includes('getaddrinfo') ||
err.message.includes('connect')
)

strictEqual(hasNetworkError, true)
strictEqual(error.errors.length, 1)
strictEqual(error.errors[0] instanceof TimeoutError, true)
strictEqual(error.errors[0].message, 'Connection to 192.0.2.1:9092 timed out.')
}
})

Expand Down
18 changes: 15 additions & 3 deletions test/clients/consumer/consumer-consumer-group-protocol.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
ProduceAcks,
ResponseError,
sleep,
TimeoutError,
UnsupportedApiError,
type MessageToProduce,
type ProducerOptions
Expand Down Expand Up @@ -165,11 +166,22 @@ test('#consumerGroupHeartbeat should ignore response when closed ', skipConsumer
})

test('#consumerGroupHeartbeat should timeout and schedule another heartbeat', skipConsumerGroupProtocol, async t => {
const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, timeout: 200 })
const consumer = createConsumer(t, { groupProtocol: 'consumer', maxWaitTime: 100, requestTimeout: 200 })
const topic = await createTopic(t, true, 1)
const stream = await consumer.consume({ topics: [topic] })
let mockCount = 1
mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, () => mockCount-- > 0)
mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, (
__originalSend,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please only use a single _ in this call.

__apiKey,
__apiVersion,
__payload,
__responseParser,
__hasRequestHeaderTaggedFields,
__hasResponseHeaderTaggedFields,
callback
) => {
callback(new TimeoutError('Request timed out'), null)
return false
})
await once(consumer, 'consumer:heartbeat:error')
await once(consumer, 'consumer:heartbeat:end')
await stream.close()
Expand Down
2 changes: 1 addition & 1 deletion test/clients/consumer/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ test('fetch should support both promise and callback API', async t => {
const topicInfo = metadata.topics.get(topic)!

await new Promise<void>((resolve, reject) => {
const consumer = createConsumer(t)
const consumer = createConsumer(t, { requestTimeout: 100000 })

// First test callback API
consumer.fetch(
Expand Down
2 changes: 1 addition & 1 deletion test/clients/producer/producer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ test('send should handle synchronuous error during payload creation', async t =>
const testTopic = await createTopic(t)

const compression = 'lz4'
const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD')
const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD', { canRetry: false })
t.mock.method(compressionsAlgorithms[compression], 'compressSync', () => {
throw expectedError
})
Expand Down
8 changes: 6 additions & 2 deletions test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ export function mockMethod (
fn?: (original: (...args: any[]) => void, ...args: any[]) => boolean | void
) {
if (typeof errorToMock === 'undefined') {
errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')])
errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')], {
canRetry: false
})
}

const original = target[method].bind(target)
Expand Down Expand Up @@ -259,7 +261,9 @@ export function mockAPI (
fn?: (original: (...args: any[]) => void, ...args: any[]) => boolean | void
) {
if (typeof errorToMock === 'undefined') {
errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')])
errorToMock = new MultipleErrors(mockedErrorMessage, [new Error(mockedErrorMessage + ' (internal)')], {
canRetry: false
})
}

const originalGet = pool.get.bind(pool)
Expand Down
Loading