Skip to content

Commit dc94503

Browse files
authored
Add Test: Lease connection after shutdown has started fails (#441)
1 parent e178163 commit dc94503

File tree

2 files changed

+165
-17
lines changed

2 files changed

+165
-17
lines changed

Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,122 @@ final class ConnectionPoolTests: XCTestCase {
491491
}
492492
}
493493
}
494+
495+
func testLeasingConnectionAfterShutdownIsInvokedFails() async throws {
496+
let clock = MockClock()
497+
let factory = MockConnectionFactory<MockClock>()
498+
let keepAliveDuration = Duration.seconds(30)
499+
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)
500+
501+
var mutableConfig = ConnectionPoolConfiguration()
502+
mutableConfig.minimumConnectionCount = 4
503+
mutableConfig.maximumConnectionSoftLimit = 4
504+
mutableConfig.maximumConnectionHardLimit = 4
505+
mutableConfig.idleTimeout = .seconds(10)
506+
let config = mutableConfig
507+
508+
let pool = ConnectionPool(
509+
configuration: config,
510+
idGenerator: ConnectionIDGenerator(),
511+
requestType: ConnectionRequest<MockConnection>.self,
512+
keepAliveBehavior: keepAlive,
513+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
514+
clock: clock
515+
) {
516+
try await factory.makeConnection(id: $0, for: $1)
517+
}
518+
519+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
520+
taskGroup.addTask {
521+
await pool.run()
522+
}
523+
524+
// create 4 persisted connections
525+
for _ in 0..<4 {
526+
await factory.nextConnectAttempt { connectionID in
527+
return 1
528+
}
529+
}
530+
531+
// shutdown
532+
taskGroup.cancelAll()
533+
534+
do {
535+
_ = try await pool.leaseConnection()
536+
XCTFail("Expected a failure")
537+
} catch {
538+
print("failed")
539+
XCTAssertEqual(error as? ConnectionPoolError, .poolShutdown)
540+
}
541+
542+
print("will close connections: \(factory.runningConnections)")
543+
for connection in factory.runningConnections {
544+
try await connection.signalToClose
545+
connection.closeIfClosing()
546+
}
547+
}
548+
}
549+
550+
func testLeasingConnectionsAfterShutdownIsInvokedFails() async throws {
551+
let clock = MockClock()
552+
let factory = MockConnectionFactory<MockClock>()
553+
let keepAliveDuration = Duration.seconds(30)
554+
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)
555+
556+
var mutableConfig = ConnectionPoolConfiguration()
557+
mutableConfig.minimumConnectionCount = 4
558+
mutableConfig.maximumConnectionSoftLimit = 4
559+
mutableConfig.maximumConnectionHardLimit = 4
560+
mutableConfig.idleTimeout = .seconds(10)
561+
let config = mutableConfig
562+
563+
let pool = ConnectionPool(
564+
configuration: config,
565+
idGenerator: ConnectionIDGenerator(),
566+
requestType: ConnectionFuture.self,
567+
keepAliveBehavior: keepAlive,
568+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
569+
clock: clock
570+
) {
571+
try await factory.makeConnection(id: $0, for: $1)
572+
}
573+
574+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
575+
taskGroup.addTask {
576+
await pool.run()
577+
}
578+
579+
// create 4 persisted connections
580+
for _ in 0..<4 {
581+
await factory.nextConnectAttempt { connectionID in
582+
return 1
583+
}
584+
}
585+
586+
// shutdown
587+
taskGroup.cancelAll()
588+
589+
// create 4 connection requests
590+
let requests = (0..<4).map { ConnectionFuture(id: $0) }
591+
592+
// lease 4 connections at once
593+
pool.leaseConnections(requests)
594+
595+
for request in requests {
596+
do {
597+
_ = try await request.future.success
598+
XCTFail("Expected a failure")
599+
} catch {
600+
XCTAssertEqual(error as? ConnectionPoolError, .poolShutdown)
601+
}
602+
}
603+
604+
for connection in factory.runningConnections {
605+
try await connection.signalToClose
606+
connection.closeIfClosing()
607+
}
608+
}
609+
}
494610
}
495611

496612
struct ConnectionFuture: ConnectionRequestProtocol {

Tests/ConnectionPoolModuleTests/Mocks/MockConnection.swift

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,59 @@ import DequeModule
22
@testable import _ConnectionPoolModule
33

44
// Sendability enforced through the lock
5-
final class MockConnection: PooledConnection, @unchecked Sendable {
5+
final class MockConnection: PooledConnection, Sendable {
66
typealias ID = Int
77

88
let id: ID
99

1010
private enum State {
11-
case running([@Sendable ((any Error)?) -> ()])
11+
case running([CheckedContinuation<Void, any Error>], [@Sendable ((any Error)?) -> ()])
1212
case closing([@Sendable ((any Error)?) -> ()])
1313
case closed
1414
}
1515

16-
private let lock = NIOLock()
17-
private var _state = State.running([])
16+
private let lock: NIOLockedValueBox<State> = NIOLockedValueBox(.running([], []))
1817

1918
init(id: Int) {
2019
self.id = id
2120
}
2221

22+
var signalToClose: Void {
23+
get async throws {
24+
try await withCheckedThrowingContinuation { continuation in
25+
let runRightAway = self.lock.withLockedValue { state -> Bool in
26+
switch state {
27+
case .running(var continuations, let callbacks):
28+
continuations.append(continuation)
29+
state = .running(continuations, callbacks)
30+
return false
31+
32+
case .closing, .closed:
33+
return true
34+
}
35+
}
36+
37+
if runRightAway {
38+
continuation.resume()
39+
}
40+
}
41+
}
42+
}
43+
2344
func onClose(_ closure: @escaping @Sendable ((any Error)?) -> ()) {
24-
let enqueued = self.lock.withLock { () -> Bool in
25-
switch self._state {
45+
let enqueued = self.lock.withLockedValue { state -> Bool in
46+
switch state {
2647
case .closed:
2748
return false
2849

29-
case .running(var callbacks):
50+
case .running(let continuations, var callbacks):
3051
callbacks.append(closure)
31-
self._state = .running(callbacks)
52+
state = .running(continuations, callbacks)
3253
return true
3354

3455
case .closing(var callbacks):
3556
callbacks.append(closure)
36-
self._state = .closing(callbacks)
57+
state = .closing(callbacks)
3758
return true
3859
}
3960
}
@@ -44,25 +65,30 @@ final class MockConnection: PooledConnection, @unchecked Sendable {
4465
}
4566

4667
func close() {
47-
self.lock.withLock {
48-
switch self._state {
49-
case .running(let callbacks):
50-
self._state = .closing(callbacks)
68+
let continuations = self.lock.withLockedValue { state -> [CheckedContinuation<Void, any Error>] in
69+
switch state {
70+
case .running(let continuations, let callbacks):
71+
state = .closing(callbacks)
72+
return continuations
5173

5274
case .closing, .closed:
53-
break
75+
return []
5476
}
5577
}
78+
79+
for continuation in continuations {
80+
continuation.resume()
81+
}
5682
}
5783

5884
func closeIfClosing() {
59-
let callbacks = self.lock.withLock { () -> [@Sendable ((any Error)?) -> ()] in
60-
switch self._state {
85+
let callbacks = self.lock.withLockedValue { state -> [@Sendable ((any Error)?) -> ()] in
86+
switch state {
6187
case .running, .closed:
6288
return []
6389

6490
case .closing(let callbacks):
65-
self._state = .closed
91+
state = .closed
6692
return callbacks
6793
}
6894
}
@@ -73,3 +99,9 @@ final class MockConnection: PooledConnection, @unchecked Sendable {
7399
}
74100
}
75101

102+
extension MockConnection: CustomStringConvertible {
103+
var description: String {
104+
let state = self.lock.withLockedValue { $0 }
105+
return "MockConnection(id: \(self.id), state: \(state))"
106+
}
107+
}

0 commit comments

Comments
 (0)