@@ -85,7 +85,6 @@ public final class ValkeyClusterClient: Sendable {
8585 case runClient( ValkeyNodeClient )
8686 case runTimer( ValkeyClusterTimer )
8787 }
88-
8988 private let actionStream : AsyncStream < RunAction >
9089 private let actionStreamContinuation : AsyncStream < RunAction > . Continuation
9190
@@ -109,9 +108,7 @@ public final class ValkeyClusterClient: Sendable {
109108 ) {
110109 self . logger = logger
111110
112- let ( stream, continuation) = AsyncStream . makeStream ( of: RunAction . self)
113- self . actionStream = stream
114- self . actionStreamContinuation = continuation
111+ ( self . actionStream, self . actionStreamContinuation) = AsyncStream . makeStream ( of: RunAction . self)
115112
116113 let factory = ValkeyNodeClientFactory (
117114 logger: logger,
@@ -220,12 +217,17 @@ public final class ValkeyClusterClient: Sendable {
220217 public func run( ) async {
221218 let circuitBreakerTimer = self . stateLock. withLock { $0. start ( ) }
222219
223- self . actionStreamContinuation . yield ( . runTimer( circuitBreakerTimer) )
224- self . actionStreamContinuation . yield ( . runClusterDiscovery( runNodeDiscovery: true ) )
220+ self . queueAction ( . runTimer( circuitBreakerTimer) )
221+ self . queueAction ( . runClusterDiscovery( runNodeDiscovery: true ) )
225222
226223 await withTaskCancellationHandler {
227- await withDiscardingTaskGroup { taskGroup in
228- await self . runUsingTaskGroup ( & taskGroup)
224+ /// Run discarding task group running actions
225+ await withDiscardingTaskGroup { group in
226+ for await action in self . actionStream {
227+ group. addTask {
228+ await self . runAction ( action)
229+ }
230+ }
229231 }
230232 } onCancel: {
231233 _ = self . stateLock. withLock {
@@ -238,51 +240,47 @@ public final class ValkeyClusterClient: Sendable {
238240
239241 // MARK: - Private methods -
240242
243+ private func queueAction( _ action: RunAction ) {
244+ self . actionStreamContinuation. yield ( action)
245+ }
246+
241247 /// Manages the primary task group that handles all client operations.
242248 ///
243249 /// - Parameter taskGroup: The task group to add tasks to.
244- private func runUsingTaskGroup( _ taskGroup: inout DiscardingTaskGroup ) async {
245- for await action in self . actionStream {
246- switch action {
247- case . runClusterDiscovery( let runNodeDiscovery) :
248- taskGroup. addTask {
249- await self . runClusterDiscovery ( runNodeDiscoveryFirst: runNodeDiscovery)
250- }
250+ private func runAction( _ action: RunAction ) async {
251+ switch action {
252+ case . runClusterDiscovery( let runNodeDiscovery) :
253+ await self . runClusterDiscovery ( runNodeDiscoveryFirst: runNodeDiscovery)
251254
252- case . runClient( let client) :
253- taskGroup. addTask {
254- await client. run ( )
255- }
255+ case . runClient( let client) :
256+ await client. run ( )
256257
257- case . runTimer( let timer) :
258+ case . runTimer( let timer) :
259+ await withTaskGroup ( of: Void . self) { taskGroup in
258260 taskGroup. addTask {
259- await withTaskGroup ( of: Void . self) { taskGroup in
260- taskGroup. addTask {
261- do {
262- try await self . clock. sleep ( for: timer. duration)
263- // timer has hit
264- let timerFiredAction = self . stateLock. withLock {
265- $0. timerFired ( timer)
266- }
267- self . runTimerFiredAction ( timerFiredAction)
268- } catch {
269- // do nothing
270- }
271- }
272-
273- let ( stream, continuation) = AsyncStream . makeStream ( of: Void . self)
274- taskGroup. addTask {
275- var iterator = stream. makeAsyncIterator ( )
276- await iterator. next ( )
261+ do {
262+ try await self . clock. sleep ( for: timer. duration)
263+ // timer has hit
264+ let timerFiredAction = self . stateLock. withLock {
265+ $0. timerFired ( timer)
277266 }
267+ self . runTimerFiredAction ( timerFiredAction)
268+ } catch {
269+ // do nothing
270+ }
271+ }
278272
279- let token = self . stateLock. withLock {
280- $0. registerTimerCancellationToken ( continuation, for: timer)
281- }
273+ let ( stream, continuation) = AsyncStream . makeStream ( of: Void . self)
274+ taskGroup. addTask {
275+ var iterator = stream. makeAsyncIterator ( )
276+ await iterator. next ( )
277+ }
282278
283- token ? . finish ( )
284- }
279+ let token = self . stateLock . withLock {
280+ $0 . registerTimerCancellationToken ( continuation , for : timer )
285281 }
282+
283+ token? . finish ( )
286284 }
287285 }
288286 }
@@ -300,7 +298,7 @@ public final class ValkeyClusterClient: Sendable {
300298 }
301299
302300 if let runDiscovery = action. runDiscovery {
303- self . actionStreamContinuation . yield ( . runClusterDiscovery( runNodeDiscovery: runDiscovery. runNodeDiscoveryFirst) )
301+ self . queueAction ( . runClusterDiscovery( runNodeDiscovery: runDiscovery. runNodeDiscoveryFirst) )
304302 }
305303 }
306304
@@ -309,7 +307,7 @@ public final class ValkeyClusterClient: Sendable {
309307 /// - Parameter action: The update action containing clients to run and shut down.
310308 private func runUpdateValkeyNodesAction( _ action: StateMachine . UpdateValkeyNodesAction ) {
311309 for client in action. clientsToRun {
312- self . actionStreamContinuation . yield ( . runClient( client) )
310+ self . queueAction ( . runClient( client) )
313311 }
314312
315313 for client in action. clientsToShutdown {
@@ -328,11 +326,11 @@ public final class ValkeyClusterClient: Sendable {
328326 action. cancelTimer? . yield ( )
329327
330328 if let newTimer = action. createTimer {
331- self . actionStreamContinuation . yield ( . runTimer( newTimer) )
329+ self . queueAction ( . runTimer( newTimer) )
332330 }
333331
334332 for client in action. clientsToRun {
335- self . actionStreamContinuation . yield ( . runClient( client) )
333+ self . queueAction ( . runClient( client) )
336334 }
337335
338336 for client in action. clientsToShutdown {
@@ -345,11 +343,11 @@ public final class ValkeyClusterClient: Sendable {
345343 /// - Parameter action: The action containing operations to perform after failed discovery.
346344 private func runClusterDiscoveryFailedAction( _ action: StateMachine . ClusterDiscoveryFailedAction ) {
347345 if let retryTimer = action. retryTimer {
348- self . actionStreamContinuation . yield ( . runTimer( retryTimer) )
346+ self . queueAction ( . runTimer( retryTimer) )
349347 }
350348
351349 if let circuitBreakerTimer = action. circuitBreakerTimer {
352- self . actionStreamContinuation . yield ( . runTimer( circuitBreakerTimer) )
350+ self . queueAction ( . runTimer( circuitBreakerTimer) )
353351 }
354352 }
355353
@@ -489,10 +487,10 @@ public final class ValkeyClusterClient: Sendable {
489487 private func runMovedToDegraded( _ action: StateMachine . PoolForMovedErrorAction . MoveToDegraded ) {
490488 if let cancelToken = action. runDiscoveryAndCancelTimer {
491489 cancelToken. yield ( )
492- self . actionStreamContinuation . yield ( . runClusterDiscovery( runNodeDiscovery: false ) )
490+ self . queueAction ( . runClusterDiscovery( runNodeDiscovery: false ) )
493491 }
494492
495- self . actionStreamContinuation . yield ( . runTimer( action. circuitBreakerTimer) )
493+ self . queueAction ( . runTimer( action. circuitBreakerTimer) )
496494 }
497495
498496 /// Runs the cluster discovery process to determine the current cluster topology.
0 commit comments