Skip to content

Commit e162aac

Browse files
committed
Add node selection methods
Signed-off-by: Adam Fowler <adamfowler71@gmail.com>
1 parent 01bd337 commit e162aac

File tree

7 files changed

+104
-43
lines changed

7 files changed

+104
-43
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,14 @@ public final class ValkeyClusterClient: Sendable {
146146
@inlinable
147147
public func execute<Command: ValkeyCommand>(_ command: Command) async throws -> Command.Response {
148148
let hashSlot = try self.hashSlot(for: command.keysAffected)
149-
let readOnly = self.clientConfiguration.useReadOnlyReplicas && command.isReadOnly
149+
let nodeSelection =
150+
if command.isReadOnly {
151+
self.clientConfiguration.readOnlyReplicaSelection.clusterNodeSelection
152+
} else {
153+
ValkeyClusterNodeSelection.primary
154+
}
150155
var clientSelector: () async throws -> ValkeyNodeClient = {
151-
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], readOnly: readOnly)
156+
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
152157
}
153158

154159
var asking = false
@@ -470,7 +475,13 @@ public final class ValkeyClusterClient: Sendable {
470475
operation: (ValkeyConnection) async throws -> sending Value
471476
) async throws -> Value {
472477
let hashSlots = keys.compactMap { HashSlot(key: $0) }
473-
let node = try await self.nodeClient(for: hashSlots, readOnly: self.clientConfiguration.useReadOnlyReplicas && readOnly)
478+
let nodeSelection =
479+
if readOnly {
480+
self.clientConfiguration.readOnlyReplicaSelection.clusterNodeSelection
481+
} else {
482+
ValkeyClusterNodeSelection.primary
483+
}
484+
let node = try await self.nodeClient(for: hashSlots, nodeSelection: nodeSelection)
474485
return try await node.withConnection(isolation: isolation, operation: operation)
475486
}
476487

@@ -572,7 +583,7 @@ public final class ValkeyClusterClient: Sendable {
572583
// Get hash slot for key and add all the commands you have iterated through so far to the
573584
// node associated with that key and break out of loop
574585
let hashSlot = try self.hashSlot(for: keysAffected)
575-
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], readOnly: false)
586+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: .primary)
576587
let address = node.serverAddress
577588
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
578589
nodeMap[address] = nodeAndCommands
@@ -588,7 +599,7 @@ public final class ValkeyClusterClient: Sendable {
588599
if keysAffected.count > 0 {
589600
// If command affects a key get hash slot for key and add command to the node associated with that key
590601
let hashSlot = try self.hashSlot(for: keysAffected)
591-
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], readOnly: false)
602+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: .primary)
592603
prevAddress = node.serverAddress
593604
nodeMap[prevAddress, default: .init(node: node, commandIndices: [])].commandIndices.append(index)
594605
} else {
@@ -599,7 +610,7 @@ public final class ValkeyClusterClient: Sendable {
599610
}
600611
} else {
601612
// if none of the commands affect any keys then choose a random node
602-
let node = try await self.nodeClient(for: [], readOnly: false)
613+
let node = try await self.nodeClient(for: [], nodeSelection: .primary)
603614
let address = node.serverAddress
604615
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
605616
nodeMap[address] = nodeAndCommands
@@ -856,14 +867,17 @@ public final class ValkeyClusterClient: Sendable {
856867
/// - `ValkeyClusterError.clusterIsUnavailable` if no healthy nodes are available
857868
/// - `ValkeyClusterError.clusterIsMissingSlotAssignment` if the slot assignment cannot be determined
858869
@inlinable
859-
package func nodeClient(for slots: some (Collection<HashSlot> & Sendable), readOnly: Bool) async throws -> ValkeyNodeClient {
870+
package func nodeClient(
871+
for slots: some (Collection<HashSlot> & Sendable),
872+
nodeSelection: ValkeyClusterNodeSelection
873+
) async throws -> ValkeyNodeClient {
860874
var retries = 0
861875
while retries < 3 {
862876
defer { retries += 1 }
863877

864878
do {
865879
return try self.stateLock.withLock { state -> ValkeyNodeClient in
866-
try state.poolFastPath(for: slots, readOnly: readOnly)
880+
try state.poolFastPath(for: slots, nodeSelection: nodeSelection)
867881
}
868882
} catch let error as ValkeyClusterError where error == .clusterIsUnavailable {
869883
let waiterID = self.nextRequestID()

Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -532,19 +532,17 @@ where
532532
}
533533

534534
@inlinable
535-
package func poolFastPath(for slots: some Collection<HashSlot>, readOnly: Bool) throws(ValkeyClusterError) -> ConnectionPool {
535+
package func poolFastPath(
536+
for slots: some Collection<HashSlot>,
537+
nodeSelection: ValkeyClusterNodeSelection
538+
) throws(ValkeyClusterError) -> ConnectionPool {
536539
switch self.clusterState {
537540
case .unavailable:
538541
throw ValkeyClusterError.clusterIsUnavailable
539542

540543
case .degraded(let context):
541544
let shardID = try context.hashSlotShardMap.nodeID(for: slots)
542-
let nodeID =
543-
if readOnly {
544-
shardID.replicas.randomElement() ?? shardID.primary
545-
} else {
546-
shardID.primary
547-
}
545+
let nodeID = nodeSelection.select(nodeIDs: shardID)
548546
if let pool = self.runningClients[nodeID]?.pool {
549547
return pool
550548
}
@@ -555,12 +553,7 @@ where
555553

556554
case .healthy(let context):
557555
let shardID = try context.hashSlotShardMap.nodeID(for: slots)
558-
let nodeID =
559-
if readOnly {
560-
shardID.replicas.randomElement() ?? shardID.primary
561-
} else {
562-
shardID.primary
563-
}
556+
let nodeID = nodeSelection.select(nodeIDs: shardID)
564557
if let pool = self.runningClients[nodeID]?.pool {
565558
return pool
566559
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
//
2+
// This source file is part of the valkey-swift project
3+
// Copyright (c) 2025 the valkey-swift project authors
4+
//
5+
// See LICENSE.txt for license information
6+
// SPDX-License-Identifier: Apache-2.0
7+
//
8+
9+
@usableFromInline
10+
package enum ValkeyClusterNodeSelection: Sendable {
11+
case primary
12+
case cycleReplicas(Int)
13+
14+
@usableFromInline
15+
func select(nodeIDs: ValkeyShardNodeIDs) -> ValkeyNodeID {
16+
switch self {
17+
case .primary:
18+
return nodeIDs.primary
19+
case .cycleReplicas(let index):
20+
guard nodeIDs.replicas.count > 0 else { return nodeIDs.primary }
21+
return nodeIDs.replicas[index % nodeIDs.replicas.count]
22+
}
23+
}
24+
}
25+
extension ValkeyClientConfiguration.ReadOnlyReplicaSelection {
26+
@usableFromInline
27+
var clusterNodeSelection: ValkeyClusterNodeSelection {
28+
switch self.value {
29+
case .none:
30+
.primary
31+
case .cycle:
32+
.cycleReplicas(Self.idGenerator.next())
33+
}
34+
}
35+
36+
static let idGenerator: IDGenerator = .init()
37+
}

Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ extension ValkeyClusterClient {
2020
isolation: isolated (any Actor)? = #isolation,
2121
_ operation: (ValkeyConnection) async throws -> sending Value
2222
) async throws -> sending Value {
23-
let node = try await self.nodeClient(for: [], readOnly: false)
23+
let node = try await self.nodeClient(for: [], nodeSelection: .primary)
2424
let id = node.subscriptionConnectionIDGenerator.next()
2525

2626
let connection = try await withTaskCancellationHandler {

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,21 @@ public struct ValkeyClientConfiguration: Sendable {
153153
}
154154
}
155155

156+
/// Determine how replicas are chosen for readonly commands
157+
public struct ReadOnlyReplicaSelection: Sendable {
158+
enum _Internal {
159+
case none
160+
case cycle
161+
}
162+
163+
let value: _Internal
164+
165+
/// Do not use readonly replicas
166+
public static var none: Self { .init(value: .none) }
167+
/// Cycle through replicas
168+
public static var cycle: Self { .init(value: .cycle) }
169+
}
170+
156171
/// The authentication credentials for the connection.
157172
public var authentication: Authentication?
158173
/// The connection pool configuration.
@@ -171,13 +186,15 @@ public struct ValkeyClientConfiguration: Sendable {
171186
/// The TLS to use for the Valkey connection.
172187
public var tls: TLS
173188

174-
/// Execute readonly commands on replicas.
189+
/// Determine how we chose replicas for readonly commands
190+
///
191+
/// A nil value indicates we do not want to use readonly replicas
175192
///
176193
/// Cluster by default will redirect commands from replica nodes to the primary node.
177-
/// Setting this flag will allow replicas to run readonly commands. This will reduce
194+
/// Setting this value will allow replicas to run readonly commands. This will reduce
178195
/// load on your primary nodes but there is a chance you will receive stale data as
179196
/// the replica is not up to date.
180-
public var useReadOnlyReplicas: Bool
197+
public var readOnlyReplicaSelection: ReadOnlyReplicaSelection
181198

182199
#if DistributedTracingSupport
183200
/// The distributed tracing configuration to use for the Valkey connection.
@@ -195,7 +212,7 @@ public struct ValkeyClientConfiguration: Sendable {
195212
/// - commandTimeout: The timeout for a connection response.
196213
/// - blockingCommandTimeout: The timeout for a blocking command response.
197214
/// - tls: The TLS configuration.
198-
/// - useReadOnlyReplicas: Execute readonly commands on replicas
215+
/// - readOnlyReplicaSelection: Whether we want to select replicas for readonly commands and how we do it
199216
public init(
200217
authentication: Authentication? = nil,
201218
connectionPool: ConnectionPool = .init(),
@@ -204,7 +221,7 @@ public struct ValkeyClientConfiguration: Sendable {
204221
commandTimeout: Duration = .seconds(30),
205222
blockingCommandTimeout: Duration = .seconds(120),
206223
tls: TLS = .disable,
207-
useReadOnlyReplicas: Bool = false
224+
readOnlyReplicaSelection: ReadOnlyReplicaSelection = .none
208225
) {
209226
self.authentication = authentication
210227
self.connectionPool = connectionPool
@@ -213,6 +230,6 @@ public struct ValkeyClientConfiguration: Sendable {
213230
self.commandTimeout = commandTimeout
214231
self.blockingCommandTimeout = blockingCommandTimeout
215232
self.tls = tls
216-
self.useReadOnlyReplicas = useReadOnlyReplicas
233+
self.readOnlyReplicaSelection = readOnlyReplicaSelection
217234
}
218235
}

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,11 @@ struct ClusterIntegrationTests {
288288
afterMigrate: () async throws -> Void = {},
289289
finished: () async throws -> Void = {}
290290
) async throws {
291-
let nodeAClient = try await client.nodeClient(for: [hashSlot], readOnly: false)
291+
let nodeAClient = try await client.nodeClient(for: [hashSlot], nodeSelection: .primary)
292292
// find another shard
293293
var nodeBClient: ValkeyNodeClient
294294
repeat {
295-
nodeBClient = try await client.nodeClient(for: [HashSlot(rawValue: Int.random(in: 0..<16384))!], readOnly: false)
295+
nodeBClient = try await client.nodeClient(for: [HashSlot(rawValue: Int.random(in: 0..<16384))!], nodeSelection: .primary)
296296
} while nodeAClient === nodeBClient
297297

298298
guard let (hostnameA, portA) = nodeAClient.serverAddress.getHostnameAndPort() else { return }
@@ -359,7 +359,7 @@ struct ClusterIntegrationTests {
359359
try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) {
360360
client in
361361
try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in
362-
let node = try await client.nodeClient(for: [HashSlot(key: key)], readOnly: false)
362+
let node = try await client.nodeClient(for: [HashSlot(key: key)], nodeSelection: .primary)
363363
var commands: [any ValkeyCommand] = .init()
364364
commands.append(SET(key, value: "cluster pipeline test"))
365365
commands.append(GET(key))
@@ -381,13 +381,13 @@ struct ClusterIntegrationTests {
381381
client in
382382
try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in
383383
let hashSlot = HashSlot(key: key)
384-
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
384+
let node = try await client.nodeClient(for: [hashSlot], nodeSelection: .primary)
385385
// get a key from same node
386386
let key2 = try await {
387387
while true {
388388
let key2 = ValkeyKey(UUID().uuidString)
389389
let hashSlot2 = HashSlot(key: key2)
390-
let node2 = try await client.nodeClient(for: [hashSlot2], readOnly: false)
390+
let node2 = try await client.nodeClient(for: [hashSlot2], nodeSelection: .primary)
391391
if node2.serverAddress == node.serverAddress {
392392
return key2
393393
}
@@ -418,12 +418,12 @@ struct ClusterIntegrationTests {
418418
client in
419419
try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in
420420
let hashSlot = HashSlot(key: key)
421-
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
421+
let node = try await client.nodeClient(for: [hashSlot], nodeSelection: .primary)
422422
let key2 = try await {
423423
while true {
424424
let key2 = ValkeyKey(UUID().uuidString)
425425
let hashSlot2 = HashSlot(key: key2)
426-
let node2 = try await client.nodeClient(for: [hashSlot2], readOnly: false)
426+
let node2 = try await client.nodeClient(for: [hashSlot2], nodeSelection: .primary)
427427
if node2.serverAddress != node.serverAddress {
428428
return key2
429429
}
@@ -453,12 +453,12 @@ struct ClusterIntegrationTests {
453453
client in
454454
try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in
455455
let hashSlot = HashSlot(key: key)
456-
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
456+
let node = try await client.nodeClient(for: [hashSlot], nodeSelection: .primary)
457457
let key2 = try await {
458458
while true {
459459
let key2 = ValkeyKey(UUID().uuidString)
460460
let hashSlot2 = HashSlot(key: key2)
461-
let node2 = try await client.nodeClient(for: [hashSlot2], readOnly: false)
461+
let node2 = try await client.nodeClient(for: [hashSlot2], nodeSelection: .primary)
462462
if node2.serverAddress != node.serverAddress {
463463
return key2
464464
}
@@ -488,7 +488,7 @@ struct ClusterIntegrationTests {
488488
try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) {
489489
clusterClient in
490490
try await ClusterIntegrationTests.withKey(connection: clusterClient) { key in
491-
let node = try await clusterClient.nodeClient(for: [HashSlot(key: key)], readOnly: false)
491+
let node = try await clusterClient.nodeClient(for: [HashSlot(key: key)], nodeSelection: .primary)
492492
try await clusterClient.set(key, value: "bar")
493493
let cluster = try await clusterClient.clusterShards()
494494
let shard = try #require(
@@ -530,7 +530,7 @@ struct ClusterIntegrationTests {
530530
let keySuffix = "{\(UUID().uuidString)}"
531531
try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key in
532532
let hashSlot = HashSlot(key: key)
533-
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
533+
let node = try await client.nodeClient(for: [hashSlot], nodeSelection: .primary)
534534
try await client.set(key, value: "Testing before import")
535535

536536
try await ClusterIntegrationTests.testMigratingHashSlot(hashSlot, client: client) {
@@ -566,7 +566,7 @@ struct ClusterIntegrationTests {
566566
try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key in
567567
try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key2 in
568568
let hashSlot = HashSlot(key: key)
569-
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
569+
let node = try await client.nodeClient(for: [hashSlot], nodeSelection: .primary)
570570
try await client.lpush(key, elements: ["testing1"])
571571

572572
try await ClusterIntegrationTests.testMigratingHashSlot(hashSlot, client: client) {
@@ -838,7 +838,7 @@ struct ClusterIntegrationTests {
838838
@available(valkeySwift 1.0, *)
839839
static func withValkeyCluster<T>(
840840
_ nodeAddresses: [(host: String, port: Int)],
841-
nodeClientConfiguration: ValkeyClientConfiguration = .init(useReadOnlyReplicas: true),
841+
nodeClientConfiguration: ValkeyClientConfiguration = .init(readOnlyReplicaSelection: .cycle),
842842
logger: Logger,
843843
_ body: (ValkeyClusterClient) async throws -> sending T
844844
) async throws -> T {

Tests/ValkeyTests/Cluster/ValkeyClusterClientStateMachineTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ struct ValkeyClusterClientStateMachineTests {
5757

5858
// try to get shard
5959
#expect(throws: ValkeyClusterError.clusterIsUnavailable) {
60-
try stateMachine.poolFastPath(for: CollectionOfOne(HashSlot(rawValue: 100)!), readOnly: false)
60+
try stateMachine.poolFastPath(for: CollectionOfOne(HashSlot(rawValue: 100)!), nodeSelection: .primary)
6161
}
6262
// let's register for the slow path
6363
let successNotifier = SuccessNotifier()
@@ -94,7 +94,7 @@ struct ValkeyClusterClientStateMachineTests {
9494

9595
// try to get shard
9696
#expect(throws: ValkeyClusterError.clusterIsUnavailable) {
97-
try stateMachine.poolFastPath(for: CollectionOfOne(HashSlot(rawValue: 100)!), readOnly: false)
97+
try stateMachine.poolFastPath(for: CollectionOfOne(HashSlot(rawValue: 100)!), nodeSelection: .primary)
9898
}
9999
// let's register for the slow path
100100
let successNotifier = SuccessNotifier()

0 commit comments

Comments
 (0)