Skip to content

Commit 83c2cbd

Browse files
committed
Add cluster node selection for readonly commands
Signed-off-by: Adam Fowler <adamfowler71@gmail.com>
1 parent e162aac commit 83c2cbd

File tree

3 files changed

+59
-12
lines changed

3 files changed

+59
-12
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,16 @@ public final class ValkeyClusterClient: Sendable {
258258
_ commands: [any ValkeyCommand]
259259
) async -> [Result<RESPToken, any Error>] {
260260
guard commands.count > 0 else { return [] }
261+
let readOnlyCommand = commands.reduce(true) { $0 && $1.isReadOnly }
262+
let nodeSelection =
263+
if readOnlyCommand {
264+
self.clientConfiguration.readOnlyReplicaSelection.clusterNodeSelection
265+
} else {
266+
ValkeyClusterNodeSelection.primary
267+
}
261268
// get a list of nodes and the commands that should be run on them
262269
do {
263-
let nodes = try await self.splitCommandsAcrossNodes(commands: commands)
270+
let nodes = try await self.splitCommandsAcrossNodes(commands: commands, nodeSelection: nodeSelection)
264271
// if this list has one element, then just run the pipeline on that single node
265272
if nodes.count == 1 {
266273
do {
@@ -570,7 +577,10 @@ public final class ValkeyClusterClient: Sendable {
570577
/// These array of indices are then used to create collections of commands to
571578
/// run on each node
572579
@usableFromInline
573-
func splitCommandsAcrossNodes(commands: [any ValkeyCommand]) async throws -> [ValkeyServerAddress: NodeAndCommands].Values {
580+
func splitCommandsAcrossNodes(
581+
commands: [any ValkeyCommand],
582+
nodeSelection: ValkeyClusterNodeSelection
583+
) async throws -> [ValkeyServerAddress: NodeAndCommands].Values {
574584
var nodeMap: [ValkeyServerAddress: NodeAndCommands] = [:]
575585
var index = commands.startIndex
576586
var prevAddress: ValkeyServerAddress? = nil
@@ -583,7 +593,7 @@ public final class ValkeyClusterClient: Sendable {
583593
// Get hash slot for key and add all the commands you have iterated through so far to the
584594
// node associated with that key and break out of loop
585595
let hashSlot = try self.hashSlot(for: keysAffected)
586-
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: .primary)
596+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
587597
let address = node.serverAddress
588598
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
589599
nodeMap[address] = nodeAndCommands
@@ -599,7 +609,7 @@ public final class ValkeyClusterClient: Sendable {
599609
if keysAffected.count > 0 {
600610
// If command affects a key get hash slot for key and add command to the node associated with that key
601611
let hashSlot = try self.hashSlot(for: keysAffected)
602-
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: .primary)
612+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], nodeSelection: nodeSelection)
603613
prevAddress = node.serverAddress
604614
nodeMap[prevAddress, default: .init(node: node, commandIndices: [])].commandIndices.append(index)
605615
} else {
@@ -610,7 +620,7 @@ public final class ValkeyClusterClient: Sendable {
610620
}
611621
} else {
612622
// if none of the commands affect any keys then choose a random node
613-
let node = try await self.nodeClient(for: [], nodeSelection: .primary)
623+
let node = try await self.nodeClient(for: [], nodeSelection: nodeSelection)
614624
let address = node.serverAddress
615625
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
616626
nodeMap[address] = nodeAndCommands

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -610,13 +610,16 @@ struct ClusterIntegrationTests {
610610
[(host: firstNodeHostname, port: firstNodePort)],
611611
logger: logger
612612
) { client in
613-
let nodesAndIndices = try await client.splitCommandsAcrossNodes(commands: values.commands)
614-
#expect(nodesAndIndices.count == values.selection.count)
615-
let sortedNodeAndIndics = nodesAndIndices.sorted { $0.commandIndices[0] < $1.commandIndices[0] }
616-
var iterator = sortedNodeAndIndics.makeIterator()
617-
var expectedIterator = values.selection.makeIterator()
618-
while let result = iterator.next() {
619-
#expect(result.commandIndices == expectedIterator.next())
613+
let nodeSelections: [ValkeyClusterNodeSelection] = [.primary, .cycleReplicas(234)]
614+
for selection in nodeSelections {
615+
let nodesAndIndices = try await client.splitCommandsAcrossNodes(commands: values.commands, nodeSelection: selection)
616+
#expect(nodesAndIndices.count == values.selection.count)
617+
let sortedNodeAndIndics = nodesAndIndices.sorted { $0.commandIndices[0] < $1.commandIndices[0] }
618+
var iterator = sortedNodeAndIndics.makeIterator()
619+
var expectedIterator = values.selection.makeIterator()
620+
while let result = iterator.next() {
621+
#expect(result.commandIndices == expectedIterator.next())
622+
}
620623
}
621624
}
622625
}
@@ -665,6 +668,40 @@ struct ClusterIntegrationTests {
665668
}
666669
}
667670

671+
@Test
672+
@available(valkeySwift 1.0, *)
673+
func testClientPipelineMultipleNodesReadonly() async throws {
674+
var logger = Logger(label: "ValkeyCluster")
675+
logger.logLevel = .trace
676+
let firstNodeHostname = clusterFirstNodeHostname!
677+
let firstNodePort = clusterFirstNodePort ?? 6379
678+
try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) {
679+
client in
680+
let keys = (0..<100).map { ValkeyKey("Test\($0)") }
681+
var setCommands: [any ValkeyCommand] = .init()
682+
for key in keys {
683+
setCommands.append(SET(key, value: key.description))
684+
}
685+
_ = await client.execute(setCommands)
686+
var getCommands: [any ValkeyCommand] = .init()
687+
for i in 0..<100 {
688+
let key = ValkeyKey("Test\(i)")
689+
getCommands.append(GET(key))
690+
}
691+
let results = await client.execute(getCommands)
692+
let response = try results[0].get().decode(as: String.self)
693+
#expect(response == "Test0")
694+
let response2 = try results[2].get().decode(as: String.self)
695+
#expect(response2 == "Test2")
696+
697+
var delCommands: [any ValkeyCommand] = .init()
698+
for key in keys {
699+
delCommands.append(DEL(keys: [key]))
700+
}
701+
_ = await client.execute(delCommands)
702+
}
703+
}
704+
668705
@Test
669706
@available(valkeySwift 1.0, *)
670707
func testClientPipelineMultipleNodesParameterPack() async throws {

0 commit comments

Comments
 (0)