Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ Options:
| -------- | ---------- | ----------------- |
| groups | `string[]` | Groups to delete. |

### `removeMembersFromConsumerGroup(options[, callback])`

Removes members from a consumer group.

The return value is `void`.

Options:

| Property | Type | Description |
| -------- | ------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| groupId | `string` | The consumer group ID. |
| members | `(string \| MemberRemoval)[] \| null` | Array of member IDs to remove (as strings), or an array of `MemberRemoval` objects with `memberId` and optional `reason`, or `null` to remove all members from the group. |

### `describeClientQuotas(options[, callback])`

Gets detailed information about client quotas.
Expand Down
40 changes: 20 additions & 20 deletions docs/diagnostic.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ Each tracing channel publishes events with the following common properties:

## Published tracing channels

| Name | Target | Description |
| ----------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------- |
| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. |
| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. |
| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. |
| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. |
| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. |
| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. |
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. |
| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. |
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. |
| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. |
| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. |
| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. |
| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. |
| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. |
| Name | Target | Description |
| ----------------------------------- | ---------------- | ---------------------------------------------------------------------------------------------------------------------------- |
| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. |
| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. |
| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. |
| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. |
| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. |
| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. |
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups`, `Admin.deleteGroups` or `Admin.removeMembersFromConsumerGroup` request. |
| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. |
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. |
| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. |
| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. |
| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. |
| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. |
| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. |
150 changes: 148 additions & 2 deletions src/clients/admin/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import {
import { type DescribeGroupsRequest, type DescribeGroupsResponse } from '../../apis/admin/describe-groups-v5.ts'
import { type DescribeLogDirsRequest, type DescribeLogDirsResponse } from '../../apis/admin/describe-log-dirs-v4.ts'
import { type ListGroupsRequest as ListGroupsRequestV4 } from '../../apis/admin/list-groups-v4.ts'
import {
type LeaveGroupRequest,
type LeaveGroupRequestMember,
type LeaveGroupResponse
} from '../../apis/consumer/leave-group-v5.ts'
import {
type ListGroupsRequest as ListGroupsRequestV5,
type ListGroupsResponse
Expand Down Expand Up @@ -70,7 +75,8 @@ import {
describeGroupsOptionsValidator,
describeLogDirsOptionsValidator,
listGroupsOptionsValidator,
listTopicsOptionsValidator
listTopicsOptionsValidator,
removeMembersFromConsumerGroupOptionsValidator
} from './options.ts'
import {
type AdminOptions,
Expand All @@ -87,7 +93,8 @@ import {
type GroupBase,
type GroupMember,
type ListGroupsOptions,
type ListTopicsOptions
type ListTopicsOptions,
type RemoveMembersFromConsumerGroupOptions
} from './types.ts'

export class Admin extends Base<AdminOptions> {
Expand Down Expand Up @@ -289,6 +296,46 @@ export class Admin extends Base<AdminOptions> {
return callback[kCallbackPromise]
}

removeMembersFromConsumerGroup (
options: RemoveMembersFromConsumerGroupOptions,
callback: CallbackWithPromise<void>
): void
removeMembersFromConsumerGroup (options: RemoveMembersFromConsumerGroupOptions): Promise<void>
removeMembersFromConsumerGroup (
options: RemoveMembersFromConsumerGroupOptions,
callback?: CallbackWithPromise<void>
): void | Promise<void> {
if (!callback) {
callback = createPromisifiedCallback()
}

if (this[kCheckNotClosed](callback)) {
return callback[kCallbackPromise]
}

const validationError = this[kValidateOptions](
options,
removeMembersFromConsumerGroupOptionsValidator,
'/options',
false
)
if (validationError) {
callback(validationError)
return callback[kCallbackPromise]
}

adminGroupsChannel.traceCallback(
this.#removeMembersFromConsumerGroup,
1,
createDiagnosticContext({ client: this, operation: 'removeMembersFromConsumerGroup', options }),
this,
options,
callback
)

return callback[kCallbackPromise]
}

describeClientQuotas (
options: DescribeClientQuotasOptions,
callback: CallbackWithPromise<DescribeClientQuotasResponseEntry[]>
Expand Down Expand Up @@ -811,6 +858,105 @@ export class Admin extends Base<AdminOptions> {
})
}

#removeMembersFromConsumerGroup (
options: RemoveMembersFromConsumerGroupOptions,
callback: CallbackWithPromise<void>
): void {
if (!options.members || options.members.length === 0) {
this.#describeGroups({ groups: [options.groupId] }, (error, groupsMap) => {
if (error) {
callback(new MultipleErrors('Removing members from consumer group failed.', [error]))
return
}

const group = groupsMap.get(options.groupId)
if (!group) {
callback(new MultipleErrors('Removing members from consumer group failed.', []))
return
}

const allMemberIds = Array.from(group.members.keys())
if (allMemberIds.length === 0) {
callback(null)
return
}

this.#removeMembersFromConsumerGroup({ ...options, members: allMemberIds }, callback)
})
return
}

this[kMetadata]({ topics: [] }, (error, metadata) => {
if (error) {
callback(error, undefined)
return
}

this.#findGroupCoordinator([options.groupId], (error, response) => {
if (error) {
callback(new MultipleErrors('Removing members from consumer group failed.', [error]))
return
}

const coordinator = response.coordinators.find(c => c.key === options.groupId)
if (!coordinator) {
callback(
new MultipleErrors('Removing members from consumer group failed.', [
new Error(`No coordinator found for group ${options.groupId}`)
])
)
return
}

const broker = metadata.brokers.get(coordinator.nodeId)
if (!broker) {
callback(
new MultipleErrors('Removing members from consumer group failed.', [
new Error(`Broker ${coordinator.nodeId} not found`)
])
)
return
}

this[kGetConnection](broker, (error, connection) => {
if (error) {
callback(new MultipleErrors('Removing members from consumer group failed.', [error]))
return
}

this[kPerformWithRetry]<LeaveGroupResponse>(
'removeMembersFromConsumerGroup',
retryCallback => {
this[kGetApi]<LeaveGroupRequest, LeaveGroupResponse>('LeaveGroup', (error, api) => {
if (error) {
retryCallback(error, undefined as unknown as LeaveGroupResponse)
return
}

const members: LeaveGroupRequestMember[] = options.members!.map(member => ({
memberId: typeof member === 'string' ? member : member.memberId,
groupInstanceId: null,
reason: typeof member === 'string' ? 'Not specified' : member.reason
}))

api(connection, options.groupId, members, retryCallback as unknown as Callback<LeaveGroupResponse>)
})
},
(error: Error | null) => {
if (error) {
callback(new MultipleErrors('Removing members from consumer group failed.', [error]))
return
}

callback(null)
},
0
)
})
})
})
}

#findGroupCoordinator (groups: string[], callback: CallbackWithPromise<FindCoordinatorResponse>): void {
this[kPerformWithRetry]<FindCoordinatorResponse>(
'findGroupCoordinator',
Expand Down
28 changes: 28 additions & 0 deletions src/clients/admin/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,33 @@ export const deleteGroupsOptionsSchema = {
additionalProperties: false
}

export const removeMembersFromConsumerGroupOptionsSchema = {
type: 'object',
properties: {
groupId: idProperty,
members: {
type: ['array', 'null'],
items: {
oneOf: [
idProperty,
{
type: 'object',
properties: {
memberId: idProperty,
reason: { type: 'string', minLength: 1 }
},
required: ['memberId', 'reason'],
additionalProperties: false
}
]
},
default: null
}
},
required: ['groupId'],
additionalProperties: false
}

export const describeClientQuotasOptionsSchema = {
type: 'object',
properties: {
Expand Down Expand Up @@ -205,6 +232,7 @@ export const deleteTopicsOptionsValidator = ajv.compile(deleteTopicOptionsSchema
export const listGroupsOptionsValidator = ajv.compile(listGroupsOptionsSchema)
export const describeGroupsOptionsValidator = ajv.compile(describeGroupsOptionsSchema)
export const deleteGroupsOptionsValidator = ajv.compile(deleteGroupsOptionsSchema)
export const removeMembersFromConsumerGroupOptionsValidator = ajv.compile(removeMembersFromConsumerGroupOptionsSchema)
export const describeClientQuotasOptionsValidator = ajv.compile(describeClientQuotasOptionsSchema)
export const alterClientQuotasOptionsValidator = ajv.compile(alterClientQuotasOptionsSchema)
export const describeLogDirsOptionsValidator = ajv.compile(describeLogDirsOptionsSchema)
12 changes: 11 additions & 1 deletion src/clients/admin/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
type DescribeLogDirsResponseResult
} from '../../apis/admin/describe-log-dirs-v4.ts'
import { type ConsumerGroupState } from '../../apis/enumerations.ts'
import { type NullableString } from '../../protocol/definitions.ts'
import { type Nullable, type NullableString } from '../../protocol/definitions.ts'
import { type BaseOptions } from '../base/types.ts'
import { type ExtendedGroupProtocolSubscription, type GroupAssignment } from '../consumer/types.ts'

Expand Down Expand Up @@ -79,6 +79,16 @@ export interface DeleteGroupsOptions {
groups: string[]
}

interface MemberRemoval {
memberId: string
reason: string
}

export interface RemoveMembersFromConsumerGroupOptions {
groupId: string
members?: Nullable<(string | MemberRemoval)[]>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change this. I would only accept MemberRemoval.
Also, the type of Member should be:

interface MemberRemoval {
  memberId: string
  reason?: string
}

}

export interface DescribeClientQuotasOptions {
components: DescribeClientQuotasRequestComponent[]
strict?: boolean
Expand Down
3 changes: 2 additions & 1 deletion src/protocol/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export const EMPTY_TAGGED_FIELDS_BUFFER = Buffer.from([0])

export type Collection = string | Buffer | DynamicBuffer | Array<unknown> | Map<unknown, unknown> | Set<unknown>

export type NullableString = string | undefined | null
export type Nullable<T> = T | null | undefined
export type NullableString = Nullable<string>
Loading