You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
* Adapt `swift-service-lifecycle`
Modifications:
* add `swift-service-lifecycle` aka `ServicLifecycle` dependency to
`Package.swift`
* add `Sendable` conformance to the following types:
* `KafkaProducerConfiguration`
* `KafkaConsumerConfiguration`
* `KafkaTopicConfiguration`
* `KafkaPartition`
* `KafkaClient`
* `KafkaConsumerMessages`
* `KafkaConsumerMessage`
* `RDKafkaConfig.CapturedClosures`
* `RDKafkaTopicHandles`
* add `@unchecked Sendable` conformance to the following types:
* `KafkaProducer`
* `KafkaConsumer`
* add `Service` conformance to the following types:
* `KafkaProducer`
* `KafkaConsumer`
* `SwiftKafkaTests`:
* use `ServiceGroup`s
* `KafkaProducerTests`:
* use `ServiceGroup`s
* remove `testFlushQueuedProducerMessages` as it relied on the now
`private` implementation detail `triggerGracefulShutdown`
* remove `testProducerNotUsableAfterShutdown` as it relied on the now
`private` implementation detail `triggerGracefulShutdown`
* `testNoMemoryLeakAfterShutdown`: remove obsolete wait for timeout
as flushing in `KafkaProducer` is now non-blocking
* `KafkaConsumer`:
* remove `triggerGracefulShutdown` on `deinit` -> this is now
`ServicLifecycle`'s responsibility
* refactor `RDKafkaConfig` to make `RDKafkaConfig.CapturedClosures`
sendable
* Remove `ShutdownOnTerminate` for `AsyncSequence`s
Motivation:
Exiting the `run()` loop early is an error in `swift-service-lifecycle`.
Therefore we **don't** want to invoke
`KafkaProducer/KafkaConsumer.triggerGracefulShutdown()` when the our
`NIOAsyncSequence`s have stopped being consumed
Modifications:
* remove `Kafka[Producer|Consumer]ShutdownOnTerminate` behaviour
* adjust tests
* Review Franz
Modifications:
* remove `*Service` suffix from `consumerService` and `producerService`
in tests and README
* remove old comment in `KafkaConsumer`
Copy file name to clipboardExpand all lines: README.md
+32-11Lines changed: 32 additions & 11 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -23,6 +23,10 @@ Finally, add `import SwiftKafka` to your source code.
23
23
24
24
## Usage
25
25
26
+
`SwiftKafka` should be used within a [`Swift Service Lifecycle`](https://github.com/swift-server/swift-service-lifecycle)
27
+
[`ServiceGroup`](https://swiftpackageindex.com/swift-server/swift-service-lifecycle/main/documentation/servicelifecycle/servicegroup) for proper startup and shutdown handling.
28
+
Both the `KafkaProducer` and the `KafkaConsumer` implement the [`Service`](https://swiftpackageindex.com/swift-server/swift-service-lifecycle/main/documentation/servicelifecycle/service) protocol.
29
+
26
30
### Producer API
27
31
28
32
The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements`[`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
@@ -32,14 +36,19 @@ let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])
32
36
33
37
let (producer, acknowledgements) =try KafkaProducer.makeProducerWithAcknowledgements(
34
38
config: config,
35
-
logger: .kafkaTest// Your logger here
39
+
logger: logger
36
40
)
37
41
38
42
awaitwithThrowingTaskGroup(of: Void.self) { group in
/// Used to configure new topics created by the ``KafkaProducer``.
16
-
publicstructKafkaTopicConfiguration:Hashable{
16
+
publicstructKafkaTopicConfiguration{
17
17
vardictionary:[String:String]=[:]
18
18
19
19
/// This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, -1 or all=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than min.insync.replicas (broker configuration) in the ISR set the produce request will fail.
0 commit comments