@@ -68,7 +68,7 @@ public class HTTPClient {
6868 public let eventLoopGroup : EventLoopGroup
6969 let eventLoopGroupProvider : EventLoopGroupProvider
7070 let configuration : Configuration
71- let pool : ConnectionPool
71+ let poolManager : HTTPConnectionPool . Manager
7272 var state : State
7373 private let stateLock = Lock ( )
7474
@@ -110,14 +110,18 @@ public class HTTPClient {
110110 #endif
111111 }
112112 self . configuration = configuration
113- self . pool = ConnectionPool ( configuration: configuration,
114- backgroundActivityLogger: backgroundActivityLogger)
113+ self . poolManager = HTTPConnectionPool . Manager (
114+ eventLoopGroup: self . eventLoopGroup,
115+ configuration: self . configuration,
116+ backgroundActivityLogger: backgroundActivityLogger
117+ )
115118 self . state = . upAndRunning
116119 }
117120
118121 deinit {
119- assert ( self . pool. count == 0 )
120- assert ( self . state == . shutDown, " Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed. " )
122+ guard case . shutDown = self . state else {
123+ preconditionFailure ( " Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed. " )
124+ }
121125 }
122126
123127 /// Shuts down the client and `EventLoopGroup` if it was created by the client.
@@ -175,14 +179,16 @@ public class HTTPClient {
175179 switch self . eventLoopGroupProvider {
176180 case . shared:
177181 self . state = . shutDown
178- callback ( nil )
182+ queue. async {
183+ callback ( nil )
184+ }
179185 case . createNew:
180186 switch self . state {
181187 case . shuttingDown:
182188 self . state = . shutDown
183189 self . eventLoopGroup. shutdownGracefully ( queue: queue, callback)
184190 case . shutDown, . upAndRunning:
185- assertionFailure ( " The only valid state at this point is \( State . shutDown ) " )
191+ assertionFailure ( " The only valid state at this point is \( String ( describing : State . shuttingDown ) ) " )
186192 }
187193 }
188194 }
@@ -191,33 +197,35 @@ public class HTTPClient {
191197 private func shutdown( requiresCleanClose: Bool , queue: DispatchQueue , _ callback: @escaping ( Error ? ) -> Void ) {
192198 do {
193199 try self . stateLock. withLock {
194- if self . state != . upAndRunning {
200+ guard case . upAndRunning = self . state else {
195201 throw HTTPClientError . alreadyShutdown
196202 }
197- self . state = . shuttingDown
203+ self . state = . shuttingDown( requiresCleanClose : requiresCleanClose , callback : callback )
198204 }
199205 } catch {
200206 callback ( error)
201207 return
202208 }
203209
204- self . pool. close ( on: self . eventLoopGroup. next ( ) ) . whenComplete { result in
205- var closeError : Error ?
210+ let promise = self . eventLoopGroup. next ( ) . makePromise ( of: Bool . self)
211+ self . poolManager. shutdown ( promise: promise)
212+ promise. futureResult. whenComplete { result in
206213 switch result {
207- case . failure( let error) :
208- closeError = error
209- case . success( let cleanShutdown) :
210- if !cleanShutdown, requiresCleanClose {
211- closeError = HTTPClientError . uncleanShutdown
214+ case . failure:
215+ preconditionFailure ( " Shutting down the connection pool must not fail, ever. " )
216+ case . success( let unclean) :
217+ let ( callback, uncleanError) = self . stateLock. withLock { ( ) -> ( ( Error ? ) -> Void , Error ? ) in
218+ guard case . shuttingDown( let requiresClean, callback: let callback) = self . state else {
219+ preconditionFailure ( " Why did the pool manager shut down, if it was not instructed to " )
220+ }
221+
222+ let error : Error ? = ( requiresClean && unclean) ? HTTPClientError . uncleanShutdown : nil
223+ return ( callback, error)
212224 }
213225
214- self . shutdownEventLoop ( queue: queue) { eventLoopError in
215- // we prioritise .uncleanShutdown here
216- if let error = closeError {
217- callback ( error)
218- } else {
219- callback ( eventLoopError)
220- }
226+ self . shutdownEventLoop ( queue: queue) { error in
227+ let reportedError = error ?? uncleanError
228+ callback ( reportedError)
221229 }
222230 }
223231 }
@@ -492,7 +500,7 @@ public class HTTPClient {
492500 let taskEL : EventLoop
493501 switch eventLoopPreference. preference {
494502 case . indifferent:
495- taskEL = self . pool . associatedEventLoop ( for : ConnectionPool . Key ( request ) ) ?? self . eventLoopGroup. next ( )
503+ taskEL = self . eventLoopGroup. next ( )
496504 case . delegate( on: let eventLoop) :
497505 precondition ( self . eventLoopGroup. makeIterator ( ) . contains { $0 === eventLoop } , " Provided EventLoop must be part of clients EventLoopGroup. " )
498506 taskEL = eventLoop
@@ -540,75 +548,31 @@ public class HTTPClient {
540548 }
541549
542550 let task = Task < Delegate . Response > ( eventLoop: taskEL, logger: logger)
543- let setupComplete = taskEL. makePromise ( of: Void . self)
544- let connection = self . pool. getConnection ( request,
545- preference: eventLoopPreference,
546- taskEventLoop: taskEL,
547- deadline: deadline,
548- setupComplete: setupComplete. futureResult,
549- logger: logger)
550-
551- let taskHandler = TaskHandler ( task: task,
552- kind: request. kind,
553- delegate: delegate,
554- redirectHandler: redirectHandler,
555- ignoreUncleanSSLShutdown: self . configuration. ignoreUncleanSSLShutdown,
556- logger: logger)
557-
558- connection. flatMap { connection -> EventLoopFuture < Void > in
559- logger. debug ( " got connection for request " ,
560- metadata: [ " ahc-connection " : " \( connection) " ,
561- " ahc-request " : " \( request. method) \( request. url) " ,
562- " ahc-channel-el " : " \( connection. channel. eventLoop) " ,
563- " ahc-task-el " : " \( taskEL) " ] )
564-
565- let channel = connection. channel
566-
567- func prepareChannelForTask0( ) -> EventLoopFuture < Void > {
568- do {
569- let syncPipelineOperations = channel. pipeline. syncOperations
570-
571- if let timeout = self . resolve ( timeout: self . configuration. timeout. read, deadline: deadline) {
572- try syncPipelineOperations. addHandler ( IdleStateHandler ( readTimeout: timeout) )
573- }
574-
575- try syncPipelineOperations. addHandler ( taskHandler)
576- } catch {
577- connection. release ( closing: true , logger: logger)
578- return channel. eventLoop. makeFailedFuture ( error)
579- }
580-
581- task. setConnection ( connection)
551+ do {
552+ let requestBag = try RequestBag (
553+ request: request,
554+ eventLoopPreference: eventLoopPreference,
555+ task: task,
556+ redirectHandler: redirectHandler,
557+ connectionDeadline: . now( ) + ( self . configuration. timeout. connect ?? . seconds( 10 ) ) ,
558+ requestOptions: . fromClientConfiguration( self . configuration) ,
559+ delegate: delegate
560+ )
582561
583- let isCancelled = task. lock. withLock {
584- task. cancelled
562+ var deadlineSchedule : Scheduled < Void > ?
563+ if let deadline = deadline {
564+ deadlineSchedule = taskEL. scheduleTask ( deadline: deadline) {
565+ requestBag. fail ( HTTPClientError . deadlineExceeded)
585566 }
586567
587- if !isCancelled {
588- return channel. writeAndFlush ( request) . flatMapError { _ in
589- // At this point the `TaskHandler` will already be present
590- // to handle the failure and pass it to the `promise`
591- channel. eventLoop. makeSucceededVoidFuture ( )
592- }
593- } else {
594- return channel. eventLoop. makeSucceededVoidFuture ( )
568+ task. promise. futureResult. whenComplete { _ in
569+ deadlineSchedule? . cancel ( )
595570 }
596571 }
597572
598- if channel. eventLoop. inEventLoop {
599- return prepareChannelForTask0 ( )
600- } else {
601- return channel. eventLoop. flatSubmit {
602- return prepareChannelForTask0 ( )
603- }
604- }
605- } . always { _ in
606- setupComplete. succeed ( ( ) )
607- } . whenFailure { error in
608- taskHandler. callOutToDelegateFireAndForget { task in
609- delegate. didReceiveError ( task: task, error)
610- }
611- task. promise. fail ( error)
573+ self . poolManager. executeRequest ( requestBag)
574+ } catch {
575+ task. fail ( with: error, delegateType: Delegate . self)
612576 }
613577
614578 return task
@@ -821,7 +785,7 @@ public class HTTPClient {
821785
822786 enum State {
823787 case upAndRunning
824- case shuttingDown
788+ case shuttingDown( requiresCleanClose : Bool , callback : ( Error ? ) -> Void )
825789 case shutDown
826790 }
827791}
@@ -926,6 +890,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
926890 case serverOfferedUnsupportedApplicationProtocol( String )
927891 case requestStreamCancelled
928892 case getConnectionFromPoolTimeout
893+ case deadlineExceeded
929894 }
930895
931896 private var code : Code
@@ -995,6 +960,9 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
995960 return HTTPClientError ( code: . serverOfferedUnsupportedApplicationProtocol( proto) )
996961 }
997962
963+ /// The request deadline was exceeded. The request was cancelled because of this.
964+ public static let deadlineExceeded = HTTPClientError ( code: . deadlineExceeded)
965+
998966 /// The remote server responded with a status code >= 300, before the full request was sent. The request stream
999967 /// was therefore cancelled
1000968 public static let requestStreamCancelled = HTTPClientError ( code: . requestStreamCancelled)
0 commit comments