@@ -254,7 +254,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat
254254 FilterFunc : func (obj interface {}) bool {
255255 switch t := obj .(type ) {
256256 case * arbv1.AppWrapper :
257- klog .V (10 ).Infof ("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s &qj=%p qj=%+v " , t .Name , t .Namespace , t .ResourceVersion , t .Status .Local , t .Status .FilterIgnore , t .Status .Sender , t , t )
257+ klog .V (10 ).Infof ("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s " , t .Name , t .Namespace , t .ResourceVersion , t .Status .Local , t .Status .FilterIgnore , t .Status .Sender )
258258 // TODO: This is the current workaround for the duplicate message bug.
259259 // if t.Status.Local == true { // ignore duplicate message from cache
260260 // return false
@@ -440,14 +440,15 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
440440 return
441441 }
442442
443- if cleanAppWrapper {
444- klog .V (4 ).Infof ("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded." , newjob .Namespace , newjob .Name )
445- go qjm .Cleanup (ctx , updateNewJob )
446- } else {
447- // Only back-off AWs that are in state running and not in state Failed
448- if updateNewJob .Status .State != arbv1 .AppWrapperStateFailed {
449- klog .Infof ("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue." , newjob .Namespace , newjob .Name )
450- qjm .backoff (ctx , updateNewJob , "PreemptionTriggered" , string (message ))
443+ if cleanAppWrapper {
444+ klog .V (4 ).Infof ("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded." , newjob .Namespace , newjob .Name )
445+ go qjm .Cleanup (ctx , updateNewJob )
446+ } else {
447+ // Only back-off AWs that are in state running and not in state Failed
448+ if updateNewJob .Status .State != arbv1 .AppWrapperStateFailed {
449+ klog .Infof ("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue." , newjob .Namespace , newjob .Name )
450+ qjm .backoff (ctx , updateNewJob , "PreemptionTriggered" , string (message ))
451+ }
451452 }
452453 }
453454 }
@@ -1367,7 +1368,7 @@ func (cc *XController) updateStatusInEtcd(ctx context.Context, currentAppwrapper
13671368func (cc * XController ) updateStatusInEtcdWithRetry (ctx context.Context , source * arbv1.AppWrapper , caller string ) error {
13681369 klog .V (4 ).Infof ("[updateStatusInEtcdWithMergeFunction] trying to update '%s/%s' version '%s' called by '%s'" , source .Namespace , source .Name , source .ResourceVersion , caller )
13691370 source .Status .Sender = "before " + caller // set Sender string to indicate code location
1370- updateStatusRetrierRetrier := retrier .New (retrier .ExponentialBackoff (10 , 100 * time .Millisecond ), & EtcdErrorClassifier {})
1371+ updateStatusRetrierRetrier := retrier .New (retrier .ExponentialBackoff (1 , 100 * time .Millisecond ), & EtcdErrorClassifier {})
13711372 updateStatusRetrierRetrier .SetJitter (0.05 )
13721373 updatedAW := source .DeepCopy ()
13731374 err := updateStatusRetrierRetrier .RunCtx (ctx , func (localContext context.Context ) error {
@@ -1564,10 +1565,10 @@ func (cc *XController) addQueueJob(obj interface{}) {
15641565 firstTime := metav1 .NowMicro ()
15651566 qj , ok := obj .(* arbv1.AppWrapper )
15661567 if ! ok {
1567- klog .Errorf ("[Informer-addQJ] object is not AppWrapper. object=%+v" , obj )
1568+ klog .Errorf ("[Informer-addQJ] object is not AppWrapper." )
15681569 return
15691570 }
1570- klog .V (6 ).Infof ("[Informer-addQJ] %s/%s &qj=%p qj=%+v " , qj .Namespace , qj .Name , qj , qj )
1571+ klog .V (6 ).Infof ("[Informer-addQJ] %s/%s" , qj .Namespace , qj .Name )
15711572 if qj .Status .QueueJobState == "" {
15721573 qj .Status .ControllerFirstTimestamp = firstTime
15731574 qj .Status .SystemPriority = float64 (qj .Spec .Priority )
@@ -1602,18 +1603,19 @@ func (cc *XController) addQueueJob(obj interface{}) {
16021603 // updatequeuejobs now runs as part of the informer machinery. The optimization here is to avoid using etcd to pull out submitted AWs and operating
16031604 // on stale AWs. This has the potential to improve performance at scale.
16041605 if hasCompletionStatus {
1605- requeueInterval := 5 * time .Second
1606+ requeueIntervalForCompletionStatus := 5 * time .Second
16061607 key , err := cache .MetaNamespaceKeyFunc (qj )
16071608 if err != nil {
16081609 klog .Warningf ("[Informer-addQJ] Error getting AW %s/%s from cache cannot determine completion status" , qj .Namespace , qj .Name )
16091610 // TODO: should we return from this loop?
16101611 }
16111612 go func () {
16121613 for {
1613- time .Sleep (requeueInterval )
1614+ time .Sleep (requeueIntervalForCompletionStatus )
16141615 latestObj , exists , err := cc .appwrapperInformer .Informer ().GetStore ().GetByKey (key )
1615- if err != nil && ! exists {
1616- klog .Warningf ("[Informer-addQJ] Recent copy of AW %s/%s not found in cache" , qj .Namespace , qj .Name )
1616+ if err != nil || ! exists {
1617+ klog .Warningf ("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status" , qj .Namespace , qj .Name )
1618+ break
16171619 } else {
16181620 var latestAw * arbv1.AppWrapper
16191621 if latestObj != nil {
@@ -1648,8 +1650,9 @@ func (cc *XController) addQueueJob(obj interface{}) {
16481650 for {
16491651 time .Sleep (requeueInterval )
16501652 latestObj , exists , err := cc .appwrapperInformer .Informer ().GetStore ().GetByKey (key )
1651- if err != nil && ! exists {
1652- klog .Warningf ("[Informer-addQJ] Recent copy of AW %s/%s not found in cache" , qj .Namespace , qj .Name )
1653+ if err != nil || ! exists {
1654+ klog .Warningf ("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling" , qj .Namespace , qj .Name )
1655+ break
16531656 } else {
16541657 var latestAw * arbv1.AppWrapper
16551658 if latestObj != nil {
@@ -1757,9 +1760,9 @@ func (cc *XController) enqueue(obj interface{}) error {
17571760
17581761 err := cc .eventQueue .Add (qj ) // add to FIFO queue if not in, update object & keep position if already in FIFO queue
17591762 if err != nil {
1760- klog .Errorf ("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v" , qj .Namespace , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj , qj .ResourceVersion , qj .Status , err )
1763+ klog .Errorf ("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v err=%#v" , qj .Namespace , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj .ResourceVersion , qj .Status , err )
17611764 } else {
1762- klog .V (10 ).Infof ("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue &qj=%p Version=%s Status=%+v" , qj .Namespace , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj , qj .ResourceVersion , qj .Status )
1765+ klog .V (10 ).Infof ("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue Version=%s Status=%+v" , qj .Namespace , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj .ResourceVersion , qj .Status )
17631766 }
17641767 return err
17651768}
0 commit comments