diff --git a/src/core/containers/runtime/__init__.py b/src/core/containers/runtime/__init__.py index a72b5301..0be889ba 100644 --- a/src/core/containers/runtime/__init__.py +++ b/src/core/containers/runtime/__init__.py @@ -12,4 +12,4 @@ "ContainerProvider", "LocalDockerProvider", "KubernetesProvider", -] \ No newline at end of file +] diff --git a/src/core/containers/runtime/providers.py b/src/core/containers/runtime/providers.py index a8022ddc..957bb690 100644 --- a/src/core/containers/runtime/providers.py +++ b/src/core/containers/runtime/providers.py @@ -135,29 +135,42 @@ def start_container( Args: image: Docker image name - port: Port to expose (if None, finds available port) + port: Port to expose (if None, uses 8000) env_vars: Environment variables for the container **kwargs: Additional Docker run options + - command_override: List of command args to override container CMD + - memory_gb: Memory limit in GB (default: 4GB) Returns: Base URL to connect to the container """ import subprocess import time + import logging - # Find available port if not specified + logger = logging.getLogger(__name__) + + # Use default port if not specified if port is None: - port = self._find_available_port() + port = 8000 + + # Use default memory limit if not specified + memory_gb = kwargs.get("memory_gb", 4) # Generate container name self._container_name = self._generate_container_name(image) # Build docker run command + # Use host networking for better performance and consistency with podman + # NOTE: Do NOT use --rm initially - if container fails to start, we need logs cmd = [ "docker", "run", "-d", # Detached "--name", self._container_name, - "-p", f"{port}:8000", # Map port + "--network", "host", # Use host network + "--memory", f"{memory_gb}g", # Limit container memory + "--memory-swap", f"{memory_gb}g", # Prevent swap usage (set equal to --memory) + "--oom-kill-disable=false", # Allow OOM killer (exit gracefully) ] # Add environment variables @@ -165,13 +178,24 @@ def start_container( for key, value in env_vars.items(): cmd.extend(["-e", f"{key}={value}"]) + # Pass custom port via environment variable instead of overriding command + # This allows the container to use its proper entrypoint/CMD + if port != 8000: + cmd.extend(["-e", f"PORT={port}"]) + # Add image cmd.append(image) + + # Add command override if provided (explicit override by user) + if "command_override" in kwargs: + cmd.extend(kwargs["command_override"]) # Run container try: + logger.debug(f"Starting container with command: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True, check=True) self._container_id = result.stdout.strip() + logger.debug(f"Container started with ID: {self._container_id}") except subprocess.CalledProcessError as e: error_msg = f"Failed to start Docker container.\nCommand: {' '.join(cmd)}\nExit code: {e.returncode}\nStderr: {e.stderr}\nStdout: {e.stdout}" raise RuntimeError(error_msg) from e @@ -179,7 +203,7 @@ def start_container( # Wait a moment for container to start time.sleep(1) - base_url = f"http://localhost:{port}" + base_url = f"http://127.0.0.1:{port}" return base_url def stop_container(self) -> None: @@ -227,23 +251,65 @@ def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None: """ import time import requests + import subprocess + import logging start_time = time.time() health_url = f"{base_url}/health" + last_error = None while time.time() - start_time < timeout_s: try: response = requests.get(health_url, timeout=2.0) if response.status_code == 200: return - except requests.RequestException: - pass + except requests.RequestException as e: + last_error = str(e) time.sleep(0.5) - raise TimeoutError( - f"Container at {base_url} did not become ready within {timeout_s}s" - ) + # If we timeout, provide diagnostic information + error_msg = f"Container at {base_url} did not become ready within {timeout_s}s" + + if self._container_id: + try: + # First check if container exists + inspect_result = subprocess.run( + ["docker", "inspect", self._container_id], + capture_output=True, + text=True, + timeout=5, + ) + + if inspect_result.returncode != 0: + # Container doesn't exist - likely exited and auto-removed due to --rm flag + error_msg += f"\n\nContainer was auto-removed (likely exited immediately)." + error_msg += f"\nThis typically means:" + error_msg += f"\n 1. The container image has an error in its startup script" + error_msg += f"\n 2. Required dependencies are missing in the container" + error_msg += f"\n 3. Port {base_url.split(':')[-1]} might be in use by another process" + error_msg += f"\n 4. Container command/entrypoint is misconfigured" + error_msg += f"\nTry running the container manually to debug:" + error_msg += f"\n docker run -it --rm " + else: + # Container exists, try to get logs + result = subprocess.run( + ["docker", "logs", "--tail", "50", self._container_id], + capture_output=True, + text=True, + timeout=5, + ) + if result.stdout or result.stderr: + error_msg += f"\n\nContainer logs (last 50 lines):\n{result.stdout}\n{result.stderr}" + except subprocess.TimeoutExpired: + error_msg += f"\n\nTimeout while trying to inspect container" + except Exception as e: + error_msg += f"\n\nFailed to get container diagnostics: {e}" + + if last_error: + error_msg += f"\n\nLast connection error: {last_error}" + + raise TimeoutError(error_msg) def _find_available_port(self) -> int: """ @@ -276,6 +342,31 @@ def _generate_container_name(self, image: str) -> str: timestamp = int(time.time() * 1000) return f"{clean_image}-{timestamp}" + def _infer_app_module(self, image: str) -> Optional[str]: + """ + Infer the uvicorn app module path from the image name. + + Args: + image: Container image name + + Returns: + App module path like "envs.coding_env.server.app:app" or None + """ + clean_image = image.split("/")[-1].split(":")[0] + + # Map common environment names to their app modules + env_module_map = { + "coding-env": "envs.coding_env.server.app:app", + "echo-env": "envs.echo_env.server.app:app", + "git-env": "envs.git_env.server.app:app", + "openspiel-env": "envs.openspiel_env.server.app:app", + "sumo-rl-env": "envs.sumo_rl_env.server.app:app", + "finrl-env": "envs.finrl_env.server.app:app", + } + + return env_module_map.get(clean_image) + + class KubernetesProvider(ContainerProvider): """ diff --git a/src/core/env_server/http_server.py b/src/core/env_server/http_server.py index d18873f0..7cc75253 100644 --- a/src/core/env_server/http_server.py +++ b/src/core/env_server/http_server.py @@ -15,7 +15,7 @@ import os from dataclasses import asdict -from typing import Any, Dict, Type +from typing import Any, Dict, Optional, Type from .interfaces import Environment from .types import Action, Observation @@ -85,13 +85,22 @@ async def reset(request: Dict[str, Any] = Body(default={})) -> Dict[str, Any]: async def step(request: Dict[str, Any]) -> Dict[str, Any]: """Step endpoint - executes action and returns observation.""" action_data = request.get("action", {}) - # TODO: Handle timeout_s, request_id, episode_id from request if provided + + # Extract timeout_s from request (sent by HTTPEnvClient) + timeout_s = request.get("timeout_s", None) + + # TODO: Handle request_id, episode_id from request if provided # Deserialize action action = self._deserialize_action(action_data) - # Execute step - observation = self.env.step(action) + # Execute step with timeout if environment supports it + try: + # Try to pass timeout_s to step() method + observation = self.env.step(action, timeout_s=timeout_s) + except TypeError: + # Environment doesn't support timeout parameter, call without it + observation = self.env.step(action) # Return serialized observation return self._serialize_observation(observation) diff --git a/src/core/http_env_client.py b/src/core/http_env_client.py index b304e088..4807ca96 100644 --- a/src/core/http_env_client.py +++ b/src/core/http_env_client.py @@ -46,6 +46,8 @@ def from_docker_image( cls: Type[EnvClientT], image: str, provider: Optional["ContainerProvider"] = None, + timeout_s: float = 120.0, + request_timeout_s: float = 15.0, **kwargs: Any, ) -> EnvClientT: """ @@ -62,6 +64,8 @@ def from_docker_image( Args: image: Docker image name to run (e.g., "echo-env:latest") provider: Container provider to use (defaults to LocalDockerProvider) + timeout_s: Maximum time to wait for container to become ready (default: 120 seconds) + request_timeout_s: Timeout for HTTP requests to the environment (default: 15 seconds) **kwargs: Additional arguments to pass to provider.start_container() (e.g., env_vars, port) @@ -75,9 +79,11 @@ def from_docker_image( >>> # Create environment from image >>> env = CodingEnv.from_docker_image("coding-env:latest") >>> - >>> # Create environment with custom env vars + >>> # Create environment with custom env vars and timeouts >>> env = CodingEnv.from_docker_image( ... "coding-env:latest", + ... timeout_s=180.0, + ... request_timeout_s=120.0, ... env_vars={"MY_VAR": "value"} ... ) >>> @@ -99,11 +105,11 @@ def from_docker_image( # 1. Start container with optional kwargs (e.g., env_vars, port) base_url = provider.start_container(image, **kwargs) - # 2. Wait for server to be ready - provider.wait_for_ready(base_url) + # 2. Wait for server to be ready with configured timeout + provider.wait_for_ready(base_url, timeout_s=timeout_s) - # 3. Create and return client instance with provider reference - return cls(base_url=base_url, provider=provider) + # 3. Create and return client instance with provider reference and request timeout + return cls(base_url=base_url, request_timeout_s=request_timeout_s, provider=provider) @abstractmethod def _step_payload(self, action: ActT) -> dict: diff --git a/src/core/tools/local_python_executor.py b/src/core/tools/local_python_executor.py index ba4477d5..83759c09 100644 --- a/src/core/tools/local_python_executor.py +++ b/src/core/tools/local_python_executor.py @@ -8,14 +8,50 @@ Local Python Executor. This module provides functionality for executing Python code locally by wrapping -the smolagents LocalPythonExecutor. +the smolagents LocalPythonExecutor with timeout protection. """ +import multiprocessing +import signal +import threading +from typing import Optional + from smolagents import LocalPythonExecutor from core.env_server.types import CodeExecResult +def _timeout_handler(signum, frame): + """Signal handler for timeout.""" + raise TimeoutError("Code execution timed out") + + +def _run_with_timeout(executor, code: str, timeout_s: float, result_container: list): + """Helper function to run code execution in a separate process with timeout. + + Args: + executor: The LocalPythonExecutor instance + code: Code to execute + timeout_s: Timeout in seconds + result_container: List to store the result (mutated in place) + """ + try: + exec_result = executor(code) + result_container.append({ + 'success': True, + 'stdout': exec_result.logs, + 'stderr': '', + 'exit_code': 0 + }) + except Exception as e: + result_container.append({ + 'success': False, + 'stdout': '', + 'stderr': str(e), + 'exit_code': 1 + }) + + class PyExecutor: """ Wrapper around smolagents LocalPythonExecutor for executing Python code. @@ -57,12 +93,14 @@ def __init__(self, additional_imports: list[str] | None = None): # Initialize tools to make BASE_PYTHON_TOOLS available (including print) self._executor.send_tools({}) - def run(self, code: str) -> CodeExecResult: + def run(self, code: str, timeout_s: Optional[float] = None) -> CodeExecResult: """ - Execute Python code and return the result. + Execute Python code and return the result with optional timeout protection. Args: code: Python code string to execute + timeout_s: Maximum execution time in seconds. If None, no timeout is enforced. + If the code exceeds this time, it will be terminated with a timeout error. Returns: CodeExecResult containing stdout, stderr, and exit_code @@ -77,29 +115,121 @@ def run(self, code: str) -> CodeExecResult: >>> result = executor.run("1 / 0") >>> print(result.exit_code) # 1 >>> print(result.stderr) # Contains error message + >>> + >>> # Timeout protection + >>> result = executor.run("while True: pass", timeout_s=5.0) + >>> print(result.exit_code) # 1 + >>> print("timeout" in result.stderr.lower()) # True """ + # Use proper multiprocessing-based timeout for subprocess protection + if timeout_s is not None and timeout_s > 0: + return self._run_with_process_timeout(code, timeout_s) + + # No timeout - run directly try: - # Execute the code using LocalPythonExecutor - # LocalPythonExecutor returns a CodeOutput object with output, logs, is_final_answer exec_result = self._executor(code) - - # Extract the logs (which contain print outputs) as stdout - # The output field contains the return value of the code - stdout = exec_result.logs - stderr = "" - exit_code = 0 # Success - return CodeExecResult( - stdout=stdout, - stderr=stderr, - exit_code=exit_code, + stdout=exec_result.logs, + stderr="", + exit_code=0, ) - except Exception as e: - # LocalPythonExecutor raises InterpreterError for various issues - # (syntax errors, forbidden operations, runtime errors, etc.) return CodeExecResult( stdout="", stderr=str(e), - exit_code=1, # Non-zero indicates error + exit_code=1, + ) + + def _run_with_process_timeout(self, code: str, timeout_s: float) -> CodeExecResult: + """Execute code with proper subprocess timeout protection using multiprocessing. + + This method uses multiprocessing.Process to isolate code execution and + ensures the process is properly terminated if it exceeds the timeout. + + Args: + code: Python code to execute + timeout_s: Timeout in seconds + + Returns: + CodeExecResult with execution results or timeout error + """ + # Use a Manager to share results between processes + manager = multiprocessing.Manager() + result_container = manager.list() + + # Create a process to run the code + process = multiprocessing.Process( + target=_run_with_timeout, + args=(self._executor, code, timeout_s, result_container) + ) + + try: + # Start the process + process.start() + + # Wait for completion with timeout + process.join(timeout=timeout_s) + + # Check if process completed + if process.is_alive(): + # CRITICAL: Process exceeded timeout - KILL IT! + print(f"WARNING: Code execution timed out after {timeout_s}s, terminating process {process.pid}") + process.terminate() # Send SIGTERM + process.join(timeout=2) # Wait up to 2s for graceful shutdown + + if process.is_alive(): + # Still alive - force kill + print(f"WARNING: Process {process.pid} did not terminate, force killing") + process.kill() # Send SIGKILL + process.join(timeout=1) + + return CodeExecResult( + stdout="", + stderr=f"Code execution timed out after {timeout_s} seconds. " + f"Process was terminated. Possible infinite loop or extremely long computation.", + exit_code=1, + ) + + # Process completed - check results + if result_container: + result = result_container[0] + if result['success']: + return CodeExecResult( + stdout=result['stdout'], + stderr=result['stderr'], + exit_code=result['exit_code'], + ) + else: + return CodeExecResult( + stdout=result['stdout'], + stderr=result['stderr'], + exit_code=result['exit_code'], + ) + else: + # Process completed but no result - something went wrong + return CodeExecResult( + stdout="", + stderr="Code execution completed but produced no output", + exit_code=1, + ) + + except Exception as e: + # Clean up process on exception + if process.is_alive(): + process.terminate() + process.join(timeout=1) + if process.is_alive(): + process.kill() + + return CodeExecResult( + stdout="", + stderr=f"Error during code execution: {str(e)}", + exit_code=1, ) + finally: + # Ensure process is cleaned up + if process.is_alive(): + process.terminate() + process.join(timeout=1) + if process.is_alive(): + process.kill() diff --git a/src/envs/coding_env/server/Dockerfile b/src/envs/coding_env/server/Dockerfile index 7cf90d5d..5717e21e 100644 --- a/src/envs/coding_env/server/Dockerfile +++ b/src/envs/coding_env/server/Dockerfile @@ -21,9 +21,12 @@ COPY src/envs/coding_env/ /app/src/envs/coding_env/ # Copy README for web interface documentation COPY src/envs/coding_env/README.md /app/README.md -# Health check +# Set default port (can be overridden via environment variable) +ENV PORT=8000 + +# Health check (uses PORT env variable) HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ - CMD curl -f http://localhost:8000/health || exit 1 + CMD curl -f http://localhost:${PORT}/health || exit 1 -# Run the FastAPI server -CMD ["uvicorn", "envs.coding_env.server.app:app", "--host", "0.0.0.0", "--port", "8000"] +# Run the FastAPI server (uses PORT env variable) +CMD sh -c "uvicorn envs.coding_env.server.app:app --host 0.0.0.0 --port ${PORT}" diff --git a/src/envs/coding_env/server/app.py b/src/envs/coding_env/server/app.py index 3a895474..a4cfe683 100644 --- a/src/envs/coding_env/server/app.py +++ b/src/envs/coding_env/server/app.py @@ -12,28 +12,60 @@ Usage: # Development (with auto-reload): - uvicorn envs.coding_env.server.app:app --reload --host 0.0.0.0 --port 8000 + uvicorn envs.coding_env.server.app:app --reload --host 0.0.0.0 --port 5432 # Production: - uvicorn envs.coding_env.server.app:app --host 0.0.0.0 --port 8000 --workers 4 + uvicorn envs.coding_env.server.app:app --host 0.0.0.0 --port 5432 --workers 4 # Or run directly: python -m envs.coding_env.server.app + + # With custom imports: + PYTHON_ADDITIONAL_IMPORTS="sys,os,functools,typing" python -m envs.coding_env.server.app """ +import os + from core.env_server import create_app from ..models import CodeAction, CodeObservation from .python_codeact_env import PythonCodeActEnv -# Create the environment instance -env = PythonCodeActEnv() +# Get additional imports from environment variable +# Format: comma-separated list, e.g., "sys,os,functools,typing" +additional_imports_str = os.environ.get("PYTHON_ADDITIONAL_IMPORTS", "") +if additional_imports_str: + additional_imports = [imp.strip() for imp in additional_imports_str.split(",") if imp.strip()] +else: + # Default imports that match the common_imports used in reward evaluation + additional_imports = [ + "sys", + "os", + "functools", + "typing", + ] + +# Create the environment instance with authorized imports +env = PythonCodeActEnv(additional_imports=additional_imports) # Create the app with web interface and README integration app = create_app(env, CodeAction, CodeObservation, env_name="coding_env") if __name__ == "__main__": + import sys import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) + # Get port from environment variable or command line argument + # Priority: command line arg > environment variable > default (5432) + port = int(os.environ.get("PORT", 5432)) + + # Override with command line argument if provided + if len(sys.argv) > 1: + try: + port = int(sys.argv[1]) + except ValueError: + print(f"Invalid port argument: {sys.argv[1]}, using port {port}") + + print(f"Starting server on port {port}") + uvicorn.run(app, host="0.0.0.0", port=port) diff --git a/src/envs/coding_env/server/python_codeact_env.py b/src/envs/coding_env/server/python_codeact_env.py index 14daf2c9..ae984aca 100644 --- a/src/envs/coding_env/server/python_codeact_env.py +++ b/src/envs/coding_env/server/python_codeact_env.py @@ -45,10 +45,12 @@ class PythonCodeActEnv(Environment): def __init__( self, + additional_imports: list[str] | None = None, ): self.transform = create_safe_coding_transform() - self._executor = PyExecutor() + self._executor = PyExecutor(additional_imports=additional_imports) self._state = CodeState() + self._additional_imports = additional_imports def reset(self) -> Observation: """ @@ -63,7 +65,7 @@ def reset(self) -> Observation: self._state.last_exit_code = 0 # Reset executor to clear any previously defined variables/functions - self._executor = PyExecutor() + self._executor = PyExecutor(additional_imports=self._additional_imports) # Reset transform to clear any accumulated state self.transform = create_safe_coding_transform() @@ -77,24 +79,37 @@ def reset(self) -> Observation: return self._apply_transform(observation) - def step(self, action: Action) -> Observation: + def step(self, action: Action, timeout_s: float | None = None) -> Observation: """ Execute code action and return observation. Args: action: CodeAction containing the code to execute + timeout_s: Maximum execution time in seconds. If None, uses default timeout (60s). + If code exceeds timeout, execution is terminated with a timeout error. Returns: CodeObservation with execution results (stdout, stderr, exit_code) Raises: ValueError: If action is not a CodeAction instance + + Example: + >>> env = PythonCodeActEnv() + >>> env.reset() + >>> action = CodeAction(code="print('Hello')") + >>> obs = env.step(action, timeout_s=30.0) # 30 second timeout + >>> print(obs.stdout) # "Hello\n" """ if not isinstance(action, CodeAction): raise ValueError(f"Expected CodeAction, got {type(action)}") - # Execute the code using PyExecutor - result = self._executor.run(action.code) + # Use default timeout if none provided (60 seconds is reasonable for most code) + if timeout_s is None: + timeout_s = 60.0 + + # Execute the code using PyExecutor with timeout protection + result = self._executor.run(action.code, timeout_s=timeout_s) # Update state self._state.step_count += 1 diff --git a/src/envs/coding_env/server/requirements.txt b/src/envs/coding_env/server/requirements.txt index cf6769a0..cd90ed4a 100644 --- a/src/envs/coding_env/server/requirements.txt +++ b/src/envs/coding_env/server/requirements.txt @@ -1 +1,2 @@ smolagents +numpy