Skip to content

Commit 7e99e50

Browse files
authored
Feat: kafka is connected (#329)
* Updating deps * Kafka consumer exposing isConnected and isActive * Release prepare
1 parent ad5c0ad commit 7e99e50

File tree

3 files changed

+63
-2
lines changed

3 files changed

+63
-2
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,21 @@ export abstract class AbstractKafkaConsumer<
111111
/* v8 ignore stop */
112112
}
113113

114+
/**
115+
* Returns true if all client's connections are currently connected and the client is connected to at least one broker.
116+
*/
117+
get isConnected(): boolean {
118+
return this.consumer.isConnected()
119+
}
120+
121+
/**
122+
* Returns `true` if the consumer is not closed, and it is currently an active member of a consumer group.
123+
* This method will return `false` during consumer group rebalancing.
124+
*/
125+
get isActive(): boolean {
126+
return this.consumer.isActive()
127+
}
128+
114129
async init(): Promise<void> {
115130
if (this.consumerStream) return Promise.resolve()
116131
const topics = this.handlerContainer.topics

packages/kafka/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.5.1",
3+
"version": "0.6.0",
44
"engines": {
55
"node": ">= 22.14.0"
66
},
@@ -49,7 +49,7 @@
4949
"dependencies": {
5050
"@lokalise/node-core": "^14.2.0",
5151
"@lokalise/universal-ts-utils": "^4.5.1",
52-
"@platformatic/kafka": "^1.9.0"
52+
"@platformatic/kafka": "^1.10.5"
5353
},
5454
"peerDependencies": {
5555
"@message-queue-toolkit/core": ">=23.0.0",

packages/kafka/test/consumer/PermissionConsumer.spec.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,52 @@ describe('PermissionConsumer', () => {
114114
})
115115
})
116116

117+
describe('isConnected', () => {
118+
it('should return false if consumer is not initiated', () => {
119+
// Given
120+
consumer = new PermissionConsumer(testContext.cradle)
121+
122+
// When - Then
123+
expect(consumer.isConnected).toBe(false)
124+
})
125+
126+
it('should return true if consumer is initiated', async () => {
127+
// Given
128+
consumer = new PermissionConsumer(testContext.cradle)
129+
130+
// When
131+
await consumer.init()
132+
133+
// Then
134+
expect(consumer.isConnected).toBe(true)
135+
await consumer.close()
136+
expect(consumer.isConnected).toBe(false)
137+
})
138+
})
139+
140+
describe('isActive', () => {
141+
it('should return false if consumer is not initiated', () => {
142+
// Given
143+
consumer = new PermissionConsumer(testContext.cradle)
144+
145+
// When - Then
146+
expect(consumer.isActive).toBe(false)
147+
})
148+
149+
it('should return true if consumer is initiated', async () => {
150+
// Given
151+
consumer = new PermissionConsumer(testContext.cradle)
152+
153+
// When
154+
await consumer.init()
155+
156+
// Then
157+
expect(consumer.isActive).toBe(true)
158+
await consumer.close()
159+
expect(consumer.isActive).toBe(false)
160+
})
161+
})
162+
117163
describe('consume', () => {
118164
let publisher: PermissionPublisher
119165

0 commit comments

Comments
 (0)