@@ -89,6 +89,7 @@ final class KafkaClient: Sendable {
8989 /// Swift wrapper for events from `librdkafka`'s event queue.
9090 enum KafkaEvent {
9191 case deliveryReport( results: [ Result < KafkaAcknowledgedMessage , KafkaAcknowledgedMessageError > ] )
92+ case consumerMessages( result: Result < KafkaConsumerMessage , Error > )
9293 }
9394
9495 /// Poll the event `rd_kafka_queue_t` for new events.
@@ -110,9 +111,15 @@ final class KafkaClient: Sendable {
110111 switch eventType {
111112 case . deliveryReport:
112113 let forwardEvent = self . handleDeliveryReportEvent ( event)
113- events. append ( forwardEvent) // Return KafkaEvent.deliveryReport as part of this method
114+ events. append ( forwardEvent)
115+ case . fetch:
116+ if let forwardEvent = self . handleFetchEvent ( event) {
117+ events. append ( forwardEvent)
118+ }
114119 case . log:
115120 self . handleLogEvent ( event)
121+ case . offsetCommit:
122+ self . handleOffsetCommitEvent ( event)
116123 case . none:
117124 // Finished reading events, return early
118125 return events
@@ -140,9 +147,30 @@ final class KafkaClient: Sendable {
140147 deliveryReportResults. append ( message)
141148 }
142149
150+ // The returned message(s) MUST NOT be freed with rd_kafka_message_destroy().
143151 return . deliveryReport( results: deliveryReportResults)
144152 }
145153
154+ /// Handle event of type `RDKafkaEvent.fetch`.
155+ ///
156+ /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
157+ /// - Returns: `KafkaEvent` to be returned as part of ``KafkaClient.eventPoll()`.
158+ private func handleFetchEvent( _ event: OpaquePointer ? ) -> KafkaEvent ? {
159+ do {
160+ // RD_KAFKA_EVENT_FETCH only returns a single message:
161+ // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a3a855eb7bdf17f5797d4911362a5fc7c
162+ if let messagePointer = rd_kafka_event_message_next ( event) {
163+ let message = try KafkaConsumerMessage ( messagePointer: messagePointer)
164+ return . consumerMessages( result: . success( message) )
165+ } else {
166+ return nil
167+ }
168+ } catch {
169+ return . consumerMessages( result: . failure( error) )
170+ }
171+ // The returned message(s) MUST NOT be freed with rd_kafka_message_destroy().
172+ }
173+
146174 /// Handle event of type `RDKafkaEvent.log`.
147175 ///
148176 /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
@@ -183,6 +211,25 @@ final class KafkaClient: Sendable {
183211 }
184212 }
185213
214+ /// Handle event of type `RDKafkaEvent.offsetCommit`.
215+ ///
216+ /// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
217+ private func handleOffsetCommitEvent( _ event: OpaquePointer ? ) {
218+ guard let opaquePointer = rd_kafka_event_opaque ( event) else {
219+ fatalError ( " Could not resolve reference to catpured Swift callback instance " )
220+ }
221+ let opaque = Unmanaged < CapturedCommitCallback > . fromOpaque ( opaquePointer) . takeUnretainedValue ( )
222+ let actualCallback = opaque. closure
223+
224+ let error = rd_kafka_event_error ( event)
225+ guard error == RD_KAFKA_RESP_ERR_NO_ERROR else {
226+ let kafkaError = KafkaError . rdKafkaError ( wrapping: error)
227+ actualCallback ( . failure( kafkaError) )
228+ return
229+ }
230+ actualCallback ( . success( ( ) ) )
231+ }
232+
186233 /// Redirect the main ``KafkaClient/poll(timeout:)`` queue to the `KafkaConsumer`'s
187234 /// queue (``KafkaClient/consumerPoll``).
188235 ///
@@ -197,32 +244,6 @@ final class KafkaClient: Sendable {
197244 }
198245 }
199246
200- /// Request a new message from the Kafka cluster.
201- ///
202- /// - Important: This method should only be invoked from ``KafkaConsumer``.
203- ///
204- /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
205- /// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
206- func consumerPoll( ) throws -> KafkaConsumerMessage ? {
207- guard let messagePointer = rd_kafka_consumer_poll ( self . kafkaHandle, 0 ) else {
208- // No error, there might be no more messages
209- return nil
210- }
211-
212- defer {
213- // Destroy message otherwise poll() will block forever
214- rd_kafka_message_destroy ( messagePointer)
215- }
216-
217- // Reached the end of the topic+partition queue on the broker
218- if messagePointer. pointee. err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
219- return nil
220- }
221-
222- let message = try KafkaConsumerMessage ( messagePointer: messagePointer)
223- return message
224- }
225-
226247 /// Subscribe to topic set using balanced consumer groups.
227248 /// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
228249 func subscribe( topicPartitionList: RDKafkaTopicPartitionList ) throws {
@@ -285,39 +306,12 @@ final class KafkaClient: Sendable {
285306 // should not be counted in ARC as this can lead to memory leaks.
286307 let opaquePointer : UnsafeMutableRawPointer ? = Unmanaged . passUnretained ( capturedClosure) . toOpaque ( )
287308
288- let consumerQueue = rd_kafka_queue_get_consumer ( self . kafkaHandle)
289-
290- // Create a C closure that calls the captured closure
291- let callbackWrapper : (
292- @convention ( c) (
293- OpaquePointer ? ,
294- rd_kafka_resp_err_t ,
295- UnsafeMutablePointer < rd_kafka_topic_partition_list_t > ? ,
296- UnsafeMutableRawPointer ?
297- ) -> Void
298- ) = { _, error, _, opaquePointer in
299-
300- guard let opaquePointer = opaquePointer else {
301- fatalError ( " Could not resolve reference to catpured Swift callback instance " )
302- }
303- let opaque = Unmanaged < CapturedCommitCallback > . fromOpaque ( opaquePointer) . takeUnretainedValue ( )
304-
305- let actualCallback = opaque. closure
306-
307- if error == RD_KAFKA_RESP_ERR_NO_ERROR {
308- actualCallback ( . success( ( ) ) )
309- } else {
310- let kafkaError = KafkaError . rdKafkaError ( wrapping: error)
311- actualCallback ( . failure( kafkaError) )
312- }
313- }
314-
315309 changesList. withListPointer { listPointer in
316310 rd_kafka_commit_queue (
317311 self . kafkaHandle,
318312 listPointer,
319- consumerQueue ,
320- callbackWrapper ,
313+ self . mainQueue ,
314+ nil ,
321315 opaquePointer
322316 )
323317 }
0 commit comments