@@ -32,14 +32,15 @@ extension HTTPClient {
         /// A streaming uploader.
         ///
         /// ``StreamWriter`` abstracts
-        public struct StreamWriter {
-            let closure: (IOData) -> EventLoopFuture<Void>
+        public struct StreamWriter: Sendable {
+            let closure: @Sendable (IOData) -> EventLoopFuture<Void>
 
             /// Create new ``HTTPClient/Body/StreamWriter``
             ///
             /// - parameters:
             ///     - closure: function that will be called to write actual bytes to the channel.
-            public init(closure: @escaping (IOData) -> EventLoopFuture<Void>) {
+            @preconcurrency
+            public init(closure: @escaping @Sendable (IOData) -> EventLoopFuture<Void>) {
                 self.closure = closure
             }
 
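For context, a minimal sketch (not part of this patch) of constructing the now-`Sendable` writer. The NIO `Channel` it captures is an assumption for illustration; the point is that the closure may only capture `Sendable` state.

```swift
import AsyncHTTPClient
import NIOCore

// Sketch only: wrap an assumed Channel in a StreamWriter. Because the closure
// must now be @Sendable, capturing the (Sendable) Channel is fine, but any
// mutable non-Sendable state would be rejected by the compiler.
func makeWriter(over channel: Channel) -> HTTPClient.Body.StreamWriter {
    HTTPClient.Body.StreamWriter { ioData in
        // Write and flush each IOData chunk; the returned future completes
        // once the channel has accepted the bytes.
        channel.writeAndFlush(ioData)
    }
}
```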
@@ -55,8 +56,8 @@ extension HTTPClient {
             func writeChunks<Bytes: Collection>(
                 of bytes: Bytes,
                 maxChunkSize: Int
-            ) -> EventLoopFuture<Void> where Bytes.Element == UInt8 {
-                // `StreamWriter` is has design issues, for example
+            ) -> EventLoopFuture<Void> where Bytes.Element == UInt8, Bytes: Sendable {
+                // `StreamWriter` has design issues, for example
                 //  - https://github.com/swift-server/async-http-client/issues/194
                 //  - https://github.com/swift-server/async-http-client/issues/264
                 //  - We're not told the EventLoop the task runs on and the user is free to return whatever EL they
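The next hunk drives the chunking through `chunks(ofCount:)`, which is where `ChunksOfCountCollection` comes from (the swift-algorithms package). A small standalone illustration, under that assumption, of how it splits and numbers the body bytes:

```swift
import Algorithms

// chunks(ofCount:) yields bounded slices; enumerated() numbers them, which is
// what lets writeChunks decide when to hop back to the EventLoop.
let bytes: [UInt8] = (0..<10).map(UInt8.init)
for (offset, chunk) in bytes.chunks(ofCount: 4).enumerated() {
    print(offset, Array(chunk))
}
// 0 [0, 1, 2, 3]
// 1 [4, 5, 6, 7]
// 2 [8, 9]
```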
@@ -66,49 +67,52 @@ extension HTTPClient {
                 typealias Iterator = EnumeratedSequence<ChunksOfCountCollection<Bytes>>.Iterator
                 typealias Chunk = (offset: Int, element: ChunksOfCountCollection<Bytes>.Element)
 
-                func makeIteratorAndFirstChunk(
-                    bytes: Bytes
-                ) -> (
-                    iterator: NIOLockedValueBox<Iterator>,
-                    chunk: Chunk
-                )? {
-                    var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator()
-                    guard let chunk = iterator.next() else {
-                        return nil
+                // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
+                return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in
+                    func makeIteratorAndFirstChunk(
+                        bytes: Bytes
+                    ) -> (iterator: Iterator, chunk: Chunk)? {
+                        var iterator = bytes.chunks(ofCount: maxChunkSize).enumerated().makeIterator()
+                        guard let chunk = iterator.next() else {
+                            return nil
+                        }
+
+                        return (iterator, chunk)
                     }
 
-                    return (NIOLockedValueBox(iterator), chunk)
-                }
-
-                guard let (iterator, chunk) = makeIteratorAndFirstChunk(bytes: bytes) else {
-                    return self.write(IOData.byteBuffer(.init()))
-                }
+                    guard let iteratorAndChunk = makeIteratorAndFirstChunk(bytes: bytes) else {
+                        return loop.makeSucceededVoidFuture()
+                    }
 
-                @Sendable // can't use closure here as we recursively call ourselves which closures can't do
-                func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise<Void>) {
-                    if let nextElement = iterator.withLockedValue({ $0.next() }) {
-                        self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).map {
-                            let index = nextElement.offset
-                            if (index + 1) % 4 == 0 {
-                                // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
-                                // mode.
-                                // Also, we must frequently return to the EventLoop because we may get the pause signal
-                                // from another thread. If we fail to do that promptly, we may balloon our body chunks
-                                // into memory.
-                                allDone.futureResult.eventLoop.execute {
-                                    writeNextChunk(nextElement, allDone: allDone)
+                    var iterator = iteratorAndChunk.0
+                    let chunk = iteratorAndChunk.1
+
+                    // can't use closure here as we recursively call ourselves which closures can't do
+                    func writeNextChunk(_ chunk: Chunk, allDone: EventLoopPromise<Void>) {
+                        let loop = allDone.futureResult.eventLoop
+                        loop.assertInEventLoop()
+
+                        if let (index, element) = iterator.next() {
+                            self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).hop(to: loop).assumeIsolated().map
+                            {
+                                if (index + 1) % 4 == 0 {
+                                    // Let's not stack-overflow if the futures insta-complete which they at least in HTTP/2
+                                    // mode.
+                                    // Also, we must frequently return to the EventLoop because we may get the pause signal
+                                    // from another thread. If we fail to do that promptly, we may balloon our body chunks
+                                    // into memory.
+                                    allDone.futureResult.eventLoop.assumeIsolated().execute {
+                                        writeNextChunk((offset: index, element: element), allDone: allDone)
+                                    }
+                                } else {
+                                    writeNextChunk((offset: index, element: element), allDone: allDone)
                                 }
-                            } else {
-                                writeNextChunk(nextElement, allDone: allDone)
-                            }
-                        }.cascadeFailure(to: allDone)
-                    } else {
-                        self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone)
+                            }.nonisolated().cascadeFailure(to: allDone)
+                        } else {
+                            self.write(.byteBuffer(ByteBuffer(bytes: chunk.element))).cascade(to: allDone)
+                        }
                     }
-                }
 
-                // HACK (again, we're not told the right EventLoop): Let's write 0 bytes to make the user tell us...
-                return self.write(.byteBuffer(ByteBuffer())).flatMapWithEventLoop { (_, loop) in
                     let allDone = loop.makePromise(of: Void.self)
                     writeNextChunk(chunk, allDone: allDone)
                     return allDone.futureResult
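The new `writeNextChunk` deliberately returns to the EventLoop every fourth chunk instead of recursing directly each time. A standalone sketch of that trampoline pattern, using only NIOCore and an EventLoop assumed to be supplied by the caller (e.g. `group.next()` from a `MultiThreadedEventLoopGroup`); it is an illustration of the technique, not the library's implementation:

```swift
import NIOCore

// Sketch only: if every step completed synchronously, plain recursion would grow
// the stack by one frame per item. Bouncing off the EventLoop every few steps
// resets the stack and also lets the loop interleave other work (e.g. pause
// signals), which is the same reasoning as the comments in writeChunks.
func process(_ items: [Int], on loop: EventLoop) -> EventLoopFuture<Void> {
    let done = loop.makePromise(of: Void.self)
    var iterator = items.enumerated().makeIterator()

    func step() {
        loop.assertInEventLoop()
        guard let (index, item) = iterator.next() else {
            return done.succeed(())
        }
        _ = item  // a real implementation would perform an asynchronous write here
        if (index + 1) % 4 == 0 {
            loop.execute { step() }  // trampoline: continue on a fresh stack frame
        } else {
            step()                   // cheap direct recursion for most steps
        }
    }

    loop.execute { step() }
    return done.futureResult
}
```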