Skip to content

Commit 550cf04

Browse files
committed
Working version final
1 parent 1cfecd1 commit 550cf04

File tree

5 files changed

+71
-53
lines changed

5 files changed

+71
-53
lines changed

examples/v2beta1/charm/generate_jobs.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1-
from random import randint, seed
1+
from random import randint, seed, choice, shuffle
22
import sys
33
import os
44
import time
55

66
seed(42)
77

8-
def create_job(job_index, priority, problem_size, min_replicas, max_replicas, timesteps):
8+
size_index = [0, 1, 2]
9+
sizes_per_pe = [1024, 1024, 2048]
10+
min_pes = [1, 4, 4]
11+
timesteps_per_job = [1000, 2000, 500]
12+
job_prefixes = ["small", "medium", "large"]
13+
14+
njobs = 20
15+
16+
17+
def create_job(prefix, job_index, priority, problem_size, min_replicas, max_replicas, timesteps):
918
with open("charm-template.yaml", "r") as file:
1019
template = file.read()
1120

@@ -15,6 +24,7 @@ def create_job(job_index, priority, problem_size, min_replicas, max_replicas, ti
1524
chare_size = problem_size // num_chares
1625

1726
job_yaml = template.format(
27+
prefix=prefix,
1828
job_index=job_index,
1929
priority=priority,
2030
problem_size=problem_size,
@@ -29,17 +39,24 @@ def create_job(job_index, priority, problem_size, min_replicas, max_replicas, ti
2939

3040

3141
def generate_jobs():
32-
for job_index in range(10):
33-
priority = randint(1, 5)
34-
problem_size = 2 ** (randint(9, 12))
35-
min_replicas = randint(1, 3)
36-
max_replicas = randint(min_replicas+1, 8)
37-
timesteps = 100 * randint(20, 30)
38-
create_job(job_index, priority, problem_size, min_replicas, max_replicas, timesteps)
42+
jobs = [0] * int(njobs * 0.25)
43+
jobs += [1] * int(njobs * 0.5)
44+
jobs += [2] * int(njobs * 0.25)
45+
shuffle(jobs)
46+
print(jobs)
47+
for i, job_index in enumerate(jobs):
48+
idx = job_index
49+
priority = 3 - idx + randint(0, 3)
50+
min_replicas = min_pes[idx]
51+
max_replicas = 4 * min_replicas
52+
problem_size = min_replicas * sizes_per_pe[idx]
53+
timesteps = timesteps_per_job[idx] + 100 * randint(0, 10)
54+
prefix = job_prefixes[idx]
55+
create_job(prefix, i, priority, problem_size, min_replicas, max_replicas, timesteps)
3956

4057

4158
def submit_jobs():
42-
for job_index in range(10):
59+
for job_index in range(njobs):
4360
job_file = f"jobs/charm-job-{job_index}.yaml"
4461
print(f"Submitting {job_file}")
4562
# Here you would submit the job using your cluster's job submission command

examples/v2beta1/charm/track_utilization.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import subprocess
22
import time
33

4-
output_file = "/home/aditya/mpi-operator/examples/v2beta1/charm/pod_utilization.log"
4+
output_file = "/home/aditya/mpi-operator/examples/v2beta1/charm/pod_utilization_long2.log"
5+
6+
open(output_file, "w").close() # Clear the file before starting
57

68
while True:
79
result = subprocess.run(["kubectl", "get", "pods"], capture_output=True, text=True)

pkg/apis/kubeflow/v2beta1/default.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func setDefaultsTypeLauncher(spec *ReplicaSpec) {
2929
return
3030
}
3131
if spec.RestartPolicy == "" {
32-
spec.RestartPolicy = DefaultLauncherRestartPolicy
32+
spec.RestartPolicy = DefaultRestartPolicy
3333
}
3434
if spec.MaxReplicas == nil {
3535
spec.MaxReplicas = ptr.To[int32](1)

pkg/controller/mpi_job_controller.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -276,16 +276,17 @@ func (pq PriorityQueue) DeepCopy() PriorityQueue {
276276

277277
func (pq PriorityQueue) Len() int { return len(pq) }
278278

279-
func (pq PriorityQueue) Less(i, j int) bool {
280-
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
281-
if pq[i].priority == pq[j].priority {
282-
return pq[j].timestamp.Before(pq[i].timestamp) // earlier timestamp has higher priority
283-
}
284-
return pq[i].priority < pq[j].priority
285-
}
286-
287279
func compare(a *Item, b *Item) int {
288-
return cmp.Compare(a.priority, b.priority)
280+
if a.priority == b.priority {
281+
if a.timestamp.Before(b.timestamp) {
282+
return -1 // earlier timestamp has higher priority
283+
} else if a.timestamp.After(b.timestamp) {
284+
return 1 // later timestamp has lower priority
285+
} else {
286+
return 0 // equal timestamps
287+
} // earlier timestamp has higher priority
288+
}
289+
return cmp.Compare(b.priority, a.priority)
289290
}
290291

291292
func (pq *PriorityQueue) Push(x any) {
@@ -780,8 +781,8 @@ func (c *MPIJobController) assignFreeSlots() error {
780781
}
781782

782783
if runPriority < queuePriority {
783-
idxQueued += 1
784784
it = itQueued
785+
c.queuedJobs = append(c.queuedJobs[:idxQueued], c.queuedJobs[idxQueued+1:]...)
785786
launcherCount = 1
786787
//action = create
787788
} else {
@@ -799,10 +800,15 @@ func (c *MPIJobController) assignFreeSlots() error {
799800
jobMinReplicas := *it.mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].MinReplicas
800801
if int32(len(workerPodList)) < jobMaxReplicas {
801802
newReplicas = int32(math.Min(float64(jobMaxReplicas),
802-
float64(int(c.latestReplicas[getJobKey(&it.mpiJob)])+c.freeSlots)))
803+
float64(int(c.latestReplicas[getJobKey(&it.mpiJob)])+c.freeSlots-int(launcherCount))))
803804
klog.Infof("Expanding %s to %d, freecount = %d", getJobKey(&it.mpiJob),
804805
newReplicas, c.freeSlots)
806+
805807
if newReplicas < jobMinReplicas {
808+
if launcherCount == 1 {
809+
c.queuedJobs.Push(it)
810+
idxQueued += 1
811+
}
806812
continue
807813
}
808814

@@ -811,6 +817,10 @@ func (c *MPIJobController) assignFreeSlots() error {
811817
continue
812818
}
813819

820+
if c.jobStatus[getJobKey(&it.mpiJob)] == expanding {
821+
continue
822+
}
823+
814824
c.latestReplicas[getJobKey(&it.mpiJob)] = newReplicas
815825
c.freeSlots -= (int(newReplicas) - len(workerPodList) + int(launcherCount))
816826

@@ -823,7 +833,7 @@ func (c *MPIJobController) assignFreeSlots() error {
823833
}
824834
}
825835
//c.runningJobs = c.runningJobs[idxRunning:]
826-
c.queuedJobs = c.queuedJobs[idxQueued:]
836+
//c.queuedJobs = c.queuedJobs[idxQueued:]
827837

828838
if minTime < math.MaxInt64 {
829839
c.queue.AddAfter(assignFreeSlotsFlag, minTime)
@@ -980,7 +990,7 @@ func (c *MPIJobController) syncHandler(key string) error {
980990
//action, _, newPods, err = c.getAction(mpiJob)
981991

982992
isExpand := false
983-
if status, ok := c.jobStatus[getJobKey(mpiJob)]; ok && status == running {
993+
if status, ok := c.jobStatus[getJobKey(mpiJob)]; ok && status == expanding {
984994
selector, err := workerSelector(mpiJob.Name)
985995
if err != nil {
986996
return err
@@ -990,7 +1000,7 @@ func (c *MPIJobController) syncHandler(key string) error {
9901000
return err
9911001
}
9921002
if len(podFullList) < int(lastReplicas) {
993-
isExpand = true
1003+
isExpand = true // this will be true only during the first pass through syncHandler
9941004
}
9951005
}
9961006

@@ -1463,6 +1473,8 @@ func (c *MPIJobController) checkJobQueue() error {
14631473
index += 1
14641474
continue
14651475
}
1476+
c.freeSlots -= int(c.latestReplicas[getJobKey(&mpiJob)]) // This is for the workers
1477+
c.freeSlots -= 1 // This one is for the launcher
14661478
c.queue.AddRateLimited(getJobKey(&mpiJob))
14671479
if index < len(c.queuedJobs)-1 {
14681480
c.queuedJobs = append(c.queuedJobs[:index], c.queuedJobs[index+1:]...)
@@ -1529,6 +1541,7 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
15291541
}
15301542

15311543
it := c.runningJobs[index]
1544+
index -= 1
15321545

15331546
// if the running job priority is higher than the new job
15341547
// don't shrink it
@@ -1539,7 +1552,7 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
15391552
if c.jobStatus[getJobKey(&it.mpiJob)] != running {
15401553
continue
15411554
}
1542-
index -= 1
1555+
15431556
workerPodList, err := c.getRunningWorkerPods(&it.mpiJob)
15441557
if err != nil {
15451558
return -1, err

pkg/controller/rescale_client.C

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ int main (int argc, char **argv)
2121

2222
// Create a CcsServer and connect to the given hostname and port
2323
CcsServer server;
24-
char host[BUF], *bitmap;
25-
int i, port, cmdLen, mode;
24+
char host[BUF], *msg;
25+
int i, port, cmdLen;
26+
bool isExpand;
2627

2728
sprintf(host, "%s", argv[1]);
2829
sscanf(argv[2], "%d", &port);
@@ -32,11 +33,11 @@ int main (int argc, char **argv)
3233
//printf("Rescaling from %i to %i\n", OLDNPROCS, NEWNPROCS);
3334

3435
if( NEWNPROCS > OLDNPROCS)
35-
mode = EXPAND;
36+
isExpand = true;
3637
else if(OLDNPROCS > NEWNPROCS)
37-
mode = SHRINK;
38+
isExpand = false;
3839
else{
39-
printf("0");
40+
printf("1");
4041
return 0;
4142
}
4243
//printf("Connecting to server %s %d\n", host, port);
@@ -46,33 +47,18 @@ int main (int argc, char **argv)
4647
}
4748
//printf("Connected to server\n");
4849

49-
cmdLen = OLDNPROCS * sizeof(char) + sizeof(int) + sizeof(char);
50-
bitmap = (char *) malloc(cmdLen);
50+
cmdLen = sizeof(int) + sizeof(bool);
51+
msg = (char *) malloc(cmdLen);
52+
memcpy(msg, &isExpand, sizeof(bool));
53+
memcpy(&msg[sizeof(bool)], &NEWNPROCS, sizeof(int));
5154

52-
if (mode == EXPAND) {
53-
//printf("Sending expand command.\n");
54-
for (i = 0; i < OLDNPROCS; i++) {
55-
bitmap[i] = 1;
56-
}
57-
}
58-
else {
59-
//printf("Sending shrink command.\n");
60-
for (i = 0; i < OLDNPROCS; i++) {
61-
if (i < NEWNPROCS)
62-
bitmap[i] = 1;
63-
else
64-
bitmap[i] = 0;
65-
}
66-
}
67-
memcpy(&bitmap[OLDNPROCS], &NEWNPROCS, sizeof(int));
68-
bitmap[OLDNPROCS+sizeof(int)] = '\0';
69-
if (CcsSendRequest(&server, "set_bitmap", 0, cmdLen, bitmap) == -1) {
55+
if (CcsSendRequest(&server, "realloc", 0, cmdLen, msg) == -1) {
7056
printf("0");
7157
return 0;
7258
}
7359

7460
//printf("Waiting for reply...\n" );
75-
if (CcsRecvResponse(&server, cmdLen, bitmap , 30) == -1) {
61+
if (CcsRecvResponse(&server, cmdLen, msg , 15) == -1) {
7662
printf("0");
7763
return 0;
7864
}

0 commit comments

Comments
 (0)