@@ -271,8 +271,7 @@ public final class KafkaProducer: Service, Sendable {
271271 throw KafkaError . config (
272272 reason: " Could not initialize transactions because transactionalId is not set in config " )
273273 }
274- // FIXME: maybe add state 'startedWithTransactions'?
275- let client = try self . stateMachine. withLockedValue { try $0. transactionsClient ( ) }
274+ let client = try self . stateMachine. withLockedValue { try $0. initTransactions ( ) }
276275 try await client. initTransactions ( timeout: timeout)
277276 }
278277
@@ -329,6 +328,18 @@ extension KafkaProducer {
329328 source: Producer . Source ? ,
330329 topicHandles: RDKafkaTopicHandles
331330 )
331+ /// The ``KafkaProducer`` has started and is ready to use, transactions were initialized.
332+ ///
333+ /// - Parameter messageIDCounter:Used to incrementally assign unique IDs to messages.
334+ /// - Parameter client: Client used for handling the connection to the Kafka cluster.
335+ /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
336+ /// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
337+ case startedWithTransactions(
338+ client: RDKafkaClient ,
339+ messageIDCounter: UInt ,
340+ source: Producer . Source ? ,
341+ topicHandles: RDKafkaTopicHandles
342+ )
332343 /// Producer is still running but the acknowledgement asynchronous sequence was terminated.
333344 /// All incoming acknowledgements will be dropped.
334345 ///
@@ -396,7 +407,7 @@ extension KafkaProducer {
396407 switch self . state {
397408 case . uninitialized:
398409 fatalError ( " \( #function) invoked while still in state \( self . state) " )
399- case . started( let client, _, let source, _) :
410+ case . started( let client, _, let source, _) , . startedWithTransactions ( let client , _ , let source , _ ) :
400411 return . pollAndYield( client: client, source: source)
401412 case . consumptionStopped( let client) :
402413 return . pollWithoutYield( client: client)
@@ -439,6 +450,19 @@ extension KafkaProducer {
439450 newMessageID: newMessageID,
440451 topicHandles: topicHandles
441452 )
453+ case . startedWithTransactions( let client, let messageIDCounter, let source, let topicHandles) :
454+ let newMessageID = messageIDCounter + 1
455+ self . state = . startedWithTransactions(
456+ client: client,
457+ messageIDCounter: newMessageID,
458+ source: source,
459+ topicHandles: topicHandles
460+ )
461+ return . send(
462+ client: client,
463+ newMessageID: newMessageID,
464+ topicHandles: topicHandles
465+ )
442466 case . consumptionStopped:
443467 throw KafkaError . connectionClosed ( reason: " Sequence consuming acknowledgements was abruptly terminated, producer closed " )
444468 case . finishing:
@@ -464,7 +488,7 @@ extension KafkaProducer {
464488 fatalError ( " \( #function) invoked while still in state \( self . state) " )
465489 case . consumptionStopped:
466490 fatalError ( " messageSequenceTerminated() must not be invoked more than once " )
467- case . started( let client, _, let source, _) :
491+ case . started( let client, _, let source, _) , . startedWithTransactions ( let client , _ , let source , _ ) :
468492 self . state = . consumptionStopped( client: client)
469493 return . finishSource( source: source)
470494 case . finishing( let client, let source) :
@@ -484,7 +508,7 @@ extension KafkaProducer {
484508 switch self . state {
485509 case . uninitialized:
486510 fatalError ( " \( #function) invoked while still in state \( self . state) " )
487- case . started( let client, _, let source, _) :
511+ case . started( let client, _, let source, _) , . startedWithTransactions ( let client , _ , let source , _ ) :
488512 self . state = . finishing( client: client, source: source)
489513 case . consumptionStopped( let client) :
490514 self . state = . finishing( client: client, source: nil )
@@ -493,21 +517,25 @@ extension KafkaProducer {
493517 }
494518 }
495519
496- // TODO:
497- // 1. add client()
498- // 2. initTransactions() -> change state to startedWithTransactions
499- // 3. transactionsClient() -> return client only for startedWithTransactions
500-
501-
502- func transactionsClient( ) throws -> RDKafkaClient {
520+ mutating func initTransactions( ) throws -> RDKafkaClient {
503521 switch self . state {
504522 case . uninitialized:
505523 fatalError ( " \( #function) invoked while still in state \( self . state) " )
506- case . started( let client, _, _, _) :
524+ case . started( let client, let messageIDCounter, let source, let topicHandles) :
525+ self . state = . startedWithTransactions( client: client, messageIDCounter: messageIDCounter, source: source, topicHandles: topicHandles)
507526 return client
508- default :
527+ case . startedWithTransactions:
528+ throw KafkaError . config ( reason: " Transactions were already initialized " )
529+ case . consumptionStopped, . finishing, . finished:
509530 throw KafkaError . connectionClosed ( reason: " Producer is stopping or finished " )
510531 }
511532 }
533+
534+ func transactionsClient( ) throws -> RDKafkaClient {
535+ guard case let . startedWithTransactions( client, _, _, _) = self . state else {
536+ throw KafkaError . transactionAborted ( reason: " Transactions were not initialized or producer is being stopped " )
537+ }
538+ return client
539+ }
512540 }
513541}
0 commit comments