@@ -64,6 +64,8 @@ extension PostgresRowSequence {
6464extension PostgresRowSequence. AsyncIterator : Sendable { }
6565
6666extension PostgresRowSequence {
67+ /// Collect and return all rows.
68+ /// - Returns: The rows.
6769 public func collect( ) async throws -> [ PostgresRow ] {
6870 var result = [ PostgresRow] ( )
6971 for try await row in self {
@@ -72,13 +74,30 @@ extension PostgresRowSequence {
7274 return result
7375 }
7476
77+ /// Collect and return all rows, alongside the query metadata.
78+ /// - Returns: The query metadata and the rows.
7579 public func collectWithMetadata( ) async throws -> ( metadata: PostgresQueryMetadata , rows: [ PostgresRow ] ) {
7680 let rows = try await self . collect ( )
7781 guard let metadata = PostgresQueryMetadata ( string: self . rowStream. commandTag) else {
7882 throw PSQLError . invalidCommandTag ( self . rowStream. commandTag)
7983 }
8084 return ( metadata, rows)
8185 }
86+
87+ /// Consumes all rows and returns the query metadata.
88+ /// - Parameter onRow: Processes each row.
89+ /// - Returns: The query metadata.
90+ public func consume(
91+ onRow: @Sendable ( PostgresRow) throws -> ( )
92+ ) async throws -> PostgresQueryMetadata {
93+ for try await row in self {
94+ try onRow ( row)
95+ }
96+ guard let metadata = PostgresQueryMetadata ( string: self . rowStream. commandTag) else {
97+ throw PSQLError . invalidCommandTag ( self . rowStream. commandTag)
98+ }
99+ return metadata
100+ }
82101}
83102
84103struct AdaptiveRowBuffer: NIOAsyncSequenceProducerBackPressureStrategy {
0 commit comments