+// concurrency/metrics.go
 package concurrency

 import (
@@ -9,48 +10,94 @@ import (
     "go.uber.org/zap"
 )

-// MonitorRateLimitHeaders monitors the rate limit headers (X-RateLimit-Remaining and Retry-After)
-// in the HTTP response and adjusts concurrency accordingly.
-// If X-RateLimit-Remaining is below a threshold or Retry-After is specified, decrease concurrency.
-// If neither condition is met, consider scaling up if concurrency is below the maximum limit.
-// - Threshold for X-RateLimit-Remaining: 10
-// - Maximum concurrency: MaxConcurrency
-func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) {
-    // Extract X-RateLimit-Remaining and Retry-After headers from the response
+// EvaluateAndAdjustConcurrency evaluates the HTTP response from the server together with the
+// request's response time and adjusts the system's concurrency level accordingly. It relies on
+// three monitoring functions: MonitorRateLimitHeaders, MonitorServerResponseCodes, and
+// MonitorResponseTimeVariability, each of which provides feedback on a different aspect of the
+// response and the system's current state. The feedback is aggregated into a simple majority vote:
+// if more functions suggest scaling down (return -1), concurrency is scaled down; if more suggest
+// scaling up (return 1), it is scaled up. This centralizes concurrency control decisions and
+// provides a systematic way to manage request-handling capacity from real-time operational metrics.
+//
+// Parameters:
+//
+//    resp - The HTTP response received from the server.
+//    responseTime - The time between sending the request and receiving the response.
+//
+// The specific reason for each scaling decision is logged, aiding traceability and fine-tuning of
+// system performance.
+func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) {
+    // Call monitoring functions
+    rateLimitFeedback := ch.MonitorRateLimitHeaders(resp)
+    responseCodeFeedback := ch.MonitorServerResponseCodes(resp)
+    responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime)
+
+    // Log the feedback from each monitoring function for debugging
+    ch.logger.Debug("Concurrency Adjustment Feedback",
+        zap.Int("RateLimitFeedback", rateLimitFeedback),
+        zap.Int("ResponseCodeFeedback", responseCodeFeedback),
+        zap.Int("ResponseTimeFeedback", responseTimeFeedback))
+
+    // Determine overall action based on feedback
+    suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback}
+    scaleDownCount := 0
+    scaleUpCount := 0
+
+    for _, suggestion := range suggestions {
+        switch suggestion {
+        case -1:
+            scaleDownCount++
+        case 1:
+            scaleUpCount++
+        }
+    }
+
+    // Log the counts for scale down and up suggestions
+    ch.logger.Info("Scaling Decision Counts",
+        zap.Int("ScaleDownCount", scaleDownCount),
+        zap.Int("ScaleUpCount", scaleUpCount))
+
+    // Decide on scaling action
+    if scaleDownCount > scaleUpCount {
+        ch.logger.Info("Scaling down the concurrency", zap.String("Reason", "More signals suggested to decrease concurrency"))
+        ch.ScaleDown()
+    } else if scaleUpCount > scaleDownCount {
+        ch.logger.Info("Scaling up the concurrency", zap.String("Reason", "More signals suggested to increase concurrency"))
+        ch.ScaleUp()
+    } else {
+        ch.logger.Info("No change in concurrency", zap.String("Reason", "Equal signals for both scaling up and down"))
+    }
+}
+
+// MonitorRateLimitHeaders inspects the X-RateLimit-Remaining and Retry-After headers in the
+// response and returns a concurrency adjustment suggestion: -1 to scale down, 1 to scale up,
+// or 0 for no change.
+func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
     remaining := resp.Header.Get("X-RateLimit-Remaining")
     retryAfter := resp.Header.Get("Retry-After")
+    suggestion := 0

     if remaining != "" {
         remainingValue, err := strconv.Atoi(remaining)
         if err == nil && remainingValue < 10 {
-            // Decrease concurrency if X-RateLimit-Remaining is below the threshold
-            if len(ch.sem) > MinConcurrency {
-                newSize := len(ch.sem) - 1
-                ch.logger.Info("Reducing concurrency due to low X-RateLimit-Remaining", zap.Int("NewSize", newSize))
-                ch.ResizeSemaphore(newSize)
-            }
+            // Suggest decreasing concurrency when X-RateLimit-Remaining is below the threshold
+            suggestion = -1
         }
     }

     if retryAfter != "" {
-        // Decrease concurrency if Retry-After is specified
-        if len(ch.sem) > MinConcurrency {
-            newSize := len(ch.sem) - 1
-            ch.logger.Info("Reducing concurrency due to Retry-After header", zap.Int("NewSize", newSize))
-            ch.ResizeSemaphore(newSize)
-        }
+        // Suggest decreasing concurrency when Retry-After is specified
+        suggestion = -1
     } else {
-        // Scale up if concurrency is below the maximum limit
-        if len(ch.sem) < MaxConcurrency {
-            newSize := len(ch.sem) + 1
-            ch.logger.Info("Increasing concurrency", zap.Int("NewSize", newSize))
-            ch.ResizeSemaphore(newSize)
+        // Suggest increasing concurrency if currently below the maximum limit and no decrease has been suggested
+        if len(ch.sem) < MaxConcurrency && suggestion == 0 {
+            suggestion = 1
         }
     }
+
+    return suggestion
 }

-// MonitorServerResponseCodes monitors server response codes and adjusts concurrency accordingly.
-func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
+// MonitorServerResponseCodes tracks response status codes, updates the error-rate metrics, and
+// returns a concurrency adjustment suggestion (-1, 0, or 1).
+func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
     statusCode := resp.StatusCode

     // Lock the metrics to ensure thread safety
@@ -63,7 +110,6 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
         ch.Metrics.TotalRateLimitErrors++
     case statusCode >= 400 && statusCode < 500:
         // Assuming 4xx errors as client errors
-        // Increase the TotalRetries count to indicate a client error
         ch.Metrics.TotalRetries++
     }

@@ -75,23 +121,19 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
     // Set the new error rate in the metrics
     ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate

-    // Check if the error rate exceeds the threshold and adjust concurrency accordingly
-    if errorRate > ErrorRateThreshold && len(ch.sem) > MinConcurrency {
-        // Decrease concurrency
-        newSize := len(ch.sem) - 1
-        ch.logger.Info("Reducing request concurrency due to high error rate", zap.Int("NewSize", newSize))
-        ch.ResizeSemaphore(newSize)
+    // Determine the action based on the error rate
+    if errorRate > ErrorRateThreshold {
+        // Suggest decreasing concurrency
+        return -1
     } else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
-        // Scale up if error rate is below the threshold and concurrency is below the maximum limit
-        newSize := len(ch.sem) + 1
-        ch.logger.Info("Increasing request concurrency due to low error rate", zap.Int("NewSize", newSize))
-        ch.ResizeSemaphore(newSize)
+        // Suggest increasing concurrency if there is capacity
+        return 1
     }
+    return 0
 }

-// MonitorResponseTimeVariability calculates the standard deviation of response times
-// and uses moving averages to smooth out fluctuations, adjusting concurrency accordingly.
-func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) {
+// MonitorResponseTimeVariability tracks the variability (standard deviation) of response times
+// and returns a concurrency adjustment suggestion (-1, 0, or 1).
+func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
     ch.Metrics.Lock.Lock()
     defer ch.Metrics.Lock.Unlock()

@@ -110,17 +152,15 @@ func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.D
     // Calculate standard deviation of response times
     stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)

-    // Adjust concurrency based on response time variability
-    if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) > MinConcurrency {
-        newSize := len(ch.sem) - 1
-        ch.logger.Info("Reducing request concurrency due to high response time variability", zap.Int("NewSize", newSize))
-        ch.ResizeSemaphore(newSize)
+    // Determine the action based on the standard deviation
+    if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
+        // Suggest decreasing concurrency
+        return -1
     } else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
-        // Scale up if response time variability is below the threshold and concurrency is below the maximum limit
-        newSize := len(ch.sem) + 1
-        ch.logger.Info("Increasing request concurrency due to low response time variability", zap.Int("NewSize", newSize))
-        ch.ResizeSemaphore(newSize)
+        // Suggest increasing concurrency if there is capacity
+        return 1
     }
+    return 0
 }

 // calculateVariance calculates the variance of response times.
@@ -134,35 +174,3 @@ func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duratio
     ch.Metrics.ResponseTimeVariability.Variance = variance
     return variance
 }
-
-// MonitorNetworkLatency measures Time to First Byte (TTFB) and monitors network throughput,
-// adjusting concurrency based on changes in network latency and throughput.
-func (ch *ConcurrencyHandler) MonitorNetworkLatency(ttfb time.Duration, throughput float64) {
-    ch.Metrics.Lock.Lock()
-    defer ch.Metrics.Lock.Unlock()
-
-    // Calculate the TTFB moving average
-    ch.Metrics.TTFB.Lock.Lock()
-    defer ch.Metrics.TTFB.Lock.Unlock()
-    ch.Metrics.TTFB.Total += ttfb
-    ch.Metrics.TTFB.Count++
-    ttfbMovingAverage := ch.Metrics.TTFB.Total / time.Duration(ch.Metrics.TTFB.Count)
-
-    // Calculate the throughput moving average
-    ch.Metrics.Throughput.Lock.Lock()
-    defer ch.Metrics.Throughput.Lock.Unlock()
-    ch.Metrics.Throughput.Total += throughput
-    ch.Metrics.Throughput.Count++
-    throughputMovingAverage := ch.Metrics.Throughput.Total / float64(ch.Metrics.Throughput.Count)
-
-    // Adjust concurrency based on TTFB and throughput moving averages
-    if ttfbMovingAverage > MaxAcceptableTTFB && len(ch.sem) > MinConcurrency {
-        newSize := len(ch.sem) - 1
-        ch.logger.Info("Reducing request concurrency due to high TTFB", zap.Int("NewSize", newSize))
-        ch.ResizeSemaphore(newSize)
-    } else if throughputMovingAverage > MaxAcceptableThroughput && len(ch.sem) < MaxConcurrency {
-        newSize := len(ch.sem) + 1
-        ch.logger.Info("Increasing request concurrency due to high throughput", zap.Int("NewSize", newSize))
-        ch.ResizeSemaphore(newSize)
-    }
-}
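
For context, here is a minimal sketch of how a caller might drive the new EvaluateAndAdjustConcurrency hook from a request path. Only ConcurrencyHandler, EvaluateAndAdjustConcurrency, ScaleUp, and ScaleDown come from the diff above; the wrapper name doWithAdjustment, the package and import boilerplate, and the calling pattern are illustrative assumptions, not part of this commit.

package concurrency

import (
    "net/http"
    "time"
)

// doWithAdjustment is a hypothetical wrapper (not part of this commit) that sends a request,
// measures its round-trip time, and feeds the result back into the concurrency handler.
func doWithAdjustment(client *http.Client, ch *ConcurrencyHandler, req *http.Request) (*http.Response, error) {
    start := time.Now()
    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    // The handler aggregates the three monitor suggestions and scales up, scales down,
    // or leaves concurrency unchanged based on the majority vote.
    ch.EvaluateAndAdjustConcurrency(resp, time.Since(start))
    return resp, nil
}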
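
The body of calculateVariance is not shown in this diff, so the exact rolling-average formula is not visible here. As a standalone illustration of the kind of running variance and standard-deviation check that MonitorResponseTimeVariability performs, the sketch below uses Welford's online algorithm and an illustrative threshold; the names rollingStats and stdDevThreshold and the sample values are assumptions, not taken from the commit.

package main

import (
    "fmt"
    "math"
    "time"
)

// rollingStats keeps a running mean and sum of squared deviations of response times
// (Welford's online algorithm), from which a standard deviation can be derived.
type rollingStats struct {
    count int
    mean  float64 // running mean, in seconds
    m2    float64 // sum of squared deviations from the running mean
}

func (s *rollingStats) add(rt time.Duration) {
    x := rt.Seconds()
    s.count++
    delta := x - s.mean
    s.mean += delta / float64(s.count)
    s.m2 += delta * (x - s.mean)
}

func (s *rollingStats) stdDev() float64 {
    if s.count < 2 {
        return 0
    }
    return math.Sqrt(s.m2 / float64(s.count-1))
}

func main() {
    const stdDevThreshold = 0.5 // seconds; illustrative value, not the handler's real threshold
    stats := &rollingStats{}
    for _, rt := range []time.Duration{220 * time.Millisecond, 240 * time.Millisecond, 1800 * time.Millisecond} {
        stats.add(rt)
    }
    if stats.stdDev() > stdDevThreshold {
        fmt.Println("high response time variability: suggest scaling down (-1)")
    } else {
        fmt.Println("variability within threshold: suggest scaling up (1) if below MaxConcurrency")
    }
}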