@@ -163,7 +163,7 @@ package struct ValkeyClusterClientStateMachine<
163163 @usableFromInline
164164 /* private */ var clusterState : ClusterState
165165 @usableFromInline
166- /* private */ var runningClients : [ ValkeyNodeID : NodeBundle ] = [ : ]
166+ /* private */ var runningClients : ValkeyRunningClientsStateMachine < ConnectionPool , ConnectionPoolFactory >
167167 @usableFromInline
168168 /* private */ var configuration : ValkeyClusterClientStateMachineConfiguration
169169
@@ -182,7 +182,7 @@ package struct ValkeyClusterClientStateMachine<
182182 )
183183 )
184184 self . refreshState = . notRefreshing
185- self . runningClients = [ : ]
185+ self . runningClients = . init ( poolFactory : poolFactory )
186186 self . configuration = configuration
187187 self . clock = clock
188188 self . poolFactory = poolFactory
@@ -231,8 +231,8 @@ package struct ValkeyClusterClientStateMachine<
231231 case . refreshing:
232232 switch self . clusterState {
233233 case . unavailable, . degraded, . healthy:
234- let action = self . updateNodes ( newNodes, removeUnmentionedPools: false )
235- let voters = self . allNodeClients ( ) . map { ValkeyClusterVoter ( client: $0. pool, nodeID: $0. nodeID) }
234+ let action = self . runningClients . updateNodes ( newNodes, removeUnmentionedPools: false )
235+ let voters = self . runningClients . clients . map { ValkeyClusterVoter ( client: $0. pool, nodeID: $0. nodeID) }
236236 return . init(
237237 clientsToRun: action. poolsToRun. map ( \. 0 ) ,
238238 clientsToShutdown: action. poolsToShutdown,
@@ -246,7 +246,7 @@ package struct ValkeyClusterClientStateMachine<
246246 }
247247
248248 package func getInitialVoters( ) -> [ ValkeyClusterVoter < ConnectionPool > ] {
249- self . allNodeClients ( ) . map { ValkeyClusterVoter ( client: $0. pool, nodeID: $0. nodeID) }
249+ self . runningClients . clients . map { ValkeyClusterVoter ( client: $0. pool, nodeID: $0. nodeID) }
250250 }
251251
252252 package struct ClusterDiscoverySucceededAction {
@@ -278,7 +278,7 @@ package struct ValkeyClusterClientStateMachine<
278278 self . refreshState = . waitingForRefresh( . init( id: refreshTimerID) , previousRefresh: . init( consecutiveFailures: 0 ) )
279279
280280 let newShards = description. shards
281- let poolUpdate = self . updateNodes (
281+ let poolUpdate = self . runningClients . updateNodes (
282282 newShards. lazy. flatMap { $0. nodes. lazy. map { ValkeyNodeDescription ( description: $0) } } ,
283283 removeUnmentionedPools: true
284284 )
@@ -694,7 +694,7 @@ package struct ValkeyClusterClientStateMachine<
694694
695695 case . refreshing:
696696 let newShards = description. shards
697- let poolActions = self . updateNodes (
697+ let poolActions = self . runningClients . updateNodes (
698698 newShards. lazy. flatMap { $0. nodes. lazy. map { ValkeyNodeDescription ( description: $0) } } ,
699699 removeUnmentionedPools: false
700700 )
@@ -719,80 +719,18 @@ package struct ValkeyClusterClientStateMachine<
719719 case . unavailable, . degraded, . healthy:
720720 self . clusterState = . shutdown
721721 let existingNodes = self . runningClients
722- self . runningClients. removeAll ( keepingCapacity : false )
723- return existingNodes. values . lazy. map { $0. pool }
722+ self . runningClients. removeAll ( )
723+ return existingNodes. clients . lazy. map { $0. pool }
724724
725725 case . shutdown:
726726 return [ ]
727727 }
728728 }
729729
730- private struct PoolUpdateAction {
731- var poolsToShutdown : [ ConnectionPool ]
732- var poolsToRun : [ ( ConnectionPool , ValkeyNodeID ) ]
733-
734- static func empty( ) -> PoolUpdateAction { PoolUpdateAction ( poolsToShutdown: [ ] , poolsToRun: [ ] ) }
735- }
736-
737- private mutating func updateNodes(
738- _ newNodes: some Collection < ValkeyNodeDescription > ,
739- removeUnmentionedPools: Bool
740- ) -> PoolUpdateAction {
741- var previousNodes = self . runningClients
742- self . runningClients. removeAll ( keepingCapacity: true )
743- var newPools = [ ( ConnectionPool, ValkeyNodeID) ] ( )
744- newPools. reserveCapacity ( 16 )
745- var poolsToShutdown = [ ConnectionPool] ( )
746-
747- for newNodeDescription in newNodes {
748- // if we had a pool previously, let's continue to use it!
749- if let existingPool = previousNodes. removeValue ( forKey: newNodeDescription. id) {
750- if newNodeDescription == existingPool. nodeDescription {
751- // the existing pool matches the new node description. nothing todo
752- self . runningClients [ newNodeDescription. id] = existingPool
753- } else {
754- // the existing pool does not match new node description. For example tls may now be required.
755- // shutdown the old pool and create a new one
756- poolsToShutdown. append ( existingPool. pool)
757- let newPool = self . makePool ( for: newNodeDescription)
758- self . runningClients [ newNodeDescription. id] = NodeBundle ( pool: newPool, nodeDescription: newNodeDescription)
759- newPools. append ( ( newPool, newNodeDescription. id) )
760- }
761- } else {
762- let newPool = self . makePool ( for: newNodeDescription)
763- self . runningClients [ newNodeDescription. id] = NodeBundle ( pool: newPool, nodeDescription: newNodeDescription)
764- newPools. append ( ( newPool, newNodeDescription. id) )
765- }
766- }
767-
768- if removeUnmentionedPools {
769- poolsToShutdown. append ( contentsOf: previousNodes. values. lazy. map { $0. pool } )
770-
771- return PoolUpdateAction (
772- poolsToShutdown: poolsToShutdown,
773- poolsToRun: newPools
774- )
775- }
776-
777- // re-add pools that were not part of the node list.
778- for (nodeID, poolDescription) in previousNodes {
779- self . runningClients [ nodeID] = poolDescription
780- }
781-
782- return PoolUpdateAction (
783- poolsToShutdown: poolsToShutdown,
784- poolsToRun: newPools
785- )
786- }
787-
788730 private func makePool( for description: ValkeyNodeDescription ) -> ConnectionPool {
789731 self . poolFactory. makeConnectionPool ( nodeDescription: description)
790732 }
791733
792- private func allNodeClients( ) -> some Collection < NodeBundle > {
793- self . runningClients. values
794- }
795-
796734 private mutating func nextTimerID( ) -> Int {
797735 defer { self . _nextTimerID += 1 }
798736 return self . _nextTimerID
0 commit comments