1- import base64
2- import logging
3- import os
4- import sys
51from kubernetes import client , config
6- from kubernetes .client import V1Job , V1ObjectMeta , V1JobSpec , V1PodTemplateSpec , V1PodSpec , V1Container , V1VolumeMount , V1Volume , V1ConfigMapVolumeSource , V1EmptyDirVolumeSource , V1EnvVar , V1SecurityContext , V1SeccompProfile , V1Capabilities
7- from kubernetes .client .rest import ApiException
8- import time
2+ import kubernetes .client
93import subprocess
104
5+ import sys
6+ import os
7+
8+ from time import sleep
9+
10+ import ray
11+
12+ from torchx .specs .api import AppState , is_terminal
13+
14+ from codeflare_sdk .cluster .cluster import Cluster , ClusterConfiguration
15+ from codeflare_sdk .job .jobs import DDPJobDefinition
16+
1117import pytest
1218
13- from support import random_choice , read_file
19+ from support import random_choice
20+
21+ # Creates a Ray cluster, and trains the MNIST dataset using the CodeFlare SDK.
22+ # Asserts creation of AppWrapper, RayCluster, and successful completion of the training job.
23+ # Covers successful installation of CodeFlare-SDK
1424
1525class TestMNISTRayClusterSDK :
1626 def setup_method (self ):
@@ -19,104 +29,138 @@ def setup_method(self):
1929
2030 # Initialize Kubernetes client
2131 self .api_instance = client .CoreV1Api ()
22- self .batch_api = client .BatchV1Api ()
23- self .cmap = client .V1ConfigMap ()
32+ self .custom_api = kubernetes .client .CustomObjectsApi (self .api_instance .api_client )
2433
def teardown_method(self):
    """Delete the Kubernetes resources this test created, if any.

    Each deletion is conditional on the matching attribute having been set
    on the instance, so a test that failed before creating a resource can
    still be torn down.
    """
    try:
        # Remove the ConfigMap before its namespace: the call addresses the
        # ConfigMap through the namespace, so the namespace must not already
        # be on its way out when the request is made.
        if hasattr(self, 'configmap'):
            self.api_instance.delete_namespaced_config_map(
                self.configmap.metadata.name, self.namespace
            )
    finally:
        # Always attempt the namespace deletion, even if removing the
        # ConfigMap failed, so a failed cleanup step does not leak the
        # whole namespace.
        if hasattr(self, 'namespace'):
            self.api_instance.delete_namespace(self.namespace)
3039
31-
3240 def test_mnist_ray_cluster_sdk (self ):
33- namespace = self .create_test_namespace ()
34-
35- file_paths = [
36- "./tests/e2e/mnist_raycluster_sdk_test.py" ,
37- "./tests/e2e/requirements.txt" ,
38- "./tests/e2e/mnist.py" ,
39- "./tests/e2e/install-codeflare-sdk.sh"
40- ]
41- self .create_config_map (namespace , file_paths )
42-
41+ self .create_test_namespace ()
4342 self .run_mnist_raycluster_sdk ()
4443
45-
def create_test_namespace(self):
    """Create a namespace with a generated name and return that name.

    The name is also stored on ``self.namespace``; ``teardown_method`` checks
    for that attribute to decide whether a namespace has to be deleted.

    Returns:
        The name of the namespace that was requested from the API server.
    """
    # No padding around the generated suffix: Kubernetes namespace names must
    # be valid DNS labels, which cannot contain spaces.
    self.namespace = f"test-ns-{random_choice()}"
    namespace_body = client.V1Namespace(
        metadata=client.V1ObjectMeta(name=self.namespace)
    )
    self.api_instance.create_namespace(namespace_body)
    return self.namespace
5149
52- def create_config_map (self , namespace , file_paths ):
53- data = {os .path .basename (path ): read_file (path ) for path in file_paths }
54- binary_data = {key : base64 .b64encode (value ).decode ('utf-8' ) for key , value in data .items ()}
55- config_map = client .V1ConfigMap (
56- api_version = "v1" ,
57- kind = "ConfigMap" ,
58- metadata = client .V1ObjectMeta (
59- generate_name = "config-" ,
60- namespace = namespace ,
61- ),
62- binary_data = binary_data ,
63- immutable = True ,
def run_mnist_raycluster_sdk(self):
    """Bring up a Ray cluster, train MNIST on it, and assert the job succeeds.

    Steps: create the cluster through the CodeFlare SDK, assert that the
    AppWrapper and RayCluster objects exist, submit ``mnist.py`` as a DDP
    job, poll until the job reaches a terminal state, print its logs, and
    assert that it succeeded. The cluster is brought down afterwards whether
    or not the checks passed.

    Raises:
        TimeoutError: If the job is still not terminal once the polling
            budget (900 seconds) has been used up.
        AssertionError: If the AppWrapper or RayCluster is missing, or the
            job ends in a state other than success.
    """
    ray_image = "quay.io/project-codeflare/ray:latest-py39-cu118"
    host = os.getenv("CLUSTER_HOSTNAME")

    # An ingress for the Ray dashboard is only requested when a cluster
    # hostname was supplied through the environment.
    ingress_options = {}
    if host is not None:
        ingress_options = {
            "ingresses": [
                {
                    "ingressName": "ray-dashboard",
                    "port": 8265,
                    "pathType": "Prefix",
                    "path": "/",
                    "host": host,
                    "annotations": {
                        "nginx.ingress.kubernetes.io/proxy-body-size": "10M",
                    },
                },
            ]
        }

    cluster = Cluster(
        ClusterConfiguration(
            name="mnist",
            namespace=self.namespace,
            num_workers=1,
            head_cpus="500m",
            head_memory=2,
            min_cpus="500m",
            max_cpus=1,
            min_memory=1,
            max_memory=2,
            num_gpus=0,
            instascale=False,
            image=ray_image,
            ingress_options=ingress_options,
        )
    )

    cluster.up()
    try:
        self.assert_appwrapper_exists()

        cluster.status()

        cluster.wait_ready()
        self.assert_raycluster_exists()

        cluster.status()

        cluster.details()

        jobdef = DDPJobDefinition(
            name="mnist",
            script="./tests/e2e/mnist.py",
            scheduler_args={"requirements": "./tests/e2e/mnist_pip_requirements.txt"},
        )
        job = jobdef.submit(cluster)

        poll_interval = 5
        timeout = 900
        elapsed = 0
        status = job.status()
        while not is_terminal(status.state):
            print(status)
            if elapsed >= timeout:
                raise TimeoutError(f"job has timed out after waiting {timeout}s")
            sleep(poll_interval)
            elapsed += poll_interval
            status = job.status()

        print(job.status())
        # Print the logs before asserting so they are still available when
        # the job did not succeed.
        print(job.logs())
        self.assert_job_completion(status)
    finally:
        # Release the cluster even when a check above failed or timed out.
        cluster.down()
140+
141+
def assert_appwrapper_exists(self):
    """Assert that the AppWrapper named ``mnist`` exists in the test namespace.

    The object is looked up through the custom objects API in group
    ``workload.codeflare.dev``, version ``v1beta1``.

    Raises:
        AssertionError: If the lookup raises for any reason; the original
            exception is attached as the cause.
    """
    try:
        self.custom_api.get_namespaced_custom_object(
            "workload.codeflare.dev", "v1beta1", self.namespace, "appwrappers", "mnist"
        )
    except Exception as e:
        # Raise explicitly instead of `assert False`: the message and cause
        # reach the test report, and the check survives `python -O`.
        raise AssertionError(
            f"AppWrapper 'mnist' has not been created. Error: {e}"
        ) from e
    print(f"AppWrapper 'mnist' has been created in the namespace: '{self.namespace}'")
150+
def assert_raycluster_exists(self):
    """Assert that the RayCluster named ``mnist`` exists in the test namespace.

    The object is looked up through the custom objects API in group
    ``ray.io``, version ``v1``.

    Raises:
        AssertionError: If the lookup raises for any reason; the original
            exception is attached as the cause.
    """
    try:
        self.custom_api.get_namespaced_custom_object(
            "ray.io", "v1", self.namespace, "rayclusters", "mnist"
        )
    except Exception as e:
        # Raise explicitly instead of `assert False`: the message and cause
        # reach the test report, and the check survives `python -O`.
        raise AssertionError(
            f"RayCluster 'mnist' has not been created. Error: {e}"
        ) from e
    print(f"RayCluster 'mnist' created successfully in the namespace: '{self.namespace}'")
159+
def assert_job_completion(self, status):
    """Assert that the job finished in the ``AppState.SUCCEEDED`` state.

    Args:
        status: Job status object; only its ``state`` attribute is read.

    Raises:
        AssertionError: If ``status.state`` is anything other than
            ``AppState.SUCCEEDED``. The message names the actual state.
    """
    if status.state != AppState.SUCCEEDED:
        # Report the real outcome; the success wording would be misleading
        # here, and a bare `assert False` would say nothing at all.
        raise AssertionError(
            f"Job has not completed successfully: '{status.state}'"
        )
    print(f"Job has completed: '{status.state}'")
0 commit comments