Skip to content

Commit b863152

Browse files
authored
SWIFT-749 Rename MongoClient.shutdown to MongoClient.close and improve shutdown logic (#446)
1 parent 973acb2 commit b863152

File tree

8 files changed

+232
-95
lines changed

8 files changed

+232
-95
lines changed

Sources/MongoSwift/APM.swift

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,8 @@ private func publishEvent<T: MongoSwiftEvent>(type: T.Type, eventPtr: OpaquePoin
701701

702702
/// An extension of `ConnectionPool` to add monitoring capability for commands and server discovery and monitoring.
703703
extension ConnectionPool {
704-
/// Internal function to install monitoring callbacks for this pool.
704+
/// Internal function to install monitoring callbacks for this pool. **This method may only be called before any
705+
/// connections are checked out from the pool.**
705706
internal func initializeMonitoring(client: MongoClient) {
706707
guard let callbacks = mongoc_apm_callbacks_new() else {
707708
fatalError("failed to initialize new mongoc_apm_callbacks_t")
@@ -722,13 +723,6 @@ extension ConnectionPool {
722723
mongoc_apm_set_server_heartbeat_succeeded_cb(callbacks, serverHeartbeatSucceeded)
723724
mongoc_apm_set_server_heartbeat_failed_cb(callbacks, serverHeartbeatFailed)
724725

725-
// we can pass the MongoClient as unretained because the callbacks are stored on clientHandle, so if the
726-
// callback is being executed, this pool and therefore its parent `MongoClient` must still be valid.
727-
switch self.state {
728-
case let .open(pool):
729-
mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque())
730-
case .closed:
731-
fatalError("ConnectionPool was already closed")
732-
}
726+
self.setAPMCallbacks(callbacks: callbacks, client: client)
733727
}
734728
}

Sources/MongoSwift/ConnectionPool.swift

Lines changed: 129 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import CLibMongoC
2+
import Foundation
3+
import NIOConcurrencyHelpers
24

35
/// A connection to the database.
46
internal class Connection {
@@ -13,27 +15,60 @@ internal class Connection {
1315
}
1416

1517
deinit {
16-
switch self.pool.state {
17-
case let .open(pool):
18-
mongoc_client_pool_push(pool, self.clientHandle)
19-
case .closed:
20-
assertionFailure("ConnectionPool was already closed")
18+
do {
19+
try self.pool.checkIn(self)
20+
} catch {
21+
assertionFailure("Failed to check connection back in: \(error)")
2122
}
2223
}
2324
}
2425

26+
extension NSCondition {
27+
fileprivate func withLock<T>(_ body: () throws -> T) rethrows -> T {
28+
self.lock()
29+
defer { self.unlock() }
30+
return try body()
31+
}
32+
}
33+
2534
/// A pool of one or more connections.
2635
internal class ConnectionPool {
2736
/// Represents the state of a `ConnectionPool`.
2837
internal enum State {
2938
/// Indicates that the `ConnectionPool` is open and using the associated pointer to a `mongoc_client_pool_t`.
3039
case open(pool: OpaquePointer)
40+
/// Indicates that the `ConnectionPool` is in the process of closing. Connections can be checked back in, but
41+
/// no new connections can be checked out.
42+
case closing(pool: OpaquePointer)
3143
/// Indicates that the `ConnectionPool` has been closed and contains no connections.
3244
case closed
3345
}
3446

3547
/// The state of this `ConnectionPool`.
36-
internal private(set) var state: State
48+
private var state: State
49+
/// The number of connections currently checked out of the pool.
50+
private var connCount = 0
51+
/// Lock over `state` and `connCount`.
52+
private let stateLock = NSCondition()
53+
54+
/// Internal helper for testing purposes that returns whether the pool is in the `closing` state.
55+
internal var isClosing: Bool {
56+
self.stateLock.withLock {
57+
guard case .closing = self.state else {
58+
return false
59+
}
60+
return true
61+
}
62+
}
63+
64+
/// Internal helper for testing purposes that returns the number of connections currently checked out from the pool.
65+
internal var checkedOutConnections: Int {
66+
self.stateLock.withLock {
67+
self.connCount
68+
}
69+
}
70+
71+
internal static let PoolClosedError = LogicError(message: "ConnectionPool was already closed")
3772

3873
/// Initializes the pool using the provided `ConnectionString` and options.
3974
internal init(from connString: ConnectionString, options: ClientOptions?) throws {
@@ -56,7 +91,7 @@ internal class ConnectionPool {
5691

5792
self.state = .open(pool: pool)
5893
if let options = options {
59-
try self.setTLSOptions(options)
94+
self.setTLSOptions(options)
6095
}
6196
}
6297

@@ -67,38 +102,79 @@ internal class ConnectionPool {
67102
}
68103
}
69104

70-
/// Closes the pool, cleaning up underlying resources. This method blocks as it sends `endSessions` to the server.
71-
internal func shutdown() {
72-
switch self.state {
73-
case let .open(pool):
74-
mongoc_client_pool_destroy(pool)
75-
case .closed:
76-
return
105+
/// Closes the pool, cleaning up underlying resources. **This method blocks until all connections are returned to
106+
/// the pool.**
107+
internal func close() throws {
108+
try self.stateLock.withLock {
109+
switch self.state {
110+
case let .open(pool):
111+
self.state = .closing(pool: pool)
112+
case .closing, .closed:
113+
throw Self.PoolClosedError
114+
}
115+
116+
while self.connCount > 0 {
117+
// wait for signal from checkIn().
118+
self.stateLock.wait()
119+
}
120+
121+
switch self.state {
122+
case .open, .closed:
123+
throw InternalError(message: "ConnectionPool in unexpected state \(self.state) during close()")
124+
case let .closing(pool):
125+
mongoc_client_pool_destroy(pool)
126+
self.state = .closed
127+
}
77128
}
78-
self.state = .closed
79129
}
80130

81131
/// Checks out a connection. This connection will return itself to the pool when its reference count drops to 0.
82-
/// This method will block until a connection is available.
132+
/// This method will block until a connection is available. Throws an error if the pool is in the process of
133+
/// closing or has finished closing.
83134
internal func checkOut() throws -> Connection {
84-
switch self.state {
85-
case let .open(pool):
86-
return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self)
87-
case .closed:
88-
throw InternalError(message: "ConnectionPool was already closed")
135+
try self.stateLock.withLock {
136+
switch self.state {
137+
case let .open(pool):
138+
self.connCount += 1
139+
return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self)
140+
case .closing, .closed:
141+
throw Self.PoolClosedError
142+
}
89143
}
90144
}
91145

92-
/// Checks out a connection from the pool, or returns `nil` if none are currently available.
146+
/// Checks out a connection from the pool, or returns `nil` if none are currently available. Throws an error if the
147+
/// pool is not open. This method may block waiting on the state lock as well as libmongoc's locks and thus must be
148+
// run within the thread pool.
93149
internal func tryCheckOut() throws -> Connection? {
94-
switch self.state {
95-
case let .open(pool):
96-
guard let handle = mongoc_client_pool_try_pop(pool) else {
97-
return nil
150+
try self.stateLock.withLock {
151+
switch self.state {
152+
case let .open(pool):
153+
guard let handle = mongoc_client_pool_try_pop(pool) else {
154+
return nil
155+
}
156+
self.connCount += 1
157+
return Connection(clientHandle: handle, pool: self)
158+
case .closing, .closed:
159+
throw Self.PoolClosedError
160+
}
161+
}
162+
}
163+
164+
/// Checks a connection into the pool. Accepts the connection if the pool is still open or in the process of
165+
/// closing; throws an error if the pool has already finished closing. This method may block waiting on the state
166+
/// lock as well as libmongoc's locks, and thus must be run within the thread pool.
167+
fileprivate func checkIn(_ connection: Connection) throws {
168+
try self.stateLock.withLock {
169+
switch self.state {
170+
case let .open(pool), let .closing(pool):
171+
mongoc_client_pool_push(pool, connection.clientHandle)
172+
self.connCount -= 1
173+
// signal to close() that we are updating the count.
174+
self.stateLock.signal()
175+
case .closed:
176+
throw Self.PoolClosedError
98177
}
99-
return Connection(clientHandle: handle, pool: self)
100-
case .closed:
101-
throw InternalError(message: "ConnectionPool was already closed")
102178
}
103179
}
104180

@@ -109,9 +185,9 @@ internal class ConnectionPool {
109185
return try body(connection)
110186
}
111187

112-
// Sets TLS/SSL options that the user passes in through the client level. This must be called from
113-
// the ConnectionPool init before the pool is used.
114-
private func setTLSOptions(_ options: ClientOptions) throws {
188+
// Sets TLS/SSL options that the user passes in through the client level. **This must only be called from
189+
// the ConnectionPool initializer**.
190+
private func setTLSOptions(_ options: ClientOptions) {
115191
// return early so we don't set an empty options struct on the libmongoc pool. doing so will make libmongoc
116192
// attempt to use TLS for connections.
117193
guard options.tls == true ||
@@ -147,11 +223,14 @@ internal class ConnectionPool {
147223
if let invalidHosts = options.tlsAllowInvalidHostnames {
148224
opts.allow_invalid_hostname = invalidHosts
149225
}
226+
227+
// lock isn't needed as this is called before pool is in use.
150228
switch self.state {
151229
case let .open(pool):
152230
mongoc_client_pool_set_ssl_opts(pool, &opts)
153-
case .closed:
154-
throw InternalError(message: "ConnectionPool was already closed")
231+
case .closing, .closed:
232+
// if we get here, we must have called this method outside of `ConnectionPool.init`.
233+
fatalError("ConnectionPool in unexpected state \(self.state) while setting TLS options")
155234
}
156235
}
157236

@@ -187,6 +266,22 @@ internal class ConnectionPool {
187266
return ConnectionString(copying: uri)
188267
}
189268
}
269+
270+
/// Sets the provided APM callbacks on this pool, using the provided client as the "context" value. **This method
271+
/// may only be called before any connections are checked out of the pool.** Ideally this code would just live in
272+
/// `ConnectionPool.init`. However, the client we accept here has to be fully initialized before we can pass it
273+
/// as the context. In order for it to be fully initialized its pool must exist already.
274+
internal func setAPMCallbacks(callbacks: OpaquePointer, client: MongoClient) {
275+
// lock isn't needed as this is called before pool is in use.
276+
switch self.state {
277+
case let .open(pool):
278+
mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque())
279+
case .closing, .closed:
280+
// this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`.
281+
// unless we have a bug it's impossible that the pool is already closed.
282+
fatalError("ConnectionPool in unexpected state \(self.state) while setting APM callbacks")
283+
}
284+
}
190285
}
191286

192287
extension String {

Sources/MongoSwift/MongoClient.swift

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,10 @@ public struct DatabaseOptions: CodingStrategyProvider {
157157
// sourcery: skipSyncExport
158158
/// A MongoDB Client providing an asynchronous, SwiftNIO-based API.
159159
public class MongoClient {
160+
/// The pool of connections backing this client.
160161
internal let connectionPool: ConnectionPool
161162

163+
/// Executor responsible for executing operations on behalf of this client and its child objects.
162164
internal let operationExecutor: OperationExecutor
163165

164166
/// Default size for a client's NIOThreadPool.
@@ -167,8 +169,10 @@ public class MongoClient {
167169
/// Default maximum size for connection pools created by this client.
168170
internal static let defaultMaxConnectionPoolSize = 100
169171

170-
/// Indicates whether this client has been closed.
171-
internal private(set) var isClosed = false
172+
/// Indicates whether this client has been closed. A lock around this variable is not needed because:
173+
/// - This value is only modified on success of `ConnectionPool.close()`. That method will succeed exactly once.
174+
/// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete.
175+
private var wasClosed = false
172176

173177
/// Handlers for command monitoring events.
174178
internal var commandEventHandlers: [CommandEventHandler]
@@ -182,9 +186,6 @@ public class MongoClient {
182186
/// A unique identifier for this client. Sets _id to the generator's current value and increments the generator.
183187
internal let _id = clientIdGenerator.add(1)
184188

185-
/// Error thrown when user attempts to use a closed client.
186-
internal static let ClosedClientError = LogicError(message: "MongoClient was already closed")
187-
188189
/// Encoder whose options are inherited by databases derived from this client.
189190
public let encoder: BSONEncoder
190191

@@ -255,37 +256,53 @@ public class MongoClient {
255256
}
256257

257258
deinit {
258-
assert(self.isClosed, "MongoClient was not closed before deinitialization")
259+
assert(
260+
self.wasClosed,
261+
"MongoClient was not closed before deinitialization. " +
262+
"Please call `close()` or `syncClose()` when the client is no longer needed."
263+
)
259264
}
260265

261-
/// Shuts this `MongoClient` down, closing all connection to the server and cleaning up internal state.
262-
/// Call this method exactly once when you are finished using the client. You must ensure that all operations
263-
/// using the client have completed before calling this. The returned future must be fulfilled before the
264-
/// `EventLoopGroup` provided to this client's constructor is shut down.
265-
public func shutdown() -> EventLoopFuture<Void> {
266-
self.operationExecutor.execute {
267-
self.connectionPool.shutdown()
268-
self.isClosed = true
266+
/**
267+
* Closes this `MongoClient`, closing all connections to the server and cleaning up internal state.
268+
*
269+
* Call this method exactly once when you are finished using the client. You must ensure that all operations using
270+
* the client have completed before calling this.
271+
*
272+
* The returned future will not be fulfilled until all cursors and change streams created from this client have been
273+
* been killed, and all sessions created from this client have been ended.
274+
*
275+
* The returned future must be fulfilled before the `EventLoopGroup` provided to this client's constructor is shut
276+
* down.
277+
*/
278+
public func close() -> EventLoopFuture<Void> {
279+
let closeResult = self.operationExecutor.execute {
280+
try self.connectionPool.close()
269281
}
270282
.flatMap {
271-
self.operationExecutor.close()
283+
self.operationExecutor.shutdown()
284+
}
285+
closeResult.whenComplete { _ in
286+
self.wasClosed = true
272287
}
288+
289+
return closeResult
273290
}
274291

275292
/**
276293
* Shuts this `MongoClient` down in a blocking fashion, closing all connections to the server and cleaning up
277294
* internal state.
278295
*
279296
* Call this method exactly once when you are finished using the client. You must ensure that all operations
280-
* using the client have completed before calling this.
297+
* using the client have completed before calling this. This method will block until all cursors and change streams
298+
* created from this client have been killed, and all sessions created from this client have been ended.
281299
*
282300
* This method must complete before the `EventLoopGroup` provided to this client's constructor is shut down.
283301
*/
284-
public func syncShutdown() {
285-
self.connectionPool.shutdown()
286-
self.isClosed = true
287-
// TODO: SWIFT-349 log any errors encountered here.
288-
try? self.operationExecutor.syncClose()
302+
public func syncClose() throws {
303+
try self.connectionPool.close()
304+
try self.operationExecutor.syncShutdown()
305+
self.wasClosed = true
289306
}
290307

291308
/// Starts a new `ClientSession` with the provided options. When you are done using this session, you must call
@@ -602,15 +619,6 @@ public class MongoClient {
602619
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
603620
}
604621

605-
/// Executes an `Operation` using this `MongoClient` and an optionally provided session.
606-
internal func executeOperation<T: Operation>(
607-
_ operation: T,
608-
using connection: Connection? = nil,
609-
session: ClientSession? = nil
610-
) throws -> T.OperationResult {
611-
try self.operationExecutor.execute(operation, using: connection, client: self, session: session).wait()
612-
}
613-
614622
/// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block
615623
/// and is for testing purposes only**.
616624
internal func getMongocReadConcern() throws -> ReadConcern? {

0 commit comments

Comments
 (0)