Skip to content

Commit cf047f0

Browse files
committed
improved configuration for workflows calling for GPUs
1 parent 0124123 commit cf047f0

File tree

4 files changed

+39
-11
lines changed

4 files changed

+39
-11
lines changed

ManagementService/python/deployWorkflow.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,9 @@ def compile_resource_info_map(resource_names, uploaded_resources, email, sapi, d
132132
resource_metadata = json.loads(resource_metadata)
133133
if "runtime" in resource_metadata:
134134
resource_info["runtime"] = resource_metadata["runtime"]
135-
#if "on_gpu" in resource_metadata:
136-
# resource_info["on_gpu"] = True
135+
print("RESOURCE_INFO_ALL: " +str(resource_info))
136+
#if "num_gpu" in resource_metadata:
137+
# print("RESOURCE_INFO: " + str(resource_info["num_gpu"]))
137138

138139
num_chunks_str = dlc.get("grain_source_zip_num_chunks_" + resource_id)
139140
try:
@@ -296,12 +297,23 @@ def create_k8s_deployment(email, workflow_info, runtime, management=False):
296297
env.append({'name': 'WORKFLOWID', 'value': workflow_info["workflowId"]})
297298
env.append({'name': 'WORKFLOWNAME', 'value': workflow_info["workflowName"]})
298299

300+
"""
301+
if "num_gpu" in workflow_info.keys():
302+
print("INSIDE K8S Deploy, num_gpu: " + str(workflow_info['num_gpu']))
303+
num_gpu = int(workflow_info['num_gpu'])
304+
# overwrite values from values.yaml for new workflows
305+
kservice['spec']['template']['spec']['containers'][0]['resources']['limits']['nvidia.com/gpu'] = str(num_gpu)
306+
kservice['spec']['template']['spec']['containers'][0]['resources']['requests']['nvidia.com/gpu'] = str(num_gpu)
307+
kservice['spec']['template']['spec']['containers'][0]['image'] = "localhost:5000/microfn/sandbox"
308+
if num_gpu > 0:
309+
kservice['spec']['template']['spec']['containers'][0]['image'] = "localhost:5000/microfn/sandbox"
310+
"""
299311
# Special handling for the management container
300312
if management:
301313
kservice['spec']['template']['spec']['volumes'] = [{ 'name': 'new-workflow-conf', 'configMap': {'name': new_workflow_conf['configmap']}}]
302314
kservice['spec']['template']['spec']['containers'][0]['volumeMounts'] = [{'name': 'new-workflow-conf', 'mountPath': '/opt/mfn/SandboxAgent/conf'}]
303315
kservice['spec']['template']['spec']['serviceAccountName'] = new_workflow_conf['mgmtserviceaccount']
304-
316+
305317
# management container should not consume a CPU and use standard sandbox image
306318
if (labels['workflowid'] == "Management"):
307319
kservice['spec']['template']['spec']['containers'][0]['resources']['limits']['nvidia.com/gpu'] = "0"
@@ -464,6 +476,13 @@ def handle(value, sapi):
464476
runtime = "Java"
465477
else:
466478
runtime = "Python"
479+
480+
if "num_gpu" in resource_info_map.keys():
481+
print ("RESOURCE_INFO_MAP: " + str(resource_info_map))
482+
workflow_info['num_gpu'] = resource_info_map['num_gpu']
483+
else:
484+
workflow_info['num_gpu'] = 0
485+
467486
url, endpoint_key = create_k8s_deployment(email, workflow_info, runtime)
468487
if url is not None and len(url) > 0:
469488
status = "deploying"

mfn_sdk/mfn_sdk/workflow.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def json(self,json):
127127
self.client.action('uploadWorkflowJSON',{'workflow':{'id':self.id,'json':base64.b64encode(self._json.encode()).decode()}})
128128

129129

130-
def deploy(self, timeout=None):
130+
def deploy(self, timeout=None): #, num_gpu=None):
131131
""" deploy a workflow and optionally wait in linearly increasing multiples of 1000ms
132132
:timeout: By default returns after calling deploy on the workflow without waiting for it to be actually deployed.
133133
If timeout is set to a numeric <= 0, it waits indefinitely in intervals of 1000ms, 2000ms, 3000ms, ...
@@ -145,6 +145,10 @@ def deploy(self, timeout=None):
145145
else:
146146
self.client.action('deployWorkflow',{'workflow':{'id':self.id}})
147147

148+
#if num_gpu is not None:
149+
# print("NUM_GPU:" + str(num_gpu))
150+
151+
148152
# if timeout is None, do not wait but return immediately even if it's not yet deployed
149153
if timeout is None:
150154
return

tests/asl_DLIB/test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@ def test_dlib(self):
3434

3535
testtuplelist =[(inp1, res1)]
3636

37-
test = MFNTest(test_name = "Dlib__Test")
37+
test = MFNTest(test_name = "Dlib_Test", num_gpu = 1)
3838
test.exec_tests(testtuplelist)
3939

tests/mfn_test_utils.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class MfnAppTextFormat():
4343
mfntestfailed = MfnAppTextFormat.STYLE_BOLD + MfnAppTextFormat.COLOR_RED + 'FAILED' + MfnAppTextFormat.END + MfnAppTextFormat.END
4444

4545
class MFNTest():
46-
def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_user=False, delete_user=False):
46+
def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_user=False, delete_user=False, num_gpu=None):
4747

4848
self._settings = self._get_settings()
4949

@@ -84,6 +84,9 @@ def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_use
8484
if timeout is not None:
8585
self._settings["timeout"] = timeout
8686

87+
if num_gpu is not None:
88+
self._settings["num_gpu"] = num_gpu
89+
8790
self._log_clear_timestamp = int(time.time() * 1000.0 * 1000.0)
8891

8992
# will be the deployed workflow object in self._client
@@ -190,6 +193,9 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No
190193
resource_info["resource_req_filename"] = "requirements/" + resource_ref + "_requirements.txt"
191194
resource_info["resource_env_filename"] = "environment_variables/" + resource_ref + "_environment_variables.txt"
192195
resource_info_map[resource_ref] = resource_info
196+
resource_info_map[resource_ref]['num_gpu'] = self._settings['num_gpu']
197+
print("resource_info_map: " + json.dumps(resource_info_map))
198+
193199

194200
elif "States" in workflow_description:
195201
states = workflow_description["States"]
@@ -203,6 +209,9 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No
203209
resource_info["resource_req_filename"] = "requirements/" + resource_name + "_requirements.txt"
204210
resource_info["resource_env_filename"] = "environment_variables/" + resource_name + "_environment_variables.txt"
205211
resource_info_map[resource_name] = resource_info
212+
resource_info_map[resource_name]['num_gpu'] = self._settings['num_gpu']
213+
print("resource_info_map: " + json.dumps(resource_info_map))
214+
206215

207216
if "Type" in state and state["Type"] == "Parallel":
208217
branches = state['Branches']
@@ -219,10 +228,6 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No
219228
print("ERROR: invalid workflow description.")
220229
assert False
221230

222-
#resource_info_map[resource_name]['on_gpu'] = True
223-
224-
#print("resource_info_map: " + str(resource_info_map))
225-
226231
return resource_info_map
227232

228233
def _delete_resource_if_existing(self, existing_resources, resource_name):
@@ -299,7 +304,7 @@ def deploy_workflow(self):
299304
try:
300305
wf = self._client.add_workflow(self._workflow_name)
301306
wf.json = json.dumps(self._workflow_description)
302-
wf.deploy(self._settings["timeout"])
307+
wf.deploy(self._settings["timeout"]) #, num_gpu=self._settings['num_gpu'])
303308
self._workflow = wf
304309
if self._workflow.status != "failed":
305310
print("MFN workflow " + self._workflow_name + " deployed.")

0 commit comments

Comments (0)