@@ -28,14 +28,13 @@ final class HTTPConnectionPool {
2828 private var _idleTimer = [ Connection . ID : Scheduled < Void > ] ( )
2929 /// The connection backoff timeout timers. Protected by the stateLock
3030 private var _backoffTimer = [ Connection . ID : Scheduled < Void > ] ( )
31+ /// The request connection timeout timers. Protected by the stateLock
32+ private var _requestTimer = [ Request . ID : Scheduled < Void > ] ( )
3133
3234 private static let fallbackConnectTimeout : TimeAmount = . seconds( 30 )
3335
3436 let key : ConnectionPool . Key
3537
36- private let timerLock = Lock ( )
37- private var _requestTimer = [ Request . ID : Scheduled < Void > ] ( )
38-
3938 private var logger : Logger
4039
4140 private let eventLoopGroup : EventLoopGroup
@@ -110,21 +109,64 @@ final class HTTPConnectionPool {
110109 }
111110 }
112111
112+ enum RequestAction {
113+ enum Unlocked {
114+ case executeRequest( Request , Connection )
115+ case executeRequests( [ Request ] , Connection )
116+ case failRequest( Request , Error )
117+ case failRequests( [ Request ] , Error )
118+ case none
119+ }
120+
121+ enum Locked {
122+ case scheduleRequestTimeout( for: Request , on: EventLoop )
123+ case cancelRequestTimeout( Request . ID )
124+ case cancelRequestTimeouts( [ Request ] )
125+ case none
126+ }
127+ }
128+
113129 struct Locked {
114130 var connection : ConnectionAction . Locked
131+ var request : RequestAction . Locked
115132 }
116133
117134 struct Unlocked {
118135 var connection : ConnectionAction . Unlocked
119- var request : StateMachine . RequestAction
136+ var request : RequestAction . Unlocked
120137 }
121138
122139 var locked : Locked
123140 var unlocked : Unlocked
124141
125142 init ( from stateMachineAction: StateMachine . Action ) {
126- self . locked = Locked ( connection: . none)
127- self . unlocked = Unlocked ( connection: . none, request: stateMachineAction. request)
143+ self . locked = Locked ( connection: . none, request: . none)
144+ self . unlocked = Unlocked ( connection: . none, request: . none)
145+
146+ switch stateMachineAction. request {
147+ case . cancelRequestTimeout( let requestID) :
148+ self . locked. request = . cancelRequestTimeout( requestID)
149+ case . executeRequest( let request, let connection, cancelTimeout: let cancelTimeout) :
150+ if cancelTimeout {
151+ self . locked. request = . cancelRequestTimeout( request. id)
152+ }
153+ self . unlocked. request = . executeRequest( request, connection)
154+ case . executeRequestsAndCancelTimeouts( let requests, let connection) :
155+ self . locked. request = . cancelRequestTimeouts( requests)
156+ self . unlocked. request = . executeRequests( requests, connection)
157+ case . failRequest( let request, let error, cancelTimeout: let cancelTimeout) :
158+ if cancelTimeout {
159+ self . locked. request = . cancelRequestTimeout( request. id)
160+ }
161+ self . unlocked. request = . failRequest( request, error)
162+ case . failRequestsAndCancelTimeouts( let requests, let error) :
163+ self . locked. request = . cancelRequestTimeouts( requests)
164+ self . unlocked. request = . failRequests( requests, error)
165+ case . scheduleRequestTimeout( for: let request, on: let eventLoop) :
166+ self . locked. request = . scheduleRequestTimeout( for: request, on: eventLoop)
167+ case . none:
168+ break
169+ }
128170
129171 switch stateMachineAction. connection {
130172 case . createConnection( let connectionID, on: let eventLoop) :
@@ -154,14 +196,15 @@ final class HTTPConnectionPool {
154196 let unlockedActions = self . stateLock. withLock { ( ) -> Actions . Unlocked in
155197 let stateMachineAction = closure ( & self . _state)
156198 let poolAction = Actions ( from: stateMachineAction)
157- self . runLockedActions ( poolAction. locked)
199+ self . runLockedConnectionAction ( poolAction. locked. connection)
200+ self . runLockedRequestAction ( poolAction. locked. request)
158201 return poolAction. unlocked
159202 }
160203 self . runUnlockedActions ( unlockedActions)
161204 }
162205
163- private func runLockedActions ( _ actions : Actions . Locked ) {
164- switch actions . connection {
206+ private func runLockedConnectionAction ( _ action : Actions . ConnectionAction . Locked ) {
207+ switch action {
165208 case . scheduleBackoffTimer( let connectionID, backoff: let backoff, on: let eventLoop) :
166209 self . scheduleConnectionStartBackoffTimer ( connectionID, backoff, on: eventLoop)
167210
@@ -181,6 +224,22 @@ final class HTTPConnectionPool {
181224 }
182225 }
183226
227+ private func runLockedRequestAction( _ action: Actions . RequestAction . Locked ) {
228+ switch action {
229+ case . scheduleRequestTimeout( for: let request, on: let eventLoop) :
230+ self . scheduleRequestTimeout ( request, on: eventLoop)
231+
232+ case . cancelRequestTimeout( let requestID) :
233+ self . cancelRequestTimeout ( requestID)
234+
235+ case . cancelRequestTimeouts( let requests) :
236+ requests. forEach { self . cancelRequestTimeout ( $0. id) }
237+
238+ case . none:
239+ break
240+ }
241+ }
242+
184243 private func runUnlockedActions( _ actions: Actions . Unlocked ) {
185244 self . runUnlockedConnectionAction ( actions. connection)
186245 self . runUnlockedRequestAction ( actions. request)
@@ -225,38 +284,20 @@ final class HTTPConnectionPool {
225284 }
226285 }
227286
228- private func runUnlockedRequestAction( _ action: StateMachine . RequestAction ) {
229- // The order of execution fail/execute request vs cancelling the request timeout timer does
230- // not matter in the actions here. The actions don't cause any side effects that will be
231- // reported back to the state machine and are not dependent on each other.
232-
287+ private func runUnlockedRequestAction( _ action: Actions . RequestAction . Unlocked ) {
233288 switch action {
234- case . executeRequest( let request, let connection, cancelTimeout: let cancelTimeout) :
235- if cancelTimeout {
236- self . cancelRequestTimeout ( request. id)
237- }
289+ case . executeRequest( let request, let connection) :
238290 connection. executeRequest ( request. req)
239291
240- case . executeRequestsAndCancelTimeouts( let requests, let connection) :
241- self . cancelRequestTimeouts ( requests)
292+ case . executeRequests( let requests, let connection) :
242293 requests. forEach { connection. executeRequest ( $0. req) }
243294
244- case . failRequest( let request, let error, cancelTimeout: let cancelTimeout) :
245- if cancelTimeout {
246- self . cancelRequestTimeout ( request. id)
247- }
295+ case . failRequest( let request, let error) :
248296 request. req. fail ( error)
249297
250- case . failRequestsAndCancelTimeouts( let requests, let error) :
251- self . cancelRequestTimeouts ( requests)
298+ case . failRequests( let requests, let error) :
252299 requests. forEach { $0. req. fail ( error) }
253300
254- case . scheduleRequestTimeout( let request, on: let eventLoop) :
255- self . scheduleRequestTimeout ( request, on: eventLoop)
256-
257- case . cancelRequestTimeout( let requestID) :
258- self . cancelRequestTimeout ( requestID)
259-
260301 case . none:
261302 break
262303 }
@@ -282,49 +323,29 @@ final class HTTPConnectionPool {
282323 private func scheduleRequestTimeout( _ request: Request , on eventLoop: EventLoop ) {
283324 let requestID = request. id
284325 let scheduled = eventLoop. scheduleTask ( deadline: request. connectionDeadline) {
285- // The timer has fired. Now we need to do a couple of things:
286- //
287- // 1. Remove ourselves from the timer dictionary to not leak any data. If our
288- // waiter entry still exists, we need to tell the state machine, that we want
289- // to fail the request.
290- let timeoutFired = self . timerLock. withLock {
291- self . _requestTimer. removeValue ( forKey: requestID) != nil
292- }
293-
294- // 2. If the entry did not exists anymore, we can assume that the request was
295- // scheduled on another connection. The timer still fired anyhow because of a
296- // race. In such a situation we don't need to do anything.
297- guard timeoutFired else { return }
298-
299- // 3. Tell the state machine about the timeout
300- self . modifyStateAndRunActions {
301- $0. timeoutRequest ( requestID)
326+ // there might be a race between a the timeout timer and the pool scheduling the
327+ // request on another thread.
328+ self . modifyStateAndRunActions { stateMachine in
329+ if self . _requestTimer. removeValue ( forKey: requestID) != nil {
330+ // The timer still exists. State Machines assumes it is alive. Inform state
331+ // machine.
332+ return stateMachine. timeoutRequest ( requestID)
333+ }
334+ return . none
302335 }
303336 }
304337
305- self . timerLock. withLockVoid {
306- assert ( self . _requestTimer [ requestID] == nil )
307- self . _requestTimer [ requestID] = scheduled
308- }
338+ assert ( self . _requestTimer [ requestID] == nil )
339+ self . _requestTimer [ requestID] = scheduled
309340
310341 request. req. requestWasQueued ( self )
311342 }
312343
313344 private func cancelRequestTimeout( _ id: Request . ID ) {
314- let scheduled = self . timerLock . withLock {
315- self . _requestTimer . removeValue ( forKey : id )
345+ guard let cancelTimer = self . _requestTimer . removeValue ( forKey : id ) else {
346+ preconditionFailure ( " Expected to have a timer for request \( id ) at this point. " )
316347 }
317-
318- scheduled? . cancel ( )
319- }
320-
321- private func cancelRequestTimeouts( _ requests: [ Request ] ) {
322- let scheduled = self . timerLock. withLock {
323- requests. compactMap {
324- self . _requestTimer. removeValue ( forKey: $0. id)
325- }
326- }
327- scheduled. forEach { $0. cancel ( ) }
348+ cancelTimer. cancel ( )
328349 }
329350
330351 private func scheduleIdleTimerForConnection( _ connectionID: Connection . ID , on eventLoop: EventLoop ) {
0 commit comments