Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.

Commit b3ec30d

Browse files
committed
Mapper interface added
1 parent 569be4c commit b3ec30d

File tree

5 files changed

+131
-54
lines changed

5 files changed

+131
-54
lines changed

main.go

Lines changed: 64 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -27,43 +27,13 @@ import (
2727
"time"
2828

2929
"github.com/kelseyhightower/envconfig"
30-
"github.com/triggermesh/aws-custom-runtime/pkg/events/apiGateway"
30+
"github.com/triggermesh/aws-custom-runtime/pkg/events"
31+
"github.com/triggermesh/aws-custom-runtime/pkg/events/apigateway"
32+
"github.com/triggermesh/aws-custom-runtime/pkg/events/cloudevents"
33+
"github.com/triggermesh/aws-custom-runtime/pkg/events/passthrough"
3134
)
3235

33-
type message struct {
34-
id string
35-
deadline int64
36-
data []byte
37-
statusCode int
38-
}
39-
40-
type responseWrapper struct {
41-
http.ResponseWriter
42-
StatusCode int
43-
Body []byte
44-
}
45-
46-
// Specification is a set of env variables that can be used to configure runtime API
47-
type Specification struct {
48-
// Number of bootstrap processes
49-
NumberOfinvokers int `envconfig:"invoker_count" default:"4"`
50-
// Request body size limit, Mb
51-
RequestSizeLimit int64 `envconfig:"request_size_limit" default:"5"`
52-
// Funtions deadline, seconds
53-
FunctionTTL int64 `envconfig:"function_ttl" default:"10"`
54-
// Lambda runtime API port for functions
55-
InternalAPIport string `envconfig:"internal_api_port" default:"80"`
56-
// Lambda API port to put function requests and get results
57-
ExternalAPIport string `envconfig:"external_api_port" default:"8080"`
58-
// Either return function result "as is" or consider it as API Gateway JSON
59-
EventType string `envconfig:"event_type"`
60-
}
61-
6236
var (
63-
// numberOfinvokers = 4
64-
// requestSizeLimit int64 = 5
65-
// functionTTL int64 = 10
66-
6737
tasks chan message
6838
results map[string]chan message
6939

@@ -84,6 +54,43 @@ var (
8454
}
8555
)
8656

57+
// Specification is a set of env variables that can be used to configure runtime API
58+
type Specification struct {
59+
// Number of bootstrap processes
60+
NumberOfinvokers int `envconfig:"invoker_count" default:"4"`
61+
// Request body size limit, Mb
62+
RequestSizeLimit int64 `envconfig:"request_size_limit" default:"5"`
63+
// Funtions deadline, seconds
64+
FunctionTTL int64 `envconfig:"function_ttl" default:"10"`
65+
// Lambda runtime API port for functions
66+
InternalAPIport string `envconfig:"internal_api_port" default:"80"`
67+
// Lambda API port to put function requests and get results
68+
ExternalAPIport string `envconfig:"external_api_port" default:"8080"`
69+
// Parent Knative Service name
70+
Service string `envconfig:"k_service"`
71+
72+
// Apply response wrapping before sending it back to the client.
73+
// Common case - AWS Lambda functions usually returns data formatted for API Gateway service.
74+
// Set "RESPONSE_WRAPPER: API_GATEWAY" and receive events as if they were processed by API Gateway.
75+
// Opposite scenario - return responses in CloudEvent format: "RESPONSE_WRAPPER: CLOUDEVENTS"
76+
// NOTE: Response wrapper does both encoding and decoding depending on the type. We should consider
77+
// separating wrappers by their function.
78+
ResponseWrapper string `envconfig:"response_wrapper"`
79+
}
80+
81+
type message struct {
82+
id string
83+
deadline int64
84+
data []byte
85+
statusCode int
86+
}
87+
88+
type responseWrapper struct {
89+
http.ResponseWriter
90+
StatusCode int
91+
Body []byte
92+
}
93+
8794
func (rw *responseWrapper) Write(data []byte) (int, error) {
8895
rw.Body = data
8996
return len(data), nil
@@ -227,16 +234,24 @@ func responseHandler(w http.ResponseWriter, r *http.Request) {
227234
}
228235

229236
func (s *Specification) mapEvent(h http.Handler) http.Handler {
237+
var mapper events.Mapper
238+
239+
switch s.ResponseWrapper {
240+
case "API_GATEWAY":
241+
mapper = apigateway.NewMapper()
242+
case "CLOUDEVENTS":
243+
mapper = cloudevents.NewMapper(s.Service)
244+
default:
245+
mapper = passthrough.NewMapper()
246+
}
247+
230248
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
231-
rw := responseWrapper{w, 200, []byte{}}
232-
switch s.EventType {
233-
case "API_GATEWAY":
234-
apiGateway.Request(r)
235-
h.ServeHTTP(&rw, r)
236-
apiGateway.Response(w, rw.StatusCode, rw.Body)
237-
default:
238-
h.ServeHTTP(w, r)
249+
rw := responseWrapper{
250+
ResponseWriter: w,
239251
}
252+
mapper.Request(r)
253+
h.ServeHTTP(&rw, r)
254+
mapper.Response(w, rw.StatusCode, rw.Body)
240255
})
241256
}
242257

@@ -266,23 +281,21 @@ func api() error {
266281
}
267282

268283
func main() {
269-
tasks = make(chan message, 100)
270-
results = make(map[string]chan message)
271-
defer close(tasks)
272-
273284
var spec Specification
274-
275-
err := envconfig.Process("", &spec)
276-
if err != nil {
285+
if err := envconfig.Process("", &spec); err != nil {
277286
log.Fatalf("Cannot process env variables: %v", err)
278287
}
279288
log.Printf("%+v\n", spec)
280289

281-
log.Println("Setup app env")
290+
log.Println("Setting up runtime env")
282291
if err := spec.setupEnv(); err != nil {
283292
log.Fatalf("Cannot setup runime env: %v", err)
284293
}
285294

295+
tasks = make(chan message, 100)
296+
results = make(map[string]chan message)
297+
defer close(tasks)
298+
286299
log.Println("Starting API")
287300
go func() {
288301
if err := api(); err != nil {
@@ -307,7 +320,7 @@ func main() {
307320
taskHandler := http.HandlerFunc(spec.newTask)
308321
taskRouter.Handle("/", spec.mapEvent(taskHandler))
309322
log.Println("Listening...")
310-
err = http.ListenAndServe(":"+spec.ExternalAPIport, taskRouter)
323+
err := http.ListenAndServe(":"+spec.ExternalAPIport, taskRouter)
311324
if err != nil && err != http.ErrServerClosed {
312325
log.Fatalf("Runtime external API error: %v", err)
313326
}

pkg/events/apiGateway/mapper.go renamed to pkg/events/apigateway/mapper.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package apiGateway
1+
package apigateway
22

33
import (
44
"bytes"
@@ -11,7 +11,13 @@ import (
1111
"github.com/aws/aws-lambda-go/events"
1212
)
1313

14-
func Request(r *http.Request) {
14+
type APIGateway struct{}
15+
16+
func NewMapper() *APIGateway {
17+
return &APIGateway{}
18+
}
19+
20+
func (a *APIGateway) Request(r *http.Request) {
1521
event := events.APIGatewayProxyRequest{}
1622
body, err := ioutil.ReadAll(r.Body)
1723
if err != nil {
@@ -40,7 +46,7 @@ func Request(r *http.Request) {
4046
r.Body = ioutil.NopCloser(bytes.NewBuffer(js))
4147
}
4248

43-
func Response(w http.ResponseWriter, statusCode int, data []byte) (int, error) {
49+
func (a *APIGateway) Response(w http.ResponseWriter, statusCode int, data []byte) (int, error) {
4450
var js events.APIGatewayProxyResponse
4551
if err := json.Unmarshal(data, &js); err != nil {
4652
return 0, err

pkg/events/cloudevents/mapper.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package cloudevents
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
7+
cloudevents "github.com/cloudevents/sdk-go/v2"
8+
"github.com/google/uuid"
9+
)
10+
11+
type CloudEvents struct {
12+
Service string
13+
}
14+
15+
func NewMapper(service string) *CloudEvents {
16+
return &CloudEvents{
17+
Service: service,
18+
}
19+
}
20+
21+
func (c *CloudEvents) Request(r *http.Request) {}
22+
23+
func (c *CloudEvents) Response(w http.ResponseWriter, statusCode int, data []byte) (int, error) {
24+
response := cloudevents.NewEvent()
25+
response.SetType("ce.klr.triggermesh.io")
26+
response.SetSource(c.Service)
27+
response.SetID(uuid.New().String())
28+
if err := response.SetData(cloudevents.ApplicationJSON, data); err != nil {
29+
return 0, fmt.Errorf("failed to set event data: %w", err)
30+
}
31+
return w.Write([]byte(response.String()))
32+
}

pkg/events/interface.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package events
2+
3+
import "net/http"
4+
5+
type Mapper interface {
6+
Request(r *http.Request)
7+
Response(w http.ResponseWriter, statusCode int, data []byte) (int, error)
8+
}

pkg/events/passthrough/mapper.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package passthrough
2+
3+
import (
4+
"net/http"
5+
)
6+
7+
type Passthrough struct{}
8+
9+
func NewMapper() *Passthrough {
10+
return &Passthrough{}
11+
}
12+
13+
func (c *Passthrough) Request(r *http.Request) {}
14+
15+
func (c *Passthrough) Response(w http.ResponseWriter, statusCode int, data []byte) (int, error) {
16+
w.WriteHeader(statusCode)
17+
return w.Write(data)
18+
}

0 commit comments

Comments
 (0)