1919#endif
2020import Logging
2121import NIOCore
22+ import NIOHTTP1
2223import NIOPosix
2324import NIOSSL
2425import XCTest
2526
2627class HTTP2ClientTests : XCTestCase {
27- func makeDefaultHTTPClient( ) -> HTTPClient {
28+ func makeDefaultHTTPClient(
29+ eventLoopGroupProvider: HTTPClient . EventLoopGroupProvider = . createNew
30+ ) -> HTTPClient {
2831 var tlsConfig = TLSConfiguration . makeClientConfiguration ( )
2932 tlsConfig. certificateVerification = . none
3033 return HTTPClient (
31- eventLoopGroupProvider: . createNew ,
34+ eventLoopGroupProvider: eventLoopGroupProvider ,
3235 configuration: HTTPClient . Configuration (
3336 tlsConfiguration: tlsConfig,
3437 httpVersion: . automatic
@@ -37,6 +40,18 @@ class HTTP2ClientTests: XCTestCase {
3740 )
3841 }
3942
43+ func makeClientWithActiveHTTP2Connection< RequestHandler> (
44+ to bin: HTTPBin < RequestHandler > ,
45+ eventLoopGroupProvider: HTTPClient . EventLoopGroupProvider = . createNew
46+ ) -> HTTPClient {
47+ let client = self . makeDefaultHTTPClient ( eventLoopGroupProvider: eventLoopGroupProvider)
48+ var response : HTTPClient . Response ?
49+ XCTAssertNoThrow ( response = try client. get ( url: " https://localhost: \( bin. port) /get " ) . wait ( ) )
50+ XCTAssertEqual ( . ok, response? . status)
51+ XCTAssertEqual ( response? . version, . http2)
52+ return client
53+ }
54+
4055 func testSimpleGet( ) {
4156 let bin = HTTPBin ( . http2( compress: false ) )
4257 defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
@@ -92,7 +107,7 @@ class HTTP2ClientTests: XCTestCase {
92107
93108 for _ in 0 ..< numberOfRequestsPerWorkers {
94109 var response : HTTPClient . Response ?
95- XCTAssertNoThrow ( response = try client. get ( url: url ) . wait ( ) )
110+ XCTAssertNoThrow ( response = try client. get ( url: " https://localhost: \( bin . port ) /get " ) . wait ( ) )
96111
97112 XCTAssertEqual ( . ok, response? . status)
98113 XCTAssertEqual ( response? . version, . http2)
@@ -187,4 +202,193 @@ class HTTP2ClientTests: XCTestCase {
187202 // all workers should be running, let's wait for them to finish
188203 allDone. wait ( )
189204 }
205+
206+ func testUncleanShutdownCancelsExecutingAndQueuedTasks( ) {
207+ let bin = HTTPBin ( . http2( compress: false ) )
208+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
209+ let clientGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
210+ defer { XCTAssertNoThrow ( try clientGroup. syncShutdownGracefully ( ) ) }
211+ // we need an active connection to guarantee that requests are executed immediately
212+ // without waiting for connection establishment
213+ let client = self . makeClientWithActiveHTTP2Connection ( to: bin, eventLoopGroupProvider: . shared( clientGroup) )
214+
215+ // start 20 requests which are guaranteed to never get any response
216+ // 10 of them will executed and the other 10 will be queued
217+ // because HTTPBin has a default `maxConcurrentStreams` limit of 10
218+ let responses = ( 0 ..< 20 ) . map { _ in
219+ client. get ( url: " https://localhost: \( bin. port) /wait " )
220+ }
221+
222+ XCTAssertNoThrow ( try client. syncShutdown ( ) )
223+
224+ var results : [ Result < HTTPClient . Response , Error > ] = [ ]
225+ XCTAssertNoThrow ( results = try EventLoopFuture
226+ . whenAllComplete ( responses, on: clientGroup. next ( ) )
227+ . timeout ( after: . seconds( 2 ) )
228+ . wait ( ) )
229+
230+ for result in results {
231+ switch result {
232+ case . success:
233+ XCTFail ( " Shouldn't succeed " )
234+ case . failure( let error) :
235+ XCTAssertEqual ( error as? HTTPClientError , . cancelled)
236+ }
237+ }
238+ }
239+
240+ func testCancelingRunningRequest( ) {
241+ let bin = HTTPBin ( . http2( compress: false ) ) { _ in SendHeaderAndWaitChannelHandler ( ) }
242+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
243+ let client = self . makeDefaultHTTPClient ( )
244+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
245+
246+ var maybeRequest : HTTPClient . Request ?
247+ XCTAssertNoThrow ( maybeRequest = try HTTPClient . Request ( url: " https://localhost: \( bin. port) " ) )
248+ guard let request = maybeRequest else { return }
249+
250+ var task : HTTPClient . Task < Void > !
251+ let delegate = HeadReceivedCallback { _ in
252+ // request is definitely running because we just received a head from the server
253+ task. cancel ( )
254+ }
255+ task = client. execute (
256+ request: request,
257+ delegate: delegate
258+ )
259+
260+ XCTAssertThrowsError ( try task. futureResult. timeout ( after: . seconds( 2 ) ) . wait ( ) ) {
261+ XCTAssertEqual ( $0 as? HTTPClientError , . cancelled)
262+ }
263+ }
264+
265+ func testStressCancelingRunningRequestFromDifferentThreads( ) {
266+ let bin = HTTPBin ( . http2( compress: false ) ) { _ in SendHeaderAndWaitChannelHandler ( ) }
267+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
268+ let client = self . makeDefaultHTTPClient ( )
269+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
270+ let cancelPool = MultiThreadedEventLoopGroup ( numberOfThreads: 10 )
271+ defer { XCTAssertNoThrow ( try cancelPool. syncShutdownGracefully ( ) ) }
272+
273+ var maybeRequest : HTTPClient . Request ?
274+ XCTAssertNoThrow ( maybeRequest = try HTTPClient . Request ( url: " https://localhost: \( bin. port) " ) )
275+ guard let request = maybeRequest else { return }
276+
277+ let tasks = ( 0 ..< 100 ) . map { _ -> HTTPClient . Task < TestHTTPDelegate . Response > in
278+ var task : HTTPClient . Task < Void > !
279+ let delegate = HeadReceivedCallback { _ in
280+ // request is definitely running because we just received a head from the server
281+ cancelPool. next ( ) . execute {
282+ // canceling from a different thread
283+ task. cancel ( )
284+ }
285+ }
286+ task = client. execute (
287+ request: request,
288+ delegate: delegate
289+ )
290+ return task
291+ }
292+
293+ for task in tasks {
294+ XCTAssertThrowsError ( try task. futureResult. timeout ( after: . seconds( 2 ) ) . wait ( ) ) {
295+ XCTAssertEqual ( $0 as? HTTPClientError , . cancelled)
296+ }
297+ }
298+ }
299+
300+ func testPlatformConnectErrorIsForwardedOnTimeout( ) {
301+ let bin = HTTPBin ( . http2( compress: false ) )
302+ let clientGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 2 )
303+ let el1 = clientGroup. next ( )
304+ let el2 = clientGroup. next ( )
305+ defer { XCTAssertNoThrow ( try clientGroup. syncShutdownGracefully ( ) ) }
306+ var tlsConfig = TLSConfiguration . makeClientConfiguration ( )
307+ tlsConfig. certificateVerification = . none
308+ let client = HTTPClient (
309+ eventLoopGroupProvider: . shared( clientGroup) ,
310+ configuration: HTTPClient . Configuration (
311+ tlsConfiguration: tlsConfig,
312+ timeout: . init( connect: . milliseconds( 1000 ) ) ,
313+ httpVersion: . automatic
314+ ) ,
315+ backgroundActivityLogger: Logger ( label: " HTTPClient " , factory: StreamLogHandler . standardOutput ( label: ) )
316+ )
317+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
318+
319+ var maybeRequest1 : HTTPClient . Request ?
320+ XCTAssertNoThrow ( maybeRequest1 = try HTTPClient . Request ( url: " https://localhost: \( bin. port) /get " ) )
321+ guard let request1 = maybeRequest1 else { return }
322+
323+ let task1 = client. execute ( request: request1, delegate: ResponseAccumulator ( request: request1) , eventLoop: . delegateAndChannel( on: el1) )
324+ var response1 : ResponseAccumulator . Response ?
325+ XCTAssertNoThrow ( response1 = try task1. wait ( ) )
326+
327+ XCTAssertEqual ( . ok, response1? . status)
328+ XCTAssertEqual ( response1? . version, . http2)
329+ let serverPort = bin. port
330+ XCTAssertNoThrow ( try bin. shutdown ( ) )
331+ // client is now in HTTP/2 state and the HTTPBin is closed
332+ // start a new server on the old port which closes all connections immediately
333+ let serverGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
334+ defer { XCTAssertNoThrow ( try serverGroup. syncShutdownGracefully ( ) ) }
335+ var maybeServer : Channel ?
336+ XCTAssertNoThrow ( maybeServer = try ServerBootstrap ( group: serverGroup)
337+ . serverChannelOption ( ChannelOptions . socketOption ( . so_reuseaddr) , value: 1 )
338+ . childChannelInitializer { channel in
339+ channel. close ( )
340+ }
341+ . childChannelOption ( ChannelOptions . socketOption ( . so_reuseaddr) , value: 1 )
342+ . bind ( host: " 0.0.0.0 " , port: serverPort)
343+ . wait ( ) )
344+ guard let server = maybeServer else { return }
345+ defer { XCTAssertNoThrow ( try server. close ( ) . wait ( ) ) }
346+
347+ var maybeRequest2 : HTTPClient . Request ?
348+ XCTAssertNoThrow ( maybeRequest2 = try HTTPClient . Request ( url: " https://localhost: \( serverPort) / " ) )
349+ guard let request2 = maybeRequest2 else { return }
350+
351+ let task2 = client. execute ( request: request2, delegate: ResponseAccumulator ( request: request2) , eventLoop: . delegateAndChannel( on: el2) )
352+ XCTAssertThrowsError ( try task2. wait ( ) ) { error in
353+ XCTAssertNil (
354+ error as? HTTPClientError ,
355+ " error should be some platform specific error that the connection is closed/reset by the other side "
356+ )
357+ }
358+ }
359+ }
360+
361+ private final class HeadReceivedCallback : HTTPClientResponseDelegate {
362+ typealias Response = Void
363+ private let didReceiveHeadCallback : ( HTTPResponseHead ) -> Void
364+ init ( didReceiveHead: @escaping ( HTTPResponseHead ) -> Void ) {
365+ self . didReceiveHeadCallback = didReceiveHead
366+ }
367+
368+ func didReceiveHead( task: HTTPClient . Task < Void > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
369+ self . didReceiveHeadCallback ( head)
370+ return task. eventLoop. makeSucceededVoidFuture ( )
371+ }
372+
373+ func didFinishRequest( task: HTTPClient . Task < Void > ) throws { }
374+ }
375+
376+ /// sends some headers and waits indefinitely afterwards
377+ private final class SendHeaderAndWaitChannelHandler : ChannelInboundHandler {
378+ typealias InboundIn = HTTPServerRequestPart
379+ typealias OutboundOut = HTTPServerResponsePart
380+
381+ func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
382+ let requestPart = self . unwrapInboundIn ( data)
383+ switch requestPart {
384+ case . head:
385+ context. writeAndFlush ( self . wrapOutboundOut ( . head( HTTPResponseHead (
386+ version: HTTPVersion ( major: 1 , minor: 1 ) ,
387+ status: . ok
388+ ) )
389+ ) , promise: nil )
390+ case . body, . end:
391+ return
392+ }
393+ }
190394}
0 commit comments