Skip to content

Commit 468ae25

Browse files
authored
Land ConnectionPool (#428)
1 parent 472ff4a commit 468ae25

12 files changed

+1135
-26
lines changed

Sources/ConnectionPoolModule/ConnectionPool.swift

Lines changed: 483 additions & 1 deletion
Large diffs are not rendered by default.

Sources/ConnectionPoolModule/ConnectionRequest.swift

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,56 @@ public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequest
2020
self.continuation.resume(with: result)
2121
}
2222
}
23+
24+
fileprivate let requestIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator()
25+
26+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
27+
extension ConnectionPool where Request == ConnectionRequest<Connection> {
28+
public convenience init(
29+
configuration: ConnectionPoolConfiguration,
30+
idGenerator: ConnectionIDGenerator = _ConnectionPoolModule.ConnectionIDGenerator(),
31+
keepAliveBehavior: KeepAliveBehavior,
32+
observabilityDelegate: ObservabilityDelegate,
33+
clock: Clock = ContinuousClock(),
34+
connectionFactory: @escaping ConnectionFactory
35+
) {
36+
self.init(
37+
configuration: configuration,
38+
idGenerator: idGenerator,
39+
requestType: ConnectionRequest<Connection>.self,
40+
keepAliveBehavior: keepAliveBehavior,
41+
observabilityDelegate: observabilityDelegate,
42+
clock: clock,
43+
connectionFactory: connectionFactory
44+
)
45+
}
46+
47+
public func leaseConnection() async throws -> Connection {
48+
let requestID = requestIDGenerator.next()
49+
50+
let connection = try await withTaskCancellationHandler {
51+
if Task.isCancelled {
52+
throw CancellationError()
53+
}
54+
55+
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Connection, Error>) in
56+
let request = Request(
57+
id: requestID,
58+
continuation: continuation
59+
)
60+
61+
self.leaseConnection(request)
62+
}
63+
} onCancel: {
64+
self.cancelLeaseConnection(requestID)
65+
}
66+
67+
return connection
68+
}
69+
70+
public func withConnection<Result>(_ closure: (Connection) async throws -> Result) async throws -> Result {
71+
let connection = try await self.leaseConnection()
72+
defer { self.releaseConnection(connection) }
73+
return try await closure(connection)
74+
}
75+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Implementation vendored from SwiftNIO:
2+
// https://github.com/apple/swift-nio
3+
4+
//===----------------------------------------------------------------------===//
5+
//
6+
// This source file is part of the SwiftNIO open source project
7+
//
8+
// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors
9+
// Licensed under Apache License v2.0
10+
//
11+
// See LICENSE.txt for license information
12+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
13+
//
14+
// SPDX-License-Identifier: Apache-2.0
15+
//
16+
//===----------------------------------------------------------------------===//
17+
18+
/// Provides locked access to `Value`.
19+
///
20+
/// - note: ``NIOLockedValueBox`` has reference semantics and holds the `Value`
21+
/// alongside a lock behind a reference.
22+
///
23+
/// This is no different than creating a ``Lock`` and protecting all
24+
/// accesses to a value using the lock. But it's easy to forget to actually
25+
/// acquire/release the lock in the correct place. ``NIOLockedValueBox`` makes
26+
/// that much easier.
27+
@usableFromInline
28+
struct NIOLockedValueBox<Value> {
29+
30+
@usableFromInline
31+
internal let _storage: LockStorage<Value>
32+
33+
/// Initialize the `Value`.
34+
@inlinable
35+
init(_ value: Value) {
36+
self._storage = .create(value: value)
37+
}
38+
39+
/// Access the `Value`, allowing mutation of it.
40+
@inlinable
41+
func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
42+
return try self._storage.withLockedValue(mutate)
43+
}
44+
}
45+
46+
extension NIOLockedValueBox: Sendable where Value: Sendable {}

Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,9 +342,9 @@ extension PoolStateMachine {
342342
/// Call ``leaseConnection(at:)`` or ``closeConnection(at:)`` with the supplied index after
343343
/// this. If you want to park the connection no further call is required.
344344
@inlinable
345-
mutating func releaseConnection(_ connectionID: Connection.ID, streams: UInt16) -> (Int, AvailableConnectionContext) {
345+
mutating func releaseConnection(_ connectionID: Connection.ID, streams: UInt16) -> (Int, AvailableConnectionContext)? {
346346
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
347-
preconditionFailure("A connection that we don't know was released? Something is very wrong...")
347+
return nil
348348
}
349349

350350
let connectionInfo = self.connections[index].release(streams: streams)
@@ -657,3 +657,6 @@ extension PoolStateMachine {
657657

658658
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
659659
extension PoolStateMachine.ConnectionGroup.BackoffDoneAction: Equatable where TimerCancellationToken: Equatable {}
660+
661+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
662+
extension PoolStateMachine.ConnectionGroup.ClosedAction: Equatable where TimerCancellationToken: Equatable {}

Sources/ConnectionPoolModule/PoolStateMachine.swift

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,9 @@ struct PoolStateMachine<
234234

235235
@inlinable
236236
mutating func releaseConnection(_ connection: Connection, streams: UInt16) -> Action {
237-
let (index, context) = self.connections.releaseConnection(connection.id, streams: streams)
237+
guard let (index, context) = self.connections.releaseConnection(connection.id, streams: streams) else {
238+
return .none()
239+
}
238240
return self.handleAvailableConnection(index: index, availableContext: context)
239241
}
240242

@@ -251,8 +253,13 @@ struct PoolStateMachine<
251253

252254
@inlinable
253255
mutating func connectionEstablished(_ connection: Connection, maxStreams: UInt16) -> Action {
254-
let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams)
255-
return self.handleAvailableConnection(index: index, availableContext: context)
256+
switch self.poolState {
257+
case .running, .shuttingDown(graceful: true):
258+
let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams)
259+
return self.handleAvailableConnection(index: index, availableContext: context)
260+
case .shuttingDown(graceful: false), .shutDown:
261+
return .init(request: .none, connection: .closeConnection(connection, []))
262+
}
256263
}
257264

258265
@inlinable
@@ -274,31 +281,43 @@ struct PoolStateMachine<
274281

275282
@inlinable
276283
mutating func connectionEstablishFailed(_ error: Error, for request: ConnectionRequest) -> Action {
277-
self.failedConsecutiveConnectionAttempts += 1
284+
switch self.poolState {
285+
case .running, .shuttingDown(graceful: true):
286+
self.failedConsecutiveConnectionAttempts += 1
278287

279-
let connectionTimer = self.connections.backoffNextConnectionAttempt(request.connectionID)
280-
let backoff = Self.calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts)
281-
let timer = Timer(connectionTimer, duration: backoff)
282-
return .init(request: .none, connection: .scheduleTimers(.init(timer)))
288+
let connectionTimer = self.connections.backoffNextConnectionAttempt(request.connectionID)
289+
let backoff = Self.calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts)
290+
let timer = Timer(connectionTimer, duration: backoff)
291+
return .init(request: .none, connection: .scheduleTimers(.init(timer)))
292+
293+
case .shuttingDown(graceful: false), .shutDown:
294+
return .none()
295+
}
283296
}
284297

285298
@inlinable
286299
mutating func connectionCreationBackoffDone(_ connectionID: ConnectionID) -> Action {
287-
let soonAvailable = self.connections.soonAvailableConnections
288-
let retry = (soonAvailable - 1) < self.requestQueue.count
289-
290-
switch self.connections.backoffDone(connectionID, retry: retry) {
291-
case .createConnection(let request, let continuation):
292-
let timers: TinyFastSequence<TimerCancellationToken>
293-
if let continuation {
294-
timers = .init(element: continuation)
295-
} else {
296-
timers = .init()
300+
switch self.poolState {
301+
case .running, .shuttingDown(graceful: true):
302+
let soonAvailable = self.connections.soonAvailableConnections
303+
let retry = (soonAvailable - 1) < self.requestQueue.count
304+
305+
switch self.connections.backoffDone(connectionID, retry: retry) {
306+
case .createConnection(let request, let continuation):
307+
let timers: TinyFastSequence<TimerCancellationToken>
308+
if let continuation {
309+
timers = .init(element: continuation)
310+
} else {
311+
timers = .init()
312+
}
313+
return .init(request: .none, connection: .makeConnection(request, timers))
314+
315+
case .cancelTimers(let timers):
316+
return .init(request: .none, connection: .cancelTimers(.init(timers)))
297317
}
298-
return .init(request: .none, connection: .makeConnection(request, timers))
299318

300-
case .cancelTimers(let timers):
301-
return .init(request: .none, connection: .cancelTimers(.init(timers)))
319+
case .shuttingDown(graceful: false), .shutDown:
320+
return .none()
302321
}
303322
}
304323

0 commit comments

Comments
 (0)