@@ -16,27 +16,15 @@ import Crdkafka
1616import Logging
1717import NIOCore
1818
19- /// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
20- struct NoBackPressure : NIOAsyncSequenceProducerBackPressureStrategy {
21- func didYield( bufferDepth: Int ) -> Bool { true }
22- func didConsume( bufferDepth: Int ) -> Bool { true }
23- }
24-
25- /// `NIOAsyncSequenceProducerDelegate` that does nothing.
26- struct NoDelegate : NIOAsyncSequenceProducerDelegate {
27- func produceMore( ) { }
28- func didTerminate( ) { }
29- }
30-
3119/// `AsyncSequence` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
32- public struct AcknowledgedMessagesAsyncSequence : AsyncSequence {
20+ public struct KafkaMessageAcknowledgements : AsyncSequence {
3321 public typealias Element = Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError >
34- typealias WrappedSequence = NIOAsyncSequenceProducer < Element , NoBackPressure , NoDelegate >
22+ typealias WrappedSequence = AsyncStream < Element >
3523 let wrappedSequence : WrappedSequence
3624
3725 /// `AsynceIteratorProtocol` implementation for handling messages acknowledged by the Kafka cluster (``KafkaAcknowledgedMessage``).
3826 public struct AcknowledgedMessagesAsyncIterator : AsyncIteratorProtocol {
39- let wrappedIterator : NIOAsyncSequenceProducer < Element , NoBackPressure , NoDelegate > . AsyncIterator
27+ var wrappedIterator : AsyncStream < Element > . AsyncIterator
4028
4129 public mutating func next( ) async -> Element ? {
4230 await self . wrappedIterator. next ( )
@@ -77,65 +65,110 @@ public actor KafkaProducer {
7765 /// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer.
7866 private var topicHandles : [ String : OpaquePointer ]
7967
80- // We use implicitly unwrapped optionals here as these properties need to access self upon initialization
8168 /// Used for handling the connection to the Kafka cluster.
82- private var client : KafkaClient !
83- /// Task that polls the Kafka cluster for updates periodically.
84- private var pollTask : Task < Void , Never > !
85-
86- /// `AsyncSequence` that returns all ``KafkaProducerMessage`` objects that have been
87- /// acknowledged by the Kafka cluster.
88- public nonisolated let acknowledgements : AcknowledgedMessagesAsyncSequence
89- nonisolated let acknowlegdementsSource : AcknowledgedMessagesAsyncSequence . WrappedSequence . Source
90- private typealias Acknowledgement = Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError >
69+ private let client : KafkaClient
9170
71+ // Private initializer, use factory methods to create KafkaProducer
9272 /// Initialize a new ``KafkaProducer``.
9373 /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
9474 /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
9575 /// - Parameter logger: A logger.
96- /// - Throws: A ``KafkaError`` if the received message is an error message or malformed .
97- public init (
98- config : KafkaProducerConfig = KafkaProducerConfig ( ) ,
99- topicConfig: KafkaTopicConfig = KafkaTopicConfig ( ) ,
76+ /// - Throws: A ``KafkaError`` if initializing the producer failed .
77+ private init (
78+ client : KafkaClient ,
79+ topicConfig: KafkaTopicConfig ,
10080 logger: Logger
10181 ) async throws {
82+ self . client = client
10283 self . topicConfig = topicConfig
103- self . logger = logger
10484 self . topicHandles = [ : ]
85+ self . logger = logger
10586 self . state = . started
87+ }
10688
107- // (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
108- // This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
109- // The source MUST be held by the caller and used to signal new elements or finish.
110- // The sequence MUST be passed to the actual consumer and MUST NOT be held by the caller.
111- // This is due to the fact that deiniting the sequence is used as part of a trigger to
112- // terminate the underlying source.
113- let acknowledgementsSourceAndSequence = NIOAsyncSequenceProducer . makeSequence (
114- elementType: Acknowledgement . self,
115- backPressureStrategy: NoBackPressure ( ) ,
116- delegate: NoDelegate ( )
89+ /// Initialize a new ``KafkaProducer``.
90+ ///
91+ /// This factory method creates a producer without message acknowledgements.
92+ ///
93+ /// - Parameter configuration: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
94+ /// - Parameter topicConfiguration: The ``KafkaTopicConfig`` used for newly created topics.
95+ /// - Parameter logger: A logger.
96+ /// - Returns: The newly created ``KafkaProducer``.
97+ /// - Throws: A ``KafkaError`` if initializing the producer failed.
98+ public static func makeProducer(
99+ config: KafkaProducerConfig = KafkaProducerConfig ( ) ,
100+ topicConfig: KafkaTopicConfig = KafkaTopicConfig ( ) ,
101+ logger: Logger
102+ ) async throws -> KafkaProducer {
103+ let client = try RDKafka . createClient (
104+ type: . producer,
105+ configDictionary: config. dictionary,
106+ // Having no callback will discard any incoming acknowledgement messages
107+ // Ref: rdkafka_broker.c:rd_kafka_dr_msgq
108+ callback: nil ,
109+ logger: logger
117110 )
118- self . acknowlegdementsSource = acknowledgementsSourceAndSequence. source
119- self . acknowledgements = AcknowledgedMessagesAsyncSequence (
120- wrappedSequence: acknowledgementsSourceAndSequence. sequence
111+
112+ let producer = try await KafkaProducer (
113+ client: client,
114+ topicConfig: topicConfig,
115+ logger: logger
121116 )
122117
123- self . client = try RDKafka . createClient (
118+ return producer
119+ }
120+
121+ /// Initialize a new ``KafkaProducer`` and a ``KafkaMessageAcknowledgements`` asynchronous sequence.
122+ ///
123+ /// Use the asynchronous sequence to consume message acknowledgements.
124+ ///
125+ /// - Important: When the asynchronous sequence is deinited the producer will be shutdown.
126+ ///
127+ /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
128+ /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
129+ /// - Parameter logger: A logger.
130+ /// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
131+ /// `AsyncSequence` used for receiving message acknowledgements.
132+ /// - Throws: A ``KafkaError`` if initializing the producer failed.
133+ public static func makeProducerWithAcknowledgements(
134+ config: KafkaProducerConfig = KafkaProducerConfig ( ) ,
135+ topicConfig: KafkaTopicConfig = KafkaTopicConfig ( ) ,
136+ logger: Logger
137+ ) async throws -> ( KafkaProducer , KafkaMessageAcknowledgements ) {
138+ var streamContinuation : AsyncStream < Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError > > . Continuation ?
139+ let stream = AsyncStream { continuation in
140+ streamContinuation = continuation
141+ }
142+
143+ let client = try RDKafka . createClient (
124144 type: . producer,
125145 configDictionary: config. dictionary,
126- callback: self . deliveryReportCallback,
127- logger: self . logger
146+ callback: { [ logger, streamContinuation] messageResult in
147+ guard let messageResult else {
148+ logger. error ( " Could not resolve acknowledged message " )
149+ return
150+ }
151+
152+ // Ignore YieldResult as we don't support back pressure in KafkaProducer
153+ streamContinuation? . yield ( messageResult)
154+ } ,
155+ logger: logger
128156 )
129157
130- // Poll Kafka every millisecond
131- self . pollTask = Task { [ client] in
132- while !Task. isCancelled {
133- client? . withKafkaHandlePointer { handle in
134- rd_kafka_poll ( handle, 0 )
135- }
136- try ? await Task . sleep ( nanoseconds: 1_000_000 )
158+ let producer = try await KafkaProducer (
159+ client: client,
160+ topicConfig: topicConfig,
161+ logger: logger
162+ )
163+
164+ streamContinuation? . onTermination = { [ producer] _ in
165+ Task {
166+ await producer. shutdownGracefully ( )
137167 }
138168 }
169+
170+ let acknowlegementsSequence = KafkaMessageAcknowledgements ( wrappedSequence: stream)
171+ return ( producer, acknowlegementsSequence)
139172 }
140173
141174 /// Method to shutdown the ``KafkaProducer``.
@@ -155,7 +188,7 @@ public actor KafkaProducer {
155188
156189 private func _shutDownGracefully( timeout: Int32 ) async {
157190 await withCheckedContinuation { ( continuation: CheckedContinuation < Void , Never > ) in
158- // Wait 10 seconds for outstanding messages to be sent and callbacks to be called
191+ // Wait `timeout` seconds for outstanding messages to be sent and callbacks to be called
159192 self . client. withKafkaHandlePointer { handle in
160193 rd_kafka_flush ( handle, timeout)
161194 continuation. resume ( )
@@ -165,11 +198,22 @@ public actor KafkaProducer {
165198 for (_, topicHandle) in self . topicHandles {
166199 rd_kafka_topic_destroy ( topicHandle)
167200 }
168- self . pollTask. cancel ( )
169201
170202 self . state = . shutDown
171203 }
172204
205+ /// Start polling Kafka for acknowledged messages.
206+ ///
207+ /// - Parameter pollInterval: The desired time interval between two consecutive polls.
208+ /// - Returns: An awaitable task representing the execution of the poll loop.
209+ public func run( pollInterval: Duration = . milliseconds( 100 ) ) async throws {
210+ // TODO(felix): make pollInterval part of config -> easier to adapt to Service protocol (service-lifecycle)
211+ while self . state == . started {
212+ self . client. poll ( timeout: 0 )
213+ try await Task . sleep ( for: pollInterval)
214+ }
215+ }
216+
173217 /// Send messages to the Kafka cluster asynchronously, aka "fire and forget".
174218 /// This function is non-blocking.
175219 /// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
@@ -220,29 +264,6 @@ public actor KafkaProducer {
220264 return self . messageIDCounter
221265 }
222266
223- // Closure that is executed when a message has been acknowledged by Kafka
224- private lazy var deliveryReportCallback : ( UnsafePointer < rd_kafka_message_t > ? ) -> Void = { [ logger, acknowlegdementsSource] messagePointer in
225- guard let messagePointer = messagePointer else {
226- logger. error ( " Could not resolve acknowledged message " )
227- return
228- }
229-
230- let messageID = UInt ( bitPattern: messagePointer. pointee. _private)
231-
232- do {
233- let message = try KafkaAcknowledgedMessage ( messagePointer: messagePointer, id: messageID)
234- _ = acknowlegdementsSource. yield ( . success( message) )
235- } catch {
236- guard let error = error as? KafkaAcknowledgedMessageError else {
237- fatalError ( " Caught error that is not of type \( KafkaAcknowledgedMessageError . self) " )
238- }
239- _ = acknowlegdementsSource. yield ( . failure( error) )
240- }
241-
242- // The messagePointer is automatically destroyed by librdkafka
243- // For safety reasons, we only use it inside of this closure
244- }
245-
246267 /// Check `topicHandles` for a handle matching the topic name and create a new handle if needed.
247268 /// - Parameter topic: The name of the topic that is addressed.
248269 private func createTopicHandleIfNeeded( topic: String ) throws -> OpaquePointer ? {
0 commit comments