From f19c7bdb3a0ce200b73f2e9cf9879a22a5982ee8 Mon Sep 17 00:00:00 2001
From: kryanbeane
Date: Mon, 1 Sep 2025 12:31:48 +0100
Subject: [PATCH 1/3] RHOAIENG-27792: Add stop and resubmit functions to RayJob

---
 pyproject.toml                               |   2 +-
 src/codeflare_sdk/ray/rayjobs/rayjob.py      |  33 +++--
 src/codeflare_sdk/ray/rayjobs/test_rayjob.py | 122 ++++++++++++++++++-
 3 files changed, 146 insertions(+), 11 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index e75783bb..55d83030 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -33,7 +33,7 @@ cryptography = "43.0.3"
 executing = "1.2.0"
 pydantic = "< 2"
 ipywidgets = "8.1.2"
-python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "d1e750d9beac612ad455b951c1a789f971409ab3" }
+python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "49419654418865a5838adc7f323f13d82454aa18" }
 
 [[tool.poetry.source]]
 name = "pypi"
diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py
index 072f5153..972aa139 100644
--- a/src/codeflare_sdk/ray/rayjobs/rayjob.py
+++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py
@@ -154,29 +154,24 @@ def __init__(
         logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}")
 
     def submit(self) -> str:
-        # Validate required parameters
         if not self.entrypoint:
-            raise ValueError("entrypoint must be provided to submit a RayJob")
+            raise ValueError("Entrypoint must be provided to submit a RayJob")
 
-        # Validate Ray version compatibility for both cluster_config and runtime_env
         self._validate_ray_version_compatibility()
 
+        # Automatically handle script files for new clusters
         if self._cluster_config is not None:
             scripts = self._extract_script_files_from_entrypoint()
             if scripts:
                 self._handle_script_volumes_for_new_cluster(scripts)
-
-        # Handle script files for existing clusters
         elif self._cluster_name:
             scripts = self._extract_script_files_from_entrypoint()
             if scripts:
                 self._handle_script_volumes_for_existing_cluster(scripts)
 
-        # Build the RayJob custom resource
         rayjob_cr = self._build_rayjob_cr()
 
-        # Submit the job - KubeRay operator handles everything else
-        logger.info(f"Submitting RayJob {self.name} to KubeRay operator")
+        logger.info(f"Submitting RayJob {self.name} to KubeRay operator")
         result = self._api.submit_job(k8s_namespace=self.namespace, job=rayjob_cr)
 
         if result:
@@ -189,11 +184,31 @@ def submit(self) -> str:
         else:
             raise RuntimeError(f"Failed to submit RayJob {self.name}")
 
+    def stop(self):
+        """
+        Suspend the Ray job.
+        """
+        stopped = self._api.suspend_job(name=self.name, k8s_namespace=self.namespace)
+        if stopped:
+            logger.info(f"Successfully stopped the RayJob {self.name}")
+            return True
+        else:
+            raise RuntimeError(f"Failed to stop the RayJob {self.name}")
+
+    def resubmit(self):
+        """
+        Resubmit the Ray job.
+        """
+        if self._api.resubmit_job(name=self.name, k8s_namespace=self.namespace):
+            logger.info(f"Successfully resubmitted the RayJob {self.name}")
+            return True
+        else:
+            raise RuntimeError(f"Failed to resubmit the RayJob {self.name}")
+
     def _build_rayjob_cr(self) -> Dict[str, Any]:
         """
         Build the RayJob custom resource specification using native RayJob capabilities.
""" - # Basic RayJob custom resource structure rayjob_cr = { "apiVersion": "ray.io/v1", "kind": "RayJob", diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index 9b87cec5..18973bfe 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -390,7 +390,7 @@ def test_submit_validation_no_entrypoint(mocker): ) with pytest.raises( - ValueError, match="entrypoint must be provided to submit a RayJob" + ValueError, match="Entrypoint must be provided to submit a RayJob" ): rayjob.submit() @@ -1878,3 +1878,123 @@ def test_add_script_volumes_existing_mount_skip(): # Should still have only one mount and no volume added assert len(config.volumes) == 0 # Volume not added due to mount skip assert len(config.volume_mounts) == 1 + + +def test_rayjob_stop_success(mocker, caplog): + """Test successful RayJob stop operation.""" + mocker.patch("kubernetes.config.load_kube_config") + + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_api_instance = MagicMock() + mock_api_class.return_value = mock_api_instance + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + + mock_api_instance.suspend_job.return_value = { + "metadata": {"name": "test-rayjob"}, + "spec": {"suspend": True}, + } + + rayjob = RayJob( + job_name="test-rayjob", + cluster_name="test-cluster", + namespace="test-namespace", + entrypoint="python script.py", + ) + + with caplog.at_level("INFO"): + result = rayjob.stop() + + assert result is True + + mock_api_instance.suspend_job.assert_called_once_with( + name="test-rayjob", k8s_namespace="test-namespace" + ) + + # Verify success message was logged + assert "Successfully stopped the RayJob test-rayjob" in caplog.text + + +def test_rayjob_stop_failure(mocker): + """Test RayJob stop operation when API call fails.""" + mocker.patch("kubernetes.config.load_kube_config") + + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_api_instance = MagicMock() + mock_api_class.return_value = mock_api_instance + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + + mock_api_instance.suspend_job.return_value = None + + rayjob = RayJob( + job_name="test-rayjob", + cluster_name="test-cluster", + namespace="test-namespace", + entrypoint="python script.py", + ) + + with pytest.raises(RuntimeError, match="Failed to stop the RayJob test-rayjob"): + rayjob.stop() + + mock_api_instance.suspend_job.assert_called_once_with( + name="test-rayjob", k8s_namespace="test-namespace" + ) + + +def test_rayjob_resubmit_success(mocker): + """Test successful RayJob resubmit operation.""" + mocker.patch("kubernetes.config.load_kube_config") + + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_api_instance = MagicMock() + mock_api_class.return_value = mock_api_instance + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + + mock_api_instance.resubmit_job.return_value = { + "metadata": {"name": "test-rayjob"}, + "spec": {"suspend": False}, + } + + rayjob = RayJob( + job_name="test-rayjob", + cluster_name="test-cluster", + namespace="test-namespace", + entrypoint="python script.py", + ) + + result = rayjob.resubmit() + + assert result is True + + mock_api_instance.resubmit_job.assert_called_once_with( + name="test-rayjob", k8s_namespace="test-namespace" + ) + + +def test_rayjob_resubmit_failure(mocker): + """Test RayJob resubmit operation when API call fails.""" + 
mocker.patch("kubernetes.config.load_kube_config") + + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_api_instance = MagicMock() + mock_api_class.return_value = mock_api_instance + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + + mock_api_instance.resubmit_job.return_value = None + + rayjob = RayJob( + job_name="test-rayjob", + cluster_name="test-cluster", + namespace="test-namespace", + entrypoint="python script.py", + ) + + with pytest.raises(RuntimeError, match="Failed to resubmit the RayJob test-rayjob"): + rayjob.resubmit() + + mock_api_instance.resubmit_job.assert_called_once_with( + name="test-rayjob", k8s_namespace="test-namespace" + ) From dc4921400bac9a15ff175b43545c8f187aafc8be Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Mon, 1 Sep 2025 16:53:22 +0100 Subject: [PATCH 2/3] RHOAIENG-27792: Auto tear down training config map when job is deleted --- poetry.lock | 8 +- pyproject.toml | 2 +- src/codeflare_sdk/ray/rayjobs/config.py | 10 +- src/codeflare_sdk/ray/rayjobs/rayjob.py | 81 +++- src/codeflare_sdk/ray/rayjobs/test_config.py | 23 ++ src/codeflare_sdk/ray/rayjobs/test_rayjob.py | 356 +++++++++++++++++- .../rayjob/lifecycled_cluster_oauth_test.py | 187 ++++----- 7 files changed, 521 insertions(+), 146 deletions(-) diff --git a/poetry.lock b/poetry.lock index 381383d4..22efaf62 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -3320,8 +3320,8 @@ kubernetes = ">=25.0.0" [package.source] type = "git" url = "https://github.com/ray-project/kuberay.git" -reference = "d1e750d9beac612ad455b951c1a789f971409ab3" -resolved_reference = "d1e750d9beac612ad455b951c1a789f971409ab3" +reference = "a16c0365e3b19a202d835097e1139eca9406b383" +resolved_reference = "a16c0365e3b19a202d835097e1139eca9406b383" subdirectory = "clients/python-client" [[package]] @@ -4696,4 +4696,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "30c47f95bf1bf33682dd0bc107eef88f4e9226ca7ad5b33e929bfd3ab7030e95" +content-hash = "edf5742e4c1edc0261f6a58c3e80f8b535030e581b185a6e6ebedfee60d9155d" diff --git a/pyproject.toml b/pyproject.toml index 55d83030..d1532e07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ cryptography = "43.0.3" executing = "1.2.0" pydantic = "< 2" ipywidgets = "8.1.2" -python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "49419654418865a5838adc7f323f13d82454aa18" } +python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "a16c0365e3b19a202d835097e1139eca9406b383" } [[tool.poetry.source]] name = "pypi" diff --git a/src/codeflare_sdk/ray/rayjobs/config.py b/src/codeflare_sdk/ray/rayjobs/config.py index 35842536..a46e9a62 100644 --- a/src/codeflare_sdk/ray/rayjobs/config.py +++ b/src/codeflare_sdk/ray/rayjobs/config.py @@ -523,7 +523,15 @@ def build_script_configmap_spec( return { "apiVersion": "v1", "kind": "ConfigMap", - "metadata": {"name": configmap_name, "namespace": namespace}, + "metadata": { + "name": configmap_name, + "namespace": namespace, + "labels": { + "ray.io/job-name": job_name, + "app.kubernetes.io/managed-by": "codeflare-sdk", + "app.kubernetes.io/component": "rayjob-scripts", + }, + }, "data": scripts, } diff 
--git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py
index 972aa139..49ccafcb 100644
--- a/src/codeflare_sdk/ray/rayjobs/rayjob.py
+++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py
@@ -159,16 +159,6 @@ def submit(self) -> str:
 
         self._validate_ray_version_compatibility()
 
-        # Automatically handle script files for new clusters
-        if self._cluster_config is not None:
-            scripts = self._extract_script_files_from_entrypoint()
-            if scripts:
-                self._handle_script_volumes_for_new_cluster(scripts)
-        elif self._cluster_name:
-            scripts = self._extract_script_files_from_entrypoint()
-            if scripts:
-                self._handle_script_volumes_for_existing_cluster(scripts)
-
         rayjob_cr = self._build_rayjob_cr()
 
         logger.info(f"Submitting RayJob {self.name} to KubeRay operator")
@@ -176,6 +166,17 @@ def submit(self) -> str:
         if result:
             logger.info(f"Successfully submitted RayJob {self.name}")
+
+            # Handle script files after RayJob creation so we can set owner reference
+            if self._cluster_config is not None:
+                scripts = self._extract_script_files_from_entrypoint()
+                if scripts:
+                    self._handle_script_volumes_for_new_cluster(scripts, result)
+            elif self._cluster_name:
+                scripts = self._extract_script_files_from_entrypoint()
+                if scripts:
+                    self._handle_script_volumes_for_existing_cluster(scripts, result)
+
             if self.shutdown_after_job_finishes:
                 logger.info(
                     f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion"
                 )
@@ -205,6 +206,17 @@ def resubmit(self):
         else:
             raise RuntimeError(f"Failed to resubmit the RayJob {self.name}")
 
+    def delete(self):
+        """
+        Delete the Ray job.
+        """
+        deleted = self._api.delete_job(name=self.name, k8s_namespace=self.namespace)
+        if deleted:
+            logger.info(f"Successfully deleted the RayJob {self.name}")
+            return True
+        else:
+            raise RuntimeError(f"Failed to delete the RayJob {self.name}")
+
     def _build_rayjob_cr(self) -> Dict[str, Any]:
         """
         Build the RayJob custom resource specification using native RayJob capabilities.
@@ -464,7 +476,9 @@ def _find_local_imports( except (SyntaxError, ValueError) as e: logger.debug(f"Could not parse imports from {script_path}: {e}") - def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]): + def _handle_script_volumes_for_new_cluster( + self, scripts: Dict[str, str], rayjob_result: Dict[str, Any] = None + ): """Handle script volumes for new clusters (uses ManagedClusterConfig).""" # Validate ConfigMap size before creation self._cluster_config.validate_configmap_size(scripts) @@ -474,15 +488,17 @@ def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]): job_name=self.name, namespace=self.namespace, scripts=scripts ) - # Create ConfigMap via Kubernetes API - configmap_name = self._create_configmap_from_spec(configmap_spec) + # Create ConfigMap via Kubernetes API with owner reference + configmap_name = self._create_configmap_from_spec(configmap_spec, rayjob_result) # Add volumes to cluster config (config.py handles spec building) self._cluster_config.add_script_volumes( configmap_name=configmap_name, mount_path=MOUNT_PATH ) - def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]): + def _handle_script_volumes_for_existing_cluster( + self, scripts: Dict[str, str], rayjob_result: Dict[str, Any] = None + ): """Handle script volumes for existing clusters (updates RayCluster CR).""" # Create config builder for utility methods config_builder = ManagedClusterConfig() @@ -495,18 +511,21 @@ def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]): job_name=self.name, namespace=self.namespace, scripts=scripts ) - # Create ConfigMap via Kubernetes API - configmap_name = self._create_configmap_from_spec(configmap_spec) + # Create ConfigMap via Kubernetes API with owner reference + configmap_name = self._create_configmap_from_spec(configmap_spec, rayjob_result) # Update existing RayCluster self._update_existing_cluster_for_scripts(configmap_name, config_builder) - def _create_configmap_from_spec(self, configmap_spec: Dict[str, Any]) -> str: + def _create_configmap_from_spec( + self, configmap_spec: Dict[str, Any], rayjob_result: Dict[str, Any] = None + ) -> str: """ Create ConfigMap from specification via Kubernetes API. Args: configmap_spec: ConfigMap specification dictionary + rayjob_result: The result from RayJob creation containing UID Returns: str: Name of the created ConfigMap @@ -514,9 +533,35 @@ def _create_configmap_from_spec(self, configmap_spec: Dict[str, Any]) -> str: configmap_name = configmap_spec["metadata"]["name"] + metadata = client.V1ObjectMeta(**configmap_spec["metadata"]) + + # Add owner reference if we have the RayJob result + if ( + rayjob_result + and isinstance(rayjob_result, dict) + and rayjob_result.get("metadata", {}).get("uid") + ): + logger.info( + f"Adding owner reference to ConfigMap '{configmap_name}' with RayJob UID: {rayjob_result['metadata']['uid']}" + ) + metadata.owner_references = [ + client.V1OwnerReference( + api_version="ray.io/v1", + kind="RayJob", + name=self.name, + uid=rayjob_result["metadata"]["uid"], + controller=True, + block_owner_deletion=True, + ) + ] + else: + logger.warning( + f"No valid RayJob result with UID found, ConfigMap '{configmap_name}' will not have owner reference. 
Result: {rayjob_result}" + ) + # Convert dict spec to V1ConfigMap configmap = client.V1ConfigMap( - metadata=client.V1ObjectMeta(**configmap_spec["metadata"]), + metadata=metadata, data=configmap_spec["data"], ) diff --git a/src/codeflare_sdk/ray/rayjobs/test_config.py b/src/codeflare_sdk/ray/rayjobs/test_config.py index 7d7864c5..82e9464f 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_config.py +++ b/src/codeflare_sdk/ray/rayjobs/test_config.py @@ -170,3 +170,26 @@ def test_add_script_volumes_existing_mount_early_return(): # Should still have only one mount, no volume added assert len(config.volumes) == 0 assert len(config.volume_mounts) == 1 + + +def test_build_script_configmap_spec_labels(): + """Test that build_script_configmap_spec creates ConfigMap with correct labels.""" + config = ManagedClusterConfig() + + job_name = "test-job" + namespace = "test-namespace" + scripts = {"script.py": "print('hello')", "helper.py": "# helper code"} + + configmap_spec = config.build_script_configmap_spec(job_name, namespace, scripts) + + assert configmap_spec["apiVersion"] == "v1" + assert configmap_spec["kind"] == "ConfigMap" + assert configmap_spec["metadata"]["name"] == f"{job_name}-scripts" + assert configmap_spec["metadata"]["namespace"] == namespace + + labels = configmap_spec["metadata"]["labels"] + assert labels["ray.io/job-name"] == job_name + assert labels["app.kubernetes.io/managed-by"] == "codeflare-sdk" + assert labels["app.kubernetes.io/component"] == "rayjob-scripts" + + assert configmap_spec["data"] == scripts diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index 18973bfe..7c4823f8 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -1376,16 +1376,13 @@ def test_add_script_volumes_duplicate_prevention(): def test_create_configmap_from_spec(mocker): """Test creating ConfigMap via Kubernetes API.""" - # Mock kubernetes config loading mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - # Mock Kubernetes API mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") mock_api_instance = MagicMock() mock_k8s_api.return_value = mock_api_instance - # Mock get_api_client mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") rayjob = RayJob( @@ -1410,19 +1407,15 @@ def test_create_configmap_from_spec(mocker): def test_create_configmap_already_exists(mocker): """Test creating ConfigMap when it already exists (409 conflict).""" - # Mock kubernetes config loading mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - # Mock Kubernetes API mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") mock_api_instance = MagicMock() mock_k8s_api.return_value = mock_api_instance - # Mock get_api_client mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") - # Mock API exception for conflict from kubernetes.client import ApiException mock_api_instance.create_namespaced_config_map.side_effect = ApiException( @@ -1450,17 +1443,178 @@ def test_create_configmap_already_exists(mocker): mock_api_instance.replace_namespaced_config_map.assert_called_once() +def test_create_configmap_with_owner_reference_basic(mocker, caplog): + """Test creating ConfigMap with owner reference from valid RayJob result.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = 
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Mock Kubernetes API + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_api_instance = MagicMock() + mock_k8s_api.return_value = mock_api_instance + + # Mock get_api_client + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + # Mock client.V1ObjectMeta and V1ConfigMap + mock_v1_metadata = mocker.patch("kubernetes.client.V1ObjectMeta") + mock_metadata_instance = MagicMock() + mock_v1_metadata.return_value = mock_metadata_instance + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + configmap_spec = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": { + "name": "test-scripts", + "namespace": "test-namespace", + "labels": { + "ray.io/job-name": "test-job", + "app.kubernetes.io/managed-by": "codeflare-sdk", + "app.kubernetes.io/component": "rayjob-scripts", + }, + }, + "data": {"test.py": "print('test')"}, + } + + # Valid RayJob result with UID + rayjob_result = { + "metadata": { + "name": "test-job", + "namespace": "test-namespace", + "uid": "a4dd4c5a-ab61-411d-b4d1-4abb5177422a", + } + } + + with caplog.at_level("INFO"): + result = rayjob._create_configmap_from_spec(configmap_spec, rayjob_result) + + assert result == "test-scripts" + + # Verify owner reference was set + expected_owner_ref = mocker.ANY # We'll check via the logs + assert ( + "Adding owner reference to ConfigMap 'test-scripts' with RayJob UID: a4dd4c5a-ab61-411d-b4d1-4abb5177422a" + in caplog.text + ) + + # Verify owner_references was set on metadata + assert mock_metadata_instance.owner_references is not None + mock_api_instance.create_namespaced_config_map.assert_called_once() + + +def test_create_configmap_without_owner_reference_no_uid(mocker, caplog): + """Test creating ConfigMap without owner reference when RayJob has no UID.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_api_instance = MagicMock() + mock_k8s_api.return_value = mock_api_instance + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + mock_v1_metadata = mocker.patch("kubernetes.client.V1ObjectMeta") + mock_metadata_instance = MagicMock() + mock_v1_metadata.return_value = mock_metadata_instance + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + configmap_spec = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "test-scripts", "namespace": "test-namespace"}, + "data": {"test.py": "print('test')"}, + } + + # RayJob result without UID + rayjob_result = { + "metadata": { + "name": "test-job", + "namespace": "test-namespace", + # No UID field + } + } + + with caplog.at_level("WARNING"): + result = rayjob._create_configmap_from_spec(configmap_spec, rayjob_result) + + assert result == "test-scripts" + + # Verify warning was logged and no owner reference was set + assert ( + "No valid RayJob result with UID found, ConfigMap 'test-scripts' will not have owner reference" + in caplog.text + ) + + # The important part is that the warning was logged, indicating no owner reference was set + mock_api_instance.create_namespaced_config_map.assert_called_once() + + +def test_create_configmap_with_invalid_rayjob_result(mocker, caplog): + """Test creating ConfigMap with None or invalid 
rayjob_result.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Mock Kubernetes API + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_api_instance = MagicMock() + mock_k8s_api.return_value = mock_api_instance + + # Mock get_api_client + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + configmap_spec = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "test-scripts", "namespace": "test-namespace"}, + "data": {"test.py": "print('test')"}, + } + + # Test with None + with caplog.at_level("WARNING"): + result = rayjob._create_configmap_from_spec(configmap_spec, None) + + assert result == "test-scripts" + assert "No valid RayJob result with UID found" in caplog.text + + # Test with string instead of dict + caplog.clear() + with caplog.at_level("WARNING"): + result = rayjob._create_configmap_from_spec(configmap_spec, "not-a-dict") + + assert result == "test-scripts" + assert "No valid RayJob result with UID found" in caplog.text + + def test_handle_script_volumes_for_new_cluster(mocker, tmp_path): """Test handling script volumes for new cluster creation.""" - # Mock kubernetes config loading mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - # Mock ConfigMap creation mock_create = mocker.patch.object(RayJob, "_create_configmap_from_spec") mock_create.return_value = "test-job-scripts" - # Create test script test_script = tmp_path / "test.py" test_script.write_text("print('test')") @@ -1482,10 +1636,8 @@ def test_handle_script_volumes_for_new_cluster(mocker, tmp_path): scripts = {"test.py": "print('test')"} rayjob._handle_script_volumes_for_new_cluster(scripts) - # Verify ConfigMap creation was called mock_create.assert_called_once() - # Verify volumes were added to cluster config assert len(cluster_config.volumes) == 1 assert len(cluster_config.volume_mounts) == 1 @@ -1495,11 +1647,9 @@ def test_handle_script_volumes_for_new_cluster(mocker, tmp_path): def test_ast_parsing_import_detection(mocker, tmp_path): """Test AST parsing correctly detects import statements.""" - # Mock kubernetes config loading mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - # Create scripts with different import patterns main_script = tmp_path / "main.py" main_script.write_text( """# Different import patterns @@ -1537,7 +1687,6 @@ def func2(): pass scripts = rayjob._extract_script_files_from_entrypoint() - # Should find all local dependencies assert scripts is not None assert len(scripts) == 4 # main + 3 dependencies assert "main.py" in scripts @@ -1549,6 +1698,73 @@ def func2(): pass os.chdir(original_cwd) +def test_script_handling_timing_after_rayjob_submission(mocker, tmp_path): + """Test that script handling happens after RayJob is submitted (not before).""" + mocker.patch("kubernetes.config.load_kube_config") + + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_api_instance = MagicMock() + mock_api_class.return_value = mock_api_instance + + submit_result = { + "metadata": { + "name": "test-job", + "namespace": "test-namespace", + "uid": "test-uid-12345", + } + } + mock_api_instance.submit_job.return_value = 
submit_result + + mock_handle_new = mocker.patch.object( + RayJob, "_handle_script_volumes_for_new_cluster" + ) + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + + test_script = tmp_path / "test.py" + test_script.write_text("print('test')") + + call_order = [] + + def track_submit(*args, **kwargs): + call_order.append("submit_job") + return submit_result + + def track_handle_scripts(*args, **kwargs): + call_order.append("handle_scripts") + assert len(args) >= 2 + assert args[1] == submit_result # rayjob_result should be second arg + + mock_api_instance.submit_job.side_effect = track_submit + mock_handle_new.side_effect = track_handle_scripts + + original_cwd = os.getcwd() + try: + os.chdir(tmp_path) + + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + cluster_config = ManagedClusterConfig() + + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python test.py", + namespace="test-namespace", + ) + + rayjob.submit() + finally: + os.chdir(original_cwd) + + assert call_order == ["submit_job", "handle_scripts"] + + mock_api_instance.submit_job.assert_called_once() + mock_handle_new.assert_called_once() + + mock_handle_new.assert_called_with({"test.py": "print('test')"}, submit_result) + + def test_rayjob_submit_with_scripts_new_cluster(mocker, tmp_path): """Test RayJob submission with script detection for new cluster.""" # Mock kubernetes config loading @@ -1674,11 +1890,119 @@ def test_process_script_and_imports_already_processed(mocker, tmp_path): # Should return early without processing rayjob._process_script_and_imports("test.py", scripts, MOUNT_PATH, processed_files) - # Should remain unchanged assert len(scripts) == 0 assert processed_files == {"test.py"} +def test_submit_with_scripts_owner_reference_integration(mocker, tmp_path, caplog): + """Integration test for submit() with local scripts to verify end-to-end owner reference flow.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + + # Mock the RayjobApi + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_api_instance = MagicMock() + mock_api_class.return_value = mock_api_instance + + # RayJob submission returns result with UID + submit_result = { + "metadata": { + "name": "test-job", + "namespace": "test-namespace", + "uid": "unique-rayjob-uid-12345", + } + } + mock_api_instance.submit_job.return_value = submit_result + + # Mock Kubernetes ConfigMap API + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_k8s_instance = MagicMock() + mock_k8s_api.return_value = mock_k8s_instance + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + # Capture the ConfigMap that gets created + created_configmap = None + + def capture_configmap(namespace, body): + nonlocal created_configmap + created_configmap = body + return body + + mock_k8s_instance.create_namespaced_config_map.side_effect = capture_configmap + + # Create test scripts + test_script = tmp_path / "main.py" + test_script.write_text("import helper\nprint('main')") + + helper_script = tmp_path / "helper.py" + helper_script.write_text("def help(): print('helper')") + + # Change to temp directory for script detection + original_cwd = os.getcwd() + try: + os.chdir(tmp_path) + + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + cluster_config = ManagedClusterConfig() + + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python main.py", + 
namespace="test-namespace", + ) + + with caplog.at_level("INFO"): + result = rayjob.submit() + + assert result == "test-job" + + # Verify RayJob was submitted first + mock_api_instance.submit_job.assert_called_once() + + # Verify ConfigMap was created with owner reference + mock_k8s_instance.create_namespaced_config_map.assert_called_once() + assert created_configmap is not None + + # Verify owner reference was set correctly + assert hasattr(created_configmap.metadata, "owner_references") + assert created_configmap.metadata.owner_references is not None + assert len(created_configmap.metadata.owner_references) == 1 + + owner_ref = created_configmap.metadata.owner_references[0] + assert owner_ref.api_version == "ray.io/v1" + assert owner_ref.kind == "RayJob" + assert owner_ref.name == "test-job" + assert owner_ref.uid == "unique-rayjob-uid-12345" + assert owner_ref.controller is True + assert owner_ref.block_owner_deletion is True + + # Verify labels were set + assert created_configmap.metadata.labels["ray.io/job-name"] == "test-job" + assert ( + created_configmap.metadata.labels["app.kubernetes.io/managed-by"] + == "codeflare-sdk" + ) + assert ( + created_configmap.metadata.labels["app.kubernetes.io/component"] + == "rayjob-scripts" + ) + + # Verify scripts were included + assert "main.py" in created_configmap.data + assert "helper.py" in created_configmap.data + + # Verify log message + assert ( + "Adding owner reference to ConfigMap 'test-job-scripts' with RayJob UID: unique-rayjob-uid-12345" + in caplog.text + ) + + finally: + os.chdir(original_cwd) + + def test_find_local_imports_syntax_error(mocker): """Test _find_local_imports handles syntax errors gracefully.""" mocker.patch("kubernetes.config.load_kube_config") diff --git a/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py b/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py index 54186de3..41dd5280 100644 --- a/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py +++ b/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py @@ -3,48 +3,41 @@ import os from time import sleep -# Add the parent directory to the path to import support sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from support import * -from codeflare_sdk import ( - TokenAuthentication, - RayJob, - ManagedClusterConfig, -) +from codeflare_sdk import RayJob, ManagedClusterConfig from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus - -# This test creates a RayJob that will create and lifecycle its own cluster on OpenShift +import kubernetes.client.rest +from python_client.kuberay_job_api import RayjobApi +from python_client.kuberay_cluster_api import RayClusterApi @pytest.mark.openshift class TestRayJobLifecycledClusterOauth: + """Test RayJob with auto-created cluster lifecycle management on OpenShift.""" + def setup_method(self): initialize_kubernetes_client(self) def teardown_method(self): delete_namespace(self) - delete_kueue_resources(self) def test_rayjob_with_lifecycled_cluster_oauth(self): + """ + Test RayJob submission with embedded cluster configuration, including: + 1. Job submission with auto-cluster creation + 2. Job suspension (stop) and verification + 3. Job resumption (resubmit) and verification + 4. Job completion monitoring + 5. 
Automatic cluster cleanup after job deletion
+        """
         self.setup_method()
         create_namespace(self)
-        create_kueue_resources(self)
-        self.run_rayjob_with_lifecycled_cluster_oauth()
-
-    def run_rayjob_with_lifecycled_cluster_oauth(self):
         ray_image = get_ray_image()
+        self.job_api = RayjobApi()
+        job_name = "lifecycled-job"
 
-        auth = TokenAuthentication(
-            token=run_oc_command(["whoami", "--show-token=true"]),
-            server=run_oc_command(["whoami", "--show-server=true"]),
-            skip_tls=True,
-        )
-        auth.login()
-
-        job_name = "lifecycled-cluster-rayjob"
-
-        # Create cluster configuration for auto-creation
         cluster_config = ManagedClusterConfig(
             head_cpu_requests="500m",
             head_cpu_limits="500m",
@@ -58,113 +51,95 @@ def run_rayjob_with_lifecycled_cluster_oauth(self):
             image=ray_image,
         )
 
-        # Create RayJob with embedded cluster - will auto-create and manage cluster lifecycle
         rayjob = RayJob(
             job_name=job_name,
-            cluster_config=cluster_config,  # This triggers auto-cluster creation
             namespace=self.namespace,
-            entrypoint="python -c \"import ray; ray.init(); print('Hello from auto-created cluster!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"",
-            runtime_env={
-                "pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"],
-                "env_vars": get_setup_env_variables(ACCELERATOR="cpu"),
-            },
-            shutdown_after_job_finishes=True,  # Auto-cleanup cluster after job finishes
-            ttl_seconds_after_finished=30,  # Wait 30s after job completion before cleanup
-        )
-
-        # Submit the job
-        print(
-            f"Submitting RayJob '{job_name}' with auto-cluster creation and lifecycle management"
+            cluster_config=cluster_config,
+            entrypoint="python -c \"import ray; ray.init(); print('RayJob completed successfully')\"",
+            runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")},
+            shutdown_after_job_finishes=True,
         )
-        submission_result = rayjob.submit()
-        assert (
-            submission_result == job_name
-        ), f"Job submission failed, expected {job_name}, got {submission_result}"
-        print(
-            f"Successfully submitted RayJob '{job_name}' with cluster '{rayjob.cluster_name}'!"
-        )
-
-        # Monitor the job status until completion
-        self.monitor_rayjob_completion(rayjob)
-
-        # Verify cluster auto-cleanup
-        print("🔍 Verifying cluster auto-cleanup after job completion...")
-        self.verify_cluster_cleanup(rayjob.cluster_name, timeout=60)
-
-    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
-        """
-        Monitor a RayJob until it completes or fails.
-        Args:
-            rayjob: The RayJob instance to monitor
-            timeout: Maximum time to wait in seconds (default: 15 minutes)
-        """
-        print(f"Monitoring RayJob '{rayjob.name}' status...")
+        try:
+            # 1. Submit and wait for job to reach running state
+            assert rayjob.submit() == job_name
+            assert self.job_api.wait_until_job_running(
+                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=60
+            ), "Job did not reach running state"
+
+            # 2. Stop (suspend) the job and verify it is suspended
+            assert rayjob.stop(), "Job stop failed"
+            job_cr = self.job_api.get_job(
+                name=rayjob.name, k8s_namespace=rayjob.namespace
+            )
+            assert job_cr["spec"]["suspend"] is True, "Job suspend not set to true"
+
+            assert self._wait_for_job_status(
+                rayjob, "Suspended", timeout=30
+            ), "Job did not reach Suspended state"

+            # 3. 
Test Job Resubmission + assert rayjob.resubmit(), "Job resubmit failed" + job_cr = self.job_api.get_job( + name=rayjob.name, k8s_namespace=rayjob.namespace + ) + assert job_cr["spec"]["suspend"] is False, "Job suspend not set to false" + + assert self.job_api.wait_until_job_finished( + name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300 + ), "Job did not complete" + + finally: + # 4. Delete the job and cleanup + assert rayjob.delete() + self.verify_cluster_cleanup(rayjob) + + def _wait_for_job_status( + self, + rayjob: RayJob, + expected_status: str, + timeout: int = 30, + ) -> bool: + """Wait for a job to reach a specific deployment status.""" elapsed_time = 0 - check_interval = 10 # Check every 10 seconds + check_interval = 2 while elapsed_time < timeout: - status, ready = rayjob.status(print_to_console=True) - - # Check if job has completed (either successfully or failed) - if status == CodeflareRayJobStatus.COMPLETE: - print(f"RayJob '{rayjob.name}' completed successfully!") - return - elif status == CodeflareRayJobStatus.FAILED: - raise AssertionError(f"RayJob '{rayjob.name}' failed!") - elif status == CodeflareRayJobStatus.RUNNING: - print(f"RayJob '{rayjob.name}' is still running...") - elif status == CodeflareRayJobStatus.UNKNOWN: - print(f"RayJob '{rayjob.name}' status is unknown") - - # Wait before next check + status = self.job_api.get_job_status( + name=rayjob.name, k8s_namespace=rayjob.namespace + ) + if status and status.get("jobDeploymentStatus") == expected_status: + return True + sleep(check_interval) elapsed_time += check_interval - # If we reach here, the job has timed out - final_status, _ = rayjob.status(print_to_console=True) - raise TimeoutError( - f"RayJob '{rayjob.name}' did not complete within {timeout} seconds. " - f"Final status: {final_status}" - ) - - def verify_cluster_cleanup(self, cluster_name: str, timeout: int = 60): - """ - Verify that the cluster created by the RayJob has been cleaned up. - Args: - cluster_name: The name of the cluster to check for cleanup - timeout: Maximum time to wait for cleanup in seconds (default: 1 minute) - """ - from kubernetes import client - import kubernetes.client.rest + return False + def verify_cluster_cleanup(self, rayjob: RayJob, timeout: int = 60): + """Verify that the cluster created by the RayJob has been cleaned up.""" elapsed_time = 0 - check_interval = 5 # Check every 5 seconds + check_interval = 5 + cluster_api = RayClusterApi() while elapsed_time < timeout: try: - # Try to get the RayCluster resource - custom_api = client.CustomObjectsApi() - custom_api.get_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=self.namespace, - plural="rayclusters", - name=cluster_name, + cluster_info = cluster_api.get_ray_cluster( + name=rayjob.cluster_name, k8s_namespace=rayjob.namespace ) - print(f"Cluster '{cluster_name}' still exists, waiting for cleanup...") + # Cluster doesn't exist + if cluster_info is None: + return + sleep(check_interval) elapsed_time += check_interval + except kubernetes.client.rest.ApiException as e: if e.status == 404: - print( - f"✅ Cluster '{cluster_name}' has been successfully cleaned up!" 
- ) return else: raise e - # If we reach here, the cluster was not cleaned up in time raise TimeoutError( - f"Cluster '{cluster_name}' was not cleaned up within {timeout} seconds" + f"Cluster '{rayjob.cluster_name}' was not cleaned up within {timeout} seconds" ) From 1637cfdd8b481554dcfeaac90ed485f3091d9664 Mon Sep 17 00:00:00 2001 From: kryanbeane Date: Fri, 5 Sep 2025 09:00:25 +0100 Subject: [PATCH 3/3] RHOAIENG-27792: rayjob test improvements --- poetry.lock | 47 ++- pyproject.toml | 6 +- src/codeflare_sdk/ray/rayjobs/test_config.py | 34 ++ src/codeflare_sdk/ray/rayjobs/test_rayjob.py | 349 ++++++++++++++---- .../rayjob/lifecycled_cluster_oauth_test.py | 7 +- 5 files changed, 362 insertions(+), 81 deletions(-) diff --git a/poetry.lock b/poetry.lock index 22efaf62..5ba16429 100644 --- a/poetry.lock +++ b/poetry.lock @@ -545,6 +545,18 @@ files = [ [package.dependencies] pycparser = "*" +[[package]] +name = "chardet" +version = "5.2.0" +description = "Universal encoding detector for Python 3" +optional = false +python-versions = ">=3.7" +groups = ["dev"] +files = [ + {file = "chardet-5.2.0-py3-none-any.whl", hash = "sha256:e1cf59446890a00105fe7b7912492ea04b6e6f06d4b742b2c788469e34c82970"}, + {file = "chardet-5.2.0.tar.gz", hash = "sha256:1b3b6ff479a8c414bc3fa2c0852995695c4a026dcd6d0633b2dd092ca39c1cf7"}, +] + [[package]] name = "charset-normalizer" version = "3.4.2" @@ -893,6 +905,27 @@ files = [ {file = "defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69"}, ] +[[package]] +name = "diff-cover" +version = "9.6.0" +description = "Run coverage and linting reports on diffs" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "diff_cover-9.6.0-py3-none-any.whl", hash = "sha256:29fbeb52d77a0b8c811e5580d5dbf41801a838da2ed54319a599da8f7233c547"}, + {file = "diff_cover-9.6.0.tar.gz", hash = "sha256:75e5bc056dcaa68c6c87c9fb4e07c9e60daef15b6e8d034d56d2da9e2c84a872"}, +] + +[package.dependencies] +chardet = ">=3.0.0" +Jinja2 = ">=2.7.1" +pluggy = ">=0.13.1,<2" +Pygments = ">=2.19.1,<3.0.0" + +[package.extras] +toml = ["tomli (>=1.2.1)"] + [[package]] name = "distlib" version = "0.3.9" @@ -1546,7 +1579,7 @@ version = "3.1.6" description = "A very fast and expressive template engine." optional = false python-versions = ">=3.7" -groups = ["docs", "test"] +groups = ["dev", "docs", "test"] files = [ {file = "jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67"}, {file = "jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d"}, @@ -1916,7 +1949,7 @@ version = "3.0.2" description = "Safely add untrusted strings to HTML/XML markup." 
optional = false python-versions = ">=3.9" -groups = ["docs", "test"] +groups = ["dev", "docs", "test"] files = [ {file = "MarkupSafe-3.0.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:7e94c425039cde14257288fd61dcfb01963e658efbc0ff54f5306b06054700f8"}, {file = "MarkupSafe-3.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9e2d922824181480953426608b81967de705c3cef4d1af983af849d7bd619158"}, @@ -2725,7 +2758,7 @@ version = "1.6.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.9" -groups = ["test"] +groups = ["dev", "test"] files = [ {file = "pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746"}, {file = "pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3"}, @@ -3214,7 +3247,7 @@ version = "2.19.2" description = "Pygments is a syntax highlighting package written in Python." optional = false python-versions = ">=3.8" -groups = ["main", "docs", "test"] +groups = ["main", "dev", "docs", "test"] files = [ {file = "pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b"}, {file = "pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887"}, @@ -3320,8 +3353,8 @@ kubernetes = ">=25.0.0" [package.source] type = "git" url = "https://github.com/ray-project/kuberay.git" -reference = "a16c0365e3b19a202d835097e1139eca9406b383" -resolved_reference = "a16c0365e3b19a202d835097e1139eca9406b383" +reference = "b2fd91b58c2bbe22f9b4f730c5a8f3180c05e570" +resolved_reference = "b2fd91b58c2bbe22f9b4f730c5a8f3180c05e570" subdirectory = "clients/python-client" [[package]] @@ -4696,4 +4729,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "edf5742e4c1edc0261f6a58c3e80f8b535030e581b185a6e6ebedfee60d9155d" +content-hash = "88e1c126c20b29f7220d3d4347260b916ce024230acd742cc068af2701876fdd" diff --git a/pyproject.toml b/pyproject.toml index d1532e07..c5170a5d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ cryptography = "43.0.3" executing = "1.2.0" pydantic = "< 2" ipywidgets = "8.1.2" -python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "a16c0365e3b19a202d835097e1139eca9406b383" } +python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", rev = "b2fd91b58c2bbe22f9b4f730c5a8f3180c05e570" } [[tool.poetry.source]] name = "pypi" @@ -59,6 +59,10 @@ pytest-mock = "3.11.1" pytest-timeout = "2.3.1" jupyterlab = "4.3.1" + +[tool.poetry.group.dev.dependencies] +diff-cover = "^9.6.0" + [tool.pytest.ini_options] filterwarnings = [ "ignore::DeprecationWarning:pkg_resources", diff --git a/src/codeflare_sdk/ray/rayjobs/test_config.py b/src/codeflare_sdk/ray/rayjobs/test_config.py index 82e9464f..d19864ba 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_config.py +++ b/src/codeflare_sdk/ray/rayjobs/test_config.py @@ -82,6 +82,40 @@ def test_gpu_validation_fails_with_unsupported_accelerator(): ManagedClusterConfig(head_accelerators={"unsupported.com/accelerator": 1}) +def test_config_type_validation_errors(mocker): + """Test that type validation properly raises errors with incorrect types.""" + # Mock the _is_type method to return False for type checking + mocker.patch.object( + ManagedClusterConfig, + "_is_type", + side_effect=lambda value, expected_type: False, # 
Always fail type check + ) + + # This should raise TypeError during initialization + with pytest.raises(TypeError, match="Type validation failed"): + ManagedClusterConfig() + + +def test_config_is_type_method(): + """Test the _is_type static method for type checking.""" + # Test basic types + assert ManagedClusterConfig._is_type("test", str) is True + assert ManagedClusterConfig._is_type(123, int) is True + assert ManagedClusterConfig._is_type(123, str) is False + + # Test optional types (Union with None) + from typing import Optional + + assert ManagedClusterConfig._is_type(None, Optional[str]) is True + assert ManagedClusterConfig._is_type("test", Optional[str]) is True + assert ManagedClusterConfig._is_type(123, Optional[str]) is False + + # Test dict types + assert ManagedClusterConfig._is_type({}, dict) is True + assert ManagedClusterConfig._is_type({"key": "value"}, dict) is True + assert ManagedClusterConfig._is_type([], dict) is False + + def test_ray_usage_stats_always_disabled_by_default(): """Test that RAY_USAGE_STATS_ENABLED is always set to '0' by default""" config = ManagedClusterConfig() diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index 7c4823f8..54ad61dd 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -20,6 +20,13 @@ from codeflare_sdk.ray.rayjobs.rayjob import RayJob from codeflare_sdk.ray.cluster.config import ClusterConfiguration from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig +from kubernetes.client import ( + V1Volume, + V1VolumeMount, + V1Toleration, + V1ConfigMapVolumeSource, + ApiException, +) def test_rayjob_submit_success(mocker): @@ -274,8 +281,6 @@ def test_build_ray_cluster_spec(mocker): }, } # Use ManagedClusterConfig which has the build_ray_cluster_spec method - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig(num_workers=2) # Mock the method that will be called @@ -353,9 +358,6 @@ def test_build_rayjob_cr_with_auto_cluster(mocker): "workerGroupSpecs": [{"replicas": 2}], }, } - # Use ManagedClusterConfig and mock its build_ray_cluster_spec method - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig(num_workers=2) # Mock the method that will be called @@ -415,8 +417,6 @@ def test_submit_with_auto_cluster(mocker): mock_api_instance.submit_job.return_value = True # Use ManagedClusterConfig and mock its build_ray_cluster_spec method - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig(num_workers=1) # Mock the method that will be called @@ -504,8 +504,6 @@ def test_shutdown_behavior_with_cluster_config(mocker): mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() rayjob = RayJob( @@ -540,8 +538,6 @@ def test_rayjob_with_rayjob_cluster_config(mocker): mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig( num_workers=2, head_cpu_requests="500m", @@ -565,8 +561,6 @@ def test_rayjob_cluster_config_validation(mocker): mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") 
mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - # Test with minimal valid config cluster_config = ManagedClusterConfig() @@ -603,8 +597,6 @@ def test_build_ray_cluster_spec_integration(mocker): # Mock the RayjobApi class entirely mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() # Mock the build_ray_cluster_spec method on the cluster config @@ -686,8 +678,6 @@ def test_rayjob_cluster_name_generation_with_config(mocker): mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() rayjob = RayJob( @@ -708,15 +698,11 @@ def test_rayjob_namespace_propagation_to_cluster_config(mocker): mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - from codeflare_sdk.ray.rayjobs.rayjob import get_current_namespace - mocker.patch( "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", return_value="detected-ns", ) - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() rayjob = RayJob( @@ -767,8 +753,6 @@ def test_rayjob_constructor_parameter_validation(mocker): def test_build_ray_cluster_spec_function(mocker): """Test the build_ray_cluster_spec method directly.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - # Create a test cluster config cluster_config = ManagedClusterConfig( num_workers=2, @@ -806,8 +790,6 @@ def test_build_ray_cluster_spec_function(mocker): def test_build_ray_cluster_spec_with_accelerators(mocker): """Test build_ray_cluster_spec with GPU accelerators.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - # Create a test cluster config with GPU accelerators cluster_config = ManagedClusterConfig( head_accelerators={"nvidia.com/gpu": 1}, @@ -833,9 +815,6 @@ def test_build_ray_cluster_spec_with_accelerators(mocker): def test_build_ray_cluster_spec_with_custom_volumes(mocker): """Test build_ray_cluster_spec with custom volumes and volume mounts.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - from kubernetes.client import V1Volume, V1VolumeMount - # Create custom volumes and volume mounts custom_volume = V1Volume(name="custom-data", empty_dir={}) custom_volume_mount = V1VolumeMount(name="custom-data", mount_path="/data") @@ -863,8 +842,6 @@ def test_build_ray_cluster_spec_with_custom_volumes(mocker): def test_build_ray_cluster_spec_with_environment_variables(mocker): """Test build_ray_cluster_spec with environment variables.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - # Create a test cluster config with environment variables cluster_config = ManagedClusterConfig( envs={"CUDA_VISIBLE_DEVICES": "0", "RAY_DISABLE_IMPORT_WARNING": "1"}, @@ -895,9 +872,6 @@ def test_build_ray_cluster_spec_with_environment_variables(mocker): def test_build_ray_cluster_spec_with_tolerations(mocker): """Test build_ray_cluster_spec with tolerations.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - from kubernetes.client import V1Toleration - # Create test tolerations head_toleration = V1Toleration( key="node-role.kubernetes.io/master", operator="Exists", effect="NoSchedule" @@ -932,8 +906,6 @@ def 
test_build_ray_cluster_spec_with_tolerations(mocker): def test_build_ray_cluster_spec_with_image_pull_secrets(mocker): """Test build_ray_cluster_spec with image pull secrets.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - # Create a test cluster config with image pull secrets cluster_config = ManagedClusterConfig( image_pull_secrets=["my-registry-secret", "another-secret"] @@ -981,8 +953,6 @@ def test_rayjob_user_override_shutdown_behavior(mocker): assert rayjob_existing_override.shutdown_after_job_finishes is True # Test 2: User overrides shutdown to False even when creating new cluster - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() rayjob_new_override = RayJob( @@ -1300,8 +1270,6 @@ def test_extract_script_files_nonexistent_script(mocker): def test_build_script_configmap_spec(): """Test building ConfigMap specification for scripts.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - config = ManagedClusterConfig() scripts = {"main.py": "print('main')", "helper.py": "print('helper')"} @@ -1318,8 +1286,6 @@ def test_build_script_configmap_spec(): def test_build_script_volume_specs(): """Test building volume and mount specifications for scripts.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - config = ManagedClusterConfig() volume_spec, mount_spec = config.build_script_volume_specs( @@ -1335,8 +1301,6 @@ def test_build_script_volume_specs(): def test_add_script_volumes(): """Test adding script volumes to cluster configuration.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - config = ManagedClusterConfig() # Initially no volumes @@ -1361,8 +1325,6 @@ def test_add_script_volumes(): def test_add_script_volumes_duplicate_prevention(): """Test that adding script volumes twice doesn't create duplicates.""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - config = ManagedClusterConfig() # Add volumes twice @@ -1416,8 +1378,6 @@ def test_create_configmap_already_exists(mocker): mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") - from kubernetes.client import ApiException - mock_api_instance.create_namespaced_config_map.side_effect = ApiException( status=409 ) @@ -1618,8 +1578,6 @@ def test_handle_script_volumes_for_new_cluster(mocker, tmp_path): test_script = tmp_path / "test.py" test_script.write_text("print('test')") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() original_cwd = os.getcwd() @@ -1742,8 +1700,6 @@ def track_handle_scripts(*args, **kwargs): try: os.chdir(tmp_path) - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() rayjob = RayJob( @@ -1786,8 +1742,6 @@ def test_rayjob_submit_with_scripts_new_cluster(mocker, tmp_path): test_script = tmp_path / "test.py" test_script.write_text("print('Hello from script!')") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() original_cwd = os.getcwd() @@ -1942,8 +1896,6 @@ def capture_configmap(namespace, body): try: os.chdir(tmp_path) - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - cluster_config = ManagedClusterConfig() rayjob = RayJob( @@ -2037,8 +1989,6 @@ def test_create_configmap_api_error_non_409(mocker): mock_api_instance = mocker.Mock() mock_k8s_api.return_value = mock_api_instance - from kubernetes.client import ApiException - 
mock_api_instance.create_namespaced_config_map.side_effect = ApiException( status=500 ) @@ -2076,12 +2026,8 @@ def test_update_existing_cluster_get_cluster_error(mocker): mock_cluster_api_instance = mocker.Mock() mock_cluster_api_class.return_value = mock_cluster_api_instance - from kubernetes.client import ApiException - mock_cluster_api_instance.get_ray_cluster.side_effect = ApiException(status=404) - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - config_builder = ManagedClusterConfig() rayjob = RayJob( @@ -2126,12 +2072,8 @@ def test_update_existing_cluster_patch_error(mocker): } } - from kubernetes.client import ApiException - mock_cluster_api_instance.patch_ray_cluster.side_effect = ApiException(status=500) - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - config_builder = ManagedClusterConfig() rayjob = RayJob( @@ -2165,9 +2107,6 @@ def test_extract_script_files_empty_entrypoint(mocker): def test_add_script_volumes_existing_volume_skip(): """Test add_script_volumes skips when volume already exists (missing coverage).""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - from kubernetes.client import V1Volume, V1ConfigMapVolumeSource - config = ManagedClusterConfig() # Pre-add a volume with same name @@ -2187,9 +2126,6 @@ def test_add_script_volumes_existing_volume_skip(): def test_add_script_volumes_existing_mount_skip(): """Test add_script_volumes skips when mount already exists (missing coverage).""" - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig - from kubernetes.client import V1VolumeMount - config = ManagedClusterConfig() # Pre-add a mount with same name @@ -2322,3 +2258,274 @@ def test_rayjob_resubmit_failure(mocker): mock_api_instance.resubmit_job.assert_called_once_with( name="test-rayjob", k8s_namespace="test-namespace" ) + + +def test_rayjob_delete_success(mocker): + """Test successful RayJob deletion.""" + # Mock the API + mocker.patch("kubernetes.config.load_kube_config") + mock_api_instance = mocker.MagicMock() + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance + ) + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", + return_value="test-namespace", + ) + + rayjob = RayJob( + job_name="test-rayjob", + entrypoint="python script.py", + cluster_name="test-cluster", + ) + + mock_api_instance.delete_job.return_value = True + + result = rayjob.delete() + + assert result is True + mock_api_instance.delete_job.assert_called_once_with( + name="test-rayjob", k8s_namespace="test-namespace" + ) + + +def test_rayjob_delete_failure(mocker): + """Test failed RayJob deletion.""" + mock_api_instance = mocker.MagicMock() + mocker.patch("kubernetes.config.load_kube_config") + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance + ) + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", + return_value="test-namespace", + ) + + rayjob = RayJob( + job_name="test-rayjob", + entrypoint="python script.py", + cluster_name="test-cluster", + ) + + mock_api_instance.delete_job.return_value = False + + with pytest.raises(RuntimeError, match="Failed to delete the RayJob test-rayjob"): + rayjob.delete() + + mock_api_instance.delete_job.assert_called_once_with( + name="test-rayjob", k8s_namespace="test-namespace" + ) + + +def test_rayjob_init_both_none_error(mocker): + """Test RayJob initialization error when both cluster_name and cluster_config are None.""" + mocker.patch( + 
"codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", + return_value="test-namespace", + ) + + with pytest.raises( + ValueError, + match="Configuration Error: You must provide either 'cluster_name' .* or 'cluster_config'", + ): + RayJob( + job_name="test-job", + entrypoint="python script.py", + cluster_name=None, + cluster_config=None, + ) + + +def test_rayjob_init_missing_cluster_name_with_no_config(mocker): + """Test RayJob initialization error when cluster_name is None without cluster_config.""" + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", + return_value="test-namespace", + ) + + with pytest.raises( + ValueError, + match="Configuration Error: a 'cluster_name' is required when not providing 'cluster_config'", + ): + rayjob = RayJob.__new__(RayJob) + rayjob.name = "test-job" + rayjob.entrypoint = "python script.py" + rayjob.runtime_env = None + rayjob.ttl_seconds_after_finished = 0 + rayjob.active_deadline_seconds = None + rayjob.shutdown_after_job_finishes = False + rayjob.namespace = "test-namespace" + rayjob._cluster_name = None + rayjob._cluster_config = None + if rayjob._cluster_config is None and rayjob._cluster_name is None: + raise ValueError( + "❌ Configuration Error: a 'cluster_name' is required when not providing 'cluster_config'" + ) + + +def test_handle_script_volumes_for_existing_cluster_direct_call(mocker): + """Test _handle_script_volumes_for_existing_cluster method directly.""" + # Mock APIs + mock_api_instance = mocker.MagicMock() + mock_cluster_api = mocker.MagicMock() + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance + ) + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi", return_value=mock_cluster_api + ) + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", + return_value="test-namespace", + ) + + # Mock the Kubernetes API for ConfigMap creation + mock_k8s_api = mocker.MagicMock() + mocker.patch("kubernetes.client.CoreV1Api", return_value=mock_k8s_api) + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client", return_value=None) + + # Mock existing cluster + mock_cluster = { + "spec": { + "headGroupSpec": { + "template": { + "spec": {"containers": [{"volumeMounts": []}], "volumes": []} + } + }, + "workerGroupSpecs": [ + { + "template": { + "spec": {"containers": [{"volumeMounts": []}], "volumes": []} + } + } + ], + } + } + mock_cluster_api.get_ray_cluster.return_value = mock_cluster + + rayjob = RayJob( + job_name="test-job", + entrypoint="python script.py", + cluster_name="existing-cluster", + ) + + scripts = {"test_script.py": "print('Hello World')"} + rayjob._handle_script_volumes_for_existing_cluster( + scripts, {"metadata": {"uid": "test-uid"}} + ) + + mock_k8s_api.create_namespaced_config_map.assert_called_once() + created_configmap = mock_k8s_api.create_namespaced_config_map.call_args[1]["body"] + assert "test_script.py" in created_configmap.data + + mock_cluster_api.patch_ray_cluster.assert_called_once_with( + name="existing-cluster", ray_patch=mock_cluster, k8s_namespace="test-namespace" + ) + + +def test_handle_script_volumes_for_existing_cluster_no_volumes_init(mocker): + """Test _handle_script_volumes_for_existing_cluster when volumes/mounts don't exist initially.""" + mock_api_instance = mocker.MagicMock() + mock_cluster_api = mocker.MagicMock() + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance + ) + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi", 
+        return_value=mock_cluster_api
+    )
+    mocker.patch(
+        "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace",
+        return_value="test-namespace",
+    )
+
+    mock_k8s_api = mocker.MagicMock()
+    mocker.patch("kubernetes.client.CoreV1Api", return_value=mock_k8s_api)
+    mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client", return_value=None)
+
+    # Mock existing cluster WITHOUT volumes/volumeMounts (to test initialization)
+    mock_cluster = {
+        "spec": {
+            "headGroupSpec": {"template": {"spec": {"containers": [{}]}}},
+            "workerGroupSpecs": [{"template": {"spec": {"containers": [{}]}}}],
+        }
+    }
+    mock_cluster_api.get_ray_cluster.return_value = mock_cluster
+
+    # Create RayJob with existing cluster
+    rayjob = RayJob(
+        job_name="test-job",
+        entrypoint="python script.py",
+        cluster_name="existing-cluster",
+    )
+
+    # Call the method directly with test scripts
+    scripts = {"test_script.py": "print('Hello World')"}
+    rayjob._handle_script_volumes_for_existing_cluster(
+        scripts, {"metadata": {"uid": "test-uid"}}
+    )
+
+    # Verify volumes and volumeMounts were initialized
+    patched_cluster = mock_cluster_api.patch_ray_cluster.call_args[1]["ray_patch"]
+
+    # Check head group
+    head_spec = patched_cluster["spec"]["headGroupSpec"]["template"]["spec"]
+    assert "volumes" in head_spec
+    assert len(head_spec["volumes"]) == 1
+    assert "volumeMounts" in head_spec["containers"][0]
+    assert len(head_spec["containers"][0]["volumeMounts"]) == 1
+
+    # Check worker group
+    worker_spec = patched_cluster["spec"]["workerGroupSpecs"][0]["template"]["spec"]
+    assert "volumes" in worker_spec
+    assert len(worker_spec["volumes"]) == 1
+    assert "volumeMounts" in worker_spec["containers"][0]
+    assert len(worker_spec["containers"][0]["volumeMounts"]) == 1
+
+
+def test_update_existing_cluster_for_scripts_api_errors(mocker):
+    """Test _update_existing_cluster_for_scripts error handling."""
+    mock_api_instance = mocker.MagicMock()
+    mock_cluster_api = mocker.MagicMock()
+    mocker.patch(
+        "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance
+    )
+    mocker.patch(
+        "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi", return_value=mock_cluster_api
+    )
+    mocker.patch(
+        "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace",
+        return_value="test-namespace",
+    )
+
+    # Mock config builder
+    mock_config_builder = mocker.MagicMock()
+    mocker.patch(
+        "codeflare_sdk.ray.rayjobs.rayjob.ManagedClusterConfig",
+        return_value=mock_config_builder,
+    )
+
+    # Set up config builder to return valid specs
+    mock_config_builder.build_script_volume_specs.return_value = (
+        {"name": "script-volume", "configMap": {"name": "test-configmap"}},
+        {"name": "script-volume", "mountPath": "/home/ray/scripts"},
+    )
+
+    # Mock cluster API to raise error
+    mock_cluster_api.get_ray_cluster.side_effect = ApiException(
+        status=404, reason="Not Found"
+    )
+
+    # Create RayJob
+    rayjob = RayJob(
+        job_name="test-job",
+        entrypoint="python script.py",
+        cluster_name="existing-cluster",
+    )
+
+    # Call the method directly
+    with pytest.raises(
+        RuntimeError, match="Failed to get RayCluster 'existing-cluster'"
+    ):
+        rayjob._update_existing_cluster_for_scripts(
+            "test-configmap", mock_config_builder
+        )
diff --git a/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py b/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py
index 41dd5280..7db71441 100644
--- a/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py
+++ b/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py
@@ -7,7 +7,10 @@
 from support import *
 
 from codeflare_sdk import RayJob, ManagedClusterConfig
-from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus
+from codeflare_sdk.ray.rayjobs.status import (
+    CodeflareRayJobStatus,
+    RayJobDeploymentStatus,
+)
 import kubernetes.client.rest
 from python_client.kuberay_job_api import RayjobApi
 from python_client.kuberay_cluster_api import RayClusterApi
@@ -64,7 +67,7 @@ def test_rayjob_with_lifecycled_cluster_oauth(self):
         # 1. Submit and wait for job to reach running state
         assert rayjob.submit() == job_name
         assert self.job_api.wait_until_job_running(
-            name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=60
+            name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
         ), "Job did not reach running state"
 
         # 2. Stop (suspend) the job and