2 changes: 1 addition & 1 deletion src/core/containers/runtime/__init__.py
@@ -12,4 +12,4 @@
"ContainerProvider",
"LocalDockerProvider",
"KubernetesProvider",
]
]
111 changes: 101 additions & 10 deletions src/core/containers/runtime/providers.py
@@ -135,51 +135,75 @@ def start_container(

Args:
image: Docker image name
port: Port to expose (if None, finds available port)
port: Port to expose (if None, uses 8000)
env_vars: Environment variables for the container
**kwargs: Additional Docker run options
- command_override: List of command args to override container CMD
- memory_gb: Memory limit in GB (default: 4GB)

Returns:
Base URL to connect to the container
"""
import subprocess
import time
import logging

# Find available port if not specified
logger = logging.getLogger(__name__)

# Use default port if not specified
if port is None:
port = self._find_available_port()
port = 8000

# Use default memory limit if not specified
memory_gb = kwargs.get("memory_gb", 4)

# Generate container name
self._container_name = self._generate_container_name(image)

# Build docker run command
# Use host networking for better performance and consistency with podman
# NOTE: Do NOT use --rm initially - if container fails to start, we need logs
cmd = [
"docker", "run",
"-d", # Detached
"--name", self._container_name,
"-p", f"{port}:8000", # Map port
"--network", "host", # Use host network
"--memory", f"{memory_gb}g", # Limit container memory
"--memory-swap", f"{memory_gb}g", # Prevent swap usage (set equal to --memory)
"--oom-kill-disable=false", # Allow OOM killer (exit gracefully)
]

# Add environment variables
if env_vars:
for key, value in env_vars.items():
cmd.extend(["-e", f"{key}={value}"])

# Pass custom port via environment variable instead of overriding command
# This allows the container to use its proper entrypoint/CMD
if port != 8000:
cmd.extend(["-e", f"PORT={port}"])

# Add image
cmd.append(image)

# Add command override if provided (explicit override by user)
if "command_override" in kwargs:
cmd.extend(kwargs["command_override"])

# Run container
try:
logger.debug(f"Starting container with command: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
self._container_id = result.stdout.strip()
logger.debug(f"Container started with ID: {self._container_id}")
except subprocess.CalledProcessError as e:
error_msg = f"Failed to start Docker container.\nCommand: {' '.join(cmd)}\nExit code: {e.returncode}\nStderr: {e.stderr}\nStdout: {e.stdout}"
raise RuntimeError(error_msg) from e

# Wait a moment for container to start
time.sleep(1)

base_url = f"http://localhost:{port}"
base_url = f"http://127.0.0.1:{port}"
return base_url

def stop_container(self) -> None:
@@ -227,23 +251,65 @@ def wait_for_ready(self, base_url: str, timeout_s: float = 30.0) -> None:
"""
import time
import requests
import subprocess
import logging

start_time = time.time()
health_url = f"{base_url}/health"
last_error = None

while time.time() - start_time < timeout_s:
try:
response = requests.get(health_url, timeout=2.0)
if response.status_code == 200:
return
except requests.RequestException:
pass
except requests.RequestException as e:
last_error = str(e)

time.sleep(0.5)

raise TimeoutError(
f"Container at {base_url} did not become ready within {timeout_s}s"
)
# If we timeout, provide diagnostic information
error_msg = f"Container at {base_url} did not become ready within {timeout_s}s"

if self._container_id:
try:
# First check if container exists
inspect_result = subprocess.run(
["docker", "inspect", self._container_id],
capture_output=True,
text=True,
timeout=5,
)

if inspect_result.returncode != 0:
# Container doesn't exist - it likely exited immediately or failed to start
error_msg += f"\n\nContainer no longer exists (it likely exited immediately or failed to start)."
error_msg += f"\nThis typically means:"
error_msg += f"\n 1. The container image has an error in its startup script"
error_msg += f"\n 2. Required dependencies are missing in the container"
error_msg += f"\n 3. Port {base_url.split(':')[-1]} might be in use by another process"
error_msg += f"\n 4. Container command/entrypoint is misconfigured"
error_msg += f"\nTry running the container manually to debug:"
error_msg += f"\n docker run -it --rm <IMAGE_NAME>"
else:
# Container exists, try to get logs
result = subprocess.run(
["docker", "logs", "--tail", "50", self._container_id],
capture_output=True,
text=True,
timeout=5,
)
if result.stdout or result.stderr:
error_msg += f"\n\nContainer logs (last 50 lines):\n{result.stdout}\n{result.stderr}"
except subprocess.TimeoutExpired:
error_msg += f"\n\nTimeout while trying to inspect container"
except Exception as e:
error_msg += f"\n\nFailed to get container diagnostics: {e}"

if last_error:
error_msg += f"\n\nLast connection error: {last_error}"

raise TimeoutError(error_msg)

def _find_available_port(self) -> int:
"""
@@ -276,6 +342,31 @@ def _generate_container_name(self, image: str) -> str:
timestamp = int(time.time() * 1000)
return f"{clean_image}-{timestamp}"

def _infer_app_module(self, image: str) -> Optional[str]:
"""
Infer the uvicorn app module path from the image name.

Args:
image: Container image name

Returns:
App module path like "envs.coding_env.server.app:app" or None
"""
clean_image = image.split("/")[-1].split(":")[0]

# Map common environment names to their app modules
env_module_map = {
"coding-env": "envs.coding_env.server.app:app",
"echo-env": "envs.echo_env.server.app:app",
"git-env": "envs.git_env.server.app:app",
"openspiel-env": "envs.openspiel_env.server.app:app",
"sumo-rl-env": "envs.sumo_rl_env.server.app:app",
"finrl-env": "envs.finrl_env.server.app:app",
}

return env_module_map.get(clean_image)
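
For orientation, `_infer_app_module` strips any registry prefix and tag before the lookup. A couple of illustrative calls (the image names are made up, and a no-argument `LocalDockerProvider` constructor is assumed):

```python
provider = LocalDockerProvider()

provider._infer_app_module("ghcr.io/acme/coding-env:v1.2")  # -> "envs.coding_env.server.app:app"
provider._infer_app_module("echo-env:latest")               # -> "envs.echo_env.server.app:app"
provider._infer_app_module("some-other-env:latest")         # -> None (image not in the map)
```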



class KubernetesProvider(ContainerProvider):
"""
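Taken together, the provider changes read roughly like this from a caller's point of view. This is a sketch only: the import path and the no-argument `LocalDockerProvider` constructor are assumptions, while `start_container`, `wait_for_ready`, `stop_container`, and the `port`/`env_vars`/`memory_gb` options come from the diff above.

```python
from core.containers.runtime import LocalDockerProvider  # import path assumed

provider = LocalDockerProvider()
try:
    base_url = provider.start_container(
        "echo-env:latest",
        port=8001,                        # forwarded to the container as PORT=8001
        env_vars={"LOG_LEVEL": "debug"},
        memory_gb=2,                      # becomes --memory 2g / --memory-swap 2g
    )
    # Raises TimeoutError with docker inspect/logs diagnostics if /health never answers.
    provider.wait_for_ready(base_url, timeout_s=60.0)
finally:
    provider.stop_container()
```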
17 changes: 13 additions & 4 deletions src/core/env_server/http_server.py
@@ -15,7 +15,7 @@

import os
from dataclasses import asdict
from typing import Any, Dict, Type
from typing import Any, Dict, Optional, Type

from .interfaces import Environment
from .types import Action, Observation
@@ -85,13 +85,22 @@ async def reset(request: Dict[str, Any] = Body(default={})) -> Dict[str, Any]:
async def step(request: Dict[str, Any]) -> Dict[str, Any]:
"""Step endpoint - executes action and returns observation."""
action_data = request.get("action", {})
# TODO: Handle timeout_s, request_id, episode_id from request if provided

# Extract timeout_s from request (sent by HTTPEnvClient)
timeout_s = request.get("timeout_s", None)

# TODO: Handle request_id, episode_id from request if provided

# Deserialize action
action = self._deserialize_action(action_data)

# Execute step
observation = self.env.step(action)
# Execute step with timeout if environment supports it
try:
# Try to pass timeout_s to step() method
observation = self.env.step(action, timeout_s=timeout_s)
except TypeError:
# Environment doesn't support timeout parameter, call without it
observation = self.env.step(action)

# Return serialized observation
return self._serialize_observation(observation)
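The try/except TypeError above lets environments opt in to the forwarded timeout without forcing every implementation to change its signature. A minimal sketch of both cases (these classes are illustrative, not part of the codebase, and they return plain dicts instead of the real Observation type):

```python
import time
from typing import Any, Optional


class SleepyEnv:
    """Opts in: accepts the timeout_s forwarded by the HTTP server's step endpoint."""

    def step(self, action: Any, timeout_s: Optional[float] = None) -> dict:
        deadline = None if timeout_s is None else time.monotonic() + timeout_s
        while not self._finished(action):
            if deadline is not None and time.monotonic() > deadline:
                return {"done": False, "error": "step timed out"}
            time.sleep(0.1)
        return {"done": True}

    def _finished(self, action: Any) -> bool:
        return True  # placeholder for real work


class LegacyEnv:
    """No timeout_s parameter; the server's TypeError fallback calls step(action) instead."""

    def step(self, action: Any) -> dict:
        return {"done": True}
```

One caveat worth weighing in review: a TypeError raised inside a timeout-aware step() would also trigger the fallback and re-run the action; checking the signature up front (e.g. with inspect.signature) might be a safer way to detect support.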
16 changes: 11 additions & 5 deletions src/core/http_env_client.py
@@ -46,6 +46,8 @@ def from_docker_image(
cls: Type[EnvClientT],
image: str,
provider: Optional["ContainerProvider"] = None,
timeout_s: float = 120.0,
request_timeout_s: float = 15.0,
**kwargs: Any,
) -> EnvClientT:
"""
Expand All @@ -62,6 +64,8 @@ def from_docker_image(
Args:
image: Docker image name to run (e.g., "echo-env:latest")
provider: Container provider to use (defaults to LocalDockerProvider)
timeout_s: Maximum time to wait for container to become ready (default: 120 seconds)
request_timeout_s: Timeout for HTTP requests to the environment (default: 15 seconds)
**kwargs: Additional arguments to pass to provider.start_container()
(e.g., env_vars, port)

@@ -75,9 +79,11 @@ def from_docker_image(
>>> # Create environment from image
>>> env = CodingEnv.from_docker_image("coding-env:latest")
>>>
>>> # Create environment with custom env vars
>>> # Create environment with custom env vars and timeouts
>>> env = CodingEnv.from_docker_image(
... "coding-env:latest",
... timeout_s=180.0,
... request_timeout_s=120.0,
... env_vars={"MY_VAR": "value"}
... )
>>>
@@ -99,11 +105,11 @@ def from_docker_image(
# 1. Start container with optional kwargs (e.g., env_vars, port)
base_url = provider.start_container(image, **kwargs)

# 2. Wait for server to be ready
provider.wait_for_ready(base_url)
# 2. Wait for server to be ready with configured timeout
provider.wait_for_ready(base_url, timeout_s=timeout_s)

# 3. Create and return client instance with provider reference
return cls(base_url=base_url, provider=provider)
# 3. Create and return client instance with provider reference and request timeout
return cls(base_url=base_url, request_timeout_s=request_timeout_s, provider=provider)

@abstractmethod
def _step_payload(self, action: ActT) -> dict:
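As a rough end-to-end sketch of how the two new knobs interact (`CodingEnv` is the concrete subclass already used in the docstring example above; `env_vars` and `memory_gb` are simply forwarded to provider.start_container() via **kwargs):

```python
# Assumes CodingEnv is importable, as in the docstring example above.
env = CodingEnv.from_docker_image(
    "coding-env:latest",
    timeout_s=180.0,          # upper bound on provider.wait_for_ready() startup polling
    request_timeout_s=120.0,  # per-request HTTP timeout passed to the client constructor
    env_vars={"MY_VAR": "value"},
    memory_gb=8,              # extra **kwargs go to provider.start_container()
)
```

Once the client is constructed, timeout_s plays no further role; per the docstring, request_timeout_s governs the HTTP calls the client makes to the environment.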