Skip to content

Commit 89e8d24

Browse files
committed
first cut on extending Workflow class with GPU properties
1 parent cf047f0 commit 89e8d24

File tree

6 files changed

+93
-28
lines changed

6 files changed

+93
-28
lines changed

ManagementService/python/addWorkflow.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def handle(value, sapi):
2727
success = False
2828

2929
email = data["email"]
30+
3031

3132
if "workflow" in data:
3233
workflow = data["workflow"]
@@ -38,6 +39,9 @@ def handle(value, sapi):
3839
wf["status"] = "undeployed"
3940
wf["modified"] = time.time()
4041
wf["endpoints"] = []
42+
wf["gpu_usage"] = None
43+
if "gpu_usage" in workflow:
44+
wf["gpu_usage"] = workflow["gpu_usage"]
4145

4246
wf["id"] = hashlib.md5(str(uuid.uuid4()).encode()).hexdigest()
4347

ManagementService/python/deployWorkflow.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,23 @@
2626
WF_TYPE_SAND = 0
2727
WF_TYPE_ASL = 1
2828

29+
def get_kv_pairs(testdict, keys, dicts=None):
30+
# find and return kv pairs with particular keys in testdict
31+
if not dicts:
32+
dicts = [testdict]
33+
testdict = [testdict]
34+
data = testdict.pop(0)
35+
if isinstance(data, dict):
36+
data = data.values()
37+
for d in data:
38+
if isinstance(d, dict) or isinstance(d, list): # check d for type
39+
testdict.append(d)
40+
if isinstance(d, dict):
41+
dicts.append(d)
42+
if testdict: # no more data to search
43+
return get_kv_pairs(testdict, keys, dicts)
44+
return [(k, v) for d in dicts for k, v in d.items() if k in keys]
45+
2946
def is_asl_workflow(wfobj):
3047
return 'StartAt' in wfobj and 'States' in wfobj and isinstance(wfobj['States'], dict)
3148

@@ -132,7 +149,6 @@ def compile_resource_info_map(resource_names, uploaded_resources, email, sapi, d
132149
resource_metadata = json.loads(resource_metadata)
133150
if "runtime" in resource_metadata:
134151
resource_info["runtime"] = resource_metadata["runtime"]
135-
print("RESOURCE_INFO_ALL: " +str(resource_info))
136152
#if "num_gpu" in resource_metadata:
137153
# print("RESOURCE_INFO: " + str(resource_info["num_gpu"]))
138154

@@ -245,7 +261,7 @@ def get_workflow_host_port(host_to_deploy, sid):
245261

246262
return success, host_port
247263

248-
def create_k8s_deployment(email, workflow_info, runtime, management=False):
264+
def create_k8s_deployment(email, workflow_info, runtime, management=False, use_gpus=0):
249265
# KUBERNETES MODE
250266
new_workflow_conf = {}
251267
conf_file = '/opt/mfn/SandboxAgent/conf/new_workflow.conf'
@@ -297,17 +313,15 @@ def create_k8s_deployment(email, workflow_info, runtime, management=False):
297313
env.append({'name': 'WORKFLOWID', 'value': workflow_info["workflowId"]})
298314
env.append({'name': 'WORKFLOWNAME', 'value': workflow_info["workflowName"]})
299315

300-
"""
301-
if "num_gpu" in workflow_info.keys():
302-
print("INSIDE K8S Deploy, num_gpu: " + str(workflow_info['num_gpu']))
303-
num_gpu = int(workflow_info['num_gpu'])
316+
if use_gpus >= 0:
317+
#print("INSIDE K8S Deploy, num_gpu: " + str(workflow_info['num_gpu']))
318+
#num_gpu = int(workflow_info['num_gpu'])
304319
# overwrite values from values.yaml for new workflows
305-
kservice['spec']['template']['spec']['containers'][0]['resources']['limits']['nvidia.com/gpu'] = str(num_gpu)
306-
kservice['spec']['template']['spec']['containers'][0]['resources']['requests']['nvidia.com/gpu'] = str(num_gpu)
307-
kservice['spec']['template']['spec']['containers'][0]['image'] = "localhost:5000/microfn/sandbox"
308-
if num_gpu > 0:
309-
kservice['spec']['template']['spec']['containers'][0]['image'] = "localhost:5000/microfn/sandbox"
310-
"""
320+
kservice['spec']['template']['spec']['containers'][0]['resources']['limits']['nvidia.com/gpu'] = str(use_gpus)
321+
kservice['spec']['template']['spec']['containers'][0]['resources']['requests']['nvidia.com/gpu'] = str(use_gpus)
322+
#kservice['spec']['template']['spec']['containers'][0]['image'] = "localhost:5000/microfn/sandbox"
323+
kservice['spec']['template']['spec']['containers'][0]['image'] = "localhost:5000/microfn/sandbox_gpu"
324+
311325
# Special handling for the management container
312326
if management:
313327
kservice['spec']['template']['spec']['volumes'] = [{ 'name': 'new-workflow-conf', 'configMap': {'name': new_workflow_conf['configmap']}}]
@@ -407,6 +421,11 @@ def handle(value, sapi):
407421
workflow = data["workflow"]
408422
if "id" not in workflow:
409423
raise Exception("malformed input")
424+
"""
425+
if "gpu_usage" not in workflow:
426+
raise Exception("malformed input: no gpu_usage")
427+
use_gpus = int(data['gpu_usage'])
428+
"""
410429
sapi.log(json.dumps(workflow))
411430
wfmeta = sapi.get(email + "_workflow_" + workflow["id"], True)
412431
if wfmeta is None or wfmeta == "":
@@ -437,6 +456,8 @@ def handle(value, sapi):
437456
if is_asl_workflow(wfobj):
438457
wf_type = WF_TYPE_ASL
439458

459+
#use_gpus = int(wfmeta._gpu_usage)
460+
440461
success, errmsg, resource_names, uploaded_resources = check_workflow_functions(wf_type, wfobj, email, sapi)
441462
if not success:
442463
raise Exception("Couldn't deploy workflow; " + errmsg)
@@ -477,13 +498,16 @@ def handle(value, sapi):
477498
else:
478499
runtime = "Python"
479500

501+
"""
480502
if "num_gpu" in resource_info_map.keys():
481503
print ("RESOURCE_INFO_MAP: " + str(resource_info_map))
482504
workflow_info['num_gpu'] = resource_info_map['num_gpu']
483505
else:
484506
workflow_info['num_gpu'] = 0
507+
"""
508+
use_gpus = 0
485509

486-
url, endpoint_key = create_k8s_deployment(email, workflow_info, runtime)
510+
url, endpoint_key = create_k8s_deployment(email, workflow_info, runtime, use_gpus)
487511
if url is not None and len(url) > 0:
488512
status = "deploying"
489513
sapi.addSetEntry(workflow_info["workflowId"] + "_workflow_endpoints", str(url), is_private=True)

deploy/helm/microfunctions/values.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ imageRepo: "localhost:5000"
2424
manager:
2525
#httpProxy: "http://<host>:<port>"
2626
#httpsProxy: "http://<host>:<port>"
27-
#httpGatewayPort: 80
28-
#httpsGatewayPort: 443
27+
httpGatewayPort: 30336
28+
#httpsGatewayPort: 32533
2929
nameOverride: "microfunctions"
3030
newWorkflow:
3131
hpa:

mfn_sdk/mfn_sdk/mfnclient.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ def action(self,action,data=None):
266266
r.raise_for_status()
267267
log.debug("%s: %s <- %s", self.user, action, r.text[:256]+(r.text[256:] and '...'))
268268
resp = r.json()
269+
print(str(resp))
269270
if resp.get('status','') != 'success':
270271
if resp.get('has_error',False):
271272
raise Exception(f"MicroFunctions Error for action {action}: {resp['error_type']}")
@@ -449,7 +450,7 @@ def _get_state_names_and_resource(self, desired_state_type, wf_dict):
449450
return state_list
450451

451452

452-
def add_workflow(self,name,filename=None):
453+
def add_workflow(self,name,filename=None, gpu_usage=None):
453454
""" add a workflow
454455
455456
returns an existing workflow if the name exists, registers a new workflow name if it doesn't exist
@@ -458,7 +459,7 @@ def add_workflow(self,name,filename=None):
458459
for wf in self._workflows:
459460
if wf._name == name:
460461
return wf
461-
data = self.action('addWorkflow',{'workflow':{'name':name}})
462+
data = self.action('addWorkflow',{'workflow':{'name':name, "gpu_usage":gpu_usage}})
462463
wfd = data['workflow']
463464
wf = Workflow(self,wfd)
464465
self._workflows.append(wf)
@@ -475,6 +476,7 @@ def add_workflow(self,name,filename=None):
475476
# parse the WF json to find required functions
476477
fnames = []
477478
wfjson = json.loads(wfdesc)
479+
#print("wfjson: "+ str(wfjson))
478480
if 'States' in wfjson:
479481
state_list = self._get_state_names_and_resource('Task', wfjson)
480482
for state_info in state_list:
@@ -505,7 +507,6 @@ def add_workflow(self,name,filename=None):
505507
with open(fpyname, 'r') as f:
506508
fcode = f.read()
507509
f.code = fcode
508-
509510
return wf
510511

511512

mfn_sdk/mfn_sdk/workflow.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@ class Workflow(object):
5252
"""
5353

5454
def __init__(self,client,wf):
55+
print(str(wf))
5556
self.client=client
5657
self.id=wf["id"]
5758
self._name=wf["name"]
59+
self._gpu_usage=None
60+
if "gpu_usage" in wf:
61+
self._gpu_usage=wf["gpu_usage"]
5862
self._modified=wf["modified"]
5963
self._status=wf.get("status",None)
6064
self._endpoints=wf.get("endpoints",None)
@@ -68,6 +72,19 @@ def __str__(self):
6872
else:
6973
return f"{self.id} ({self._name}, status: {self._status})"
7074

75+
@property
76+
def gpu_usage(self):
77+
# TODO: workflow GPU usage could have been updated, decide if we should fetch workflow status
78+
return self._gpu_usage
79+
80+
"""
81+
@gpu_usage.setter
82+
def gpu_usage(self,gpu_usage):
83+
# TODO: workflow GPU could have been updated, decide if we should fetch workflow status
84+
res = self.client.action('modifyWorkflow',{'workflow':{'id':self.id,'name':name,'gpu_usage':self._gpu_usage}})
85+
self.gpu_usage = gpu_usage
86+
"""
87+
7188
@property
7289
def name(self):
7390
# TODO: workflow name could have been updated, decide if we should fetch workflow status
@@ -124,15 +141,17 @@ def json(self):
124141
def json(self,json):
125142
if json != self.json:
126143
self._json = json
144+
print ("uploaded workflow JOSN"+ str( json))
127145
self.client.action('uploadWorkflowJSON',{'workflow':{'id':self.id,'json':base64.b64encode(self._json.encode()).decode()}})
128146

129147

130-
def deploy(self, timeout=None): #, num_gpu=None):
148+
def deploy(self, timeout=None):
131149
""" deploy a workflow and optionally wait in linearly increasing multiples of 1000ms
132150
:timeout: By default returns after calling deploy on the workflow without waiting for it to be actually deployed.
133151
If timeout is set to a numeric <= 0, it waits indefinitely in intervals of 1000ms, 2000ms, 3000ms, ...
134152
If timeout is set to a numeric > 0, it waits for the workflow to be deployed in increasing multiples of 100ms, but no longer than the timeout. When the timeout expires and the workflow is not deployed, the function raises an Exception
135153
"""
154+
136155
s = self.status
137156
if s == 'deployed':
138157
log.debug("deploy: wf %s already deployed",self.name)
@@ -145,9 +164,6 @@ def deploy(self, timeout=None): #, num_gpu=None):
145164
else:
146165
self.client.action('deployWorkflow',{'workflow':{'id':self.id}})
147166

148-
#if num_gpu is not None:
149-
# print("NUM_GPU:" + str(num_gpu))
150-
151167

152168
# if timeout is None, do not wait but return immediately even if it's not yet deployed
153169
if timeout is None:
@@ -283,8 +299,6 @@ def execute(self,data,timeout=60, check_duration=False):
283299

284300
# we are already deployed and have the endpoints stored in self._endpoints
285301
url = random.choice(self._endpoints)
286-
print(url)
287-
url=url+":30336"
288302
try:
289303
#postdata = {}
290304
#postdata["value"] = json.dumps(data)

tests/mfn_test_utils.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,17 @@ def __init__(self, test_name=None, timeout=None, workflow_filename=None, new_use
8484
if timeout is not None:
8585
self._settings["timeout"] = timeout
8686

87+
"""
88+
else:
89+
#self._gpu_usage = None
90+
#self._workflow_description['num_gpu'] = self._settings["num_gpu"]
91+
#print("Workflow_description:" + str(self._workflow_description))
92+
93+
self.gpu_usage = 0 # hardcoded for now
8794
if num_gpu is not None:
8895
self._settings["num_gpu"] = num_gpu
96+
self._gpu_usage = self._settings["num_gpu"]
97+
"""
8998

9099
self._log_clear_timestamp = int(time.time() * 1000.0 * 1000.0)
91100

@@ -118,6 +127,7 @@ def _get_settings(self):
118127

119128
# Defaults
120129
settings.setdefault("timeout", 60)
130+
settings.setdefault("num_gpu", 0)
121131

122132
return settings
123133

@@ -174,6 +184,7 @@ def _get_resource_info(self, resource_ref):
174184
return retval
175185

176186
def _get_resource_info_map(self, workflow_description=None, resource_info_map=None):
187+
#print(str("wf description: " + str(workflow_description)))
177188
if workflow_description is None:
178189
workflow_description = self._workflow_description
179190
if resource_info_map is None:
@@ -194,8 +205,8 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No
194205
resource_info["resource_env_filename"] = "environment_variables/" + resource_ref + "_environment_variables.txt"
195206
resource_info_map[resource_ref] = resource_info
196207
resource_info_map[resource_ref]['num_gpu'] = self._settings['num_gpu']
197-
print("resource_info_map: " + json.dumps(resource_info_map))
198-
208+
#resource_info_map['num_gpu'] = self._settings['num_gpu']
209+
#print("resource_info_map: " + json.dumps(resource_info_map))
199210

200211
elif "States" in workflow_description:
201212
states = workflow_description["States"]
@@ -210,8 +221,8 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No
210221
resource_info["resource_env_filename"] = "environment_variables/" + resource_name + "_environment_variables.txt"
211222
resource_info_map[resource_name] = resource_info
212223
resource_info_map[resource_name]['num_gpu'] = self._settings['num_gpu']
213-
print("resource_info_map: " + json.dumps(resource_info_map))
214-
224+
#resource_info_map['num_gpu'] = self._settings['num_gpu']
225+
#print("resource_info_map: " + json.dumps(resource_info_map))
215226

216227
if "Type" in state and state["Type"] == "Parallel":
217228
branches = state['Branches']
@@ -228,6 +239,7 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No
228239
print("ERROR: invalid workflow description.")
229240
assert False
230241

242+
#resource_info_map['num_gpu'] = self._settings['num_gpu']
231243
return resource_info_map
232244

233245
def _delete_resource_if_existing(self, existing_resources, resource_name):
@@ -239,6 +251,7 @@ def _delete_resource_if_existing(self, existing_resources, resource_name):
239251

240252
def _create_and_upload_resource(self, resource_name, resource_info):
241253
print("Deploying resource: " + resource_name)
254+
#print(str (resource_info))
242255

243256
resource_filename = resource_info["resource_filename"]
244257
is_zip = resource_info["is_zip"]
@@ -287,10 +300,13 @@ def upload_workflow(self):
287300
self.undeploy_workflow()
288301

289302
resource_info_map = self._get_resource_info_map()
303+
#resource_info_map['num_gpu'] = 1
304+
#print(str(resource_info_map))
290305

291306
existing_resources = self._client.functions
292307

293308
for resource_name in resource_info_map.keys():
309+
#if not resource_name == 'num_gpu':
294310
self._delete_resource_if_existing(existing_resources, resource_name)
295311

296312
resource_info = resource_info_map[resource_name]
@@ -304,8 +320,14 @@ def deploy_workflow(self):
304320
try:
305321
wf = self._client.add_workflow(self._workflow_name)
306322
wf.json = json.dumps(self._workflow_description)
323+
#print (wf.json)
324+
#wf._use_gpu=self._settings["num_gpu"]
325+
wf._gpu_usage = "teststringgpu" # _use_gpu=self._settings["num_gpu"]
307326
wf.deploy(self._settings["timeout"]) #, num_gpu=self._settings['num_gpu'])
308327
self._workflow = wf
328+
#print ("WF: " + str(wf._use_gpu))
329+
#print ("WF1: " + str(wf.gpu_usage))
330+
#wf.gpu_usage = "teststring"
309331
if self._workflow.status != "failed":
310332
print("MFN workflow " + self._workflow_name + " deployed.")
311333
else:

0 commit comments

Comments
 (0)