Skip to content

Commit e6756f9

Browse files
committed
add support for http header routing
Signed-off-by: Justin Reasoner <gjreasoner@gmail.com>
1 parent 993b7bf commit e6756f9

File tree

8 files changed

+49
-24
lines changed

8 files changed

+49
-24
lines changed

interceptor/config/serving.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type Serving struct {
4545
TLSCertStorePaths string `envconfig:"KEDA_HTTP_PROXY_TLS_CERT_STORE_PATHS" default:""`
4646
// TLSPort is the port that the server should serve on if TLS is enabled
4747
TLSPort int `envconfig:"KEDA_HTTP_PROXY_TLS_PORT" default:"8443"`
48+
// RoutingHeader is an optional header that can be used to route requests
49+
// to different backends when set and sent in the HTTP request
50+
RoutingHeader string `envconfig:"KEDA_HTTP_ROUTING_HEADER" default:""`
4851
}
4952

5053
// Parse parses standard configs using envconfig and returns a pointer to the

interceptor/handler/static.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (sh *Static) ServeHTTP(w http.ResponseWriter, r *http.Request) {
3131
stream := util.StreamFromContext(ctx)
3232

3333
statusText := http.StatusText(sh.statusCode)
34-
routingKey := routing.NewKeyFromRequest(r)
34+
routingKey := routing.NewKeyFromRequest(r, "")
3535
namespacedName := k8s.NamespacedNameFromObject(httpso)
3636
logger.Error(sh.err, statusText, "routingKey", routingKey, "namespacedName", namespacedName, "stream", stream)
3737

interceptor/handler/static_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ var _ = Describe("ServeHTTP", func() {
2323
st = http.StatusText(sc)
2424

2525
se = errors.New("test error")
26+
27+
rh = ""
2628
)
2729

2830
BeforeEach(func() {
@@ -47,7 +49,7 @@ var _ = Describe("ServeHTTP", func() {
4749
err := json.Unmarshal([]byte(obj), &m)
4850
Expect(err).NotTo(HaveOccurred())
4951

50-
rk := routing.NewKeyFromRequest(r)
52+
rk := routing.NewKeyFromRequest(r, rh)
5153
Expect(m).To(HaveKeyWithValue("error", se.Error()))
5254
Expect(m).To(HaveKeyWithValue("msg", st))
5355
Expect(m).To(HaveKeyWithValue("routingKey", rk.String()))

interceptor/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func main() {
106106
queues := queue.NewMemory()
107107

108108
sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, servingCfg.ConfigMapCacheRsyncPeriod)
109-
routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues)
109+
routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues, servingCfg.RoutingHeader)
110110
if err != nil {
111111
setupLog.Error(err, "fetching routing table")
112112
os.Exit(1)

pkg/routing/key.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func NewKeyFromURL(url *url.URL) Key {
3333
return NewKey(url.Host, url.Path)
3434
}
3535

36-
func NewKeyFromRequest(req *http.Request) Key {
36+
func NewKeyFromRequest(req *http.Request, httpRoutingHeaderKey string) Key {
3737
if req == nil {
3838
return nil
3939
}
@@ -48,6 +48,12 @@ func NewKeyFromRequest(req *http.Request) Key {
4848
keyURL.Host = reqHost
4949
}
5050

51+
if httpRoutingHeaderKey != "" {
52+
if httpRoutingHeaderValue := req.Header.Get(httpRoutingHeaderKey); httpRoutingHeaderValue != "" {
53+
keyURL.Host = httpRoutingHeaderValue
54+
}
55+
}
56+
5157
return NewKeyFromURL(&keyURL)
5258
}
5359

pkg/routing/key_test.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ package routing
22

33
import (
44
"fmt"
5-
"net/http"
6-
"net/url"
7-
5+
httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
86
. "github.com/onsi/ginkgo/v2"
97
. "github.com/onsi/gomega"
10-
11-
httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
8+
"net/http"
9+
"net/url"
1210
)
1311

1412
var _ = Describe("Key", func() {
@@ -125,23 +123,36 @@ var _ = Describe("Key", func() {
125123
})
126124

127125
Context("NewFromRequest", func() {
126+
const (
127+
host = "kubernetes.io"
128+
path = "abc/def"
129+
norm0 = "//kubernetes.io/abc/def/"
130+
norm1 = "//get-thing/abc/def/"
131+
serviceHeader = "x-service-action-a"
132+
serviceHost = "get-thing"
133+
)
134+
128135
It("returns expected key for Request", func() {
129-
const (
130-
host = "kubernetes.io"
131-
path = "abc/def"
132-
norm = "//kubernetes.io/abc/def/"
133-
)
136+
r, err := http.NewRequest("GET", fmt.Sprintf("https://%s:443/%s?123=456#789", host, path), nil)
137+
Expect(err).NotTo(HaveOccurred())
138+
Expect(r).NotTo(BeNil())
139+
140+
key := NewKeyFromRequest(r, "")
141+
Expect(key).To(Equal(Key(norm0)))
142+
})
134143

144+
It("returns service host for Request with http routing header", func() {
135145
r, err := http.NewRequest("GET", fmt.Sprintf("https://%s:443/%s?123=456#789", host, path), nil)
136146
Expect(err).NotTo(HaveOccurred())
137147
Expect(r).NotTo(BeNil())
148+
r.Header.Set(serviceHeader, serviceHost)
138149

139-
key := NewKeyFromRequest(r)
140-
Expect(key).To(Equal(Key(norm)))
150+
key := NewKeyFromRequest(r, serviceHeader)
151+
Expect(key).To(Equal(Key(norm1)))
141152
})
142153

143154
It("returns nil for nil Request", func() {
144-
key := NewKeyFromRequest(nil)
155+
key := NewKeyFromRequest(nil, "")
145156
Expect(key).To(BeNil())
146157
})
147158
})

pkg/routing/table.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,16 @@ type table struct {
4242
memoryHolder util.AtomicValue[TableMemory]
4343
memorySignaler util.Signaler
4444
queueCounter queue.Counter
45+
routingHeader string
4546
}
4647

47-
func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter) (Table, error) {
48+
func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter, routingHeader string) (Table, error) {
4849
httpScaledObjects := informershttpv1alpha1.New(sharedInformerFactory, namespace, nil).HTTPScaledObjects()
4950

5051
t := table{
5152
httpScaledObjects: make(map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject),
5253
memorySignaler: util.NewSignaler(),
54+
routingHeader: routingHeader,
5355
}
5456

5557
informer, ok := httpScaledObjects.Informer().(sharedIndexInformer)
@@ -134,7 +136,7 @@ func (t *table) Route(req *http.Request) *httpv1alpha1.HTTPScaledObject {
134136
return nil
135137
}
136138

137-
key := NewKeyFromRequest(req)
139+
key := NewKeyFromRequest(req, t.routingHeader)
138140
return tm.Route(key)
139141
}
140142

pkg/routing/table_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import (
2222

2323
var _ = Describe("Table", func() {
2424
const (
25-
namespace = "default"
25+
namespace = "default"
26+
routingHeader = "X-HTTP-Routing"
2627
)
2728

2829
var (
@@ -111,7 +112,7 @@ var _ = Describe("Table", func() {
111112

112113
Context("New", func() {
113114
It("returns a table with fields initialized", func() {
114-
i, err := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter())
115+
i, err := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader)
115116
Expect(err).NotTo(HaveOccurred())
116117
Expect(i).NotTo(BeNil())
117118

@@ -136,7 +137,7 @@ var _ = Describe("Table", func() {
136137
)
137138

138139
BeforeEach(func() {
139-
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter())
140+
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader)
140141
t = i.(*table)
141142
})
142143

@@ -181,7 +182,7 @@ var _ = Describe("Table", func() {
181182
)
182183

183184
BeforeEach(func() {
184-
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter())
185+
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader)
185186
t = i.(*table)
186187
})
187188

@@ -279,7 +280,7 @@ var _ = Describe("Table", func() {
279280
)
280281

281282
BeforeEach(func() {
282-
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter())
283+
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader)
283284
t = i.(*table)
284285
})
285286

0 commit comments

Comments
 (0)