22//
33// This source file is part of the SwiftAWSLambdaRuntime open source project
44//
5- // Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
5+ // Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
66// Licensed under Apache License v2.0
77//
88// See LICENSE.txt for license information
@@ -97,9 +97,16 @@ internal final class HTTPClient {
9797 private func connect( ) -> EventLoopFuture < Channel > {
9898 let bootstrap = ClientBootstrap ( group: self . eventLoop)
9999 . channelInitializer { channel in
100- channel. pipeline. addHTTPClientHandlers ( ) . flatMap {
101- channel. pipeline. addHandlers ( [ HTTPHandler ( keepAlive: self . configuration. keepAlive) ,
102- UnaryHandler ( keepAlive: self . configuration. keepAlive) ] )
100+ do {
101+ try channel. pipeline. syncOperations. addHTTPClientHandlers ( )
102+ // Lambda quotas... An invocation payload is maximal 6MB in size:
103+ // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
104+ try channel. pipeline. syncOperations. addHandler (
105+ NIOHTTPClientResponseAggregator ( maxContentLength: 6 * 1024 * 1024 ) )
106+ try channel. pipeline. syncOperations. addHandler ( LambdaChannelHandler ( ) )
107+ return channel. eventLoop. makeSucceededFuture ( ( ) )
108+ } catch {
109+ return channel. eventLoop. makeFailedFuture ( error)
103110 }
104111 }
105112
@@ -131,10 +138,10 @@ internal final class HTTPClient {
131138 }
132139
133140 internal struct Response : Equatable {
134- public var version : HTTPVersion
135- public var status : HTTPResponseStatus
136- public var headers : HTTPHeaders
137- public var body : ByteBuffer ?
141+ var version : HTTPVersion
142+ var status : HTTPResponseStatus
143+ var headers : HTTPHeaders
144+ var body : ByteBuffer ?
138145 }
139146
140147 internal enum Errors : Error {
@@ -149,133 +156,77 @@ internal final class HTTPClient {
149156 }
150157}
151158
152- private final class HTTPHandler : ChannelDuplexHandler {
153- typealias OutboundIn = HTTPClient . Request
154- typealias InboundOut = HTTPClient . Response
155- typealias InboundIn = HTTPClientResponsePart
159+ // no need in locks since we validate only one request can run at a time
160+ private final class LambdaChannelHandler : ChannelDuplexHandler {
161+ typealias InboundIn = NIOHTTPClientResponseFull
162+ typealias OutboundIn = HTTPRequestWrapper
156163 typealias OutboundOut = HTTPClientRequestPart
157164
158- private let keepAlive : Bool
159- private var readState : ReadState = . idle
160-
161- init ( keepAlive: Bool ) {
162- self . keepAlive = keepAlive
163- }
164-
165- func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
166- let request = unwrapOutboundIn ( data)
167-
168- var head = HTTPRequestHead ( version: . init( major: 1 , minor: 1 ) , method: request. method, uri: request. url, headers: request. headers)
169- head. headers. add ( name: " host " , value: request. targetHost)
170- switch request. method {
171- case . POST, . PUT:
172- head. headers. add ( name: " content-length " , value: String ( request. body? . readableBytes ?? 0 ) )
173- default :
174- break
175- }
176-
177- // We don't add a "Connection" header here if we want to keep the connection open,
178- // HTTP/1.1 defines specifies the following in RFC 2616, Section 8.1.2.1:
179- //
180- // An HTTP/1.1 server MAY assume that a HTTP/1.1 client intends to
181- // maintain a persistent connection unless a Connection header including
182- // the connection-token "close" was sent in the request. If the server
183- // chooses to close the connection immediately after sending the
184- // response, it SHOULD send a Connection header including the
185- // connection-token close.
186- //
187- // See also UnaryHandler.channelRead below.
188- if !self . keepAlive {
189- head. headers. add ( name: " connection " , value: " close " )
190- }
191-
192- context. write ( self . wrapOutboundOut ( HTTPClientRequestPart . head ( head) ) ) . flatMap { _ -> EventLoopFuture < Void > in
193- if let body = request. body {
194- return context. writeAndFlush ( self . wrapOutboundOut ( HTTPClientRequestPart . body ( . byteBuffer( body) ) ) )
195- } else {
196- context. flush ( )
197- return context. eventLoop. makeSucceededFuture ( ( ) )
198- }
199- } . cascade ( to: promise)
200- }
201-
202- func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
203- let response = unwrapInboundIn ( data)
204-
205- switch response {
206- case . head( let head) :
207- guard case . idle = self . readState else {
208- preconditionFailure ( " invalid read state \( self . readState) " )
209- }
210- self . readState = . head( head)
211- case . body( var bodyPart) :
212- switch self . readState {
213- case . head( let head) :
214- self . readState = . body( head, bodyPart)
215- case . body( let head, var body) :
216- body. writeBuffer ( & bodyPart)
217- self . readState = . body( head, body)
218- default :
219- preconditionFailure ( " invalid read state \( self . readState) " )
220- }
221- case . end:
222- switch self . readState {
223- case . head( let head) :
224- context. fireChannelRead ( wrapInboundOut ( HTTPClient . Response ( version: head. version, status: head. status, headers: head. headers, body: nil ) ) )
225- self . readState = . idle
226- case . body( let head, let body) :
227- context. fireChannelRead ( wrapInboundOut ( HTTPClient . Response ( version: head. version, status: head. status, headers: head. headers, body: body) ) )
228- self . readState = . idle
229- default :
230- preconditionFailure ( " invalid read state \( self . readState) " )
231- }
232- }
233- }
234-
235- private enum ReadState {
165+ enum State {
236166 case idle
237- case head ( HTTPResponseHead )
238- case body ( HTTPResponseHead , ByteBuffer )
167+ case running ( promise : EventLoopPromise < HTTPClient . Response > , timeout : Scheduled < Void > ? )
168+ case waitForConnectionClose ( HTTPClient . Response , EventLoopPromise < HTTPClient . Response > )
239169 }
240- }
241-
242- // no need in locks since we validate only one request can run at a time
243- private final class UnaryHandler : ChannelDuplexHandler {
244- typealias OutboundIn = HTTPRequestWrapper
245- typealias InboundIn = HTTPClient . Response
246- typealias OutboundOut = HTTPClient . Request
247-
248- private let keepAlive : Bool
249170
250- private var pending : ( promise : EventLoopPromise < HTTPClient . Response > , timeout : Scheduled < Void > ? ) ?
171+ private var state : State = . idle
251172 private var lastError : Error ?
252173
253- init ( keepAlive: Bool ) {
254- self . keepAlive = keepAlive
255- }
174+ init ( ) { }
256175
257176 func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
258- guard self . pending == nil else {
177+ guard case . idle = self . state else {
259178 preconditionFailure ( " invalid state, outstanding request " )
260179 }
261180 let wrapper = unwrapOutboundIn ( data)
181+
182+ var head = HTTPRequestHead (
183+ version: . http1_1,
184+ method: wrapper. request. method,
185+ uri: wrapper. request. url,
186+ headers: wrapper. request. headers
187+ )
188+ head. headers. add ( name: " host " , value: wrapper. request. targetHost)
189+ switch head. method {
190+ case . POST, . PUT:
191+ head. headers. add ( name: " content-length " , value: String ( wrapper. request. body? . readableBytes ?? 0 ) )
192+ default :
193+ break
194+ }
195+
262196 let timeoutTask = wrapper. request. timeout. map {
263197 context. eventLoop. scheduleTask ( in: $0) {
264- if self . pending != nil {
265- context . pipeline . fireErrorCaught ( HTTPClient . Errors . timeout )
198+ guard case . running = self . state else {
199+ preconditionFailure ( " invalid state " )
266200 }
201+
202+ context. pipeline. fireErrorCaught ( HTTPClient . Errors. timeout)
267203 }
268204 }
269- self . pending = ( promise: wrapper. promise, timeout: timeoutTask)
270- context. writeAndFlush ( wrapOutboundOut ( wrapper. request) , promise: promise)
205+ self . state = . running( promise: wrapper. promise, timeout: timeoutTask)
206+
207+ context. write ( wrapOutboundOut ( . head( head) ) , promise: nil )
208+ if let body = wrapper. request. body {
209+ context. write ( wrapOutboundOut ( . body( IOData . byteBuffer ( body) ) ) , promise: nil )
210+ }
211+ context. writeAndFlush ( wrapOutboundOut ( . end( nil ) ) , promise: promise)
271212 }
272213
273214 func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
274- let response = unwrapInboundIn ( data)
275- guard let pending = self . pending else {
215+ guard case . running( let promise, let timeout) = self . state else {
276216 preconditionFailure ( " invalid state, no pending request " )
277217 }
278218
219+ let response = unwrapInboundIn ( data)
220+
221+ let httpResponse = HTTPClient . Response (
222+ version: response. head. version,
223+ status: response. head. status,
224+ headers: response. head. headers,
225+ body: response. body
226+ )
227+
228+ timeout? . cancel ( )
229+
279230 // As defined in RFC 7230 Section 6.3:
280231 // HTTP/1.1 defaults to the use of "persistent connections", allowing
281232 // multiple requests and responses to be carried over a single
@@ -285,14 +236,31 @@ private final class UnaryHandler: ChannelDuplexHandler {
285236 //
286237 // That's why we only assume the connection shall be closed if we receive
287238 // a "connection = close" header.
288- let serverCloseConnection = response. headers. first ( name: " connection " ) ? . lowercased ( ) == " close "
289-
290- if !self . keepAlive || serverCloseConnection || response. version != . init( major: 1 , minor: 1 ) {
291- pending. promise. futureResult. whenComplete { _ in
292- _ = context. channel. close ( )
293- }
239+ let serverCloseConnection =
240+ response. head. headers [ " connection " ] . contains ( where: { $0. lowercased ( ) == " close " } )
241+
242+ let closeConnection = serverCloseConnection || response. head. version != . http1_1
243+
244+ if closeConnection {
245+ // If we were succeeding the request promise here directly and closing the connection
246+ // after succeeding the promise we may run into a race condition:
247+ //
248+ // The lambda runtime will ask for the next work item directly after a succeeded post
249+ // response request. The desire for the next work item might be faster than the attempt
250+ // to close the connection. This will lead to a situation where we try to the connection
251+ // but the next request has already been scheduled on the connection that we want to
252+ // close. For this reason we postpone succeeding the promise until the connection has
253+ // been closed. This codepath will only be hit in the very, very unlikely event of the
254+ // Lambda control plane demanding to close connection. (It's more or less only
255+ // implemented to support http1.1 correctly.) This behavior is ensured with the test
256+ // `LambdaTest.testNoKeepAliveServer`.
257+ self . state = . waitForConnectionClose( httpResponse, promise)
258+ _ = context. channel. close ( )
259+ return
260+ } else {
261+ self . state = . idle
262+ promise. succeed ( httpResponse)
294263 }
295- self . completeWith ( . success( response) )
296264 }
297265
298266 func errorCaught( context: ChannelHandlerContext , error: Error ) {
@@ -303,36 +271,44 @@ private final class UnaryHandler: ChannelDuplexHandler {
303271
304272 func channelInactive( context: ChannelHandlerContext ) {
305273 // fail any pending responses with last error or assume peer disconnected
306- if self . pending != nil {
307- let error = self . lastError ?? HTTPClient . Errors. connectionResetByPeer
308- self . completeWith ( . failure( error) )
309- }
310274 context. fireChannelInactive ( )
275+
276+ switch self . state {
277+ case . idle:
278+ break
279+ case . running( let promise, let timeout) :
280+ self . state = . idle
281+ timeout? . cancel ( )
282+ promise. fail ( self . lastError ?? HTTPClient . Errors. connectionResetByPeer)
283+
284+ case . waitForConnectionClose( let response, let promise) :
285+ self . state = . idle
286+ promise. succeed ( response)
287+ }
311288 }
312289
313290 func triggerUserOutboundEvent( context: ChannelHandlerContext , event: Any , promise: EventLoopPromise < Void > ? ) {
314291 switch event {
315292 case is RequestCancelEvent :
316- if self . pending != nil {
317- self . completeWith ( . failure( HTTPClient . Errors. cancelled) )
293+ switch self . state {
294+ case . idle:
295+ break
296+ case . running( let promise, let timeout) :
297+ self . state = . idle
298+ timeout? . cancel ( )
299+ promise. fail ( HTTPClient . Errors. cancelled)
300+
318301 // after the cancel error has been send, we want to close the connection so
319302 // that no more packets can be read on this connection.
320303 _ = context. channel. close ( )
304+ case . waitForConnectionClose( _, let promise) :
305+ self . state = . idle
306+ promise. fail ( HTTPClient . Errors. cancelled)
321307 }
322308 default :
323309 context. triggerUserOutboundEvent ( event, promise: promise)
324310 }
325311 }
326-
327- private func completeWith( _ result: Result < HTTPClient . Response , Error > ) {
328- guard let pending = self . pending else {
329- preconditionFailure ( " invalid state, no pending request " )
330- }
331- self . pending = nil
332- self . lastError = nil
333- pending. timeout? . cancel ( )
334- pending. promise. completeWith ( result)
335- }
336312}
337313
338314private struct HTTPRequestWrapper {
0 commit comments