1- // Copyright 2018 TriggerMesh, Inc
2- //
3- // Licensed under the Apache License, Version 2.0 (the "License");
4- // you may not use this file except in compliance with the License.
5- // You may obtain a copy of the License at
6- //
7- // http://www.apache.org/licenses/LICENSE-2.0
8- //
9- // Unless required by applicable law or agreed to in writing, software
10- // distributed under the License is distributed on an "AS IS" BASIS,
11- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12- // See the License for the specific language governing permissions and
13- // limitations under the License.
1+ /*
2+ Copyright 2021 Triggermesh Inc.
3+
4+ Licensed under the Apache License, Version 2.0 (the "License");
5+ you may not use this file except in compliance with the License.
6+ You may obtain a copy of the License at
7+
8+ http://www.apache.org/licenses/LICENSE-2.0
9+
10+ Unless required by applicable law or agreed to in writing, software
11+ distributed under the License is distributed on an "AS IS" BASIS,
12+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ See the License for the specific language governing permissions and
14+ limitations under the License.
15+ */
1416
1517package main
1618
@@ -27,10 +29,9 @@ import (
2729 "time"
2830
2931 "github.com/kelseyhightower/envconfig"
30- "github.com/triggermesh/aws-custom-runtime/pkg/events"
31- "github.com/triggermesh/aws-custom-runtime/pkg/events/apigateway"
32- "github.com/triggermesh/aws-custom-runtime/pkg/events/cloudevents"
33- "github.com/triggermesh/aws-custom-runtime/pkg/events/passthrough"
32+
33+ "github.com/triggermesh/aws-custom-runtime/pkg/converter"
34+ "github.com/triggermesh/aws-custom-runtime/pkg/sender"
3435)
3536
3637var (
@@ -67,13 +68,16 @@ type Specification struct {
6768 // Lambda API port to put function requests and get results
6869 ExternalAPIport string `envconfig:"external_api_port" default:"8080"`
6970
70- // Apply response wrapping before sending it back to the client.
71- // Common case - AWS Lambda functions usually returns data formatted for API Gateway service.
72- // Set "RESPONSE_WRAPPER: API_GATEWAY" and receive events as if they were processed by API Gateway.
73- // Opposite scenario - return responses in CloudEvent format: "RESPONSE_WRAPPER: CLOUDEVENTS"
74- // NOTE: Response wrapper does both encoding and decoding depending on the type. We should consider
75- // separating wrappers by their function.
76- ResponseWrapper string `envconfig:"response_wrapper"`
71+ Sink string `envconfig:"k_sink"`
72+ ResponseFormat string `envconfig:"response_format"`
73+ }
74+
75+ type Handler struct {
76+ Sender * sender.Sender
77+ Converter converter.Converter
78+
79+ requestSizeLimit int64
80+ functionTTL int64
7781}
7882
7983type message struct {
@@ -98,10 +102,10 @@ func (rw *responseWrapper) WriteHeader(statusCode int) {
98102 rw .StatusCode = statusCode
99103}
100104
101- func ( s * Specification ) setupEnv () error {
105+ func setupEnv (internalAPIport string ) error {
102106 environment ["_HANDLER" ], _ = os .LookupEnv ("_HANDLER" )
103107 environment ["LAMBDA_TASK_ROOT" ], _ = os .LookupEnv ("LAMBDA_TASK_ROOT" )
104- environment ["AWS_LAMBDA_RUNTIME_API" ] += ":" + s . InternalAPIport
108+ environment ["AWS_LAMBDA_RUNTIME_API" ] += ":" + internalAPIport
105109
106110 for k , v := range environment {
107111 if err := os .Setenv (k , v ); err != nil {
@@ -111,9 +115,9 @@ func (s *Specification) setupEnv() error {
111115 return nil
112116}
113117
114- func (s * Specification ) newTask (w http.ResponseWriter , r * http.Request ) {
115- requestSizeLimitInBytes := s . RequestSizeLimit * 1e+6
116- functionTTLInNanoSeconds := s . FunctionTTL * 1e+9
118+ func (h * Handler ) newTask (w http.ResponseWriter , r * http.Request ) {
119+ requestSizeLimitInBytes := h . requestSizeLimit * 1e+6
120+ functionTTLInNanoSeconds := h . functionTTL * 1e+9
117121 body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimitInBytes ))
118122 if err != nil {
119123 http .Error (w , err .Error (), http .StatusInternalServerError )
@@ -140,12 +144,19 @@ func (s *Specification) newTask(w http.ResponseWriter, r *http.Request) {
140144 select {
141145 case <- time .After (time .Duration (functionTTLInNanoSeconds )):
142146 log .Printf ("-> ! %s Deadline is reached\n " , task .id )
143- w .WriteHeader (http .StatusGone )
144- w .Write ([]byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data )))
147+ resp := []byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data ))
148+ if err := h .Sender .Send (resp , http .StatusGone , w ); err != nil {
149+ log .Printf ("! %s %v\n " , task .id , err )
150+ }
145151 case result := <- resultsChannel :
146152 log .Printf ("-> %s %d %s\n " , result .id , result .statusCode , result .data )
147- w .WriteHeader (result .statusCode )
148- w .Write (result .data )
153+ body , err := h .Converter .Convert (result .data )
154+ if err != nil {
155+ log .Printf ("! %s %v\n " , result .id , err )
156+ }
157+ if err := h .Sender .Send (body , result .statusCode , w ); err != nil {
158+ log .Printf ("! %s %v\n " , result .id , err )
159+ }
149160 }
150161 mutex .Lock ()
151162 delete (results , task .id )
@@ -231,31 +242,6 @@ func responseHandler(w http.ResponseWriter, r *http.Request) {
231242 return
232243}
233244
234- func (s * Specification ) mapEvent (h http.Handler ) http.Handler {
235- var mapper events.Mapper
236-
237- switch s .ResponseWrapper {
238- case "API_GATEWAY" :
239- mapper = apigateway .NewMapper ()
240- case "CLOUDEVENTS" :
241- mapper = cloudevents .NewMapper ()
242- if err := envconfig .Process ("CE" , mapper ); err != nil {
243- log .Fatalf ("Cannot process CloudEvents wrapper env variables: %v" , err )
244- }
245- default :
246- mapper = passthrough .NewMapper ()
247- }
248-
249- return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
250- rw := responseWrapper {
251- ResponseWriter : w ,
252- }
253- mapper .Request (r )
254- h .ServeHTTP (& rw , r )
255- mapper .Response (w , rw .StatusCode , rw .Body )
256- })
257- }
258-
259245func ping (w http.ResponseWriter , r * http.Request ) {
260246 w .WriteHeader (http .StatusOK )
261247 w .Write ([]byte ("pong" ))
@@ -282,28 +268,46 @@ func api() error {
282268}
283269
284270func main () {
271+ // parse env
285272 var spec Specification
286273 if err := envconfig .Process ("" , & spec ); err != nil {
287274 log .Fatalf ("Cannot process env variables: %v" , err )
288275 }
289276 log .Printf ("%+v\n " , spec )
290277
291278 log .Println ("Setting up runtime env" )
292- if err := spec . setupEnv (); err != nil {
279+ if err := setupEnv (spec . InternalAPIport ); err != nil {
293280 log .Fatalf ("Cannot setup runime env: %v" , err )
294281 }
295282
283+ // create converter
284+ conv , err := converter .New (spec .ResponseFormat )
285+ if err != nil {
286+ log .Fatalf ("Cannot create converter: %v" , err )
287+ }
288+
289+ // setup sender
290+ handler := Handler {
291+ Sender : sender .New (spec .Sink , conv .ContentType ()),
292+ Converter : conv ,
293+ requestSizeLimit : spec .RequestSizeLimit ,
294+ functionTTL : spec .FunctionTTL ,
295+ }
296+
297+ // setup channels
296298 tasks = make (chan message , 100 )
297299 results = make (map [string ]chan message )
298300 defer close (tasks )
299301
302+ // start Lambda API
300303 log .Println ("Starting API" )
301304 go func () {
302305 if err := api (); err != nil {
303306 log .Fatalf ("Runtime internal API error: %v" , err )
304307 }
305308 }()
306309
310+ // start invokers
307311 for i := 0 ; i < spec .NumberOfinvokers ; i ++ {
308312 log .Println ("Starting bootstrap" , i + 1 )
309313 go func (i int ) {
@@ -317,11 +321,12 @@ func main() {
317321 }(i )
318322 }
319323
324+ // start external API
320325 taskRouter := http .NewServeMux ()
321- taskHandler := http .HandlerFunc (spec .newTask )
322- taskRouter .Handle ("/" , spec . mapEvent ( taskHandler ) )
326+ taskHandler := http .HandlerFunc (handler .newTask )
327+ taskRouter .Handle ("/" , taskHandler )
323328 log .Println ("Listening..." )
324- err : = http .ListenAndServe (":" + spec .ExternalAPIport , taskRouter )
329+ err = http .ListenAndServe (":" + spec .ExternalAPIport , taskRouter )
325330 if err != nil && err != http .ErrServerClosed {
326331 log .Fatalf ("Runtime external API error: %v" , err )
327332 }
0 commit comments