Skip to content

Commit 75faa9e

Browse files
authored
fix accounting for AWs that never complete but genericItem completes (#460)
* fix accounting for AWs that never complete but genericItem completes * fix test * changes to dispatch cycle and accounting * increase poll time * revert dispatch cycle changes * fix test * remove update etcd call * fix tests and accounting for burst of short jobs
1 parent ef10f5a commit 75faa9e

File tree

4 files changed

+44
-21
lines changed

4 files changed

+44
-21
lines changed

pkg/controller/queuejob/queuejob_controller_ex.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -904,9 +904,11 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
904904
klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it is the job being processed.", time.Now().String(), value.Name)
905905
continue
906906
} else if !value.Status.CanRun {
907-
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
908-
klog.V(6).Infof("[getAggAvaiResPri] %s: AW %s cannot run, adding any dangling pod resources %v while it being preempted.", time.Now().String(), value.Name, totalResource)
909-
preemptable = preemptable.Add(totalResource)
907+
// canRun is false when AW completes or it is preempted
908+
// when preempted AW is cleanedup and resources will be released by preempt thread
909+
// when AW is completed cluster state will reflect available resources
910+
// in both cases we do not account for resources.
911+
klog.V(6).Infof("[getAggAvaiResPri] %s: AW %s cannot run, so not accounting resources", time.Now().String(), value.Name)
910912
continue
911913
} else if value.Status.SystemPriority < targetpr {
912914
// Dispatcher Mode: Ensure this job is part of the target cluster
@@ -929,7 +931,13 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
929931
klog.V(10).Infof("[getAggAvaiResPri] %s: Added %s to candidate preemptable job with priority %f.", time.Now().String(), value.Name, value.Status.SystemPriority)
930932
}
931933

934+
err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(value)
935+
if err != nil {
936+
klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s, err=%+v", value.Name, err)
937+
}
938+
932939
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
940+
klog.V(10).Infof("[getAggAvaiResPri] total resources consumed by Appwrapper %v when lower priority compared to target are %v", value.Name, totalResource)
933941
preemptable = preemptable.Add(totalResource)
934942
klog.V(6).Infof("[getAggAvaiResPri] %s priority %v is lower than target priority %v, reclaiming total preemptable resources %v", value.Name, value.Status.SystemPriority, targetpr, totalResource)
935943
continue
@@ -953,13 +961,19 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
953961
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
954962
}
955963

964+
err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(value)
965+
if err != nil {
966+
klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s, err=%+v", value.Name, err)
967+
}
968+
956969
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
957-
delta, err := qjv.NonNegSub(totalResource)
970+
klog.V(6).Infof("[getAggAvaiResPri] total resources consumed by Appwrapper %v when CanRun are %v", value.Name, totalResource)
971+
pending, err = qjv.NonNegSub(totalResource)
958972
if err != nil {
959973
klog.Warningf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resources %v, %v", qjv, err)
960974
pending = qjv
961975
}
962-
pending = pending.Add(delta)
976+
klog.V(6).Infof("[getAggAvaiResPri] The value of pending is %v", pending)
963977
continue
964978
} else {
965979
//Do nothing
@@ -1568,7 +1582,7 @@ func (cc *XController) Run(stopCh chan struct{}) {
15681582
cc.cache.Run(stopCh)
15691583

15701584
// go wait.Until(cc.ScheduleNext, 2*time.Second, stopCh)
1571-
go wait.Until(cc.ScheduleNext, 0, stopCh)
1585+
go wait.Until(cc.ScheduleNext, 2*time.Second, stopCh)
15721586
// start preempt thread based on preemption of pods
15731587

15741588
// TODO - scheduleNext...Job....

pkg/controller/queuejobresources/pod/pod.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -243,22 +243,26 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e
243243
}
244244

245245
running := int32(queuejobresources.FilterPods(pods, v1.PodRunning))
246-
totalResourcesConsumed := queuejobresources.GetPodResourcesByPhase(v1.PodRunning, pods)
246+
podPhases := []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}
247+
totalResourcesConsumedForPodPhases := clusterstateapi.EmptyResource()
248+
for _, phase := range podPhases {
249+
totalResourcesConsumedForPodPhases.Add(queuejobresources.GetPodResourcesByPhase(phase, pods))
250+
}
247251
pending := int32(queuejobresources.FilterPods(pods, v1.PodPending))
248252
succeeded := int32(queuejobresources.FilterPods(pods, v1.PodSucceeded))
249253
failed := int32(queuejobresources.FilterPods(pods, v1.PodFailed))
250254
podsConditionMap := queuejobresources.PendingPodsFailedSchd(pods)
251-
klog.V(10).Infof("[UpdateQueueJobStatus] There are %d pods of AppWrapper %s: pending %d, running %d, succeeded %d, failed %d, pendingpodsfailedschd %d",
252-
len(pods), queuejob.Name, pending, running, succeeded, failed, len(podsConditionMap))
255+
klog.V(10).Infof("[UpdateQueueJobStatus] There are %d pods of AppWrapper %s: pending %d, running %d, succeeded %d, failed %d, pendingpodsfailedschd %d, total resource consumed %v",
256+
len(pods), queuejob.Name, pending, running, succeeded, failed, len(podsConditionMap), totalResourcesConsumedForPodPhases)
253257

254258
queuejob.Status.Pending = pending
255259
queuejob.Status.Running = running
256260
queuejob.Status.Succeeded = succeeded
257261
queuejob.Status.Failed = failed
258262
//Total resources by all running pods
259-
queuejob.Status.TotalGPU = totalResourcesConsumed.GPU
260-
queuejob.Status.TotalCPU = totalResourcesConsumed.MilliCPU
261-
queuejob.Status.TotalMemory = totalResourcesConsumed.Memory
263+
queuejob.Status.TotalGPU = totalResourcesConsumedForPodPhases.GPU
264+
queuejob.Status.TotalCPU = totalResourcesConsumedForPodPhases.MilliCPU
265+
queuejob.Status.TotalMemory = totalResourcesConsumedForPodPhases.Memory
262266

263267
queuejob.Status.PendingPodConditions = nil
264268
for podName, cond := range podsConditionMap {
@@ -623,7 +627,7 @@ func (qjrPod *QueueJobResPod) createQueueJobPod(qj *arbv1.AppWrapper, ix int32,
623627
if tmpl == nil {
624628
tmpl = make(map[string]string)
625629
}
626-
630+
627631
tmpl[queueJobName] = qj.Name
628632

629633
// Include pre-defined metadata info, e.g. annotations
@@ -634,12 +638,12 @@ func (qjrPod *QueueJobResPod) createQueueJobPod(qj *arbv1.AppWrapper, ix int32,
634638
templateObjMetadata.SetNamespace(qj.Namespace)
635639
templateObjMetadata.SetOwnerReferences([]metav1.OwnerReference{
636640
*metav1.NewControllerRef(qj, queueJobKind),
637-
},)
641+
})
638642
templateObjMetadata.SetLabels(tmpl)
639643

640644
return &v1.Pod{
641645
ObjectMeta: templateObjMetadata,
642-
Spec: templateCopy.Spec,
646+
Spec: templateCopy.Spec,
643647
}
644648
}
645649

test/e2e/queue.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
104104

105105
// Using quiet mode due to creation of pods in an earlier step.
106106
err = waitAWReadyQuiet(context, aw2)
107+
fmt.Fprintf(os.Stdout, "The error is %v", err)
107108
Expect(err).NotTo(HaveOccurred())
108109
})
109110

@@ -118,7 +119,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
118119
// This should fill up the worker node and most of the master node
119120
aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu"))
120121
appwrappers = append(appwrappers, aw)
121-
122+
time.Sleep(2 * time.Minute)
122123
err := waitAWPodsReady(context, aw)
123124
Expect(err).NotTo(HaveOccurred())
124125

@@ -136,6 +137,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
136137

137138
// Since preemption takes some time, increasing timeout wait time to 2 minutes
138139
err = waitAWPodsExists(context, aw3, 120000*time.Millisecond)
140+
fmt.Fprintf(os.Stdout, "[e2e] The error is %v", err)
139141
Expect(err).NotTo(HaveOccurred())
140142
})
141143

@@ -425,7 +427,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
425427
// This should fill up the worker node and most of the master node
426428
aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu"))
427429
appwrappers = append(appwrappers, aw)
428-
430+
time.Sleep(1 * time.Minute)
429431
err := waitAWPodsReady(context, aw)
430432
Expect(err).NotTo(HaveOccurred())
431433

@@ -436,6 +438,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
436438
appwrappers = append(appwrappers, aw2)
437439

438440
err = waitAWAnyPodsExists(context, aw2)
441+
fmt.Fprintf(os.Stdout, "The error is %v", err)
439442
Expect(err).NotTo(HaveOccurred())
440443

441444
err = waitAWPodsPending(context, aw2)
@@ -455,7 +458,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
455458
// Make sure they are running
456459
err = waitAWPodsReady(context, aw3)
457460
Expect(err).NotTo(HaveOccurred())
458-
461+
time.Sleep(2 * time.Minute)
459462
// Make sure pods from AW aw-deployment-1-700-cpu above do not exist proving preemption
460463
err = waitAWAnyPodsExists(context, aw2)
461464
Expect(err).To(HaveOccurred())
@@ -482,6 +485,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
482485
appwrappers = append(appwrappers, aw2)
483486

484487
err = waitAWAnyPodsExists(context, aw2)
488+
fmt.Fprintf(os.Stdout, "the error is %v", err)
485489
Expect(err).NotTo(HaveOccurred())
486490

487491
err = waitAWPodsReady(context, aw2)
@@ -498,6 +502,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
498502
// This should fill up the worker node and most of the master node
499503
aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu"))
500504
appwrappers = append(appwrappers, aw)
505+
time.Sleep(1 * time.Minute)
501506

502507
err := waitAWPodsReady(context, aw)
503508
Expect(err).NotTo(HaveOccurred())
@@ -507,7 +512,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
507512
context, appendRandomString("aw-deployment-2-426-vs-425-cpu"), "426m", "425m", 2, 60)
508513

509514
appwrappers = append(appwrappers, aw2)
510-
515+
time.Sleep(1 * time.Minute)
511516
err = waitAWAnyPodsExists(context, aw2)
512517
Expect(err).To(HaveOccurred())
513518

@@ -664,7 +669,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
664669
// This should fill up the worker node and most of the master node
665670
aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu"))
666671
appwrappers = append(appwrappers, aw)
667-
672+
time.Sleep(1 * time.Minute)
668673
err := waitAWPodsReady(context, aw)
669674
Expect(err).NotTo(HaveOccurred())
670675

test/e2e/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ func waitAWPending(ctx *context, aw *arbv1.AppWrapper) error {
737737
}
738738

739739
func waitAWPodsReadyEx(ctx *context, aw *arbv1.AppWrapper, taskNum int, quite bool) error {
740-
return wait.Poll(100*time.Millisecond, ninetySeconds, awPodPhase(ctx, aw,
740+
return wait.Poll(100*time.Millisecond, threeHundredSeconds, awPodPhase(ctx, aw,
741741
[]v1.PodPhase{v1.PodRunning, v1.PodSucceeded}, taskNum, quite))
742742
}
743743

0 commit comments

Comments
 (0)