From 79a927a44dbf51c984e44a5e6e170a75c77f7245 Mon Sep 17 00:00:00 2001 From: Nilanshu Sharma Date: Thu, 6 Nov 2025 18:15:17 -0800 Subject: [PATCH 1/6] Custom responses for cluster commands Signed-off-by: Nilanshu Sharma --- Sources/Valkey/Commands/ClusterCommands.swift | 12 +- .../Custom/ClusterCustomCommands.swift | 507 ++++++++++++++++++ .../ValkeyCommandsRender.swift | 3 + .../ClientIntegrationTests.swift | 57 ++ docker-compose.cluster.yml | 33 +- 5 files changed, 589 insertions(+), 23 deletions(-) diff --git a/Sources/Valkey/Commands/ClusterCommands.swift b/Sources/Valkey/Commands/ClusterCommands.swift index f380250d..b163a3c9 100644 --- a/Sources/Valkey/Commands/ClusterCommands.swift +++ b/Sources/Valkey/Commands/ClusterCommands.swift @@ -365,8 +365,6 @@ public enum CLUSTER { /// Returns a list of all TCP links to and from peer nodes. @_documentation(visibility: internal) public struct LINKS: ValkeyCommand { - public typealias Response = RESPToken.Array - @inlinable public static var name: String { "CLUSTER LINKS" } @inlinable public init() { @@ -789,8 +787,6 @@ public enum CLUSTER { } } } - public typealias Response = RESPToken.Array - @inlinable public static var name: String { "CLUSTER SLOT-STATS" } public var filter: Filter @@ -807,8 +803,6 @@ public enum CLUSTER { /// Returns the mapping of cluster slots to nodes. @_documentation(visibility: internal) public struct SLOTS: ValkeyCommand { - public typealias Response = RESPToken.Array - @inlinable public static var name: String { "CLUSTER SLOTS" } @inlinable public init() { @@ -1068,7 +1062,7 @@ extension ValkeyClientProtocol { /// - Response: [Array]: An array of cluster links and their attributes. @inlinable @discardableResult - public func clusterLinks() async throws -> RESPToken.Array { + public func clusterLinks() async throws -> CLUSTER.LINKS.Response { try await execute(CLUSTER.LINKS()) } @@ -1229,7 +1223,7 @@ extension ValkeyClientProtocol { /// - Response: [Array]: Array of nested arrays, where the inner array element represents a slot and its respective usage statistics. @inlinable @discardableResult - public func clusterSlotStats(filter: CLUSTER.SLOTSTATS.Filter) async throws -> RESPToken.Array { + public func clusterSlotStats(filter: CLUSTER.SLOTSTATS.Filter) async throws -> CLUSTER.SLOTSTATS.Response { try await execute(CLUSTER.SLOTSTATS(filter: filter)) } @@ -1244,7 +1238,7 @@ extension ValkeyClientProtocol { /// - Response: [Array]: Nested list of slot ranges with networking information. @inlinable @discardableResult - public func clusterSlots() async throws -> RESPToken.Array { + public func clusterSlots() async throws -> CLUSTER.SLOTS.Response { try await execute(CLUSTER.SLOTS()) } diff --git a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift index ee79bafb..7ff85f20 100644 --- a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift +++ b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift @@ -11,6 +11,10 @@ extension CLUSTER.GETKEYSINSLOT { public typealias Response = [ValkeyKey] } +extension CLUSTER.LINKS { + public typealias Response = [ValkeyClusterLink] +} + extension CLUSTER.MYID { public typealias Response = String } @@ -23,6 +27,14 @@ extension CLUSTER.SHARDS { public typealias Response = ValkeyClusterDescription } +extension CLUSTER.SLOTSTATS { + public typealias Response = [ValkeyClusterSlotStats] +} + +extension CLUSTER.SLOTS { + public typealias Response = [ValkeyClusterSlotRange] +} + package struct ValkeyClusterParseError: Error, Equatable { package enum Reason: Error { case clusterDescriptionTokenIsNotAnArray @@ -35,6 +47,19 @@ package struct ValkeyClusterParseError: Error, Equatable { case missingRequiredValueForNode case shardIsMissingHashSlots case shardIsMissingNode + case clusterLinksTokenIsNotAnArray + case clusterLinkTokenIsNotAnArrayOrMap + case missingRequiredValueForLink + case invalidLinkDirection + case clusterSlotStatsTokenIsNotAnArray + case clusterSlotStatsTokenIsNotAnArrayOrMap + case missingRequiredValueForSlotStats + case clusterSlotsTokenIsNotAnArray + case clusterSlotRangeTokenIsNotAnArray + case clusterSlotNodeTokenIsNotAnArray + case missingRequiredValueForSlotRange + case missingRequiredValueForSlotNode + case clusterSlotNodeMetadataIsNotAnArrayOrMap } package var reason: Reason @@ -46,6 +71,198 @@ package struct ValkeyClusterParseError: Error, Equatable { } } +/// Slot usage statistics for a hash slot in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterSlotStats(filter:)``. +public struct ValkeyClusterSlotStats: Hashable, Sendable, RESPTokenDecodable { + /// The hash slot number + public var slot: Int + /// Number of keys in the slot + public var keyCount: Int? + /// CPU time consumed by the slot in microseconds + public var cpuUsec: Int? + /// Network bytes read for the slot + public var networkBytesIn: Int? + /// Network bytes written for the slot + public var networkBytesOut: Int? + + /// Creates a new cluster slot stats + /// - Parameters: + /// - slot: The hash slot number + /// - keyCount: Number of keys in the slot + /// - cpuUsec: CPU time consumed in microseconds + /// - networkBytesIn: Network bytes read + /// - networkBytesOut: Network bytes written + public init( + slot: Int, + keyCount: Int? = nil, + cpuUsec: Int? = nil, + networkBytesIn: Int? = nil, + networkBytesOut: Int? = nil + ) { + self.slot = slot + self.keyCount = keyCount + self.cpuUsec = cpuUsec + self.networkBytesIn = networkBytesIn + self.networkBytesOut = networkBytesOut + } + + /// Creates a cluster slot stats from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterSlotStats(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + +/// A slot range mapping in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterSlots()``. +public struct ValkeyClusterSlotRange: Hashable, Sendable, RESPTokenDecodable { + /// A node serving a slot range in a Valkey cluster. + public struct Node: Hashable, Sendable { + /// The IP address of the node + public var ip: String + /// The port of the node + public var port: Int + /// The node ID + public var nodeId: String + /// Additional networking metadata + public var metadata: [String: String] + + /// Creates a new cluster slot node + /// - Parameters: + /// - ip: The IP address + /// - port: The port + /// - nodeId: The node ID + /// - metadata: Additional networking metadata + public init( + ip: String, + port: Int, + nodeId: String, + metadata: [String: String] = [:] + ) { + self.ip = ip + self.port = port + self.nodeId = nodeId + self.metadata = metadata + } + } + + /// The start slot of the range + public var startSlot: Int + /// The end slot of the range + public var endSlot: Int + /// The nodes serving this slot range + public var nodes: [Node] + + /// Creates a new cluster slot range + /// - Parameters: + /// - startSlot: The start slot + /// - endSlot: The end slot + /// - nodes: The nodes serving this range + public init(startSlot: Int, endSlot: Int, nodes: [Node]) { + self.startSlot = startSlot + self.endSlot = endSlot + self.nodes = nodes + } + + /// Creates a cluster slot range from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterSlotRange(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + +/// A cluster link between nodes in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterLinks()``. +public struct ValkeyClusterLink: Hashable, Sendable, RESPTokenDecodable { + /// Direction of the cluster link. + public struct Direction: Sendable, Hashable, RawRepresentable { + /// The link is established by the local node to the peer. + public static let to = Direction(base: .to) + /// The link is accepted by the local node from the peer. + public static let from = Direction(base: .from) + + public init?(rawValue: String) { + guard let base = Base(rawValue: rawValue) else { + return nil + } + self.base = base + } + + public var rawValue: String { + self.base.rawValue + } + + enum Base: String { + case to + case from + } + + private(set) var base: Base + + init(base: Base) { + self.base = base + } + } + + /// The direction of the link (to or from) + public var direction: Direction + /// The node ID of the peer + public var node: String + /// Creation time of the link + public var createTime: Int + /// Events currently registered for the link (e.g., "r", "w", "rw") + public var events: String + /// Allocated size of the link's send buffer + public var sendBufferAllocated: Int + /// Size of the portion of the link's send buffer currently holding data + public var sendBufferUsed: Int + + /// Creates a new cluster link + /// - Parameters: + /// - direction: The direction of the link + /// - node: The node ID of the peer + /// - createTime: Creation time of the link + /// - events: Events registered for the link + /// - sendBufferAllocated: Allocated send buffer size + /// - sendBufferUsed: Used send buffer size + public init( + direction: Direction, + node: String, + createTime: Int, + events: String, + sendBufferAllocated: Int, + sendBufferUsed: Int + ) { + self.direction = direction + self.node = node + self.createTime = createTime + self.events = events + self.sendBufferAllocated = sendBufferAllocated + self.sendBufferUsed = sendBufferUsed + } + + /// Creates a cluster link from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterLink(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + /// A description of a Valkey cluster. /// /// A description is return when you call ``ValkeyClientProtocol/clusterShards()``. @@ -409,6 +626,296 @@ extension ValkeyClusterDescription.Node { } } +extension ValkeyClusterLink { + fileprivate static func makeClusterLink(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterLink { + switch respToken.value { + case .array(let array): + return try Self.makeFromTokenSequence(MapStyleArray(underlying: array)) + + case .map(let map): + let mapped = map.lazy.compactMap { (keyNode, value) -> (String, RESPToken)? in + if let key = try? String(fromRESP: keyNode) { + return (key, value) + } else { + return nil + } + } + return try Self.makeFromTokenSequence(mapped) + + default: + throw .clusterLinkTokenIsNotAnArrayOrMap + } + } + + fileprivate static func makeFromTokenSequence( + _ sequence: TokenSequence + ) throws(ValkeyClusterParseError.Reason) -> Self where TokenSequence.Element == (String, RESPToken) { + var direction: ValkeyClusterLink.Direction? + var node: String? + var createTime: Int64? + var events: String? + var sendBufferAllocated: Int64? + var sendBufferUsed: Int64? + + for (key, value) in sequence { + switch key { + case "direction": + guard let directionString = try? String(fromRESP: value), + let directionValue = ValkeyClusterLink.Direction(rawValue: directionString) else { + throw .invalidLinkDirection + } + direction = directionValue + + case "node": + node = try? String(fromRESP: value) + + case "create-time": + createTime = try? Int64(fromRESP: value) + + case "events": + events = try? String(fromRESP: value) + + case "send-buffer-allocated": + sendBufferAllocated = try? Int64(fromRESP: value) + + case "send-buffer-used": + sendBufferUsed = try? Int64(fromRESP: value) + + default: + // ignore unexpected keys for forward compatibility + continue + } + } + + guard let direction = direction, + let node = node, + let createTime = createTime, + let events = events, + let sendBufferAllocated = sendBufferAllocated, + let sendBufferUsed = sendBufferUsed else { + throw .missingRequiredValueForLink + } + + return ValkeyClusterLink( + direction: direction, + node: node, + createTime: Int(createTime), + events: events, + sendBufferAllocated: Int(sendBufferAllocated), + sendBufferUsed: Int(sendBufferUsed) + ) + } +} + +extension ValkeyClusterSlotStats { + fileprivate static func makeClusterSlotStats(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterSlotStats { + guard case .array(let array) = respToken.value else { + throw .clusterSlotStatsTokenIsNotAnArray + } + + guard array.count >= 2 else { + throw .missingRequiredValueForSlotStats + } + + var iterator = array.makeIterator() + + // First element: slot number + guard let slotToken = iterator.next(), + case .number(let slotNumber) = slotToken.value else { + throw .missingRequiredValueForSlotStats + } + + // Second element: statistics map + guard let statsToken = iterator.next() else { + throw .missingRequiredValueForSlotStats + } + + return try Self.makeFromStatsToken(slot: Int(slotNumber), statsToken: statsToken) + } + + fileprivate static func makeFromStatsToken(slot: Int, statsToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> Self { + var keyCount: Int64? + var cpuUsec: Int64? + var networkBytesIn: Int64? + var networkBytesOut: Int64? + + switch statsToken.value { + case .map(let map): + let mapped = map.lazy.compactMap { (keyNode, value) -> (String, RESPToken)? in + if let key = try? String(fromRESP: keyNode) { + return (key, value) + } else { + return nil + } + } + for (key, value) in mapped { + switch key { + case "key-count": + keyCount = try? Int64(fromRESP: value) + + case "cpu-usec": + cpuUsec = try? Int64(fromRESP: value) + + case "network-bytes-in": + networkBytesIn = try? Int64(fromRESP: value) + + case "network-bytes-out": + networkBytesOut = try? Int64(fromRESP: value) + + default: + // ignore unexpected keys for forward compatibility + continue + } + } + + case .array(let array): + // Handle stats as key-value pairs in array format + let mapArray = MapStyleArray(underlying: array) + for (key, valueToken) in mapArray { + switch key { + case "key-count": + keyCount = try? Int64(fromRESP: valueToken) + + case "cpu-usec": + cpuUsec = try? Int64(fromRESP: valueToken) + + case "network-bytes-in": + networkBytesIn = try? Int64(fromRESP: valueToken) + + case "network-bytes-out": + networkBytesOut = try? Int64(fromRESP: valueToken) + + default: + // ignore unexpected keys for forward compatibility + continue + } + } + + default: + throw .clusterSlotStatsTokenIsNotAnArrayOrMap + } + + return ValkeyClusterSlotStats( + slot: slot, + keyCount: keyCount.map { Int($0) }, + cpuUsec: cpuUsec.map { Int($0) }, + networkBytesIn: networkBytesIn.map { Int($0) }, + networkBytesOut: networkBytesOut.map { Int($0) } + ) + } +} + +extension ValkeyClusterSlotRange { + fileprivate static func makeClusterSlotRange(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterSlotRange { + guard case .array(let array) = respToken.value else { + throw .clusterSlotRangeTokenIsNotAnArray + } + + guard array.count >= 3 else { + throw .missingRequiredValueForSlotRange + } + + var iterator = array.makeIterator() + + // First element: start slot + guard let startSlotToken = iterator.next(), + case .number(let startSlotNumber) = startSlotToken.value else { + throw .missingRequiredValueForSlotRange + } + + // Second element: end slot + guard let endSlotToken = iterator.next(), + case .number(let endSlotNumber) = endSlotToken.value else { + throw .missingRequiredValueForSlotRange + } + + let startSlot = Int(startSlotNumber) + let endSlot = Int(endSlotNumber) + + // Remaining elements are nodes + var nodes: [Node] = [] + while let nodeToken = iterator.next() { + let node = try Node.makeSlotNode(respToken: nodeToken) + nodes.append(node) + } + + return ValkeyClusterSlotRange( + startSlot: startSlot, + endSlot: endSlot, + nodes: nodes + ) + } +} + +extension ValkeyClusterSlotRange.Node { + fileprivate static func makeSlotNode(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterSlotRange.Node { + guard case .array(let array) = respToken.value else { + throw .clusterSlotNodeTokenIsNotAnArray + } + + guard array.count >= 3 else { + throw .missingRequiredValueForSlotNode + } + + var iterator = array.makeIterator() + + // First element: IP address + guard let ipToken = iterator.next(), + let ip = try? String(fromRESP: ipToken) else { + throw .missingRequiredValueForSlotNode + } + + // Second element: port + guard let portToken = iterator.next(), + case .number(let portNumber) = portToken.value else { + throw .missingRequiredValueForSlotNode + } + let port = Int(portNumber) + + // Third element: node ID + guard let nodeIdToken = iterator.next(), + let nodeId = try? String(fromRESP: nodeIdToken) else { + throw .missingRequiredValueForSlotNode + } + + var metadata: [String: String] = [:] + + // Any additional elements are treated as metadata + while let metadataToken = iterator.next() { + switch metadataToken.value { + case .map(let map): + // Handle metadata as a map + for (keyToken, valueToken) in map { + if let key = try? String(fromRESP: keyToken), + let value = try? String(fromRESP: valueToken) { + metadata[key] = value + } + } + case .array(let array): + // Skip empty arrays (indicates no additional metadata) + guard array.count > 0 else { continue } + + // Handle metadata as key-value pairs in array format (using MapStyleArray) + let mapArray = MapStyleArray(underlying: array) + for (key, valueToken) in mapArray { + if let value = try? String(fromRESP: valueToken) { + metadata[key] = value + } + } + default: + throw .clusterSlotNodeMetadataIsNotAnArrayOrMap + } + } + + return ValkeyClusterSlotRange.Node( + ip: ip, + port: port, + nodeId: nodeId, + metadata: metadata + ) + } +} + struct MapStyleArray: Sequence { var underlying: RESPToken.Array diff --git a/Sources/_ValkeyCommandsBuilder/ValkeyCommandsRender.swift b/Sources/_ValkeyCommandsBuilder/ValkeyCommandsRender.swift index f36c1574..a2ef64d4 100644 --- a/Sources/_ValkeyCommandsBuilder/ValkeyCommandsRender.swift +++ b/Sources/_ValkeyCommandsBuilder/ValkeyCommandsRender.swift @@ -16,6 +16,9 @@ private let disableResponseCalculationCommands: Set = [ "BZMPOP", "BZPOPMAX", "BZPOPMIN", + "CLUSTER SLOTS", + "CLUSTER SLOT-STATS", + "CLUSTER LINKS", "CLUSTER GETKEYSINSLOT", "CLUSTER MYID", "CLUSTER MYSHARDID", diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index 9d3182c5..a930b222 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -623,6 +623,63 @@ struct ClientIntegratedTests { let delCount = try await connection.del(keys: [ValkeyKey(key)]) #expect(delCount == 1) + + func testClusterSlots() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + try await withValkeyConnection(.hostname(valkeyHostname, port: 36001), logger: logger) { client in + let clusterSlots = try await client.clusterSlots() + for clusterSlot in clusterSlots { + #expect(clusterSlot.startSlot >= 0 && clusterSlot.startSlot <= 16383) + #expect(clusterSlot.endSlot >= 0 && clusterSlot.endSlot <= 16383) + for node in clusterSlot.nodes { + #expect(!node.ip.isEmpty) + #expect(node.port >= 0 && node.port <= 65535) + #expect(!node.nodeId.isEmpty) + } + } + } + } + + @Test + @available(valkeySwift 1.0, *) + func testClusterLinks() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + try await withValkeyConnection(.hostname(valkeyHostname, port: 36001), logger: logger) { client in + let clusterLinks = try await client.clusterLinks() + #expect(!clusterLinks.isEmpty && clusterLinks.count > 0) + for clusterLink in clusterLinks { + #expect(clusterLink.direction == .from || clusterLink.direction == .to) + #expect(!clusterLink.node.isEmpty) + #expect(clusterLink.createTime > 0) + #expect(!clusterLink.events.isEmpty) + #expect(clusterLink.sendBufferAllocated >= 0) + #expect(clusterLink.sendBufferUsed >= 0) + } + } + } + + @Test + @available(valkeySwift 1.0, *) + func testClusterSlotStats() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + try await withValkeyConnection(.hostname(valkeyHostname, port: 36001), logger: logger) { client in + let slotStats = try await client.clusterSlotStats( + filter: .orderby( + CLUSTER.SLOTSTATS.FilterOrderby( + metric: "key-count", + limit: 10, + order: .desc + ) + ) + ) + print(slotStats) + #expect(!slotStats.isEmpty && slotStats.count == 10) + for slotStat in slotStats { + // Only verify slot, other fields are optional + #expect(slotStat.slot >= 0 && slotStat.slot <= 16383) } } } diff --git a/docker-compose.cluster.yml b/docker-compose.cluster.yml index 7fbc7c72..2dd271dd 100644 --- a/docker-compose.cluster.yml +++ b/docker-compose.cluster.yml @@ -12,48 +12,53 @@ services: # for more information. valkey_1: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_1 - command: valkey-server --port 36001 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36001:36001" + command: valkey-server --bind 0.0.0.0 --port 36001 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_2: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_2 - command: valkey-server --port 36002 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36002:36002" + command: valkey-server --bind 0.0.0.0 --port 36002 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_3: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_3 - command: valkey-server --port 36003 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36003:36003" + command: valkey-server --bind 0.0.0.0 --port 36003 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_4: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_4 - command: valkey-server --port 36004 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36004:36004" + command: valkey-server --bind 0.0.0.0 --port 36004 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_5: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_5 - command: valkey-server --port 36005 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36005:36005" + command: valkey-server --bind 0.0.0.0 --port 36005 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_6: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_6 - command: valkey-server --port 36006 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36006:36006" + command: valkey-server --bind 0.0.0.0 --port 36006 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes # Ephemeral container to create the valkey cluster connections. # Once the setup is done, this container shuts down # and the cluster can be used by the service app container cluster_initiator: image: 'valkey/valkey:latest' - network_mode: "host" container_name: cluster_initiator - command: valkey-cli --cluster create localhost:36001 localhost:36002 localhost:36003 localhost:36004 localhost:36005 localhost:36006 --cluster-replicas 1 --cluster-yes + command: valkey-cli --cluster create valkey_1:36001 valkey_2:36002 valkey_3:36003 valkey_4:36004 valkey_5:36005 valkey_6:36006 --cluster-replicas 1 --cluster-yes tty: true depends_on: - valkey_1 From cbe091ee3efe5328096e67f04c7dc34e452e7f4d Mon Sep 17 00:00:00 2001 From: Nilanshu Sharma Date: Fri, 7 Nov 2025 11:04:16 -0800 Subject: [PATCH 2/6] Fixing Custom response object fields Signed-off-by: Nilanshu Sharma --- .../Custom/ClusterCustomCommands.swift | 411 +++++++++--------- .../ClientIntegrationTests.swift | 61 ++- 2 files changed, 255 insertions(+), 217 deletions(-) diff --git a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift index 7ff85f20..161e24c3 100644 --- a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift +++ b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift @@ -11,10 +11,6 @@ extension CLUSTER.GETKEYSINSLOT { public typealias Response = [ValkeyKey] } -extension CLUSTER.LINKS { - public typealias Response = [ValkeyClusterLink] -} - extension CLUSTER.MYID { public typealias Response = String } @@ -23,6 +19,10 @@ extension CLUSTER.MYSHARDID { public typealias Response = String } +extension CLUSTER.LINKS { + public typealias Response = [ValkeyClusterLink] +} + extension CLUSTER.SHARDS { public typealias Response = ValkeyClusterDescription } @@ -71,198 +71,6 @@ package struct ValkeyClusterParseError: Error, Equatable { } } -/// Slot usage statistics for a hash slot in a Valkey cluster. -/// -/// A description is returned when you call ``ValkeyClientProtocol/clusterSlotStats(filter:)``. -public struct ValkeyClusterSlotStats: Hashable, Sendable, RESPTokenDecodable { - /// The hash slot number - public var slot: Int - /// Number of keys in the slot - public var keyCount: Int? - /// CPU time consumed by the slot in microseconds - public var cpuUsec: Int? - /// Network bytes read for the slot - public var networkBytesIn: Int? - /// Network bytes written for the slot - public var networkBytesOut: Int? - - /// Creates a new cluster slot stats - /// - Parameters: - /// - slot: The hash slot number - /// - keyCount: Number of keys in the slot - /// - cpuUsec: CPU time consumed in microseconds - /// - networkBytesIn: Network bytes read - /// - networkBytesOut: Network bytes written - public init( - slot: Int, - keyCount: Int? = nil, - cpuUsec: Int? = nil, - networkBytesIn: Int? = nil, - networkBytesOut: Int? = nil - ) { - self.slot = slot - self.keyCount = keyCount - self.cpuUsec = cpuUsec - self.networkBytesIn = networkBytesIn - self.networkBytesOut = networkBytesOut - } - - /// Creates a cluster slot stats from the response token you provide. - /// - Parameter respToken: The response token. - public init(fromRESP respToken: RESPToken) throws { - do { - self = try Self.makeClusterSlotStats(respToken: respToken) - } catch { - throw ValkeyClusterParseError(reason: error, token: respToken) - } - } -} - -/// A slot range mapping in a Valkey cluster. -/// -/// A description is returned when you call ``ValkeyClientProtocol/clusterSlots()``. -public struct ValkeyClusterSlotRange: Hashable, Sendable, RESPTokenDecodable { - /// A node serving a slot range in a Valkey cluster. - public struct Node: Hashable, Sendable { - /// The IP address of the node - public var ip: String - /// The port of the node - public var port: Int - /// The node ID - public var nodeId: String - /// Additional networking metadata - public var metadata: [String: String] - - /// Creates a new cluster slot node - /// - Parameters: - /// - ip: The IP address - /// - port: The port - /// - nodeId: The node ID - /// - metadata: Additional networking metadata - public init( - ip: String, - port: Int, - nodeId: String, - metadata: [String: String] = [:] - ) { - self.ip = ip - self.port = port - self.nodeId = nodeId - self.metadata = metadata - } - } - - /// The start slot of the range - public var startSlot: Int - /// The end slot of the range - public var endSlot: Int - /// The nodes serving this slot range - public var nodes: [Node] - - /// Creates a new cluster slot range - /// - Parameters: - /// - startSlot: The start slot - /// - endSlot: The end slot - /// - nodes: The nodes serving this range - public init(startSlot: Int, endSlot: Int, nodes: [Node]) { - self.startSlot = startSlot - self.endSlot = endSlot - self.nodes = nodes - } - - /// Creates a cluster slot range from the response token you provide. - /// - Parameter respToken: The response token. - public init(fromRESP respToken: RESPToken) throws { - do { - self = try Self.makeClusterSlotRange(respToken: respToken) - } catch { - throw ValkeyClusterParseError(reason: error, token: respToken) - } - } -} - -/// A cluster link between nodes in a Valkey cluster. -/// -/// A description is returned when you call ``ValkeyClientProtocol/clusterLinks()``. -public struct ValkeyClusterLink: Hashable, Sendable, RESPTokenDecodable { - /// Direction of the cluster link. - public struct Direction: Sendable, Hashable, RawRepresentable { - /// The link is established by the local node to the peer. - public static let to = Direction(base: .to) - /// The link is accepted by the local node from the peer. - public static let from = Direction(base: .from) - - public init?(rawValue: String) { - guard let base = Base(rawValue: rawValue) else { - return nil - } - self.base = base - } - - public var rawValue: String { - self.base.rawValue - } - - enum Base: String { - case to - case from - } - - private(set) var base: Base - - init(base: Base) { - self.base = base - } - } - - /// The direction of the link (to or from) - public var direction: Direction - /// The node ID of the peer - public var node: String - /// Creation time of the link - public var createTime: Int - /// Events currently registered for the link (e.g., "r", "w", "rw") - public var events: String - /// Allocated size of the link's send buffer - public var sendBufferAllocated: Int - /// Size of the portion of the link's send buffer currently holding data - public var sendBufferUsed: Int - - /// Creates a new cluster link - /// - Parameters: - /// - direction: The direction of the link - /// - node: The node ID of the peer - /// - createTime: Creation time of the link - /// - events: Events registered for the link - /// - sendBufferAllocated: Allocated send buffer size - /// - sendBufferUsed: Used send buffer size - public init( - direction: Direction, - node: String, - createTime: Int, - events: String, - sendBufferAllocated: Int, - sendBufferUsed: Int - ) { - self.direction = direction - self.node = node - self.createTime = createTime - self.events = events - self.sendBufferAllocated = sendBufferAllocated - self.sendBufferUsed = sendBufferUsed - } - - /// Creates a cluster link from the response token you provide. - /// - Parameter respToken: The response token. - public init(fromRESP respToken: RESPToken) throws { - do { - self = try Self.makeClusterLink(respToken: respToken) - } catch { - throw ValkeyClusterParseError(reason: error, token: respToken) - } - } -} - /// A description of a Valkey cluster. /// /// A description is return when you call ``ValkeyClientProtocol/clusterShards()``. @@ -423,6 +231,198 @@ public struct ValkeyClusterDescription: Hashable, Sendable, RESPTokenDecodable { } } +/// A cluster link between nodes in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterLinks()``. +public struct ValkeyClusterLink: Hashable, Sendable, RESPTokenDecodable { + /// Direction of the cluster link. + public struct Direction: Sendable, Hashable, RawRepresentable { + /// The link is established by the local node to the peer. + public static let to = Direction(base: .to) + /// The link is accepted by the local node from the peer. + public static let from = Direction(base: .from) + + public init?(rawValue: String) { + guard let base = Base(rawValue: rawValue) else { + return nil + } + self.base = base + } + + public var rawValue: String { + self.base.rawValue + } + + enum Base: String { + case to + case from + } + + private(set) var base: Base + + init(base: Base) { + self.base = base + } + } + + /// The direction of the link (to or from) + public var direction: Direction? + /// The node ID of the peer + public var node: String? + /// Creation time of the link + public var createTime: Int? + /// Events currently registered for the link (e.g., "r", "w", "rw") + public var events: String? + /// Allocated size of the link's send buffer + public var sendBufferAllocated: Int? + /// Size of the portion of the link's send buffer currently holding data + public var sendBufferUsed: Int? + + /// Creates a new cluster link + /// - Parameters: + /// - direction: The direction of the link + /// - node: The node ID of the peer + /// - createTime: Creation time of the link + /// - events: Events registered for the link + /// - sendBufferAllocated: Allocated send buffer size + /// - sendBufferUsed: Used send buffer size + public init( + direction: Direction? = nil, + node: String? = nil, + createTime: Int? = nil, + events: String? = nil, + sendBufferAllocated: Int? = nil, + sendBufferUsed: Int? = nil + ) { + self.direction = direction + self.node = node + self.createTime = createTime + self.events = events + self.sendBufferAllocated = sendBufferAllocated + self.sendBufferUsed = sendBufferUsed + } + + /// Creates a cluster link from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterLink(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + +/// Slot usage statistics for a hash slot in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterSlotStats(filter:)``. +public struct ValkeyClusterSlotStats: Hashable, Sendable, RESPTokenDecodable { + /// The hash slot number + public var slot: Int + /// Number of keys in the slot + public var keyCount: Int? + /// CPU time consumed by the slot in microseconds + public var cpuUsec: Int? + /// Network bytes read for the slot + public var networkBytesIn: Int? + /// Network bytes written for the slot + public var networkBytesOut: Int? + + /// Creates a new cluster slot stats + /// - Parameters: + /// - slot: The hash slot number + /// - keyCount: Number of keys in the slot + /// - cpuUsec: CPU time consumed in microseconds + /// - networkBytesIn: Network bytes read + /// - networkBytesOut: Network bytes written + public init( + slot: Int, + keyCount: Int? = nil, + cpuUsec: Int? = nil, + networkBytesIn: Int? = nil, + networkBytesOut: Int? = nil + ) { + self.slot = slot + self.keyCount = keyCount + self.cpuUsec = cpuUsec + self.networkBytesIn = networkBytesIn + self.networkBytesOut = networkBytesOut + } + + /// Creates a cluster slot stats from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterSlotStats(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + +/// A slot range mapping in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterSlots()``. +public struct ValkeyClusterSlotRange: Hashable, Sendable, RESPTokenDecodable { + /// A node serving a slot range in a Valkey cluster. + public struct Node: Hashable, Sendable { + /// The IP address of the node + public var ip: String + /// The port of the node + public var port: Int + /// The node ID + public var nodeId: String + /// Additional networking metadata + public var metadata: [String: String] + + /// Creates a new cluster slot node + /// - Parameters: + /// - ip: The IP address + /// - port: The port + /// - nodeId: The node ID + /// - metadata: Additional networking metadata + public init( + ip: String, + port: Int, + nodeId: String, + metadata: [String: String] = [:] + ) { + self.ip = ip + self.port = port + self.nodeId = nodeId + self.metadata = metadata + } + } + + /// The start slot of the range + public var startSlot: Int + /// The end slot of the range + public var endSlot: Int + /// The nodes serving this slot range + public var nodes: [Node] + + /// Creates a new cluster slot range + /// - Parameters: + /// - startSlot: The start slot + /// - endSlot: The end slot + /// - nodes: The nodes serving this range + public init(startSlot: Int, endSlot: Int, nodes: [Node]) { + self.startSlot = startSlot + self.endSlot = endSlot + self.nodes = nodes + } + + /// Creates a cluster slot range from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterSlotRange(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + extension ValkeyClusterDescription { fileprivate static func makeClusterDescription(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterDescription { guard case .array(let shardsToken) = respToken.value else { @@ -687,22 +687,13 @@ extension ValkeyClusterLink { } } - guard let direction = direction, - let node = node, - let createTime = createTime, - let events = events, - let sendBufferAllocated = sendBufferAllocated, - let sendBufferUsed = sendBufferUsed else { - throw .missingRequiredValueForLink - } - return ValkeyClusterLink( direction: direction, node: node, - createTime: Int(createTime), + createTime: createTime.map { Int($0) }, events: events, - sendBufferAllocated: Int(sendBufferAllocated), - sendBufferUsed: Int(sendBufferUsed) + sendBufferAllocated: sendBufferAllocated.map { Int($0) }, + sendBufferUsed: sendBufferUsed.map { Int($0) } ) } } @@ -741,6 +732,7 @@ extension ValkeyClusterSlotStats { switch statsToken.value { case .map(let map): + // For RESP3, handle RESPToken stats as map let mapped = map.lazy.compactMap { (keyNode, value) -> (String, RESPToken)? in if let key = try? String(fromRESP: keyNode) { return (key, value) @@ -769,7 +761,7 @@ extension ValkeyClusterSlotStats { } case .array(let array): - // Handle stats as key-value pairs in array format + // // For RESP2, handle RESPToken stats as key-value pairs in array format let mapArray = MapStyleArray(underlying: array) for (key, valueToken) in mapArray { switch key { @@ -853,6 +845,7 @@ extension ValkeyClusterSlotRange.Node { throw .clusterSlotNodeTokenIsNotAnArray } + // IP, Port and Node Id are expected, additional metadata is optional guard array.count >= 3 else { throw .missingRequiredValueForSlotNode } diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index a930b222..1fba570e 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -595,6 +595,7 @@ struct ClientIntegratedTests { @Test @available(valkeySwift 1.0, *) +<<<<<<< HEAD func testMultipleDB() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug @@ -643,6 +644,8 @@ struct ClientIntegratedTests { @Test @available(valkeySwift 1.0, *) +======= +>>>>>>> 9f1118e (Fixing Custom response object fields) func testClusterLinks() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug @@ -650,12 +653,24 @@ struct ClientIntegratedTests { let clusterLinks = try await client.clusterLinks() #expect(!clusterLinks.isEmpty && clusterLinks.count > 0) for clusterLink in clusterLinks { - #expect(clusterLink.direction == .from || clusterLink.direction == .to) - #expect(!clusterLink.node.isEmpty) - #expect(clusterLink.createTime > 0) - #expect(!clusterLink.events.isEmpty) - #expect(clusterLink.sendBufferAllocated >= 0) - #expect(clusterLink.sendBufferUsed >= 0) + if let direction = clusterLink.direction { + #expect(direction == .from || direction == .to) + } + if let node = clusterLink.node { + #expect(!node.isEmpty) + } + if let createTime = clusterLink.createTime { + #expect(createTime > 0) + } + if let events = clusterLink.events { + #expect(!events.isEmpty) + } + if let sendBufferAllocated = clusterLink.sendBufferAllocated { + #expect(sendBufferAllocated >= 0) + } + if let sendBufferUsed = clusterLink.sendBufferUsed { + #expect(sendBufferUsed >= 0) + } } } } @@ -675,11 +690,41 @@ struct ClientIntegratedTests { ) ) ) - print(slotStats) #expect(!slotStats.isEmpty && slotStats.count == 10) for slotStat in slotStats { - // Only verify slot, other fields are optional + // slot is a required field, other fields are optional #expect(slotStat.slot >= 0 && slotStat.slot <= 16383) + if let keyCount = slotStat.keyCount { + #expect(keyCount >= 0) + } + if let cpuUsec = slotStat.cpuUsec { + #expect(cpuUsec >= 0) + } + if let networkBytesIn = slotStat.networkBytesIn { + #expect(networkBytesIn >= 0) + } + if let networkBytesOut = slotStat.networkBytesOut { + #expect(networkBytesOut >= 0) + } + } + } + } + + @Test + @available(valkeySwift 1.0, *) + func testClusterSlots() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + try await withValkeyConnection(.hostname(valkeyHostname, port: 36001), logger: logger) { client in + let clusterSlots = try await client.clusterSlots() + for clusterSlot in clusterSlots { + #expect(clusterSlot.startSlot >= 0 && clusterSlot.startSlot <= 16383) + #expect(clusterSlot.endSlot >= 0 && clusterSlot.endSlot <= 16383) + for node in clusterSlot.nodes { + #expect(!node.ip.isEmpty) + #expect(node.port >= 0 && node.port <= 65535) + #expect(!node.nodeId.isEmpty) + } } } } From fce444ce944020d2c4a6c95590102da7ddc00cd0 Mon Sep 17 00:00:00 2001 From: Nilanshu Sharma Date: Fri, 7 Nov 2025 12:11:44 -0800 Subject: [PATCH 3/6] Fixing soundness checks Signed-off-by: Nilanshu Sharma --- .../ClientIntegrationTests.swift | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index 1fba570e..3be8818b 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -595,7 +595,6 @@ struct ClientIntegratedTests { @Test @available(valkeySwift 1.0, *) -<<<<<<< HEAD func testMultipleDB() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug @@ -624,28 +623,12 @@ struct ClientIntegratedTests { let delCount = try await connection.del(keys: [ValkeyKey(key)]) #expect(delCount == 1) - - func testClusterSlots() async throws { - var logger = Logger(label: "Valkey") - logger.logLevel = .debug - try await withValkeyConnection(.hostname(valkeyHostname, port: 36001), logger: logger) { client in - let clusterSlots = try await client.clusterSlots() - for clusterSlot in clusterSlots { - #expect(clusterSlot.startSlot >= 0 && clusterSlot.startSlot <= 16383) - #expect(clusterSlot.endSlot >= 0 && clusterSlot.endSlot <= 16383) - for node in clusterSlot.nodes { - #expect(!node.ip.isEmpty) - #expect(node.port >= 0 && node.port <= 65535) - #expect(!node.nodeId.isEmpty) - } } } } @Test @available(valkeySwift 1.0, *) -======= ->>>>>>> 9f1118e (Fixing Custom response object fields) func testClusterLinks() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug From 9e58e9a087119a380f97404c8cbb79bb1f17145e Mon Sep 17 00:00:00 2001 From: Nilanshu Sharma Date: Fri, 7 Nov 2025 13:23:40 -0800 Subject: [PATCH 4/6] Fixing stylechecks and restoring availability macros Signed-off-by: Nilanshu Sharma --- .../Custom/ClusterCustomCommands.swift | 24 ++++++++++++------- .../ValkeyConnectionPool/ConnectionPool.swift | 1 + .../ConnectionRequest.swift | 1 + 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift index 161e24c3..cfdf5115 100644 --- a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift +++ b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift @@ -661,7 +661,8 @@ extension ValkeyClusterLink { switch key { case "direction": guard let directionString = try? String(fromRESP: value), - let directionValue = ValkeyClusterLink.Direction(rawValue: directionString) else { + let directionValue = ValkeyClusterLink.Direction(rawValue: directionString) + else { throw .invalidLinkDirection } direction = directionValue @@ -712,7 +713,8 @@ extension ValkeyClusterSlotStats { // First element: slot number guard let slotToken = iterator.next(), - case .number(let slotNumber) = slotToken.value else { + case .number(let slotNumber) = slotToken.value + else { throw .missingRequiredValueForSlotStats } @@ -811,13 +813,15 @@ extension ValkeyClusterSlotRange { // First element: start slot guard let startSlotToken = iterator.next(), - case .number(let startSlotNumber) = startSlotToken.value else { + case .number(let startSlotNumber) = startSlotToken.value + else { throw .missingRequiredValueForSlotRange } // Second element: end slot guard let endSlotToken = iterator.next(), - case .number(let endSlotNumber) = endSlotToken.value else { + case .number(let endSlotNumber) = endSlotToken.value + else { throw .missingRequiredValueForSlotRange } @@ -854,20 +858,23 @@ extension ValkeyClusterSlotRange.Node { // First element: IP address guard let ipToken = iterator.next(), - let ip = try? String(fromRESP: ipToken) else { + let ip = try? String(fromRESP: ipToken) + else { throw .missingRequiredValueForSlotNode } // Second element: port guard let portToken = iterator.next(), - case .number(let portNumber) = portToken.value else { + case .number(let portNumber) = portToken.value + else { throw .missingRequiredValueForSlotNode } let port = Int(portNumber) // Third element: node ID guard let nodeIdToken = iterator.next(), - let nodeId = try? String(fromRESP: nodeIdToken) else { + let nodeId = try? String(fromRESP: nodeIdToken) + else { throw .missingRequiredValueForSlotNode } @@ -880,7 +887,8 @@ extension ValkeyClusterSlotRange.Node { // Handle metadata as a map for (keyToken, valueToken) in map { if let key = try? String(fromRESP: keyToken), - let value = try? String(fromRESP: valueToken) { + let value = try? String(fromRESP: valueToken) + { metadata[key] = value } } diff --git a/Sources/ValkeyConnectionPool/ConnectionPool.swift b/Sources/ValkeyConnectionPool/ConnectionPool.swift index dab41e13..d86e3972 100644 --- a/Sources/ValkeyConnectionPool/ConnectionPool.swift +++ b/Sources/ValkeyConnectionPool/ConnectionPool.swift @@ -593,6 +593,7 @@ extension DiscardingTaskGroup: TaskGroupProtocol { } } +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension TaskGroup: TaskGroupProtocol { @inlinable mutating func addTask_(operation: @escaping @Sendable () async -> Void) { diff --git a/Sources/ValkeyConnectionPool/ConnectionRequest.swift b/Sources/ValkeyConnectionPool/ConnectionRequest.swift index 7ba53c0a..ebbf345b 100644 --- a/Sources/ValkeyConnectionPool/ConnectionRequest.swift +++ b/Sources/ValkeyConnectionPool/ConnectionRequest.swift @@ -1,3 +1,4 @@ +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ConnectionRequest: ConnectionRequestProtocol { public typealias ID = Int From c69dc6860f70f1d9c70a3d219941bf7cb5d60feb Mon Sep 17 00:00:00 2001 From: Nilanshu Sharma Date: Fri, 7 Nov 2025 14:16:47 -0800 Subject: [PATCH 5/6] Fixing format checks Signed-off-by: Nilanshu Sharma --- .../Commands/Custom/ClusterCustomCommands.swift | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift index cfdf5115..71f8ac6e 100644 --- a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift +++ b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift @@ -661,7 +661,7 @@ extension ValkeyClusterLink { switch key { case "direction": guard let directionString = try? String(fromRESP: value), - let directionValue = ValkeyClusterLink.Direction(rawValue: directionString) + let directionValue = ValkeyClusterLink.Direction(rawValue: directionString) else { throw .invalidLinkDirection } @@ -713,7 +713,7 @@ extension ValkeyClusterSlotStats { // First element: slot number guard let slotToken = iterator.next(), - case .number(let slotNumber) = slotToken.value + case .number(let slotNumber) = slotToken.value else { throw .missingRequiredValueForSlotStats } @@ -813,14 +813,14 @@ extension ValkeyClusterSlotRange { // First element: start slot guard let startSlotToken = iterator.next(), - case .number(let startSlotNumber) = startSlotToken.value + case .number(let startSlotNumber) = startSlotToken.value else { throw .missingRequiredValueForSlotRange } // Second element: end slot guard let endSlotToken = iterator.next(), - case .number(let endSlotNumber) = endSlotToken.value + case .number(let endSlotNumber) = endSlotToken.value else { throw .missingRequiredValueForSlotRange } @@ -858,14 +858,14 @@ extension ValkeyClusterSlotRange.Node { // First element: IP address guard let ipToken = iterator.next(), - let ip = try? String(fromRESP: ipToken) + let ip = try? String(fromRESP: ipToken) else { throw .missingRequiredValueForSlotNode } // Second element: port guard let portToken = iterator.next(), - case .number(let portNumber) = portToken.value + case .number(let portNumber) = portToken.value else { throw .missingRequiredValueForSlotNode } @@ -873,7 +873,7 @@ extension ValkeyClusterSlotRange.Node { // Third element: node ID guard let nodeIdToken = iterator.next(), - let nodeId = try? String(fromRESP: nodeIdToken) + let nodeId = try? String(fromRESP: nodeIdToken) else { throw .missingRequiredValueForSlotNode } @@ -887,7 +887,7 @@ extension ValkeyClusterSlotRange.Node { // Handle metadata as a map for (keyToken, valueToken) in map { if let key = try? String(fromRESP: keyToken), - let value = try? String(fromRESP: valueToken) + let value = try? String(fromRESP: valueToken) { metadata[key] = value } From 28496ed5f24f96ffbd9c8784c8faf4b5a10b21fe Mon Sep 17 00:00:00 2001 From: Nilanshu Sharma Date: Fri, 7 Nov 2025 19:14:18 -0800 Subject: [PATCH 6/6] Disabling intergration test conditionally Signed-off-by: Nilanshu Sharma --- .../ClientIntegrationTests.swift | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index 3be8818b..fb821dde 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -627,12 +627,12 @@ struct ClientIntegratedTests { } } - @Test + @Test(.disabled(if: clusterFirstNodeHostname == nil, "VALKEY_NODE1_HOSTNAME environment variable is not set.")) @available(valkeySwift 1.0, *) func testClusterLinks() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug - try await withValkeyConnection(.hostname(valkeyHostname, port: 36001), logger: logger) { client in + try await withValkeyConnection(.hostname(clusterFirstNodeHostname!, port: clusterFirstNodePort ?? 36001), logger: logger) { client in let clusterLinks = try await client.clusterLinks() #expect(!clusterLinks.isEmpty && clusterLinks.count > 0) for clusterLink in clusterLinks { @@ -658,12 +658,13 @@ struct ClientIntegratedTests { } } - @Test + @Test(.disabled(if: clusterFirstNodeHostname == nil, "VALKEY_NODE1_HOSTNAME environment variable is not set.")) @available(valkeySwift 1.0, *) func testClusterSlotStats() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug - try await withValkeyConnection(.hostname(valkeyHostname, port: 36001), logger: logger) { client in + + try await withValkeyConnection(.hostname(clusterFirstNodeHostname!, port: clusterFirstNodePort ?? 36001), logger: logger) { client in let slotStats = try await client.clusterSlotStats( filter: .orderby( CLUSTER.SLOTSTATS.FilterOrderby( @@ -693,12 +694,12 @@ struct ClientIntegratedTests { } } - @Test + @Test(.disabled(if: clusterFirstNodeHostname == nil, "VALKEY_NODE1_HOSTNAME environment variable is not set.")) @available(valkeySwift 1.0, *) func testClusterSlots() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug - try await withValkeyConnection(.hostname(valkeyHostname, port: 36001), logger: logger) { client in + try await withValkeyConnection(.hostname(clusterFirstNodeHostname!, port: clusterFirstNodePort ?? 36001), logger: logger) { client in let clusterSlots = try await client.clusterSlots() for clusterSlot in clusterSlots { #expect(clusterSlot.startSlot >= 0 && clusterSlot.startSlot <= 16383) @@ -713,3 +714,6 @@ struct ClientIntegratedTests { } } + +private let clusterFirstNodeHostname: String? = ProcessInfo.processInfo.environment["VALKEY_NODE1_HOSTNAME"] +private let clusterFirstNodePort: Int? = ProcessInfo.processInfo.environment["VALKEY_NODE1_PORT"].flatMap { Int($0) }