From 50c78d49e722f8ea35b20197abcf58866dea911e Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 7 Aug 2025 07:44:00 -0700 Subject: [PATCH 1/5] Create ResourceTracker Signed-off-by: Justin Jung --- pkg/util/resource/tracker.go | 147 +++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 pkg/util/resource/tracker.go diff --git a/pkg/util/resource/tracker.go b/pkg/util/resource/tracker.go new file mode 100644 index 00000000000..e57634fb004 --- /dev/null +++ b/pkg/util/resource/tracker.go @@ -0,0 +1,147 @@ +package resource + +import ( + "sync" + "time" +) + +type memoryBuckets struct { + buckets [3]uint64 // 3 buckets for 3 seconds + lastUpdate time.Time + currentIdx int +} + +type ResourceTracker struct { + cpuData map[string]time.Duration + memoryData map[string]*memoryBuckets + lastUpdate map[string]time.Time // Track last update per requestID + + mu sync.RWMutex +} + +type IResourceTracker interface { + AddDuration(requestID string, duration time.Duration) + AddBytes(requestID string, bytes uint64) + GetSlowestQuery() (requestID string, duration time.Duration) + GetHeaviestQuery() (requestID string, bytes uint64) +} + +func NewResourceTracker() *ResourceTracker { + rt := &ResourceTracker{ + cpuData: make(map[string]time.Duration), + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + // Start cleanup goroutine + go rt.cleanupLoop() + + return rt +} + +func (rt *ResourceTracker) AddDuration(requestID string, duration time.Duration) { + rt.mu.Lock() + now := time.Now() + rt.cpuData[requestID] += duration + rt.lastUpdate[requestID] = now + rt.mu.Unlock() +} + +func (rt *ResourceTracker) AddBytes(requestID string, bytes uint64) { + rt.mu.Lock() + now := time.Now().Truncate(time.Second) + + buckets, exists := rt.memoryData[requestID] + if !exists { + buckets = &memoryBuckets{ + lastUpdate: now, + currentIdx: 0, + } + rt.memoryData[requestID] = buckets + } + + // Calculate seconds drift and rotate buckets if needed + secondsDrift := int(now.Sub(buckets.lastUpdate).Seconds()) + if secondsDrift > 0 { + // Clear old buckets + for i := 0; i < min(secondsDrift, 3); i++ { + nextIdx := (buckets.currentIdx + 1 + i) % 3 + buckets.buckets[nextIdx] = 0 + } + // Update current index + buckets.currentIdx = (buckets.currentIdx + secondsDrift) % 3 + buckets.lastUpdate = now + } + + // Add bytes to current bucket + buckets.buckets[buckets.currentIdx] += bytes + rt.lastUpdate[requestID] = time.Now() + rt.mu.Unlock() +} + +func (rt *ResourceTracker) GetSlowestQuery() (string, time.Duration) { + rt.mu.RLock() + defer rt.mu.RUnlock() + + var maxID string + var maxDuration time.Duration + + for id, duration := range rt.cpuData { + if duration > maxDuration { + maxDuration = duration + maxID = id + } + } + + return maxID, maxDuration +} + +func (rt *ResourceTracker) GetHeaviestQuery() (string, uint64) { + rt.mu.RLock() + defer rt.mu.RUnlock() + + var maxID string + var maxBytes uint64 + + for id, buckets := range rt.memoryData { + // Sum all buckets (represents last 3 seconds) + var totalBytes uint64 + for _, bytes := range buckets.buckets { + totalBytes += bytes + } + if totalBytes > maxBytes { + maxBytes = totalBytes + maxID = id + } + } + + return maxID, maxBytes +} + +func (rt *ResourceTracker) cleanupLoop() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for range ticker.C { + rt.cleanup() + } +} + +func (rt *ResourceTracker) cleanup() { + rt.mu.Lock() + defer rt.mu.Unlock() + + now := time.Now() + cutoff := now.Add(-5 * time.Second) + + // Remove stale requestIDs + for requestID, lastUpdate := range rt.lastUpdate { + if lastUpdate.Before(cutoff) { + delete(rt.cpuData, requestID) + delete(rt.memoryData, requestID) + delete(rt.lastUpdate, requestID) + } + } + + // Memory buckets are self-cleaning via rotation, no additional cleanup needed +} From f76c68ed30cae5fc60391e5915f33783459d68f3 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 7 Aug 2025 08:08:22 -0700 Subject: [PATCH 2/5] Add 100k limit Signed-off-by: Justin Jung --- pkg/util/resource/tracker.go | 68 +++++++++++++++++------------------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/pkg/util/resource/tracker.go b/pkg/util/resource/tracker.go index e57634fb004..9ab48deadd3 100644 --- a/pkg/util/resource/tracker.go +++ b/pkg/util/resource/tracker.go @@ -11,8 +11,9 @@ type memoryBuckets struct { currentIdx int } +const maxActiveRequests = 100000 + type ResourceTracker struct { - cpuData map[string]time.Duration memoryData map[string]*memoryBuckets lastUpdate map[string]time.Time // Track last update per requestID @@ -20,15 +21,12 @@ type ResourceTracker struct { } type IResourceTracker interface { - AddDuration(requestID string, duration time.Duration) AddBytes(requestID string, bytes uint64) - GetSlowestQuery() (requestID string, duration time.Duration) GetHeaviestQuery() (requestID string, bytes uint64) } func NewResourceTracker() *ResourceTracker { rt := &ResourceTracker{ - cpuData: make(map[string]time.Duration), memoryData: make(map[string]*memoryBuckets), lastUpdate: make(map[string]time.Time), } @@ -39,27 +37,27 @@ func NewResourceTracker() *ResourceTracker { return rt } -func (rt *ResourceTracker) AddDuration(requestID string, duration time.Duration) { - rt.mu.Lock() - now := time.Now() - rt.cpuData[requestID] += duration - rt.lastUpdate[requestID] = now - rt.mu.Unlock() -} - func (rt *ResourceTracker) AddBytes(requestID string, bytes uint64) { rt.mu.Lock() - now := time.Now().Truncate(time.Second) + defer rt.mu.Unlock() + now := time.Now().Truncate(time.Second) + buckets, exists := rt.memoryData[requestID] if !exists { + // Check if we're at capacity + if len(rt.memoryData) >= maxActiveRequests { + // Evict oldest request + rt.evictOldest() + } + buckets = &memoryBuckets{ lastUpdate: now, currentIdx: 0, } rt.memoryData[requestID] = buckets } - + // Calculate seconds drift and rotate buckets if needed secondsDrift := int(now.Sub(buckets.lastUpdate).Seconds()) if secondsDrift > 0 { @@ -72,28 +70,10 @@ func (rt *ResourceTracker) AddBytes(requestID string, bytes uint64) { buckets.currentIdx = (buckets.currentIdx + secondsDrift) % 3 buckets.lastUpdate = now } - + // Add bytes to current bucket buckets.buckets[buckets.currentIdx] += bytes rt.lastUpdate[requestID] = time.Now() - rt.mu.Unlock() -} - -func (rt *ResourceTracker) GetSlowestQuery() (string, time.Duration) { - rt.mu.RLock() - defer rt.mu.RUnlock() - - var maxID string - var maxDuration time.Duration - - for id, duration := range rt.cpuData { - if duration > maxDuration { - maxDuration = duration - maxID = id - } - } - - return maxID, maxDuration } func (rt *ResourceTracker) GetHeaviestQuery() (string, uint64) { @@ -132,16 +112,32 @@ func (rt *ResourceTracker) cleanup() { defer rt.mu.Unlock() now := time.Now() - cutoff := now.Add(-5 * time.Second) + cutoff := now.Add(-3 * time.Second) // Remove stale requestIDs for requestID, lastUpdate := range rt.lastUpdate { if lastUpdate.Before(cutoff) { - delete(rt.cpuData, requestID) delete(rt.memoryData, requestID) delete(rt.lastUpdate, requestID) } } +} - // Memory buckets are self-cleaning via rotation, no additional cleanup needed +func (rt *ResourceTracker) evictOldest() { + var oldestID string + var oldestTime time.Time + + // Find oldest request + for requestID, lastUpdate := range rt.lastUpdate { + if oldestID == "" || lastUpdate.Before(oldestTime) { + oldestID = requestID + oldestTime = lastUpdate + } + } + + // Remove oldest request + if oldestID != "" { + delete(rt.memoryData, oldestID) + delete(rt.lastUpdate, oldestID) + } } From a076b446184951d57a076ce1ee9dec4b12ccbca3 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 7 Aug 2025 08:08:37 -0700 Subject: [PATCH 3/5] Add unit tests Signed-off-by: Justin Jung --- pkg/util/resource/tracker_test.go | 206 ++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 pkg/util/resource/tracker_test.go diff --git a/pkg/util/resource/tracker_test.go b/pkg/util/resource/tracker_test.go new file mode 100644 index 00000000000..756085b661c --- /dev/null +++ b/pkg/util/resource/tracker_test.go @@ -0,0 +1,206 @@ +package resource + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestResourceTracker_AddBytes(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + rt.AddBytes("req1", 1000) + + assert.Len(t, rt.memoryData, 1) + assert.Contains(t, rt.memoryData, "req1") + assert.Equal(t, uint64(1000), rt.memoryData["req1"].buckets[0]) +} + +func TestResourceTracker_GetHeaviestQuery(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + rt.AddBytes("req1", 1000) + rt.AddBytes("req2", 2000) + rt.AddBytes("req3", 500) + + requestID, bytes := rt.GetHeaviestQuery() + assert.Equal(t, "req2", requestID) + assert.Equal(t, uint64(2000), bytes) +} + +func TestResourceTracker_EmptyTracker(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + requestID, bytes := rt.GetHeaviestQuery() + assert.Equal(t, "", requestID) + assert.Equal(t, uint64(0), bytes) +} + +func TestResourceTracker_SlidingWindow(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + // Add bytes at different times + rt.AddBytes("req1", 1000) + + // Simulate 1 second later + rt.mu.Lock() + rt.memoryData["req1"].lastUpdate = rt.memoryData["req1"].lastUpdate.Add(-1 * time.Second) + rt.mu.Unlock() + + rt.AddBytes("req1", 2000) + + // Should have both values in different buckets + _, bytes := rt.GetHeaviestQuery() + assert.Equal(t, uint64(3000), bytes) // 1000 + 2000 +} + +func TestResourceTracker_BucketRotation(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + rt.AddBytes("req1", 1000) + + // Simulate 4 seconds later (should clear old buckets) + rt.mu.Lock() + rt.memoryData["req1"].lastUpdate = rt.memoryData["req1"].lastUpdate.Add(-4 * time.Second) + rt.mu.Unlock() + + rt.AddBytes("req1", 2000) + + // Should only have the new value (old bucket cleared) + _, bytes := rt.GetHeaviestQuery() + assert.Equal(t, uint64(2000), bytes) +} + +func TestResourceTracker_Cleanup(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + rt.AddBytes("req1", 1000) + rt.AddBytes("req2", 2000) + + // Simulate old lastUpdate time + rt.mu.Lock() + rt.lastUpdate["req1"] = time.Now().Add(-5 * time.Second) + rt.mu.Unlock() + + rt.cleanup() + + assert.Len(t, rt.memoryData, 1) + assert.Contains(t, rt.memoryData, "req2") + assert.NotContains(t, rt.memoryData, "req1") +} + +func TestResourceTracker_MaxActiveRequests(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + // Manually set to limit for faster test + rt.mu.Lock() + for i := 0; i < maxActiveRequests; i++ { + rt.memoryData[fmt.Sprintf("req%d", i)] = &memoryBuckets{lastUpdate: time.Now()} + rt.lastUpdate[fmt.Sprintf("req%d", i)] = time.Now() + } + rt.mu.Unlock() + + // Add one more request (should trigger eviction) + rt.AddBytes("new_req", 9999) + + assert.Len(t, rt.memoryData, maxActiveRequests) + assert.Contains(t, rt.memoryData, "new_req") +} + +func TestResourceTracker_EvictOldest(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + now := time.Now() + + // Add requests with different timestamps + rt.memoryData["req1"] = &memoryBuckets{} + rt.lastUpdate["req1"] = now.Add(-10 * time.Second) // Oldest + + rt.memoryData["req2"] = &memoryBuckets{} + rt.lastUpdate["req2"] = now.Add(-5 * time.Second) + + rt.memoryData["req3"] = &memoryBuckets{} + rt.lastUpdate["req3"] = now + + rt.evictOldest() + + assert.Len(t, rt.memoryData, 2) + assert.NotContains(t, rt.memoryData, "req1") // Oldest should be evicted + assert.Contains(t, rt.memoryData, "req2") + assert.Contains(t, rt.memoryData, "req3") +} + +func TestResourceTracker_ConcurrentAccess(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + // Test concurrent writes + done := make(chan bool, 20) + for i := 0; i < 10; i++ { + go func(id int) { + for j := 0; j < 10; j++ { + rt.AddBytes(fmt.Sprintf("req%d", id), uint64(j)) + } + done <- true + }(i) + } + + // Test concurrent reads + for i := 0; i < 10; i++ { + go func() { + rt.GetHeaviestQuery() + done <- true + }() + } + + // Wait for all goroutines + for i := 0; i < 20; i++ { + <-done + } + + // Should have 10 requests + assert.Len(t, rt.memoryData, 10) +} + +func TestResourceTracker_AccumulateBytes(t *testing.T) { + rt := &ResourceTracker{ + memoryData: make(map[string]*memoryBuckets), + lastUpdate: make(map[string]time.Time), + } + + // Add bytes multiple times to same request + rt.AddBytes("req1", 1000) + rt.AddBytes("req1", 2000) + rt.AddBytes("req1", 3000) + + _, bytes := rt.GetHeaviestQuery() + assert.Equal(t, uint64(6000), bytes) // Should accumulate +} \ No newline at end of file From 0eef00e155b8cd3afc4f087896f7584156ed9aa7 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 7 Aug 2025 08:44:17 -0700 Subject: [PATCH 4/5] Add service and tracking metric Signed-off-by: Justin Jung --- pkg/util/resource/tracker.go | 42 +++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/pkg/util/resource/tracker.go b/pkg/util/resource/tracker.go index 9ab48deadd3..8c29908f79d 100644 --- a/pkg/util/resource/tracker.go +++ b/pkg/util/resource/tracker.go @@ -1,8 +1,14 @@ package resource import ( + "context" "sync" "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/cortexproject/cortex/pkg/util/services" ) type memoryBuckets struct { @@ -14,6 +20,8 @@ type memoryBuckets struct { const maxActiveRequests = 100000 type ResourceTracker struct { + services.Service + memoryData map[string]*memoryBuckets lastUpdate map[string]time.Time // Track last update per requestID @@ -25,22 +33,24 @@ type IResourceTracker interface { GetHeaviestQuery() (requestID string, bytes uint64) } -func NewResourceTracker() *ResourceTracker { +func NewResourceTracker(registerer prometheus.Registerer) *ResourceTracker { rt := &ResourceTracker{ memoryData: make(map[string]*memoryBuckets), lastUpdate: make(map[string]time.Time), } - // Start cleanup goroutine - go rt.cleanupLoop() + promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_resource_tracker_active_requests", + }, rt.activeRequestCount) + rt.Service = services.NewBasicService(nil, rt.running, nil) return rt } func (rt *ResourceTracker) AddBytes(requestID string, bytes uint64) { rt.mu.Lock() defer rt.mu.Unlock() - + now := time.Now().Truncate(time.Second) buckets, exists := rt.memoryData[requestID] @@ -50,7 +60,7 @@ func (rt *ResourceTracker) AddBytes(requestID string, bytes uint64) { // Evict oldest request rt.evictOldest() } - + buckets = &memoryBuckets{ lastUpdate: now, currentIdx: 0, @@ -98,15 +108,27 @@ func (rt *ResourceTracker) GetHeaviestQuery() (string, uint64) { return maxID, maxBytes } -func (rt *ResourceTracker) cleanupLoop() { +func (rt *ResourceTracker) running(ctx context.Context) error { ticker := time.NewTicker(time.Second) defer ticker.Stop() - for range ticker.C { - rt.cleanup() + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + rt.cleanup() + } } } +func (rt *ResourceTracker) activeRequestCount() float64 { + rt.mu.RLock() + defer rt.mu.RUnlock() + + return float64(len(rt.lastUpdate)) +} + func (rt *ResourceTracker) cleanup() { rt.mu.Lock() defer rt.mu.Unlock() @@ -126,7 +148,7 @@ func (rt *ResourceTracker) cleanup() { func (rt *ResourceTracker) evictOldest() { var oldestID string var oldestTime time.Time - + // Find oldest request for requestID, lastUpdate := range rt.lastUpdate { if oldestID == "" || lastUpdate.Before(oldestTime) { @@ -134,7 +156,7 @@ func (rt *ResourceTracker) evictOldest() { oldestTime = lastUpdate } } - + // Remove oldest request if oldestID != "" { delete(rt.memoryData, oldestID) From 48ed7c9eab56865bc6cd0c45777a9db90521e7e6 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 7 Aug 2025 11:19:06 -0700 Subject: [PATCH 5/5] Make constants configurable Signed-off-by: Justin Jung --- pkg/util/resource/tracker.go | 44 +++++++------ pkg/util/resource/tracker_test.go | 100 ++++++++++++++++++------------ 2 files changed, 80 insertions(+), 64 deletions(-) diff --git a/pkg/util/resource/tracker.go b/pkg/util/resource/tracker.go index 8c29908f79d..805ae137353 100644 --- a/pkg/util/resource/tracker.go +++ b/pkg/util/resource/tracker.go @@ -12,18 +12,17 @@ import ( ) type memoryBuckets struct { - buckets [3]uint64 // 3 buckets for 3 seconds + buckets []uint64 lastUpdate time.Time currentIdx int } -const maxActiveRequests = 100000 - type ResourceTracker struct { services.Service - memoryData map[string]*memoryBuckets - lastUpdate map[string]time.Time // Track last update per requestID + memoryData map[string]*memoryBuckets + windowSize int + maxActiveRequests int mu sync.RWMutex } @@ -33,10 +32,11 @@ type IResourceTracker interface { GetHeaviestQuery() (requestID string, bytes uint64) } -func NewResourceTracker(registerer prometheus.Registerer) *ResourceTracker { +func NewResourceTracker(windowSize, maxActiveRequests int, registerer prometheus.Registerer) *ResourceTracker { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: windowSize, + maxActiveRequests: maxActiveRequests, } promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ @@ -56,12 +56,13 @@ func (rt *ResourceTracker) AddBytes(requestID string, bytes uint64) { buckets, exists := rt.memoryData[requestID] if !exists { // Check if we're at capacity - if len(rt.memoryData) >= maxActiveRequests { + if len(rt.memoryData) >= rt.maxActiveRequests { // Evict oldest request rt.evictOldest() } buckets = &memoryBuckets{ + buckets: make([]uint64, rt.windowSize), lastUpdate: now, currentIdx: 0, } @@ -72,18 +73,17 @@ func (rt *ResourceTracker) AddBytes(requestID string, bytes uint64) { secondsDrift := int(now.Sub(buckets.lastUpdate).Seconds()) if secondsDrift > 0 { // Clear old buckets - for i := 0; i < min(secondsDrift, 3); i++ { - nextIdx := (buckets.currentIdx + 1 + i) % 3 + for i := 0; i < min(secondsDrift, rt.windowSize); i++ { + nextIdx := (buckets.currentIdx + 1 + i) % rt.windowSize buckets.buckets[nextIdx] = 0 } // Update current index - buckets.currentIdx = (buckets.currentIdx + secondsDrift) % 3 + buckets.currentIdx = (buckets.currentIdx + secondsDrift) % rt.windowSize buckets.lastUpdate = now } // Add bytes to current bucket buckets.buckets[buckets.currentIdx] += bytes - rt.lastUpdate[requestID] = time.Now() } func (rt *ResourceTracker) GetHeaviestQuery() (string, uint64) { @@ -94,7 +94,7 @@ func (rt *ResourceTracker) GetHeaviestQuery() (string, uint64) { var maxBytes uint64 for id, buckets := range rt.memoryData { - // Sum all buckets (represents last 3 seconds) + // Sum all buckets var totalBytes uint64 for _, bytes := range buckets.buckets { totalBytes += bytes @@ -126,7 +126,7 @@ func (rt *ResourceTracker) activeRequestCount() float64 { rt.mu.RLock() defer rt.mu.RUnlock() - return float64(len(rt.lastUpdate)) + return float64(len(rt.memoryData)) } func (rt *ResourceTracker) cleanup() { @@ -134,13 +134,12 @@ func (rt *ResourceTracker) cleanup() { defer rt.mu.Unlock() now := time.Now() - cutoff := now.Add(-3 * time.Second) + cutoff := now.Add(-time.Duration(rt.windowSize) * time.Second) // Remove stale requestIDs - for requestID, lastUpdate := range rt.lastUpdate { - if lastUpdate.Before(cutoff) { + for requestID, buckets := range rt.memoryData { + if buckets.lastUpdate.Before(cutoff) { delete(rt.memoryData, requestID) - delete(rt.lastUpdate, requestID) } } } @@ -150,16 +149,15 @@ func (rt *ResourceTracker) evictOldest() { var oldestTime time.Time // Find oldest request - for requestID, lastUpdate := range rt.lastUpdate { - if oldestID == "" || lastUpdate.Before(oldestTime) { + for requestID, buckets := range rt.memoryData { + if oldestID == "" || buckets.lastUpdate.Before(oldestTime) { oldestID = requestID - oldestTime = lastUpdate + oldestTime = buckets.lastUpdate } } // Remove oldest request if oldestID != "" { delete(rt.memoryData, oldestID) - delete(rt.lastUpdate, oldestID) } } diff --git a/pkg/util/resource/tracker_test.go b/pkg/util/resource/tracker_test.go index 756085b661c..6f055211bac 100644 --- a/pkg/util/resource/tracker_test.go +++ b/pkg/util/resource/tracker_test.go @@ -10,8 +10,9 @@ import ( func TestResourceTracker_AddBytes(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } rt.AddBytes("req1", 1000) @@ -23,8 +24,9 @@ func TestResourceTracker_AddBytes(t *testing.T) { func TestResourceTracker_GetHeaviestQuery(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } rt.AddBytes("req1", 1000) @@ -38,8 +40,9 @@ func TestResourceTracker_GetHeaviestQuery(t *testing.T) { func TestResourceTracker_EmptyTracker(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } requestID, bytes := rt.GetHeaviestQuery() @@ -49,20 +52,21 @@ func TestResourceTracker_EmptyTracker(t *testing.T) { func TestResourceTracker_SlidingWindow(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } // Add bytes at different times rt.AddBytes("req1", 1000) - + // Simulate 1 second later rt.mu.Lock() rt.memoryData["req1"].lastUpdate = rt.memoryData["req1"].lastUpdate.Add(-1 * time.Second) rt.mu.Unlock() - + rt.AddBytes("req1", 2000) - + // Should have both values in different buckets _, bytes := rt.GetHeaviestQuery() assert.Equal(t, uint64(3000), bytes) // 1000 + 2000 @@ -70,19 +74,20 @@ func TestResourceTracker_SlidingWindow(t *testing.T) { func TestResourceTracker_BucketRotation(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } rt.AddBytes("req1", 1000) - + // Simulate 4 seconds later (should clear old buckets) rt.mu.Lock() rt.memoryData["req1"].lastUpdate = rt.memoryData["req1"].lastUpdate.Add(-4 * time.Second) rt.mu.Unlock() - + rt.AddBytes("req1", 2000) - + // Should only have the new value (old bucket cleared) _, bytes := rt.GetHeaviestQuery() assert.Equal(t, uint64(2000), bytes) @@ -90,8 +95,9 @@ func TestResourceTracker_BucketRotation(t *testing.T) { func TestResourceTracker_Cleanup(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } rt.AddBytes("req1", 1000) @@ -99,7 +105,7 @@ func TestResourceTracker_Cleanup(t *testing.T) { // Simulate old lastUpdate time rt.mu.Lock() - rt.lastUpdate["req1"] = time.Now().Add(-5 * time.Second) + rt.memoryData["req1"].lastUpdate = time.Now().Add(-5 * time.Second) rt.mu.Unlock() rt.cleanup() @@ -111,42 +117,52 @@ func TestResourceTracker_Cleanup(t *testing.T) { func TestResourceTracker_MaxActiveRequests(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 100, } // Manually set to limit for faster test rt.mu.Lock() - for i := 0; i < maxActiveRequests; i++ { - rt.memoryData[fmt.Sprintf("req%d", i)] = &memoryBuckets{lastUpdate: time.Now()} - rt.lastUpdate[fmt.Sprintf("req%d", i)] = time.Now() + for i := 0; i < rt.maxActiveRequests; i++ { + rt.memoryData[fmt.Sprintf("req%d", i)] = &memoryBuckets{ + buckets: make([]uint64, rt.windowSize), + lastUpdate: time.Now(), + } } rt.mu.Unlock() // Add one more request (should trigger eviction) rt.AddBytes("new_req", 9999) - assert.Len(t, rt.memoryData, maxActiveRequests) + assert.Len(t, rt.memoryData, rt.maxActiveRequests) assert.Contains(t, rt.memoryData, "new_req") } func TestResourceTracker_EvictOldest(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } now := time.Now() - + // Add requests with different timestamps - rt.memoryData["req1"] = &memoryBuckets{} - rt.lastUpdate["req1"] = now.Add(-10 * time.Second) // Oldest - - rt.memoryData["req2"] = &memoryBuckets{} - rt.lastUpdate["req2"] = now.Add(-5 * time.Second) - - rt.memoryData["req3"] = &memoryBuckets{} - rt.lastUpdate["req3"] = now + rt.memoryData["req1"] = &memoryBuckets{ + buckets: make([]uint64, rt.windowSize), + lastUpdate: now.Add(-10 * time.Second), // Oldest + } + + rt.memoryData["req2"] = &memoryBuckets{ + buckets: make([]uint64, rt.windowSize), + lastUpdate: now.Add(-5 * time.Second), + } + + rt.memoryData["req3"] = &memoryBuckets{ + buckets: make([]uint64, rt.windowSize), + lastUpdate: now, + } rt.evictOldest() @@ -158,8 +174,9 @@ func TestResourceTracker_EvictOldest(t *testing.T) { func TestResourceTracker_ConcurrentAccess(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } // Test concurrent writes @@ -192,8 +209,9 @@ func TestResourceTracker_ConcurrentAccess(t *testing.T) { func TestResourceTracker_AccumulateBytes(t *testing.T) { rt := &ResourceTracker{ - memoryData: make(map[string]*memoryBuckets), - lastUpdate: make(map[string]time.Time), + memoryData: make(map[string]*memoryBuckets), + windowSize: 3, + maxActiveRequests: 10, } // Add bytes multiple times to same request @@ -203,4 +221,4 @@ func TestResourceTracker_AccumulateBytes(t *testing.T) { _, bytes := rt.GetHeaviestQuery() assert.Equal(t, uint64(6000), bytes) // Should accumulate -} \ No newline at end of file +}