diff --git a/src/clients/admin/admin.ts b/src/clients/admin/admin.ts index 1bddabd..8ae3842 100644 --- a/src/clients/admin/admin.ts +++ b/src/clients/admin/admin.ts @@ -53,6 +53,8 @@ import { kGetApi, kGetBootstrapConnection, kGetConnection, + kGetControllerConnection, + kHandleNotControllerError, kMetadata, kOptions, kPerformDeduplicated, @@ -470,7 +472,7 @@ export class Admin extends Base { this[kPerformWithRetry]( 'createTopics', retryCallback => { - this[kGetBootstrapConnection]((error, connection) => { + this[kGetControllerConnection]((error, connection) => { if (error) { retryCallback(error, undefined as unknown as CreateTopicsResponse) return @@ -482,13 +484,9 @@ export class Admin extends Base { return } - api( - connection, - requests, - this[kOptions].timeout!, - false, - retryCallback as unknown as Callback - ) + api(connection, requests, this[kOptions].timeout!, false, (error, response) => { + this[kHandleNotControllerError](error, response, retryCallback) + }) }) }) }, @@ -532,7 +530,7 @@ export class Admin extends Base { this[kPerformWithRetry]( 'deleteTopics', retryCallback => { - this[kGetBootstrapConnection]((error, connection) => { + this[kGetControllerConnection]((error, connection) => { if (error) { retryCallback(error, undefined) return @@ -549,12 +547,9 @@ export class Admin extends Base { return } - api( - connection, - requests, - this[kOptions].timeout!, - retryCallback as unknown as Callback - ) + api(connection, requests, this[kOptions].timeout!, (error, response) => { + this[kHandleNotControllerError](error, response, retryCallback) + }) }) }) }, diff --git a/src/clients/base/base.ts b/src/clients/base/base.ts index ba2e4ac..a46da8e 100644 --- a/src/clients/base/base.ts +++ b/src/clients/base/base.ts @@ -61,6 +61,9 @@ export const kFormatValidationErrors = Symbol('plt.kafka.base.formatValidationEr export const kPrometheus = Symbol('plt.kafka.base.prometheus') export const kClientType = Symbol('plt.kafka.base.clientType') export const kAfterCreate = Symbol('plt.kafka.base.afterCreate') +export const kController = Symbol('plt.kafka.base.controller') +export const kGetControllerConnection = Symbol('plt.kafka.base.getControllerConnection') +export const kHandleNotControllerError = Symbol('plt.kafka.base.handleNotControllerError') let currentInstance = 0 @@ -76,7 +79,8 @@ export class Base extends EventEm [kOptions]: OptionsType; [kConnections]: ConnectionPool; [kClosed]: boolean; - [kPrometheus]: Metrics | undefined + [kPrometheus]: Metrics | undefined; + [kController]: Broker | undefined #metadata: ClusterMetadata | undefined #inflightDeduplications: Map[]> @@ -478,6 +482,28 @@ export class Base extends EventEm this[kConnections].getFirstAvailable(this[kBootstrapBrokers], callback) } + [kGetControllerConnection] (callback: Callback): void { + if (this[kController]) { + this[kConnections].get(this[kController], callback) + } else { + this[kGetBootstrapConnection](callback) + } + } + + [kHandleNotControllerError] (error: Error | null, value: T, callback: Callback): void { + if (error && (error as MultipleErrors)?.findBy?.('apiCode', 41)) { + this.metadata({ topics: [] }, (metadataError, metadata) => { + if (metadataError) { + callback(metadataError, undefined as unknown as T) + } + this[kController] = metadata.brokers.get(metadata.controllerId) + callback(error, undefined as unknown as T) + }) + } else { + callback(error, value) + } + } + [kValidateOptions] ( target: unknown, validator: ValidateFunction, @@ -590,7 +616,8 @@ export class Base extends EventEm id: metadata.clusterId!, brokers: new Map(), topics: new Map(), - lastUpdate + lastUpdate, + controllerId: metadata.controllerId } } else { this.#metadata.lastUpdate = lastUpdate diff --git a/src/clients/base/types.ts b/src/clients/base/types.ts index a02bd0f..3fad394 100644 --- a/src/clients/base/types.ts +++ b/src/clients/base/types.ts @@ -23,6 +23,7 @@ export interface ClusterTopicMetadata { export interface ClusterMetadata { id: string brokers: Map + controllerId: number topics: Map lastUpdate: number } diff --git a/test/clients/admin/admin.test.ts b/test/clients/admin/admin.test.ts index 6528779..d16c2a4 100644 --- a/test/clients/admin/admin.test.ts +++ b/test/clients/admin/admin.test.ts @@ -11,10 +11,14 @@ import { adminLogDirsChannel, adminTopicsChannel, alterClientQuotasV1, + type Broker, type BrokerLogDirDescription, + type Callback, + type CallbackWithPromise, type ClientDiagnosticEvent, ClientQuotaMatchTypes, type ClusterPartitionMetadata, + type Connection, Consumer, type CreatedTopic, type DescribeClientQuotasOptions, @@ -26,8 +30,11 @@ import { instancesChannel, listGroupsV5, MultipleErrors, + ResponseError, + type ResponseParser, sleep, - UnsupportedApiError + UnsupportedApiError, + type Writer } from '../../../src/index.ts' import { createAdmin, @@ -45,6 +52,17 @@ import { retry } from '../../helpers.ts' +process + .on('unhandledRejection', (reason, p) => { + console.error('Unhandled Rejection at:', p) + console.error(reason) + }) + .on('uncaughtException', err => { + console.error('Uncaught Exception thrown') + console.error(err) + process.exit(1) + }) + test('constructor should initialize properly', t => { const created = createCreationChannelVerifier(instancesChannel) const admin = createAdmin(t) @@ -458,6 +476,70 @@ test('createTopics using assignments', async t => { await admin.deleteTopics({ topics: [topicName] }) }) +test('createTopics should retarget controller when needed', async t => { + const admin = createAdmin(t) + + // Generate a unique topic name for testing + const topicName = `test-topic-partitions-${randomUUID()}` + let correctControllerId: number | null = null + + const pool = admin[kConnections] + const originalGet = pool.get.bind(pool) + // @ts-ignore + pool.get = function (broker: Broker, callback: CallbackWithPromise) { + originalGet(broker, (error: Error | null, connection: Connection) => { + // Define the next broker in sequence as the correct controller + if (correctControllerId === null) { + correctControllerId = (connection.port! - 9010 + 1) % 3 + mockMetadata(admin, 1, null, { + brokers: new Map([ + [1, { host: 'localhost', port: 9011 }], + [2, { host: 'localhost', port: 9012 }], + [3, { host: 'localhost', port: 9013 }] + ]), + controllerId: correctControllerId + }) + } + + const originalSend = connection.send.bind(connection) + + connection.send = function ( + apiKey: number, + apiVersion: number, + payload: () => Writer, + responseParser: ResponseParser, + hasRequestHeaderTaggedFields: boolean, + hasResponseHeaderTaggedFields: boolean, + callback: Callback + ) { + if (apiKey === 19) { + if (connection.port! - 9010 !== correctControllerId) { + callback(new ResponseError(19, 7, { '/': 41 }, {}), undefined as unknown as ReturnType) + return + } + } + + originalSend( + apiKey, + apiVersion, + payload, + responseParser, + hasRequestHeaderTaggedFields, + hasResponseHeaderTaggedFields, + callback + ) + } + + callback(error, connection) + }) + } + + await admin.createTopics({ topics: [topicName] }) + + // Clean up by deleting the topic + await admin.deleteTopics({ topics: [topicName] }) +}) + test('createTopics should validate options in strict mode', async t => { const admin = createAdmin(t, { strict: true }) @@ -617,6 +699,70 @@ test('deleteTopics should delete a topic and support diagnostic channels', async await admin.deleteTopics({ topics: [topicName] }) }) +test('deleteTopics should retarget controller when needed', async t => { + const admin = createAdmin(t) + + // Generate a unique topic name for testing + const topicName = `test-topic-partitions-${randomUUID()}` + await admin.createTopics({ topics: [topicName] }) + + let correctControllerId: number | null = null + + const pool = admin[kConnections] + const originalGet = pool.get.bind(pool) + // @ts-ignore + pool.get = function (broker: Broker, callback: CallbackWithPromise) { + originalGet(broker, (error: Error | null, connection: Connection) => { + // Define the next broker in sequence as the correct controller + if (correctControllerId === null) { + correctControllerId = (connection.port! - 9010 + 1) % 3 + mockMetadata(admin, 1, null, { + brokers: new Map([ + [1, { host: 'localhost', port: 9011 }], + [2, { host: 'localhost', port: 9012 }], + [3, { host: 'localhost', port: 9013 }] + ]), + controllerId: correctControllerId + }) + } + + const originalSend = connection.send.bind(connection) + + connection.send = function ( + apiKey: number, + apiVersion: number, + payload: () => Writer, + responseParser: ResponseParser, + hasRequestHeaderTaggedFields: boolean, + hasResponseHeaderTaggedFields: boolean, + callback: Callback + ) { + if (apiKey === 20) { + if (connection.port! - 9010 !== correctControllerId) { + callback(new ResponseError(20, 6, { '/': 41 }, {}), undefined as unknown as ReturnType) + return + } + } + + originalSend( + apiKey, + apiVersion, + payload, + responseParser, + hasRequestHeaderTaggedFields, + hasResponseHeaderTaggedFields, + callback + ) + } + + callback(error, connection) + }) + } + + // Clean up by deleting the topic + await admin.deleteTopics({ topics: [topicName] }) +}) + test('deleteTopics should validate options in strict mode', async t => { const admin = createAdmin(t, { strict: true })