@@ -67,7 +67,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
6767 public typealias Element = KafkaConsumerMessage
6868 typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure
6969 typealias WrappedSequence = NIOThrowingAsyncSequenceProducer <
70- Element ,
70+ Result < KafkaConsumerMessage , Error > ,
7171 Error ,
7272 BackPressureStrategy ,
7373 KafkaConsumerCloseOnTerminate
@@ -80,24 +80,30 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
8080 var wrappedIterator : WrappedSequence . AsyncIterator ?
8181
8282 public mutating func next( ) async throws -> Element ? {
83- guard let element = try await self . wrappedIterator? . next ( ) else {
83+ guard let result = try await self . wrappedIterator? . next ( ) else {
8484 self . deallocateIterator ( )
8585 return nil
8686 }
8787
88- let action = self . stateMachine. withLockedValue { $0. storeOffset ( ) }
89- switch action {
90- case . storeOffset( let client) :
91- do {
92- try client. storeMessageOffset ( element)
93- } catch {
88+ switch result {
89+ case . success( let message) :
90+ let action = self . stateMachine. withLockedValue { $0. storeOffset ( ) }
91+ switch action {
92+ case . storeOffset( let client) :
93+ do {
94+ try client. storeMessageOffset ( message)
95+ } catch {
96+ self . deallocateIterator ( )
97+ throw error
98+ }
99+ return message
100+ case . terminateConsumerSequence:
94101 self . deallocateIterator ( )
95- throw error
102+ return nil
96103 }
97- return element
98- case . terminateConsumerSequence:
104+ case . failure( let error) :
99105 self . deallocateIterator ( )
100- return nil
106+ throw error
101107 }
102108 }
103109
@@ -119,7 +125,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
119125/// A ``KafkaConsumer `` can be used to consume messages from a Kafka cluster.
120126public final class KafkaConsumer : Sendable , Service {
121127 typealias Producer = NIOThrowingAsyncSequenceProducer <
122- KafkaConsumerMessage ,
128+ Result < KafkaConsumerMessage , Error > ,
123129 Error ,
124130 NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ,
125131 KafkaConsumerCloseOnTerminate
@@ -156,7 +162,7 @@ public final class KafkaConsumer: Sendable, Service {
156162 self . logger = logger
157163
158164 let sourceAndSequence = NIOThrowingAsyncSequenceProducer . makeSequence (
159- elementType: KafkaConsumerMessage . self,
165+ elementType: Result < KafkaConsumerMessage , Error > . self,
160166 backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
161167 delegate: KafkaConsumerCloseOnTerminate ( stateMachine: self . stateMachine)
162168 )
@@ -333,14 +339,8 @@ public final class KafkaConsumer: Sendable, Service {
333339 for event in events {
334340 switch event {
335341 case . consumerMessages( let result) :
336- switch result {
337- case . success( let message) :
338- // We do not support back pressure, we can ignore the yield result
339- _ = source. yield ( message)
340- case . failure( let error) :
341- source. finish ( )
342- throw error
343- }
342+ // We do not support back pressure, we can ignore the yield result
343+ _ = source. yield ( result)
344344 default :
345345 break // Ignore
346346 }
0 commit comments