Skip to content

Commit e9d28bb

Browse files
authored
minimal debouncer implementation (#12)
* minimal debouncer implementation * add worker
1 parent 2a2dff3 commit e9d28bb

File tree

9 files changed

+175
-131
lines changed

9 files changed

+175
-131
lines changed

cmd/cache-offloader.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"neurocode.io/cache-offloader/pkg/metrics"
1414
"neurocode.io/cache-offloader/pkg/probes"
1515
"neurocode.io/cache-offloader/pkg/storage"
16+
"neurocode.io/cache-offloader/pkg/worker"
1617
)
1718

1819
func getInMemoryStorage(cfg config.Config) http.Cacher {
@@ -61,8 +62,10 @@ func main() {
6162
cfg := config.New()
6263
setupLogging(cfg.ServerConfig.LogLevel)
6364
m := metrics.NewPrometheusCollector()
65+
maxInFlightRevalidationRequests := 1000
6466
opts := http.ServerOpts{
6567
Config: cfg,
68+
Worker: worker.NewUpdateQueue(maxInFlightRevalidationRequests),
6669
MetricsCollector: m,
6770
ReadinessChecker: probes.NewReadinessChecker(),
6871
}

pkg/http/cache-mock_test.go

Lines changed: 36 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/http/cache.go

Lines changed: 55 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,27 @@ import (
2121
)
2222

2323
//go:generate mockgen -source=./cache.go -destination=./cache-mock_test.go -package=http
24-
type Cacher interface {
25-
LookUp(context.Context, string) (*model.Response, error)
26-
Store(context.Context, string, *model.Response) error
27-
}
24+
type (
25+
Worker interface {
26+
Start(string, func())
27+
}
28+
Cacher interface {
29+
LookUp(context.Context, string) (*model.Response, error)
30+
Store(context.Context, string, *model.Response) error
31+
}
2832

29-
type MetricsCollector interface {
30-
CacheHit(method string, statusCode int)
31-
CacheMiss(method string, statusCode int)
32-
}
33+
MetricsCollector interface {
34+
CacheHit(method string, statusCode int)
35+
CacheMiss(method string, statusCode int)
36+
}
3337

34-
type handler struct {
35-
cacher Cacher
36-
metricsCollector MetricsCollector
37-
cfg config.CacheConfig
38-
}
38+
handler struct {
39+
cacher Cacher
40+
worker Worker
41+
metricsCollector MetricsCollector
42+
cfg config.CacheConfig
43+
}
44+
)
3945

4046
func handleGzipServeErr(err error) {
4147
if err != nil {
@@ -94,47 +100,50 @@ func errHandler(res http.ResponseWriter, req *http.Request, err error) {
94100
http.Error(res, "service unavailable", http.StatusBadGateway)
95101
}
96102

97-
func newCacheHandler(c Cacher, m MetricsCollector, cfg config.CacheConfig) handler {
103+
func newCacheHandler(c Cacher, m MetricsCollector, w Worker, cfg config.CacheConfig) handler {
98104
return handler{
99105
cacher: c,
106+
worker: w,
100107
metricsCollector: m,
101108
cfg: cfg,
102109
}
103110
}
104111

105-
func (h handler) asyncCacheRevalidate(hashKey string, res http.ResponseWriter, req *http.Request) {
106-
ctx := context.Background()
107-
newReq := req.WithContext(ctx)
108-
109-
netTransport := &http.Transport{
110-
MaxIdleConnsPerHost: 1000,
111-
DisableKeepAlives: false,
112-
IdleConnTimeout: time.Hour * 1,
113-
Dial: (&net.Dialer{
114-
Timeout: 10 * time.Second,
115-
KeepAlive: 30 * time.Second,
116-
}).Dial,
117-
TLSHandshakeTimeout: 10 * time.Second,
118-
ResponseHeaderTimeout: 10 * time.Second,
119-
}
120-
client := &http.Client{
121-
Timeout: time.Second * 10,
122-
Transport: netTransport,
123-
}
124-
125-
newReq.URL.Host = h.cfg.DownstreamHost.Host
126-
newReq.URL.Scheme = h.cfg.DownstreamHost.Scheme
127-
newReq.RequestURI = ""
128-
resp, err := client.Do(newReq)
129-
if err != nil {
130-
log.Ctx(ctx).Error().Err(err).Msg("Errored when sending request to the server")
112+
func (h handler) asyncCacheRevalidate(hashKey string, req *http.Request) func() {
113+
return func() {
114+
ctx := context.Background()
115+
newReq := req.WithContext(ctx)
116+
117+
netTransport := &http.Transport{
118+
MaxIdleConnsPerHost: 1000,
119+
DisableKeepAlives: false,
120+
IdleConnTimeout: time.Hour * 1,
121+
Dial: (&net.Dialer{
122+
Timeout: 10 * time.Second,
123+
KeepAlive: 30 * time.Second,
124+
}).Dial,
125+
TLSHandshakeTimeout: 10 * time.Second,
126+
ResponseHeaderTimeout: 10 * time.Second,
127+
}
128+
client := &http.Client{
129+
Timeout: time.Second * 10,
130+
Transport: netTransport,
131+
}
131132

132-
return
133-
}
134-
err = h.cacheResponse(ctx, hashKey)(resp)
133+
newReq.URL.Host = h.cfg.DownstreamHost.Host
134+
newReq.URL.Scheme = h.cfg.DownstreamHost.Scheme
135+
newReq.RequestURI = ""
136+
resp, err := client.Do(newReq)
137+
if err != nil {
138+
log.Ctx(ctx).Error().Err(err).Msg("Errored when sending request to the server")
135139

136-
if err != nil {
137-
log.Print("Error occurred caching response")
140+
return
141+
}
142+
err = h.cacheResponse(ctx, hashKey)(resp)
143+
144+
if err != nil {
145+
log.Print("Error occurred caching response")
146+
}
138147
}
139148
}
140149

@@ -178,7 +187,7 @@ func (h handler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
178187
h.metricsCollector.CacheHit(req.Method, result.Status)
179188

180189
if result.IsStale() {
181-
go h.asyncCacheRevalidate(hashKey, res, req)
190+
go h.worker.Start(hashKey, h.asyncCacheRevalidate(hashKey, req))
182191
}
183192
serveResponseFromMemory(res, result)
184193
}

pkg/http/cache_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func mustURL(t *testing.T, downstreamURL string) *url.URL {
6565

6666
func TestCacheHandler(t *testing.T) {
6767
ctrl := gomock.NewController(t)
68-
// defer ctrl.Finish()
68+
defer ctrl.Finish()
6969

7070
proxied := http.StatusUseProxy
7171
endpoint := "/status/200?q=1"
@@ -286,13 +286,18 @@ func TestCacheHandler(t *testing.T) {
286286
cfg: config.CacheConfig{
287287
DownstreamHost: mustURL(t, downstreamServer.URL),
288288
},
289+
worker: func() Worker {
290+
mock := NewMockWorker(ctrl)
291+
mock.EXPECT().Start(gomock.Any(), gomock.Any())
292+
293+
return mock
294+
}(),
289295
cacher: func() Cacher {
290296
mock := NewMockCacher(ctrl)
291297
mock.EXPECT().LookUp(gomock.Any(), gomock.Any()).Return(&model.Response{
292298
Status: http.StatusOK,
293299
Body: []byte("hello"),
294300
}, nil)
295-
// mock.EXPECT().Store(gomock.Any(), gomock.Any(), gomock.Any())
296301

297302
return mock
298303
}(),

pkg/http/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ import (
1616
type ServerOpts struct {
1717
Config config.Config
1818
Cacher Cacher
19+
Worker Worker
1920
MetricsCollector MetricsCollector
2021
ReadinessChecker ReadinessChecker
2122
}
2223

2324
func RunServer(opts ServerOpts) {
2425
mux := h.NewServeMux()
25-
mux.Handle("/", newCacheHandler(opts.Cacher, opts.MetricsCollector, opts.Config.CacheConfig))
26+
mux.Handle("/", newCacheHandler(opts.Cacher, opts.MetricsCollector, opts.Worker, opts.Config.CacheConfig))
2627
mux.Handle("/metrics/prometheus", metricsHandler())
2728
mux.HandleFunc("/probes/liveness", livenessHandler)
2829

pkg/metrics/prometheus_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ func TestPrometheusMetrics(t *testing.T) {
1111
t.Run("should return a prometheus registry", func(t *testing.T) {
1212
collector := NewPrometheusCollector()
1313
assert.NotNil(t, collector)
14+
15+
prometheus.Unregister(collector.httpMetrics)
1416
})
1517

1618
t.Run("should use NA for invalid HTTP method", func(t *testing.T) {
@@ -21,5 +23,6 @@ func TestPrometheusMetrics(t *testing.T) {
2123

2224
assert.Nil(t, err)
2325
assert.NotNil(t, metric)
26+
prometheus.Unregister(collector.httpMetrics)
2427
})
2528
}

pkg/storage/debouncer.go

Lines changed: 0 additions & 81 deletions
This file was deleted.

pkg/worker/cache-updater.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package worker
2+
3+
import (
4+
"sync"
5+
6+
"github.com/rs/zerolog/log"
7+
)
8+
9+
type UpdateQueue struct {
10+
mtx sync.RWMutex
11+
queue map[string]bool
12+
size int
13+
}
14+
15+
func NewUpdateQueue(size int) *UpdateQueue {
16+
if size <= 0 {
17+
size = 1000
18+
}
19+
20+
return &UpdateQueue{
21+
queue: make(map[string]bool, size),
22+
size: size,
23+
}
24+
}
25+
26+
func (debouncer *UpdateQueue) Start(key string, work func()) {
27+
if len(debouncer.queue) >= debouncer.size {
28+
log.Warn().Msg("UpdateQueue is full, dropping request")
29+
30+
return
31+
}
32+
33+
debouncer.mtx.Lock()
34+
35+
if _, ok := debouncer.queue[key]; ok {
36+
debouncer.mtx.Unlock()
37+
38+
return
39+
}
40+
debouncer.queue[key] = true
41+
debouncer.mtx.Unlock()
42+
43+
work()
44+
45+
debouncer.mtx.Lock()
46+
delete(debouncer.queue, key)
47+
debouncer.mtx.Unlock()
48+
}

0 commit comments

Comments
 (0)