@@ -18,45 +18,86 @@ import NIOConcurrencyHelpers
1818import NIOCore
1919import ServiceLifecycle
2020
21- // MARK: - NoDelegate
21+ // MARK: - KafkaConsumerCloseOnTerminate
2222
23- // `NIOAsyncSequenceProducerDelegate` that does nothing.
24- internal struct NoDelegate : NIOAsyncSequenceProducerDelegate {
25- func produceMore( ) { }
26- func didTerminate( ) { }
23+ /// `NIOAsyncSequenceProducerDelegate` that terminates the closes the producer when
24+ /// `didTerminate()` is invoked.
25+ internal struct KafkaConsumerCloseOnTerminate : Sendable {
26+ let stateMachine : NIOLockedValueBox < KafkaConsumer . StateMachine >
27+ }
28+
29+ extension KafkaConsumerCloseOnTerminate : NIOAsyncSequenceProducerDelegate {
30+ func produceMore( ) {
31+ return // No back pressure
32+ }
33+
34+ func didTerminate( ) {
35+ self . stateMachine. withLockedValue { $0. messageSequenceTerminated ( ) }
36+ }
2737}
2838
2939// MARK: - KafkaConsumerMessages
3040
3141/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
3242public struct KafkaConsumerMessages : Sendable , AsyncSequence {
43+ let stateMachine : NIOLockedValueBox < KafkaConsumer . StateMachine >
44+
3345 public typealias Element = KafkaConsumerMessage
3446 typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure
35- typealias WrappedSequence = NIOAsyncSequenceProducer < Element , BackPressureStrategy , NoDelegate >
47+ typealias WrappedSequence = NIOThrowingAsyncSequenceProducer <
48+ Element ,
49+ Error ,
50+ BackPressureStrategy ,
51+ KafkaConsumerCloseOnTerminate
52+ >
3653 let wrappedSequence : WrappedSequence
3754
3855 /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
3956 public struct ConsumerMessagesAsyncIterator : AsyncIteratorProtocol {
40- var wrappedIterator : WrappedSequence . AsyncIterator
57+ let stateMachine : NIOLockedValueBox < KafkaConsumer . StateMachine >
58+ var wrappedIterator : WrappedSequence . AsyncIterator ?
4159
42- public mutating func next( ) async -> Element ? {
43- await self . wrappedIterator. next ( )
60+ public mutating func next( ) async throws -> Element ? {
61+ guard let element = try await self . wrappedIterator? . next ( ) else {
62+ self . deallocateIterator ( )
63+ return nil
64+ }
65+
66+ let action = self . stateMachine. withLockedValue { $0. storeOffset ( ) }
67+ switch action {
68+ case . storeOffset( let client) :
69+ do {
70+ try client. storeMessageOffset ( element)
71+ } catch {
72+ self . deallocateIterator ( )
73+ throw error
74+ }
75+ }
76+ return element
77+ }
78+
79+ private mutating func deallocateIterator( ) {
80+ self . wrappedIterator = nil
4481 }
4582 }
4683
4784 public func makeAsyncIterator( ) -> ConsumerMessagesAsyncIterator {
48- return ConsumerMessagesAsyncIterator ( wrappedIterator: self . wrappedSequence. makeAsyncIterator ( ) )
85+ return ConsumerMessagesAsyncIterator (
86+ stateMachine: self . stateMachine,
87+ wrappedIterator: self . wrappedSequence. makeAsyncIterator ( )
88+ )
4989 }
5090}
5191
5292// MARK: - KafkaConsumer
5393
5494/// Receive messages from the Kafka cluster.
5595public final class KafkaConsumer : Sendable , Service {
56- typealias Producer = NIOAsyncSequenceProducer <
96+ typealias Producer = NIOThrowingAsyncSequenceProducer <
5797 KafkaConsumerMessage ,
98+ Error ,
5899 NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ,
59- NoDelegate
100+ KafkaConsumerCloseOnTerminate
60101 >
61102 /// The configuration object of the consumer client.
62103 private let config : KafkaConsumerConfiguration
@@ -90,13 +131,14 @@ public final class KafkaConsumer: Sendable, Service {
90131
91132 self . stateMachine = NIOLockedValueBox ( StateMachine ( logger: self . logger) )
92133
93- let sourceAndSequence = NIOAsyncSequenceProducer . makeSequence (
134+ let sourceAndSequence = NIOThrowingAsyncSequenceProducer . makeSequence (
94135 elementType: KafkaConsumerMessage . self,
95136 backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
96- delegate: NoDelegate ( )
137+ delegate: KafkaConsumerCloseOnTerminate ( stateMachine : self . stateMachine )
97138 )
98139
99140 self . messages = KafkaConsumerMessages (
141+ stateMachine: self . stateMachine,
100142 wrappedSequence: sourceAndSequence. sequence
101143 )
102144
@@ -141,7 +183,8 @@ public final class KafkaConsumer: Sendable, Service {
141183 /// Assign the``KafkaConsumer`` to a specific `partition` of a `topic`.
142184 /// - Parameter topic: Name of the topic that this ``KafkaConsumer`` will read from.
143185 /// - Parameter partition: Partition that this ``KafkaConsumer`` will read from.
144- /// - Parameter offset: The topic offset where reading begins. Defaults to the offset of the last read message.
186+ /// - Parameter offset: The offset to start consuming from.
187+ /// Defaults to the end of the Kafka partition queue (meaning wait for next produced message).
145188 /// - Throws: A ``KafkaError`` if the consumer could not be assigned to the topic + partition pair.
146189 private func assign(
147190 topic: String ,
@@ -190,10 +233,11 @@ public final class KafkaConsumer: Sendable, Service {
190233 }
191234 }
192235 try await Task . sleep ( for: self . config. pollInterval)
193- case . pollUntilClosed( let client) :
194- // Ignore poll result, we are closing down and just polling to commit
195- // outstanding consumer state
196- let _ = client. eventPoll ( )
236+ case . pollWithoutYield( let client) :
237+ // Ignore poll result.
238+ // We are just polling to serve any remaining events queued inside of `librdkafka`.
239+ // All remaining queued consumer messages will get dropped and not be committed (marked as read).
240+ _ = client. eventPoll ( )
197241 try await Task . sleep ( for: self . config. pollInterval)
198242 case . terminatePollLoop:
199243 return
@@ -228,24 +272,26 @@ public final class KafkaConsumer: Sendable, Service {
228272 private func triggerGracefulShutdown( ) {
229273 let action = self . stateMachine. withLockedValue { $0. finish ( ) }
230274 switch action {
275+ case . triggerGracefulShutdown( let client) :
276+ self . _triggerGracefulShutdown (
277+ client: client,
278+ logger: self . logger
279+ )
231280 case . triggerGracefulShutdownAndFinishSource( let client, let source) :
232- self . _triggerGracefulShutdownAndFinishSource (
281+ source. finish ( )
282+ self . _triggerGracefulShutdown (
233283 client: client,
234- source: source,
235284 logger: self . logger
236285 )
237286 case . none:
238287 return
239288 }
240289 }
241290
242- private func _triggerGracefulShutdownAndFinishSource (
291+ private func _triggerGracefulShutdown (
243292 client: KafkaClient ,
244- source: Producer . Source ,
245293 logger: Logger
246294 ) {
247- source. finish ( )
248-
249295 do {
250296 try client. consumerClose ( )
251297 } catch {
@@ -288,6 +334,11 @@ extension KafkaConsumer {
288334 client: KafkaClient ,
289335 source: Producer . Source
290336 )
337+ /// Consumer is still running but the messages asynchronous sequence was terminated.
338+ /// All incoming messages will be dropped.
339+ ///
340+ /// - Parameter client: Client used for handling the connection to the Kafka cluster.
341+ case consumptionStopped( client: KafkaClient )
291342 /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
292343 /// We are now in the process of commiting our last state to the broker.
293344 ///
@@ -325,11 +376,12 @@ extension KafkaConsumer {
325376 client: KafkaClient ,
326377 source: Producer . Source
327378 )
328- /// The ``KafkaConsumer`` is in the process of closing down, but still needs to poll
329- /// to commit its state to the broker.
379+ /// The ``KafkaConsumer`` stopped consuming messages or
380+ /// is in the process of shutting down.
381+ /// Poll to serve any queued events and commit outstanding state to the broker.
330382 ///
331383 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
332- case pollUntilClosed ( client: KafkaClient )
384+ case pollWithoutYield ( client: KafkaClient )
333385 /// Terminate the poll loop.
334386 case terminatePollLoop
335387 }
@@ -346,12 +398,14 @@ extension KafkaConsumer {
346398 fatalError ( " Subscribe to consumer group / assign to topic partition pair before reading messages " )
347399 case . consuming( let client, let source) :
348400 return . pollForAndYieldMessage( client: client, source: source)
401+ case . consumptionStopped( let client) :
402+ return . pollWithoutYield( client: client)
349403 case . finishing( let client) :
350404 if client. isConsumerClosed {
351405 self . state = . finished
352406 return . terminatePollLoop
353407 } else {
354- return . pollUntilClosed ( client: client)
408+ return . pollWithoutYield ( client: client)
355409 }
356410 case . finished:
357411 return . terminatePollLoop
@@ -378,11 +432,51 @@ extension KafkaConsumer {
378432 source: source
379433 )
380434 return . setUpConnection( client: client)
381- case . consuming, . finishing, . finished:
435+ case . consuming, . consumptionStopped , . finishing, . finished:
382436 fatalError ( " \( #function) should only be invoked upon initialization of KafkaConsumer " )
383437 }
384438 }
385439
440+ /// The messages asynchronous sequence was terminated.
441+ /// All incoming messages will be dropped.
442+ mutating func messageSequenceTerminated( ) {
443+ switch self . state {
444+ case . uninitialized:
445+ fatalError ( " \( #function) invoked while still in state \( self . state) " )
446+ case . initializing:
447+ fatalError ( " Call to \( #function) before setUpConnection() was invoked " )
448+ case . consumptionStopped:
449+ fatalError ( " messageSequenceTerminated() must not be invoked more than once " )
450+ case . consuming( let client, _) :
451+ self . state = . consumptionStopped( client: client)
452+ case . finishing, . finished:
453+ break
454+ }
455+ }
456+
457+ /// Action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
458+ enum StoreOffsetAction {
459+ /// Store the message offset with the given `client`.
460+ /// - Parameter client: Client used for handling the connection to the Kafka cluster.
461+ case storeOffset( client: KafkaClient )
462+ }
463+
464+ /// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
465+ func storeOffset( ) -> StoreOffsetAction {
466+ switch self . state {
467+ case . uninitialized:
468+ fatalError ( " \( #function) invoked while still in state \( self . state) " )
469+ case . initializing:
470+ fatalError ( " Subscribe to consumer group / assign to topic partition pair before committing offsets " )
471+ case . consumptionStopped:
472+ fatalError ( " Cannot store offset when consumption has been stopped " )
473+ case . consuming( let client, _) :
474+ return . storeOffset( client: client)
475+ case . finishing, . finished:
476+ fatalError ( " \( #function) invoked while still in state \( self . state) " )
477+ }
478+ }
479+
386480 /// Action to be taken when wanting to do a synchronous commit.
387481 enum CommitSyncAction {
388482 /// Do a synchronous commit.
@@ -405,6 +499,8 @@ extension KafkaConsumer {
405499 fatalError ( " \( #function) invoked while still in state \( self . state) " )
406500 case . initializing:
407501 fatalError ( " Subscribe to consumer group / assign to topic partition pair before committing offsets " )
502+ case . consumptionStopped:
503+ fatalError ( " Cannot commit when consumption has been stopped " )
408504 case . consuming( let client, _) :
409505 return . commitSync( client: client)
410506 case . finishing, . finished:
@@ -414,6 +510,12 @@ extension KafkaConsumer {
414510
415511 /// Action to be taken when wanting to do close the consumer.
416512 enum FinishAction {
513+ /// Shut down the ``KafkaConsumer``.
514+ ///
515+ /// - Parameter client: Client used for handling the connection to the Kafka cluster.
516+ case triggerGracefulShutdown(
517+ client: KafkaClient
518+ )
417519 /// Shut down the ``KafkaConsumer`` and finish the given `source` object.
418520 ///
419521 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
@@ -440,6 +542,9 @@ extension KafkaConsumer {
440542 client: client,
441543 source: source
442544 )
545+ case . consumptionStopped( let client) :
546+ self . state = . finishing( client: client)
547+ return . triggerGracefulShutdown( client: client)
443548 case . finishing, . finished:
444549 return nil
445550 }
0 commit comments