1212//
1313//===----------------------------------------------------------------------===//
1414
15- import Crdkafka
1615import Logging
1716import NIOConcurrencyHelpers
1817import NIOCore
@@ -122,7 +121,7 @@ public final class KafkaConsumer: Sendable, Service {
122121 self . config = config
123122 self . logger = logger
124123
125- let client = try RDKafka . createClient (
124+ let client = try RDKafkaClient . makeClient (
126125 type: . consumer,
127126 configDictionary: config. dictionary,
128127 events: [ . log, . fetch, . offsetCommit] ,
@@ -149,8 +148,8 @@ public final class KafkaConsumer: Sendable, Service {
149148 )
150149 }
151150
152- // Events that would be triggered by ``KafkaClient /poll(timeout:)``
153- // are now triggered by ``KafkaClient /consumerPoll``.
151+ // Events that would be triggered by ``RDKafkaClient /poll(timeout:)``
152+ // are now triggered by ``RDKafkaClient /consumerPoll``.
154153 try client. pollSetConsumer ( )
155154
156155 switch config. consumptionStrategy. _internal {
@@ -289,7 +288,7 @@ public final class KafkaConsumer: Sendable, Service {
289288 }
290289
291290 private func _triggerGracefulShutdown(
292- client: KafkaClient ,
291+ client: RDKafkaClient ,
293292 logger: Logger
294293 ) {
295294 do {
@@ -323,27 +322,27 @@ extension KafkaConsumer {
323322 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
324323 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
325324 case initializing(
326- client: KafkaClient ,
325+ client: RDKafkaClient ,
327326 source: Producer . Source
328327 )
329328 /// The ``KafkaConsumer`` is consuming messages.
330329 ///
331330 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
332331 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
333332 case consuming(
334- client: KafkaClient ,
333+ client: RDKafkaClient ,
335334 source: Producer . Source
336335 )
337336 /// Consumer is still running but the messages asynchronous sequence was terminated.
338337 /// All incoming messages will be dropped.
339338 ///
340339 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
341- case consumptionStopped( client: KafkaClient )
340+ case consumptionStopped( client: RDKafkaClient )
342341 /// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
343342 /// We are now in the process of commiting our last state to the broker.
344343 ///
345344 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
346- case finishing( client: KafkaClient )
345+ case finishing( client: RDKafkaClient )
347346 /// The ``KafkaConsumer`` is closed.
348347 case finished
349348 }
@@ -354,7 +353,7 @@ extension KafkaConsumer {
354353 /// Delayed initialization of `StateMachine` as the `source` and the `pollClosure` are
355354 /// not yet available when the normal initialization occurs.
356355 mutating func initialize(
357- client: KafkaClient ,
356+ client: RDKafkaClient ,
358357 source: Producer . Source
359358 ) {
360359 guard case . uninitialized = self . state else {
@@ -373,15 +372,15 @@ extension KafkaConsumer {
373372 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
374373 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
375374 case pollForAndYieldMessage(
376- client: KafkaClient ,
375+ client: RDKafkaClient ,
377376 source: Producer . Source
378377 )
379378 /// The ``KafkaConsumer`` stopped consuming messages or
380379 /// is in the process of shutting down.
381380 /// Poll to serve any queued events and commit outstanding state to the broker.
382381 ///
383382 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
384- case pollWithoutYield( client: KafkaClient )
383+ case pollWithoutYield( client: RDKafkaClient )
385384 /// Terminate the poll loop.
386385 case terminatePollLoop
387386 }
@@ -416,7 +415,7 @@ extension KafkaConsumer {
416415 enum SetUpConnectionAction {
417416 /// Set up the connection through ``subscribe()`` or ``assign()``.
418417 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
419- case setUpConnection( client: KafkaClient )
418+ case setUpConnection( client: RDKafkaClient )
420419 }
421420
422421 /// Get action to be taken when wanting to set up the connection through ``subscribe()`` or ``assign()``.
@@ -458,7 +457,7 @@ extension KafkaConsumer {
458457 enum StoreOffsetAction {
459458 /// Store the message offset with the given `client`.
460459 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
461- case storeOffset( client: KafkaClient )
460+ case storeOffset( client: RDKafkaClient )
462461 }
463462
464463 /// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
@@ -483,7 +482,7 @@ extension KafkaConsumer {
483482 ///
484483 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
485484 case commitSync(
486- client: KafkaClient
485+ client: RDKafkaClient
487486 )
488487 /// Throw an error. The ``KafkaConsumer`` is closed.
489488 case throwClosedError
@@ -514,14 +513,14 @@ extension KafkaConsumer {
514513 ///
515514 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
516515 case triggerGracefulShutdown(
517- client: KafkaClient
516+ client: RDKafkaClient
518517 )
519518 /// Shut down the ``KafkaConsumer`` and finish the given `source` object.
520519 ///
521520 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
522521 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
523522 case triggerGracefulShutdownAndFinishSource(
524- client: KafkaClient ,
523+ client: RDKafkaClient ,
525524 source: Producer . Source
526525 )
527526 }
0 commit comments