@@ -21,6 +21,7 @@ local invokeWithResponseStream = require "kong.plugins.aws-lambda.execute"
2121local build_request_payload = request_util .build_request_payload
2222local extract_proxy_response = request_util .extract_proxy_response
2323local remove_array_mt_for_empty_table = request_util .remove_array_mt_for_empty_table
24+ local cjson = require " cjson.safe"
2425
2526local aws = require (" resty.aws" )
2627local AWS_GLOBAL_CONFIG
@@ -196,20 +197,115 @@ local function invoke_streaming(conf, lambda_service)
196197 headers [VIA_HEADER ] = VIA_HEADER_VALUE
197198 end
198199
199- ngx .status = res .status
200- -- print("status" .. ngx.status)
200+ -- if error, set status and header, return error
201+ if res .status > 400 then
202+ ngx .status = res .status
203+ for k , v in pairs (headers ) do
204+ ngx .header [k ] = v
205+ -- print("Header [" .. k .. "] = " .. v)
206+ end
207+ return error (res .body .Message )
208+ end
209+
210+ -- not error, set default status and content type
211+ ngx .status = 200
212+ headers [" Content-Type" ] = " application/octet-stream"
213+
214+ -- record x-amzn-Remapped-Content-Type and clear it
215+ local is_http_integration_response = headers [" x-amzn-Remapped-Content-Type" ] == " application/vnd.awslambda.http-integration-response"
216+ headers [" x-amzn-Remapped-Content-Type" ] = nil
217+
218+ -- read from the body stream
219+ local reader = res .body_reader
220+ local buffer_size = 8192
221+
222+ -- for http integration response, parse the prelude just like Lambda Function Url
223+ local first_chunk_body = " "
224+ if is_http_integration_response then
225+ local null_count = 0
226+ local prelude = " "
227+ while null_count < 8 do
228+ local chunk , err = reader (buffer_size )
229+ if err then
230+ return error (err )
231+ end
232+
233+ if chunk then
234+ -- the chunk is in `application/vnd.amazon.eventstream` formatted
235+ -- which is a binary format, we need to parse it
236+ local parser , err = AWS_Stream :new (chunk , false )
237+ if err or parser == nil then
238+ -- print("ERROR: ", err)
239+ return error (err )
240+ end
241+
242+ while true do
243+ if null_count == 8 then
244+ break
245+ end
246+
247+ local msg = parser :next_message ()
248+
249+ if not msg then
250+ break
251+ end
252+
253+ -- print(require("pl.pretty").write(msg))
254+ for _ , header in ipairs (msg .headers ) do
255+ if null_count == 8 then
256+ break
257+ end
258+ if header .key == " :event-type" and header .value == " PayloadChunk" then
259+ -- print(msg.body)
260+ for i = 1 , # msg .body do
261+ local c = msg .body :byte (i )
262+ if c == 0 then
263+ null_count = null_count + 1
264+ if null_count == 8 then
265+ -- parse the prelude
266+ local p , err = cjson .decode (prelude )
267+ if err or not p then
268+ return error (err )
269+ end
270+ if p [" statusCode" ] then
271+ ngx .status = p [" statusCode" ]
272+ -- headers and cookies will only be applied if the statusCode is provided
273+ if p [" headers" ] then
274+ for k , v in pairs (p [" headers" ]) do
275+ headers [k ] = v
276+ end
277+ end
278+ if p [" cookies" ] then
279+ headers [" Set-Cookie" ] = p [" cookies" ]
280+ end
281+ end
282+
283+ first_chunk_body = msg .body :sub (i + 1 )
284+ break
285+ end
286+ else
287+ prelude = prelude .. string.char (c )
288+ end
289+ end
290+ end
291+ end
292+ end
293+ end
294+ end
295+ end
296+
297+ -- set headers to ngx
201298 for k , v in pairs (headers ) do
202299 ngx .header [k ] = v
203300 -- print("Header [" .. k .. "] = " .. v)
204301 end
205302
206- if ngx .status > 400 then
207- return error (res .body .Message )
303+ -- send the first chunk if any
304+ if first_chunk_body ~= " " then
305+ ngx .print (first_chunk_body )
306+ ngx .flush (true )
208307 end
209308
210- -- read from the body stream
211- local reader = res .body_reader
212- local buffer_size = 8192
213309 repeat
214310 local chunk , err = reader (buffer_size )
215311 if err then
@@ -238,7 +334,6 @@ local function invoke_streaming(conf, lambda_service)
238334 -- print(msg.body)
239335 ngx .print (msg .body )
240336 ngx .flush (true )
241- -- TODO: identify `application/vnd.awslambda.http-integration-response` just like Lambda Function URL
242337 end
243338 end
244339 end
0 commit comments