Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions src/clients/admin/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import {
kGetApi,
kGetBootstrapConnection,
kGetConnection,
kGetControllerConnection,
kHandleNotControllerError,
kMetadata,
kOptions,
kPerformDeduplicated,
Expand Down Expand Up @@ -470,7 +472,7 @@ export class Admin extends Base<AdminOptions> {
this[kPerformWithRetry](
'createTopics',
retryCallback => {
this[kGetBootstrapConnection]((error, connection) => {
this[kGetControllerConnection]((error, connection) => {
if (error) {
retryCallback(error, undefined as unknown as CreateTopicsResponse)
return
Expand All @@ -482,13 +484,9 @@ export class Admin extends Base<AdminOptions> {
return
}

api(
connection,
requests,
this[kOptions].timeout!,
false,
retryCallback as unknown as Callback<CreateTopicsResponse>
)
api(connection, requests, this[kOptions].timeout!, false, (error, response) => {
this[kHandleNotControllerError](error, response, retryCallback)
})
})
})
},
Expand Down Expand Up @@ -532,7 +530,7 @@ export class Admin extends Base<AdminOptions> {
this[kPerformWithRetry](
'deleteTopics',
retryCallback => {
this[kGetBootstrapConnection]((error, connection) => {
this[kGetControllerConnection]((error, connection) => {
if (error) {
retryCallback(error, undefined)
return
Expand All @@ -549,12 +547,9 @@ export class Admin extends Base<AdminOptions> {
return
}

api(
connection,
requests,
this[kOptions].timeout!,
retryCallback as unknown as Callback<DeleteTopicsResponse>
)
api(connection, requests, this[kOptions].timeout!, (error, response) => {
this[kHandleNotControllerError](error, response, retryCallback)
})
})
})
},
Expand Down
31 changes: 29 additions & 2 deletions src/clients/base/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ export const kFormatValidationErrors = Symbol('plt.kafka.base.formatValidationEr
export const kPrometheus = Symbol('plt.kafka.base.prometheus')
export const kClientType = Symbol('plt.kafka.base.clientType')
export const kAfterCreate = Symbol('plt.kafka.base.afterCreate')
export const kController = Symbol('plt.kafka.base.controller')
export const kGetControllerConnection = Symbol('plt.kafka.base.getControllerConnection')
export const kHandleNotControllerError = Symbol('plt.kafka.base.handleNotControllerError')

let currentInstance = 0

Expand All @@ -76,7 +79,8 @@ export class Base<OptionsType extends BaseOptions = BaseOptions> extends EventEm
[kOptions]: OptionsType;
[kConnections]: ConnectionPool;
[kClosed]: boolean;
[kPrometheus]: Metrics | undefined
[kPrometheus]: Metrics | undefined;
[kController]: Broker | undefined

#metadata: ClusterMetadata | undefined
#inflightDeduplications: Map<string, CallbackWithPromise<any>[]>
Expand Down Expand Up @@ -478,6 +482,28 @@ export class Base<OptionsType extends BaseOptions = BaseOptions> extends EventEm
this[kConnections].getFirstAvailable(this[kBootstrapBrokers], callback)
}

[kGetControllerConnection] (callback: Callback<Connection>): void {
if (this[kController]) {
this[kConnections].get(this[kController], callback)
} else {
this[kGetBootstrapConnection](callback)
}
}

[kHandleNotControllerError]<T> (error: Error | null, value: T, callback: Callback<T>): void {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this only happens in Admin, move it as private method of Admin.

if (error && (error as MultipleErrors)?.findBy?.('apiCode', 41)) {
this.metadata({ topics: [] }, (metadataError, metadata) => {
if (metadataError) {
callback(metadataError, undefined as unknown as T)
}
this[kController] = metadata.brokers.get(metadata.controllerId)
callback(error, undefined as unknown as T)
})
} else {
callback(error, value)
}
}

[kValidateOptions] (
target: unknown,
validator: ValidateFunction<unknown>,
Expand Down Expand Up @@ -590,7 +616,8 @@ export class Base<OptionsType extends BaseOptions = BaseOptions> extends EventEm
id: metadata.clusterId!,
brokers: new Map(),
topics: new Map(),
lastUpdate
lastUpdate,
controllerId: metadata.controllerId
}
} else {
this.#metadata.lastUpdate = lastUpdate
Expand Down
1 change: 1 addition & 0 deletions src/clients/base/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface ClusterTopicMetadata {
export interface ClusterMetadata {
id: string
brokers: Map<number, Broker>
controllerId: number
topics: Map<string, ClusterTopicMetadata>
lastUpdate: number
}
Expand Down
148 changes: 147 additions & 1 deletion test/clients/admin/admin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import {
adminLogDirsChannel,
adminTopicsChannel,
alterClientQuotasV1,
type Broker,
type BrokerLogDirDescription,
type Callback,
type CallbackWithPromise,
type ClientDiagnosticEvent,
ClientQuotaMatchTypes,
type ClusterPartitionMetadata,
type Connection,
Consumer,
type CreatedTopic,
type DescribeClientQuotasOptions,
Expand All @@ -26,8 +30,11 @@ import {
instancesChannel,
listGroupsV5,
MultipleErrors,
ResponseError,
type ResponseParser,
sleep,
UnsupportedApiError
UnsupportedApiError,
type Writer
} from '../../../src/index.ts'
import {
createAdmin,
Expand All @@ -45,6 +52,17 @@ import {
retry
} from '../../helpers.ts'

process
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover.

.on('unhandledRejection', (reason, p) => {
console.error('Unhandled Rejection at:', p)
console.error(reason)
})
.on('uncaughtException', err => {
console.error('Uncaught Exception thrown')
console.error(err)
process.exit(1)
})

test('constructor should initialize properly', t => {
const created = createCreationChannelVerifier(instancesChannel)
const admin = createAdmin(t)
Expand Down Expand Up @@ -458,6 +476,70 @@ test('createTopics using assignments', async t => {
await admin.deleteTopics({ topics: [topicName] })
})

test('createTopics should retarget controller when needed', async t => {
const admin = createAdmin(t)

// Generate a unique topic name for testing
const topicName = `test-topic-partitions-${randomUUID()}`
let correctControllerId: number | null = null

const pool = admin[kConnections]
const originalGet = pool.get.bind(pool)
// @ts-ignore
pool.get = function (broker: Broker, callback: CallbackWithPromise<Connection>) {
originalGet(broker, (error: Error | null, connection: Connection) => {
// Define the next broker in sequence as the correct controller
if (correctControllerId === null) {
correctControllerId = (connection.port! - 9010 + 1) % 3
mockMetadata(admin, 1, null, {
brokers: new Map([
[1, { host: 'localhost', port: 9011 }],
[2, { host: 'localhost', port: 9012 }],
[3, { host: 'localhost', port: 9013 }]
]),
controllerId: correctControllerId
})
}

const originalSend = connection.send.bind(connection)

connection.send = function <ReturnType>(
apiKey: number,
apiVersion: number,
payload: () => Writer,
responseParser: ResponseParser<ReturnType>,
hasRequestHeaderTaggedFields: boolean,
hasResponseHeaderTaggedFields: boolean,
callback: Callback<ReturnType>
) {
if (apiKey === 19) {
if (connection.port! - 9010 !== correctControllerId) {
callback(new ResponseError(19, 7, { '/': 41 }, {}), undefined as unknown as ReturnType)
return
}
}

originalSend(
apiKey,
apiVersion,
payload,
responseParser,
hasRequestHeaderTaggedFields,
hasResponseHeaderTaggedFields,
callback
)
}

callback(error, connection)
})
}

await admin.createTopics({ topics: [topicName] })

// Clean up by deleting the topic
await admin.deleteTopics({ topics: [topicName] })
})

test('createTopics should validate options in strict mode', async t => {
const admin = createAdmin(t, { strict: true })

Expand Down Expand Up @@ -617,6 +699,70 @@ test('deleteTopics should delete a topic and support diagnostic channels', async
await admin.deleteTopics({ topics: [topicName] })
})

test('deleteTopics should retarget controller when needed', async t => {
const admin = createAdmin(t)

// Generate a unique topic name for testing
const topicName = `test-topic-partitions-${randomUUID()}`
await admin.createTopics({ topics: [topicName] })

let correctControllerId: number | null = null

const pool = admin[kConnections]
const originalGet = pool.get.bind(pool)
// @ts-ignore
pool.get = function (broker: Broker, callback: CallbackWithPromise<Connection>) {
originalGet(broker, (error: Error | null, connection: Connection) => {
// Define the next broker in sequence as the correct controller
if (correctControllerId === null) {
correctControllerId = (connection.port! - 9010 + 1) % 3
mockMetadata(admin, 1, null, {
brokers: new Map([
[1, { host: 'localhost', port: 9011 }],
[2, { host: 'localhost', port: 9012 }],
[3, { host: 'localhost', port: 9013 }]
]),
controllerId: correctControllerId
})
}

const originalSend = connection.send.bind(connection)

connection.send = function <ReturnType>(
apiKey: number,
apiVersion: number,
payload: () => Writer,
responseParser: ResponseParser<ReturnType>,
hasRequestHeaderTaggedFields: boolean,
hasResponseHeaderTaggedFields: boolean,
callback: Callback<ReturnType>
) {
if (apiKey === 20) {
if (connection.port! - 9010 !== correctControllerId) {
callback(new ResponseError(20, 6, { '/': 41 }, {}), undefined as unknown as ReturnType)
return
}
}

originalSend(
apiKey,
apiVersion,
payload,
responseParser,
hasRequestHeaderTaggedFields,
hasResponseHeaderTaggedFields,
callback
)
}

callback(error, connection)
})
}

// Clean up by deleting the topic
await admin.deleteTopics({ topics: [topicName] })
})

test('deleteTopics should validate options in strict mode', async t => {
const admin = createAdmin(t, { strict: true })

Expand Down