@@ -6,6 +6,7 @@ local cjson = require "cjson.safe"
66local pl_stringx = require (" pl.stringx" )
77local date = require (" date" )
88local get_request_id = require (" kong.tracing.request_id" ).get
9+ local AWS_Stream = require (" kong.plugins.aws-lambda.aws_stream" )
910
1011local EMPTY = {}
1112
@@ -350,6 +351,115 @@ local function remove_array_mt_for_empty_table(tbl)
350351 return tbl
351352end
352353
354+ local HttpIntegrationResponseProcessor = {}
355+ function HttpIntegrationResponseProcessor :new ()
356+ local instance = {
357+ first_chunk_body = " " ,
358+ null_count = 0 ,
359+ prelude = " "
360+ }
361+ setmetatable (instance , {__index = HttpIntegrationResponseProcessor })
362+ return instance
363+ end
364+
365+ -- Update `ngx.status` and the parameter `headers` from the prelude string.
366+ -- Return error if any.
367+ function HttpIntegrationResponseProcessor :update_status_and_headers_from_prelude (headers )
368+ local p , err = cjson .decode (self .prelude )
369+ if err or not p then
370+ return error (err )
371+ end
372+ if p [" statusCode" ] then
373+ ngx .status = p [" statusCode" ]
374+ -- headers and cookies will only be applied if the statusCode is provided
375+ if p [" headers" ] then
376+ for k , v in pairs (p [" headers" ]) do
377+ headers [k ] = v
378+ end
379+ end
380+ if p [" cookies" ] then
381+ headers [" Set-Cookie" ] = p [" cookies" ]
382+ end
383+ end
384+ end
385+
386+ -- Process one message.
387+ -- Return error if any.
388+ function HttpIntegrationResponseProcessor :process_message (msg , headers )
389+ for _ , header in ipairs (msg .headers ) do
390+ if self .null_count == 8 then
391+ break
392+ end
393+ if header .key == " :event-type" and header .value == " PayloadChunk" then
394+ -- print(msg.body)
395+ for i = 1 , # msg .body do
396+ local c = msg .body :byte (i )
397+ if c == 0 then
398+ self .null_count = self .null_count + 1
399+ if self .null_count == 8 then
400+ local err = self :update_status_and_headers_from_prelude (headers )
401+ if err then
402+ return error (err )
403+ end
404+
405+ self .first_chunk_body = msg .body :sub (i + 1 )
406+ break
407+ end
408+ else
409+ self .prelude = self .prelude .. string.char (c )
410+ end
411+ end
412+ end
413+ end
414+ end
415+
416+
417+ function HttpIntegrationResponseProcessor :process (reader , buffer_size , headers )
418+ while self .null_count < 8 do
419+ local chunk , err = reader (buffer_size )
420+ if err then
421+ return nil , error (err )
422+ end
423+
424+ if chunk then
425+ -- the chunk is in `application/vnd.amazon.eventstream` formatted
426+ -- which is a binary format, we need to parse it
427+ local parser , err = AWS_Stream :new (chunk , false )
428+ if err or parser == nil then
429+ -- print("ERROR: ", err)
430+ return nil , error (err )
431+ end
432+
433+ while true do
434+ if self .null_count == 8 then
435+ break -- will jump to the end of the function
436+ end
437+
438+ local msg = parser :next_message ()
439+
440+ if not msg then
441+ break -- read again
442+ end
443+
444+ -- print(require("pl.pretty").write(msg))
445+ local err = self :process_message (msg , headers )
446+ if err then
447+ return nil , error (err )
448+ end
449+ end
450+ end
451+ end
452+
453+ return self .first_chunk_body , nil
454+ end
455+
456+ -- Process the http integration response.
457+ -- This will update `ngx.status` and the parameter `headers`.
458+ -- Return the first chunk body and error if any.
459+ local function process_http_integration_response (reader , buffer_size , headers )
460+ local processor = HttpIntegrationResponseProcessor :new ()
461+ return processor :process (reader , buffer_size , headers )
462+ end
353463
354464return {
355465 aws_serializer = aws_serializer ,
@@ -358,4 +468,5 @@ return {
358468 build_request_payload = build_request_payload ,
359469 extract_proxy_response = extract_proxy_response ,
360470 remove_array_mt_for_empty_table = remove_array_mt_for_empty_table ,
471+ process_http_integration_response = process_http_integration_response ,
361472}
0 commit comments