@@ -10,62 +10,110 @@ import (
1010 "go.uber.org/zap"
1111)
1212
13- // EvaluateAndAdjustConcurrency evaluates the HTTP response from a server along with the request's response time
14- // and adjusts the concurrency level of the system accordingly. It utilizes three monitoring functions:
15- // MonitorRateLimitHeaders, MonitorServerResponseCodes, and MonitorResponseTimeVariability, each of which
16- // provides feedback on different aspects of the response and system's current state. The function aggregates
17- // feedback from these monitoring functions to make a decision on whether to scale up or scale down the concurrency.
18- // The decision is based on a simple majority of suggestions: if more functions suggest scaling down (return -1),
19- // it scales down; if more suggest scaling up (return 1), it scales up. This method centralizes concurrency control
20- // decision-making, providing a systematic approach to managing request handling capacity based on real-time
21- // operational metrics.
13+ // Defined weights for the metrics
14+ var metricWeights = map [string ]float64 {
15+ "RateLimit" : 5.0 , // High importance
16+ "ServerError" : 3.0 , // High importance
17+ "ResponseTime" : 1.0 , // Lower importance
18+ }
19+
20+ // EvaluateAndAdjustConcurrency assesses the current state of system metrics and decides whether to scale
21+ // up or down the number of concurrent operations allowed. It employs a combination of strategies:
22+ // a weighted scoring system, threshold-based direct actions, and cumulative impact assessment.
23+ //
24+ // A weighted scoring system is used to prioritize the importance of different system metrics. Each metric
25+ // can influence the scaling decision based on its assigned weight, reflecting its relative impact on system performance.
26+ //
27+ // Threshold-based scaling provides a fast-track decision path for critical metrics that have exceeded predefined limits.
28+ // If a critical metric, such as the rate limit remaining slots or server error rates, crosses a specified threshold,
29+ // immediate action is taken to scale down the concurrency to prevent system overload.
30+ //
31+ // Cumulative impact assessment calculates a cumulative score from all monitored metrics, taking into account
32+ // their respective weights. This score determines the overall tendency of the system to either scale up or down.
33+ // If the score indicates a negative trend (i.e., below zero), the system will scale down to reduce load.
34+ // Conversely, a positive score suggests that there is capacity to handle more concurrent operations, leading
35+ // to a scale-up decision.
2236//
2337// Parameters:
38+ // - resp: The HTTP response received from the server, providing status codes and headers for rate limiting.
39+ // - responseTime: The time duration between sending the request and receiving the response, indicating the server's responsiveness.
2440//
25- // resp - The HTTP response received from the server.
26- // responseTime - The time duration between sending the request and receiving the response.
41+ // The function logs the decision process at each step, providing traceability and insight into the scaling mechanism.
42+ // The method should be called after each significant interaction with the external system (e.g., an HTTP request) to
43+ // ensure concurrency levels are adapted to current conditions.
2744//
28- // It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
45+ // Returns: None. The function directly calls the ScaleUp or ScaleDown methods as needed.
46+ //
47+ // Note: This function does not return any value; it performs actions based on internal assessments and logs outcomes.
2948func (ch * ConcurrencyHandler ) EvaluateAndAdjustConcurrency (resp * http.Response , responseTime time.Duration ) {
30- // Call monitoring functions
3149 rateLimitFeedback := ch .MonitorRateLimitHeaders (resp )
3250 responseCodeFeedback := ch .MonitorServerResponseCodes (resp )
3351 responseTimeFeedback := ch .MonitorResponseTimeVariability (responseTime )
3452
35- // Log the feedback from each monitoring function for debugging
36- ch .logger .Debug ("Concurrency Adjustment Feedback" ,
37- zap .Int ("RateLimitFeedback" , rateLimitFeedback ),
38- zap .Int ("ResponseCodeFeedback" , responseCodeFeedback ),
39- zap .Int ("ResponseTimeFeedback" , responseTimeFeedback ))
40-
41- // Determine overall action based on feedback
42- suggestions := []int {rateLimitFeedback , responseCodeFeedback , responseTimeFeedback }
43- scaleDownCount := 0
44- scaleUpCount := 0
45-
46- for _ , suggestion := range suggestions {
47- switch suggestion {
48- case - 1 :
49- scaleDownCount ++
50- case 1 :
51- scaleUpCount ++
52- }
53+ // Use weighted scores for each metric.
54+ weightedRateLimitScore := float64 (rateLimitFeedback ) * metricWeights ["RateLimit" ]
55+ weightedResponseCodeScore := float64 (responseCodeFeedback ) * metricWeights ["ServerError" ]
56+ weightedResponseTimeScore := float64 (responseTimeFeedback ) * metricWeights ["ResponseTime" ]
57+
58+ // Calculate the cumulative score.
59+ cumulativeScore := weightedRateLimitScore + weightedResponseCodeScore + weightedResponseTimeScore
60+
61+ // Log the feedback from each monitoring function for debugging.
62+ ch .logger .Debug ("Evaluate and Adjust Concurrency" ,
63+ zap .String ("event" , "EvaluateConcurrency" ),
64+ zap .Float64 ("weightedRateLimitScore" , weightedRateLimitScore ),
65+ zap .Float64 ("weightedResponseCodeScore" , weightedResponseCodeScore ),
66+ zap .Float64 ("weightedResponseTimeScore" , weightedResponseTimeScore ),
67+ zap .Float64 ("cumulativeScore" , cumulativeScore ),
68+ zap .Int ("rateLimitFeedback" , rateLimitFeedback ),
69+ zap .Int ("responseCodeFeedback" , responseCodeFeedback ),
70+ zap .Int ("responseTimeFeedback" , responseTimeFeedback ),
71+ zap .Duration ("responseTime" , responseTime ),
72+ )
73+
74+ // Check critical thresholds
75+ if rateLimitFeedback <= RateLimitCriticalThreshold || weightedResponseCodeScore >= ErrorResponseThreshold {
76+ ch .logger .Warn ("Scaling down due to critical threshold breach" ,
77+ zap .String ("event" , "CriticalThresholdBreach" ),
78+ zap .Int ("rateLimitFeedback" , rateLimitFeedback ),
79+ zap .Float64 ("errorResponseRate" , weightedResponseCodeScore ),
80+ )
81+ ch .ScaleDown ()
82+ return
5383 }
5484
55- // Log the counts for scale down and up suggestions
56- ch .logger .Info ("Scaling Decision Counts" ,
57- zap .Int ("ScaleDownCount" , scaleDownCount ),
58- zap .Int ("ScaleUpCount" , scaleUpCount ))
59-
60- // Decide on scaling action
61- if scaleDownCount > scaleUpCount {
62- ch .logger .Info ("Scaling down the concurrency" , zap .String ("Reason" , "More signals suggested to decrease concurrency" ))
85+ // Evaluate cumulative impact and make a scaling decision.
86+ if cumulativeScore < 0 {
87+ utilizedBefore := len (ch .sem ) // Tokens in use before scaling down.
6388 ch .ScaleDown ()
64- } else if scaleUpCount > scaleDownCount {
65- ch .logger .Info ("Scaling up the concurrency" , zap .String ("Reason" , "More signals suggested to increase concurrency" ))
89+ utilizedAfter := len (ch .sem ) // Tokens in use after scaling down.
90+ ch .logger .Info ("Concurrency scaling decision: scale down." ,
91+ zap .Float64 ("cumulativeScore" , cumulativeScore ),
92+ zap .Int ("utilizedTokensBefore" , utilizedBefore ),
93+ zap .Int ("utilizedTokensAfter" , utilizedAfter ),
94+ zap .Int ("availableTokensBefore" , cap (ch .sem )- utilizedBefore ),
95+ zap .Int ("availableTokensAfter" , cap (ch .sem )- utilizedAfter ),
96+ zap .String ("reason" , "Cumulative impact of metrics suggested an overload." ),
97+ )
98+ } else if cumulativeScore > 0 {
99+ utilizedBefore := len (ch .sem ) // Tokens in use before scaling up.
66100 ch .ScaleUp ()
101+ utilizedAfter := len (ch .sem ) // Tokens in use after scaling up.
102+ ch .logger .Info ("Concurrency scaling decision: scale up." ,
103+ zap .Float64 ("cumulativeScore" , cumulativeScore ),
104+ zap .Int ("utilizedTokensBefore" , utilizedBefore ),
105+ zap .Int ("utilizedTokensAfter" , utilizedAfter ),
106+ zap .Int ("availableTokensBefore" , cap (ch .sem )- utilizedBefore ),
107+ zap .Int ("availableTokensAfter" , cap (ch .sem )- utilizedAfter ),
108+ zap .String ("reason" , "Metrics indicate available resources to handle more load." ),
109+ )
67110 } else {
68- ch .logger .Info ("No change in concurrency" , zap .String ("Reason" , "Equal signals for both scaling up and down" ))
111+ ch .logger .Info ("Concurrency scaling decision: no change." ,
112+ zap .Float64 ("cumulativeScore" , cumulativeScore ),
113+ zap .Int ("currentUtilizedTokens" , len (ch .sem )),
114+ zap .Int ("currentAvailableTokens" , cap (ch .sem )- len (ch .sem )),
115+ zap .String ("reason" , "Metrics are stable, maintaining current concurrency level." ),
116+ )
69117 }
70118}
71119
0 commit comments