@@ -415,20 +415,21 @@ class HTTPClientInternalTests: XCTestCase {
415415 }
416416
417417 let group = getDefaultEventLoopGroup ( numberOfThreads: 3 )
418+ let serverGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
418419 defer {
419420 XCTAssertNoThrow ( try group. syncShutdownGracefully ( ) )
421+ XCTAssertNoThrow ( try serverGroup. syncShutdownGracefully ( ) )
420422 }
421423
422424 let channelEL = group. next ( )
423425 let delegateEL = group. next ( )
424426 let randoEL = group. next ( )
425427
426428 let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( group) )
427- let promise : EventLoopPromise < Channel > = httpClient. eventLoopGroup. next ( ) . makePromise ( )
428- let httpBin = HTTPBin ( channelPromise: promise)
429+ let server = NIOHTTP1TestServer ( group: serverGroup)
429430 defer {
431+ XCTAssertNoThrow ( try server. stop ( ) )
430432 XCTAssertNoThrow ( try httpClient. syncShutdown ( requiresCleanClose: true ) )
431- XCTAssertNoThrow ( try httpBin. shutdown ( ) )
432433 }
433434
434435 let body : HTTPClient . Body = . stream( length: 8 ) { writer in
@@ -439,21 +440,26 @@ class HTTPClientInternalTests: XCTestCase {
439440 }
440441 }
441442
442- let request = try Request ( url: " http://127.0.0.1: \( httpBin . port ) /custom " ,
443+ let request = try Request ( url: " http://127.0.0.1: \( server . serverPort ) /custom " ,
443444 body: body)
444445 let delegate = Delegate ( expectedEventLoop: delegateEL, randomOtherEventLoop: randoEL)
445446 let future = httpClient. execute ( request: request,
446447 delegate: delegate,
447448 eventLoop: . init( . testOnly_exact( channelOn: channelEL,
448449 delegateOn: delegateEL) ) ) . futureResult
449450
450- let channel = try promise. futureResult. wait ( )
451+ XCTAssertNoThrow ( try server. readInbound ( ) ) // .head
452+ XCTAssertNoThrow ( try server. readInbound ( ) ) // .body
453+ XCTAssertNoThrow ( try server. readInbound ( ) ) // .end
451454
452455 // Send 3 parts, but only one should be received until the future is complete
453- let buffer = channel. allocator. buffer ( string: " 1234 " )
454- try channel. writeAndFlush ( HTTPServerResponsePart . body ( . byteBuffer( buffer) ) ) . wait ( )
456+ XCTAssertNoThrow ( try server. writeOutbound ( . head( . init( version: . init( major: 1 , minor: 1 ) ,
457+ status: . ok,
458+ headers: HTTPHeaders ( [ ( " Transfer-Encoding " , " chunked " ) ] ) ) ) ) )
459+ let buffer = ByteBuffer ( string: " 1234 " )
460+ XCTAssertNoThrow ( try server. writeOutbound ( . body( . byteBuffer( buffer) ) ) )
461+ XCTAssertNoThrow ( try server. writeOutbound ( . end( nil ) ) )
455462
456- try channel. writeAndFlush ( HTTPServerResponsePart . end ( nil ) ) . wait ( )
457463 let ( receivedMessages, sentMessages) = try future. wait ( )
458464 XCTAssertEqual ( 2 , receivedMessages. count)
459465 XCTAssertEqual ( 4 , sentMessages. count)
@@ -488,7 +494,7 @@ class HTTPClientInternalTests: XCTestCase {
488494
489495 switch receivedMessages. dropFirst ( 0 ) . first {
490496 case . some( . head( let head) ) :
491- XCTAssertEqual ( [ " transfer-encoding " : " chunked " ] , head . headers )
497+ XCTAssertEqual ( head . headers [ " transfer-encoding " ] . first , " chunked " )
492498 default :
493499 XCTFail ( " wrong message " )
494500 }
@@ -1025,4 +1031,53 @@ class HTTPClientInternalTests: XCTestCase {
10251031 XCTAssertEqual ( request5. socketPath, " /tmp/file " )
10261032 XCTAssertEqual ( request5. uri, " /file/path " )
10271033 }
1034+
1035+ func testBodyPartStreamStateChangedBeforeNotification( ) throws {
1036+ class StateValidationDelegate : HTTPClientResponseDelegate {
1037+ typealias Response = Void
1038+
1039+ var handler : TaskHandler < StateValidationDelegate > !
1040+ var triggered = false
1041+
1042+ func didReceiveError( task: HTTPClient . Task < Response > , _ error: Error ) {
1043+ self . triggered = true
1044+ switch self . handler. state {
1045+ case . endOrError:
1046+ // expected
1047+ break
1048+ default :
1049+ XCTFail ( " unexpected state: \( self . handler. state) " )
1050+ }
1051+ }
1052+
1053+ func didFinishRequest( task: HTTPClient . Task < Void > ) throws { }
1054+ }
1055+
1056+ let channel = EmbeddedChannel ( )
1057+ XCTAssertNoThrow ( try channel. connect ( to: try SocketAddress ( unixDomainSocketPath: " /fake " ) ) . wait ( ) )
1058+
1059+ let task = Task < Void > ( eventLoop: channel. eventLoop, logger: HTTPClient . loggingDisabled)
1060+
1061+ let delegate = StateValidationDelegate ( )
1062+ let handler = TaskHandler ( task: task,
1063+ kind: . host,
1064+ delegate: delegate,
1065+ redirectHandler: nil ,
1066+ ignoreUncleanSSLShutdown: false ,
1067+ logger: HTTPClient . loggingDisabled)
1068+
1069+ delegate. handler = handler
1070+ try channel. pipeline. addHandler ( handler) . wait ( )
1071+
1072+ var request = try Request ( url: " http://localhost:8080/post " )
1073+ request. body = . stream( length: 1 ) { writer in
1074+ writer. write ( . byteBuffer( ByteBuffer ( string: " 1234 " ) ) )
1075+ }
1076+
1077+ XCTAssertThrowsError ( try channel. writeOutbound ( request) )
1078+ XCTAssertTrue ( delegate. triggered)
1079+
1080+ XCTAssertNoThrow ( try channel. readOutbound ( as: HTTPClientRequestPart . self) ) // .head
1081+ XCTAssertNoThrow ( XCTAssertTrue ( try channel. finish ( ) . isClean) )
1082+ }
10281083}
0 commit comments