Skip to content

Commit e178163

Browse files
authored
Add test to lease multiple connections at once (#440)
- Add test to lease multiple connections at once - Rename `Waiter` to `Future` - Rename `Waiter.Result` to `Future.Success`
1 parent d5d16e3 commit e178163

File tree

3 files changed

+99
-14
lines changed

3 files changed

+99
-14
lines changed

Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ final class ConnectionPoolTests: XCTestCase {
401401
_ = try await pool.leaseConnection()
402402
}
403403

404-
let connectionAttemptWaiter = Waiter(of: Void.self)
404+
let connectionAttemptWaiter = Future(of: Void.self)
405405

406406
taskGroup.addTask {
407407
try await factory.nextConnectAttempt { connectionID in
@@ -410,7 +410,7 @@ final class ConnectionPoolTests: XCTestCase {
410410
}
411411
}
412412

413-
try await connectionAttemptWaiter.result
413+
try await connectionAttemptWaiter.success
414414
leaseTask.cancel()
415415

416416
let taskResult = await leaseTask.result
@@ -427,5 +427,87 @@ final class ConnectionPoolTests: XCTestCase {
427427
}
428428
}
429429
}
430+
431+
func testLeasingMultipleConnectionsAtOnceWorks() async throws {
432+
let clock = MockClock()
433+
let factory = MockConnectionFactory<MockClock>()
434+
let keepAliveDuration = Duration.seconds(30)
435+
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)
436+
437+
var mutableConfig = ConnectionPoolConfiguration()
438+
mutableConfig.minimumConnectionCount = 4
439+
mutableConfig.maximumConnectionSoftLimit = 4
440+
mutableConfig.maximumConnectionHardLimit = 4
441+
mutableConfig.idleTimeout = .seconds(10)
442+
let config = mutableConfig
443+
444+
let pool = ConnectionPool(
445+
configuration: config,
446+
idGenerator: ConnectionIDGenerator(),
447+
requestType: ConnectionFuture.self,
448+
keepAliveBehavior: keepAlive,
449+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
450+
clock: clock
451+
) {
452+
try await factory.makeConnection(id: $0, for: $1)
453+
}
454+
455+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
456+
taskGroup.addTask {
457+
await pool.run()
458+
}
459+
460+
// create 4 persisted connections
461+
for _ in 0..<4 {
462+
await factory.nextConnectAttempt { connectionID in
463+
return 1
464+
}
465+
}
466+
467+
// create 4 connection requests
468+
let requests = (0..<4).map { ConnectionFuture(id: $0) }
469+
470+
// lease 4 connections at once
471+
pool.leaseConnections(requests)
472+
var connections = [MockConnection]()
473+
474+
for request in requests {
475+
let connection = try await request.future.success
476+
connections.append(connection)
477+
}
478+
479+
// Ensure that we got 4 distinct connections
480+
XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 4)
481+
482+
// release all 4 leased connections
483+
for connection in connections {
484+
pool.releaseConnection(connection)
485+
}
486+
487+
// shutdown
488+
taskGroup.cancelAll()
489+
for connection in factory.runningConnections {
490+
connection.closeIfClosing()
491+
}
492+
}
493+
}
430494
}
431495

496+
struct ConnectionFuture: ConnectionRequestProtocol {
497+
let id: Int
498+
let future: Future<MockConnection>
499+
500+
init(id: Int) {
501+
self.id = id
502+
self.future = Future(of: MockConnection.self)
503+
}
504+
505+
func complete(with result: Result<MockConnection, ConnectionPoolError>) {
506+
switch result {
507+
case .success(let success):
508+
self.future.yield(value: success)
509+
case .failure(let failure):
510+
self.future.yield(error: failure)
511+
}
512+
}
513+
}

Tests/ConnectionPoolModuleTests/Mocks/MockConnectionFactory.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ final class MockConnectionFactory<Clock: _Concurrency.Clock> where Clock.Duratio
3030

3131
func makeConnection(
3232
id: Int,
33-
for pool: ConnectionPool<MockConnection, Int, ConnectionIDGenerator, ConnectionRequest<MockConnection>, Int, MockPingPongBehavior<MockConnection>, NoOpConnectionPoolMetrics<Int>, Clock>
33+
for pool: ConnectionPool<MockConnection, Int, ConnectionIDGenerator, some ConnectionRequestProtocol, Int, MockPingPongBehavior<MockConnection>, NoOpConnectionPoolMetrics<Int>, Clock>
3434
) async throws -> ConnectionAndMetadata<MockConnection> {
3535
// we currently don't support cancellation when creating a connection
3636
let result = try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<(MockConnection, UInt16), any Error>) in

Tests/ConnectionPoolModuleTests/Utils/Waiter.swift renamed to Tests/ConnectionPoolModuleTests/Utils/Future.swift

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,34 @@
11
import Atomics
22
@testable import _ConnectionPoolModule
33

4-
final class Waiter<Result: Sendable>: Sendable {
4+
/// This is a `Future` type that shall make writing tests a bit simpler. I'm well aware, that this is a pattern
5+
/// that should not be embraced with structured concurrency. However writing all tests in full structured
6+
/// concurrency is an effort, that isn't worth the endgoals in my view.
7+
final class Future<Success: Sendable>: Sendable {
58
struct State: Sendable {
69

7-
var result: Swift.Result<Result, any Error>? = nil
8-
var continuations: [(Int, CheckedContinuation<Result, any Error>)] = []
10+
var result: Swift.Result<Success, any Error>? = nil
11+
var continuations: [(Int, CheckedContinuation<Success, any Error>)] = []
912

1013
}
1114

1215
let waiterID = ManagedAtomic(0)
1316
let stateBox: NIOLockedValueBox<State> = NIOLockedValueBox(State())
1417

15-
init(of: Result.Type) {}
18+
init(of: Success.Type) {}
1619

1720
enum GetAction {
1821
case fail(any Error)
19-
case succeed(Result)
22+
case succeed(Success)
2023
case none
2124
}
2225

23-
var result: Result {
26+
var success: Success {
2427
get async throws {
2528
let waiterID = self.waiterID.loadThenWrappingIncrement(ordering: .relaxed)
2629

2730
return try await withTaskCancellationHandler {
28-
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Result, any Error>) in
31+
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Success, any Error>) in
2932
let action = self.stateBox.withLockedValue { state -> GetAction in
3033
if Task.isCancelled {
3134
return .fail(CancellationError())
@@ -56,7 +59,7 @@ final class Waiter<Result: Sendable>: Sendable {
5659
}
5760
}
5861
} onCancel: {
59-
let cont = self.stateBox.withLockedValue { state -> CheckedContinuation<Result, any Error>? in
62+
let cont = self.stateBox.withLockedValue { state -> CheckedContinuation<Success, any Error>? in
6063
guard state.result == nil else { return nil }
6164

6265
guard let contIndex = state.continuations.firstIndex(where: { $0.0 == waiterID }) else {
@@ -71,10 +74,10 @@ final class Waiter<Result: Sendable>: Sendable {
7174
}
7275
}
7376

74-
func yield(value: Result) {
77+
func yield(value: Success) {
7578
let continuations = self.stateBox.withLockedValue { state in
7679
guard state.result == nil else {
77-
return [(Int, CheckedContinuation<Result, any Error>)]().lazy.map(\.1)
80+
return [(Int, CheckedContinuation<Success, any Error>)]().lazy.map(\.1)
7881
}
7982
state.result = .success(value)
8083

@@ -92,7 +95,7 @@ final class Waiter<Result: Sendable>: Sendable {
9295
func yield(error: any Error) {
9396
let continuations = self.stateBox.withLockedValue { state in
9497
guard state.result == nil else {
95-
return [(Int, CheckedContinuation<Result, any Error>)]().lazy.map(\.1)
98+
return [(Int, CheckedContinuation<Success, any Error>)]().lazy.map(\.1)
9699
}
97100
state.result = .failure(error)
98101

0 commit comments

Comments
 (0)