Skip to content

Commit a844795

Browse files
committed
add a test, add a func in PostgresClient + better docs
1 parent 61da70d commit a844795

File tree

4 files changed

+113
-7
lines changed

4 files changed

+113
-7
lines changed

Sources/PostgresNIO/Connection/PostgresConnection.swift

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,15 @@ extension PostgresConnection {
438438
}
439439
}
440440

441-
// use this for queries where you want to consume the rows.
442-
// we can use the `consume` scope to better ensure structured concurrency when consuming the rows.
441+
/// Run a query on the Postgres server the connection is connected to, returning the metadata.
442+
///
443+
/// - Parameters:
444+
/// - query: The ``PostgresQuery`` to run
445+
/// - logger: The `Logger` to log into for the query
446+
/// - file: The file, the query was started in. Used for better error reporting.
447+
/// - line: The line, the query was started in. Used for better error reporting.
448+
/// - consume: The closure to consume the ``PostgresRowSequence``.
449+
/// - Returns: The result of the `consume` closure as well as the query metadata.
443450
public func query<Result>(
444451
_ query: PostgresQuery,
445452
logger: Logger,
@@ -573,8 +580,15 @@ extension PostgresConnection {
573580
}
574581
}
575582

576-
// use this for queries where you want to consume the rows.
577-
// we can use the `consume` scope to better ensure structured concurrency when consuming the rows.
583+
/// Execute a statement on the Postgres server the connection is connected to,
584+
/// returning the metadata.
585+
///
586+
/// - Parameters:
587+
/// - query: The ``PostgresQuery`` to run
588+
/// - logger: The `Logger` to log into for the query
589+
/// - file: The file, the query was started in. Used for better error reporting.
590+
/// - line: The line, the query was started in. Used for better error reporting.
591+
/// - Returns: The query metadata.
578592
@discardableResult
579593
public func execute(
580594
_ query: PostgresQuery,

Sources/PostgresNIO/Pool/PostgresClient.swift

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,60 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
435435
}
436436
}
437437

438+
/// Run a query on the Postgres server the connection is connected to, returning the metadata.
439+
///
440+
/// - Parameters:
441+
/// - query: The ``PostgresQuery`` to run
442+
/// - logger: The `Logger` to log into for the query
443+
/// - file: The file, the query was started in. Used for better error reporting.
444+
/// - line: The line, the query was started in. Used for better error reporting.
445+
/// - consume: The closure to consume the ``PostgresRowSequence``.
446+
/// - Returns: The result of the `consume` closure as well as the query metadata.
447+
public func query<Result>(
448+
_ query: PostgresQuery,
449+
logger: Logger? = nil,
450+
file: String = #fileID,
451+
line: Int = #line,
452+
_ consume: (PostgresRowSequence) async throws -> Result
453+
) async throws -> (Result, PostgresQueryMetadata) {
454+
let logger = logger ?? Self.loggingDisabled
455+
456+
do {
457+
guard query.binds.count <= Int(UInt16.max) else {
458+
throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line)
459+
}
460+
461+
let connection = try await self.leaseConnection()
462+
463+
var logger = logger
464+
logger[postgresMetadataKey: .connectionID] = "\(connection.id)"
465+
466+
let promise = connection.channel.eventLoop.makePromise(of: PSQLRowStream.self)
467+
let context = ExtendedQueryContext(
468+
query: query,
469+
logger: logger,
470+
promise: promise
471+
)
472+
473+
connection.channel.write(HandlerTask.extendedQuery(context), promise: nil)
474+
475+
let (rowStream, rowSequence) = try await promise.futureResult.map { rowStream in
476+
(rowStream, rowStream.asyncSequence(onFinish: { self.pool.releaseConnection(connection) }))
477+
}.get()
478+
let result = try await consume(rowSequence)
479+
try await rowStream.drain().get()
480+
guard let metadata = PostgresQueryMetadata(string: rowStream.commandTag) else {
481+
throw PSQLError.invalidCommandTag(rowStream.commandTag)
482+
}
483+
return (result, metadata)
484+
} catch var error as PSQLError {
485+
error.file = file
486+
error.line = line
487+
error.query = query
488+
throw error // rethrow with more metadata
489+
}
490+
}
491+
438492
/// Execute a prepared statement, taking care of the preparation when necessary
439493
public func execute<Statement: PostgresPreparedStatement, Row>(
440494
_ preparedStatement: Statement,

Tests/IntegrationTests/PostgresClientTests.swift

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ final class PostgresClientTests: XCTestCase {
181181
}
182182
}
183183

184-
185184
func testQueryDirectly() async throws {
186185
var mlogger = Logger(label: "test")
187186
mlogger.logLevel = .debug
@@ -218,6 +217,47 @@ final class PostgresClientTests: XCTestCase {
218217
}
219218
}
220219

220+
func testQueryMetadataDirectly() async throws {
221+
var mlogger = Logger(label: "test")
222+
mlogger.logLevel = .debug
223+
let logger = mlogger
224+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 8)
225+
self.addTeardownBlock {
226+
try await eventLoopGroup.shutdownGracefully()
227+
}
228+
229+
let clientConfig = PostgresClient.Configuration.makeTestConfiguration()
230+
let client = PostgresClient(configuration: clientConfig, eventLoopGroup: eventLoopGroup, backgroundLogger: logger)
231+
232+
await withThrowingTaskGroup(of: Void.self) { taskGroup in
233+
taskGroup.addTask {
234+
await client.run()
235+
}
236+
237+
for i in 0..<10000 {
238+
taskGroup.addTask {
239+
do {
240+
let (_, metadata) = try await client.query("SELECT 1", logger: logger) { _ in
241+
// Don't consume the row, the function itself should drain the row
242+
}
243+
XCTAssertEqual(metadata.command, "SELECT")
244+
XCTAssertNil(metadata.oid)
245+
XCTAssertEqual(metadata.rows, 1)
246+
logger.info("Success", metadata: ["run": "\(i)"])
247+
} catch {
248+
XCTFail("Unexpected error: \(error)")
249+
}
250+
}
251+
}
252+
253+
for _ in 0..<10000 {
254+
_ = await taskGroup.nextResult()!
255+
}
256+
257+
taskGroup.cancelAll()
258+
}
259+
}
260+
221261
func testQueryTable() async throws {
222262
let tableName = "test_client_prepared_statement"
223263

docker-compose.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: '3.7'
2-
31
x-shared-config: &shared_config
42
environment:
53
POSTGRES_HOST_AUTH_METHOD: "${POSTGRES_HOST_AUTH_METHOD:-scram-sha-256}"

0 commit comments

Comments
 (0)