Skip to content

Commit 01bd337

Browse files
committed
Add support for reading from readonly replicas
Signed-off-by: Adam Fowler <adamfowler71@gmail.com>
1 parent 16ce7dd commit 01bd337

File tree

6 files changed

+70
-27
lines changed

6 files changed

+70
-27
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,9 @@ public final class ValkeyClusterClient: Sendable {
146146
@inlinable
147147
public func execute<Command: ValkeyCommand>(_ command: Command) async throws -> Command.Response {
148148
let hashSlot = try self.hashSlot(for: command.keysAffected)
149+
let readOnly = self.clientConfiguration.useReadOnlyReplicas && command.isReadOnly
149150
var clientSelector: () async throws -> ValkeyNodeClient = {
150-
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
151+
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], readOnly: readOnly)
151152
}
152153

153154
var asking = false
@@ -464,11 +465,12 @@ public final class ValkeyClusterClient: Sendable {
464465
@inlinable
465466
public func withConnection<Value>(
466467
forKeys keys: some Collection<ValkeyKey>,
468+
readOnly: Bool = false,
467469
isolation: isolated (any Actor)? = #isolation,
468470
operation: (ValkeyConnection) async throws -> sending Value
469471
) async throws -> Value {
470472
let hashSlots = keys.compactMap { HashSlot(key: $0) }
471-
let node = try await self.nodeClient(for: hashSlots)
473+
let node = try await self.nodeClient(for: hashSlots, readOnly: self.clientConfiguration.useReadOnlyReplicas && readOnly)
472474
return try await node.withConnection(isolation: isolation, operation: operation)
473475
}
474476

@@ -570,7 +572,7 @@ public final class ValkeyClusterClient: Sendable {
570572
// Get hash slot for key and add all the commands you have iterated through so far to the
571573
// node associated with that key and break out of loop
572574
let hashSlot = try self.hashSlot(for: keysAffected)
573-
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
575+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], readOnly: false)
574576
let address = node.serverAddress
575577
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
576578
nodeMap[address] = nodeAndCommands
@@ -586,7 +588,7 @@ public final class ValkeyClusterClient: Sendable {
586588
if keysAffected.count > 0 {
587589
// If command affects a key get hash slot for key and add command to the node associated with that key
588590
let hashSlot = try self.hashSlot(for: keysAffected)
589-
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
591+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [], readOnly: false)
590592
prevAddress = node.serverAddress
591593
nodeMap[prevAddress, default: .init(node: node, commandIndices: [])].commandIndices.append(index)
592594
} else {
@@ -597,7 +599,7 @@ public final class ValkeyClusterClient: Sendable {
597599
}
598600
} else {
599601
// if none of the commands affect any keys then choose a random node
600-
let node = try await self.nodeClient(for: [])
602+
let node = try await self.nodeClient(for: [], readOnly: false)
601603
let address = node.serverAddress
602604
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
603605
nodeMap[address] = nodeAndCommands
@@ -854,14 +856,14 @@ public final class ValkeyClusterClient: Sendable {
854856
/// - `ValkeyClusterError.clusterIsUnavailable` if no healthy nodes are available
855857
/// - `ValkeyClusterError.clusterIsMissingSlotAssignment` if the slot assignment cannot be determined
856858
@inlinable
857-
package func nodeClient(for slots: some (Collection<HashSlot> & Sendable)) async throws -> ValkeyNodeClient {
859+
package func nodeClient(for slots: some (Collection<HashSlot> & Sendable), readOnly: Bool) async throws -> ValkeyNodeClient {
858860
var retries = 0
859861
while retries < 3 {
860862
defer { retries += 1 }
861863

862864
do {
863865
return try self.stateLock.withLock { state -> ValkeyNodeClient in
864-
try state.poolFastPath(for: slots)
866+
try state.poolFastPath(for: slots, readOnly: readOnly)
865867
}
866868
} catch let error as ValkeyClusterError where error == .clusterIsUnavailable {
867869
let waiterID = self.nextRequestID()

Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -532,14 +532,20 @@ where
532532
}
533533

534534
@inlinable
535-
package func poolFastPath(for slots: some Collection<HashSlot>) throws(ValkeyClusterError) -> ConnectionPool {
535+
package func poolFastPath(for slots: some Collection<HashSlot>, readOnly: Bool) throws(ValkeyClusterError) -> ConnectionPool {
536536
switch self.clusterState {
537537
case .unavailable:
538538
throw ValkeyClusterError.clusterIsUnavailable
539539

540540
case .degraded(let context):
541541
let shardID = try context.hashSlotShardMap.nodeID(for: slots)
542-
if let pool = self.runningClients[shardID.primary]?.pool {
542+
let nodeID =
543+
if readOnly {
544+
shardID.replicas.randomElement() ?? shardID.primary
545+
} else {
546+
shardID.primary
547+
}
548+
if let pool = self.runningClients[nodeID]?.pool {
543549
return pool
544550
}
545551
// If we don't have a node for a shard, that means that this shard got created from
@@ -549,7 +555,13 @@ where
549555

550556
case .healthy(let context):
551557
let shardID = try context.hashSlotShardMap.nodeID(for: slots)
552-
if let pool = self.runningClients[shardID.primary]?.pool {
558+
let nodeID =
559+
if readOnly {
560+
shardID.replicas.randomElement() ?? shardID.primary
561+
} else {
562+
shardID.primary
563+
}
564+
if let pool = self.runningClients[nodeID]?.pool {
553565
return pool
554566
}
555567
// If we don't have a node for a shard, that means that this shard got created from

Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ extension ValkeyClusterClient {
2020
isolation: isolated (any Actor)? = #isolation,
2121
_ operation: (ValkeyConnection) async throws -> sending Value
2222
) async throws -> sending Value {
23-
let node = try await self.nodeClient(for: [])
23+
let node = try await self.nodeClient(for: [], readOnly: false)
2424
let id = node.subscriptionConnectionIDGenerator.next()
2525

2626
let connection = try await withTaskCancellationHandler {

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,14 @@ public struct ValkeyClientConfiguration: Sendable {
171171
/// The TLS to use for the Valkey connection.
172172
public var tls: TLS
173173

174+
/// Execute readonly commands on replicas.
175+
///
176+
/// Cluster by default will redirect commands from replica nodes to the primary node.
177+
/// Setting this flag will allow replicas to run readonly commands. This will reduce
178+
/// load on your primary nodes but there is a chance you will receive stale data as
179+
/// the replica is not up to date.
180+
public var useReadOnlyReplicas: Bool
181+
174182
#if DistributedTracingSupport
175183
/// The distributed tracing configuration to use for the Valkey connection.
176184
/// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions.
@@ -187,14 +195,16 @@ public struct ValkeyClientConfiguration: Sendable {
187195
/// - commandTimeout: The timeout for a connection response.
188196
/// - blockingCommandTimeout: The timeout for a blocking command response.
189197
/// - tls: The TLS configuration.
198+
/// - useReadOnlyReplicas: Execute readonly commands on replicas
190199
public init(
191200
authentication: Authentication? = nil,
192201
connectionPool: ConnectionPool = .init(),
193202
keepAliveBehavior: KeepAliveBehavior = .init(),
194203
retryParameters: RetryParameters = .init(),
195204
commandTimeout: Duration = .seconds(30),
196205
blockingCommandTimeout: Duration = .seconds(120),
197-
tls: TLS = .disable
206+
tls: TLS = .disable,
207+
useReadOnlyReplicas: Bool = false
198208
) {
199209
self.authentication = authentication
200210
self.connectionPool = connectionPool
@@ -203,5 +213,6 @@ public struct ValkeyClientConfiguration: Sendable {
203213
self.commandTimeout = commandTimeout
204214
self.blockingCommandTimeout = blockingCommandTimeout
205215
self.tls = tls
216+
self.useReadOnlyReplicas = useReadOnlyReplicas
206217
}
207218
}

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,24 @@ struct ClusterIntegrationTests {
5454
}
5555
}
5656

57+
@Test
58+
@available(valkeySwift 1.0, *)
59+
func testWithReadOnlyConnection() async throws {
60+
var logger = Logger(label: "ValkeyCluster")
61+
logger.logLevel = .trace
62+
let firstNodeHostname = clusterFirstNodeHostname!
63+
let firstNodePort = clusterFirstNodePort ?? 6379
64+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) { client in
65+
try await Self.withKey(connection: client) { key in
66+
try await client.set(key, value: "Hello")
67+
try await client.withConnection(forKeys: [key], readOnly: true) { connection in
68+
let response = try await connection.get(key)
69+
#expect(response.map { String(buffer: $0) } == "Hello")
70+
}
71+
}
72+
}
73+
}
74+
5775
@Test
5876
@available(valkeySwift 1.0, *)
5977
func testFailover() async throws {
@@ -270,11 +288,11 @@ struct ClusterIntegrationTests {
270288
afterMigrate: () async throws -> Void = {},
271289
finished: () async throws -> Void = {}
272290
) async throws {
273-
let nodeAClient = try await client.nodeClient(for: [hashSlot])
291+
let nodeAClient = try await client.nodeClient(for: [hashSlot], readOnly: false)
274292
// find another shard
275293
var nodeBClient: ValkeyNodeClient
276294
repeat {
277-
nodeBClient = try await client.nodeClient(for: [HashSlot(rawValue: Int.random(in: 0..<16384))!])
295+
nodeBClient = try await client.nodeClient(for: [HashSlot(rawValue: Int.random(in: 0..<16384))!], readOnly: false)
278296
} while nodeAClient === nodeBClient
279297

280298
guard let (hostnameA, portA) = nodeAClient.serverAddress.getHostnameAndPort() else { return }
@@ -341,7 +359,7 @@ struct ClusterIntegrationTests {
341359
try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) {
342360
client in
343361
try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in
344-
let node = try await client.nodeClient(for: [HashSlot(key: key)])
362+
let node = try await client.nodeClient(for: [HashSlot(key: key)], readOnly: false)
345363
var commands: [any ValkeyCommand] = .init()
346364
commands.append(SET(key, value: "cluster pipeline test"))
347365
commands.append(GET(key))
@@ -363,13 +381,13 @@ struct ClusterIntegrationTests {
363381
client in
364382
try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in
365383
let hashSlot = HashSlot(key: key)
366-
let node = try await client.nodeClient(for: [hashSlot])
384+
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
367385
// get a key from same node
368386
let key2 = try await {
369387
while true {
370388
let key2 = ValkeyKey(UUID().uuidString)
371389
let hashSlot2 = HashSlot(key: key2)
372-
let node2 = try await client.nodeClient(for: [hashSlot2])
390+
let node2 = try await client.nodeClient(for: [hashSlot2], readOnly: false)
373391
if node2.serverAddress == node.serverAddress {
374392
return key2
375393
}
@@ -400,12 +418,12 @@ struct ClusterIntegrationTests {
400418
client in
401419
try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in
402420
let hashSlot = HashSlot(key: key)
403-
let node = try await client.nodeClient(for: [hashSlot])
421+
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
404422
let key2 = try await {
405423
while true {
406424
let key2 = ValkeyKey(UUID().uuidString)
407425
let hashSlot2 = HashSlot(key: key2)
408-
let node2 = try await client.nodeClient(for: [hashSlot2])
426+
let node2 = try await client.nodeClient(for: [hashSlot2], readOnly: false)
409427
if node2.serverAddress != node.serverAddress {
410428
return key2
411429
}
@@ -435,12 +453,12 @@ struct ClusterIntegrationTests {
435453
client in
436454
try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in
437455
let hashSlot = HashSlot(key: key)
438-
let node = try await client.nodeClient(for: [hashSlot])
456+
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
439457
let key2 = try await {
440458
while true {
441459
let key2 = ValkeyKey(UUID().uuidString)
442460
let hashSlot2 = HashSlot(key: key2)
443-
let node2 = try await client.nodeClient(for: [hashSlot2])
461+
let node2 = try await client.nodeClient(for: [hashSlot2], readOnly: false)
444462
if node2.serverAddress != node.serverAddress {
445463
return key2
446464
}
@@ -470,7 +488,7 @@ struct ClusterIntegrationTests {
470488
try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) {
471489
clusterClient in
472490
try await ClusterIntegrationTests.withKey(connection: clusterClient) { key in
473-
let node = try await clusterClient.nodeClient(for: [HashSlot(key: key)])
491+
let node = try await clusterClient.nodeClient(for: [HashSlot(key: key)], readOnly: false)
474492
try await clusterClient.set(key, value: "bar")
475493
let cluster = try await clusterClient.clusterShards()
476494
let shard = try #require(
@@ -512,7 +530,7 @@ struct ClusterIntegrationTests {
512530
let keySuffix = "{\(UUID().uuidString)}"
513531
try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key in
514532
let hashSlot = HashSlot(key: key)
515-
let node = try await client.nodeClient(for: [hashSlot])
533+
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
516534
try await client.set(key, value: "Testing before import")
517535

518536
try await ClusterIntegrationTests.testMigratingHashSlot(hashSlot, client: client) {
@@ -548,7 +566,7 @@ struct ClusterIntegrationTests {
548566
try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key in
549567
try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key2 in
550568
let hashSlot = HashSlot(key: key)
551-
let node = try await client.nodeClient(for: [hashSlot])
569+
let node = try await client.nodeClient(for: [hashSlot], readOnly: false)
552570
try await client.lpush(key, elements: ["testing1"])
553571

554572
try await ClusterIntegrationTests.testMigratingHashSlot(hashSlot, client: client) {
@@ -820,7 +838,7 @@ struct ClusterIntegrationTests {
820838
@available(valkeySwift 1.0, *)
821839
static func withValkeyCluster<T>(
822840
_ nodeAddresses: [(host: String, port: Int)],
823-
nodeClientConfiguration: ValkeyClientConfiguration = .init(),
841+
nodeClientConfiguration: ValkeyClientConfiguration = .init(useReadOnlyReplicas: true),
824842
logger: Logger,
825843
_ body: (ValkeyClusterClient) async throws -> sending T
826844
) async throws -> T {

Tests/ValkeyTests/Cluster/ValkeyClusterClientStateMachineTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ struct ValkeyClusterClientStateMachineTests {
5757

5858
// try to get shard
5959
#expect(throws: ValkeyClusterError.clusterIsUnavailable) {
60-
try stateMachine.poolFastPath(for: CollectionOfOne(HashSlot(rawValue: 100)!))
60+
try stateMachine.poolFastPath(for: CollectionOfOne(HashSlot(rawValue: 100)!), readOnly: false)
6161
}
6262
// let's register for the slow path
6363
let successNotifier = SuccessNotifier()
@@ -94,7 +94,7 @@ struct ValkeyClusterClientStateMachineTests {
9494

9595
// try to get shard
9696
#expect(throws: ValkeyClusterError.clusterIsUnavailable) {
97-
try stateMachine.poolFastPath(for: CollectionOfOne(HashSlot(rawValue: 100)!))
97+
try stateMachine.poolFastPath(for: CollectionOfOne(HashSlot(rawValue: 100)!), readOnly: false)
9898
}
9999
// let's register for the slow path
100100
let successNotifier = SuccessNotifier()

0 commit comments

Comments
 (0)