@@ -51,18 +51,12 @@ type ClientSet struct {
5151 // Address is the proxy server address. Assuming HA proxy server
5252 address string
5353
54- // leaseCounter counts number of proxy server leases
55- leaseCounter ServerCounter
54+ // serverCounter is the strategy used to determine the number of proxy servers
55+ serverCounter ServerCounter
5656
5757 // lastReceivedServerCount is the last serverCount value received when connecting to a proxy server
5858 lastReceivedServerCount int
5959
60- // lastServerCount is the most-recently observed serverCount value from either lease system or proxy server,
61- // former takes priority unless it is an HA server.
62- // Initialized when the ClientSet creates the first client.
63- // When syncForever is set, it will be the most recently seen.
64- lastServerCount int
65-
6660 // syncInterval is the interval at which the agent periodically checks
6761 // that it has connections to all instances of the proxy server.
6862 syncInterval time.Duration
@@ -108,6 +102,11 @@ func (cs *ClientSet) ClientsCount() int {
108102 return len (cs .clients )
109103}
110104
105+ // SetServerCounter sets the strategy for determining the server count.
106+ func (cs * ClientSet ) SetServerCounter (counter ServerCounter ) {
107+ cs .serverCounter = counter
108+ }
109+
111110func (cs * ClientSet ) HealthyClientsCount () int {
112111 cs .mu .Lock ()
113112 defer cs .mu .Unlock ()
@@ -175,7 +174,6 @@ type ClientSetConfig struct {
175174 WarnOnChannelLimit bool
176175 SyncForever bool
177176 XfrChannelSize int
178- ServerLeaseCounter ServerCounter
179177 ServerCountSource string
180178}
181179
@@ -195,7 +193,6 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
195193 drainCh : drainCh ,
196194 xfrChannelSize : cc .XfrChannelSize ,
197195 stopCh : stopCh ,
198- leaseCounter : cc .ServerLeaseCounter ,
199196 serverCountSource : cc .ServerCountSource ,
200197 }
201198}
@@ -214,30 +211,40 @@ func (cs *ClientSet) resetBackoff() *wait.Backoff {
214211 }
215212}
216213
217- // sync makes sure that #clients >= #proxy servers
214+ // determineServerCount determines the number of proxy servers by delegating to its configured counter strategy.
215+ func (cs * ClientSet ) determineServerCount () int {
216+ serverCount := cs .serverCounter .Count ()
217+ metrics .Metrics .SetServerCount (serverCount )
218+ return serverCount
219+ }
220+
221+ // sync manages the backoff and the connection attempts to the proxy server.
222+ // It runs until stopCh is closed and calls shutdown on exit.
218223func (cs * ClientSet ) sync () {
219224 defer cs .shutdown ()
220225 backoff := cs .resetBackoff ()
221226 var duration time.Duration
222227 for {
223- if serverCount , err := cs .connectOnce (); err != nil {
228+ if err := cs .connectOnce (); err != nil {
224229 if dse , ok := err .(* DuplicateServerError ); ok {
225- clientsCount := cs .ClientsCount ()
226- klog .V (4 ).InfoS ("duplicate server" , "serverID" , dse .ServerID , "serverCount" , serverCount , "clientsCount" , clientsCount )
227- if serverCount != 0 && clientsCount >= serverCount {
228- duration = backoff .Step ()
229- } else {
230- backoff = cs .resetBackoff ()
231- duration = wait .Jitter (backoff .Duration , backoff .Jitter )
232- }
230+ klog .V (4 ).InfoS ("duplicate server connection attempt" , "serverID" , dse .ServerID )
231+ // We connected to a server we already have a connection to.
232+ // This is expected in syncForever mode. We just wait for the
233+ // next sync period to try again. No need for backoff.
234+ backoff = cs .resetBackoff ()
235+ duration = wait .Jitter (backoff .Duration , backoff .Jitter )
233236 } else {
237+ // A 'real' error, so we backoff.
234238 klog .ErrorS (err , "cannot connect once" )
235239 duration = backoff .Step ()
236240 }
237241 } else {
242+ // A successful connection was made, or no new connection was needed.
243+ // Reset the backoff and wait for the next sync period.
238244 backoff = cs .resetBackoff ()
239245 duration = wait .Jitter (backoff .Duration , backoff .Jitter )
240246 }
247+
241248 time .Sleep (duration )
242249 select {
243250 case <- cs .stopCh :
@@ -247,76 +254,37 @@ func (cs *ClientSet) sync() {
247254 }
248255}
249256
250- func (cs * ClientSet ) ServerCount () int {
251-
252- var serverCount int
253- var countSourceLabel string
257+ func (cs * ClientSet ) connectOnce () error {
258+ serverCount := cs .determineServerCount ()
254259
255- switch cs .serverCountSource {
256- case "" , "default" :
257- if cs .leaseCounter != nil {
258- serverCount = cs .leaseCounter .Count ()
259- countSourceLabel = fromLeases
260- } else {
261- serverCount = cs .lastReceivedServerCount
262- countSourceLabel = fromResponses
263- }
264- case "max" :
265- countFromLeases := 0
266- if cs .leaseCounter != nil {
267- countFromLeases = cs .leaseCounter .Count ()
268- }
269- countFromResponses := cs .lastReceivedServerCount
270-
271- serverCount = countFromLeases
272- countSourceLabel = fromLeases
273- if countFromResponses > serverCount {
274- serverCount = countFromResponses
275- countSourceLabel = fromResponses
276- }
277- if serverCount == 0 {
278- serverCount = 1
279- countSourceLabel = fromFallback
280- }
281-
282- }
283-
284- if serverCount != cs .lastServerCount {
285- klog .Warningf ("change detected in proxy server count (was: %d, now: %d, source: %q)" , cs .lastServerCount , serverCount , countSourceLabel )
286- cs .lastServerCount = serverCount
260+ // If not in syncForever mode, skip connecting once we have at least serverCount clients (a serverCount of 0 never skips).
261+ if ! cs .syncForever && cs .ClientsCount () >= serverCount && serverCount > 0 {
262+ return nil // Nothing to do.
287263 }
288264
289- metrics .Metrics .SetServerCount (serverCount )
290- return serverCount
291- }
292-
293- func (cs * ClientSet ) connectOnce () (int , error ) {
294- serverCount := cs .ServerCount ()
295-
296- if ! cs .syncForever && serverCount != 0 && cs .ClientsCount () >= serverCount {
297- return serverCount , nil
298- }
265+ // Otherwise (syncForever mode, a server count of 0, or fewer clients than servers) try to connect to a proxy server.
299266 c , receivedServerCount , err := cs .newAgentClient ()
300267 if err != nil {
301- return serverCount , err
268+ return err
302269 }
270+
303271 if err := cs .AddClient (c .serverID , c ); err != nil {
304272 c .Close ()
305- return serverCount , err
273+ return err // likely *DuplicateServerError
306274 }
307- // By moving the update to here, we only accept the server count from a server
308- // that we have successfully added to our active client set, implicitly ignoring
309- // stale data from duplicate connection attempts.
275+ // SUCCESS: We connected to a new, unique server.
276+ // Only now do we update our view of the server count.
310277 cs .lastReceivedServerCount = receivedServerCount
311- klog .V (2 ).InfoS ("sync added client connecting to proxy server" , "serverID" , c .serverID )
278+ klog .V (2 ).InfoS ("successfully connected to new proxy server" , "serverID" , c .serverID , "newServerCount" , receivedServerCount )
312279
313280 labels := runpprof .Labels (
314281 "agentIdentifiers" , cs .agentIdentifiers ,
315282 "serverAddress" , cs .address ,
316283 "serverID" , c .serverID ,
317284 )
318285 go runpprof .Do (context .Background (), labels , func (context.Context ) { c .Serve () })
319- return serverCount , nil
286+
287+ return nil
320288}
321289
322290func (cs * ClientSet ) Serve () {
0 commit comments