@@ -806,4 +806,81 @@ class HTTPClientInternalTests: XCTestCase {
806806 }
807807 }
808808 }
809+
810+ func testUploadStreamingIsCalledOnTaskEL( ) throws {
811+ let group = getDefaultEventLoopGroup ( numberOfThreads: 4 )
812+ defer {
813+ XCTAssertNoThrow ( try group. syncShutdownGracefully ( ) )
814+ }
815+
816+ let httpBin = HTTPBin ( )
817+ let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( group) )
818+ defer {
819+ XCTAssertNoThrow ( try httpClient. syncShutdown ( ) )
820+ XCTAssertNoThrow ( try httpBin. shutdown ( ) )
821+ }
822+
823+ let el1 = group. next ( )
824+ let el2 = group. next ( )
825+ XCTAssert ( el1 !== el2)
826+
827+ let body : HTTPClient . Body = . stream( length: 8 ) { writer in
828+ XCTAssert ( el1. inEventLoop)
829+ let buffer = ByteBuffer . of ( string: " 1234 " )
830+ return writer. write ( . byteBuffer( buffer) ) . flatMap {
831+ XCTAssert ( el1. inEventLoop)
832+ let buffer = ByteBuffer . of ( string: " 4321 " )
833+ return writer. write ( . byteBuffer( buffer) )
834+ }
835+ }
836+ let request = try HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /post " , method: . POST, body: body)
837+ let response = httpClient. execute ( request: request,
838+ delegate: ResponseAccumulator ( request: request) ,
839+ eventLoop: HTTPClient . EventLoopPreference ( . testOnly_exact( channelOn: el2,
840+ delegateOn: el1) ) )
841+ XCTAssert ( el1 === response. eventLoop)
842+ XCTAssertNoThrow ( try response. wait ( ) )
843+ }
844+
845+ func testWeCanActuallyExactlySetTheEventLoops( ) throws {
846+ let group = getDefaultEventLoopGroup ( numberOfThreads: 3 )
847+ defer {
848+ XCTAssertNoThrow ( try group. syncShutdownGracefully ( ) )
849+ }
850+
851+ let httpBin = HTTPBin ( )
852+ let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( group) )
853+ defer {
854+ XCTAssertNoThrow ( try httpClient. syncShutdown ( ) )
855+ XCTAssertNoThrow ( try httpBin. shutdown ( ) )
856+ }
857+
858+ let el1 = group. next ( )
859+ let el2 = group. next ( )
860+ XCTAssert ( el1 !== el2)
861+
862+ let taskPromise = group. next ( ) . makePromise ( of: HTTPClient . Task< HTTPClient . Response> . self )
863+ let body : HTTPClient . Body = . stream( length: 8 ) { writer in
864+ XCTAssert ( el1. inEventLoop)
865+ let buffer = ByteBuffer . of ( string: " 1234 " )
866+ return writer. write ( . byteBuffer( buffer) ) . flatMap {
867+ XCTAssert ( el1. inEventLoop)
868+ let buffer = ByteBuffer . of ( string: " 4321 " )
869+ return taskPromise. futureResult. map { ( task: HTTPClient . Task < HTTPClient . Response > ) -> Void in
870+ XCTAssertNotNil ( task. connection)
871+ XCTAssert ( task. connection? . channel. eventLoop === el2)
872+ } . flatMap {
873+ writer. write ( . byteBuffer( buffer) )
874+ }
875+ }
876+ }
877+ let request = try HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /post " , method: . POST, body: body)
878+ let response = httpClient. execute ( request: request,
879+ delegate: ResponseAccumulator ( request: request) ,
880+ eventLoop: HTTPClient . EventLoopPreference ( . testOnly_exact( channelOn: el2,
881+ delegateOn: el1) ) )
882+ taskPromise. succeed ( response)
883+ XCTAssert ( el1 === response. eventLoop)
884+ XCTAssertNoThrow ( try response. wait ( ) )
885+ }
809886}
0 commit comments