@@ -19,12 +19,12 @@ package autoscaler
1919import (
2020 "fmt"
2121 "math"
22- "sync"
2322 "time"
2423
2524 "github.com/cortexlabs/cortex/pkg/lib/cron"
2625 "github.com/cortexlabs/cortex/pkg/lib/errors"
2726 libmath "github.com/cortexlabs/cortex/pkg/lib/math"
27+ "github.com/cortexlabs/cortex/pkg/lib/pointer"
2828 "github.com/cortexlabs/cortex/pkg/lib/telemetry"
2929 libtime "github.com/cortexlabs/cortex/pkg/lib/time"
3030 "github.com/cortexlabs/cortex/pkg/types/spec"
@@ -44,19 +44,18 @@ type Scaler interface {
4444}
4545
4646type Autoscaler struct {
47- sync.Mutex
48- logger * zap.SugaredLogger
49- crons map [string ]cron.Cron
50- scalers map [userconfig.Kind ]Scaler
51- lastAwakenTimestamp map [string ]time.Time
47+ logger * zap.SugaredLogger
48+ crons map [string ]cron.Cron
49+ scalers map [userconfig.Kind ]Scaler
50+ recs map [string ]* recommendations
5251}
5352
5453func New (logger * zap.SugaredLogger ) * Autoscaler {
5554 return & Autoscaler {
56- logger : logger ,
57- crons : make (map [string ]cron.Cron ),
58- scalers : make (map [userconfig.Kind ]Scaler ),
59- lastAwakenTimestamp : make (map [string ]time. Time ),
55+ logger : logger ,
56+ crons : make (map [string ]cron.Cron ),
57+ scalers : make (map [userconfig.Kind ]Scaler ),
58+ recs : make (map [string ]* recommendations ),
6059 }
6160}
6261
@@ -65,9 +64,6 @@ func (a *Autoscaler) AddScaler(scaler Scaler, kind userconfig.Kind) {
6564}
6665
6766func (a * Autoscaler ) Awaken (api userconfig.Resource ) error {
68- a .Lock ()
69- defer a .Unlock ()
70-
7167 scaler , ok := a .scalers [api .Kind ]
7268 if ! ok {
7369 return errors .ErrorUnexpected (
@@ -94,7 +90,7 @@ func (a *Autoscaler) Awaken(api userconfig.Resource) error {
9490 return errors .Wrap (err , "failed to scale api to one" )
9591 }
9692
97- a .lastAwakenTimestamp [api .Name ] = time . Now ( )
93+ a .recs [api .Name ]. add ( 1 )
9894
9995 return nil
10096}
@@ -104,11 +100,6 @@ func (a *Autoscaler) AddAPI(api userconfig.Resource) error {
104100 return nil
105101 }
106102
107- autoscaleFn , err := a .autoscaleFn (api )
108- if err != nil {
109- return err
110- }
111-
112103 errorHandler := func (err error ) {
113104 log := a .logger .With (
114105 zap .String ("apiName" , api .Name ),
@@ -119,12 +110,12 @@ func (a *Autoscaler) AddAPI(api userconfig.Resource) error {
119110 telemetry .Error (err )
120111 }
121112
122- a .crons [api .Name ] = cron .Run (autoscaleFn , errorHandler , spec .AutoscalingTickInterval )
113+ autoscaleFn , err := a .autoscaleFn (api )
114+ if err != nil {
115+ return err
116+ }
123117
124- // make sure there is no awaken call registered to an older API with the same name
125- a .Lock ()
126- delete (a .lastAwakenTimestamp , api .Name )
127- a .Unlock ()
118+ a .crons [api .Name ] = cron .Run (autoscaleFn , errorHandler , spec .AutoscalingTickInterval )
128119
129120 return nil
130121}
@@ -140,10 +131,7 @@ func (a *Autoscaler) RemoveAPI(api userconfig.Resource) {
140131 delete (a .crons , api .Name )
141132 }
142133
143- a .Lock ()
144- delete (a .lastAwakenTimestamp , api .Name )
145- a .Unlock ()
146-
134+ delete (a .recs , api .Name )
147135 log .Info ("autoscaler stop" )
148136}
149137
@@ -170,7 +158,7 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)
170158 log .Info ("autoscaler init" )
171159
172160 var startTime time.Time
173- recs := make ( recommendations )
161+ a . recs [ api . Name ] = newRecommendations ( )
174162
175163 return func () error {
176164 autoscalingSpec , err := scaler .GetAutoscalingSpec (api .Name )
@@ -227,6 +215,8 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)
227215 recommendation = autoscalingSpec .MaxReplicas
228216 }
229217
218+ recs := a .recs [api .Name ]
219+
230220 // Rule of thumb: any modifications that don't consider historical recommendations should be performed before
231221 // recording the recommendation, any modifications that use historical recommendations should be performed after
232222 recs .add (recommendation )
@@ -240,25 +230,20 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)
240230
241231 if request < currentReplicas {
242232 downscaleStabilizationFloor = recs .maxSince (autoscalingSpec .DownscaleStabilizationPeriod )
233+ if downscaleStabilizationFloor != nil {
234+ downscaleStabilizationFloor = pointer .Int32 (libmath .MinInt32 (* downscaleStabilizationFloor , currentReplicas ))
235+ }
243236 if time .Since (startTime ) < autoscalingSpec .DownscaleStabilizationPeriod {
244237 request = currentReplicas
245238 } else if downscaleStabilizationFloor != nil && request < * downscaleStabilizationFloor {
246239 request = * downscaleStabilizationFloor
247240 }
248-
249- // awaken state: was scaled from zero
250- // This needs to be protected by a Mutex because an Awaken call will also modify it
251- a .Lock ()
252- lastAwakenTimestamp := a .lastAwakenTimestamp [api .Name ]
253-
254- // Make sure we don't scale below zero if API was recently awaken
255- if time .Since (lastAwakenTimestamp ) < autoscalingSpec .DownscaleStabilizationPeriod {
256- request = libmath .MaxInt32 (request , 1 )
257- }
258- a .Unlock ()
259241 }
260242 if request > currentReplicas {
261243 upscaleStabilizationCeil = recs .minSince (autoscalingSpec .UpscaleStabilizationPeriod )
244+ if upscaleStabilizationCeil != nil {
245+ upscaleStabilizationCeil = pointer .Int32 (libmath .MaxInt32 (* upscaleStabilizationCeil , currentReplicas ))
246+ }
262247 if time .Since (startTime ) < autoscalingSpec .UpscaleStabilizationPeriod {
263248 request = currentReplicas
264249 } else if upscaleStabilizationCeil != nil && request > * upscaleStabilizationCeil {
0 commit comments