From ec79d1e45f608a95662c0d2b132fe29ef20b9ff7 Mon Sep 17 00:00:00 2001 From: Pat O'Connor Date: Tue, 19 Aug 2025 16:53:31 +0100 Subject: [PATCH 1/4] feat(RHOAIENG-29391): Store entrypoint scripts in configMaps Signed-off-by: Pat O'Connor --- src/codeflare_sdk/ray/rayjobs/config.py | 87 ++- src/codeflare_sdk/ray/rayjobs/rayjob.py | 278 +++++++ src/codeflare_sdk/ray/rayjobs/test_config.py | 39 + src/codeflare_sdk/ray/rayjobs/test_rayjob.py | 725 +++++++++++++++++++ 4 files changed, 1128 insertions(+), 1 deletion(-) diff --git a/src/codeflare_sdk/ray/rayjobs/config.py b/src/codeflare_sdk/ray/rayjobs/config.py index 08c352ee..dcc4d2bf 100644 --- a/src/codeflare_sdk/ray/rayjobs/config.py +++ b/src/codeflare_sdk/ray/rayjobs/config.py @@ -20,7 +20,7 @@ import pathlib from dataclasses import dataclass, field, fields -from typing import Dict, List, Optional, Union, get_args, get_origin, Any +from typing import Dict, List, Optional, Union, get_args, get_origin, Any, Tuple from kubernetes.client import ( V1ConfigMapVolumeSource, V1KeyToPath, @@ -515,3 +515,88 @@ def _build_gcs_ft_options(self) -> Dict[str, Any]: } return gcs_ft_options + + def add_script_volumes( + self, configmap_name: str, mount_path: str = "/home/ray/scripts" + ): + """ + Add script volume and mount references to cluster configuration. + + Args: + configmap_name: Name of the ConfigMap containing scripts + mount_path: Where to mount scripts in containers (default: /home/ray/scripts) + """ + # Check if script volume already exists + volume_name = "ray-job-scripts" + existing_volume = next( + (v for v in self.volumes if getattr(v, "name", None) == volume_name), None + ) + if existing_volume: + logger.debug(f"Script volume '{volume_name}' already exists, skipping...") + return + + # Check if script mount already exists + existing_mount = next( + (m for m in self.volume_mounts if getattr(m, "name", None) == volume_name), + None, + ) + if existing_mount: + logger.debug( + f"Script volume mount '{volume_name}' already exists, skipping..." + ) + return + + # Add script volume to cluster configuration + script_volume = V1Volume( + name=volume_name, config_map=V1ConfigMapVolumeSource(name=configmap_name) + ) + self.volumes.append(script_volume) + + # Add script volume mount to cluster configuration + script_mount = V1VolumeMount(name=volume_name, mount_path=mount_path) + self.volume_mounts.append(script_mount) + + logger.info( + f"Added script volume '{configmap_name}' to cluster config: mount_path={mount_path}" + ) + + def build_script_configmap_spec( + self, job_name: str, namespace: str, scripts: Dict[str, str] + ) -> Dict[str, Any]: + """ + Build ConfigMap specification for scripts + + Args: + job_name: Name of the RayJob (used for ConfigMap naming) + namespace: Kubernetes namespace + scripts: Dictionary of script_name -> script_content + + Returns: + Dict: ConfigMap specification ready for Kubernetes API + """ + configmap_name = f"{job_name}-scripts" + return { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": configmap_name, "namespace": namespace}, + "data": scripts, + } + + def build_script_volume_specs( + self, configmap_name: str, mount_path: str = "/home/ray/scripts" + ) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """ + Build volume and mount specifications for scripts + + Args: + configmap_name: Name of the ConfigMap containing scripts + mount_path: Where to mount scripts in containers + + Returns: + Tuple of (volume_spec, mount_spec) as dictionaries + """ + volume_spec = {"name": "ray-job-scripts", "configMap": {"name": configmap_name}} + + mount_spec = {"name": "ray-job-scripts", "mountPath": mount_path} + + return volume_spec, mount_spec diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py index 6230a0e1..43562da8 100644 --- a/src/codeflare_sdk/ray/rayjobs/rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py @@ -18,7 +18,12 @@ import logging import warnings +import os +import re +import ast from typing import Dict, Any, Optional, Tuple +from kubernetes import client +from ...common.kubernetes_cluster.auth import get_api_client from python_client.kuberay_job_api import RayjobApi from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig @@ -153,6 +158,17 @@ def submit(self) -> str: # Validate Ray version compatibility for both cluster_config and runtime_env self._validate_ray_version_compatibility() + # Automatically handle script files for new clusters + if self._cluster_config is not None: + scripts = self._extract_script_files_from_entrypoint() + if scripts: + self._handle_script_volumes_for_new_cluster(scripts) + + # Handle script files for existing clusters + elif self._cluster_name: + scripts = self._extract_script_files_from_entrypoint() + if scripts: + self._handle_script_volumes_for_existing_cluster(scripts) # Build the RayJob custom resource rayjob_cr = self._build_rayjob_cr() @@ -323,3 +339,265 @@ def _map_to_codeflare_status( return status_mapping.get( deployment_status, (CodeflareRayJobStatus.UNKNOWN, False) ) + + def _extract_script_files_from_entrypoint(self) -> Optional[Dict[str, str]]: + """ + Extract local Python script files from entrypoint command, plus their dependencies. + + Returns: + Dict of {script_name: script_content} if local scripts found, None otherwise + """ + if not self.entrypoint: + return None + + scripts = {} + mount_path = "/home/ray/scripts" + processed_files = set() # Avoid infinite loops + + # Look for Python file patterns in entrypoint (e.g., "python script.py", "python /path/to/script.py") + python_file_pattern = r"(?:python\s+)?([./\w/]+\.py)" + matches = re.findall(python_file_pattern, self.entrypoint) + + # Process main scripts from entrypoint files + for script_path in matches: + self._process_script_and_imports( + script_path, scripts, mount_path, processed_files + ) + + # Update entrypoint paths to use mounted locations + for script_path in matches: + if script_path in [os.path.basename(s) for s in processed_files]: + old_path = script_path + new_path = f"{mount_path}/{os.path.basename(script_path)}" + self.entrypoint = self.entrypoint.replace(old_path, new_path) + + return scripts if scripts else None + + def _process_script_and_imports( + self, + script_path: str, + scripts: Dict[str, str], + mount_path: str, + processed_files: set, + ): + """Recursively process a script and its local imports""" + if script_path in processed_files: + return + + # Check if it's a local file (not already a container path) + if script_path.startswith("/home/ray/") or not os.path.isfile(script_path): + return + + processed_files.add(script_path) + + try: + with open(script_path, "r") as f: + script_content = f.read() + + script_name = os.path.basename(script_path) + scripts[script_name] = script_content + + logger.info( + f"Found local script: {script_path} -> will mount at {mount_path}/{script_name}" + ) + + # Parse imports in this script to find dependencies + self._find_local_imports( + script_content, + script_path, + lambda path: self._process_script_and_imports( + path, scripts, mount_path, processed_files + ), + ) + + except (IOError, OSError) as e: + logger.warning(f"Could not read script file {script_path}: {e}") + + def _find_local_imports( + self, script_content: str, script_path: str, process_callback + ): + """ + Find local Python imports in script content and process them. + + Args: + script_content: The content of the Python script + script_path: Path to the current script (for relative imports) + process_callback: Function to call for each found local import + """ + + try: + # Parse the Python AST to find imports + tree = ast.parse(script_content) + script_dir = os.path.dirname(os.path.abspath(script_path)) + + for node in ast.walk(tree): + if isinstance(node, ast.Import): + # Handle: import module_name + for alias in node.names: + potential_file = os.path.join(script_dir, f"{alias.name}.py") + if os.path.isfile(potential_file): + process_callback(potential_file) + + elif isinstance(node, ast.ImportFrom): + # Handle: from module_name import something + if node.module: + potential_file = os.path.join(script_dir, f"{node.module}.py") + if os.path.isfile(potential_file): + process_callback(potential_file) + + except (SyntaxError, ValueError) as e: + logger.debug(f"Could not parse imports from {script_path}: {e}") + + def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]): + """Handle script volumes for new clusters (uses ManagedClusterConfig).""" + # Build ConfigMap spec using config.py + configmap_spec = self._cluster_config.build_script_configmap_spec( + job_name=self.name, namespace=self.namespace, scripts=scripts + ) + + # Create ConfigMap via Kubernetes API + configmap_name = self._create_configmap_from_spec(configmap_spec) + + # Add volumes to cluster config (config.py handles spec building) + self._cluster_config.add_script_volumes( + configmap_name=configmap_name, mount_path="/home/ray/scripts" + ) + + def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]): + """Handle script volumes for existing clusters (updates RayCluster CR).""" + # Create config builder for utility methods + config_builder = ManagedClusterConfig() + + # Build ConfigMap spec using config.py + configmap_spec = config_builder.build_script_configmap_spec( + job_name=self.name, namespace=self.namespace, scripts=scripts + ) + + # Create ConfigMap via Kubernetes API + configmap_name = self._create_configmap_from_spec(configmap_spec) + + # Update existing RayCluster + self._update_existing_cluster_for_scripts(configmap_name, config_builder) + + def _create_configmap_from_spec(self, configmap_spec: Dict[str, Any]) -> str: + """ + Create ConfigMap from specification via Kubernetes API. + + Args: + configmap_spec: ConfigMap specification dictionary + + Returns: + str: Name of the created ConfigMap + """ + + configmap_name = configmap_spec["metadata"]["name"] + + # Convert dict spec to V1ConfigMap + configmap = client.V1ConfigMap( + metadata=client.V1ObjectMeta(**configmap_spec["metadata"]), + data=configmap_spec["data"], + ) + + # Create ConfigMap via Kubernetes API + k8s_api = client.CoreV1Api(get_api_client()) + try: + k8s_api.create_namespaced_config_map( + namespace=self.namespace, body=configmap + ) + logger.info( + f"Created ConfigMap '{configmap_name}' with {len(configmap_spec['data'])} scripts" + ) + except client.ApiException as e: + if e.status == 409: # Already exists + logger.info(f"ConfigMap '{configmap_name}' already exists, updating...") + k8s_api.replace_namespaced_config_map( + name=configmap_name, namespace=self.namespace, body=configmap + ) + else: + raise RuntimeError( + f"Failed to create ConfigMap '{configmap_name}': {e}" + ) + + return configmap_name + + # Note: This only works once the pods have been restarted as the configmaps won't be picked up until then :/ + def _update_existing_cluster_for_scripts( + self, configmap_name: str, config_builder: ManagedClusterConfig + ): + """ + Update existing RayCluster to add script volumes and mounts. + + Args: + configmap_name: Name of the ConfigMap containing scripts + config_builder: ManagedClusterConfig instance for building specs + """ + + # Get existing RayCluster + api_instance = client.CustomObjectsApi(get_api_client()) + try: + ray_cluster = api_instance.get_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=self.namespace, + plural="rayclusters", + name=self.cluster_name, + ) + except client.ApiException as e: + raise RuntimeError(f"Failed to get RayCluster '{self.cluster_name}': {e}") + + # Build script volume and mount specifications using config.py + script_volume, script_mount = config_builder.build_script_volume_specs( + configmap_name=configmap_name, mount_path="/home/ray/scripts" + ) + + # Helper function to check for duplicate volumes/mounts + def volume_exists(volumes_list, volume_name): + return any(v.get("name") == volume_name for v in volumes_list) + + def mount_exists(mounts_list, mount_name): + return any(m.get("name") == mount_name for m in mounts_list) + + # Add volumes and mounts to head group + head_spec = ray_cluster["spec"]["headGroupSpec"]["template"]["spec"] + if "volumes" not in head_spec: + head_spec["volumes"] = [] + if not volume_exists(head_spec["volumes"], script_volume["name"]): + head_spec["volumes"].append(script_volume) + + head_container = head_spec["containers"][0] # Ray head container + if "volumeMounts" not in head_container: + head_container["volumeMounts"] = [] + if not mount_exists(head_container["volumeMounts"], script_mount["name"]): + head_container["volumeMounts"].append(script_mount) + + # Add volumes and mounts to worker groups + for worker_group in ray_cluster["spec"]["workerGroupSpecs"]: + worker_spec = worker_group["template"]["spec"] + if "volumes" not in worker_spec: + worker_spec["volumes"] = [] + if not volume_exists(worker_spec["volumes"], script_volume["name"]): + worker_spec["volumes"].append(script_volume) + + worker_container = worker_spec["containers"][0] # Ray worker container + if "volumeMounts" not in worker_container: + worker_container["volumeMounts"] = [] + if not mount_exists(worker_container["volumeMounts"], script_mount["name"]): + worker_container["volumeMounts"].append(script_mount) + + # Update the RayCluster + try: + api_instance.patch_namespaced_custom_object( + group="ray.io", + version="v1", + namespace=self.namespace, + plural="rayclusters", + name=self.cluster_name, + body=ray_cluster, + ) + logger.info( + f"Updated RayCluster '{self.cluster_name}' with script volumes from ConfigMap '{configmap_name}'" + ) + except client.ApiException as e: + raise RuntimeError( + f"Failed to update RayCluster '{self.cluster_name}': {e}" + ) diff --git a/src/codeflare_sdk/ray/rayjobs/test_config.py b/src/codeflare_sdk/ray/rayjobs/test_config.py index 80736295..7d7864c5 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_config.py +++ b/src/codeflare_sdk/ray/rayjobs/test_config.py @@ -131,3 +131,42 @@ def test_ray_usage_stats_with_other_user_envs(): # Total count should be correct (3 user envs) assert len(config.envs) == 3 + + +def test_add_script_volumes_existing_volume_early_return(): + """Test add_script_volumes early return when volume already exists.""" + from kubernetes.client import V1Volume, V1ConfigMapVolumeSource + + config = ManagedClusterConfig() + + # Pre-add a volume with same name + existing_volume = V1Volume( + name="ray-job-scripts", + config_map=V1ConfigMapVolumeSource(name="existing-scripts"), + ) + config.volumes.append(existing_volume) + + # Should return early and not add duplicate + config.add_script_volumes(configmap_name="new-scripts") + + # Should still have only one volume, no mount added + assert len(config.volumes) == 1 + assert len(config.volume_mounts) == 0 + + +def test_add_script_volumes_existing_mount_early_return(): + """Test add_script_volumes early return when mount already exists.""" + from kubernetes.client import V1VolumeMount + + config = ManagedClusterConfig() + + # Pre-add a mount with same name + existing_mount = V1VolumeMount(name="ray-job-scripts", mount_path="/existing/path") + config.volume_mounts.append(existing_mount) + + # Should return early and not add duplicate + config.add_script_volumes(configmap_name="new-scripts") + + # Should still have only one mount, no volume added + assert len(config.volumes) == 0 + assert len(config.volume_mounts) == 1 diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index 6827ed03..4074f614 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -13,6 +13,7 @@ # limitations under the License. import pytest +import os from unittest.mock import MagicMock, patch from codeflare_sdk.common.utils.constants import CUDA_RUNTIME_IMAGE, RAY_VERSION @@ -1143,3 +1144,727 @@ class MockClusterConfig: rayjob._cluster_config = MockClusterConfig() rayjob._validate_cluster_config_image() # Should not raise +def test_extract_script_files_from_entrypoint_single_script(mocker, tmp_path): + """Test extracting a single script file from entrypoint.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Create a test script + test_script = tmp_path / "test_script.py" + test_script.write_text("print('Hello World!')") + + # Change to temp directory for test + original_cwd = os.getcwd() + os.chdir(tmp_path) + + try: + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint=f"python {test_script.name}", + namespace="test-namespace", + ) + + scripts = rayjob._extract_script_files_from_entrypoint() + + assert scripts is not None + assert test_script.name in scripts + assert scripts[test_script.name] == "print('Hello World!')" + assert f"/home/ray/scripts/{test_script.name}" in rayjob.entrypoint + finally: + os.chdir(original_cwd) + + +def test_extract_script_files_with_dependencies(mocker, tmp_path): + """Test extracting script files with local dependencies.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Create main script and dependency + main_script = tmp_path / "main.py" + main_script.write_text( + """ +import helper +from utils import calculate + +def main(): + helper.do_something() + result = calculate(42) + print(f"Result: {result}") + +if __name__ == "__main__": + main() +""" + ) + + helper_script = tmp_path / "helper.py" + helper_script.write_text( + """ +def do_something(): + print("Doing something...") +""" + ) + + utils_script = tmp_path / "utils.py" + utils_script.write_text( + """ +def calculate(x): + return x * 2 +""" + ) + + # Change to temp directory for test + original_cwd = os.getcwd() + os.chdir(tmp_path) + + try: + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python main.py", + namespace="test-namespace", + ) + + scripts = rayjob._extract_script_files_from_entrypoint() + + assert scripts is not None + assert len(scripts) == 3 + assert "main.py" in scripts + assert "helper.py" in scripts + assert "utils.py" in scripts + + # Verify content + assert "import helper" in scripts["main.py"] + assert "def do_something" in scripts["helper.py"] + assert "def calculate" in scripts["utils.py"] + + finally: + os.chdir(original_cwd) + + +def test_extract_script_files_no_local_scripts(mocker): + """Test entrypoint with no local script files.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python -c 'print(\"hello world\")'", + namespace="test-namespace", + ) + + scripts = rayjob._extract_script_files_from_entrypoint() + + assert scripts is None + + +def test_extract_script_files_nonexistent_script(mocker): + """Test entrypoint referencing non-existent script.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python nonexistent.py", + namespace="test-namespace", + ) + + scripts = rayjob._extract_script_files_from_entrypoint() + + assert scripts is None + + +def test_build_script_configmap_spec(): + """Test building ConfigMap specification for scripts.""" + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + config = ManagedClusterConfig() + scripts = {"main.py": "print('main')", "helper.py": "print('helper')"} + + spec = config.build_script_configmap_spec( + job_name="test-job", namespace="test-namespace", scripts=scripts + ) + + assert spec["apiVersion"] == "v1" + assert spec["kind"] == "ConfigMap" + assert spec["metadata"]["name"] == "test-job-scripts" + assert spec["metadata"]["namespace"] == "test-namespace" + assert spec["data"] == scripts + + +def test_build_script_volume_specs(): + """Test building volume and mount specifications for scripts.""" + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + config = ManagedClusterConfig() + + volume_spec, mount_spec = config.build_script_volume_specs( + configmap_name="test-scripts", mount_path="/custom/path" + ) + + assert volume_spec["name"] == "ray-job-scripts" + assert volume_spec["configMap"]["name"] == "test-scripts" + + assert mount_spec["name"] == "ray-job-scripts" + assert mount_spec["mountPath"] == "/custom/path" + + +def test_add_script_volumes(): + """Test adding script volumes to cluster configuration.""" + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + config = ManagedClusterConfig() + + # Initially no volumes + assert len(config.volumes) == 0 + assert len(config.volume_mounts) == 0 + + config.add_script_volumes(configmap_name="test-scripts") + + # Should have added one volume and one mount + assert len(config.volumes) == 1 + assert len(config.volume_mounts) == 1 + + volume = config.volumes[0] + mount = config.volume_mounts[0] + + assert volume.name == "ray-job-scripts" + assert volume.config_map.name == "test-scripts" + + assert mount.name == "ray-job-scripts" + assert mount.mount_path == "/home/ray/scripts" + + +def test_add_script_volumes_duplicate_prevention(): + """Test that adding script volumes twice doesn't create duplicates.""" + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + config = ManagedClusterConfig() + + # Add volumes twice + config.add_script_volumes(configmap_name="test-scripts") + config.add_script_volumes(configmap_name="test-scripts") + + # Should still have only one of each + assert len(config.volumes) == 1 + assert len(config.volume_mounts) == 1 + + +def test_create_configmap_from_spec(mocker): + """Test creating ConfigMap via Kubernetes API.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Mock Kubernetes API + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_api_instance = MagicMock() + mock_k8s_api.return_value = mock_api_instance + + # Mock get_api_client + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + configmap_spec = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "test-scripts", "namespace": "test-namespace"}, + "data": {"test.py": "print('test')"}, + } + + result = rayjob._create_configmap_from_spec(configmap_spec) + + assert result == "test-scripts" + mock_api_instance.create_namespaced_config_map.assert_called_once() + + +def test_create_configmap_already_exists(mocker): + """Test creating ConfigMap when it already exists (409 conflict).""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Mock Kubernetes API + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_api_instance = MagicMock() + mock_k8s_api.return_value = mock_api_instance + + # Mock get_api_client + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + # Mock API exception for conflict + from kubernetes.client import ApiException + + mock_api_instance.create_namespaced_config_map.side_effect = ApiException( + status=409 + ) + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + configmap_spec = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "test-scripts", "namespace": "test-namespace"}, + "data": {"test.py": "print('test')"}, + } + + result = rayjob._create_configmap_from_spec(configmap_spec) + + assert result == "test-scripts" + mock_api_instance.create_namespaced_config_map.assert_called_once() + mock_api_instance.replace_namespaced_config_map.assert_called_once() + + +def test_handle_script_volumes_for_new_cluster(mocker, tmp_path): + """Test handling script volumes for new cluster creation.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Mock ConfigMap creation + mock_create = mocker.patch.object(RayJob, "_create_configmap_from_spec") + mock_create.return_value = "test-job-scripts" + + # Create test script + test_script = tmp_path / "test.py" + test_script.write_text("print('test')") + + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + cluster_config = ManagedClusterConfig() + + original_cwd = os.getcwd() + os.chdir(tmp_path) + + try: + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python test.py", + namespace="test-namespace", + ) + + scripts = {"test.py": "print('test')"} + rayjob._handle_script_volumes_for_new_cluster(scripts) + + # Verify ConfigMap creation was called + mock_create.assert_called_once() + + # Verify volumes were added to cluster config + assert len(cluster_config.volumes) == 1 + assert len(cluster_config.volume_mounts) == 1 + + finally: + os.chdir(original_cwd) + + +def test_ast_parsing_import_detection(mocker, tmp_path): + """Test AST parsing correctly detects import statements.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Create scripts with different import patterns + main_script = tmp_path / "main.py" + main_script.write_text( + """# Different import patterns +import helper +from utils import func1, func2 +from local_module import MyClass +import os # Standard library - should be ignored +import non_existent # Non-local - should be ignored +""" + ) + + helper_script = tmp_path / "helper.py" + helper_script.write_text("def helper_func(): pass") + + utils_script = tmp_path / "utils.py" + utils_script.write_text( + """def func1(): pass +def func2(): pass +""" + ) + + local_module_script = tmp_path / "local_module.py" + local_module_script.write_text("class MyClass: pass") + + original_cwd = os.getcwd() + os.chdir(tmp_path) + + try: + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python main.py", + namespace="test-namespace", + ) + + scripts = rayjob._extract_script_files_from_entrypoint() + + # Should find all local dependencies + assert scripts is not None + assert len(scripts) == 4 # main + 3 dependencies + assert "main.py" in scripts + assert "helper.py" in scripts + assert "utils.py" in scripts + assert "local_module.py" in scripts + + finally: + os.chdir(original_cwd) + + +def test_rayjob_submit_with_scripts_new_cluster(mocker, tmp_path): + """Test RayJob submission with script detection for new cluster.""" + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") + + # Mock the RayjobApi + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_api_instance = MagicMock() + mock_api_class.return_value = mock_api_instance + mock_api_instance.submit_job.return_value = True + + # Mock ConfigMap creation + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_k8s_instance = MagicMock() + mock_k8s_api.return_value = mock_k8s_instance + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + # Create test script + test_script = tmp_path / "test.py" + test_script.write_text("print('Hello from script!')") + + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + cluster_config = ManagedClusterConfig() + + original_cwd = os.getcwd() + os.chdir(tmp_path) + + try: + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python test.py", + namespace="test-namespace", + ) + + # Submit should detect scripts and handle them + result = rayjob.submit() + + assert result == "test-job" + + # Verify ConfigMap was created + mock_k8s_instance.create_namespaced_config_map.assert_called_once() + + # Verify volumes were added + assert len(cluster_config.volumes) == 1 + assert len(cluster_config.volume_mounts) == 1 + + # Verify entrypoint was updated + assert "/home/ray/scripts/test.py" in rayjob.entrypoint + + finally: + os.chdir(original_cwd) + + +def test_process_script_and_imports_io_error(mocker, tmp_path): + """Test _process_script_and_imports handles IO errors gracefully.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + scripts = {} + processed_files = set() + + # Mock os.path.isfile to return True but open() to raise IOError + mocker.patch("os.path.isfile", return_value=True) + mocker.patch("builtins.open", side_effect=IOError("Permission denied")) + + # Should handle the error gracefully and not crash + rayjob._process_script_and_imports( + "test.py", scripts, "/home/ray/scripts", processed_files + ) + + # Should add to processed_files but not to scripts (due to error) + assert "test.py" in processed_files + assert len(scripts) == 0 + + +def test_process_script_and_imports_container_path_skip(mocker): + """Test that scripts already in container paths are skipped.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + scripts = {} + processed_files = set() + + # Test script path already in container + rayjob._process_script_and_imports( + "/home/ray/scripts/test.py", scripts, "/home/ray/scripts", processed_files + ) + + # Should skip processing + assert len(scripts) == 0 + assert len(processed_files) == 0 + + +def test_process_script_and_imports_already_processed(mocker, tmp_path): + """Test that already processed scripts are skipped (infinite loop prevention).""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + scripts = {} + processed_files = {"test.py"} # Already processed + + # Should return early without processing + rayjob._process_script_and_imports( + "test.py", scripts, "/home/ray/scripts", processed_files + ) + + # Should remain unchanged + assert len(scripts) == 0 + assert processed_files == {"test.py"} + + +def test_find_local_imports_syntax_error(mocker): + """Test _find_local_imports handles syntax errors gracefully.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + # Invalid Python syntax + invalid_script_content = "import helper\ndef invalid_syntax(" + + mock_callback = mocker.Mock() + + # Should handle syntax error gracefully + rayjob._find_local_imports(invalid_script_content, "test.py", mock_callback) + + # Callback should not be called due to syntax error + mock_callback.assert_not_called() + + +def test_create_configmap_api_error_non_409(mocker): + """Test _create_configmap_from_spec handles non-409 API errors.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Mock Kubernetes API with 500 error + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_api_instance = mocker.Mock() + mock_k8s_api.return_value = mock_api_instance + + from kubernetes.client import ApiException + + mock_api_instance.create_namespaced_config_map.side_effect = ApiException( + status=500 + ) + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + configmap_spec = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "test-scripts", "namespace": "test-namespace"}, + "data": {"test.py": "print('test')"}, + } + + # Should raise RuntimeError for non-409 API errors + with pytest.raises(RuntimeError, match="Failed to create ConfigMap"): + rayjob._create_configmap_from_spec(configmap_spec) + + +def test_update_existing_cluster_get_cluster_error(mocker): + """Test _update_existing_cluster_for_scripts handles get cluster errors.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Mock CustomObjectsApi with error + mock_custom_api = mocker.patch("kubernetes.client.CustomObjectsApi") + mock_api_instance = mocker.Mock() + mock_custom_api.return_value = mock_api_instance + + from kubernetes.client import ApiException + + mock_api_instance.get_namespaced_custom_object.side_effect = ApiException( + status=404 + ) + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + config_builder = ManagedClusterConfig() + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + # Should raise RuntimeError when getting cluster fails + with pytest.raises(RuntimeError, match="Failed to get RayCluster"): + rayjob._update_existing_cluster_for_scripts("test-scripts", config_builder) + + +def test_update_existing_cluster_patch_error(mocker): + """Test _update_existing_cluster_for_scripts handles patch errors.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + # Mock CustomObjectsApi + mock_custom_api = mocker.patch("kubernetes.client.CustomObjectsApi") + mock_api_instance = mocker.Mock() + mock_custom_api.return_value = mock_api_instance + + # Mock successful get but failed patch + mock_api_instance.get_namespaced_custom_object.return_value = { + "spec": { + "headGroupSpec": { + "template": { + "spec": {"volumes": [], "containers": [{"volumeMounts": []}]} + } + }, + "workerGroupSpecs": [ + { + "template": { + "spec": {"volumes": [], "containers": [{"volumeMounts": []}]} + } + } + ], + } + } + + from kubernetes.client import ApiException + + mock_api_instance.patch_namespaced_custom_object.side_effect = ApiException( + status=500 + ) + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + + config_builder = ManagedClusterConfig() + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python test.py", + namespace="test-namespace", + ) + + # Should raise RuntimeError when patching fails + with pytest.raises(RuntimeError, match="Failed to update RayCluster"): + rayjob._update_existing_cluster_for_scripts("test-scripts", config_builder) + + +def test_extract_script_files_empty_entrypoint(mocker): + """Test script extraction with empty entrypoint.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="", # Empty entrypoint + namespace="test-namespace", + ) + + scripts = rayjob._extract_script_files_from_entrypoint() + + assert scripts is None + + +def test_add_script_volumes_existing_volume_skip(): + """Test add_script_volumes skips when volume already exists (missing coverage).""" + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + from kubernetes.client import V1Volume, V1ConfigMapVolumeSource + + config = ManagedClusterConfig() + + # Pre-add a volume with same name + existing_volume = V1Volume( + name="ray-job-scripts", + config_map=V1ConfigMapVolumeSource(name="existing-scripts"), + ) + config.volumes.append(existing_volume) + + # Should skip adding duplicate volume + config.add_script_volumes(configmap_name="new-scripts") + + # Should still have only one volume + assert len(config.volumes) == 1 + assert len(config.volume_mounts) == 0 # Mount not added due to volume skip + + +def test_add_script_volumes_existing_mount_skip(): + """Test add_script_volumes skips when mount already exists (missing coverage).""" + from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig + from kubernetes.client import V1VolumeMount + + config = ManagedClusterConfig() + + # Pre-add a mount with same name + existing_mount = V1VolumeMount(name="ray-job-scripts", mount_path="/existing/path") + config.volume_mounts.append(existing_mount) + + # Should skip adding duplicate mount + config.add_script_volumes(configmap_name="new-scripts") + + # Should still have only one mount and no volume added + assert len(config.volumes) == 0 # Volume not added due to mount skip + assert len(config.volume_mounts) == 1 From f0ea509760b441ee318f9d46622681208acb72d7 Mon Sep 17 00:00:00 2001 From: Pat O'Connor Date: Wed, 27 Aug 2025 12:02:48 +0100 Subject: [PATCH 2/4] Changes as per review Signed-off-by: Pat O'Connor --- src/codeflare_sdk/ray/rayjobs/config.py | 7 ++++ src/codeflare_sdk/ray/rayjobs/rayjob.py | 29 +++++++++------- src/codeflare_sdk/ray/rayjobs/test_rayjob.py | 36 +++++++++++--------- 3 files changed, 42 insertions(+), 30 deletions(-) diff --git a/src/codeflare_sdk/ray/rayjobs/config.py b/src/codeflare_sdk/ray/rayjobs/config.py index dcc4d2bf..2bf6746c 100644 --- a/src/codeflare_sdk/ray/rayjobs/config.py +++ b/src/codeflare_sdk/ray/rayjobs/config.py @@ -559,6 +559,13 @@ def add_script_volumes( logger.info( f"Added script volume '{configmap_name}' to cluster config: mount_path={mount_path}" ) + + def validate_configmap_size(self, scripts: Dict[str, str]) -> None: + total_size = sum(len(content.encode("utf-8")) for content in scripts.values()) + if total_size > 1024 * 1024: # 1MB + raise ValueError( + f"ConfigMap size exceeds 1MB limit. Total size: {total_size} bytes" + ) def build_script_configmap_spec( self, job_name: str, namespace: str, scripts: Dict[str, str] diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py index 43562da8..79afc0f7 100644 --- a/src/codeflare_sdk/ray/rayjobs/rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py @@ -25,7 +25,7 @@ from kubernetes import client from ...common.kubernetes_cluster.auth import get_api_client from python_client.kuberay_job_api import RayjobApi - +from python_client.kuberay_cluster_api import RayClusterApi from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig from ...common.utils import get_current_namespace @@ -41,6 +41,8 @@ logger = logging.getLogger(__name__) +mount_path = "/home/ray/scripts" + class RayJob: """ @@ -148,6 +150,7 @@ def __init__( logger.info(f"Using existing cluster: {self.cluster_name}") self._api = RayjobApi() + self._cluster_api = RayClusterApi() logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}") @@ -351,7 +354,7 @@ def _extract_script_files_from_entrypoint(self) -> Optional[Dict[str, str]]: return None scripts = {} - mount_path = "/home/ray/scripts" + # mount_path = "/home/ray/scripts" processed_files = set() # Avoid infinite loops # Look for Python file patterns in entrypoint (e.g., "python script.py", "python /path/to/script.py") @@ -450,6 +453,9 @@ def _find_local_imports( def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]): """Handle script volumes for new clusters (uses ManagedClusterConfig).""" + # Validate ConfigMap size before creation + self._cluster_config.validate_configmap_size(scripts) + # Build ConfigMap spec using config.py configmap_spec = self._cluster_config.build_script_configmap_spec( job_name=self.name, namespace=self.namespace, scripts=scripts @@ -467,6 +473,9 @@ def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]): """Handle script volumes for existing clusters (updates RayCluster CR).""" # Create config builder for utility methods config_builder = ManagedClusterConfig() + + # Validate ConfigMap size before creation + config_builder.validate_configmap_size(scripts) # Build ConfigMap spec using config.py configmap_spec = config_builder.build_script_configmap_spec( @@ -535,12 +544,9 @@ def _update_existing_cluster_for_scripts( # Get existing RayCluster api_instance = client.CustomObjectsApi(get_api_client()) try: - ray_cluster = api_instance.get_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=self.namespace, - plural="rayclusters", + ray_cluster = self._cluster_api.get_ray_cluster( name=self.cluster_name, + k8s_namespace=self.namespace, ) except client.ApiException as e: raise RuntimeError(f"Failed to get RayCluster '{self.cluster_name}': {e}") @@ -586,13 +592,10 @@ def mount_exists(mounts_list, mount_name): # Update the RayCluster try: - api_instance.patch_namespaced_custom_object( - group="ray.io", - version="v1", - namespace=self.namespace, - plural="rayclusters", + self._cluster_api.patch_ray_cluster( name=self.cluster_name, - body=ray_cluster, + ray_cluster=ray_cluster, + k8s_namespace=self.namespace, ) logger.info( f"Updated RayCluster '{self.cluster_name}' with script volumes from ConfigMap '{configmap_name}'" diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index 4074f614..fe85f22f 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -32,6 +32,9 @@ def test_rayjob_submit_success(mocker): mock_api_instance = MagicMock() mock_api_class.return_value = mock_api_instance + # Mock the RayClusterApi class + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + # Configure the mock to return success when submit is called mock_api_instance.submit.return_value = {"metadata": {"name": "test-rayjob"}} @@ -76,6 +79,9 @@ def test_rayjob_submit_failure(mocker): mock_api_instance = MagicMock() mock_api_class.return_value = mock_api_instance + # Mock the RayClusterApi class + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + # Configure the mock to return failure (False/None) when submit_job is called mock_api_instance.submit_job.return_value = None @@ -1729,21 +1735,19 @@ def test_create_configmap_api_error_non_409(mocker): def test_update_existing_cluster_get_cluster_error(mocker): """Test _update_existing_cluster_for_scripts handles get cluster errors.""" mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_rayjob_api = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - # Mock CustomObjectsApi with error - mock_custom_api = mocker.patch("kubernetes.client.CustomObjectsApi") - mock_api_instance = mocker.Mock() - mock_custom_api.return_value = mock_api_instance + # Mock RayClusterApi with error + mock_cluster_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_cluster_api_instance = mocker.Mock() + mock_cluster_api_class.return_value = mock_cluster_api_instance from kubernetes.client import ApiException - mock_api_instance.get_namespaced_custom_object.side_effect = ApiException( + mock_cluster_api_instance.get_ray_cluster.side_effect = ApiException( status=404 ) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig config_builder = ManagedClusterConfig() @@ -1763,15 +1767,15 @@ def test_update_existing_cluster_get_cluster_error(mocker): def test_update_existing_cluster_patch_error(mocker): """Test _update_existing_cluster_for_scripts handles patch errors.""" mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_rayjob_api = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - # Mock CustomObjectsApi - mock_custom_api = mocker.patch("kubernetes.client.CustomObjectsApi") - mock_api_instance = mocker.Mock() - mock_custom_api.return_value = mock_api_instance + # Mock RayClusterApi + mock_cluster_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_cluster_api_instance = mocker.Mock() + mock_cluster_api_class.return_value = mock_cluster_api_instance # Mock successful get but failed patch - mock_api_instance.get_namespaced_custom_object.return_value = { + mock_cluster_api_instance.get_ray_cluster.return_value = { "spec": { "headGroupSpec": { "template": { @@ -1790,12 +1794,10 @@ def test_update_existing_cluster_patch_error(mocker): from kubernetes.client import ApiException - mock_api_instance.patch_namespaced_custom_object.side_effect = ApiException( + mock_cluster_api_instance.patch_ray_cluster.side_effect = ApiException( status=500 ) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") - from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig config_builder = ManagedClusterConfig() From 20a61bd079edcc69e6e4930fa67dfd8ec06ebc96 Mon Sep 17 00:00:00 2001 From: Pat O'Connor Date: Wed, 27 Aug 2025 12:51:04 +0100 Subject: [PATCH 3/4] Changes as per review again because I'm dumb Signed-off-by: Pat O'Connor --- src/codeflare_sdk/ray/rayjobs/config.py | 2 +- src/codeflare_sdk/ray/rayjobs/rayjob.py | 6 +++--- src/codeflare_sdk/ray/rayjobs/test_rayjob.py | 18 ++++++++++-------- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/codeflare_sdk/ray/rayjobs/config.py b/src/codeflare_sdk/ray/rayjobs/config.py index 2bf6746c..849d7997 100644 --- a/src/codeflare_sdk/ray/rayjobs/config.py +++ b/src/codeflare_sdk/ray/rayjobs/config.py @@ -559,7 +559,7 @@ def add_script_volumes( logger.info( f"Added script volume '{configmap_name}' to cluster config: mount_path={mount_path}" ) - + def validate_configmap_size(self, scripts: Dict[str, str]) -> None: total_size = sum(len(content.encode("utf-8")) for content in scripts.values()) if total_size > 1024 * 1024: # 1MB diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py index 79afc0f7..acf93fdc 100644 --- a/src/codeflare_sdk/ray/rayjobs/rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py @@ -455,7 +455,7 @@ def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]): """Handle script volumes for new clusters (uses ManagedClusterConfig).""" # Validate ConfigMap size before creation self._cluster_config.validate_configmap_size(scripts) - + # Build ConfigMap spec using config.py configmap_spec = self._cluster_config.build_script_configmap_spec( job_name=self.name, namespace=self.namespace, scripts=scripts @@ -473,7 +473,7 @@ def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]): """Handle script volumes for existing clusters (updates RayCluster CR).""" # Create config builder for utility methods config_builder = ManagedClusterConfig() - + # Validate ConfigMap size before creation config_builder.validate_configmap_size(scripts) @@ -594,7 +594,7 @@ def mount_exists(mounts_list, mount_name): try: self._cluster_api.patch_ray_cluster( name=self.cluster_name, - ray_cluster=ray_cluster, + ray_patch=ray_cluster, k8s_namespace=self.namespace, ) logger.info( diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index fe85f22f..0afd62d5 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -1150,6 +1150,8 @@ class MockClusterConfig: rayjob._cluster_config = MockClusterConfig() rayjob._validate_cluster_config_image() # Should not raise + + def test_extract_script_files_from_entrypoint_single_script(mocker, tmp_path): """Test extracting a single script file from entrypoint.""" # Mock kubernetes config loading @@ -1738,15 +1740,15 @@ def test_update_existing_cluster_get_cluster_error(mocker): mock_rayjob_api = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") # Mock RayClusterApi with error - mock_cluster_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_cluster_api_class = mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi" + ) mock_cluster_api_instance = mocker.Mock() mock_cluster_api_class.return_value = mock_cluster_api_instance from kubernetes.client import ApiException - mock_cluster_api_instance.get_ray_cluster.side_effect = ApiException( - status=404 - ) + mock_cluster_api_instance.get_ray_cluster.side_effect = ApiException(status=404) from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig @@ -1770,7 +1772,9 @@ def test_update_existing_cluster_patch_error(mocker): mock_rayjob_api = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") # Mock RayClusterApi - mock_cluster_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_cluster_api_class = mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi" + ) mock_cluster_api_instance = mocker.Mock() mock_cluster_api_class.return_value = mock_cluster_api_instance @@ -1794,9 +1798,7 @@ def test_update_existing_cluster_patch_error(mocker): from kubernetes.client import ApiException - mock_cluster_api_instance.patch_ray_cluster.side_effect = ApiException( - status=500 - ) + mock_cluster_api_instance.patch_ray_cluster.side_effect = ApiException(status=500) from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig From 7a002f27722aeddd433c79d13e71da813655b417 Mon Sep 17 00:00:00 2001 From: Pat O'Connor Date: Wed, 27 Aug 2025 13:27:17 +0100 Subject: [PATCH 4/4] added kubeconfig loads to test Signed-off-by: Pat O'Connor --- src/codeflare_sdk/ray/rayjobs/test_rayjob.py | 31 ++++++++++++++++++++ src/codeflare_sdk/ray/rayjobs/test_status.py | 14 +++++++++ 2 files changed, 45 insertions(+) diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index 0afd62d5..ff7a2639 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -446,11 +446,13 @@ def test_submit_with_auto_cluster(mocker): def test_namespace_auto_detection_success(mocker): """Test successful namespace auto-detection.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch( "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", return_value="detected-ns", ) mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") rayjob = RayJob( job_name="test-job", entrypoint="python script.py", cluster_name="test-cluster" @@ -461,10 +463,12 @@ def test_namespace_auto_detection_success(mocker): def test_namespace_auto_detection_fallback(mocker): """Test that namespace auto-detection failure raises an error.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch( "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", return_value=None ) mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") with pytest.raises(ValueError, match="Could not auto-detect Kubernetes namespace"): RayJob( @@ -476,11 +480,13 @@ def test_namespace_auto_detection_fallback(mocker): def test_namespace_explicit_override(mocker): """Test that explicit namespace overrides auto-detection.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch( "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", return_value="detected-ns", ) mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") rayjob = RayJob( job_name="test-job", @@ -494,7 +500,9 @@ def test_namespace_explicit_override(mocker): def test_shutdown_behavior_with_cluster_config(mocker): """Test that shutdown_after_job_finishes is True when cluster_config is provided.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig @@ -512,7 +520,9 @@ def test_shutdown_behavior_with_cluster_config(mocker): def test_shutdown_behavior_with_existing_cluster(mocker): """Test that shutdown_after_job_finishes is False when using existing cluster.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") rayjob = RayJob( job_name="test-job", @@ -526,7 +536,9 @@ def test_shutdown_behavior_with_existing_cluster(mocker): def test_rayjob_with_rayjob_cluster_config(mocker): """Test RayJob with the new ManagedClusterConfig.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig @@ -549,7 +561,9 @@ def test_rayjob_with_rayjob_cluster_config(mocker): def test_rayjob_cluster_config_validation(mocker): """Test validation of ManagedClusterConfig parameters.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig @@ -621,7 +635,9 @@ def test_build_ray_cluster_spec_integration(mocker): def test_rayjob_with_runtime_env(mocker): """Test RayJob with runtime environment configuration.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") runtime_env = {"pip": ["numpy", "pandas"]} @@ -642,7 +658,9 @@ def test_rayjob_with_runtime_env(mocker): def test_rayjob_with_active_deadline_and_ttl(mocker): """Test RayJob with both active deadline and TTL settings.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") rayjob = RayJob( job_name="test-job", @@ -664,7 +682,9 @@ def test_rayjob_with_active_deadline_and_ttl(mocker): def test_rayjob_cluster_name_generation_with_config(mocker): """Test cluster name generation when using cluster_config.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig @@ -684,7 +704,9 @@ def test_rayjob_cluster_name_generation_with_config(mocker): def test_rayjob_namespace_propagation_to_cluster_config(mocker): """Test that job namespace is propagated to cluster config when None.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") from codeflare_sdk.ray.rayjobs.rayjob import get_current_namespace @@ -719,7 +741,9 @@ def test_rayjob_error_handling_invalid_cluster_config(mocker): def test_rayjob_constructor_parameter_validation(mocker): """Test constructor parameter validation.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") # Test with valid parameters rayjob = RayJob( @@ -941,7 +965,9 @@ def test_build_ray_cluster_spec_with_image_pull_secrets(mocker): def test_rayjob_user_override_shutdown_behavior(mocker): """Test that user can override the auto-detected shutdown behavior.""" + mocker.patch("kubernetes.config.load_kube_config") mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") # Test 1: User overrides shutdown to True even when using existing cluster rayjob_existing_override = RayJob( @@ -1009,6 +1035,7 @@ def test_submit_with_cluster_config_compatible_image_passes(self, mocker): """Test that submission passes with compatible cluster_config image.""" mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = MagicMock() mock_api_class.return_value = mock_api_instance mock_api_instance.submit_job.return_value = True @@ -1030,6 +1057,7 @@ def test_submit_with_cluster_config_incompatible_image_fails(self, mocker): """Test that submission fails with incompatible cluster_config image.""" mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = MagicMock() mock_api_class.return_value = mock_api_instance @@ -1052,6 +1080,7 @@ def test_validate_ray_version_compatibility_method(self, mocker): """Test the _validate_ray_version_compatibility method directly.""" mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = MagicMock() mock_api_class.return_value = mock_api_instance @@ -1087,6 +1116,7 @@ def test_validate_cluster_config_image_method(self, mocker): """Test the _validate_cluster_config_image method directly.""" mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = MagicMock() mock_api_class.return_value = mock_api_instance @@ -1122,6 +1152,7 @@ def test_validate_cluster_config_image_edge_cases(self, mocker): """Test edge cases in _validate_cluster_config_image method.""" mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = MagicMock() mock_api_class.return_value = mock_api_instance diff --git a/src/codeflare_sdk/ray/rayjobs/test_status.py b/src/codeflare_sdk/ray/rayjobs/test_status.py index 6d2ce946..f3ed7ef8 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_status.py +++ b/src/codeflare_sdk/ray/rayjobs/test_status.py @@ -24,8 +24,11 @@ def test_rayjob_status(mocker): """ Test the RayJob status method with different deployment statuses. """ + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") # Mock the RayjobApi to avoid actual Kubernetes calls mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = mock_api_class.return_value # Create a RayJob instance @@ -101,7 +104,9 @@ def test_rayjob_status_unknown_deployment_status(mocker): """ Test handling of unknown deployment status from the API. """ + mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = mock_api_class.return_value rayjob = RayJob( @@ -129,7 +134,9 @@ def test_rayjob_status_missing_fields(mocker): """ Test handling of API response with missing fields. """ + mocker.patch("kubernetes.config.load_kube_config") mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = mock_api_class.return_value rayjob = RayJob( @@ -154,8 +161,11 @@ def test_map_to_codeflare_status(mocker): """ Test the _map_to_codeflare_status helper method directly. """ + # Mock kubernetes config loading + mocker.patch("kubernetes.config.load_kube_config") # Mock the RayjobApi constructor to avoid authentication issues mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") rayjob = RayJob( job_name="test-job", @@ -217,8 +227,10 @@ def test_rayjob_status_print_no_job_found(mocker): """ Test that pretty_print.print_no_job_found is called when no job is found and print_to_console=True. """ + mocker.patch("kubernetes.config.load_kube_config") # Mock the RayjobApi and pretty_print mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = mock_api_class.return_value mock_print_no_job_found = mocker.patch( "codeflare_sdk.ray.rayjobs.pretty_print.print_no_job_found" @@ -248,8 +260,10 @@ def test_rayjob_status_print_job_found(mocker): """ Test that pretty_print.print_job_status is called when job is found and print_to_console=True. """ + mocker.patch("kubernetes.config.load_kube_config") # Mock the RayjobApi and pretty_print mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") mock_api_instance = mock_api_class.return_value mock_print_job_status = mocker.patch( "codeflare_sdk.ray.rayjobs.pretty_print.print_job_status"