From a7c77696d5837ed81c1a3a2e86b4ebc81bea8f1c Mon Sep 17 00:00:00 2001 From: Oviya Seeniraj Date: Mon, 1 Dec 2025 16:17:55 -0800 Subject: [PATCH 1/4] use hostPath, file based toggle, and env vars to enable node-specific cuda fault injections w/o requiring restarts or force deletion (besides initial setup) Signed-off-by: Oviya Seeniraj --- .../cuda-fault-injection/README.md | 22 +- .../cuda-fault-injection/cuda_intercept.c | 40 ++- .../cuda-fault-injection/inject_into_pods.py | 86 +++++- .../helpers/cuda_fault_injection.py | 247 +++++++++++++++++- 4 files changed, 375 insertions(+), 20 deletions(-) diff --git a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/README.md b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/README.md index 59b4216ab8..7fd039509e 100644 --- a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/README.md +++ b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/README.md @@ -6,13 +6,16 @@ ## What This Does -Makes CUDA calls return error codes to simulate various GPU failures. Uses LD_PRELOAD to intercept CUDA library calls. +Intercepts CUDA calls to simulate GPU failures using LD_PRELOAD. Faults persist across pod restarts via hostPath volumes, enabling realistic hardware failure testing. ``` -Pod calls cudaMalloc() → LD_PRELOAD intercepts → Returns error → Pod crashes +Pod calls cudaMalloc() → LD_PRELOAD intercepts → Checks /host-fault/cuda_fault_enabled → Returns error → Pod crashes ``` -**Result**: Realistic GPU failure testing without hardware damage. +**Key Features**: +- **Persistent faults**: hostPath volume (`/var/lib/cuda-fault-test`) survives pod restarts on same node +- **Runtime toggle**: Enable/disable faults without pod restarts via `/host-fault/cuda_fault_enabled` +- **Node-specific**: Faults only on target node, healthy nodes unaffected ## Scope @@ -35,13 +38,20 @@ This library simulates **software/orchestration-level failures** that occur when | **43** | GPU stopped responding | `CUDA_ERROR_LAUNCH_TIMEOUT` | Hung kernel | | **74** | NVLink error | `CUDA_ERROR_PEER_ACCESS_UNSUPPORTED` | Multi-GPU communication failure | +## How It Works + +1. **Deployment patching**: Adds hostPath volume + init container to compile library +2. **LD_PRELOAD injection**: Environment variable loads library before CUDA +3. **Runtime control**: Toggle file (`/host-fault/cuda_fault_enabled`) controls fault state +4. 
**Node persistence**: hostPath ensures faults survive pod restarts on same node + ## Files in This Directory | File | Purpose | |------|---------| -| `cuda_intercept.c` | C library source that intercepts CUDA calls | -| `inject_into_pods.py` | Helper functions for patching Kubernetes deployments | -| `Makefile` | Builds the `.so` library locally (optional, for standalone testing) | +| `cuda_intercept.c` | C library that intercepts CUDA calls and checks fault markers | +| `inject_into_pods.py` | Kubernetes deployment patcher (adds hostPath volume + library) | +| `Makefile` | Local build (optional, for testing) | ## Prerequisites diff --git a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/cuda_intercept.c b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/cuda_intercept.c index 1052eeda05..9cb2c7cc80 100644 --- a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/cuda_intercept.c +++ b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/cuda_intercept.c @@ -59,19 +59,20 @@ static const xid_mapping_t xid_mappings[] = { }; // Get XID type and corresponding CUDA error +// Supports runtime toggling via /tmp/cuda_fault_enabled file static void get_fault_config(int* inject, int* xid_type, cudaError_t* error_code) { static int initialized = 0; - static int cached_inject = 0; + static int env_inject = 0; // From environment variable static int cached_xid = 79; // Default to XID 79 static cudaError_t cached_error = cudaErrorNoDevice; if (!initialized) { - // Check if injection is enabled + // Check if injection is enabled via environment char* env = getenv("CUDA_FAULT_INJECTION_ENABLED"); if (env) { - cached_inject = (strcmp(env, "1") == 0 || strcmp(env, "true") == 0); + env_inject = (strcmp(env, "1") == 0 || strcmp(env, "true") == 0); } // Get XID type @@ -85,7 +86,7 @@ get_fault_config(int* inject, int* xid_type, cudaError_t* error_code) if (xid_mappings[i].xid == cached_xid) { cached_error = xid_mappings[i].cuda_error; fprintf( - stderr, "[CUDA FAULT INJECTION] ENABLED - Simulating XID %d (%s)\n", cached_xid, + stderr, "[CUDA FAULT INJECTION] Library loaded - XID %d (%s)\n", cached_xid, xid_mappings[i].description); found = 1; break; @@ -97,16 +98,37 @@ get_fault_config(int* inject, int* xid_type, cudaError_t* error_code) cached_xid = 79; cached_error = cudaErrorNoDevice; } - } else { - fprintf( - stderr, "[CUDA FAULT INJECTION] %s (default: XID 79 - GPU fell off bus)\n", - cached_inject ? "ENABLED" : "DISABLED"); } initialized = 1; } - *inject = cached_inject; + // Runtime toggle: Check node-persistent fault marker on EVERY call + // Use hostPath (/host-fault) so fault persists across pod restarts on same node + // Pod reschedules to different node → no file there → automatic recovery! 
+ int runtime_inject = env_inject; // Default to env var + + // Check hostPath first (persistent across restarts on same node) + FILE* toggle_file = fopen("/host-fault/cuda_fault_enabled", "r"); + if (toggle_file) { + char toggle_value[4] = {0}; + if (fgets(toggle_value, sizeof(toggle_value), toggle_file)) { + runtime_inject = (toggle_value[0] == '1'); + } + fclose(toggle_file); + } else { + // Fallback to ephemeral /tmp for backwards compatibility + toggle_file = fopen("/tmp/cuda_fault_enabled", "r"); + if (toggle_file) { + char toggle_value[4] = {0}; + if (fgets(toggle_value, sizeof(toggle_value), toggle_file)) { + runtime_inject = (toggle_value[0] == '1'); + } + fclose(toggle_file); + } + } + + *inject = runtime_inject; *xid_type = cached_xid; *error_code = cached_error; } diff --git a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/inject_into_pods.py b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/inject_into_pods.py index 552ed46ee4..90d32ad91a 100755 --- a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/inject_into_pods.py +++ b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/inject_into_pods.py @@ -200,6 +200,18 @@ def _patch_service_for_injection( service["extraPodSpec"]["volumes"].append( {"name": "cuda-fault-lib", "emptyDir": {}} ) + + # Add hostPath volume for persistent fault marker (survives pod restarts on same node) + # This simulates persistent hardware failure! + service["extraPodSpec"]["volumes"].append( + { + "name": "node-fault-marker", + "hostPath": { + "path": "/var/lib/cuda-fault-test", + "type": "DirectoryOrCreate" + } + } + ) # Add init container to decode base64 if "initContainers" not in service["extraPodSpec"]: @@ -247,7 +259,7 @@ def _patch_service_for_injection( if vm.get("name") != "cuda-fault-lib" ] - # Add mount + # Add mount for compiled library service["extraPodSpec"]["mainContainer"]["volumeMounts"].append( { "name": "cuda-fault-lib", @@ -255,9 +267,19 @@ def _patch_service_for_injection( "readOnly": True, } ) + + # Add mount for persistent fault marker (hostPath) + service["extraPodSpec"]["mainContainer"]["volumeMounts"].append( + { + "name": "node-fault-marker", + "mountPath": "/host-fault", + "readOnly": False, # Need write access + } + ) print(" ✓ Added init container to compile library") print(" ✓ Added ConfigMap volume mount") + print(" ✓ Added hostPath volume for persistent fault marker") # Add node affinity to pin pods to target node (simulates real XID 79 behavior) if target_node and enable: @@ -287,14 +309,14 @@ def _patch_service_for_injection( service["extraPodSpec"]["volumes"] = [ v for v in service["extraPodSpec"]["volumes"] - if v.get("name") not in ["cuda-fault-lib", "cuda-fault-lib-source"] + if v.get("name") not in ["cuda-fault-lib", "cuda-fault-lib-source", "node-fault-marker"] ] if "volumeMounts" in service["extraPodSpec"].get("mainContainer", {}): service["extraPodSpec"]["mainContainer"]["volumeMounts"] = [ vm for vm in service["extraPodSpec"]["mainContainer"]["volumeMounts"] - if vm.get("name") != "cuda-fault-lib" + if vm.get("name") not in ["cuda-fault-lib", "node-fault-marker"] ] # Remove init container @@ -323,6 +345,7 @@ def patch_deployment_env( use_configmap=True, target_node=None, xid_type=79, + passthrough_mode=False, ): """Patch deployment to add/remove LD_PRELOAD environment variable. 
@@ -334,6 +357,8 @@ def patch_deployment_env( target_node: If provided, adds node affinity to pin pods to this node (simulates real XID where pods crash on the faulty node) xid_type: XID error type to simulate (79, 48, 94, 95, 43, 74). Default: 79 + passthrough_mode: If True, set CUDA_FAULT_INJECTION_ENABLED=0 (library loaded but disabled) + Allows baseline testing before enabling faults via toggle """ custom_api = client.CustomObjectsApi() apps_api = client.AppsV1Api() @@ -385,9 +410,11 @@ def patch_deployment_env( # Prepare environment variables new_envs = [] if enable: + # Set CUDA_FAULT_INJECTION_ENABLED based on passthrough_mode + fault_enabled_value = "0" if passthrough_mode else "1" new_envs = [ {"name": "LD_PRELOAD", "value": lib_path}, - {"name": "CUDA_FAULT_INJECTION_ENABLED", "value": "1"}, + {"name": "CUDA_FAULT_INJECTION_ENABLED", "value": fault_enabled_value}, {"name": "CUDA_XID_TYPE", "value": str(xid_type)}, ] @@ -400,6 +427,31 @@ def patch_deployment_env( available_services = list(services.keys()) print(f" → Available services: {available_services}") + # Set aggressive update strategy when enabling (allow all pods to update at once) + # This ensures all pods get CUDA faults, not just the first few + if enable: + if "updateStrategy" not in spec: + spec["updateStrategy"] = {} + if "rollingUpdate" not in spec["updateStrategy"]: + spec["updateStrategy"]["rollingUpdate"] = {} + + # Allow all pods to be unavailable during update + spec["updateStrategy"]["rollingUpdate"]["maxUnavailable"] = "100%" + # Don't create surge pods + spec["updateStrategy"]["rollingUpdate"]["maxSurge"] = 0 + print(" → Set update strategy: maxUnavailable=100%, maxSurge=0") + print(" (All pods will update simultaneously)") + else: + # Restore default update strategy when disabling + if "updateStrategy" in spec: + spec["updateStrategy"] = { + "rollingUpdate": { + "maxUnavailable": "25%", + "maxSurge": "25%" + } + } + print(" → Restored default update strategy (maxUnavailable=25%)") + for service_name in services_to_patch: if service_name in services: print(f" → Patching service: {service_name}") @@ -465,6 +517,32 @@ def patch_deployment_env( print(f" Services patched: {', '.join(patched_services)}") if use_configmap and enable: print(f" Library mounted at: {lib_path}") + + # Force restart all worker pods when enabling to apply changes immediately + if enable: + print(" → Force-deleting all worker pods to apply changes immediately...") + core_api = client.CoreV1Api() + try: + worker_pods = core_api.list_namespaced_pod( + namespace=namespace, + label_selector=f"nvidia.com/dynamo-graph-deployment-name={deployment_name},nvidia.com/dynamo-component-type=worker" + ) + deleted_count = 0 + for pod in worker_pods.items: + try: + core_api.delete_namespaced_pod( + name=pod.metadata.name, + namespace=namespace, + grace_period_seconds=0 + ) + deleted_count += 1 + except Exception as e: + print(f" ⚠ Could not delete pod {pod.metadata.name}: {e}") + print(f" ✓ Deleted {deleted_count} pod(s) - they will restart with CUDA library") + except Exception as e: + print(f" ⚠ Could not list/delete pods: {e}") + print(" Pods will eventually restart, but may take longer") + return True except ApiException as e: diff --git a/tests/fault_tolerance/hardware/fault-injection-service/helpers/cuda_fault_injection.py b/tests/fault_tolerance/hardware/fault-injection-service/helpers/cuda_fault_injection.py index 92e2629aa2..81a0561a34 100644 --- a/tests/fault_tolerance/hardware/fault-injection-service/helpers/cuda_fault_injection.py +++ 
b/tests/fault_tolerance/hardware/fault-injection-service/helpers/cuda_fault_injection.py @@ -37,7 +37,7 @@ def __init__(self, lib_dir: Optional[Path] = None): lib_dir = Path(__file__).parent.parent / "cuda-fault-injection" self.lib_dir = lib_dir - self.lib_path = lib_dir / "fake_cuda_xid79.so" + self.lib_path = lib_dir / "cuda_intercept.so" self.lib_built = False def build_library(self) -> bool: @@ -101,12 +101,57 @@ def create_configmap_with_library(self, namespace: str) -> bool: traceback.print_exc() return False + def check_if_cuda_library_deployed( + self, deployment_name: str, namespace: str + ) -> bool: + """ + Check if CUDA fault injection is already deployed to the deployment. + + Args: + deployment_name: Name of the deployment + namespace: Kubernetes namespace + + Returns: + True if CUDA fault library is already deployed, False otherwise + """ + try: + k8s_custom = client.CustomObjectsApi() + + # Get the DynamoGraphDeployment + dgd = k8s_custom.get_namespaced_custom_object( + group="nvidia.com", + version="v1alpha1", + namespace=namespace, + plural="dynamographdeployments", + name=deployment_name, + ) + + # Check for LD_PRELOAD in worker container env + spec = dgd.get("spec", {}) + worker_spec = spec.get("workerSpec", {}) + pod_spec = worker_spec.get("podSpec", {}) + containers = pod_spec.get("containers", []) + + for container in containers: + if container.get("name") in ["vllm-worker", "worker"]: + env = container.get("env", []) + for env_var in env: + if env_var.get("name") == "LD_PRELOAD": + return True + + return False + + except Exception as e: + # If we can't read the deployment, assume it's not deployed + return False + def patch_deployment_for_cuda_fault( self, deployment_name: str, namespace: str, target_node: Optional[str] = None, xid_type: int = 79, + passthrough_mode: bool = False, ) -> bool: """ Patch deployment to enable CUDA fault injection. @@ -116,7 +161,12 @@ def patch_deployment_for_cuda_fault( - Init container to compile library - LD_PRELOAD environment variable - CUDA_XID_TYPE environment variable + - CUDA_FAULT_INJECTION_ENABLED (0 in passthrough mode, 1 otherwise) - Node affinity (if target_node specified) + + Args: + passthrough_mode: If True, set CUDA_FAULT_INJECTION_ENABLED=0 + (library loaded but faults disabled for baseline) Args: deployment_name: Name of the deployment @@ -149,6 +199,7 @@ def patch_deployment_for_cuda_fault( use_configmap=True, target_node=target_node, xid_type=xid_type, + passthrough_mode=passthrough_mode, ) except Exception as e: @@ -339,6 +390,200 @@ def cleanup_cuda_fault_injection( traceback.print_exc() return False + def enable_cuda_faults_via_toggle( + self, pods: List, namespace: str, enable: bool = True + ) -> bool: + """ + Enable or disable CUDA faults on running pods via environment variable toggle. + + This modifies the CUDA_FAULT_INJECTION_ENABLED env var in running pods + without restarting them. Requires the CUDA library to already be loaded. 
+ + Args: + pods: List of pods to toggle faults on + namespace: Kubernetes namespace + enable: True to enable faults, False to disable + + Returns: + True if toggle succeeded + """ + if not pods: + return False + + from kubernetes.stream import stream as k8s_stream + + k8s_core = client.CoreV1Api() + toggle_value = "1" if enable else "0" + action = "Enabling" if enable else "Disabling" + + print(f"\n[→] {action} CUDA faults via toggle on {len(pods)} pods...") + + success_count = 0 + failed_pods = [] + + for pod in pods: + pod_name = pod.metadata.name + + try: + # Get the main container name from pod spec + container_name = pod.spec.containers[0].name if pod.spec.containers else None + if not container_name: + failed_pods.append((pod_name, "No container found")) + continue + + # Write toggle file to hostPath (persists across pod restarts on same node) + # This simulates persistent hardware failure! + exec_command = ['sh', '-c', f'mkdir -p /host-fault && echo "{toggle_value}" > /host-fault/cuda_fault_enabled && cat /host-fault/cuda_fault_enabled'] + + result = subprocess.run( + ['kubectl', 'exec', '-n', namespace, pod_name, '-c', container_name, '--'] + exec_command, + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode == 0: + actual_value = result.stdout.strip() + if actual_value == toggle_value: + print(f" ✓ Toggle={toggle_value} in {pod_name}/{container_name}") + success_count += 1 + else: + failed_pods.append((pod_name, f"Verify failed: expected '{toggle_value}', got '{actual_value}'")) + else: + failed_pods.append((pod_name, f"Exec failed: {result.stderr.strip()}")) + + except Exception as e: + failed_pods.append((pod_name, str(e))) + continue + + if failed_pods: + print(f" ⚠ Failed to toggle {len(failed_pods)} pods:") + for pod_name, error in failed_pods: + print(f" - {pod_name}: {error}") + + print(f" → Result: {success_count}/{len(pods)} pods toggled successfully") + return success_count > 0 + + def disable_cuda_faults_via_toggle( + self, pods: List[client.V1Pod], namespace: str + ) -> bool: + """ + Disable CUDA faults on running pods via toggle. + + Args: + pods: List of pod objects to disable faults on + namespace: Kubernetes namespace + + Returns: + True if disable succeeded + """ + return self.enable_cuda_faults_via_toggle(pods, namespace, enable=False) + + def cleanup_node_fault_markers(self, pods: List[client.V1Pod], namespace: str) -> bool: + """ + Remove persistent fault marker files from node hostPath. + This cleans up /host-fault/cuda_fault_enabled to prevent future tests from failing. 
+ + Args: + pods: List of pods (to access nodes) + namespace: Kubernetes namespace + + Returns: + True if cleanup succeeded + """ + if not pods: + return True + + print(f" [->] Cleaning persistent fault markers from nodes...") + + success_count = 0 + nodes_cleaned = set() + + for pod in pods: + pod_name = pod.metadata.name + node_name = pod.spec.node_name + + # Skip if we already cleaned this node + if node_name in nodes_cleaned: + continue + + try: + container_name = pod.spec.containers[0].name if pod.spec.containers else None + if not container_name: + continue + + # Remove the persistent marker file from hostPath + exec_command = ['sh', '-c', 'rm -f /host-fault/cuda_fault_enabled 2>/dev/null; echo "ok"'] + + result = subprocess.run( + ['kubectl', 'exec', '-n', namespace, pod_name, '-c', container_name, '--'] + exec_command, + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode == 0: + print(f" ✓ Cleaned fault marker on node {node_name}") + nodes_cleaned.add(node_name) + success_count += 1 + + except Exception as e: + continue + + return success_count > 0 + + def verify_env_var_set( + self, deployment_name: str, namespace: str, expected_value: str, max_wait: int = 30 + ) -> bool: + """ + Verify that CUDA_FAULT_INJECTION_ENABLED env var is set to expected value. + Polls until the value matches or timeout. + + Args: + deployment_name: Name of the DynamoGraphDeployment + namespace: Kubernetes namespace + expected_value: Expected value ("0" or "1") + max_wait: Maximum seconds to wait + + Returns: + True if verified + """ + k8s_custom = client.CustomObjectsApi() + start_time = time.time() + + while time.time() - start_time < max_wait: + try: + dgd = k8s_custom.get_namespaced_custom_object( + group="nvidia.com", + version="v1alpha1", + namespace=namespace, + plural="dynamographdeployments", + name=deployment_name, + ) + + # Check both worker services + for service_name in ["VllmDecodeWorker", "VllmPrefillWorker"]: + if service_name in dgd["spec"]["services"]: + service = dgd["spec"]["services"][service_name] + env_vars = service.get("extraPodSpec", {}).get("mainContainer", {}).get("env", []) + + for env_var in env_vars: + if env_var.get("name") == "CUDA_FAULT_INJECTION_ENABLED": + if env_var.get("value") != expected_value: + time.sleep(1) + break # Try again + else: + continue # This service is good + break # Inner loop broke, try again + else: + # All services verified + return True + + except Exception: + time.sleep(1) + + return False + def trigger_pod_restart(self, pods: List[client.V1Pod], namespace: str): """ Delete pods to trigger restart with new env vars. 
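The runtime control added above spans C and Python, so here is a minimal Python restatement of the precedence that `get_fault_config` now applies on every intercepted call (the authoritative logic is the C in `cuda_intercept.c`; the sketch below is a reading aid only): `CUDA_FAULT_INJECTION_ENABLED` seeds the default, and whichever marker file is readable overrides it, with the hostPath-backed `/host-fault/cuda_fault_enabled` checked before the legacy `/tmp/cuda_fault_enabled`.

```python
# Reading aid only: Python restatement of the per-call decision made by
# get_fault_config() in cuda_intercept.c after this patch.
import os

def fault_injection_active() -> bool:
    # The env var is only the default; a readable marker file wins.
    env_default = os.environ.get("CUDA_FAULT_INJECTION_ENABLED", "0") in ("1", "true")
    for marker in ("/host-fault/cuda_fault_enabled", "/tmp/cuda_fault_enabled"):
        try:
            with open(marker) as f:
                return f.read(1) == "1"   # first readable marker decides
        except OSError:
            continue                      # marker missing, try the next path
    return env_default                    # no marker anywhere: fall back to env

if __name__ == "__main__":
    print("inject CUDA faults:", fault_injection_active())
```

Because the marker lives on a hostPath, a pod rescheduled to a different node finds no file there and recovers automatically, which is the behavior the C comments above describe.
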
From dd33678ba46d4036c964f9b7f56452e754cd7078 Mon Sep 17 00:00:00 2001 From: Oviya Seeniraj Date: Mon, 1 Dec 2025 18:34:22 -0800 Subject: [PATCH 2/4] style: apply black, clang-format, and ruff formatting fixes Signed-off-by: Oviya Seeniraj --- .../cuda-fault-injection/cuda_intercept.c | 7 +- .../cuda-fault-injection/inject_into_pods.py | 43 +++--- .../helpers/cuda_fault_injection.py | 146 ++++++++++++------ 3 files changed, 125 insertions(+), 71 deletions(-) diff --git a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/cuda_intercept.c b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/cuda_intercept.c index 9cb2c7cc80..dfc05a8e79 100644 --- a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/cuda_intercept.c +++ b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/cuda_intercept.c @@ -64,7 +64,7 @@ static void get_fault_config(int* inject, int* xid_type, cudaError_t* error_code) { static int initialized = 0; - static int env_inject = 0; // From environment variable + static int env_inject = 0; // From environment variable static int cached_xid = 79; // Default to XID 79 static cudaError_t cached_error = cudaErrorNoDevice; @@ -86,8 +86,7 @@ get_fault_config(int* inject, int* xid_type, cudaError_t* error_code) if (xid_mappings[i].xid == cached_xid) { cached_error = xid_mappings[i].cuda_error; fprintf( - stderr, "[CUDA FAULT INJECTION] Library loaded - XID %d (%s)\n", cached_xid, - xid_mappings[i].description); + stderr, "[CUDA FAULT INJECTION] Library loaded - XID %d (%s)\n", cached_xid, xid_mappings[i].description); found = 1; break; } @@ -107,7 +106,7 @@ get_fault_config(int* inject, int* xid_type, cudaError_t* error_code) // Use hostPath (/host-fault) so fault persists across pod restarts on same node // Pod reschedules to different node → no file there → automatic recovery! int runtime_inject = env_inject; // Default to env var - + // Check hostPath first (persistent across restarts on same node) FILE* toggle_file = fopen("/host-fault/cuda_fault_enabled", "r"); if (toggle_file) { diff --git a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/inject_into_pods.py b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/inject_into_pods.py index 90d32ad91a..279062810c 100755 --- a/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/inject_into_pods.py +++ b/tests/fault_tolerance/hardware/fault-injection-service/cuda-fault-injection/inject_into_pods.py @@ -200,7 +200,7 @@ def _patch_service_for_injection( service["extraPodSpec"]["volumes"].append( {"name": "cuda-fault-lib", "emptyDir": {}} ) - + # Add hostPath volume for persistent fault marker (survives pod restarts on same node) # This simulates persistent hardware failure! 
service["extraPodSpec"]["volumes"].append( @@ -208,8 +208,8 @@ def _patch_service_for_injection( "name": "node-fault-marker", "hostPath": { "path": "/var/lib/cuda-fault-test", - "type": "DirectoryOrCreate" - } + "type": "DirectoryOrCreate", + }, } ) @@ -267,7 +267,7 @@ def _patch_service_for_injection( "readOnly": True, } ) - + # Add mount for persistent fault marker (hostPath) service["extraPodSpec"]["mainContainer"]["volumeMounts"].append( { @@ -309,7 +309,8 @@ def _patch_service_for_injection( service["extraPodSpec"]["volumes"] = [ v for v in service["extraPodSpec"]["volumes"] - if v.get("name") not in ["cuda-fault-lib", "cuda-fault-lib-source", "node-fault-marker"] + if v.get("name") + not in ["cuda-fault-lib", "cuda-fault-lib-source", "node-fault-marker"] ] if "volumeMounts" in service["extraPodSpec"].get("mainContainer", {}): @@ -414,7 +415,10 @@ def patch_deployment_env( fault_enabled_value = "0" if passthrough_mode else "1" new_envs = [ {"name": "LD_PRELOAD", "value": lib_path}, - {"name": "CUDA_FAULT_INJECTION_ENABLED", "value": fault_enabled_value}, + { + "name": "CUDA_FAULT_INJECTION_ENABLED", + "value": fault_enabled_value, + }, {"name": "CUDA_XID_TYPE", "value": str(xid_type)}, ] @@ -434,7 +438,7 @@ def patch_deployment_env( spec["updateStrategy"] = {} if "rollingUpdate" not in spec["updateStrategy"]: spec["updateStrategy"]["rollingUpdate"] = {} - + # Allow all pods to be unavailable during update spec["updateStrategy"]["rollingUpdate"]["maxUnavailable"] = "100%" # Don't create surge pods @@ -445,10 +449,7 @@ def patch_deployment_env( # Restore default update strategy when disabling if "updateStrategy" in spec: spec["updateStrategy"] = { - "rollingUpdate": { - "maxUnavailable": "25%", - "maxSurge": "25%" - } + "rollingUpdate": {"maxUnavailable": "25%", "maxSurge": "25%"} } print(" → Restored default update strategy (maxUnavailable=25%)") @@ -517,15 +518,17 @@ def patch_deployment_env( print(f" Services patched: {', '.join(patched_services)}") if use_configmap and enable: print(f" Library mounted at: {lib_path}") - + # Force restart all worker pods when enabling to apply changes immediately if enable: - print(" → Force-deleting all worker pods to apply changes immediately...") + print( + " → Force-deleting all worker pods to apply changes immediately..." 
+ ) core_api = client.CoreV1Api() try: worker_pods = core_api.list_namespaced_pod( namespace=namespace, - label_selector=f"nvidia.com/dynamo-graph-deployment-name={deployment_name},nvidia.com/dynamo-component-type=worker" + label_selector=f"nvidia.com/dynamo-graph-deployment-name={deployment_name},nvidia.com/dynamo-component-type=worker", ) deleted_count = 0 for pod in worker_pods.items: @@ -533,16 +536,20 @@ def patch_deployment_env( core_api.delete_namespaced_pod( name=pod.metadata.name, namespace=namespace, - grace_period_seconds=0 + grace_period_seconds=0, ) deleted_count += 1 except Exception as e: - print(f" ⚠ Could not delete pod {pod.metadata.name}: {e}") - print(f" ✓ Deleted {deleted_count} pod(s) - they will restart with CUDA library") + print( + f" ⚠ Could not delete pod {pod.metadata.name}: {e}" + ) + print( + f" ✓ Deleted {deleted_count} pod(s) - they will restart with CUDA library" + ) except Exception as e: print(f" ⚠ Could not list/delete pods: {e}") print(" Pods will eventually restart, but may take longer") - + return True except ApiException as e: diff --git a/tests/fault_tolerance/hardware/fault-injection-service/helpers/cuda_fault_injection.py b/tests/fault_tolerance/hardware/fault-injection-service/helpers/cuda_fault_injection.py index 81a0561a34..68dabd37af 100644 --- a/tests/fault_tolerance/hardware/fault-injection-service/helpers/cuda_fault_injection.py +++ b/tests/fault_tolerance/hardware/fault-injection-service/helpers/cuda_fault_injection.py @@ -116,7 +116,7 @@ def check_if_cuda_library_deployed( """ try: k8s_custom = client.CustomObjectsApi() - + # Get the DynamoGraphDeployment dgd = k8s_custom.get_namespaced_custom_object( group="nvidia.com", @@ -141,7 +141,7 @@ def check_if_cuda_library_deployed( return False - except Exception as e: + except Exception: # If we can't read the deployment, assume it's not deployed return False @@ -163,7 +163,7 @@ def patch_deployment_for_cuda_fault( - CUDA_XID_TYPE environment variable - CUDA_FAULT_INJECTION_ENABLED (0 in passthrough mode, 1 otherwise) - Node affinity (if target_node specified) - + Args: passthrough_mode: If True, set CUDA_FAULT_INJECTION_ENABLED=0 (library loaded but faults disabled for baseline) @@ -410,9 +410,6 @@ def enable_cuda_faults_via_toggle( if not pods: return False - from kubernetes.stream import stream as k8s_stream - - k8s_core = client.CoreV1Api() toggle_value = "1" if enable else "0" action = "Enabling" if enable else "Disabling" @@ -420,38 +417,63 @@ def enable_cuda_faults_via_toggle( success_count = 0 failed_pods = [] - + for pod in pods: pod_name = pod.metadata.name - + try: # Get the main container name from pod spec - container_name = pod.spec.containers[0].name if pod.spec.containers else None + container_name = ( + pod.spec.containers[0].name if pod.spec.containers else None + ) if not container_name: failed_pods.append((pod_name, "No container found")) continue - + # Write toggle file to hostPath (persists across pod restarts on same node) # This simulates persistent hardware failure! 
- exec_command = ['sh', '-c', f'mkdir -p /host-fault && echo "{toggle_value}" > /host-fault/cuda_fault_enabled && cat /host-fault/cuda_fault_enabled'] - + exec_command = [ + "sh", + "-c", + f'mkdir -p /host-fault && echo "{toggle_value}" > /host-fault/cuda_fault_enabled && cat /host-fault/cuda_fault_enabled', + ] + result = subprocess.run( - ['kubectl', 'exec', '-n', namespace, pod_name, '-c', container_name, '--'] + exec_command, + [ + "kubectl", + "exec", + "-n", + namespace, + pod_name, + "-c", + container_name, + "--", + ] + + exec_command, capture_output=True, text=True, - timeout=10 + timeout=10, ) - + if result.returncode == 0: actual_value = result.stdout.strip() if actual_value == toggle_value: - print(f" ✓ Toggle={toggle_value} in {pod_name}/{container_name}") + print( + f" ✓ Toggle={toggle_value} in {pod_name}/{container_name}" + ) success_count += 1 else: - failed_pods.append((pod_name, f"Verify failed: expected '{toggle_value}', got '{actual_value}'")) + failed_pods.append( + ( + pod_name, + f"Verify failed: expected '{toggle_value}', got '{actual_value}'", + ) + ) else: - failed_pods.append((pod_name, f"Exec failed: {result.stderr.strip()}")) - + failed_pods.append( + (pod_name, f"Exec failed: {result.stderr.strip()}") + ) + except Exception as e: failed_pods.append((pod_name, str(e))) continue @@ -460,7 +482,7 @@ def enable_cuda_faults_via_toggle( print(f" ⚠ Failed to toggle {len(failed_pods)} pods:") for pod_name, error in failed_pods: print(f" - {pod_name}: {error}") - + print(f" → Result: {success_count}/{len(pods)} pods toggled successfully") return success_count > 0 @@ -478,79 +500,101 @@ def disable_cuda_faults_via_toggle( True if disable succeeded """ return self.enable_cuda_faults_via_toggle(pods, namespace, enable=False) - - def cleanup_node_fault_markers(self, pods: List[client.V1Pod], namespace: str) -> bool: + + def cleanup_node_fault_markers( + self, pods: List[client.V1Pod], namespace: str + ) -> bool: """ Remove persistent fault marker files from node hostPath. This cleans up /host-fault/cuda_fault_enabled to prevent future tests from failing. 
- + Args: pods: List of pods (to access nodes) namespace: Kubernetes namespace - + Returns: True if cleanup succeeded """ if not pods: return True - - print(f" [->] Cleaning persistent fault markers from nodes...") - + + print(" [->] Cleaning persistent fault markers from nodes...") + success_count = 0 nodes_cleaned = set() - + for pod in pods: pod_name = pod.metadata.name node_name = pod.spec.node_name - + # Skip if we already cleaned this node if node_name in nodes_cleaned: continue - + try: - container_name = pod.spec.containers[0].name if pod.spec.containers else None + container_name = ( + pod.spec.containers[0].name if pod.spec.containers else None + ) if not container_name: continue - + # Remove the persistent marker file from hostPath - exec_command = ['sh', '-c', 'rm -f /host-fault/cuda_fault_enabled 2>/dev/null; echo "ok"'] - + exec_command = [ + "sh", + "-c", + 'rm -f /host-fault/cuda_fault_enabled 2>/dev/null; echo "ok"', + ] + result = subprocess.run( - ['kubectl', 'exec', '-n', namespace, pod_name, '-c', container_name, '--'] + exec_command, + [ + "kubectl", + "exec", + "-n", + namespace, + pod_name, + "-c", + container_name, + "--", + ] + + exec_command, capture_output=True, text=True, - timeout=10 + timeout=10, ) - + if result.returncode == 0: print(f" ✓ Cleaned fault marker on node {node_name}") nodes_cleaned.add(node_name) success_count += 1 - - except Exception as e: + + except Exception: continue return success_count > 0 def verify_env_var_set( - self, deployment_name: str, namespace: str, expected_value: str, max_wait: int = 30 + self, + deployment_name: str, + namespace: str, + expected_value: str, + max_wait: int = 30, ) -> bool: """ Verify that CUDA_FAULT_INJECTION_ENABLED env var is set to expected value. Polls until the value matches or timeout. - + Args: deployment_name: Name of the DynamoGraphDeployment namespace: Kubernetes namespace expected_value: Expected value ("0" or "1") max_wait: Maximum seconds to wait - + Returns: True if verified """ k8s_custom = client.CustomObjectsApi() start_time = time.time() - + while time.time() - start_time < max_wait: try: dgd = k8s_custom.get_namespaced_custom_object( @@ -560,13 +604,17 @@ def verify_env_var_set( plural="dynamographdeployments", name=deployment_name, ) - + # Check both worker services for service_name in ["VllmDecodeWorker", "VllmPrefillWorker"]: if service_name in dgd["spec"]["services"]: service = dgd["spec"]["services"][service_name] - env_vars = service.get("extraPodSpec", {}).get("mainContainer", {}).get("env", []) - + env_vars = ( + service.get("extraPodSpec", {}) + .get("mainContainer", {}) + .get("env", []) + ) + for env_var in env_vars: if env_var.get("name") == "CUDA_FAULT_INJECTION_ENABLED": if env_var.get("value") != expected_value: @@ -578,12 +626,12 @@ def verify_env_var_set( else: # All services verified return True - + except Exception: time.sleep(1) - + return False - + def trigger_pod_restart(self, pods: List[client.V1Pod], namespace: str): """ Delete pods to trigger restart with new env vars. 
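The toggle write that `enable_cuda_faults_via_toggle` performs (reformatted above) is a `kubectl exec` shell pipeline; extracted into a standalone sketch it looks like the following. The pod, container, and namespace names in the usage line are placeholders, and `set_fault_toggle` is not a function in the helper module.

```python
# Standalone sketch of the runtime toggle: write "1"/"0" into the
# hostPath-backed marker that cuda_intercept.c re-reads on every CUDA call.
import subprocess

def set_fault_toggle(pod: str, container: str, namespace: str, enable: bool) -> bool:
    value = "1" if enable else "0"
    shell = (
        'mkdir -p /host-fault && '
        f'echo "{value}" > /host-fault/cuda_fault_enabled && '
        'cat /host-fault/cuda_fault_enabled'
    )
    result = subprocess.run(
        ["kubectl", "exec", "-n", namespace, pod, "-c", container, "--", "sh", "-c", shell],
        capture_output=True, text=True, timeout=10,
    )
    # The trailing `cat` echoes the file back so the write can be verified here.
    return result.returncode == 0 and result.stdout.strip() == value

if __name__ == "__main__":
    # Placeholder names; substitute a real worker pod and namespace.
    print(set_fault_toggle("decode-worker-0", "main", "dynamo", enable=True))
```
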
From 885110c039a6989515d18440fdff25f2bec95cef Mon Sep 17 00:00:00 2001 From: Oviya Seeniraj Date: Wed, 3 Dec 2025 11:54:43 -0800 Subject: [PATCH 3/4] ensured passthrough_mode is correcly used + consolidated duplicate args section and fixed inconsistent type hint Signed-off-by: Oviya Seeniraj --- .../cuda_fault_injection/inject_into_pods.py | 4 +++- .../helpers/cuda_fault_injection.py | 8 +++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/fault_tolerance/hardware/fault_injection_service/cuda_fault_injection/inject_into_pods.py b/tests/fault_tolerance/hardware/fault_injection_service/cuda_fault_injection/inject_into_pods.py index 279062810c..5b4285bc96 100755 --- a/tests/fault_tolerance/hardware/fault_injection_service/cuda_fault_injection/inject_into_pods.py +++ b/tests/fault_tolerance/hardware/fault_injection_service/cuda_fault_injection/inject_into_pods.py @@ -590,11 +590,13 @@ def patch_deployment_env( if enable: # Add new env vars + # Set CUDA_FAULT_INJECTION_ENABLED based on passthrough_mode + fault_enabled_value = "0" if passthrough_mode else "1" container.env.append( client.V1EnvVar(name="LD_PRELOAD", value="/tmp/cuda_intercept.so") ) container.env.append( - client.V1EnvVar(name="CUDA_FAULT_INJECTION_ENABLED", value="1") + client.V1EnvVar(name="CUDA_FAULT_INJECTION_ENABLED", value=fault_enabled_value) ) container.env.append( client.V1EnvVar(name="CUDA_XID_TYPE", value=str(xid_type)) diff --git a/tests/fault_tolerance/hardware/fault_injection_service/helpers/cuda_fault_injection.py b/tests/fault_tolerance/hardware/fault_injection_service/helpers/cuda_fault_injection.py index 236bfa26c0..fa3b58b09c 100644 --- a/tests/fault_tolerance/hardware/fault_injection_service/helpers/cuda_fault_injection.py +++ b/tests/fault_tolerance/hardware/fault_injection_service/helpers/cuda_fault_injection.py @@ -164,15 +164,13 @@ def patch_deployment_for_cuda_fault( - CUDA_FAULT_INJECTION_ENABLED (0 in passthrough mode, 1 otherwise) - Node affinity (if target_node specified) - Args: - passthrough_mode: If True, set CUDA_FAULT_INJECTION_ENABLED=0 - (library loaded but faults disabled for baseline) - Args: deployment_name: Name of the deployment namespace: Kubernetes namespace target_node: Node to pin pods to (simulates real XID behavior) xid_type: XID error type to simulate (79, 48, 94, 95, 43, 74). Default: 79 + passthrough_mode: If True, set CUDA_FAULT_INJECTION_ENABLED=0 + (library loaded but faults disabled for baseline) Returns: True if patch succeeded @@ -391,7 +389,7 @@ def cleanup_cuda_fault_injection( return False def enable_cuda_faults_via_toggle( - self, pods: List, namespace: str, enable: bool = True + self, pods: List[client.V1Pod], namespace: str, enable: bool = True ) -> bool: """ Enable or disable CUDA faults on running pods via environment variable toggle. 
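A condensed illustration of the contract this commit enforces in both code paths: `passthrough_mode` keeps `LD_PRELOAD` in place but starts the deployment with faults disabled, so a baseline can run before the `/host-fault` toggle flips them on. `build_fault_env` below is illustrative and not part of the module; the env var names and values are taken from the diff.

```python
# Illustrative only: the env block the patched code paths produce.
from typing import Dict, List

def build_fault_env(xid_type: int = 79, passthrough_mode: bool = False,
                    lib_path: str = "/tmp/cuda_intercept.so") -> List[Dict[str, str]]:
    # Mirrors the fix: passthrough preloads the library but starts faults off.
    fault_enabled_value = "0" if passthrough_mode else "1"
    return [
        {"name": "LD_PRELOAD", "value": lib_path},
        {"name": "CUDA_FAULT_INJECTION_ENABLED", "value": fault_enabled_value},
        {"name": "CUDA_XID_TYPE", "value": str(xid_type)},
    ]

# Baseline run: library loaded, faults off until the /host-fault toggle is written.
assert build_fault_env(passthrough_mode=True)[1]["value"] == "0"
# Fault-from-start run: errors are injected as soon as the worker touches CUDA.
assert build_fault_env(passthrough_mode=False)[1]["value"] == "1"
```
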
From 2a9d97396f91b7cf007e0a54797e289fe3bf30ee Mon Sep 17 00:00:00 2001 From: Oviya Seeniraj Date: Wed, 3 Dec 2025 12:52:02 -0800 Subject: [PATCH 4/4] CI formatting fix Signed-off-by: Oviya Seeniraj --- .../cuda_fault_injection/inject_into_pods.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/fault_tolerance/hardware/fault_injection_service/cuda_fault_injection/inject_into_pods.py b/tests/fault_tolerance/hardware/fault_injection_service/cuda_fault_injection/inject_into_pods.py index 5b4285bc96..5083d7c2fb 100755 --- a/tests/fault_tolerance/hardware/fault_injection_service/cuda_fault_injection/inject_into_pods.py +++ b/tests/fault_tolerance/hardware/fault_injection_service/cuda_fault_injection/inject_into_pods.py @@ -596,7 +596,9 @@ def patch_deployment_env( client.V1EnvVar(name="LD_PRELOAD", value="/tmp/cuda_intercept.so") ) container.env.append( - client.V1EnvVar(name="CUDA_FAULT_INJECTION_ENABLED", value=fault_enabled_value) + client.V1EnvVar( + name="CUDA_FAULT_INJECTION_ENABLED", value=fault_enabled_value + ) ) container.env.append( client.V1EnvVar(name="CUDA_XID_TYPE", value=str(xid_type))
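Putting the series together, a hypothetical test driver might look like the sketch below. The class name `CudaFaultInjector`, its import path, and the exact signature of `cleanup_cuda_fault_injection` are assumptions; the remaining method names, arguments, and the worker label selector come from the diffs above.

```python
# Hypothetical end-to-end driver tying the helpers in this series together.
from kubernetes import client, config
from helpers.cuda_fault_injection import CudaFaultInjector  # assumed class name/path

def run_xid_scenario(deployment: str, namespace: str, node: str) -> None:
    config.load_kube_config()
    injector = CudaFaultInjector()

    # One-time setup: compile/ship the library, patch the DGD in passthrough
    # mode so workers come up healthy with the interceptor merely preloaded.
    injector.build_library()
    injector.create_configmap_with_library(namespace)
    injector.patch_deployment_for_cuda_fault(
        deployment, namespace, target_node=node, xid_type=79, passthrough_mode=True
    )
    injector.verify_env_var_set(deployment, namespace, expected_value="0")

    # Flip faults on at runtime via the hostPath marker; no restart required.
    pods = client.CoreV1Api().list_namespaced_pod(
        namespace,
        label_selector=(
            f"nvidia.com/dynamo-graph-deployment-name={deployment},"
            "nvidia.com/dynamo-component-type=worker"
        ),
    ).items
    injector.enable_cuda_faults_via_toggle(pods, namespace)

    # ... drive inference traffic and assert the expected XID 79 symptoms ...

    # Recover: clear the toggle and the persistent node marker, then unpatch.
    injector.disable_cuda_faults_via_toggle(pods, namespace)
    injector.cleanup_node_fault_markers(pods, namespace)
    injector.cleanup_cuda_fault_injection(deployment, namespace)  # signature assumed
```
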