1+ import NIO
2+
3+ #if compiler(>=6.0)
4+ /// Handle to send binary data for a `COPY ... FROM STDIN` query to the backend.
5+ ///
6+ /// It takes care of serializing `PostgresEncodable` column types into the binary format that Postgres expects.
7+ public struct PostgresBinaryCopyFromWriter : ~ Copyable {
8+ /// Handle to serialize columns into a row that is being written by `PostgresBinaryCopyFromWriter`.
9+ public struct ColumnWriter : ~ Copyable {
10+ /// The `PostgresBinaryCopyFromWriter` that is gathering the serialized data.
11+ ///
12+ /// We need to model this as `UnsafeMutablePointer` because we can't express in the Swift type system that
13+ /// `ColumnWriter` never exceeds the lifetime of `PostgresBinaryCopyFromWriter`.
14+ @usableFromInline
15+ let underlying : UnsafeMutablePointer < PostgresBinaryCopyFromWriter >
16+
17+ /// The number of columns that have been written by this `ColumnWriter`.
18+ @usableFromInline
19+ var columns : UInt16 = 0
20+
21+ @usableFromInline
22+ init ( underlying: UnsafeMutablePointer < PostgresBinaryCopyFromWriter > ) {
23+ self . underlying = underlying
24+ }
25+
26+ /// Serialize a single column to a row.
27+ ///
28+ /// - Important: It is critical that that data type encoded here exactly matches the data type in the
29+ /// databasse. For example, if the database stores an a 4-bit integer the corresponding `writeColumn` must
30+ /// be called with an `Int32`. Serializing an integer of a different width will cause a deserialization
31+ /// failure in the backend.
32+ @inlinable
33+ public mutating func writeColumn( _ column: ( some PostgresEncodable ) ? ) throws {
34+ columns += 1
35+ try underlying. pointee. writeColumn ( column)
36+ }
37+ }
38+
39+ /// The underlying `PostgresCopyFromWriter` that sends the serialized data to the backend.
40+ @usableFromInline let underlying : PostgresCopyFromWriter
41+
42+ /// The buffer in which we accumulate binary data. Once this buffer exceeds `bufferSize`, we flush it to
43+ /// the backend.
44+ @usableFromInline var buffer = ByteBuffer ( )
45+
46+ /// Once `buffer` exceeds this size, it gets flushed to the backend.
47+ @usableFromInline let bufferSize : Int
48+
49+ init ( underlying: PostgresCopyFromWriter , bufferSize: Int ) {
50+ self . underlying = underlying
51+ // Allocate 10% more than the buffer size because we only flush the buffer once it has exceeded `bufferSize`
52+ buffer. reserveCapacity ( bufferSize + bufferSize / 10 )
53+ self . bufferSize = bufferSize
54+ }
55+
56+ /// Serialize a single row to the backend. Call `writeColumn` on `columnWriter` for every column that should be
57+ /// included in the row.
58+ @inlinable
59+ public mutating func writeRow( _ body: ( _ columnWriter: inout ColumnWriter ) throws -> Void ) async throws {
60+ // Write a placeholder for the number of columns
61+ let columnIndex = buffer. writerIndex
62+ buffer. writeInteger ( UInt16 ( 0 ) )
63+
64+ let columns = try withUnsafeMutablePointer ( to: & self ) { pointerToSelf in
65+ // Important: We need to ensure that `pointerToSel` (and thus `ColumnWriter`) does not exceed the lifetime
66+ // of `self` because it is holding an unsafe reference to it.
67+ //
68+ // We achieve this because `ColumnWriter` is non-Copyable and thus the client can't store a copy to it.
69+ // Futhermore `columnWriter` is destroyed before the end of `withUnsafeMutablePointer`, which holds `self`
70+ // alive.
71+ var columnWriter = ColumnWriter ( underlying: pointerToSelf)
72+
73+ try body ( & columnWriter)
74+
75+ return columnWriter. columns
76+ }
77+
78+ // Fill in the number of columns
79+ buffer. setInteger ( columns, at: columnIndex)
80+
81+ if buffer. readableBytes > bufferSize {
82+ try await flush ( )
83+ }
84+ }
85+
86+ /// Serialize a single column to the buffer. Should only be called by `ColumnWriter`.
87+ @inlinable
88+ mutating func writeColumn( _ column: ( some PostgresEncodable ) ? ) throws {
89+ if let column {
90+ try buffer. writeLengthPrefixed ( as: Int32 . self) { buffer in
91+ let startIndex = buffer. writerIndex
92+ try column. encode ( into: & buffer, context: . default)
93+ return buffer. writerIndex - startIndex
94+ }
95+ } else {
96+ buffer. writeInteger ( Int32 ( - 1 ) )
97+ }
98+ }
99+
100+ /// Flush any pending data in the buffer to the backend.
101+ @usableFromInline
102+ mutating func flush( isolation: ( any Actor ) ? = #isolation) async throws {
103+ try await underlying. write ( buffer)
104+ buffer. clear ( )
105+ }
106+ }
107+ #endif
108+
1109/// Handle to send data for a `COPY ... FROM STDIN` query to the backend.
2110public struct PostgresCopyFromWriter : Sendable {
3111 private let channelHandler : NIOLoopBound < PostgresChannelHandler >
@@ -115,15 +223,25 @@ public struct PostgresCopyFromFormat: Sendable {
115223 public init ( ) { }
116224 }
117225
226+ /// Options that can be used to modify the `binary` format of a COPY operation.
227+ public struct BinaryOptions : Sendable {
228+ public init ( ) { }
229+ }
230+
118231 enum Format {
119232 case text( TextOptions )
233+ case binary( BinaryOptions )
120234 }
121235
122236 var format : Format
123237
124238 public static func text( _ options: TextOptions ) -> PostgresCopyFromFormat {
125239 return PostgresCopyFromFormat ( format: . text( options) )
126240 }
241+
242+ public static func binary( _ options: BinaryOptions ) -> PostgresCopyFromFormat {
243+ return PostgresCopyFromFormat ( format: . binary( options) )
244+ }
127245}
128246
129247#if compiler(>=6.0)
@@ -156,6 +274,8 @@ private func buildCopyFromQuery(
156274 // Set the delimiter as a Unicode code point. This avoids the possibility of SQL injection.
157275 queryOptions. append ( " DELIMITER U&' \\ \( String ( format: " %04x " , delimiter. value) ) ' " )
158276 }
277+ case . binary:
278+ queryOptions. append ( " FORMAT binary " )
159279 }
160280 precondition ( !queryOptions. isEmpty)
161281 query += " WITH ( "
@@ -165,6 +285,50 @@ private func buildCopyFromQuery(
165285}
166286
167287extension PostgresConnection {
288+ /// Copy data into a table using a `COPY <table name> FROM STDIN` query, transferring data in a binary format.
289+ ///
290+ /// - Parameters:
291+ /// - table: The name of the table into which to copy the data.
292+ /// - columns: The name of the columns to copy. If an empty array is passed, all columns are assumed to be copied.
293+ /// - bufferSize: How many bytes to accumulate a local buffer before flushing it to the database. Can affect
294+ /// performance characteristics of the copy operation.
295+ /// - writeData: Closure that produces the data for the table, to be streamed to the backend. Call `write` on the
296+ /// writer provided by the closure to send data to the backend and return from the closure once all data is sent.
297+ /// Throw an error from the closure to fail the data transfer. The error thrown by the closure will be rethrown
298+ /// by the `copyFrom` function.
299+ ///
300+ /// - Important: The table and column names are inserted into the `COPY FROM` query as passed and might thus be
301+ /// susceptible to SQL injection. Ensure no untrusted data is contained in these strings.
302+ public func copyFromBinary(
303+ table: String ,
304+ columns: [ String ] = [ ] ,
305+ options: PostgresCopyFromFormat . BinaryOptions = . init( ) ,
306+ bufferSize: Int = 100_000 ,
307+ logger: Logger ,
308+ isolation: isolated ( any Actor ) ? = #isolation,
309+ file: String = #fileID,
310+ line: Int = #line,
311+ writeData: ( inout PostgresBinaryCopyFromWriter ) async throws -> Void
312+ ) async throws {
313+ try await copyFrom ( table: table, columns: columns, format: . binary( PostgresCopyFromFormat . BinaryOptions ( ) ) , logger: logger) { writer in
314+ var header = ByteBuffer ( )
315+ header. writeString ( " PGCOPY \n " )
316+ header. writeInteger ( UInt8 ( 0xff ) )
317+ header. writeString ( " \r \n \0 " )
318+
319+ // Flag fields
320+ header. writeInteger ( UInt32 ( 0 ) )
321+
322+ // Header extension area length
323+ header. writeInteger ( UInt32 ( 0 ) )
324+ try await writer. write ( header)
325+
326+ var binaryWriter = PostgresBinaryCopyFromWriter ( underlying: writer, bufferSize: bufferSize)
327+ try await writeData ( & binaryWriter)
328+ try await binaryWriter. flush ( )
329+ }
330+ }
331+
168332 /// Copy data into a table using a `COPY <table name> FROM STDIN` query.
169333 ///
170334 /// - Parameters:
0 commit comments