@@ -348,21 +348,22 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
348348
349349// TODO: We can use informer to filter AWs that do not meet the minScheduling spec.
350350// we still need a thread for dispatch duration but minScheduling spec can definitely be moved to an informer
351- func (qjm * XController ) PreemptQueueJobs () {
351+ func (qjm * XController ) PreemptQueueJobs (inspectAw * arbv1. AppWrapper ) {
352352 ctx := context .Background ()
353+ aw := qjm .GetQueueJobEligibleForPreemption (inspectAw )
354+ if aw != nil {
353355
354- qjobs := qjm .GetQueueJobsEligibleForPreemption ()
355- for _ , aw := range qjobs {
356+ //for _, aw := range qjobs {
356357 if aw .Status .State == arbv1 .AppWrapperStateCompleted || aw .Status .State == arbv1 .AppWrapperStateDeleted || aw .Status .State == arbv1 .AppWrapperStateFailed {
357- continue
358+ return
358359 }
359360
360361 var updateNewJob * arbv1.AppWrapper
361362 var message string
362363 newjob , err := qjm .getAppWrapper (aw .Namespace , aw .Name , "[PreemptQueueJobs] get fresh app wrapper" )
363364 if err != nil {
364365 klog .Warningf ("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run." , aw .Namespace , aw .Name , err )
365- continue
366+ return
366367 }
367368 //we need to update AW before analyzing it as a candidate for preemption
368369 updateErr := qjm .UpdateQueueJobStatus (newjob )
@@ -394,13 +395,11 @@ func (qjm *XController) PreemptQueueJobs() {
394395 err := qjm .updateStatusInEtcdWithRetry (ctx , updateNewJob , "PreemptQueueJobs - CanRun: false -- DispatchDeadlineExceeded" )
395396 if err != nil {
396397 klog .Warningf ("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed" , newjob .Namespace , newjob .Name )
397- continue
398+ return
398399 }
399400 // cannot use cleanup AW, since it puts AW back in running state
400401 qjm .qjqueue .AddUnschedulableIfNotPresent (updateNewJob )
401402
402- // Move to next AW
403- continue
404403 }
405404 }
406405
@@ -462,7 +461,7 @@ func (qjm *XController) PreemptQueueJobs() {
462461 err = qjm .updateStatusInEtcdWithRetry (ctx , updateNewJob , "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning" )
463462 if err != nil {
464463 klog .Warningf ("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v" , newjob .Namespace , newjob .Name , err )
465- continue
464+ return
466465 }
467466
468467 if cleanAppWrapper {
@@ -506,98 +505,83 @@ func (qjm *XController) preemptAWJobs(ctx context.Context, preemptAWs []*arbv1.A
506505 }
507506}
508507
509- func (qjm * XController ) GetQueueJobsEligibleForPreemption () []* arbv1.AppWrapper {
510- qjobs := make ([]* arbv1.AppWrapper , 0 )
511-
512- queueJobs , err := qjm .appWrapperLister .AppWrappers ("" ).List (labels .Everything ())
513- if err != nil {
514- klog .Errorf ("List of queueJobs %+v" , qjobs )
515- return qjobs
516- }
508+ func (qjm * XController ) GetQueueJobEligibleForPreemption (value * arbv1.AppWrapper ) * arbv1.AppWrapper {
517509
518510 if ! qjm .isDispatcher { // Agent Mode
519- for _ , value := range queueJobs {
520-
521- // Skip if AW Pending or just entering the system and does not have a state yet.
522- if (value .Status .State == arbv1 .AppWrapperStateEnqueued ) || (value .Status .State == "" ) {
523- continue
524- }
525511
526- if value .Status .State == arbv1 .AppWrapperStateActive && value .Spec .SchedSpec .DispatchDuration .Limit > 0 {
527- awDispatchDurationLimit := value .Spec .SchedSpec .DispatchDuration .Limit
528- dispatchDuration := value .Status .ControllerFirstDispatchTimestamp .Add (time .Duration (awDispatchDurationLimit ) * time .Second )
529- currentTime := time .Now ()
530- dispatchTimeExceeded := ! currentTime .Before (dispatchDuration )
512+ if value .Status .State == arbv1 .AppWrapperStateActive && value .Spec .SchedSpec .DispatchDuration .Limit > 0 {
513+ awDispatchDurationLimit := value .Spec .SchedSpec .DispatchDuration .Limit
514+ dispatchDuration := value .Status .ControllerFirstDispatchTimestamp .Add (time .Duration (awDispatchDurationLimit ) * time .Second )
515+ currentTime := time .Now ()
516+ dispatchTimeExceeded := ! currentTime .Before (dispatchDuration )
531517
532- if dispatchTimeExceeded {
533- klog .V (8 ).Infof ("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v" , currentTime , dispatchDuration )
534- value .Spec .SchedSpec .DispatchDuration .Overrun = true
535- qjobs = append (qjobs , value )
536- // Got AW which exceeded dispatch runtime limit, move to next AW
537- continue
538- }
518+ if dispatchTimeExceeded {
519+ klog .V (8 ).Infof ("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v" , currentTime , dispatchDuration )
520+ value .Spec .SchedSpec .DispatchDuration .Overrun = true
521+ // Got AW which exceeded dispatch runtime limit, move to next AW
522+ return value
539523 }
540- replicas := value .Spec .SchedSpec .MinAvailable
524+ }
525+ replicas := value .Spec .SchedSpec .MinAvailable
541526
542- if (int (value .Status .Running ) + int (value .Status .Succeeded )) < replicas {
527+ if (int (value .Status .Running ) + int (value .Status .Succeeded )) < replicas {
543528
544- // Find the dispatched condition if there is any
545- numConditions := len (value .Status .Conditions )
546- var dispatchedCondition arbv1.AppWrapperCondition
547- dispatchedConditionExists := false
529+ // Find the dispatched condition if there is any
530+ numConditions := len (value .Status .Conditions )
531+ var dispatchedCondition arbv1.AppWrapperCondition
532+ dispatchedConditionExists := false
548533
549- for i := numConditions - 1 ; i > 0 ; i -- {
550- dispatchedCondition = value .Status .Conditions [i ]
551- if dispatchedCondition .Type != arbv1 .AppWrapperCondDispatched {
552- continue
553- }
554- dispatchedConditionExists = true
555- break
534+ for i := numConditions - 1 ; i > 0 ; i -- {
535+ dispatchedCondition = value .Status .Conditions [i ]
536+ if dispatchedCondition .Type != arbv1 .AppWrapperCondDispatched {
537+ continue
556538 }
539+ dispatchedConditionExists = true
540+ break
541+ }
557542
558- // Check for the minimum age and then skip preempt if current time is not beyond minimum age
559- // The minimum age is controlled by the requeuing.TimeInSeconds stanza
560- // For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later
561- lastCondition := value .Status .Conditions [numConditions - 1 ]
562- var condition arbv1.AppWrapperCondition
543+ // Check for the minimum age and then skip preempt if current time is not beyond minimum age
544+ // The minimum age is controlled by the requeuing.TimeInSeconds stanza
545+ // For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later
546+ lastCondition := value .Status .Conditions [numConditions - 1 ]
547+ var condition arbv1.AppWrapperCondition
563548
564- if dispatchedConditionExists && dispatchedCondition .LastTransitionMicroTime .After (lastCondition .LastTransitionMicroTime .Time ) {
565- condition = dispatchedCondition
566- } else {
567- condition = lastCondition
568- }
569- var requeuingTimeInSeconds int
570- if value .Status .RequeueingTimeInSeconds > 0 {
571- requeuingTimeInSeconds = value .Status .RequeueingTimeInSeconds
572- } else if value .Spec .SchedSpec .Requeuing .InitialTimeInSeconds == 0 {
573- requeuingTimeInSeconds = value .Spec .SchedSpec .Requeuing .TimeInSeconds
574- } else {
575- requeuingTimeInSeconds = value .Spec .SchedSpec .Requeuing .InitialTimeInSeconds
576- }
549+ if dispatchedConditionExists && dispatchedCondition .LastTransitionMicroTime .After (lastCondition .LastTransitionMicroTime .Time ) {
550+ condition = dispatchedCondition
551+ } else {
552+ condition = lastCondition
553+ }
554+ var requeuingTimeInSeconds int
555+ if value .Status .RequeueingTimeInSeconds > 0 {
556+ requeuingTimeInSeconds = value .Status .RequeueingTimeInSeconds
557+ } else if value .Spec .SchedSpec .Requeuing .InitialTimeInSeconds == 0 {
558+ requeuingTimeInSeconds = value .Spec .SchedSpec .Requeuing .TimeInSeconds
559+ } else {
560+ requeuingTimeInSeconds = value .Spec .SchedSpec .Requeuing .InitialTimeInSeconds
561+ }
577562
578- minAge := condition .LastTransitionMicroTime .Add (time .Duration (requeuingTimeInSeconds ) * time .Second )
579- currentTime := time .Now ()
563+ minAge := condition .LastTransitionMicroTime .Add (time .Duration (requeuingTimeInSeconds ) * time .Second )
564+ currentTime := time .Now ()
580565
581- if currentTime .Before (minAge ) {
582- continue
583- }
566+ if currentTime .Before (minAge ) {
567+ return nil
568+ }
584569
585- if replicas > 0 {
586- klog .V (3 ).Infof ("AppWrapper '%s/%s' is eligible for preemption Running: %d - minAvailable: %d , Succeeded: %d !!!" , value .Namespace , value .Name , value .Status .Running , replicas , value .Status .Succeeded )
587- qjobs = append (qjobs , value )
588- }
589- } else {
590- // Preempt when schedulingSpec stanza is not set but pods fails scheduling.
591- // ignore co-scheduler pods
592- if len (value .Status .PendingPodConditions ) > 0 {
593- klog .V (3 ).Infof ("AppWrapper '%s/%s' is eligible for preemption Running: %d , Succeeded: %d due to failed scheduling !!!" , value .Namespace , value .Status .Running , value .Status .Succeeded )
594- qjobs = append (qjobs , value )
595- }
570+ if replicas > 0 {
571+ klog .V (3 ).Infof ("AppWrapper '%s/%s' is eligible for preemption Running: %d - minAvailable: %d , Succeeded: %d !!!" , value .Namespace , value .Name , value .Status .Running , replicas , value .Status .Succeeded )
572+ return value
573+ }
574+ } else {
575+ // Preempt when schedulingSpec stanza is not set but pods fails scheduling.
576+ // ignore co-scheduler pods
577+ if len (value .Status .PendingPodConditions ) > 0 {
578+ klog .V (3 ).Infof ("AppWrapper '%s/%s' is eligible for preemption Running: %d , Succeeded: %d due to failed scheduling !!!" , value .Namespace , value .Status .Running , value .Status .Succeeded )
579+ return value
596580 }
597581 }
598582 }
599583
600- return qjobs
584+ return nil
601585}
602586
603587func (qjm * XController ) GetAggregatedResourcesPerGenericItem (cqj * arbv1.AppWrapper ) []* clusterstateapi.Resource {
@@ -1500,20 +1484,8 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason
15001484func (cc * XController ) Run (stopCh <- chan struct {}) {
15011485 go cc .appwrapperInformer .Informer ().Run (stopCh )
15021486
1503- // go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh)
1504-
15051487 cache .WaitForCacheSync (stopCh , cc .appWrapperSynced )
15061488
1507- // cache is turned off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588
1508- // update snapshot of ClientStateCache every second
1509- // cc.cache.Run(stopCh)
1510-
1511- // start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration
1512- go wait .Until (cc .PreemptQueueJobs , 60 * time .Second , stopCh )
1513-
1514- // This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion
1515- //go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh)
1516-
15171489 if cc .isDispatcher {
15181490 go wait .Until (cc .UpdateAgent , 2 * time .Second , stopCh ) // In the Agent?
15191491 for _ , jobClusterAgent := range cc .agentMap {
@@ -1653,29 +1625,63 @@ func (cc *XController) addQueueJob(obj interface{}) {
16531625 //When an AW enters a system with completionstatus keep checking the AW until completed
16541626 //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate
16551627 //on stale AWs. This has potential to improve performance at scale.
1656- //if qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed && qj.Status.State != "" {
1657- requeueInterval := 30 * time .Second
1658- key , err := cache .MetaNamespaceKeyFunc (qj )
1659- if err == nil {
1628+ if hasCompletionStatus {
1629+ requeueInterval := 5 * time .Second
1630+ key , err := cache .MetaNamespaceKeyFunc (qj )
1631+ if err != nil {
1632+ klog .Warningf ("[Informer-addQJ] Error getting AW %s from cache cannot determine completion status" , qj .Name )
1633+ //TODO: should we return from this loop?
1634+ }
16601635 go func () {
16611636 for {
16621637 time .Sleep (requeueInterval )
16631638 latestAw , exists , err := cc .appwrapperInformer .Informer ().GetStore ().GetByKey (key )
1664- if latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateActive && latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateEnqueued && latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateRunningHoldCompletion {
1665- klog .V (2 ).Infof ("[Informer-addQJ] Stopping requeue for AW %s with status %s" , latestAw .(* arbv1.AppWrapper ).Name , latestAw .(* arbv1.AppWrapper ).Status .State )
1666- break //Exit the loop
1667- }
1668- if err == nil && exists {
1639+ if err != nil && ! exists {
1640+ klog .Warningf ("[Informer-addQJ] Recent copy of AW %s not found in cache" , qj .Name )
1641+ } else {
1642+ if latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateActive && latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateEnqueued && latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateRunningHoldCompletion {
1643+ klog .V (2 ).Infof ("[Informer-addQJ] Stopping requeue for AW %s with status %s" , latestAw .(* arbv1.AppWrapper ).Name , latestAw .(* arbv1.AppWrapper ).Status .State )
1644+ break //Exit the loop
1645+ }
16691646 // Enqueue the latest copy of the AW.
16701647 if (qj .Status .State != arbv1 .AppWrapperStateCompleted && qj .Status .State != arbv1 .AppWrapperStateFailed ) && hasCompletionStatus {
16711648 cc .UpdateQueueJobs (latestAw .(* arbv1.AppWrapper ))
1672- klog .V (2 ).Infof ("[Informer-addQJ] Finished requeing AW to determine completion status" )
1649+ klog .V (2 ).Infof ("[Informer-addQJ] requeing AW to determine completion status for AW" , qj .Name )
1650+ }
1651+
1652+ }
1653+
1654+ }
1655+ }()
1656+ }
1657+
1658+ if qj .Spec .SchedSpec .MinAvailable > 0 {
1659+ requeueInterval := 60 * time .Second
1660+ key , err := cache .MetaNamespaceKeyFunc (qj )
1661+ if err != nil {
1662+ klog .Errorf ("[Informer-addQJ] Error getting AW %s from cache cannot preempt AW" , qj .Name )
1663+ //TODO: should we return from this loop?
1664+ }
1665+ go func () {
1666+ for {
1667+ time .Sleep (requeueInterval )
1668+ latestAw , exists , err := cc .appwrapperInformer .Informer ().GetStore ().GetByKey (key )
1669+ if err != nil && ! exists {
1670+ klog .Warningf ("[Informer-addQJ] Recent copy of AW %s not found in cache" , qj .Name )
1671+ } else {
1672+ if latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateActive && latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateEnqueued && latestAw .(* arbv1.AppWrapper ).Status .State != arbv1 .AppWrapperStateRunningHoldCompletion {
1673+ klog .V (2 ).Infof ("[Informer-addQJ] Stopping requeue for AW %s with status %s" , latestAw .(* arbv1.AppWrapper ).Name , latestAw .(* arbv1.AppWrapper ).Status .State )
1674+ break //Exit the loop
1675+ }
1676+ // Enqueue the latest copy of the AW.
1677+ if (qj .Status .State != arbv1 .AppWrapperStateCompleted && qj .Status .State != arbv1 .AppWrapperStateFailed ) && (qj .Spec .SchedSpec .MinAvailable > 0 ) {
1678+ cc .PreemptQueueJobs (latestAw .(* arbv1.AppWrapper ))
1679+ klog .V (2 ).Infof ("[Informer-addQJ] requeing AW to check minScheduling spec for AW" , qj .Name )
16731680 }
16741681 }
16751682 }
16761683 }()
16771684 }
1678- //}
16791685}
16801686
16811687func (cc * XController ) updateQueueJob (oldObj , newObj interface {}) {
0 commit comments