1313//===----------------------------------------------------------------------===//
1414
1515import Crdkafka
16+ import BlockingCallWrapper
1617import Dispatch
1718import Logging
1819
@@ -436,6 +437,14 @@ final class RDKafkaClient: Sendable {
436437 func withKafkaHandlePointer< T> ( _ body: ( OpaquePointer ) throws -> T ) rethrows -> T {
437438 return try body ( self . kafkaHandle)
438439 }
440+
441+ /// Scoped accessor that enables safe access to the pointer of the client's Kafka handle with async closure.
442+ /// - Warning: Do not escape the pointer from the closure for later use.
443+ /// - Parameter body: The closure will use the Kafka handle pointer.
444+ @discardableResult
445+ func withKafkaHandlePointer< T> ( _ body: ( OpaquePointer ) async throws -> T ) async rethrows -> T {
446+ return try await body ( self . kafkaHandle)
447+ }
439448
440449 /// Convert an unsafe`rd_kafka_message_t` object to a safe ``KafkaAcknowledgementResult``.
441450 /// - Parameter messagePointer: An `UnsafePointer` pointing to the `rd_kafka_message_t` object in memory.
@@ -462,4 +471,138 @@ final class RDKafkaClient: Sendable {
462471
463472 return messageResult
464473 }
474+
475+ func initTransactions( timeout: Duration ) async throws {
476+ let result = await forBlockingFunc {
477+ rd_kafka_init_transactions ( self . kafkaHandle, Int32 ( timeout. totalMilliseconds) )
478+ }
479+
480+ if result != nil {
481+ let code = rd_kafka_error_code ( result)
482+ rd_kafka_error_destroy ( result)
483+ throw KafkaError . rdKafkaError ( wrapping: code)
484+ }
485+ }
486+
487+ func beginTransaction( ) throws {
488+ let result = rd_kafka_begin_transaction ( kafkaHandle)
489+ if result != nil {
490+ let code = rd_kafka_error_code ( result)
491+ rd_kafka_error_destroy ( result)
492+ throw KafkaError . rdKafkaError ( wrapping: code)
493+ }
494+ }
495+
496+ func send(
497+ attempts: UInt64 ,
498+ offsets: RDKafkaTopicPartitionList ,
499+ forConsumerKafkaHandle consumer: OpaquePointer ,
500+ timeout: Duration ) async throws {
501+ try await offsets. withListPointer { topicPartitionList in
502+
503+ let consumerMetadata = rd_kafka_consumer_group_metadata ( consumer)
504+ defer { rd_kafka_consumer_group_metadata_destroy ( consumerMetadata) }
505+
506+ // TODO: actually it should be withing some timeout (like transaction timeout or session timeout)
507+ for idx in 0 ..< attempts {
508+ let error = await forBlockingFunc {
509+ rd_kafka_send_offsets_to_transaction ( self . kafkaHandle, topicPartitionList,
510+ consumerMetadata, timeout. totalMillisecondsOrMinusOne)
511+ }
512+
513+ /* check if offset commit is completed successfully */
514+ if error == nil {
515+ return
516+ }
517+ defer { rd_kafka_error_destroy ( error) }
518+
519+ /* check if offset commit is retriable */
520+ if rd_kafka_error_is_retriable ( error) == 1 {
521+ continue
522+ }
523+
524+ /* check if transaction need to be aborted */
525+ if rd_kafka_error_txn_requires_abort ( error) == 1 {
526+ do {
527+ try await abortTransaction ( attempts: attempts - idx, timeout: timeout)
528+ throw KafkaError . transactionAborted ( reason: " Transaction aborted and can be started from scratch " )
529+ } catch {
530+ throw KafkaError . transactionIncomplete (
531+ reason: " Could not complete or abort transaction with error \( error) " )
532+ }
533+ }
534+ let isFatal = ( rd_kafka_error_is_fatal ( error) == 1 ) // fatal when Producer/Consumer must be restarted
535+ throw KafkaError . rdKafkaError ( wrapping: rd_kafka_error_code ( error) , isFatal: isFatal)
536+ }
537+ }
538+ throw KafkaError . transactionOutOfAttempts ( numOfAttempts: attempts)
539+ }
540+
541+ func abortTransaction( attempts: UInt64 , timeout: Duration ) async throws {
542+ for _ in 0 ..< attempts {
543+ let error = await forBlockingFunc {
544+ rd_kafka_abort_transaction ( self . kafkaHandle, timeout. totalMillisecondsOrMinusOne)
545+ }
546+ /* check if transaction abort is completed successfully */
547+ if error == nil {
548+ return
549+ }
550+ defer { rd_kafka_error_destroy ( error) }
551+
552+ /* check if transaction abort is retriable */
553+ if rd_kafka_error_is_retriable ( error) == 1 {
554+ continue
555+ }
556+ let isFatal = ( rd_kafka_error_is_fatal ( error) == 1 ) // fatal when Producer/Consumer must be restarted
557+ throw KafkaError . rdKafkaError ( wrapping: rd_kafka_error_code ( error) , isFatal: isFatal)
558+ }
559+ throw KafkaError . transactionOutOfAttempts ( numOfAttempts: attempts)
560+ }
561+
562+ func commitTransaction( attempts: UInt64 , timeout: Duration ) async throws {
563+ for idx in 0 ..< attempts {
564+ let error = await forBlockingFunc {
565+ rd_kafka_commit_transaction ( self . kafkaHandle, timeout. totalMillisecondsOrMinusOne)
566+ }
567+ /* check if transaction is completed successfully */
568+ if error == nil {
569+ return
570+ }
571+ /* check if transaction is retriable */
572+ if rd_kafka_error_is_retriable ( error) == 1 {
573+ continue
574+ }
575+ defer { rd_kafka_error_destroy ( error) }
576+
577+ /* check if transaction need to be aborted */
578+ if rd_kafka_error_txn_requires_abort ( error) == 1 {
579+ do {
580+ try await abortTransaction ( attempts: attempts - idx, timeout: timeout)
581+ throw KafkaError . transactionAborted ( reason: " Transaction aborted and can be started from scratch " )
582+ } catch {
583+ throw KafkaError . transactionIncomplete (
584+ reason: " Could not complete or abort transaction with error \( error) " )
585+ }
586+ }
587+ /* check if error is fatal */
588+ let isFatal = ( rd_kafka_error_is_fatal ( error) == 1 ) // fatal when Producer/Consumer must be restarted
589+ throw KafkaError . rdKafkaError ( wrapping: rd_kafka_error_code ( error) , isFatal: isFatal)
590+ }
591+ throw KafkaError . transactionOutOfAttempts ( numOfAttempts: attempts)
592+ }
593+ }
594+
595+ // TODO: tmp, should be in other PRs
596+ extension Duration {
597+ // Internal usage only: librdkafka accepts Int32 as timeouts
598+ var totalMilliseconds : Int32 {
599+ return Int32 ( self . components. seconds * 1000 + self . components. attoseconds / 1_000_000_000_000_000 )
600+ }
601+
602+ var totalMillisecondsOrMinusOne : Int32 {
603+ return max ( totalMilliseconds, - 1 )
604+ }
605+
606+ public static var kafkaUntilEndOfTransactionTimeout : Duration = . milliseconds( - 1 )
607+ public static var kafkaNoWaitTransaction : Duration = . zero
465608}
0 commit comments