@@ -364,6 +364,154 @@ class HTTP1ConnectionTests: XCTestCase {
364364 XCTAssertEqual ( connectionDelegate. hitConnectionClosed, 1 )
365365 XCTAssertEqual ( httpBin. activeConnections, 0 )
366366 }
367+
368+ // In order to test backpressure we need to make sure that reads will not happen
369+ // until the backpressure promise is succeeded. Since we cannot guarantee when
370+ // messages will be delivered to a client pipeline and we need this test to be
371+ // fast (no waiting for arbitrary amounts of time), we do the following.
372+ // First, we enforce NIO to send us only 1 byte at a time. Then we send a message
373+ // of 4 bytes. This will guarantee that if we see first byte of the message, other
374+ // bytes a ready to be read as well. This will allow us to test if subsequent reads
375+ // are waiting for backpressure promise.
376+ func testDownloadStreamingBackpressure( ) {
377+ class BackpressureTestDelegate : HTTPClientResponseDelegate {
378+ typealias Response = Void
379+
380+ var _reads = 0
381+ var _channel : Channel ?
382+
383+ let lock : Lock
384+ let backpressurePromise : EventLoopPromise < Void >
385+ let messageReceived : EventLoopPromise < Void >
386+
387+ init ( eventLoop: EventLoop ) {
388+ self . lock = Lock ( )
389+ self . backpressurePromise = eventLoop. makePromise ( )
390+ self . messageReceived = eventLoop. makePromise ( )
391+ }
392+
393+ var reads : Int {
394+ return self . lock. withLock {
395+ self . _reads
396+ }
397+ }
398+
399+ func willExecuteOnChannel( _ channel: Channel ) {
400+ self . lock. withLockVoid {
401+ self . _channel = channel
402+ }
403+ }
404+
405+ func didReceiveHead( task: HTTPClient . Task < Void > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
406+ return task. futureResult. eventLoop. makeSucceededVoidFuture ( )
407+ }
408+
409+ func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
410+ // We count a number of reads received.
411+ self . lock. withLockVoid {
412+ self . _reads += 1
413+ }
414+ // We need to notify the test when first byte of the message is arrived.
415+ self . messageReceived. succeed ( ( ) )
416+ return self . backpressurePromise. futureResult
417+ }
418+
419+ func didFinishRequest( task: HTTPClient . Task < Response > ) throws { }
420+ }
421+
422+ final class WriteAfterFutureSucceedsHandler : ChannelInboundHandler {
423+ typealias InboundIn = HTTPServerRequestPart
424+ typealias OutboundOut = HTTPServerResponsePart
425+
426+ let endFuture : EventLoopFuture < Void >
427+
428+ init ( endFuture: EventLoopFuture < Void > ) {
429+ self . endFuture = endFuture
430+ }
431+
432+ func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
433+ switch self . unwrapInboundIn ( data) {
434+ case . head:
435+ let head = HTTPResponseHead ( version: HTTPVersion ( major: 1 , minor: 1 ) , status: . ok)
436+ context. writeAndFlush ( wrapOutboundOut ( . head( head) ) , promise: nil )
437+ case . body:
438+ // ignore
439+ break
440+ case . end:
441+ let buffer = context. channel. allocator. buffer ( string: " 1234 " )
442+ context. writeAndFlush ( self . wrapOutboundOut ( . body( . byteBuffer( buffer) ) ) , promise: nil )
443+
444+ self . endFuture. hop ( to: context. eventLoop) . whenSuccess {
445+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: nil )
446+ }
447+ }
448+ }
449+ }
450+
451+ let logger = Logger ( label: " test " )
452+
453+ // cannot test with NIOTS as `maxMessagesPerRead` is not supported
454+ let eventLoopGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
455+ defer { XCTAssertNoThrow ( try eventLoopGroup. syncShutdownGracefully ( ) ) }
456+ let requestEventLoop = eventLoopGroup. next ( )
457+ let backpressureDelegate = BackpressureTestDelegate ( eventLoop: requestEventLoop)
458+
459+ let httpBin = HTTPBin { _ in
460+ WriteAfterFutureSucceedsHandler (
461+ endFuture: backpressureDelegate. backpressurePromise. futureResult
462+ )
463+ }
464+ defer { XCTAssertNoThrow ( try httpBin. shutdown ( ) ) }
465+
466+ var maybeChannel : Channel ?
467+ XCTAssertNoThrow ( maybeChannel = try ClientBootstrap ( group: eventLoopGroup)
468+ . channelOption ( ChannelOptions . maxMessagesPerRead, value: 1 )
469+ . channelOption ( ChannelOptions . recvAllocator, value: FixedSizeRecvByteBufferAllocator ( capacity: 1 ) )
470+ . connect ( host: " localhost " , port: httpBin. port)
471+ . wait ( ) )
472+ guard let channel = maybeChannel else { return XCTFail ( " Expected to have a channel at this point " ) }
473+ let connectionDelegate = MockConnectionDelegate ( )
474+ var maybeConnection : HTTP1Connection ?
475+ XCTAssertNoThrow ( maybeConnection = try channel. eventLoop. submit { try HTTP1Connection . start (
476+ channel: channel,
477+ connectionID: 0 ,
478+ delegate: connectionDelegate,
479+ configuration: . init( ) ,
480+ logger: logger
481+ ) } . wait ( ) )
482+ guard let connection = maybeConnection else { return XCTFail ( " Expected to have a connection at this point " ) }
483+
484+ var maybeRequestBag : RequestBag < BackpressureTestDelegate > ?
485+
486+ XCTAssertNoThrow ( maybeRequestBag = try RequestBag (
487+ request: HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /custom " ) ,
488+ eventLoopPreference: . delegate( on: requestEventLoop) ,
489+ task: . init( eventLoop: requestEventLoop, logger: logger) ,
490+ redirectHandler: nil ,
491+ connectionDeadline: . now( ) + . seconds( 30 ) ,
492+ requestOptions: . forTests( ) ,
493+ delegate: backpressureDelegate
494+ ) )
495+ guard let requestBag = maybeRequestBag else { return XCTFail ( " Expected to be able to create a request bag " ) }
496+ backpressureDelegate. willExecuteOnChannel ( connection. channel)
497+
498+ connection. executeRequest ( requestBag)
499+
500+ let requestFuture = requestBag. task. futureResult
501+
502+ // Send 4 bytes, but only one should be received until the backpressure promise is succeeded.
503+
504+ // Now we wait until message is delivered to client channel pipeline
505+ XCTAssertNoThrow ( try backpressureDelegate. messageReceived. futureResult. wait ( ) )
506+ XCTAssertEqual ( backpressureDelegate. reads, 1 )
507+
508+ // Succeed the backpressure promise.
509+ backpressureDelegate. backpressurePromise. succeed ( ( ) )
510+ XCTAssertNoThrow ( try requestFuture. wait ( ) )
511+
512+ // At this point all other bytes should be delivered.
513+ XCTAssertEqual ( backpressureDelegate. reads, 4 )
514+ }
367515}
368516
369517class MockHTTP1ConnectionDelegate : HTTP1ConnectionDelegate {
0 commit comments