@@ -18,13 +18,37 @@ import NIOConcurrencyHelpers
1818import NIOCore
1919import ServiceLifecycle
2020
21+ // MARK: - KafkaProducerCloseOnTerminate
22+
23+ /// `NIOAsyncSequenceProducerDelegate` that terminates the closes the producer when
24+ /// `didTerminate()` is invoked.
25+ internal struct KafkaProducerCloseOnTerminate : Sendable {
26+ let stateMachine : NIOLockedValueBox < KafkaProducer . StateMachine >
27+ }
28+
29+ extension KafkaProducerCloseOnTerminate : NIOAsyncSequenceProducerDelegate {
30+ func produceMore( ) {
31+ return // No back pressure
32+ }
33+
34+ func didTerminate( ) {
35+ let action = self . stateMachine. withLockedValue { $0. stopConsuming ( ) }
36+ switch action {
37+ case . finishSource( let source) :
38+ source? . finish ( )
39+ case . none:
40+ break
41+ }
42+ }
43+ }
44+
2145// MARK: - KafkaMessageAcknowledgements
2246
2347/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
2448public struct KafkaMessageAcknowledgements : AsyncSequence {
2549 public typealias Element = Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError >
2650 typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure
27- typealias WrappedSequence = NIOAsyncSequenceProducer < Element , BackPressureStrategy , NoDelegate >
51+ typealias WrappedSequence = NIOAsyncSequenceProducer < Element , BackPressureStrategy , KafkaProducerCloseOnTerminate >
2852 let wrappedSequence : WrappedSequence
2953
3054 /// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
@@ -51,7 +75,7 @@ public final class KafkaProducer: Service, Sendable {
5175 typealias Producer = NIOAsyncSequenceProducer <
5276 Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError > ,
5377 NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ,
54- NoDelegate
78+ KafkaProducerCloseOnTerminate
5579 >
5680
5781 /// State of the ``KafkaProducer``.
@@ -122,7 +146,8 @@ public final class KafkaProducer: Service, Sendable {
122146 ///
123147 /// Use the asynchronous sequence to consume message acknowledgements.
124148 ///
125- /// - Important: When the asynchronous sequence is deinited the producer will be shutdown.
149+ /// - Important: When the asynchronous sequence is deinited the producer will be shutdown and disallow sending more messages.
150+ /// Additionally, make sure to consume the asynchronous sequence otherwise the acknowledgements will be buffered in memory indefinitely.
126151 ///
127152 /// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
128153 /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
@@ -140,7 +165,7 @@ public final class KafkaProducer: Service, Sendable {
140165 let sourceAndSequence = NIOAsyncSequenceProducer . makeSequence (
141166 elementType: Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError > . self,
142167 backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
143- delegate: NoDelegate ( )
168+ delegate: KafkaProducerCloseOnTerminate ( stateMachine : stateMachine )
144169 )
145170 let source = sourceAndSequence. source
146171
@@ -183,7 +208,10 @@ public final class KafkaProducer: Service, Sendable {
183208 while !Task. isCancelled {
184209 let nextAction = self . stateMachine. withLockedValue { $0. nextPollLoopAction ( ) }
185210 switch nextAction {
186- case . poll( let client, let source) :
211+ case . pollWithoutYield( let client) :
212+ // Drop any incoming acknowledgments
213+ let _ = client. eventPoll ( )
214+ case . pollAndYield( let client, let source) :
187215 let events = client. eventPoll ( )
188216 for event in events {
189217 switch event {
@@ -258,6 +286,11 @@ extension KafkaProducer {
258286 source: Producer . Source ? ,
259287 topicHandles: RDKafkaTopicHandles
260288 )
289+ /// Producer is still running but the acknowledgement asynchronous sequence was terminated.
290+ /// All incoming acknowledgements will be dropped.
291+ ///
292+ /// - Parameter client: Client used for handling the connection to the Kafka cluster.
293+ case consumptionStopped( client: KafkaClient )
261294 /// ``KafkaProducer/triggerGracefulShutdown()`` was invoked so we are flushing
262295 /// any messages that wait to be sent and serve any remaining queued callbacks.
263296 ///
@@ -293,11 +326,15 @@ extension KafkaProducer {
293326
294327 /// Action to be taken when wanting to poll.
295328 enum PollLoopAction {
296- /// Poll client for new consumer messages.
329+ /// Poll client.
330+ ///
331+ /// - Parameter client: Client used for handling the connection to the Kafka cluster.
332+ case pollWithoutYield( client: KafkaClient )
333+ /// Poll client and yield acknowledgments if any received.
297334 ///
298335 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
299336 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
300- case poll ( client: KafkaClient , source: Producer . Source ? )
337+ case pollAndYield ( client: KafkaClient , source: Producer . Source ? )
301338 /// Terminate the poll loop and finish the given `NIOAsyncSequenceProducerSource`.
302339 ///
303340 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
@@ -315,10 +352,12 @@ extension KafkaProducer {
315352 case . uninitialized:
316353 fatalError ( " \( #function) invoked while still in state \( self . state) " )
317354 case . started( let client, _, let source, _) :
318- return . poll( client: client, source: source)
355+ return . pollAndYield( client: client, source: source)
356+ case . consumptionStopped( let client) :
357+ return . pollWithoutYield( client: client)
319358 case . flushing( let client, let source) :
320359 if client. outgoingQueueSize > 0 {
321- return . poll ( client: client, source: source)
360+ return . pollAndYield ( client: client, source: source)
322361 } else {
323362 self . state = . finished
324363 return . terminatePollLoopAndFinishSource( source: source)
@@ -360,13 +399,44 @@ extension KafkaProducer {
360399 newMessageID: newMessageID,
361400 topicHandles: topicHandles
362401 )
402+ case . consumptionStopped:
403+ throw KafkaError . connectionClosed ( reason: " Sequence consuming acknowledgements was abruptly terminated, producer closed " )
363404 case . flushing:
364405 throw KafkaError . connectionClosed ( reason: " Producer in the process of flushing and shutting down " )
365406 case . finished:
366407 throw KafkaError . connectionClosed ( reason: " Tried to produce a message with a closed producer " )
367408 }
368409 }
369410
411+ /// Action to take after invoking ``KafkaProducer/StateMachine/stopConsuming()``.
412+ enum StopConsumingAction {
413+ /// Finish the given `NIOAsyncSequenceProducerSource`.
414+ ///
415+ /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
416+ case finishSource( source: Producer . Source ? )
417+ }
418+
419+ /// The acknowledgements asynchronous sequence was terminated.
420+ /// All incoming acknowledgements will be dropped.
421+ mutating func stopConsuming( ) -> StopConsumingAction ? {
422+ switch self . state {
423+ case . uninitialized:
424+ fatalError ( " \( #function) invoked while still in state \( self . state) " )
425+ case . consumptionStopped:
426+ fatalError ( " stopConsuming() must not be invoked more than once " )
427+ case . started( let client, _, let source, _) :
428+ self . state = . consumptionStopped( client: client)
429+ return . finishSource( source: source)
430+ case . flushing( let client, let source) :
431+ // Setting source to nil to prevent incoming acknowledgements from buffering in `source`
432+ self . state = . flushing( client: client, source: nil )
433+ return . finishSource( source: source)
434+ case . finished:
435+ break
436+ }
437+ return nil
438+ }
439+
370440 /// Get action to be taken when wanting to do close the producer.
371441 ///
372442 /// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
@@ -376,6 +446,8 @@ extension KafkaProducer {
376446 fatalError ( " \( #function) invoked while still in state \( self . state) " )
377447 case . started( let client, _, let source, _) :
378448 self . state = . flushing( client: client, source: source)
449+ case . consumptionStopped( let client) :
450+ self . state = . flushing( client: client, source: nil )
379451 case . flushing, . finished:
380452 break
381453 }
0 commit comments