diff --git a/docs/admin.md b/docs/admin.md index aa6e325..c198c72 100644 --- a/docs/admin.md +++ b/docs/admin.md @@ -90,6 +90,19 @@ Options: | -------- | ---------- | ----------------- | | groups | `string[]` | Groups to delete. | +### `removeMembersFromConsumerGroup(options[, callback])` + +Removes members from a consumer group. + +The return value is `void`. + +Options: + +| Property | Type | Description | +| -------- | ------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| groupId | `string` | The consumer group ID. | +| members | `(string \| MemberRemoval)[] \| null` | Array of member IDs to remove (as strings), or an array of `MemberRemoval` objects with `memberId` and optional `reason`, or `null` to remove all members from the group. | + ### `describeClientQuotas(options[, callback])` Gets detailed information about client quotas. diff --git a/docs/diagnostic.md b/docs/diagnostic.md index 3f16782..c5023a9 100644 --- a/docs/diagnostic.md +++ b/docs/diagnostic.md @@ -62,23 +62,23 @@ Each tracing channel publishes events with the following common properties: ## Published tracing channels -| Name | Target | Description | -| ----------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------- | -| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. | -| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. | -| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. | -| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. | -| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. | -| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. | -| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. | -| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. | -| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. | -| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. | -| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. | -| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. | -| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. | -| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. | -| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. | -| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. | -| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. | -| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. | +| Name | Target | Description | +| ----------------------------------- | ---------------- | ---------------------------------------------------------------------------------------------------------------------------- | +| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. | +| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. | +| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. | +| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. | +| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. | +| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. | +| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups`, `Admin.deleteGroups` or `Admin.removeMembersFromConsumerGroup` request. | +| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. | +| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. | +| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. | +| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. | +| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. | +| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. | +| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. | +| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. | +| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. | +| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. | +| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. | diff --git a/src/clients/admin/admin.ts b/src/clients/admin/admin.ts index 1bddabd..1b41132 100644 --- a/src/clients/admin/admin.ts +++ b/src/clients/admin/admin.ts @@ -23,6 +23,11 @@ import { import { type DescribeGroupsRequest, type DescribeGroupsResponse } from '../../apis/admin/describe-groups-v5.ts' import { type DescribeLogDirsRequest, type DescribeLogDirsResponse } from '../../apis/admin/describe-log-dirs-v4.ts' import { type ListGroupsRequest as ListGroupsRequestV4 } from '../../apis/admin/list-groups-v4.ts' +import { + type LeaveGroupRequest, + type LeaveGroupRequestMember, + type LeaveGroupResponse +} from '../../apis/consumer/leave-group-v5.ts' import { type ListGroupsRequest as ListGroupsRequestV5, type ListGroupsResponse @@ -70,7 +75,8 @@ import { describeGroupsOptionsValidator, describeLogDirsOptionsValidator, listGroupsOptionsValidator, - listTopicsOptionsValidator + listTopicsOptionsValidator, + removeMembersFromConsumerGroupOptionsValidator } from './options.ts' import { type AdminOptions, @@ -87,7 +93,8 @@ import { type GroupBase, type GroupMember, type ListGroupsOptions, - type ListTopicsOptions + type ListTopicsOptions, + type RemoveMembersFromConsumerGroupOptions } from './types.ts' export class Admin extends Base { @@ -289,6 +296,46 @@ export class Admin extends Base { return callback[kCallbackPromise] } + removeMembersFromConsumerGroup ( + options: RemoveMembersFromConsumerGroupOptions, + callback: CallbackWithPromise + ): void + removeMembersFromConsumerGroup (options: RemoveMembersFromConsumerGroupOptions): Promise + removeMembersFromConsumerGroup ( + options: RemoveMembersFromConsumerGroupOptions, + callback?: CallbackWithPromise + ): void | Promise { + if (!callback) { + callback = createPromisifiedCallback() + } + + if (this[kCheckNotClosed](callback)) { + return callback[kCallbackPromise] + } + + const validationError = this[kValidateOptions]( + options, + removeMembersFromConsumerGroupOptionsValidator, + '/options', + false + ) + if (validationError) { + callback(validationError) + return callback[kCallbackPromise] + } + + adminGroupsChannel.traceCallback( + this.#removeMembersFromConsumerGroup, + 1, + createDiagnosticContext({ client: this, operation: 'removeMembersFromConsumerGroup', options }), + this, + options, + callback + ) + + return callback[kCallbackPromise] + } + describeClientQuotas ( options: DescribeClientQuotasOptions, callback: CallbackWithPromise @@ -811,6 +858,105 @@ export class Admin extends Base { }) } + #removeMembersFromConsumerGroup ( + options: RemoveMembersFromConsumerGroupOptions, + callback: CallbackWithPromise + ): void { + if (!options.members || options.members.length === 0) { + this.#describeGroups({ groups: [options.groupId] }, (error, groupsMap) => { + if (error) { + callback(new MultipleErrors('Removing members from consumer group failed.', [error])) + return + } + + const group = groupsMap.get(options.groupId) + if (!group) { + callback(new MultipleErrors('Removing members from consumer group failed.', [])) + return + } + + const allMemberIds = Array.from(group.members.keys()) + if (allMemberIds.length === 0) { + callback(null) + return + } + + this.#removeMembersFromConsumerGroup({ ...options, members: allMemberIds }, callback) + }) + return + } + + this[kMetadata]({ topics: [] }, (error, metadata) => { + if (error) { + callback(error, undefined) + return + } + + this.#findGroupCoordinator([options.groupId], (error, response) => { + if (error) { + callback(new MultipleErrors('Removing members from consumer group failed.', [error])) + return + } + + const coordinator = response.coordinators.find(c => c.key === options.groupId) + if (!coordinator) { + callback( + new MultipleErrors('Removing members from consumer group failed.', [ + new Error(`No coordinator found for group ${options.groupId}`) + ]) + ) + return + } + + const broker = metadata.brokers.get(coordinator.nodeId) + if (!broker) { + callback( + new MultipleErrors('Removing members from consumer group failed.', [ + new Error(`Broker ${coordinator.nodeId} not found`) + ]) + ) + return + } + + this[kGetConnection](broker, (error, connection) => { + if (error) { + callback(new MultipleErrors('Removing members from consumer group failed.', [error])) + return + } + + this[kPerformWithRetry]( + 'removeMembersFromConsumerGroup', + retryCallback => { + this[kGetApi]('LeaveGroup', (error, api) => { + if (error) { + retryCallback(error, undefined as unknown as LeaveGroupResponse) + return + } + + const members: LeaveGroupRequestMember[] = options.members!.map(member => ({ + memberId: typeof member === 'string' ? member : member.memberId, + groupInstanceId: null, + reason: typeof member === 'string' ? 'Not specified' : member.reason + })) + + api(connection, options.groupId, members, retryCallback as unknown as Callback) + }) + }, + (error: Error | null) => { + if (error) { + callback(new MultipleErrors('Removing members from consumer group failed.', [error])) + return + } + + callback(null) + }, + 0 + ) + }) + }) + }) + } + #findGroupCoordinator (groups: string[], callback: CallbackWithPromise): void { this[kPerformWithRetry]( 'findGroupCoordinator', diff --git a/src/clients/admin/options.ts b/src/clients/admin/options.ts index 38758a8..598afb7 100644 --- a/src/clients/admin/options.ts +++ b/src/clients/admin/options.ts @@ -91,6 +91,33 @@ export const deleteGroupsOptionsSchema = { additionalProperties: false } +export const removeMembersFromConsumerGroupOptionsSchema = { + type: 'object', + properties: { + groupId: idProperty, + members: { + type: ['array', 'null'], + items: { + oneOf: [ + idProperty, + { + type: 'object', + properties: { + memberId: idProperty, + reason: { type: 'string', minLength: 1 } + }, + required: ['memberId', 'reason'], + additionalProperties: false + } + ] + }, + default: null + } + }, + required: ['groupId'], + additionalProperties: false +} + export const describeClientQuotasOptionsSchema = { type: 'object', properties: { @@ -205,6 +232,7 @@ export const deleteTopicsOptionsValidator = ajv.compile(deleteTopicOptionsSchema export const listGroupsOptionsValidator = ajv.compile(listGroupsOptionsSchema) export const describeGroupsOptionsValidator = ajv.compile(describeGroupsOptionsSchema) export const deleteGroupsOptionsValidator = ajv.compile(deleteGroupsOptionsSchema) +export const removeMembersFromConsumerGroupOptionsValidator = ajv.compile(removeMembersFromConsumerGroupOptionsSchema) export const describeClientQuotasOptionsValidator = ajv.compile(describeClientQuotasOptionsSchema) export const alterClientQuotasOptionsValidator = ajv.compile(alterClientQuotasOptionsSchema) export const describeLogDirsOptionsValidator = ajv.compile(describeLogDirsOptionsSchema) diff --git a/src/clients/admin/types.ts b/src/clients/admin/types.ts index ac86e19..e9f765b 100644 --- a/src/clients/admin/types.ts +++ b/src/clients/admin/types.ts @@ -7,7 +7,7 @@ import { type DescribeLogDirsResponseResult } from '../../apis/admin/describe-log-dirs-v4.ts' import { type ConsumerGroupState } from '../../apis/enumerations.ts' -import { type NullableString } from '../../protocol/definitions.ts' +import { type Nullable, type NullableString } from '../../protocol/definitions.ts' import { type BaseOptions } from '../base/types.ts' import { type ExtendedGroupProtocolSubscription, type GroupAssignment } from '../consumer/types.ts' @@ -79,6 +79,16 @@ export interface DeleteGroupsOptions { groups: string[] } +interface MemberRemoval { + memberId: string + reason: string +} + +export interface RemoveMembersFromConsumerGroupOptions { + groupId: string + members?: Nullable<(string | MemberRemoval)[]> +} + export interface DescribeClientQuotasOptions { components: DescribeClientQuotasRequestComponent[] strict?: boolean diff --git a/src/protocol/definitions.ts b/src/protocol/definitions.ts index 13d5995..37dc33a 100644 --- a/src/protocol/definitions.ts +++ b/src/protocol/definitions.ts @@ -17,4 +17,5 @@ export const EMPTY_TAGGED_FIELDS_BUFFER = Buffer.from([0]) export type Collection = string | Buffer | DynamicBuffer | Array | Map | Set -export type NullableString = string | undefined | null +export type Nullable = T | null | undefined +export type NullableString = Nullable diff --git a/test/clients/admin/admin.test.ts b/test/clients/admin/admin.test.ts index 6528779..fd69896 100644 --- a/test/clients/admin/admin.test.ts +++ b/test/clients/admin/admin.test.ts @@ -31,6 +31,7 @@ import { } from '../../../src/index.ts' import { createAdmin, + createConsumer, createCreationChannelVerifier, createTopic, createTracingChannelVerifier, @@ -175,6 +176,17 @@ test('all operations should fail when admin is closed', async t => { } catch (error) { strictEqual(error.message, 'Client is closed.') } + + // Attempt to call removeMembersFromConsumerGroup on closed admin + try { + await admin.removeMembersFromConsumerGroup({ + groupId: 'test-group', + members: [{ memberId: 'test-member', reason: 'test-reason' }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message, 'Client is closed.') + } }) test('listTopics should list topics and support diagnostic channels', async t => { @@ -1376,6 +1388,329 @@ test('deleteGroups should handle unavailable API errors (FindCoordinator)', asyn } }) +test('removeMembersFromConsumerGroup should remove specific members', async t => { + const admin = createAdmin(t) + const groupId = `test-group-${randomUUID()}` + const clientId = `test-client-${randomUUID()}` + const consumer = createConsumer(t, { clientId, groupId }) + await consumer.joinGroup({}) + + ok(consumer.memberId) + + const groups1 = await admin.describeGroups({ groups: [groupId] }) + strictEqual(groups1.get(groupId)!.members.has(consumer.memberId), true) + + await admin.removeMembersFromConsumerGroup({ groupId, members: [consumer.memberId] }) + const groups2 = await admin.describeGroups({ groups: [groupId] }) + strictEqual(groups2.get(groupId)!.members.has(consumer.memberId), false) +}) + +test('removeMembersFromConsumerGroup should remove specific members with custom reason', async t => { + const admin = createAdmin(t) + const groupId = `test-group-${randomUUID()}` + const clientId = `test-client-${randomUUID()}` + const consumer = createConsumer(t, { clientId, groupId }) + await consumer.joinGroup({}) + + ok(consumer.memberId) + + const groups1 = await admin.describeGroups({ groups: [groupId] }) + strictEqual(groups1.get(groupId)!.members.has(consumer.memberId), true) + + await admin.removeMembersFromConsumerGroup({ + groupId, + members: [{ memberId: consumer.memberId, reason: 'Custom reason' }] + }) + + const groups2 = await admin.describeGroups({ groups: [groupId] }) + strictEqual(groups2.get(groupId)!.members.has(consumer.memberId), false) +}) + +test('removeMembersFromConsumerGroup should remove all members', async t => { + const admin = createAdmin(t) + const groupId = `test-group-${randomUUID()}` + const clientId = `test-client-${randomUUID()}` + const consumer = createConsumer(t, { clientId, groupId }) + await consumer.joinGroup({}) + + ok(consumer.memberId) + + const groups1 = await admin.describeGroups({ groups: [groupId] }) + strictEqual(groups1.get(groupId)!.members.has(consumer.memberId), true) + + await admin.removeMembersFromConsumerGroup({ groupId }) + + const groups2 = await admin.describeGroups({ groups: [groupId] }) + strictEqual(groups2.get(groupId)!.members.size, 0) +}) + +test('removeMembersFromConsumerGroup should support diagnostic channels', async t => { + const admin = createAdmin(t) + const groupId = `test-group-${randomUUID()}` + const clientId = `test-client-${randomUUID()}` + const consumer = createConsumer(t, { clientId, groupId }) + await consumer.joinGroup({}) + + ok(consumer.memberId) + + const verifyTracingChannel = createTracingChannelVerifier( + adminGroupsChannel, + 'client', + { + start (context: ClientDiagnosticEvent) { + deepStrictEqual(context, { + client: admin, + operation: 'removeMembersFromConsumerGroup', + options: { groupId, members: [consumer.memberId] }, + operationId: mockedOperationId + }) + }, + error (context: ClientDiagnosticEvent) { + ok(typeof context === 'undefined') + } + }, + (_label: string, data: ClientDiagnosticEvent) => data.operation === 'removeMembersFromConsumerGroup' + ) + + // Remove one specific member + await admin.removeMembersFromConsumerGroup({ groupId, members: [consumer.memberId] }) + verifyTracingChannel() +}) + +test('removeMembersFromConsumerGroup should validate options in strict mode', async t => { + const admin = createAdmin(t, { strict: true }) + + // Test with missing required field (groupId) + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({}) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('groupId'), true) + } + + // Test with invalid additional property + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({ groupId: 'test-group', members: null, invalidProperty: true }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('must NOT have additional properties'), true) + } + + // Test with invalid type for groupId + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({ groupId: 0 }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('groupId'), true) + } + + // Test with invalid empty groupId + try { + await admin.removeMembersFromConsumerGroup({ groupId: '' }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('groupId'), true) + } + + // Test with invalid type for members + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({ groupId: 'test-group', members: 123 }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid member type + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({ groupId: 'test-group', members: [true] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid empty string member + try { + await admin.removeMembersFromConsumerGroup({ groupId: 'test-group', members: [''] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid object member missing memberId + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({ groupId: 'test-group', members: [{ reason: 'No memberId' }] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid object member with non-string memberId + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({ groupId: 'test-group', members: [{ memberId: 123 }] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid object member with empty string memberId + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({ groupId: 'test-group', members: [{ memberId: '' }] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid object member missing reason + try { + // @ts-expect-error - Intentionally passing invalid options + await admin.removeMembersFromConsumerGroup({ groupId: 'test-group', members: [{ memberId: 'valid-id' }] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid object member with non-string reason + try { + await admin.removeMembersFromConsumerGroup({ + groupId: 'test-group', + // @ts-expect-error - Intentionally passing invalid options + members: [{ memberId: 'valid-id', reason: 123 }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid object member with empty string reason + try { + await admin.removeMembersFromConsumerGroup({ + groupId: 'test-group', + members: [{ memberId: 'valid-id', reason: '' }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } + + // Test with invalid object member with additional property + try { + await admin.removeMembersFromConsumerGroup({ + groupId: 'test-group', + // @ts-expect-error - Intentionally passing invalid options + members: [{ memberId: 'valid-id', reason: 'Valid reason', extra: true }] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error.message.includes('members'), true) + } +}) + +test('removeMembersFromConsumerGroup should handle errors from Base.metadata', async t => { + const admin = createAdmin(t) + + mockMetadata(admin) + + try { + await admin.removeMembersFromConsumerGroup({ groupId: 'non-existent', members: ['fake-member'] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes(mockedErrorMessage), true) + } +}) + +test('removeMembersFromConsumerGroup should handle errors from Connection.getFirstAvailable', async t => { + const admin = createAdmin(t) + + mockConnectionPoolGetFirstAvailable(admin[kConnections], 2) + + try { + await admin.removeMembersFromConsumerGroup({ + groupId: 'test-group', + members: ['fake-member'] + }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes(mockedErrorMessage), true) + } +}) + +test('removeMembersFromConsumerGroup should handle errors from Connection.get', async t => { + const admin = createAdmin(t) + + mockConnectionPoolGet(admin[kConnections], 4) + + try { + await admin.removeMembersFromConsumerGroup({ groupId: 'non-existent', members: ['fake-member'] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message.includes('Removing members from consumer group failed.'), true) + } +}) + +test('removeMembersFromConsumerGroup should handle unavailable API errors (LeaveGroup)', async t => { + const admin = createAdmin(t) + + mockUnavailableAPI(admin, 'LeaveGroup') + + try { + await admin.removeMembersFromConsumerGroup({ groupId: 'non-existent', members: ['fake-member'] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.errors[0].message.includes('Unsupported API LeaveGroup.'), true) + } +}) + +test('removeMembersFromConsumerGroup should handle unavailable API errors (FindCoordinator)', async t => { + const admin = createAdmin(t) + + mockUnavailableAPI(admin, 'FindCoordinator') + + try { + await admin.removeMembersFromConsumerGroup({ groupId: 'non-existent', members: ['fake-member'] }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message, 'Removing members from consumer group failed.') + const subError = error.errors[0] + ok(subError) + strictEqual(subError instanceof UnsupportedApiError, true) + strictEqual(subError.message.includes('Unsupported API FindCoordinator.'), true) + } +}) + +test('removeMembersFromConsumerGroup should handle unavailable API errors (DescribeGroups)', async t => { + const admin = createAdmin(t) + + mockUnavailableAPI(admin, 'DescribeGroups') + + try { + await admin.removeMembersFromConsumerGroup({ groupId: 'non-existent', members: null }) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof MultipleErrors, true) + strictEqual(error.message, 'Removing members from consumer group failed.') + const subError = error.errors[0] + ok(subError) + strictEqual(subError.message, 'Describing groups failed.') + const subSubError = subError.errors[0] + ok(subSubError) + strictEqual(subSubError instanceof UnsupportedApiError, true) + strictEqual(subSubError.message.includes('Unsupported API DescribeGroups.'), true) + } +}) + const clientQuotasTestCases = [ { clientId: 'Client1', user: 'User1' }, { clientId: 'Client1', user: null },