@@ -1705,6 +1705,7 @@ class HTTPClientTests: XCTestCase {
17051705 private let bodyPromises : [ EventLoopPromise < ByteBuffer > ]
17061706 private let endPromise : EventLoopPromise < Void >
17071707 private var bodyPartsSeenSoFar = 0
1708+ private var atEnd = false
17081709
17091710 init ( headPromise: EventLoopPromise < HTTPRequestHead > ,
17101711 bodyPromises: [ EventLoopPromise < ByteBuffer > ] ,
@@ -1727,10 +1728,14 @@ class HTTPClientTests: XCTestCase {
17271728 context. write ( self . wrapOutboundOut ( . head( . init( version: . init( major: 1 , minor: 1 ) , status: . ok) ) ) ,
17281729 promise: nil )
17291730 context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: self . endPromise)
1731+ self . atEnd = true
17301732 }
17311733 }
17321734
17331735 func handlerRemoved( context: ChannelHandlerContext ) {
1736+ guard !self . atEnd else {
1737+ return
1738+ }
17341739 struct NotFulfilledError : Error { }
17351740
17361741 self . headPromise. fail ( NotFulfilledError ( ) )
@@ -1753,10 +1758,7 @@ class HTTPClientTests: XCTestCase {
17531758 let bodyPromises = ( 0 ..< 16 ) . map { _ in group. next ( ) . makePromise ( of: ByteBuffer . self) }
17541759 let endPromise = group. next ( ) . makePromise ( of: Void . self)
17551760 let sentOffAllBodyPartsPromise = group. next ( ) . makePromise ( of: Void . self)
1756- // Because of https://github.com/swift-server/async-http-client/issues/200 we also need to pull off a terrible
1757- // hack and get the internal EventLoop out :(. Once the bug is fixed, this promise should only get the
1758- // StreamWriter.
1759- let streamWriterPromise = group. next ( ) . makePromise ( of: ( EventLoop, HTTPClient . Body. StreamWriter) . self)
1761+ let streamWriterPromise = group. next ( ) . makePromise ( of: HTTPClient . Body. StreamWriter. self)
17601762
17611763 func makeServer( ) -> Channel ? {
17621764 return try ? ServerBootstrap ( group: group)
@@ -1781,12 +1783,7 @@ class HTTPClientTests: XCTestCase {
17811783 method: . POST,
17821784 headers: [ " transfer-encoding " : " chunked " ] ,
17831785 body: . stream { streamWriter in
1784- // Due to https://github.com/swift-server/async-http-client/issues/200
1785- // we also need to pull off a terrible hack and get the internal
1786- // EventLoop out :(. Once the bug is fixed, this promise should only get
1787- // the StreamWriter.
1788- let currentEL = MultiThreadedEventLoopGroup . currentEventLoop! // HACK!!
1789- streamWriterPromise. succeed ( ( currentEL, streamWriter) )
1786+ streamWriterPromise. succeed ( streamWriter)
17901787 return sentOffAllBodyPartsPromise. futureResult
17911788 } )
17921789 }
@@ -1811,13 +1808,69 @@ class HTTPClientTests: XCTestCase {
18111808 buffer. clear ( )
18121809 buffer. writeString ( String ( bodyChunkNumber, radix: 16 ) )
18131810 XCTAssertEqual ( 1 , buffer. readableBytes)
1814- XCTAssertNoThrow ( try streamWriter. 0 . flatSubmit {
1815- streamWriter. 1 . write ( . byteBuffer( buffer) )
1816- } . wait ( ) )
1811+ XCTAssertNoThrow ( try streamWriter. write ( . byteBuffer( buffer) ) . wait ( ) )
18171812 XCTAssertNoThrow ( XCTAssertEqual ( buffer, try bodyPromises [ bodyChunkNumber] . futureResult. wait ( ) ) )
18181813 }
18191814 sentOffAllBodyPartsPromise. succeed ( ( ) )
18201815 XCTAssertNoThrow ( try endPromise. futureResult. wait ( ) )
18211816 XCTAssertNoThrow ( try runningRequest. wait ( ) )
18221817 }
1818+
1819+ func testUploadStreamingCallinToleratedFromOtsideEL( ) throws {
1820+ let httpBin = HTTPBin ( )
1821+ let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( self . clientGroup) )
1822+ defer {
1823+ XCTAssertNoThrow ( try httpClient. syncShutdown ( ) )
1824+ XCTAssertNoThrow ( try httpBin. shutdown ( ) )
1825+ }
1826+
1827+ let request = try HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /get " , method: . POST, body: . stream( length: 4 ) { writer in
1828+ let promise = httpClient. eventLoopGroup. next ( ) . makePromise ( of: Void . self)
1829+ // We have to toleare callins from any thread
1830+ DispatchQueue ( label: " upload-streaming " ) . async {
1831+ writer. write ( . byteBuffer( ByteBuffer . of ( string: " 1234 " ) ) ) . whenComplete { _ in
1832+ promise. succeed ( ( ) )
1833+ }
1834+ }
1835+ return promise. futureResult
1836+ } )
1837+ XCTAssertNoThrow ( try httpClient. execute ( request: request) . wait ( ) )
1838+ }
1839+
1840+ func testUploadStreamingIsCalledOnTaskEL( ) throws {
1841+ let group = getDefaultEventLoopGroup ( numberOfThreads: 4 )
1842+ defer {
1843+ XCTAssertNoThrow ( try group. syncShutdownGracefully ( ) )
1844+ }
1845+
1846+ let httpBin = HTTPBin ( )
1847+ let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( group) )
1848+ defer {
1849+ XCTAssertNoThrow ( try httpClient. syncShutdown ( ) )
1850+ XCTAssertNoThrow ( try httpBin. shutdown ( ) )
1851+ }
1852+
1853+ let el1 = group. next ( )
1854+ let el2 = group. next ( )
1855+ XCTAssertFalse ( el1 === el2)
1856+
1857+ do {
1858+ // Pre-populate pool with a connection on a different EL
1859+ let request = try HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /get " , method: . GET)
1860+ XCTAssertNoThrow ( try httpClient. execute ( request: request, delegate: ResponseAccumulator ( request: request) , eventLoop: . delegateAndChannel( on: el2) ) . wait ( ) )
1861+ }
1862+
1863+ let body : HTTPClient . Body = . stream( length: 8 ) { writer in
1864+ XCTAssert ( el1. inEventLoop)
1865+ let buffer = ByteBuffer . of ( string: " 1234 " )
1866+ return writer. write ( . byteBuffer( buffer) ) . flatMap {
1867+ XCTAssert ( el1. inEventLoop)
1868+ let buffer = ByteBuffer . of ( string: " 4321 " )
1869+ return writer. write ( . byteBuffer( buffer) )
1870+ }
1871+ }
1872+ let request = try HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /post " , method: . POST, body: body)
1873+ let response = httpClient. execute ( request: request, delegate: ResponseAccumulator ( request: request) , eventLoop: . delegate( on: el1) )
1874+ XCTAssertNoThrow ( try response. wait ( ) )
1875+ }
18231876}
0 commit comments