Skip to content

Commit 830c3e7

Browse files
authored
Merge branch 'main' into mmbm-row-seq-expose-metadata
2 parents e70add9 + 5d817be commit 830c3e7

16 files changed

+462
-103
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ let package = Package(
2020
dependencies: [
2121
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"),
2222
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.4"),
23-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.59.0"),
23+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
2424
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.19.0"),
2525
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.25.0"),
2626
.package(url: "https://github.com/apple/swift-crypto.git", "2.0.0" ..< "4.0.0"),

Sources/ConnectionPoolModule/NIOLock.swift

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ import WinSDK
2424
import Glibc
2525
#elseif canImport(Musl)
2626
import Musl
27+
#elseif canImport(Bionic)
28+
import Bionic
29+
#elseif canImport(WASILibc)
30+
import WASILibc
31+
#if canImport(wasi_pthread)
32+
import wasi_pthread
33+
#endif
2734
#else
2835
#error("The concurrency NIOLock module was unable to identify your C library.")
2936
#endif
@@ -37,16 +44,16 @@ typealias LockPrimitive = pthread_mutex_t
3744
#endif
3845

3946
@usableFromInline
40-
enum LockOperations { }
47+
enum LockOperations {}
4148

4249
extension LockOperations {
4350
@inlinable
4451
static func create(_ mutex: UnsafeMutablePointer<LockPrimitive>) {
4552
mutex.assertValidAlignment()
4653

47-
#if os(Windows)
54+
#if os(Windows)
4855
InitializeSRWLock(mutex)
49-
#else
56+
#elseif (compiler(<6.1) && !os(WASI)) || (compiler(>=6.1) && _runtime(_multithreaded))
5057
var attr = pthread_mutexattr_t()
5158
pthread_mutexattr_init(&attr)
5259
debugOnly {
@@ -55,43 +62,43 @@ extension LockOperations {
5562

5663
let err = pthread_mutex_init(mutex, &attr)
5764
precondition(err == 0, "\(#function) failed in pthread_mutex with error \(err)")
58-
#endif
65+
#endif
5966
}
6067

6168
@inlinable
6269
static func destroy(_ mutex: UnsafeMutablePointer<LockPrimitive>) {
6370
mutex.assertValidAlignment()
6471

65-
#if os(Windows)
72+
#if os(Windows)
6673
// SRWLOCK does not need to be free'd
67-
#else
74+
#elseif (compiler(<6.1) && !os(WASI)) || (compiler(>=6.1) && _runtime(_multithreaded))
6875
let err = pthread_mutex_destroy(mutex)
6976
precondition(err == 0, "\(#function) failed in pthread_mutex with error \(err)")
70-
#endif
77+
#endif
7178
}
7279

7380
@inlinable
7481
static func lock(_ mutex: UnsafeMutablePointer<LockPrimitive>) {
7582
mutex.assertValidAlignment()
7683

77-
#if os(Windows)
84+
#if os(Windows)
7885
AcquireSRWLockExclusive(mutex)
79-
#else
86+
#elseif (compiler(<6.1) && !os(WASI)) || (compiler(>=6.1) && _runtime(_multithreaded))
8087
let err = pthread_mutex_lock(mutex)
8188
precondition(err == 0, "\(#function) failed in pthread_mutex with error \(err)")
82-
#endif
89+
#endif
8390
}
8491

8592
@inlinable
8693
static func unlock(_ mutex: UnsafeMutablePointer<LockPrimitive>) {
8794
mutex.assertValidAlignment()
8895

89-
#if os(Windows)
96+
#if os(Windows)
9097
ReleaseSRWLockExclusive(mutex)
91-
#else
98+
#elseif (compiler(<6.1) && !os(WASI)) || (compiler(>=6.1) && _runtime(_multithreaded))
9299
let err = pthread_mutex_unlock(mutex)
93100
precondition(err == 0, "\(#function) failed in pthread_mutex with error \(err)")
94-
#endif
101+
#endif
95102
}
96103
}
97104

@@ -129,9 +136,11 @@ final class LockStorage<Value>: ManagedBuffer<Value, LockPrimitive> {
129136
@inlinable
130137
static func create(value: Value) -> Self {
131138
let buffer = Self.create(minimumCapacity: 1) { _ in
132-
return value
139+
value
133140
}
134-
// Avoid 'unsafeDowncast' as there is a miscompilation on 5.10.
141+
// Intentionally using a force cast here to avoid a miss compiliation in 5.10.
142+
// This is as fast as an unsafeDownCast since ManagedBuffer is inlined and the optimizer
143+
// can eliminate the upcast/downcast pair
135144
let storage = buffer as! Self
136145

137146
storage.withUnsafeMutablePointers { _, lockPtr in
@@ -165,7 +174,7 @@ final class LockStorage<Value>: ManagedBuffer<Value, LockPrimitive> {
165174
@inlinable
166175
func withLockPrimitive<T>(_ body: (UnsafeMutablePointer<LockPrimitive>) throws -> T) rethrows -> T {
167176
try self.withUnsafeMutablePointerToElements { lockPtr in
168-
return try body(lockPtr)
177+
try body(lockPtr)
169178
}
170179
}
171180

@@ -179,17 +188,14 @@ final class LockStorage<Value>: ManagedBuffer<Value, LockPrimitive> {
179188
}
180189
}
181190

182-
extension LockStorage: @unchecked Sendable { }
183-
184191
/// A threading lock based on `libpthread` instead of `libdispatch`.
185192
///
186-
/// - note: ``NIOLock`` has reference semantics.
193+
/// - Note: ``NIOLock`` has reference semantics.
187194
///
188195
/// This object provides a lock on top of a single `pthread_mutex_t`. This kind
189196
/// of lock is safe to use with `libpthread`-based threading models, such as the
190197
/// one used by NIO. On Windows, the lock is based on the substantially similar
191198
/// `SRWLOCK` type.
192-
@usableFromInline
193199
struct NIOLock {
194200
@usableFromInline
195201
internal let _storage: LockStorage<Void>
@@ -220,7 +226,7 @@ struct NIOLock {
220226

221227
@inlinable
222228
internal func withLockPrimitive<T>(_ body: (UnsafeMutablePointer<LockPrimitive>) throws -> T) rethrows -> T {
223-
return try self._storage.withLockPrimitive(body)
229+
try self._storage.withLockPrimitive(body)
224230
}
225231
}
226232

@@ -243,12 +249,12 @@ extension NIOLock {
243249
}
244250

245251
@inlinable
246-
func withLockVoid(_ body: () throws -> Void) rethrows -> Void {
252+
func withLockVoid(_ body: () throws -> Void) rethrows {
247253
try self.withLock(body)
248254
}
249255
}
250256

251-
extension NIOLock: Sendable {}
257+
extension NIOLock: @unchecked Sendable {}
252258

253259
extension UnsafeMutablePointer {
254260
@inlinable
@@ -264,6 +270,10 @@ extension UnsafeMutablePointer {
264270
/// https://forums.swift.org/t/support-debug-only-code/11037 for a discussion.
265271
@inlinable
266272
internal func debugOnly(_ body: () -> Void) {
267-
// FIXME: duplicated with NIO.
268-
assert({ body(); return true }())
273+
assert(
274+
{
275+
body()
276+
return true
277+
}()
278+
)
269279
}

Sources/ConnectionPoolModule/NIOLockedValueBox.swift

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/// Provides locked access to `Value`.
1919
///
20-
/// - note: ``NIOLockedValueBox`` has reference semantics and holds the `Value`
20+
/// - Note: ``NIOLockedValueBox`` has reference semantics and holds the `Value`
2121
/// alongside a lock behind a reference.
2222
///
2323
/// This is no different than creating a ``Lock`` and protecting all
@@ -39,8 +39,48 @@ struct NIOLockedValueBox<Value> {
3939
/// Access the `Value`, allowing mutation of it.
4040
@inlinable
4141
func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
42-
return try self._storage.withLockedValue(mutate)
42+
try self._storage.withLockedValue(mutate)
43+
}
44+
45+
/// Provides an unsafe view over the lock and its value.
46+
///
47+
/// This can be beneficial when you require fine grained control over the lock in some
48+
/// situations but don't want lose the benefits of ``withLockedValue(_:)`` in others by
49+
/// switching to ``NIOLock``.
50+
var unsafe: Unsafe {
51+
Unsafe(_storage: self._storage)
52+
}
53+
54+
/// Provides an unsafe view over the lock and its value.
55+
struct Unsafe {
56+
@usableFromInline
57+
let _storage: LockStorage<Value>
58+
59+
/// Manually acquire the lock.
60+
@inlinable
61+
func lock() {
62+
self._storage.lock()
63+
}
64+
65+
/// Manually release the lock.
66+
@inlinable
67+
func unlock() {
68+
self._storage.unlock()
69+
}
70+
71+
/// Mutate the value, assuming the lock has been acquired manually.
72+
///
73+
/// - Parameter mutate: A closure with scoped access to the value.
74+
/// - Returns: The result of the `mutate` closure.
75+
@inlinable
76+
func withValueAssumingLockIsAcquired<Result>(
77+
_ mutate: (_ value: inout Value) throws -> Result
78+
) rethrows -> Result {
79+
try self._storage.withUnsafeMutablePointerToHeader { value in
80+
try mutate(&value.pointee)
81+
}
82+
}
4383
}
4484
}
4585

46-
extension NIOLockedValueBox: Sendable where Value: Sendable {}
86+
extension NIOLockedValueBox: @unchecked Sendable where Value: Sendable {}

Sources/PostgresNIO/Connection/PostgresConnection+Configuration.swift

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,22 @@ extension PostgresConnection {
192192
/// - Parameters:
193193
/// - channel: The `NIOCore/Channel` to use. The channel must already be active and connected to an
194194
/// endpoint (i.e. `NIOCore/Channel/isActive` must be `true`).
195-
/// - tls: The TLS mode to use. Defaults to ``TLS-swift.struct/disable``.
195+
/// - tls: The TLS mode to use.
196+
public init(establishedChannel channel: Channel, tls: PostgresConnection.Configuration.TLS, username: String, password: String?, database: String?) {
197+
self.init(endpointInfo: .configureChannel(channel), tls: tls, username: username, password: password, database: database)
198+
}
199+
200+
/// Create a configuration for establishing a connection to a Postgres server over a preestablished
201+
/// `NIOCore/Channel`.
202+
///
203+
/// This is provided for calling code which wants to manage the underlying connection transport on its
204+
/// own, such as when tunneling a connection through SSH.
205+
///
206+
/// - Parameters:
207+
/// - channel: The `NIOCore/Channel` to use. The channel must already be active and connected to an
208+
/// endpoint (i.e. `NIOCore/Channel/isActive` must be `true`).
196209
public init(establishedChannel channel: Channel, username: String, password: String?, database: String?) {
197-
self.init(endpointInfo: .configureChannel(channel), tls: .disable, username: username, password: password, database: database)
210+
self.init(establishedChannel: channel, tls: .disable, username: username, password: password, database: database)
198211
}
199212

200213
// MARK: - Implementation details

Sources/PostgresNIO/Connection/PostgresConnection.swift

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,18 @@ public final class PostgresConnection: @unchecked Sendable {
6060
func start(configuration: InternalConfiguration) -> EventLoopFuture<Void> {
6161
// 1. configure handlers
6262

63-
let configureSSLCallback: ((Channel) throws -> ())?
63+
let configureSSLCallback: ((Channel, PostgresChannelHandler) throws -> ())?
6464

6565
switch configuration.tls.base {
6666
case .prefer(let context), .require(let context):
67-
configureSSLCallback = { channel in
67+
configureSSLCallback = { channel, postgresChannelHandler in
6868
channel.eventLoop.assertInEventLoop()
6969

7070
let sslHandler = try NIOSSLClientHandler(
7171
context: context,
7272
serverHostname: configuration.serverNameForTLS
7373
)
74-
try channel.pipeline.syncOperations.addHandler(sslHandler, position: .first)
74+
try channel.pipeline.syncOperations.addHandler(sslHandler, position: .before(postgresChannelHandler))
7575
}
7676
case .disable:
7777
configureSSLCallback = nil
@@ -530,6 +530,110 @@ extension PostgresConnection {
530530
throw error // rethrow with more metadata
531531
}
532532
}
533+
534+
#if compiler(>=6.0)
535+
/// Puts the connection into an open transaction state, for the provided `closure`'s lifetime.
536+
///
537+
/// The function starts a transaction by running a `BEGIN` query on the connection against the database. It then
538+
/// lends the connection to the user provided closure. The user can then modify the database as they wish. If the user
539+
/// provided closure returns successfully, the function will attempt to commit the changes by running a `COMMIT`
540+
/// query against the database. If the user provided closure throws an error, the function will attempt to rollback the
541+
/// changes made within the closure.
542+
///
543+
/// - Parameters:
544+
/// - logger: The `Logger` to log into for the transaction.
545+
/// - file: The file, the transaction was started in. Used for better error reporting.
546+
/// - line: The line, the transaction was started in. Used for better error reporting.
547+
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
548+
/// The connection must stay in the transaction mode. Otherwise this method will throw!
549+
/// - Returns: The closure's return value.
550+
public func withTransaction<Result>(
551+
logger: Logger,
552+
file: String = #file,
553+
line: Int = #line,
554+
isolation: isolated (any Actor)? = #isolation,
555+
// DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED
556+
// https://github.com/swiftlang/swift/issues/79285
557+
_ process: (PostgresConnection) async throws -> sending Result) async throws -> sending Result {
558+
do {
559+
try await self.query("BEGIN;", logger: logger)
560+
} catch {
561+
throw PostgresTransactionError(file: file, line: line, beginError: error)
562+
}
563+
564+
var closureHasFinished: Bool = false
565+
do {
566+
let value = try await process(self)
567+
closureHasFinished = true
568+
try await self.query("COMMIT;", logger: logger)
569+
return value
570+
} catch {
571+
var transactionError = PostgresTransactionError(file: file, line: line)
572+
if !closureHasFinished {
573+
transactionError.closureError = error
574+
do {
575+
try await self.query("ROLLBACK;", logger: logger)
576+
} catch {
577+
transactionError.rollbackError = error
578+
}
579+
} else {
580+
transactionError.commitError = error
581+
}
582+
583+
throw transactionError
584+
}
585+
}
586+
#else
587+
/// Puts the connection into an open transaction state, for the provided `closure`'s lifetime.
588+
///
589+
/// The function starts a transaction by running a `BEGIN` query on the connection against the database. It then
590+
/// lends the connection to the user provided closure. The user can then modify the database as they wish. If the user
591+
/// provided closure returns successfully, the function will attempt to commit the changes by running a `COMMIT`
592+
/// query against the database. If the user provided closure throws an error, the function will attempt to rollback the
593+
/// changes made within the closure.
594+
///
595+
/// - Parameters:
596+
/// - logger: The `Logger` to log into for the transaction.
597+
/// - file: The file, the transaction was started in. Used for better error reporting.
598+
/// - line: The line, the transaction was started in. Used for better error reporting.
599+
/// - closure: The user provided code to modify the database. Use the provided connection to run queries.
600+
/// The connection must stay in the transaction mode. Otherwise this method will throw!
601+
/// - Returns: The closure's return value.
602+
public func withTransaction<Result>(
603+
logger: Logger,
604+
file: String = #file,
605+
line: Int = #line,
606+
_ process: (PostgresConnection) async throws -> Result
607+
) async throws -> Result {
608+
do {
609+
try await self.query("BEGIN;", logger: logger)
610+
} catch {
611+
throw PostgresTransactionError(file: file, line: line, beginError: error)
612+
}
613+
614+
var closureHasFinished: Bool = false
615+
do {
616+
let value = try await process(self)
617+
closureHasFinished = true
618+
try await self.query("COMMIT;", logger: logger)
619+
return value
620+
} catch {
621+
var transactionError = PostgresTransactionError(file: file, line: line)
622+
if !closureHasFinished {
623+
transactionError.closureError = error
624+
do {
625+
try await self.query("ROLLBACK;", logger: logger)
626+
} catch {
627+
transactionError.rollbackError = error
628+
}
629+
} else {
630+
transactionError.commitError = error
631+
}
632+
633+
throw transactionError
634+
}
635+
}
636+
#endif
533637
}
534638

535639
// MARK: EventLoopFuture interface

0 commit comments

Comments
 (0)