@@ -26,6 +26,7 @@ import (
2626 "sync"
2727 "time"
2828
29+ "github.com/kelseyhightower/envconfig"
2930 "github.com/triggermesh/aws-custom-runtime/pkg/events/apiGateway"
3031)
3132
@@ -39,13 +40,29 @@ type message struct {
3940type responseWrapper struct {
4041 http.ResponseWriter
4142 StatusCode int
42- Body []byte
43+ Body []byte
44+ }
45+
46+ // Specification is a set of env variables that can be used to configure runtime API
47+ type Specification struct {
48+ // Number of bootstrap processes
49+ NumberOfinvokers int `envconfig:"invoker_count" default:"4"`
50+ // Request body size limit, Mb
51+ RequestSizeLimit int64 `envconfig:"request_size_limit" default:"5"`
52+ // Funtions deadline, seconds
53+ FunctionTTL int64 `envconfig:"function_ttl" default:"10"`
54+ // Lambda runtime API port for functions
55+ InternalAPIport string `envconfig:"internal_api_port" default:"80"`
56+ // Lambda API port to put function requests and get results
57+ ExternalAPIport string `envconfig:"external_api_port" default:"8080"`
58+ // Either return function result "as is" or consider it as API Gateway JSON
59+ EventType string `envconfig:"event_type"`
4360}
4461
4562var (
46- numberOfinvokers = 4 // Number of bootstrap processes
47- requestSizeLimit int64 = 5 // Request body size limit, Mb
48- functionTTL int64 = 10 // Funtions deadline, seconds
63+ // numberOfinvokers = 4
64+ // requestSizeLimit int64 = 5
65+ // functionTTL int64 = 10
4966
5067 tasks chan message
5168 results map [string ]chan message
@@ -73,7 +90,7 @@ func (rw *responseWrapper) Write(data []byte) (int, error) {
7390}
7491
7592func (rw * responseWrapper ) WriteHeader (statusCode int ) {
76- rw .StatusCode = statusCode ;
93+ rw .StatusCode = statusCode
7794}
7895
7996func setupEnv () error {
@@ -88,9 +105,9 @@ func setupEnv() error {
88105 return nil
89106}
90107
91- func newTask (w http.ResponseWriter , r * http.Request ) {
92- requestSizeLimitInBytes := requestSizeLimit * 1e+6
93- functionTTLInNanoSeconds := functionTTL * 1e+9
108+ func ( s * Specification ) newTask (w http.ResponseWriter , r * http.Request ) {
109+ requestSizeLimitInBytes := s . RequestSizeLimit * 1e+6
110+ functionTTLInNanoSeconds := s . FunctionTTL * 1e+9
94111 body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimitInBytes ))
95112 if err != nil {
96113 http .Error (w , err .Error (), http .StatusInternalServerError )
@@ -104,7 +121,7 @@ func newTask(w http.ResponseWriter, r *http.Request) {
104121 deadline : now + functionTTLInNanoSeconds ,
105122 data : body ,
106123 }
107- fmt .Printf ("<- %s %s\n " , task .id , task .data )
124+ log .Printf ("<- %s %s\n " , task .id , task .data )
108125
109126 resultsChannel := make (chan message )
110127 mutex .Lock ()
@@ -116,11 +133,11 @@ func newTask(w http.ResponseWriter, r *http.Request) {
116133
117134 select {
118135 case <- time .After (time .Duration (functionTTLInNanoSeconds )):
119- fmt .Printf ("-> ! %s Deadline is reached\n " , task .id )
136+ log .Printf ("-> ! %s Deadline is reached\n " , task .id )
120137 w .WriteHeader (http .StatusGone )
121138 w .Write ([]byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data )))
122139 case result := <- resultsChannel :
123- fmt .Printf ("-> %s %d %s\n " , result .id , result .statusCode , result .data )
140+ log .Printf ("-> %s %d %s\n " , result .id , result .statusCode , result .data )
124141 w .WriteHeader (result .statusCode )
125142 w .Write (result .data )
126143 }
@@ -174,7 +191,7 @@ func responseHandler(w http.ResponseWriter, r *http.Request) {
174191
175192 data , err := ioutil .ReadAll (r .Body )
176193 if err != nil {
177- fmt .Printf ("! %s\n " , err )
194+ log .Printf ("! %s\n " , err )
178195 return
179196 }
180197 defer r .Body .Close ()
@@ -208,11 +225,10 @@ func responseHandler(w http.ResponseWriter, r *http.Request) {
208225 return
209226}
210227
211- func mapEvent (h http.Handler ) http.Handler {
212- eventType , _ := os .LookupEnv ("EVENT" )
228+ func (s * Specification ) mapEvent (h http.Handler ) http.Handler {
213229 return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
214230 rw := responseWrapper {w , 200 , []byte {}}
215- switch eventType {
231+ switch s . EventType {
216232 case "API_GATEWAY" :
217233 apiGateway .Request (r )
218234 h .ServeHTTP (& rw , r )
@@ -223,58 +239,46 @@ func mapEvent(h http.Handler) http.Handler {
223239 })
224240}
225241
226- func setLimits () {
227- if v , ok := os .LookupEnv ("INVOKER_COUNT" ); ok {
228- if vv , err := strconv .Atoi (v ); err != nil {
229- fmt .Printf ("can't set invokers limit, using default value %d\n " , numberOfinvokers )
230- } else {
231- numberOfinvokers = vv
232- }
233- }
234- if v , ok := os .LookupEnv ("REQUEST_SIZE_LIMIT" ); ok {
235- if vv , err := strconv .Atoi (v ); err != nil {
236- fmt .Printf ("can't set request size limit, using default value %d\n " , requestSizeLimit )
237- } else {
238- requestSizeLimit = int64 (vv )
239- }
240- }
241- if v , ok := os .LookupEnv ("FUNCTION_TTL" ); ok {
242- if vv , err := strconv .Atoi (v ); err != nil {
243- fmt .Printf ("can't set function ttl, using default value %d\n " , functionTTL )
244- } else {
245- functionTTL = int64 (vv )
246- }
247- }
242+ func ping (w http.ResponseWriter , r * http.Request ) {
243+ w .WriteHeader (http .StatusOK )
244+ w .Write ([]byte ("pong" ))
245+ return
248246}
249247
250- func api () error {
248+ func ( s * Specification ) api () error {
251249 apiRouter := http .NewServeMux ()
252250 apiRouter .HandleFunc (awsEndpoint + "/init/error" , initError )
253251 apiRouter .HandleFunc (awsEndpoint + "/invocation/next" , getTask )
254252 apiRouter .HandleFunc (awsEndpoint + "/invocation/" , responseHandler )
255- return http .ListenAndServe (":80" , apiRouter )
253+ apiRouter .HandleFunc ("/2018-06-01/ping" , ping )
254+ return http .ListenAndServe (":" + s .InternalAPIport , apiRouter )
256255}
257256
258257func main () {
259258 tasks = make (chan message , 100 )
260259 results = make (map [string ]chan message )
261260 defer close (tasks )
262261
263- fmt .Println ("Setting limits" )
264- setLimits ()
262+ var spec Specification
263+
264+ err := envconfig .Process ("" , & spec )
265+ if err != nil {
266+ log .Fatal (err )
267+ }
268+ log .Printf ("%+v\n " , spec )
265269
266- fmt .Println ("Setup env" )
270+ log .Println ("Setup app env" )
267271 if err := setupEnv (); err != nil {
268272 log .Fatalln (err )
269273 }
270274
271- fmt .Println ("Starting API" )
275+ log .Println ("Starting API" )
272276 go func () {
273- log .Fatalln (api ())
277+ log .Fatalln (spec . api ())
274278 }()
275279
276- for i := 0 ; i < numberOfinvokers ; i ++ {
277- fmt .Println ("Starting bootstrap" , i + 1 )
280+ for i := 0 ; i < spec . NumberOfinvokers ; i ++ {
281+ log .Println ("Starting bootstrap" , i + 1 )
278282 go func (i int ) {
279283 cmd := exec .Command ("sh" , "-c" , environment ["LAMBDA_TASK_ROOT" ]+ "/bootstrap" )
280284 cmd .Env = append (os .Environ (), fmt .Sprintf ("BOOTSTRAP_INDEX=%d" , i ))
@@ -287,8 +291,8 @@ func main() {
287291 }
288292
289293 taskRouter := http .NewServeMux ()
290- taskHandler := http .HandlerFunc (newTask )
291- taskRouter .Handle ("/" , mapEvent (taskHandler ))
292- fmt .Println ("Listening..." )
293- log .Fatalln (http .ListenAndServe (":8080" , taskRouter ))
294+ taskHandler := http .HandlerFunc (spec . newTask )
295+ taskRouter .Handle ("/" , spec . mapEvent (taskHandler ))
296+ log .Println ("Listening..." )
297+ log .Fatalln (http .ListenAndServe (":" + spec . ExternalAPIport , taskRouter ))
294298}
0 commit comments