From 18f4f7a2c7403e1ab3683d33ad4855341b6c4492 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 27 Oct 2025 13:29:22 +0000 Subject: [PATCH] Latest changes from PostgresNIO connection pool Signed-off-by: Adam Fowler --- Sources/Valkey/Node/ValkeyNodeClient.swift | 11 +++------- .../ConnectionLease.swift | 17 ++++++++++++++ .../ValkeyConnectionPool/ConnectionPool.swift | 22 ++++++++++++------- .../ConnectionPoolError.swift | 17 ++++++++++---- .../ConnectionPoolObservabilityDelegate.swift | 2 +- .../ConnectionRequest.swift | 17 +++++++------- Sources/ValkeyConnectionPool/NIOLock.swift | 4 ++-- .../NIOLockedValueBox.swift | 4 ++-- .../PoolStateMachine+ConnectionGroup.swift | 4 ++-- .../PoolStateMachine.swift | 3 ++- .../TinyFastSequence.swift | 8 ++++++- 11 files changed, 71 insertions(+), 38 deletions(-) create mode 100644 Sources/ValkeyConnectionPool/ConnectionLease.swift diff --git a/Sources/Valkey/Node/ValkeyNodeClient.swift b/Sources/Valkey/Node/ValkeyNodeClient.swift index 6051a0da..81452043 100644 --- a/Sources/Valkey/Node/ValkeyNodeClient.swift +++ b/Sources/Valkey/Node/ValkeyNodeClient.swift @@ -142,15 +142,10 @@ extension ValkeyNodeClient { isolation: isolated (any Actor)? = #isolation, operation: (ValkeyConnection) async throws -> sending Value ) async throws -> Value { - let connection = try await self.leaseConnection() + let lease = try await self.connectionPool.leaseConnection() + defer { lease.release() } - defer { self.connectionPool.releaseConnection(connection) } - - return try await operation(connection) - } - - private func leaseConnection() async throws -> ValkeyConnection { - try await self.connectionPool.leaseConnection() + return try await operation(lease.connection) } } diff --git a/Sources/ValkeyConnectionPool/ConnectionLease.swift b/Sources/ValkeyConnectionPool/ConnectionLease.swift new file mode 100644 index 00000000..5bab4d66 --- /dev/null +++ b/Sources/ValkeyConnectionPool/ConnectionLease.swift @@ -0,0 +1,17 @@ +public struct ConnectionLease: Sendable { + public var connection: Connection + + @usableFromInline + let _release: @Sendable (Connection) -> Void + + @inlinable + public init(connection: Connection, release: @escaping @Sendable (Connection) -> Void) { + self.connection = connection + self._release = release + } + + @inlinable + public func release() { + self._release(self.connection) + } +} diff --git a/Sources/ValkeyConnectionPool/ConnectionPool.swift b/Sources/ValkeyConnectionPool/ConnectionPool.swift index 209649c8..dab41e13 100644 --- a/Sources/ValkeyConnectionPool/ConnectionPool.swift +++ b/Sources/ValkeyConnectionPool/ConnectionPool.swift @@ -87,7 +87,7 @@ public protocol ConnectionRequestProtocol: Sendable { /// A function that is called with a connection or a /// `PoolError`. - func complete(with: Result) + func complete(with: Result, ConnectionPoolError>) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -275,6 +275,7 @@ where } } + @inlinable public func run() async { await withTaskCancellationHandler { if #available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) { @@ -319,13 +320,15 @@ where } @available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) - private func run(in taskGroup: inout DiscardingTaskGroup) async { + @inlinable + /* private */ func run(in taskGroup: inout DiscardingTaskGroup) async { for await event in self.eventStream { self.runEvent(event, in: &taskGroup) } } - private func run(in taskGroup: inout TaskGroup) async { + @inlinable + /* private */ func run(in taskGroup: inout TaskGroup) async { var running = 0 for await event in self.eventStream { running += 1 @@ -338,7 +341,8 @@ where } } - private func runEvent(_ event: NewPoolActions, in taskGroup: inout some TaskGroupProtocol) { + @inlinable + /* private */ func runEvent(_ event: NewPoolActions, in taskGroup: inout some TaskGroupProtocol) { switch event { case .makeConnection(let request): self.makeConnection(for: request, in: &taskGroup) @@ -405,8 +409,11 @@ where /*private*/ func runRequestAction(_ action: StateMachine.RequestAction) { switch action { case .leaseConnection(let requests, let connection): + let lease = ConnectionLease(connection: connection) { connection in + self.releaseConnection(connection) + } for request in requests { - request.complete(with: .success(connection)) + request.complete(with: .success(lease)) } case .failRequest(let request, let error): @@ -430,7 +437,7 @@ where self.connectionEstablished(bundle) // after the connection has been established, we keep the task open. This ensures - // that the pools run method cannot be exited before all connections have been + // that the pools run method can not be exited before all connections have been // closed. await withCheckedContinuation { (continuation: CheckedContinuation) in bundle.connection.onClose { @@ -458,7 +465,7 @@ where } @inlinable - /*private*/ func connectionEstablishFailed(_ error: any Error, for request: StateMachine.ConnectionRequest) { + /*private*/ func connectionEstablishFailed(_ error: Error, for request: StateMachine.ConnectionRequest) { self.observabilityDelegate.connectFailed(id: request.connectionID, error: error) self.modifyStateAndRunActions { state in @@ -586,7 +593,6 @@ extension DiscardingTaskGroup: TaskGroupProtocol { } } -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension TaskGroup: TaskGroupProtocol { @inlinable mutating func addTask_(operation: @escaping @Sendable () async -> Void) { diff --git a/Sources/ValkeyConnectionPool/ConnectionPoolError.swift b/Sources/ValkeyConnectionPool/ConnectionPoolError.swift index eadf30f7..3b62f11d 100644 --- a/Sources/ValkeyConnectionPool/ConnectionPoolError.swift +++ b/Sources/ValkeyConnectionPool/ConnectionPoolError.swift @@ -1,15 +1,24 @@ public struct ConnectionPoolError: Error, Hashable { - enum Base: Error, Hashable { + @usableFromInline + enum Base: Error, Hashable, Sendable { case requestCancelled case poolShutdown } - private let base: Base + @usableFromInline + let base: Base + @inlinable init(_ base: Base) { self.base = base } /// The connection requests got cancelled - public static let requestCancelled = ConnectionPoolError(.requestCancelled) + @inlinable + public static var requestCancelled: Self { + ConnectionPoolError(.requestCancelled) + } /// The connection requests can't be fulfilled as the pool has already been shutdown - public static let poolShutdown = ConnectionPoolError(.poolShutdown) + @inlinable + public static var poolShutdown: Self { + ConnectionPoolError(.poolShutdown) + } } diff --git a/Sources/ValkeyConnectionPool/ConnectionPoolObservabilityDelegate.swift b/Sources/ValkeyConnectionPool/ConnectionPoolObservabilityDelegate.swift index 068f9ea6..1893f168 100644 --- a/Sources/ValkeyConnectionPool/ConnectionPoolObservabilityDelegate.swift +++ b/Sources/ValkeyConnectionPool/ConnectionPoolObservabilityDelegate.swift @@ -15,7 +15,7 @@ public protocol ConnectionPoolObservabilityDelegate: Sendable { /// time and is reported via ````. The func connectSucceeded(id: ConnectionID, streamCapacity: UInt16) - /// The utilization of the connection changed; a stream may have been used, returned or the + /// The utlization of the connection changed; a stream may have been used, returned or the /// maximum number of concurrent streams available on the connection changed. func connectionUtilizationChanged(id: ConnectionID, streamsUsed: UInt16, streamCapacity: UInt16) diff --git a/Sources/ValkeyConnectionPool/ConnectionRequest.swift b/Sources/ValkeyConnectionPool/ConnectionRequest.swift index f5f7fb14..7ba53c0a 100644 --- a/Sources/ValkeyConnectionPool/ConnectionRequest.swift +++ b/Sources/ValkeyConnectionPool/ConnectionRequest.swift @@ -1,22 +1,21 @@ -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ConnectionRequest: ConnectionRequestProtocol { public typealias ID = Int public var id: ID @usableFromInline - private(set) var continuation: CheckedContinuation + private(set) var continuation: CheckedContinuation, any Error> @inlinable init( id: Int, - continuation: CheckedContinuation + continuation: CheckedContinuation, any Error> ) { self.id = id self.continuation = continuation } - public func complete(with result: Result) { + public func complete(with result: Result, ConnectionPoolError>) { self.continuation.resume(with: result) } } @@ -46,7 +45,7 @@ extension ConnectionPool where Request == ConnectionRequest { } @inlinable - public func leaseConnection() async throws -> Connection { + public func leaseConnection() async throws -> ConnectionLease { let requestID = requestIDGenerator.next() let connection = try await withTaskCancellationHandler { @@ -54,7 +53,7 @@ extension ConnectionPool where Request == ConnectionRequest { throw CancellationError() } - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation, Error>) in let request = Request( id: requestID, continuation: continuation @@ -71,8 +70,8 @@ extension ConnectionPool where Request == ConnectionRequest { @inlinable public func withConnection(_ closure: (Connection) async throws -> Result) async throws -> Result { - let connection = try await self.leaseConnection() - defer { self.releaseConnection(connection) } - return try await closure(connection) + let lease = try await self.leaseConnection() + defer { lease.release() } + return try await closure(lease.connection) } } diff --git a/Sources/ValkeyConnectionPool/NIOLock.swift b/Sources/ValkeyConnectionPool/NIOLock.swift index d6b7ea03..03ab1fdd 100644 --- a/Sources/ValkeyConnectionPool/NIOLock.swift +++ b/Sources/ValkeyConnectionPool/NIOLock.swift @@ -12,7 +12,7 @@ // // This source file is part of the SwiftNIO open source project // -// Copyright (c) 2017-2022 the SwiftNIO project authors +// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -145,7 +145,7 @@ final class LockStorage: ManagedBuffer { let buffer = Self.create(minimumCapacity: 1) { _ in value } - // Intentionally using a force cast here to avoid a miss compilation in 5.10. + // Intentionally using a force cast here to avoid a miss compiliation in 5.10. // This is as fast as an unsafeDownCast since ManagedBuffer is inlined and the optimizer // can eliminate the upcast/downcast pair let storage = buffer as! Self diff --git a/Sources/ValkeyConnectionPool/NIOLockedValueBox.swift b/Sources/ValkeyConnectionPool/NIOLockedValueBox.swift index e1a4185a..e8f9be55 100644 --- a/Sources/ValkeyConnectionPool/NIOLockedValueBox.swift +++ b/Sources/ValkeyConnectionPool/NIOLockedValueBox.swift @@ -12,7 +12,7 @@ // // This source file is part of the SwiftNIO open source project // -// Copyright (c) 2022 the SwiftNIO project authors +// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -51,7 +51,7 @@ struct NIOLockedValueBox { /// Provides an unsafe view over the lock and its value. /// - /// This can be beneficial when you require fine-grained control over the lock in some + /// This can be beneficial when you require fine grained control over the lock in some /// situations but don't want lose the benefits of ``withLockedValue(_:)`` in others by /// switching to ``NIOLock``. var unsafe: Unsafe { diff --git a/Sources/ValkeyConnectionPool/PoolStateMachine+ConnectionGroup.swift b/Sources/ValkeyConnectionPool/PoolStateMachine+ConnectionGroup.swift index fef8a3c9..bfd89a89 100644 --- a/Sources/ValkeyConnectionPool/PoolStateMachine+ConnectionGroup.swift +++ b/Sources/ValkeyConnectionPool/PoolStateMachine+ConnectionGroup.swift @@ -517,13 +517,13 @@ extension PoolStateMachine { @inlinable mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> CloseAction? { guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { - // because of a race, this connection (connection close runs against trigger of timeout) + // because of a race this connection (connection close runs against trigger of timeout) // was already removed from the state machine. return nil } if index < self.minimumConcurrentConnections { - // because of a race, a connection might receive an idle timeout after it was moved into + // because of a race a connection might receive a idle timeout after it was moved into // the persisted connections. If a connection is now persisted, we now need to ignore // the trigger return nil diff --git a/Sources/ValkeyConnectionPool/PoolStateMachine.swift b/Sources/ValkeyConnectionPool/PoolStateMachine.swift index 4399b8ee..f1195b2e 100644 --- a/Sources/ValkeyConnectionPool/PoolStateMachine.swift +++ b/Sources/ValkeyConnectionPool/PoolStateMachine.swift @@ -17,7 +17,7 @@ struct PoolConfiguration: Sendable { @usableFromInline var minimumConnectionCount: Int = 0 - /// The maximum number of connections for this pool, to be preserved. + /// The maximum number of connections to for this pool, to be preserved. @usableFromInline var maximumConnectionSoftLimit: Int = 10 @@ -434,6 +434,7 @@ struct PoolStateMachine< fatalError("Unimplemented") } + @usableFromInline mutating func triggerForceShutdown() -> Action { switch self.poolState { case .running: diff --git a/Sources/ValkeyConnectionPool/TinyFastSequence.swift b/Sources/ValkeyConnectionPool/TinyFastSequence.swift index 7b83be9e..7694461d 100644 --- a/Sources/ValkeyConnectionPool/TinyFastSequence.swift +++ b/Sources/ValkeyConnectionPool/TinyFastSequence.swift @@ -29,6 +29,12 @@ struct TinyFastSequence: Sequence { self.base = .none(reserveCapacity: 0) case 1: self.base = .one(collection.first!, reserveCapacity: 0) + case 2: + self.base = .two( + collection.first!, + collection[collection.index(after: collection.startIndex)], + reserveCapacity: 0 + ) default: if let collection = collection as? [Element] { self.base = .n(collection) @@ -46,7 +52,7 @@ struct TinyFastSequence: Sequence { case 1: self.base = .one(max2Sequence.first!, reserveCapacity: 0) case 2: - self.base = .n(Array(max2Sequence)) + self.base = .two(max2Sequence.first!, max2Sequence.second!, reserveCapacity: 0) default: fatalError() }