1- from .config import ClusterConfiguration
2- from .model import RayCluster , AppWrapper
1+ from os import stat
2+ from typing import List , Optional , Tuple
3+
4+ import openshift as oc
5+
36from ..utils import pretty_print
47from ..utils .generate_yaml import generate_appwrapper
5- import openshift as oc
6- from typing import List , Optional
8+ from .config import ClusterConfiguration
9+ from .model import (AppWrapper , AppWrapperStatus , CodeFlareClusterStatus ,
10+ RayCluster , RayClusterStatus )
711
812
913class Cluster :
def __init__(self, config: ClusterConfiguration):
    """Keep the supplied configuration and generate the AppWrapper manifest.

    Args:
        config: Cluster settings; stored on ``self.config`` and read back
            by ``create_app_wrapper``.
    """
    self.config = config
    # create_app_wrapper() reads self.config, so it has to run after the
    # assignment above.
    manifest = self.create_app_wrapper()
    self.app_wrapper_yaml = manifest
1317
1418 def create_app_wrapper (self ):
1519 min_cpu = self .config .min_cpus
1620 max_cpu = self .config .max_cpus
1721 min_memory = self .config .min_memory
18- max_memory = self .config , max_memory
22+ max_memory = self .config . max_memory
1923 gpu = self .config .gpu
2024 workers = self .config .max_worker
2125 template = self .config .template
@@ -30,12 +34,12 @@ def create_app_wrapper(self):
def up(self, namespace='default'):
    """Create a new cluster from the provided or default spec.

    Runs ``apply -f`` on the manifest held in ``self.app_wrapper_yaml``
    while the given project is active.

    Args:
        namespace: Project in which the command is invoked.
    """
    with oc.project(namespace):
        apply_args = ["-f", self.app_wrapper_yaml]
        oc.invoke("apply", apply_args)
3438
def down(self, namespace='default'):
    """Delete the resources that ``up`` created for this cluster.

    Args:
        namespace: Project in which the delete command is invoked.
    """
    with oc.project(namespace):
        # self.app_wrapper_yaml is the manifest that up() hands to
        # `apply -f`, i.e. a file reference rather than a resource name.
        # `delete AppWrapper <value>` would look for an AppWrapper literally
        # named after that file, so delete by manifest instead.
        oc.invoke("delete", ["-f", self.app_wrapper_yaml])
42+
3943 def status (self , print_to_console = True ):
4044 cluster = _ray_cluster_status (self .config .name )
4145 if cluster :
@@ -45,6 +49,37 @@ def status(self, print_to_console=True):
4549 else :
4650 return None
4751
def is_ready(self) -> Tuple[CodeFlareClusterStatus, bool]:
    """Check whether the Ray cluster for this configuration is ready.

    The AppWrapper named ``self.config.name`` is inspected first; a failed,
    deleted or pending AppWrapper short-circuits the check. Otherwise the
    Ray cluster of the same name is inspected.

    Returns:
        A ``(status, ready)`` tuple. ``ready`` is True only when the Ray
        cluster reports ``RayClusterStatus.READY``. Every exit path uses
        this same ordering (the early exits previously returned the pair
        reversed relative to the final return).
    """
    ready = False
    status = CodeFlareClusterStatus.UNKNOWN

    # check the app wrapper status
    appwrapper = _app_wrapper_status(self.config.name)
    if appwrapper:
        if appwrapper.status in (
            AppWrapperStatus.RUNNING,
            AppWrapperStatus.COMPLETED,
            AppWrapperStatus.RUNNING_HOLD_COMPLETION,
        ):
            # Provisional value; overwritten below if the Ray cluster
            # reports a ready or failed state.
            # NOTE(review): QUEUED for an already-running AppWrapper is kept
            # from the original code - confirm this is the intended status.
            status = CodeFlareClusterStatus.QUEUED
        elif appwrapper.status in (
            AppWrapperStatus.FAILED,
            AppWrapperStatus.DELETED,
        ):
            # should deleted be separate
            # exit early, no need to check ray status
            return CodeFlareClusterStatus.FAILED, False
        elif appwrapper.status == AppWrapperStatus.PENDING:
            # no need to check the ray status since still in queue
            return CodeFlareClusterStatus.QUEUED, False

    # check the ray cluster status
    cluster = _ray_cluster_status(self.config.name)
    if cluster:
        if cluster.status == RayClusterStatus.READY:
            ready = True
            status = CodeFlareClusterStatus.READY
        elif cluster.status in (
            RayClusterStatus.UNHEALTHY,
            RayClusterStatus.FAILED,
        ):
            status = CodeFlareClusterStatus.FAILED

    return status, ready
82+
4883
4984def list_all_clusters (print_to_console = True ):
5085 clusters = _get_ray_clusters ()
@@ -60,13 +95,14 @@ def _get_appwrappers(namespace='default'):
6095 app_wrappers = oc .selector ('appwrappers' ).qnames ()
6196 return app_wrappers
6297
63-
98+
def _app_wrapper_status(name, namespace='default') -> Optional[AppWrapper]:
    """Fetch the AppWrapper called *name* and map it to an ``AppWrapper``.

    Args:
        name: Name of the AppWrapper resource to select.
        namespace: Project in which the lookup is performed.

    Returns:
        The mapped ``AppWrapper``, or None when no such resource is selected.
    """
    with oc.project(namespace), oc.timeout(10 * 60):
        # Selector.object() raises when nothing is selected unless
        # ignore_empty=True; without it the None branch promised by the
        # Optional return type could never be reached.
        wrapper = oc.selector(f'appwrapper/{name}').object(ignore_empty=True)
        if wrapper:
            return _map_to_app_wrapper(wrapper)
    return None
69-
104+
105+
70106def _ray_cluster_status (name , namespace = 'default' ) -> Optional [RayCluster ]:
71107 # FIXME should we check the appwrapper first
72108 with oc .project (namespace ), oc .timeout (10 * 60 ):
@@ -87,10 +123,10 @@ def _get_ray_clusters(namespace='default') -> List[RayCluster]:
87123 return list_of_clusters
88124
89125
90- def _map_to_ray_cluster (cluster )-> RayCluster :
126+ def _map_to_ray_cluster (cluster ) -> RayCluster :
91127 cluster_model = cluster .model
92128 return RayCluster (
93- name = cluster .name (), status = cluster_model .status .state ,
129+ name = cluster .name (), status = RayClusterStatus ( cluster_model .status .state . lower ()) ,
94130 min_workers = cluster_model .spec .workerGroupSpecs [0 ].replicas ,
95131 max_workers = cluster_model .spec .workerGroupSpecs [0 ].replicas ,
96132 worker_mem_max = cluster_model .spec .workerGroupSpecs [
@@ -101,9 +137,9 @@ def _map_to_ray_cluster(cluster)->RayCluster:
101137 worker_gpu = 0 )
102138
103139
def _map_to_app_wrapper(cluster) -> AppWrapper:
    """Build an ``AppWrapper`` record from a selected API object.

    Reads ``state``, ``canrun`` and ``queuejobstate`` from the object's
    ``model.status``; ``state`` is lower-cased and converted to an
    ``AppWrapperStatus`` member.

    Args:
        cluster: Object exposing ``name()`` and ``model``.
    """
    wrapper_status = cluster.model.status
    return AppWrapper(
        name=cluster.name(),
        status=AppWrapperStatus(wrapper_status.state.lower()),
        can_run=wrapper_status.canrun,
        job_state=wrapper_status.queuejobstate,
    )
0 commit comments