@@ -18,7 +18,8 @@ import NIOCore
1818import NIOHTTP1
1919import NIOSSL
2020
21- final class RequestBag < Delegate: HTTPClientResponseDelegate > {
21+ @preconcurrency
22+ final class RequestBag < Delegate: HTTPClientResponseDelegate & Sendable > : Sendable {
2223 /// Defends against the call stack getting too large when consuming body parts.
2324 ///
2425 /// If the response body comes in lots of tiny chunks, we'll deliver those tiny chunks to users
@@ -35,16 +36,23 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
3536 }
3637
3738 private let delegate : Delegate
38- private var request : HTTPClient . Request
3939
40- // the request state is synchronized on the task eventLoop
41- private var state : StateMachine
42-
43- // the consume body part stack depth is synchronized on the task event loop.
44- private var consumeBodyPartStackDepth : Int
40+ struct LoopBoundState : @unchecked Sendable {
41+ // The 'StateMachine' *isn't* Sendable (it holds various objects which aren't). This type
42+ // needs to be sendable so that we can construct a loop bound box off of the event loop
43+ // to hold this state and then subsequently only access it from the event loop. This needs
44+ // to happen so that the request bag can be constructed off of the event loop. If it's
45+ // constructed on the event loop then there's a timing window between users issuing
46+ // a request and calling shutdown where the underlying pool doesn't know about the request
47+ // so the shutdown call may cancel it.
48+ var request : HTTPClient . Request
49+ var state : StateMachine
50+ var consumeBodyPartStackDepth : Int
51+ // if a redirect occurs, we store the task for it so we can propagate cancellation
52+ var redirectTask : HTTPClient . Task < Delegate . Response > ? = nil
53+ }
4554
46- // if a redirect occurs, we store the task for it so we can propagate cancellation
47- private var redirectTask : HTTPClient . Task < Delegate . Response > ? = nil
55+ private let loopBoundState : NIOLoopBoundBox < LoopBoundState >
4856
4957 // MARK: HTTPClientTask properties
5058
@@ -61,6 +69,8 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
6169
6270 let eventLoopPreference : HTTPClient . EventLoopPreference
6371
72+ let tlsConfiguration : TLSConfiguration ?
73+
6474 init (
6575 request: HTTPClient . Request ,
6676 eventLoopPreference: HTTPClient . EventLoopPreference ,
@@ -73,9 +83,13 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
7383 self . poolKey = . init( request, dnsOverride: requestOptions. dnsOverride)
7484 self . eventLoopPreference = eventLoopPreference
7585 self . task = task
76- self . state = . init( redirectHandler: redirectHandler)
77- self . consumeBodyPartStackDepth = 0
78- self . request = request
86+
87+ let loopBoundState = LoopBoundState (
88+ request: request,
89+ state: StateMachine ( redirectHandler: redirectHandler) ,
90+ consumeBodyPartStackDepth: 0
91+ )
92+ self . loopBoundState = NIOLoopBoundBox . makeBoxSendingValue ( loopBoundState, eventLoop: task. eventLoop)
7993 self . connectionDeadline = connectionDeadline
8094 self . requestOptions = requestOptions
8195 self . delegate = delegate
@@ -84,6 +98,8 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
8498 self . requestHead = head
8599 self . requestFramingMetadata = metadata
86100
101+ self . tlsConfiguration = request. tlsConfiguration
102+
87103 self . task. taskDelegate = self
88104 self . task. futureResult. whenComplete { _ in
89105 self . task. taskDelegate = nil
@@ -92,16 +108,13 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
92108
93109 private func requestWasQueued0( _ scheduler: HTTPRequestScheduler ) {
94110 self . logger. debug ( " Request was queued (waiting for a connection to become available) " )
95-
96- self . task. eventLoop. assertInEventLoop ( )
97- self . state. requestWasQueued ( scheduler)
111+ self . loopBoundState. value. state. requestWasQueued ( scheduler)
98112 }
99113
100114 // MARK: - Request -
101115
102116 private func willExecuteRequest0( _ executor: HTTPRequestExecutor ) {
103- self . task. eventLoop. assertInEventLoop ( )
104- let action = self . state. willExecuteRequest ( executor)
117+ let action = self . loopBoundState. value. state. willExecuteRequest ( executor)
105118 switch action {
106119 case . cancelExecuter( let executor) :
107120 executor. cancelRequest ( self )
@@ -115,26 +128,22 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
115128 }
116129
117130 private func requestHeadSent0( ) {
118- self . task. eventLoop. assertInEventLoop ( )
119-
120131 self . delegate. didSendRequestHead ( task: self . task, self . requestHead)
121132
122- if self . request. body == nil {
133+ if self . loopBoundState . value . request. body == nil {
123134 self . delegate. didSendRequest ( task: self . task)
124135 }
125136 }
126137
127138 private func resumeRequestBodyStream0( ) {
128- self . task. eventLoop. assertInEventLoop ( )
129-
130- let produceAction = self . state. resumeRequestBodyStream ( )
139+ let produceAction = self . loopBoundState. value. state. resumeRequestBodyStream ( )
131140
132141 switch produceAction {
133142 case . startWriter:
134- guard let body = self . request. body else {
143+ guard let body = self . loopBoundState . value . request. body else {
135144 preconditionFailure ( " Expected to have a body, if the `HTTPRequestStateMachine` resume a request stream " )
136145 }
137- self . request. body = nil
146+ self . loopBoundState . value . request. body = nil
138147
139148 let writer = HTTPClient . Body. StreamWriter {
140149 self . writeNextRequestPart ( $0)
@@ -153,9 +162,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
153162 }
154163
155164 private func pauseRequestBodyStream0( ) {
156- self . task. eventLoop. assertInEventLoop ( )
157-
158- self . state. pauseRequestBodyStream ( )
165+ self . loopBoundState. value. state. pauseRequestBodyStream ( )
159166 }
160167
161168 private func writeNextRequestPart( _ part: IOData ) -> EventLoopFuture < Void > {
@@ -169,9 +176,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
169176 }
170177
171178 private func writeNextRequestPart0( _ part: IOData ) -> EventLoopFuture < Void > {
172- self . eventLoop. assertInEventLoop ( )
173-
174- let action = self . state. writeNextRequestPart ( part, taskEventLoop: self . task. eventLoop)
179+ let action = self . loopBoundState. value. state. writeNextRequestPart ( part, taskEventLoop: self . task. eventLoop)
175180
176181 switch action {
177182 case . failTask( let error) :
@@ -193,9 +198,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
193198 }
194199
195200 private func finishRequestBodyStream( _ result: Result < Void , Error > ) {
196- self . task. eventLoop. assertInEventLoop ( )
197-
198- let action = self . state. finishRequestBodyStream ( result)
201+ let action = self . loopBoundState. value. state. finishRequestBodyStream ( result)
199202
200203 switch action {
201204 case . none:
@@ -226,20 +229,22 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
226229 // MARK: - Response -
227230
228231 private func receiveResponseHead0( _ head: HTTPResponseHead ) {
229- self . task. eventLoop. assertInEventLoop ( )
230-
231- self . delegate. didVisitURL ( task: self . task, self . request, head)
232+ self . delegate. didVisitURL ( task: self . task, self . loopBoundState. value. request, head)
232233
233234 // runs most likely on channel eventLoop
234- switch self . state. receiveResponseHead ( head) {
235+ switch self . loopBoundState . value . state. receiveResponseHead ( head) {
235236 case . none:
236237 break
237238
238239 case . signalBodyDemand( let executor) :
239240 executor. demandResponseBodyStream ( self )
240241
241242 case . redirect( let executor, let handler, let head, let newURL) :
242- self . redirectTask = handler. redirect ( status: head. status, to: newURL, promise: self . task. promise)
243+ self . loopBoundState. value. redirectTask = handler. redirect (
244+ status: head. status,
245+ to: newURL,
246+ promise: self . task. promise
247+ )
243248 executor. cancelRequest ( self )
244249
245250 case . forwardResponseHead( let head) :
@@ -253,17 +258,19 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
253258 }
254259
255260 private func receiveResponseBodyParts0( _ buffer: CircularBuffer < ByteBuffer > ) {
256- self . task. eventLoop. assertInEventLoop ( )
257-
258- switch self . state. receiveResponseBodyParts ( buffer) {
261+ switch self . loopBoundState. value. state. receiveResponseBodyParts ( buffer) {
259262 case . none:
260263 break
261264
262265 case . signalBodyDemand( let executor) :
263266 executor. demandResponseBodyStream ( self )
264267
265268 case . redirect( let executor, let handler, let head, let newURL) :
266- self . redirectTask = handler. redirect ( status: head. status, to: newURL, promise: self . task. promise)
269+ self . loopBoundState. value. redirectTask = handler. redirect (
270+ status: head. status,
271+ to: newURL,
272+ promise: self . task. promise
273+ )
267274 executor. cancelRequest ( self )
268275
269276 case . forwardResponsePart( let part) :
@@ -277,8 +284,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
277284 }
278285
279286 private func succeedRequest0( _ buffer: CircularBuffer < ByteBuffer > ? ) {
280- self . task. eventLoop. assertInEventLoop ( )
281- let action = self . state. succeedRequest ( buffer)
287+ let action = self . loopBoundState. value. state. succeedRequest ( buffer)
282288
283289 switch action {
284290 case . none:
@@ -299,13 +305,15 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
299305 }
300306
301307 case . redirect( let handler, let head, let newURL) :
302- self . redirectTask = handler. redirect ( status: head. status, to: newURL, promise: self . task. promise)
308+ self . loopBoundState. value. redirectTask = handler. redirect (
309+ status: head. status,
310+ to: newURL,
311+ promise: self . task. promise
312+ )
303313 }
304314 }
305315
306316 private func consumeMoreBodyData0( resultOfPreviousConsume result: Result < Void , Error > ) {
307- self . task. eventLoop. assertInEventLoop ( )
308-
309317 // We get defensive here about the maximum stack depth. It's possible for the `didReceiveBodyPart`
310318 // future to be returned to us completed. If it is, we will recurse back into this method. To
311319 // break that recursion we have a max stack depth which we increment and decrement in this method:
@@ -316,24 +324,27 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
316324 // that risk ending up in this loop. That's because we don't need an accurate count: our limit is
317325 // a best-effort target anyway, one stack frame here or there does not put us at risk. We're just
318326 // trying to prevent ourselves looping out of control.
319- self . consumeBodyPartStackDepth += 1
327+ self . loopBoundState . value . consumeBodyPartStackDepth += 1
320328 defer {
321- self . consumeBodyPartStackDepth -= 1
322- assert ( self . consumeBodyPartStackDepth >= 0 )
329+ self . loopBoundState . value . consumeBodyPartStackDepth -= 1
330+ assert ( self . loopBoundState . value . consumeBodyPartStackDepth >= 0 )
323331 }
324332
325- let consumptionAction = self . state. consumeMoreBodyData ( resultOfPreviousConsume: result)
333+ let consumptionAction = self . loopBoundState. value. state. consumeMoreBodyData (
334+ resultOfPreviousConsume: result
335+ )
326336
327337 switch consumptionAction {
328338 case . consume( let byteBuffer) :
329339 self . delegate. didReceiveBodyPart ( task: self . task, byteBuffer)
330340 . hop ( to: self . task. eventLoop)
341+ . assumeIsolated ( )
331342 . whenComplete { result in
332- if self . consumeBodyPartStackDepth < Self . maxConsumeBodyPartStackDepth {
343+ if self . loopBoundState . value . consumeBodyPartStackDepth < Self . maxConsumeBodyPartStackDepth {
333344 self . consumeMoreBodyData0 ( resultOfPreviousConsume: result)
334345 } else {
335346 // We need to unwind the stack, let's take a break.
336- self . task. eventLoop. execute {
347+ self . task. eventLoop. assumeIsolated ( ) . execute {
337348 self . consumeMoreBodyData0 ( resultOfPreviousConsume: result)
338349 }
339350 }
@@ -344,7 +355,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
344355 case . finishStream:
345356 do {
346357 let response = try self . delegate. didFinishRequest ( task: self . task)
347- self . task. promise. succeed ( response)
358+ self . task. promise. assumeIsolated ( ) . succeed ( response)
348359 } catch {
349360 self . task. promise. fail ( error)
350361 }
@@ -358,13 +369,11 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
358369 }
359370
360371 private func fail0( _ error: Error ) {
361- self . task. eventLoop. assertInEventLoop ( )
362-
363- let action = self . state. fail ( error)
372+ let action = self . loopBoundState. value. state. fail ( error)
364373
365374 self . executeFailAction0 ( action)
366375
367- self . redirectTask? . fail ( reason: error)
376+ self . loopBoundState . value . redirectTask? . fail ( reason: error)
368377 }
369378
370379 private func executeFailAction0( _ action: RequestBag < Delegate > . StateMachine . FailAction ) {
@@ -381,8 +390,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
381390 }
382391
383392 func deadlineExceeded0( ) {
384- self . task. eventLoop. assertInEventLoop ( )
385- let action = self . state. deadlineExceeded ( )
393+ let action = self . loopBoundState. value. state. deadlineExceeded ( )
386394
387395 switch action {
388396 case . cancelScheduler( let scheduler) :
@@ -404,9 +412,6 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
404412}
405413
406414extension RequestBag : HTTPSchedulableRequest , HTTPClientTaskDelegate {
407- var tlsConfiguration : TLSConfiguration ? {
408- self . request. tlsConfiguration
409- }
410415
411416 func requestWasQueued( _ scheduler: HTTPRequestScheduler ) {
412417 if self . task. eventLoop. inEventLoop {
0 commit comments