@@ -64,8 +64,8 @@ public final class ValkeyClusterClient: Sendable {
6464
6565 @usableFromInline
6666 typealias StateMachine = ValkeyClusterClientStateMachine <
67- ValkeyClient ,
68- ValkeyClientFactory ,
67+ ValkeyNodeClient ,
68+ ValkeyNodeClientFactory ,
6969 ContinuousClock ,
7070 CheckedContinuation < Void , any Error > ,
7171 AsyncStream < Void > . Continuation
@@ -82,7 +82,7 @@ public final class ValkeyClusterClient: Sendable {
8282
8383 private enum RunAction {
8484 case runClusterDiscovery( runNodeDiscovery: Bool )
85- case runClient( ValkeyClient )
85+ case runClient( ValkeyNodeClient )
8686 case runTimer( ValkeyClusterTimer )
8787 }
8888
@@ -113,7 +113,7 @@ public final class ValkeyClusterClient: Sendable {
113113 self . actionStream = stream
114114 self . actionStreamContinuation = continuation
115115
116- let factory = ValkeyClientFactory (
116+ let factory = ValkeyNodeClientFactory (
117117 logger: logger,
118118 configuration: clientConfiguration,
119119 connectionFactory: ValkeyConnectionFactory (
@@ -153,8 +153,8 @@ public final class ValkeyClusterClient: Sendable {
153153 @inlinable
154154 public func send< Command: ValkeyCommand > ( command: Command ) async throws -> Command . Response {
155155 let hashSlots = command. keysAffected. map { HashSlot ( key: $0) }
156- var clientSelector : ( ) async throws -> ValkeyClient = {
157- try await self . client ( for: hashSlots)
156+ var clientSelector : ( ) async throws -> ValkeyNodeClient = {
157+ try await self . nodeClient ( for: hashSlots)
158158 }
159159
160160 while !Task. isCancelled {
@@ -168,7 +168,7 @@ public final class ValkeyClusterClient: Sendable {
168168 throw error
169169 }
170170 self . logger. trace ( " Received move error " , metadata: [ " error " : " \( movedError) " ] )
171- clientSelector = { try await self . client ( for: movedError) }
171+ clientSelector = { try await self . nodeClient ( for: movedError) }
172172 }
173173 }
174174 throw CancellationError ( )
@@ -188,8 +188,8 @@ public final class ValkeyClusterClient: Sendable {
188188 operation: ( ValkeyConnection ) async throws -> sending Value
189189 ) async throws -> Value {
190190 let hashSlots = keys. map { HashSlot ( key: $0) }
191- let client = try await self . client ( for: hashSlots)
192- return try await client . withConnection ( isolation: isolation, operation: operation)
191+ let node = try await self . nodeClient ( for: hashSlots)
192+ return try await node . withConnection ( isolation: isolation, operation: operation)
193193 }
194194
195195 /// Starts running the cluster client.
@@ -361,13 +361,13 @@ public final class ValkeyClusterClient: Sendable {
361361 /// MOVED responses from Valkey nodes.
362362 ///
363363 /// - Parameter moveError: The MOVED error response from a Valkey node.
364- /// - Returns: A client connected to the node that can handle the request.
364+ /// - Returns: A ``ValkeyNode`` connected to the node that can handle the request.
365365 /// - Throws:
366366 /// - `ValkeyClusterError.waitedForDiscoveryAfterMovedErrorThreeTimes` if unable to resolve
367367 /// the MOVED error after multiple attempts
368368 /// - `ValkeyClusterError.clientRequestCancelled` if the request is cancelled
369369 @usableFromInline
370- /* private */ func client ( for moveError: ValkeyMovedError ) async throws -> ValkeyClient {
370+ /* private */ func nodeClient ( for moveError: ValkeyMovedError ) async throws -> ValkeyNodeClient {
371371 var counter = 0
372372 while counter < 3 {
373373 defer { counter += 1 }
@@ -376,8 +376,8 @@ public final class ValkeyClusterClient: Sendable {
376376 }
377377
378378 switch action {
379- case . connectionPool( let client ) :
380- return client
379+ case . connectionPool( let node ) :
380+ return node
381381
382382 case . waitForDiscovery:
383383 break
@@ -414,25 +414,25 @@ public final class ValkeyClusterClient: Sendable {
414414 throw ValkeyClusterError . waitedForDiscoveryAfterMovedErrorThreeTimes
415415 }
416416
417- /// Retrieves a client for communicating with nodes that manage the given hash slots.
417+ /// Retrieves a ``ValkeyNode`` for communicating with nodes that manage the given hash slots.
418418 ///
419419 /// This is a lower-level method that can be used when you need direct access to a
420- /// specific `ValkeyClient ` instance for nodes managing particular hash slots. Most users
420+ /// specific `ValkeyNode ` instance for nodes managing particular hash slots. Most users
421421 /// should prefer the higher-level `send(command:)` method.
422422 ///
423423 /// - Parameter slots: The collection of hash slots to determine which node to connect to.
424- /// - Returns: A `ValkeyClient ` instance connected to the appropriate node.
424+ /// - Returns: A `ValkeyNode ` instance connected to the appropriate node.
425425 /// - Throws:
426426 /// - `ValkeyClusterError.clusterIsUnavailable` if no healthy nodes are available
427427 /// - `ValkeyClusterError.clusterIsMissingSlotAssignment` if the slot assignment cannot be determined
428428 @inlinable
429- func client ( for slots: some ( Collection < HashSlot > & Sendable ) ) async throws -> ValkeyClient {
429+ func nodeClient ( for slots: some ( Collection < HashSlot > & Sendable ) ) async throws -> ValkeyNodeClient {
430430 var retries = 0
431431 while retries < 3 {
432432 defer { retries += 1 }
433433
434434 do {
435- return try self . stateLock. withLock { state -> ValkeyClient in
435+ return try self . stateLock. withLock { state -> ValkeyNodeClient in
436436 try state. poolFastPath ( for: slots)
437437 }
438438 } catch ValkeyClusterError . clusterIsUnavailable {
@@ -539,7 +539,7 @@ public final class ValkeyClusterClient: Sendable {
539539 ///
540540 /// - Returns: A list of voters that can participate in cluster topology election.
541541 /// - Throws: Any error encountered during node discovery.
542- private func runNodeDiscovery( ) async throws -> [ ValkeyClusterVoter < ValkeyClient > ] {
542+ private func runNodeDiscovery( ) async throws -> [ ValkeyClusterVoter < ValkeyNodeClient > ] {
543543 do {
544544 self . logger. trace ( " Running node discovery " )
545545 let nodes = try await self . nodeDiscovery. lookupNodes ( )
@@ -577,11 +577,11 @@ public final class ValkeyClusterClient: Sendable {
577577 /// - Parameter voters: The list of nodes that can vote on cluster topology.
578578 /// - Returns: The agreed-upon cluster description.
579579 /// - Throws: `ValkeyClusterError.clusterIsUnavailable` if consensus cannot be reached.
580- private func runClusterDiscoveryFindingConsensus( voters: [ ValkeyClusterVoter < ValkeyClient > ] ) async throws -> ValkeyClusterDescription {
580+ private func runClusterDiscoveryFindingConsensus( voters: [ ValkeyClusterVoter < ValkeyNodeClient > ] ) async throws -> ValkeyClusterDescription {
581581 try await withThrowingTaskGroup ( of: ( ValkeyClusterDescription, ValkeyNodeID) . self) { taskGroup in
582582 for voter in voters {
583583 taskGroup. addTask {
584- ( try await voter. client. clusterShards ( ) , voter. nodeID)
584+ ( try await voter. client. send ( command : CLUSTER . SHARDS ( ) ) , voter. nodeID)
585585 }
586586 }
587587
@@ -625,7 +625,7 @@ public final class ValkeyClusterClient: Sendable {
625625
626626 for voter in actions. voters {
627627 taskGroup. addTask {
628- ( try await voter. client. clusterShards ( ) , voter. nodeID)
628+ ( try await voter. client. send ( command : CLUSTER . SHARDS ( ) ) , voter. nodeID)
629629 }
630630 }
631631
@@ -650,80 +650,3 @@ public final class ValkeyClusterClient: Sendable {
650650/// This allows the cluster client to be used anywhere a `ValkeyClientProtocol` is expected.
651651@available ( valkeySwift 1 . 0 , * )
652652extension ValkeyClusterClient : ValkeyClientProtocol { }
653-
654- /// Extension that makes ``ValkeyClient`` conform to ``ValkeyNodeConnectionPool``.
655- ///
656- /// This enables the ``ValkeyClusterClient`` to manage individual ``ValkeyClient`` instances.
657- @available ( valkeySwift 1 . 0 , * )
658- extension ValkeyClient : ValkeyNodeConnectionPool {
659- /// Initiates a graceful shutdown of the client.
660- ///
661- /// This method attempts to cleanly shut down the client's connections.
662- /// If not implemented, it falls back to force shutdown.
663- @usableFromInline
664- package func triggerGracefulShutdown( ) {
665- // TODO: Implement graceful shutdown
666- self . triggerForceShutdown ( )
667- }
668- }
669-
670- /// A factory for creating ``ValkeyClient`` instances to connect to specific nodes.
671- ///
672- /// This factory is used by the ``ValkeyClusterClient`` to create client instances
673- /// for each node in the cluster as needed.
674- @available ( valkeySwift 1 . 0 , * )
675- @usableFromInline
676- package struct ValkeyClientFactory : ValkeyNodeConnectionPoolFactory {
677- @usableFromInline
678- package typealias ConnectionPool = ValkeyClient
679-
680- var logger : Logger
681- var configuration : ValkeyClientConfiguration
682- var eventLoopGroup : any EventLoopGroup
683- let connectionIDGenerator = ConnectionIDGenerator ( )
684- let connectionFactory : ValkeyConnectionFactory
685-
686- /// Creates a new `ValkeyClientFactory` instance.
687- ///
688- /// - Parameters:
689- /// - logger: The logger used for diagnostic information.
690- /// - configuration: Configuration for the Valkey clients created by this factory.
691- /// - eventLoopGroup: The event loop group to use for client connections.
692- package init (
693- logger: Logger ,
694- configuration: ValkeyClientConfiguration ,
695- connectionFactory: ValkeyConnectionFactory ,
696- eventLoopGroup: any EventLoopGroup
697- ) {
698- self . logger = logger
699- self . configuration = configuration
700- self . connectionFactory = connectionFactory
701- self . eventLoopGroup = eventLoopGroup
702- }
703-
704- /// Creates a connection pool (client) for a specific node in the cluster.
705- ///
706- /// - Parameter nodeDescription: Description of the node to connect to.
707- /// - Returns: A configured `ValkeyClient` instance ready to connect to the specified node.
708- @usableFromInline
709- package func makeConnectionPool( nodeDescription: ValkeyNodeDescription ) -> ValkeyClient {
710- let serverAddress = ValkeyServerAddress . hostname (
711- nodeDescription. endpoint,
712- port: nodeDescription. port
713- )
714-
715- var clientConfiguration = self . configuration
716- if !nodeDescription. useTLS {
717- // TODO: Should this throw? What about the other way around?
718- clientConfiguration. tls = . disable
719- }
720-
721- return ValkeyClient (
722- serverAddress,
723- connectionIDGenerator: self . connectionIDGenerator,
724- connectionFactory: self . connectionFactory,
725- eventLoopGroup: self . eventLoopGroup,
726- logger: self . logger
727- )
728- }
729- }
0 commit comments