@@ -49,6 +49,10 @@ type clusterScalingIntegration struct {
4949 arangod.NumberOfServers
5050 mutex sync.Mutex
5151 }
52+ scaleEnabled struct {
53+ mutex sync.Mutex
54+ enabled bool
55+ }
5256}
5357
5458const (
@@ -57,10 +61,12 @@ const (
5761
5862// newClusterScalingIntegration creates a new clusterScalingIntegration.
5963func newClusterScalingIntegration (depl * Deployment ) * clusterScalingIntegration {
60- return & clusterScalingIntegration {
64+ ci := & clusterScalingIntegration {
6165 log : depl .deps .Log ,
6266 depl : depl ,
6367 }
68+ ci .scaleEnabled .enabled = true
69+ return ci
6470}
6571
6672// SendUpdateToCluster records the given spec to be sended to the cluster.
@@ -70,37 +76,61 @@ func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec
7076 ci .pendingUpdate .spec = & spec
7177}
7278
79+ // checkScalingCluster checks if inspection
80+ // returns true if inspection occurred
81+ func (ci * clusterScalingIntegration ) checkScalingCluster (expectSuccess bool ) bool {
82+ ci .scaleEnabled .mutex .Lock ()
83+ defer ci .scaleEnabled .mutex .Unlock ()
84+
85+ if ! ci .scaleEnabled .enabled {
86+ // Check if it is possible to turn on scaling without any issue
87+ status , _ := ci .depl .GetStatus ()
88+ if status .Plan .IsEmpty () && ci .setNumberOfServers () == nil {
89+ // Scaling should be enabled because there is no Plan.
90+ // It can happen when the enabling action fails
91+ ci .scaleEnabled .enabled = true
92+ }
93+ }
94+
95+ if ci .depl .GetPhase () != api .DeploymentPhaseRunning || ! ci .scaleEnabled .enabled {
96+ // Deployment must be in running state and scaling must be enabled
97+ return false
98+ }
99+
100+ // Update cluster with our state
101+ ctx := context .Background ()
102+ //expectSuccess := *goodInspections > 0 || time.Since(start) > maxClusterBootstrapTime
103+ safeToAskCluster , err := ci .updateClusterServerCount (ctx , expectSuccess )
104+ if err != nil {
105+ if expectSuccess {
106+ ci .log .Debug ().Err (err ).Msg ("Cluster update failed" )
107+ }
108+ } else if safeToAskCluster {
109+ // Inspect once
110+ if err := ci .inspectCluster (ctx , expectSuccess ); err != nil {
111+ if expectSuccess {
112+ ci .log .Debug ().Err (err ).Msg ("Cluster inspection failed" )
113+ }
114+ } else {
115+ return true
116+ }
117+ }
118+ return false
119+ }
120+
73121// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
74122func (ci * clusterScalingIntegration ) ListenForClusterEvents (stopCh <- chan struct {}) {
75123 start := time .Now ()
76124 goodInspections := 0
77125 for {
78- delay := time .Second * 2
79-
80- // Is deployment in running state
81- if ci .depl .GetPhase () == api .DeploymentPhaseRunning {
82- // Update cluster with our state
83- ctx := context .Background ()
84- expectSuccess := goodInspections > 0 || time .Since (start ) > maxClusterBootstrapTime
85- safeToAskCluster , err := ci .updateClusterServerCount (ctx , expectSuccess )
86- if err != nil {
87- if expectSuccess {
88- ci .log .Debug ().Err (err ).Msg ("Cluster update failed" )
89- }
90- } else if safeToAskCluster {
91- // Inspect once
92- if err := ci .inspectCluster (ctx , expectSuccess ); err != nil {
93- if expectSuccess {
94- ci .log .Debug ().Err (err ).Msg ("Cluster inspection failed" )
95- }
96- } else {
97- goodInspections ++
98- }
99- }
126+ expectSuccess := goodInspections > 0 || time .Since (start ) > maxClusterBootstrapTime
127+
128+ if ci .checkScalingCluster (expectSuccess ) {
129+ goodInspections ++
100130 }
101131
102132 select {
103- case <- time .After (delay ):
133+ case <- time .After (time . Second * 2 ):
104134 // Continue
105135 case <- stopCh :
106136 // We're done
@@ -200,11 +230,6 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
200230 }
201231
202232 log := ci .log
203- c , err := ci .depl .clientCache .GetDatabase (ctx )
204- if err != nil {
205- return false , maskAny (err )
206- }
207-
208233 var coordinatorCountPtr * int
209234 var dbserverCountPtr * int
210235
@@ -223,13 +248,11 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
223248 dbserverCountPtr = & dbserverCount
224249 }
225250
226- ci .lastNumberOfServers .mutex .Lock ()
227- lastNumberOfServers := ci .lastNumberOfServers .NumberOfServers
228- ci .lastNumberOfServers .mutex .Unlock ()
251+ lastNumberOfServers := ci .GetLastNumberOfServers ()
229252
230253 // This is to prevent unneseccary updates that may override some values written by the WebUI (in the case of a update loop)
231254 if coordinatorCount != lastNumberOfServers .GetCoordinators () || dbserverCount != lastNumberOfServers .GetDBServers () {
232- if err := arangod . SetNumberOfServers (ctx , c . Connection () , coordinatorCountPtr , dbserverCountPtr ); err != nil {
255+ if err := ci . depl . SetNumberOfServers (ctx , coordinatorCountPtr , dbserverCountPtr ); err != nil {
233256 if expectSuccess {
234257 log .Debug ().Err (err ).Msg ("Failed to set number of servers" )
235258 }
@@ -253,3 +276,50 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
253276 ci .lastNumberOfServers .DBServers = & dbserverCount
254277 return safeToAskCluster , nil
255278}
279+
280+ // GetLastNumberOfServers returns the last number of servers
281+ func (ci * clusterScalingIntegration ) GetLastNumberOfServers () arangod.NumberOfServers {
282+ ci .lastNumberOfServers .mutex .Lock ()
283+ defer ci .lastNumberOfServers .mutex .Unlock ()
284+
285+ return ci .lastNumberOfServers .NumberOfServers
286+ }
287+
288+ // DisableScalingCluster disables scaling DBservers and coordinators
289+ func (ci * clusterScalingIntegration ) DisableScalingCluster () error {
290+ ci .scaleEnabled .mutex .Lock ()
291+ defer ci .scaleEnabled .mutex .Unlock ()
292+
293+ // Turn off scaling DBservers and coordinators in arangoDB for the UI
294+ ctx := context .Background ()
295+ if err := ci .depl .SetNumberOfServers (ctx , nil , nil ); err != nil {
296+ return maskAny (err )
297+ }
298+
299+ ci .scaleEnabled .enabled = false
300+ return nil
301+ }
302+
303+ // EnableScalingCluster enables scaling DBservers and coordinators
304+ func (ci * clusterScalingIntegration ) EnableScalingCluster () error {
305+ ci .scaleEnabled .mutex .Lock ()
306+ defer ci .scaleEnabled .mutex .Unlock ()
307+
308+ if ci .scaleEnabled .enabled {
309+ return nil
310+ }
311+
312+ if err := ci .setNumberOfServers (); err != nil {
313+ return maskAny (err )
314+ }
315+ ci .scaleEnabled .enabled = true
316+ return nil
317+ }
318+
319+ func (ci * clusterScalingIntegration ) setNumberOfServers () error {
320+ ctx := context .Background ()
321+ spec := ci .depl .GetSpec ()
322+ numOfCoordinators := spec .Coordinators .GetCount ()
323+ numOfDBServers := spec .DBServers .GetCount ()
324+ return ci .depl .SetNumberOfServers (ctx , & numOfCoordinators , & numOfDBServers )
325+ }
0 commit comments