Skip to content

Commit f5a2877

Browse files
committed
feat(plugin/aws-lambda): support response streaming
1 parent 7c0a331 commit f5a2877

File tree

3 files changed

+536
-76
lines changed

3 files changed

+536
-76
lines changed
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
-- This file is a workaround to support response streaming for AWS Lambda.
2+
-- This file is copied from https://github.com/Kong/kong/blob/d875bacf80d3ff2657b33d7cb8863142ca1083a2/kong/tools/aws_stream.lua
3+
-- Thus this file should be removed when the resty-aws library is ready to support response streaming.
4+
-- See https://github.com/Kong/lua-resty-aws/issues/117 for more details.
5+
-- TODO: remove this file.
6+
7+
8+
--- Stream class.
9+
-- Decodes AWS response-stream types, currently application/vnd.amazon.eventstream
10+
-- @classmod Stream
11+
12+
local buf = require("string.buffer")
13+
local to_hex = require("resty.string").to_hex
14+
15+
local Stream = {}
16+
Stream.__index = Stream
17+
18+
19+
local _HEADER_EXTRACTORS = {
20+
-- bool true
21+
[0] = function(stream)
22+
return true, 0
23+
end,
24+
25+
-- bool false
26+
[1] = function(stream)
27+
return false, 0
28+
end,
29+
30+
-- string type
31+
[7] = function(stream)
32+
local header_value_len = stream:next_int(16)
33+
return stream:next_utf_8(header_value_len), header_value_len + 2 -- add the 2 bits read for the length
34+
end,
35+
36+
-- TODO ADD THE REST OF THE DATA TYPES
37+
-- EVEN THOUGH THEY'RE NOT REALLY USED
38+
}
39+
40+
--- Constructor.
41+
-- @function aws:Stream
42+
-- @param chunk string complete AWS response stream chunk for decoding
43+
-- @param is_hex boolean specify if the chunk bytes are already decoded to hex
44+
-- @usage
45+
-- local stream_parser = stream:new("00000120af0310f.......", true)
46+
-- local next, err = stream_parser:next_message()
47+
function Stream:new(chunk, is_hex)
48+
local self = {} -- override 'self' to be the new object/class
49+
setmetatable(self, Stream)
50+
51+
if #chunk < ((is_hex and 32) or 16) then
52+
return nil, "cannot parse a chunk less than 16 bytes long"
53+
end
54+
55+
self.read_count = 0
56+
self.chunk = buf.new()
57+
self.chunk:put((is_hex and chunk) or to_hex(chunk))
58+
59+
return self
60+
end
61+
62+
--- return the next `count` ascii bytes from the front of the chunk
63+
--- and then trims the chunk of those bytes
64+
-- @param count number whole utf-8 bytes to return
65+
-- @return string resulting utf-8 string
66+
function Stream:next_utf_8(count)
67+
local utf_bytes = self:next_bytes(count)
68+
69+
local ascii_string = ""
70+
for i = 1, #utf_bytes, 2 do
71+
local hex_byte = utf_bytes:sub(i, i + 1)
72+
local ascii_byte = string.char(tonumber(hex_byte, 16))
73+
ascii_string = ascii_string .. ascii_byte
74+
end
75+
return ascii_string
76+
end
77+
78+
--- returns the next `count` bytes from the front of the chunk
79+
--- and then trims the chunk of those bytes
80+
-- @param count number whole integer of bytes to return
81+
-- @return string hex-encoded next `count` bytes
82+
function Stream:next_bytes(count)
83+
if not self.chunk then
84+
return nil, "function cannot be called on its own - initialise a chunk reader with :new(chunk)"
85+
end
86+
87+
local bytes = self.chunk:get(count * 2)
88+
self.read_count = (count) + self.read_count
89+
90+
return bytes
91+
end
92+
93+
--- returns the next unsigned int from the front of the chunk
94+
--- and then trims the chunk of those bytes
95+
-- @param size integer bit length (8, 16, 32, etc)
96+
-- @return number whole integer of size specified
97+
-- @return string the original bytes, for reference/checksums
98+
function Stream:next_int(size)
99+
if not self.chunk then
100+
return nil, nil, "function cannot be called on its own - initialise a chunk reader with :new(chunk)"
101+
end
102+
103+
if size < 8 then
104+
return nil, nil, "cannot work on integers smaller than 8 bits long"
105+
end
106+
107+
local int, err = self:next_bytes(size / 8, trim)
108+
if err then
109+
return nil, nil, err
110+
end
111+
112+
return tonumber(int, 16), int
113+
end
114+
115+
--- returns the next message in the chunk, as a table.
116+
--- can be used as an iterator.
117+
-- @return table formatted next message from the given constructor chunk
118+
function Stream:next_message()
119+
if not self.chunk then
120+
return nil, "function cannot be called on its own - initialise a chunk reader with :new(chunk)"
121+
end
122+
123+
if #self.chunk < 1 then
124+
return false
125+
end
126+
127+
-- get the message length and pull that many bytes
128+
--
129+
-- this is a chicken and egg problem, because we need to
130+
-- read the message to get the length, to then re-read the
131+
-- whole message at correct offset
132+
local msg_len, orig_len, err = self:next_int(32)
133+
if err then
134+
return err
135+
end
136+
137+
-- get the headers length
138+
local headers_len, orig_headers_len, err = self:next_int(32)
139+
140+
-- get the preamble checksum
141+
local preamble_checksum, orig_preamble_checksum, err = self:next_int(32)
142+
143+
-- TODO: calculate checksum
144+
-- local result = crc32(orig_len .. origin_headers_len, preamble_checksum)
145+
-- if not result then
146+
-- return nil, "preamble checksum failed - message is corrupted"
147+
-- end
148+
149+
-- pull the headers from the buf
150+
local headers = {}
151+
local headers_bytes_read = 0
152+
153+
while headers_bytes_read < headers_len do
154+
-- the next 8-bit int is the "header key length"
155+
local header_key_len = self:next_int(8)
156+
local header_key = self:next_utf_8(header_key_len)
157+
headers_bytes_read = 1 + header_key_len + headers_bytes_read
158+
159+
-- next 8-bits is the header type, which is an enum
160+
local header_type = self:next_int(8)
161+
headers_bytes_read = 1 + headers_bytes_read
162+
163+
-- depending on the header type, depends on how long the header should max out at
164+
local header_value, header_value_len = _HEADER_EXTRACTORS[header_type](self)
165+
headers_bytes_read = header_value_len + headers_bytes_read
166+
167+
headers[#headers + 1] = {
168+
key = header_key,
169+
value = header_value,
170+
}
171+
end
172+
173+
-- finally, extract the body as a string by
174+
-- subtracting what's read so far from the
175+
-- total length obtained right at the start
176+
local body = self:next_utf_8(msg_len - self.read_count - 4)
177+
178+
-- last 4 bytes is a body checksum
179+
local msg_checksum = self:next_int(32)
180+
-- TODO CHECK FULL MESSAGE CHECKSUM
181+
-- local result = crc32(original_full_msg, msg_checksum)
182+
-- if not result then
183+
-- return nil, "preamble checksum failed - message is corrupted"
184+
-- end
185+
186+
-- rewind the tape
187+
self.read_count = 0
188+
189+
return {
190+
headers = headers,
191+
body = body,
192+
}
193+
end
194+
195+
return Stream
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
local http = require "resty.luasocket.http"
2+
local build_request = require("resty.aws.request.build")
3+
local sign_request = require("resty.aws.request.sign")
4+
5+
-- This file is a workaround to support response streaming for AWS Lambda.
6+
-- The content of this file is modified from:
7+
-- * https://github.com/Kong/lua-resty-aws/blob/9e66799319fcb6ab37cf7fbc408f2d8b6adc7851/src/resty/aws/request/execute.lua#L18
8+
-- * https://github.com/Kong/lua-resty-aws/blob/9e66799319fcb6ab37cf7fbc408f2d8b6adc7851/src/resty/aws/init.lua#L307
9+
-- Thus this file should be removed when the resty-aws library is ready to support response streaming.
10+
-- See https://github.com/Kong/lua-resty-aws/issues/117 for more details.
11+
-- TODO: remove this file.
12+
13+
-- implement AWS api protocols.
14+
-- returns the raw response to support streaming.
15+
--
16+
-- Input parameters:
17+
-- * signed_request table
18+
local function execute_request_raw(signed_request)
19+
local httpc = http.new()
20+
httpc:set_timeout(signed_request.timeout or 60000)
21+
22+
local ok, err = httpc:connect {
23+
host = signed_request.host,
24+
port = signed_request.port,
25+
scheme = signed_request.tls and "https" or "http",
26+
ssl_server_name = signed_request.host,
27+
ssl_verify = signed_request.ssl_verify,
28+
proxy_opts = signed_request.proxy_opts,
29+
}
30+
if not ok then
31+
return nil, ("failed to connect to '%s://%s:%s': %s"):format(
32+
signed_request.tls and "https" or "http",
33+
tostring(signed_request.host),
34+
tostring(signed_request.port),
35+
tostring(err))
36+
end
37+
38+
local response, err = httpc:request({
39+
path = signed_request.path,
40+
method = signed_request.method,
41+
headers = signed_request.headers,
42+
query = signed_request.query,
43+
body = signed_request.body,
44+
})
45+
if not response then
46+
return nil, ("failed sending request to '%s:%s': %s"):format(
47+
tostring(signed_request.host),
48+
tostring(signed_request.port),
49+
tostring(err))
50+
end
51+
52+
if signed_request.keepalive_idle_timeout then
53+
httpc:set_keepalive(signed_request.keepalive_idle_timeout)
54+
else
55+
httpc:close()
56+
end
57+
58+
return response, err
59+
end
60+
61+
local function invokeWithResponseStream(self, params)
62+
params = params or {}
63+
64+
-- print(require("pl.pretty").write(params))
65+
-- print(require("pl.pretty").write(self.config))
66+
67+
-- generate request data and format it according to the protocol
68+
local request = build_request({
69+
name = "InvokeWithResponseStream",
70+
http = {
71+
method = "POST",
72+
requestUri = "/2021-11-15/functions/" .. params.FunctionName .. "/response-streaming-invocations"
73+
},
74+
input = {
75+
shape = "InvokeWithResponseStreamRequest"
76+
},
77+
output = {
78+
shape = "InvokeWithResponseStreamResponse"
79+
},
80+
errors = { {
81+
shape = "ServiceException"
82+
}, {
83+
shape = "ResourceNotFoundException"
84+
}, {
85+
shape = "InvalidRequestContentException"
86+
}, {
87+
shape = "RequestTooLargeException"
88+
}, {
89+
shape = "UnsupportedMediaTypeException"
90+
}, {
91+
shape = "TooManyRequestsException"
92+
}, {
93+
shape = "InvalidParameterValueException"
94+
}, {
95+
shape = "EC2UnexpectedException"
96+
}, {
97+
shape = "SubnetIPAddressLimitReachedException"
98+
}, {
99+
shape = "ENILimitReachedException"
100+
}, {
101+
shape = "EFSMountConnectivityException"
102+
}, {
103+
shape = "EFSMountFailureException"
104+
}, {
105+
shape = "EFSMountTimeoutException"
106+
}, {
107+
shape = "EFSIOException"
108+
}, {
109+
shape = "SnapStartException"
110+
}, {
111+
shape = "SnapStartTimeoutException"
112+
}, {
113+
shape = "SnapStartNotReadyException"
114+
}, {
115+
shape = "EC2ThrottledException"
116+
}, {
117+
shape = "EC2AccessDeniedException"
118+
}, {
119+
shape = "InvalidSubnetIDException"
120+
}, {
121+
shape = "InvalidSecurityGroupIDException"
122+
}, {
123+
shape = "InvalidZipFileException"
124+
}, {
125+
shape = "KMSDisabledException"
126+
}, {
127+
shape = "KMSInvalidStateException"
128+
}, {
129+
shape = "KMSAccessDeniedException"
130+
}, {
131+
shape = "KMSNotFoundException"
132+
}, {
133+
shape = "InvalidRuntimeException"
134+
}, {
135+
shape = "ResourceConflictException"
136+
}, {
137+
shape = "ResourceNotReadyException"
138+
}, {
139+
shape = "RecursiveInvocationException"
140+
} },
141+
}, self.config, params)
142+
143+
-- print request
144+
-- print(require("pl.pretty").write(request))
145+
146+
-- sign the request according to the signature version required
147+
local signed_request, err = sign_request(self.config, request)
148+
if not signed_request then
149+
return nil, "failed to sign request: " .. tostring(err)
150+
end
151+
152+
-- print(require("pl.pretty").write(signed_request))
153+
154+
if self.config.dry_run then
155+
return signed_request
156+
end
157+
-- execute the request
158+
local response, err = execute_request_raw(signed_request)
159+
if not response then
160+
return nil, "Lambda:invokeWithResponseStream()" .. " " .. tostring(err)
161+
end
162+
163+
return response
164+
end
165+
166+
return invokeWithResponseStream

0 commit comments

Comments
 (0)