@@ -16,17 +16,22 @@ import Foundation
1616import Logging
1717import Testing
1818import Valkey
19+ import XCTest
1920
21+ @Suite (
22+ " Cluster Integration Tests " ,
23+ . serialized,
24+ . disabled( if: clusterFirstNodeHostname == nil , " VALKEY_NODE1_HOSTNAME environment variable is not set. " )
25+ )
2026struct ClusterIntegrationTests {
21-
22- @Test ( . disabled( if: ClusterIntegrationTests . firstNodeHostname == nil , " VALKEY_NODE1_HOSTNAME environment variable is not set. " ) )
27+ @Test
2328 @available ( valkeySwift 1 . 0 , * )
2429 func testSetGet( ) async throws {
2530 var logger = Logger ( label: " ValkeyCluster " )
2631 logger. logLevel = . trace
27- let firstNodeHostname = ClusterIntegrationTests . firstNodeHostname !
28- let firstNodePort = ClusterIntegrationTests . firstNodePort ?? 6379
29- try await Self . withValkeyCluster ( [ ( host: firstNodeHostname, port: firstNodePort, tls: false ) ] ) { ( client, logger ) in
32+ let firstNodeHostname = clusterFirstNodeHostname !
33+ let firstNodePort = clusterFirstNodePort ?? 6379
34+ try await Self . withValkeyCluster ( [ ( host: firstNodeHostname, port: firstNodePort, tls: false ) ] , logger : logger ) { client in
3035 try await Self . withKey ( connection: client) { key in
3136 try await client. set ( key, value: " Hello " )
3237
@@ -36,13 +41,14 @@ struct ClusterIntegrationTests {
3641 }
3742 }
3843
44+ @Test
3945 @available ( valkeySwift 1 . 0 , * )
4046 func testWithConnection( ) async throws {
4147 var logger = Logger ( label: " ValkeyCluster " )
4248 logger. logLevel = . trace
43- let firstNodeHostname = ClusterIntegrationTests . firstNodeHostname !
44- let firstNodePort = ClusterIntegrationTests . firstNodePort ?? 6379
45- try await Self . withValkeyCluster ( [ ( host: firstNodeHostname, port: firstNodePort, tls: false ) ] ) { ( client, logger ) in
49+ let firstNodeHostname = clusterFirstNodeHostname !
50+ let firstNodePort = clusterFirstNodePort ?? 6379
51+ try await Self . withValkeyCluster ( [ ( host: firstNodeHostname, port: firstNodePort, tls: false ) ] , logger : logger ) { client in
4652 try await Self . withKey ( connection: client) { key in
4753 try await client. withConnection ( forKeys: [ key] ) { connection in
4854 _ = try await connection. set ( key, value: " Hello " )
@@ -53,6 +59,36 @@ struct ClusterIntegrationTests {
5359 }
5460 }
5561
62+ @Test
63+ @available ( valkeySwift 1 . 0 , * )
64+ func testFailover( ) async throws {
65+ var logger = Logger ( label: " ValkeyCluster " )
66+ logger. logLevel = . trace
67+ let firstNodeHostname = clusterFirstNodeHostname!
68+ let firstNodePort = clusterFirstNodePort ?? 6379
69+ try await Self . withValkeyCluster ( [ ( host: firstNodeHostname, port: firstNodePort, tls: false ) ] , logger: logger) { clusterClient in
70+ try await Self . withKey ( connection: clusterClient) { key in
71+ try await clusterClient. set ( key, value: " bar " )
72+ let cluster = try await clusterClient. clusterShards ( )
73+ let shard = try #require(
74+ cluster. shards. first { shard in
75+ let hashSlot = HashSlot ( key: key)
76+ return shard. slots [ 0 ] . lowerBound <= hashSlot && shard. slots [ 0 ] . upperBound >= hashSlot
77+ }
78+ )
79+ let replica = try #require( shard. nodes. first { $0. role == . replica } )
80+ let port = try #require( replica. port)
81+ // connect to replica and call CLUSTER FAILOVER
82+ try await withValkeyClient ( . hostname( replica. endpoint, port: port) , logger: logger) { client in
83+ try await client. clusterFailover ( )
84+ }
85+ try await clusterClient. set ( key, value: " baz " )
86+ let response = try await clusterClient. get ( key)
87+ #expect( response. map { String ( buffer: $0) } == " baz " )
88+ }
89+ }
90+ }
91+
5692 @available ( valkeySwift 1 . 0 , * )
5793 static func withKey< Value> (
5894 connection: some ValkeyConnectionProtocol ,
@@ -73,10 +109,9 @@ struct ClusterIntegrationTests {
73109 static func withValkeyCluster< T> (
74110 _ nodeAddresses: [ ( host: String , port: Int , tls: Bool ) ] ,
75111 nodeClientConfiguration: ValkeyClientConfiguration = . init( ) ,
76- _ body: ( ValkeyClusterClient , Logger ) async throws -> sending T
112+ logger: Logger ,
113+ _ body: ( ValkeyClusterClient ) async throws -> sending T
77114 ) async throws -> T {
78- var logger = Logger ( label: " Valkey " )
79- logger. logLevel = . debug
80115 let client = ValkeyClusterClient (
81116 clientConfiguration: nodeClientConfiguration,
82117 nodeDiscovery: ValkeyStaticNodeDiscovery ( nodeAddresses. map { . init( host: $0. host, port: $0. port, useTLS: $0. tls) } ) ,
@@ -90,7 +125,7 @@ struct ClusterIntegrationTests {
90125
91126 let result : Result < T , any Error >
92127 do {
93- result = try await . success( body ( client, logger ) )
128+ result = try await . success( body ( client) )
94129 } catch {
95130 result = . failure( error)
96131 }
@@ -102,14 +137,26 @@ struct ClusterIntegrationTests {
102137 return try result. get ( )
103138 }
104139
105- }
106-
107- extension ClusterIntegrationTests {
108- static var firstNodeHostname : String ? {
109- ProcessInfo . processInfo. environment [ " VALKEY_NODE1_HOSTNAME " ]
110- }
111-
112- static var firstNodePort : Int ? {
113- ProcessInfo . processInfo. environment [ " VALKEY_NODE1_PORT " ] . flatMap { Int ( $0) }
140+ @available ( valkeySwift 1 . 0 , * )
141+ func withValkeyClient(
142+ _ address: ValkeyServerAddress ,
143+ configuration: ValkeyClientConfiguration = . init( ) ,
144+ logger: Logger ,
145+ operation: @escaping @Sendable ( ValkeyClient) async throws -> Void
146+ ) async throws {
147+ try await withThrowingTaskGroup ( of: Void . self) { group in
148+ let client = ValkeyClient ( address, configuration: configuration, logger: logger)
149+ group. addTask {
150+ await client. run ( )
151+ }
152+ group. addTask {
153+ try await operation ( client)
154+ }
155+ try await group. next ( )
156+ group. cancelAll ( )
157+ }
114158 }
115159}
160+
161+ private let clusterFirstNodeHostname : String ? = ProcessInfo . processInfo. environment [ " VALKEY_NODE1_HOSTNAME " ]
162+ private let clusterFirstNodePort : Int ? = ProcessInfo . processInfo. environment [ " VALKEY_NODE1_PORT " ] . flatMap { Int ( $0) }
0 commit comments