1212//
1313//===----------------------------------------------------------------------===//
1414
15+ import Algorithms
1516import NIOCore
1617import NIOHTTP1
1718
19+ @usableFromInline
20+ let bagOfBytesToByteBufferConversionChunkSize = 1024 * 1024 * 4
21+
22+ #if arch(arm) || arch(i386)
23+ // on 32-bit platforms we can't make use of a whole UInt32.max (as it doesn't fit in an Int)
24+ @usableFromInline
25+ let byteBufferMaxSize = Int . max
26+ #else
27+ // on 64-bit platforms we're good
28+ @usableFromInline
29+ let byteBufferMaxSize = Int ( UInt32 . max)
30+ #endif
31+
1832/// A representation of an HTTP request for the Swift Concurrency HTTPClient API.
1933///
2034/// This object is similar to ``HTTPClient/Request``, but used for the Swift Concurrency API.
@@ -93,34 +107,17 @@ extension HTTPClientRequest.Body {
93107
94108 /// Create an ``HTTPClientRequest/Body-swift.struct`` from a `RandomAccessCollection` of bytes.
95109 ///
96- /// This construction will flatten the bytes into a `ByteBuffer`. As a result, the peak memory
97- /// usage of this construction will be double the size of the original collection. The construction
98- /// of the `ByteBuffer` will be delayed until it's needed.
110+ /// This construction will flatten the ` bytes` into a `ByteBuffer` in chunks of ~4MB.
111+ /// As a result, the peak memory usage of this construction will be a small multiple of ~4MB.
112+ /// The construction of the `ByteBuffer` will be delayed until it's needed.
99113 ///
100114 /// - parameter bytes: The bytes of the request body.
101115 @inlinable
102116 @preconcurrency
103117 public static func bytes< Bytes: RandomAccessCollection & Sendable > (
104118 _ bytes: Bytes
105119 ) -> Self where Bytes. Element == UInt8 {
106- Self . _bytes ( bytes)
107- }
108-
109- @inlinable
110- static func _bytes< Bytes: RandomAccessCollection > (
111- _ bytes: Bytes
112- ) -> Self where Bytes. Element == UInt8 {
113- self . init ( . sequence(
114- length: . known( bytes. count) ,
115- canBeConsumedMultipleTimes: true
116- ) { allocator in
117- if let buffer = bytes. withContiguousStorageIfAvailable ( { allocator. buffer ( bytes: $0) } ) {
118- // fastpath
119- return buffer
120- }
121- // potentially really slow path
122- return allocator. buffer ( bytes: bytes)
123- } )
120+ self . bytes ( bytes, length: . known( bytes. count) )
124121 }
125122
126123 /// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Sequence` of bytes.
@@ -146,32 +143,77 @@ extension HTTPClientRequest.Body {
146143 _ bytes: Bytes ,
147144 length: Length
148145 ) -> Self where Bytes. Element == UInt8 {
149- Self . _bytes ( bytes, length: length)
146+ Self . _bytes (
147+ bytes,
148+ length: length,
149+ bagOfBytesToByteBufferConversionChunkSize: bagOfBytesToByteBufferConversionChunkSize,
150+ byteBufferMaxSize: byteBufferMaxSize
151+ )
150152 }
151153
154+ /// internal method to test chunking
152155 @inlinable
153- static func _bytes< Bytes: Sequence > (
156+ @preconcurrency
157+ static func _bytes< Bytes: Sequence & Sendable > (
154158 _ bytes: Bytes ,
155- length: Length
159+ length: Length ,
160+ bagOfBytesToByteBufferConversionChunkSize: Int ,
161+ byteBufferMaxSize: Int
156162 ) -> Self where Bytes. Element == UInt8 {
157- self . init ( . sequence(
158- length: length. storage,
159- canBeConsumedMultipleTimes: false
160- ) { allocator in
161- if let buffer = bytes. withContiguousStorageIfAvailable ( { allocator. buffer ( bytes: $0) } ) {
162- // fastpath
163- return buffer
163+ // fast path
164+ let body : Self ? = bytes. withContiguousStorageIfAvailable { bufferPointer -> Self in
165+ // `some Sequence<UInt8>` is special as it can't be efficiently chunked lazily.
166+ // Therefore we need to do the chunking eagerly if it implements the fast path withContiguousStorageIfAvailable
167+ // If we do it eagerly, it doesn't make sense to do a bunch of small chunks, so we only chunk if it exceeds
168+ // the maximum size of a ByteBuffer.
169+ if bufferPointer. count <= byteBufferMaxSize {
170+ let buffer = ByteBuffer ( bytes: bufferPointer)
171+ return Self ( . sequence(
172+ length: length. storage,
173+ canBeConsumedMultipleTimes: true ,
174+ makeCompleteBody: { _ in buffer }
175+ ) )
176+ } else {
177+ // we need to copy `bufferPointer` eagerly as the pointer is only valid during the call to `withContiguousStorageIfAvailable`
178+ let buffers : Array < ByteBuffer > = bufferPointer. chunks ( ofCount: byteBufferMaxSize) . map { ByteBuffer ( bytes: $0) }
179+ return Self ( . asyncSequence(
180+ length: length. storage,
181+ makeAsyncIterator: {
182+ var iterator = buffers. makeIterator ( )
183+ return { _ in
184+ iterator. next ( )
185+ }
186+ }
187+ ) )
188+ }
189+ }
190+ if let body = body {
191+ return body
192+ }
193+
194+ // slow path
195+ return Self ( . asyncSequence(
196+ length: length. storage
197+ ) {
198+ var iterator = bytes. makeIterator ( )
199+ return { allocator in
200+ var buffer = allocator. buffer ( capacity: bagOfBytesToByteBufferConversionChunkSize)
201+ while buffer. writableBytes > 0 , let byte = iterator. next ( ) {
202+ buffer. writeInteger ( byte)
203+ }
204+ if buffer. readableBytes > 0 {
205+ return buffer
206+ }
207+ return nil
164208 }
165- // potentially really slow path
166- return allocator. buffer ( bytes: bytes)
167209 } )
168210 }
169211
170212 /// Create an ``HTTPClientRequest/Body-swift.struct`` from a `Collection` of bytes.
171213 ///
172- /// This construction will flatten the bytes into a `ByteBuffer`. As a result, the peak memory
173- /// usage of this construction will be double the size of the original collection. The construction
174- /// of the `ByteBuffer` will be delayed until it's needed.
214+ /// This construction will flatten the ` bytes` into a `ByteBuffer` in chunks of ~4MB.
215+ /// As a result, the peak memory usage of this construction will be a small multiple of ~4MB.
216+ /// The construction of the `ByteBuffer` will be delayed until it's needed.
175217 ///
176218 /// Caution should be taken with this method to ensure that the `length` is correct. Incorrect lengths
177219 /// will cause unnecessary runtime failures. Setting `length` to ``Length/unknown`` will trigger the upload
@@ -186,25 +228,27 @@ extension HTTPClientRequest.Body {
186228 _ bytes: Bytes ,
187229 length: Length
188230 ) -> Self where Bytes. Element == UInt8 {
189- Self . _bytes ( bytes, length: length)
190- }
191-
192- @inlinable
193- static func _bytes< Bytes: Collection > (
194- _ bytes: Bytes ,
195- length: Length
196- ) -> Self where Bytes. Element == UInt8 {
197- self . init ( . sequence(
198- length: length. storage,
199- canBeConsumedMultipleTimes: true
200- ) { allocator in
201- if let buffer = bytes. withContiguousStorageIfAvailable ( { allocator. buffer ( bytes: $0) } ) {
202- // fastpath
203- return buffer
204- }
205- // potentially really slow path
206- return allocator. buffer ( bytes: bytes)
207- } )
231+ if bytes. count <= bagOfBytesToByteBufferConversionChunkSize {
232+ return self . init ( . sequence(
233+ length: length. storage,
234+ canBeConsumedMultipleTimes: true
235+ ) { allocator in
236+ allocator. buffer ( bytes: bytes)
237+ } )
238+ } else {
239+ return self . init ( . asyncSequence(
240+ length: length. storage,
241+ makeAsyncIterator: {
242+ var iterator = bytes. chunks ( ofCount: bagOfBytesToByteBufferConversionChunkSize) . makeIterator ( )
243+ return { allocator in
244+ guard let chunk = iterator. next ( ) else {
245+ return nil
246+ }
247+ return allocator. buffer ( bytes: chunk)
248+ }
249+ }
250+ ) )
251+ }
208252 }
209253
210254 /// Create an ``HTTPClientRequest/Body-swift.struct`` from an `AsyncSequence` of `ByteBuffer`s.
@@ -223,14 +267,6 @@ extension HTTPClientRequest.Body {
223267 public static func stream< SequenceOfBytes: AsyncSequence & Sendable > (
224268 _ sequenceOfBytes: SequenceOfBytes ,
225269 length: Length
226- ) -> Self where SequenceOfBytes. Element == ByteBuffer {
227- Self . _stream ( sequenceOfBytes, length: length)
228- }
229-
230- @inlinable
231- static func _stream< SequenceOfBytes: AsyncSequence > (
232- _ sequenceOfBytes: SequenceOfBytes ,
233- length: Length
234270 ) -> Self where SequenceOfBytes. Element == ByteBuffer {
235271 let body = self . init ( . asyncSequence( length: length. storage) {
236272 var iterator = sequenceOfBytes. makeAsyncIterator ( )
@@ -243,7 +279,7 @@ extension HTTPClientRequest.Body {
243279
244280 /// Create an ``HTTPClientRequest/Body-swift.struct`` from an `AsyncSequence` of bytes.
245281 ///
246- /// This construction will consume 1kB chunks from the `Bytes` and send them at once. This optimizes for
282+ /// This construction will consume 4MB chunks from the `Bytes` and send them at once. This optimizes for
247283 /// `AsyncSequence`s where larger chunks are buffered up and available without actually suspending, such
248284 /// as those provided by `FileHandle`.
249285 ///
@@ -259,19 +295,11 @@ extension HTTPClientRequest.Body {
259295 public static func stream< Bytes: AsyncSequence & Sendable > (
260296 _ bytes: Bytes ,
261297 length: Length
262- ) -> Self where Bytes. Element == UInt8 {
263- Self . _stream ( bytes, length: length)
264- }
265-
266- @inlinable
267- static func _stream< Bytes: AsyncSequence > (
268- _ bytes: Bytes ,
269- length: Length
270298 ) -> Self where Bytes. Element == UInt8 {
271299 let body = self . init ( . asyncSequence( length: length. storage) {
272300 var iterator = bytes. makeAsyncIterator ( )
273301 return { allocator -> ByteBuffer ? in
274- var buffer = allocator. buffer ( capacity: 1024 ) // TODO: Magic number
302+ var buffer = allocator. buffer ( capacity: bagOfBytesToByteBufferConversionChunkSize )
275303 while buffer. writableBytes > 0 , let byte = try await iterator. next ( ) {
276304 buffer. writeInteger ( byte)
277305 }
@@ -313,3 +341,53 @@ extension HTTPClientRequest.Body {
313341 internal var storage : RequestBodyLength
314342 }
315343}
344+
345+ @available ( macOS 10 . 15 , iOS 13 . 0 , watchOS 6 . 0 , tvOS 13 . 0 , * )
346+ extension HTTPClientRequest . Body : AsyncSequence {
347+ public typealias Element = ByteBuffer
348+
349+ @inlinable
350+ public func makeAsyncIterator( ) -> AsyncIterator {
351+ switch self . mode {
352+ case . asyncSequence( _, let makeAsyncIterator) :
353+ return . init( storage: . makeNext( makeAsyncIterator ( ) ) )
354+ case . sequence( _, _, let makeCompleteBody) :
355+ return . init( storage: . byteBuffer( makeCompleteBody ( AsyncIterator . allocator) ) )
356+ case . byteBuffer( let byteBuffer) :
357+ return . init( storage: . byteBuffer( byteBuffer) )
358+ }
359+ }
360+ }
361+
362+ @available ( macOS 10 . 15 , iOS 13 . 0 , watchOS 6 . 0 , tvOS 13 . 0 , * )
363+ extension HTTPClientRequest . Body {
364+ public struct AsyncIterator : AsyncIteratorProtocol {
365+ @usableFromInline
366+ static let allocator = ByteBufferAllocator ( )
367+
368+ @usableFromInline
369+ enum Storage {
370+ case byteBuffer( ByteBuffer ? )
371+ case makeNext( ( ByteBufferAllocator ) async throws -> ByteBuffer ? )
372+ }
373+
374+ @usableFromInline
375+ var storage : Storage
376+
377+ @inlinable
378+ init ( storage: Storage ) {
379+ self . storage = storage
380+ }
381+
382+ @inlinable
383+ public mutating func next( ) async throws -> ByteBuffer ? {
384+ switch self . storage {
385+ case . byteBuffer( let buffer) :
386+ self . storage = . byteBuffer( nil )
387+ return buffer
388+ case . makeNext( let makeNext) :
389+ return try await makeNext ( Self . allocator)
390+ }
391+ }
392+ }
393+ }
0 commit comments