Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/core/containers/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
"""Container runtime providers."""

from .providers import ContainerProvider, KubernetesProvider, LocalDockerProvider
from .uv_provider import UVProvider

__all__ = [
"ContainerProvider",
"LocalDockerProvider",
"KubernetesProvider",
"UVProvider",
]
79 changes: 75 additions & 4 deletions src/core/containers/runtime/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ def __init__(self):
capture_output=True,
timeout=5,
)
except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
except (
subprocess.CalledProcessError,
FileNotFoundError,
subprocess.TimeoutExpired,
):
raise RuntimeError(
"Docker is not available. Please install Docker Desktop or Docker Engine."
)
Expand Down Expand Up @@ -154,10 +158,13 @@ def start_container(

# Build docker run command
cmd = [
"docker", "run",
"docker",
"run",
"-d", # Detached
"--name", self._container_name,
"-p", f"{port}:8000", # Map port
"--name",
self._container_name,
"-p",
f"{port}:8000", # Map port
]

# Add environment variables
Expand Down Expand Up @@ -290,4 +297,68 @@ class KubernetesProvider(ContainerProvider):
>>> # Pod running in k8s, accessible via service or port-forward
>>> provider.stop_container()
"""

pass


class RuntimeProvider(ABC):
"""
Abstract base class for runtime providers that are not container providers.
Providers implement this interface to support different runtime platforms:
- UVProvider: Runs environments via `uv run`

The provider manages a single runtime lifecycle and provides the base URL
for connecting to it.

Example:
>>> provider = UVProvider()
>>> base_url = provider.start_container("echo-env:latest")
>>> print(base_url) # http://localhost:8000
>>> # Use the environment via base_url
>>> provider.stop_container()
"""

@abstractmethod
def start(
self,
port: Optional[int] = None,
env_vars: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> str:
"""
Start a runtime from the specified image.

Args:
image: Runtime image name
port: Port to expose (if None, provider chooses)
env_vars: Environment variables for the runtime
**kwargs: Additional runtime options
"""

@abstractmethod
def stop(self) -> None:
"""
Stop the runtime.
"""
pass

@abstractmethod
def wait_for_ready(self, timeout_s: float = 30.0) -> None:
"""
Wait for the runtime to be ready to accept requests.
"""
pass

def __enter__(self) -> "RuntimeProvider":
"""
Enter the runtime provider.
"""
self.start()
return self

def __exit__(self, exc_type, exc, tb) -> None:
"""
Exit the runtime provider.
"""
self.stop()
return False
203 changes: 203 additions & 0 deletions src/core/containers/runtime/uv_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""Providers for launching Hugging Face Spaces via ``uv run``."""

from __future__ import annotations

import os
import socket
import subprocess
import time
from typing import Dict, Optional

import requests

from .providers import RuntimeProvider


def _check_uv_installed() -> None:
try:
subprocess.check_output(["uv", "--version"])
except FileNotFoundError as exc:
raise RuntimeError(
"`uv` executable not found. Install uv from https://docs.astral.sh and ensure it is on PATH."
) from exc


def _find_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("", 0))
sock.listen(1)
return sock.getsockname()[1]


def _create_uv_command(
repo_id: str,
port: int,
reload: bool,
) -> list[str]:
command = [
"uv",
"run",
"--isolated",
"--with",
f"git+https://huggingface.co/spaces/{repo_id}",
"--",
"server",
"--host",
"0.0.0.0",
"--port",
str(port),
]
if reload:
command.append("--reload")
return command


def _poll_health(health_url: str, timeout_s: float) -> None:
"""Poll a health endpoint until it returns HTTP 200 or times out."""

deadline = time.time() + timeout_s
while time.time() < deadline:
try:
timeout = max(0.0001, min(deadline - time.time(), 2.0))
response = requests.get(health_url, timeout=timeout)
if response.status_code == 200:
return
except requests.RequestException:
continue

time.sleep(0.5)

raise TimeoutError(f"Server did not become ready within {timeout_s:.1f} seconds")


class UVProvider(RuntimeProvider):
"""
RuntimeProvider implementation backed by ``uv run``.

Args:
repo_id: The repository ID of the environment to run
reload: Whether to reload the environment on code changes
env_vars: Environment variables to pass to the environment
context_timeout_s: The timeout to wait for the environment to become ready

Example:
>>> provider = UVProvider(repo_id="burtenshaw/echo-cli")
>>> base_url = provider.start()
>>> print(base_url) # http://localhost:8000
>>> # Use the environment via base_url
>>> provider.stop()
"""

def __init__(
self,
repo_id: str,
reload: bool = False,
env_vars: Optional[Dict[str, str]] = None,
context_timeout_s: float = 60.0,
):
"""Initialize the UVProvider."""
self.repo_id = repo_id
self.reload = reload
self.env_vars = env_vars
self.context_timeout_s = context_timeout_s
_check_uv_installed()
self._process = None
self._base_url = None

def start(
self,
port: Optional[int] = None,
env_vars: Optional[Dict[str, str]] = None,
**_: Dict[str, str],
) -> str:
"""
Start the environment via `uv run`.

Args:
port: The port to bind the environment to
env_vars: Environment variables to pass to the environment

Returns:
The base URL of the environment

Raises:
RuntimeError: If the environment is already running
"""
if self._process is not None and self._process.poll() is None:
raise RuntimeError("UVProvider is already running")

bind_port = port or _find_free_port()

command = _create_uv_command(
repo_id=self.repo_id,
port=bind_port,
reload=self.reload,
)

env = os.environ.copy()

if self.env_vars:
env.update(self.env_vars)
if env_vars:
env.update(env_vars)

try:
self._process = subprocess.Popen(command, env=env)
except OSError as exc:
raise RuntimeError(f"Failed to launch `uv run`: {exc}") from exc

self._base_url = f"http://localhost:{bind_port}"
return self._base_url

def wait_for_ready(self, timeout_s: float = 60.0) -> None:
"""
Wait for the environment to become ready.

Args:
timeout_s: The timeout to wait for the environment to become ready

Raises:
RuntimeError: If the environment is not running
TimeoutError: If the environment does not become ready within the timeout
"""
if self._process and self._process.poll() is not None:
code = self._process.returncode
raise RuntimeError(f"uv process exited prematurely with code {code}")

_poll_health(f"{self._base_url}/health", timeout_s=timeout_s)

def stop(self) -> None:
"""
Stop the environment.

Raises:
RuntimeError: If the environment is not running
"""
if self._process is None:
return

if self._process.poll() is None:
self._process.terminate()
try:
self._process.wait(timeout=10.0)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait(timeout=5.0)

self._process = None
self._base_url = None

@property
def base_url(self) -> str:
"""
The base URL of the environment.

Returns:
The base URL of the environment

Raises:
RuntimeError: If the environment is not running
"""
if self._base_url is None:
raise RuntimeError("UVProvider has not been started")
return self._base_url
45 changes: 29 additions & 16 deletions src/core/http_env_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import requests

from .client_types import StepResult
from .containers.runtime import LocalDockerProvider
from .containers.runtime import LocalDockerProvider, UVProvider

if TYPE_CHECKING:
from .containers.runtime import ContainerProvider
from .containers.runtime import ContainerProvider, RuntimeProvider

ActT = TypeVar("ActT")
ObsT = TypeVar("ObsT")
Expand Down Expand Up @@ -106,22 +106,35 @@ def from_docker_image(
return cls(base_url=base_url, provider=provider)

@classmethod
def from_hub(cls: Type[EnvClientT], repo_id: str, provider: Optional["ContainerProvider"] = None, **kwargs: Any) -> EnvClientT:
"""
Create an environment client by pulling from a Hugging Face model hub.
def from_hub(
cls: Type[EnvClientT],
repo_id: str,
*,
use_docker: bool = True,
provider: Optional["ContainerProvider" | "RuntimeProvider"] = None,
**provider_kwargs: Any,
) -> EnvClientT:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this signature very hard to understand without context since it's mixing kwargs for docker and for uv. Also I don't think one needs use_docker, provider, and runner.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion would be to remove provider and runner. Also remove project_url and connect_host. Have a single env (instead of env for docker and extra_env for uv). And use some typed dict + typing.overload so that IDEs can have correct autocompletion. Here is a simplified example:

from typing import Any, Dict, NotRequired, TypedDict, Unpack, overload


class DockerKwargs(TypedDict, total=False):
    tag: NotRequired[str]
    env: NotRequired[Dict[str, str]]


class UVProvider(TypedDict):
    host: NotRequired[str]
    port: NotRequired[int]
    reload: NotRequired[bool]
    timeout_s: NotRequired[float]
    env: NotRequired[Dict[str, str]]


@overload
def from_hub(repo_id: str, *, use_docker: bool = True, **kwargs: Unpack[DockerKwargs]) -> str: ...


@overload
def from_hub(repo_id: str, *, use_docker: bool = False, **kwargs: Unpack[UVProvider]) -> str: ...


def from_hub(repo_id: str, *, use_docker: bool = False, **kwargs: Any) -> str:
    raise NotImplementedError()
Image Image

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I'm not a fan of overloads and typed dict but it's the only solution I see to correctly document the signature while keeping a single method.

Another solution is to have from_hub_docker and from_hub_uv (more explicit but less elegant)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this. I've simplified the signatures right down, but I haven't added type overloading in this PR.

"""Create a client from a Hugging Face Space.

Set ``use_docker=True`` to launch the registry image with a container
provider. The default ``use_docker=False`` runs the Space locally using
``uv run`` through :class:`UVProvider`.
"""

if provider is None:
provider = LocalDockerProvider()

if "tag" in kwargs:
tag = kwargs["tag"]

if use_docker:
tag = provider_kwargs.pop("tag", "latest")
image = f"registry.hf.space/{repo_id.replace('/', '-')}:{tag}"
return cls.from_docker_image(image, provider=provider, **provider_kwargs)
else:
tag = "latest"

base_url = f"registry.hf.space/{repo_id.replace('/', '-')}:{tag}"

return cls.from_docker_image(image=base_url, provider=provider)
provider: RuntimeProvider = UVProvider(
repo_id=repo_id,
**provider_kwargs,
)
base_url = provider.start()
timeout_s = provider_kwargs.pop("timeout_s", 60.0)
provider.wait_for_ready(base_url=provider.base_url, timeout_s=timeout_s)

return cls(base_url=base_url, provider=provider)

@abstractmethod
def _step_payload(self, action: ActT) -> dict:
Expand Down