@@ -145,15 +145,31 @@ export abstract class AbstractKafkaConsumer<
145145 * Returns true if all client's connections are currently connected and the client is connected to at least one broker.
146146 */
147147 get isConnected ( ) : boolean {
148- return this . consumer . isConnected ( )
148+ // Streams are created only when init method was called
149+ if ( ! this . consumerStream && ! this . messageBatchStream ) return false
150+ try {
151+ return this . consumer . isConnected ( )
152+ } catch ( _ ) {
153+ // this should not happen, but if so it means the consumer is not healthy
154+ /* v8 ignore next */
155+ return false
156+ }
149157 }
150158
151159 /**
152160 * Returns `true` if the consumer is not closed, and it is currently an active member of a consumer group.
153161 * This method will return `false` during consumer group rebalancing.
154162 */
155163 get isActive ( ) : boolean {
156- return this . consumer . isActive ( )
164+ // Streams are created only when init method was called
165+ if ( ! this . consumerStream && ! this . messageBatchStream ) return false
166+ try {
167+ return this . consumer . isActive ( )
168+ } catch ( _ ) {
169+ // this should not happen, but if so it means the consumer is not healthy
170+ /* v8 ignore next */
171+ return false
172+ }
157173 }
158174
159175 async init ( ) : Promise < void > {
@@ -209,9 +225,7 @@ export abstract class AbstractKafkaConsumer<
209225 async close ( ) : Promise < void > {
210226 if ( ! this . consumerStream && ! this . messageBatchStream ) {
211227 // Leaving the group in case consumer joined but streams were not created
212- if ( this . isActive ) {
213- this . consumer . leaveGroup ( )
214- }
228+ if ( this . isActive ) this . consumer . leaveGroup ( )
215229 return
216230 }
217231
0 commit comments