Skip to content

Commit a2fafee

Browse files
committed
RHOAIENG-27792: Add stop and resubmit functions to RayJob
1 parent cb5589c commit a2fafee

File tree

4 files changed

+146
-12
lines changed

4 files changed

+146
-12
lines changed

codecov.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,3 @@ coverage:
1515
default:
1616
target: 85%
1717
threshold: 2.5%
18-

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ cryptography = "43.0.3"
3333
executing = "1.2.0"
3434
pydantic = "< 2"
3535
ipywidgets = "8.1.2"
36-
python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "d1e750d9beac612ad455b951c1a789f971409ab3" }
36+
python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "49419654418865a5838adc7f323f13d82454aa18" }
3737

3838
[[tool.poetry.source]]
3939
name = "pypi"

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -155,29 +155,24 @@ def __init__(
155155
logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}")
156156

157157
def submit(self) -> str:
158-
# Validate required parameters
159158
if not self.entrypoint:
160-
raise ValueError("entrypoint must be provided to submit a RayJob")
159+
raise ValueError("Entrypoint must be provided to submit a RayJob")
161160

162-
# Validate Ray version compatibility for both cluster_config and runtime_env
163161
self._validate_ray_version_compatibility()
162+
164163
# Automatically handle script files for new clusters
165164
if self._cluster_config is not None:
166165
scripts = self._extract_script_files_from_entrypoint()
167166
if scripts:
168167
self._handle_script_volumes_for_new_cluster(scripts)
169-
170-
# Handle script files for existing clusters
171168
elif self._cluster_name:
172169
scripts = self._extract_script_files_from_entrypoint()
173170
if scripts:
174171
self._handle_script_volumes_for_existing_cluster(scripts)
175172

176-
# Build the RayJob custom resource
177173
rayjob_cr = self._build_rayjob_cr()
178174

179-
# Submit the job - KubeRay operator handles everything else
180-
logger.info(f"Submitting RayJob {self.name} to KubeRay operator")
175+
logger.info(f"Submitting RayJob {self.name} to Kuberay operator")
181176
result = self._api.submit_job(k8s_namespace=self.namespace, job=rayjob_cr)
182177

183178
if result:
@@ -190,11 +185,31 @@ def submit(self) -> str:
190185
else:
191186
raise RuntimeError(f"Failed to submit RayJob {self.name}")
192187

188+
def stop(self):
189+
"""
190+
Suspend the Ray job.
191+
"""
192+
stopped = self._api.suspend_job(name=self.name, k8s_namespace=self.namespace)
193+
if stopped:
194+
logger.info(f"Successfully stopped the RayJob {self.name}")
195+
return True
196+
else:
197+
raise RuntimeError(f"Failed to stop the RayJob {self.name}")
198+
199+
def resubmit(self):
200+
"""
201+
Resubmit the Ray job.
202+
"""
203+
if self._api.resubmit_job(name=self.name, k8s_namespace=self.namespace):
204+
logger.info(f"Successfully resubmitted the RayJob {self.name}")
205+
return True
206+
else:
207+
raise RuntimeError(f"Failed to resubmit the RayJob {self.name}")
208+
193209
def _build_rayjob_cr(self) -> Dict[str, Any]:
194210
"""
195211
Build the RayJob custom resource specification using native RayJob capabilities.
196212
"""
197-
# Basic RayJob custom resource structure
198213
rayjob_cr = {
199214
"apiVersion": "ray.io/v1",
200215
"kind": "RayJob",

src/codeflare_sdk/ray/rayjobs/test_rayjob.py

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ def test_submit_validation_no_entrypoint(mocker):
390390
)
391391

392392
with pytest.raises(
393-
ValueError, match="entrypoint must be provided to submit a RayJob"
393+
ValueError, match="Entrypoint must be provided to submit a RayJob"
394394
):
395395
rayjob.submit()
396396

@@ -1903,3 +1903,123 @@ def test_add_script_volumes_existing_mount_skip():
19031903
# Should still have only one mount and no volume added
19041904
assert len(config.volumes) == 0 # Volume not added due to mount skip
19051905
assert len(config.volume_mounts) == 1
1906+
1907+
1908+
def test_rayjob_stop_success(mocker, caplog):
1909+
"""Test successful RayJob stop operation."""
1910+
mocker.patch("kubernetes.config.load_kube_config")
1911+
1912+
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
1913+
mock_api_instance = MagicMock()
1914+
mock_api_class.return_value = mock_api_instance
1915+
1916+
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi")
1917+
1918+
mock_api_instance.suspend_job.return_value = {
1919+
"metadata": {"name": "test-rayjob"},
1920+
"spec": {"suspend": True},
1921+
}
1922+
1923+
rayjob = RayJob(
1924+
job_name="test-rayjob",
1925+
cluster_name="test-cluster",
1926+
namespace="test-namespace",
1927+
entrypoint="python script.py",
1928+
)
1929+
1930+
with caplog.at_level("INFO"):
1931+
result = rayjob.stop()
1932+
1933+
assert result is True
1934+
1935+
mock_api_instance.suspend_job.assert_called_once_with(
1936+
name="test-rayjob", k8s_namespace="test-namespace"
1937+
)
1938+
1939+
# Verify success message was logged
1940+
assert "Successfully stopped the RayJob test-rayjob" in caplog.text
1941+
1942+
1943+
def test_rayjob_stop_failure(mocker):
1944+
"""Test RayJob stop operation when API call fails."""
1945+
mocker.patch("kubernetes.config.load_kube_config")
1946+
1947+
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
1948+
mock_api_instance = MagicMock()
1949+
mock_api_class.return_value = mock_api_instance
1950+
1951+
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi")
1952+
1953+
mock_api_instance.suspend_job.return_value = None
1954+
1955+
rayjob = RayJob(
1956+
job_name="test-rayjob",
1957+
cluster_name="test-cluster",
1958+
namespace="test-namespace",
1959+
entrypoint="python script.py",
1960+
)
1961+
1962+
with pytest.raises(RuntimeError, match="Failed to stop the RayJob test-rayjob"):
1963+
rayjob.stop()
1964+
1965+
mock_api_instance.suspend_job.assert_called_once_with(
1966+
name="test-rayjob", k8s_namespace="test-namespace"
1967+
)
1968+
1969+
1970+
def test_rayjob_resubmit_success(mocker):
1971+
"""Test successful RayJob resubmit operation."""
1972+
mocker.patch("kubernetes.config.load_kube_config")
1973+
1974+
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
1975+
mock_api_instance = MagicMock()
1976+
mock_api_class.return_value = mock_api_instance
1977+
1978+
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi")
1979+
1980+
mock_api_instance.resubmit_job.return_value = {
1981+
"metadata": {"name": "test-rayjob"},
1982+
"spec": {"suspend": False},
1983+
}
1984+
1985+
rayjob = RayJob(
1986+
job_name="test-rayjob",
1987+
cluster_name="test-cluster",
1988+
namespace="test-namespace",
1989+
entrypoint="python script.py",
1990+
)
1991+
1992+
result = rayjob.resubmit()
1993+
1994+
assert result is True
1995+
1996+
mock_api_instance.resubmit_job.assert_called_once_with(
1997+
name="test-rayjob", k8s_namespace="test-namespace"
1998+
)
1999+
2000+
2001+
def test_rayjob_resubmit_failure(mocker):
2002+
"""Test RayJob resubmit operation when API call fails."""
2003+
mocker.patch("kubernetes.config.load_kube_config")
2004+
2005+
mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
2006+
mock_api_instance = MagicMock()
2007+
mock_api_class.return_value = mock_api_instance
2008+
2009+
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi")
2010+
2011+
mock_api_instance.resubmit_job.return_value = None
2012+
2013+
rayjob = RayJob(
2014+
job_name="test-rayjob",
2015+
cluster_name="test-cluster",
2016+
namespace="test-namespace",
2017+
entrypoint="python script.py",
2018+
)
2019+
2020+
with pytest.raises(RuntimeError, match="Failed to resubmit the RayJob test-rayjob"):
2021+
rayjob.resubmit()
2022+
2023+
mock_api_instance.resubmit_job.assert_called_once_with(
2024+
name="test-rayjob", k8s_namespace="test-namespace"
2025+
)

0 commit comments

Comments
 (0)