@@ -272,6 +272,70 @@ final class KafkaTests: XCTestCase {
272272 }
273273 }
274274
275+ func testNoNewConsumerMessagesAfterGracefulShutdown( ) async throws {
276+ let testMessages = Self . createTestMessages ( topic: self . uniqueTestTopic, count: 2 )
277+ let ( producer, acks) = try KafkaProducer . makeProducerWithEvents ( configuration: self . producerConfig, logger: . kafkaTest)
278+
279+ let uniqueGroupID = UUID ( ) . uuidString
280+
281+ var consumerConfig = KafkaConsumerConfiguration (
282+ consumptionStrategy: . group(
283+ id: uniqueGroupID,
284+ topics: [ self . uniqueTestTopic]
285+ ) ,
286+ bootstrapBrokerAddresses: [ self . bootstrapBrokerAddress]
287+ )
288+ consumerConfig. autoOffsetReset = . beginning // Read topic from beginning
289+ consumerConfig. broker. addressFamily = . v4
290+
291+ let consumer = try KafkaConsumer (
292+ configuration: consumerConfig,
293+ logger: . kafkaTest
294+ )
295+
296+ let serviceGroupConfiguration = ServiceGroupConfiguration ( services: [ producer, consumer] , logger: . kafkaTest)
297+ let serviceGroup = ServiceGroup ( configuration: serviceGroupConfiguration)
298+
299+ try await withThrowingTaskGroup ( of: Void . self) { group in
300+ // Run Task
301+ group. addTask {
302+ try await serviceGroup. run ( )
303+ }
304+
305+ // Producer Task
306+ group. addTask {
307+ try await Self . sendAndAcknowledgeMessages (
308+ producer: producer,
309+ events: acks,
310+ messages: testMessages
311+ )
312+ }
313+
314+ // Wait for Producer Task to complete
315+ try await group. next ( )
316+
317+ // Verify that we receive the first message
318+ var consumerIterator = consumer. messages. makeAsyncIterator ( )
319+
320+ let consumedMessage = try await consumerIterator. next ( )
321+ XCTAssertEqual ( testMessages. first!. topic, consumedMessage!. topic)
322+ XCTAssertEqual ( ByteBuffer ( string: testMessages. first!. key!) , consumedMessage!. key)
323+ XCTAssertEqual ( ByteBuffer ( string: testMessages. first!. value) , consumedMessage!. value)
324+
325+ // Trigger a graceful shutdown
326+ await serviceGroup. triggerGracefulShutdown ( )
327+
328+ // Wait to ensure the KafkaConsumer's shutdown handler has
329+ // been invoked.
330+ try await Task . sleep ( for: . seconds( 2 ) )
331+
332+ // We should not be able to read any new messages after the KafkaConsumer's
333+ // shutdown handler was invoked
334+ let stoppedConsumingMessage = try await consumerIterator. next ( )
335+ XCTAssertNil ( stoppedConsumingMessage)
336+ }
337+ }
338+
275339 func testCommittedOffsetsAreCorrect( ) async throws {
276340 let testMessages = Self . createTestMessages ( topic: self . uniqueTestTopic, count: 10 )
277341 let firstConsumerOffset = testMessages. count / 2
0 commit comments