Commit d954ab6
RHOAIENG-18848: chore(tests/containers): initial kubernetes/openshift deployment support (#892)
* RHOAIENG-18848: chore(tests/containers): add openshift-python-wrapper dependency
* RHOAIENG-18848: chore(tests/containers): initial test that runs container on openshift/kubernetes
* install local-path provisioner on kubernetes in github actions
* more careful printing of pod status in case `containerStatuses == None`
* sort out how we want to work with privileged/unprivileged client
* only run the new test if we have kubernetes around
* add pod waiting and port forwarding utils
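
Taken together, the new utilities compose roughly as follows (a sketch distilled from `ImageDeployment.deploy` in the diff below; the image digest is a placeholder):

```python
from tests.containers import kubernetes_utils

# dynamic client built from the active kubeconfig context
client = kubernetes_utils.get_client()

# Deploys namespace + PVC + Deployment, waits for the pod to become Ready,
# then probes the notebook over a port-forward proxied to a local port;
# everything is torn down again when the context manager exits.
with kubernetes_utils.ImageDeployment(client, "quay.io/opendatahub/workbench-images@sha256:...") as deployment:
    deployment.deploy(container_name="notebook-tests-pod")
```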
diff --git c/.github/workflows/build-notebooks-TEMPLATE.yaml i/.github/workflows/build-notebooks-TEMPLATE.yaml
index 8a98aa2..13507b7 100644
--- c/.github/workflows/build-notebooks-TEMPLATE.yaml
+++ i/.github/workflows/build-notebooks-TEMPLATE.yaml
@@ -290,10 +290,10 @@ jobs:
- name: Install deps
run: poetry install --sync
- - name: Run container tests (in PyTest)
+ - name: Run Testcontainers container tests (in PyTest)
run: |
set -Eeuxo pipefail
- poetry run pytest --capture=fd tests/containers --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
+ poetry run pytest --capture=fd tests/containers -m 'not openshift' --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
env:
DOCKER_HOST: "unix:///var/run/podman/podman.sock"
TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/podman/podman.sock"
@@ -439,6 +439,16 @@ jobs:
kubectl wait deployments --all --all-namespaces --for=condition=Available --timeout=100s
kubectl wait pods --all --all-namespaces --for=condition=Ready --timeout=100s
+ - name: "Install local-path provisioner"
+ if: ${{ steps.have-tests.outputs.tests == 'true' }}
+ run: |
+ set -Eeuxo pipefail
+ kubectl apply -f https://raw.githubusercontent.com/rancher/local-path-provisioner/v0.0.31/deploy/local-path-storage.yaml
+ kubectl wait deployments --all --namespace=local-path-storage --for=condition=Available --timeout=100s
+ # https://kubernetes.io/docs/tasks/administer-cluster/change-default-storage-class/
+ kubectl get storageclass
+ kubectl patch storageclass local-path -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
+
- name: "Run image tests"
if: ${{ steps.have-tests.outputs.tests == 'true' }}
run: python3 ci/cached-builds/make_test.py --target ${{ inputs.target }}
@@ -449,6 +459,18 @@ jobs:
# endregion
+ - name: Run OpenShift container tests (in PyTest)
+ if: ${{ steps.have-tests.outputs.tests == 'true' }}
+ run: |
+ set -Eeuxo pipefail
+ poetry run pytest --capture=fd tests/containers -m 'openshift' --image="${{ steps.calculated_vars.outputs.OUTPUT_IMAGE }}"
+ env:
+ # TODO(jdanek): this Testcontainers stuff should not be necessary but currently it has to be there
+ DOCKER_HOST: "unix:///var/run/podman/podman.sock"
+ TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: "/var/run/podman/podman.sock"
+ # pulling the Ryuk container from docker.io introduces CI flakiness
+ TESTCONTAINERS_RYUK_DISABLED: "true"
+
# region Trivy vulnerability scan
- name: Run Trivy vulnerability scanner
diff --git c/README.md i/README.md
index 961e590..22703ac 100644
--- c/README.md
+++ i/README.md
@@ -105,7 +105,7 @@ sudo dnf install podman
systemctl --user start podman.service
systemctl --user status podman.service
systemctl --user status podman.socket
-DOCKER_HOST=unix:///run/user/$UID/podman/podman.sock poetry run pytest tests/containers --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
+DOCKER_HOST=unix:///run/user/$UID/podman/podman.sock poetry run pytest tests/containers -m 'not openshift' --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
# Mac OS
brew install podman
@@ -113,7 +113,7 @@ podman machine init
podman machine set --rootful
sudo podman-mac-helper install
podman machine start
-poetry run pytest tests/containers --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
+poetry run pytest tests/containers -m 'not openshift' --image quay.io/opendatahub/workbench-images@sha256:e98d19df346e7abb1fa3053f6d41f0d1fa9bab39e49b4cb90b510ca33452c2e4
```
When using lima on macOS, it might be useful to give yourself access to the rootful podman socket
diff --git c/pyproject.toml i/pyproject.toml
index 9271a4c..6440b12 100644
--- c/pyproject.toml
+++ i/pyproject.toml
@@ -8,6 +8,7 @@ package-mode = false
[tool.poetry.dependencies]
python = "~3.12"
+requests = "^2.32.3"
[tool.poetry.group.dev.dependencies]
diff --git c/pytest.ini i/pytest.ini
index 2b320d7..aff2508 100644
--- c/pytest.ini
+++ i/pytest.ini
@@ -15,3 +15,5 @@ log_cli_level = INFO
log_file = logs/pytest-logs.txt
log_file_level = DEBUG
+
+markers = openshift
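
Registering the marker in pytest.ini silences pytest's unknown-mark warning and makes `-m` selection reliable. A sketch of how a test opts in (the test name here is hypothetical; the real one is added to workbench_image_test.py below):

```python
import pytest

@pytest.mark.openshift
def test_needs_cluster():
    ...  # collected by `-m 'openshift'`, deselected by `-m 'not openshift'`
```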
diff --git c/tests/containers/base_image_test.py i/tests/containers/base_image_test.py
index 03f3d9a..b7e0049 100644
--- c/tests/containers/base_image_test.py
+++ i/tests/containers/base_image_test.py
@@ -11,12 +11,13 @@ import tempfile
import textwrap
from typing import TYPE_CHECKING, Any, Callable
-import pytest
import testcontainers.core.container
import testcontainers.core.waiting_utils
from tests.containers import docker_utils
+import pytest
+
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)
@@ -72,7 +73,8 @@ class TestBaseImage:
if "not found" in line:
unsatisfied_deps.append((dlib, line.strip()))
assert output
- print("OUTPUT>", json.dumps({"dir": path, "count_scanned": count_scanned, "unsatisfied": unsatisfied_deps}))
+ print("OUTPUT>",
+ json.dumps({"dir": path, "count_scanned": count_scanned, "unsatisfied": unsatisfied_deps}))
try:
container.start()
@@ -105,18 +107,7 @@ class TestBaseImage:
with subtests.test(f"{dlib=}"):
pytest.fail(f"{dlib=} has unsatisfied dependencies {deps=}")
- def test_oc_command_runs(self, image: str):
- container = testcontainers.core.container.DockerContainer(image=image, user=23456, group_add=[0])
- container.with_command("/bin/sh -c 'sleep infinity'")
- try:
- container.start()
- ecode, output = container.exec(["/bin/sh", "-c", "oc version"])
- finally:
- docker_utils.NotebookContainer(container).stop(timeout=0)
-
- logging.debug(output.decode())
- assert ecode == 0
-
+ # @pytest.mark.environments("docker")
def test_oc_command_runs_fake_fips(self, image: str, subtests: pytest_subtests.SubTests):
"""Establishes a best-effort fake FIPS environment and attempts to execute `oc` binary in it.
@@ -190,7 +181,8 @@ class TestBaseImage:
docker_utils.NotebookContainer(container).stop(timeout=0)
-def encode_python_function_execution_command_interpreter(python: str, function: Callable[..., Any], *args: list[Any]) -> list[str]:
+def encode_python_function_execution_command_interpreter(
+ python: str, function: Callable[..., Any], *args: list[Any],
+) -> list[str]:
"""Returns a cli command that will run the given Python function encoded inline.
All dependencies (imports, ...) must be part of function body."""
code = textwrap.dedent(inspect.getsource(function))
diff --git c/tests/containers/cancellation_token.py i/tests/containers/cancellation_token.py
new file mode 100644
index 00000000..d7d6260
--- /dev/null
+++ i/tests/containers/cancellation_token.py
@@ -0,0 +1,37 @@
+import os
+import threading
+
+
+class CancellationToken:
+ """Flag to signal a thread it should cancel itself.
+ This cooperative cancellation pattern is commonly used in C# and Go.
+ See https://learn.microsoft.com/en-us/dotnet/api/system.threading.cancellationtoken?view=net-9.0
+ """
+
+ def __init__(self):
+ # consider using the wrapt.synchronized decorator
+ # https://github.com/GrahamDumpleton/wrapt/blob/develop/blog/07-the-missing-synchronized-decorator.md
+ self._lock = threading.Lock()
+ self._canceled = False
+ # something selectable avoids having to use short timeout in select
+ self._read_fd, self._write_fd = os.pipe()
+
+ def fileno(self):
+ """This lets us use the token in select() calls"""
+ return self._read_fd
+
+ @property
+ def cancelled(self):
+ with self._lock:
+ return self._canceled
+
+ def cancel(self):
+ with self._lock:
+ os.write(self._write_fd, b'x')
+ self._canceled = True
+
+ def __del__(self):
+ # consider https://docs.python.org/3/library/weakref.html#weakref.finalize
+ with self._lock:
+ os.close(self._read_fd)
+ os.close(self._write_fd)
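
The pipe-backed design is what lets the token sit directly in a `select()` call, so cancellation wakes a blocked thread immediately instead of relying on short poll timeouts. A minimal usage sketch (the worker here is hypothetical, not part of the commit):

```python
import select
import threading

from tests.containers.cancellation_token import CancellationToken

token = CancellationToken()

def worker():
    while not token.cancelled:
        # the token's fileno() makes it selectable alongside real sockets
        readable, _, _ = select.select([token], [], [])
        if token in readable:
            break  # cancel() wrote a byte to the pipe; time to exit

t = threading.Thread(target=worker)
t.start()
token.cancel()  # wakes the select() immediately
t.join()
```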
diff --git c/tests/containers/kubernetes_utils.py i/tests/containers/kubernetes_utils.py
new file mode 100644
index 00000000..6682982
--- /dev/null
+++ i/tests/containers/kubernetes_utils.py
@@ -0,0 +1,473 @@
+from __future__ import annotations
+
+import contextlib
+import functools
+import logging
+import threading
+import time
+import traceback
+import typing
+import socket
+from typing import Any, Callable, Generator
+
+import requests
+
+import kubernetes
+import kubernetes.dynamic.exceptions
+import kubernetes.stream.ws_client
+import kubernetes.client.api.core_v1_api
+from kubernetes.dynamic import DynamicClient, ResourceField
+
+import ocp_resources.deployment
+import ocp_resources.namespace
+import ocp_resources.persistent_volume_claim
+import ocp_resources.pod
+import ocp_resources.project_project_openshift_io
+import ocp_resources.project_request
+import ocp_resources.resource
+import ocp_resources.service
+
+from tests.containers import socket_proxy
+
+
+class TestFrameConstants:
+ GLOBAL_POLL_INTERVAL_MEDIUM = 10
+ TIMEOUT_2MIN = 2 * 60
+
+
+logging.basicConfig(level=logging.DEBUG)
+LOGGER = logging.getLogger(__name__)
+
+
+# https://github.com/RedHatQE/openshift-python-wrapper/tree/main/examples
+
+def get_client() -> kubernetes.dynamic.DynamicClient:
+ try:
+ # client = kubernetes.dynamic.DynamicClient(client=kubernetes.config.new_client_from_config())
+ # probably same as above
+ client = ocp_resources.resource.get_client()
+ return client
+ except kubernetes.config.ConfigException as e:
+ # probably bad config
+ logging.error(e)
+ except kubernetes.dynamic.exceptions.UnauthorizedError as e:
+ # wrong or expired credentials
+ logging.error(e)
+ except kubernetes.client.ApiException as e:
+ # unexpected, we catch unauthorized above
+ logging.error(e)
+ except Exception as e:
+ # unexpected error, assert here
+ logging.error(e)
+
+ raise RuntimeError("Failed to instantiate client")
+
+
+def get_username(client: kubernetes.dynamic.DynamicClient) -> str:
+ # can't just access
+ # > client.configuration.username
+ # because we normally auth using tokens, not username and password
+
+ # this is what kubectl does (see kubectl -v8 auth whoami)
+ self_subject_review_resource: kubernetes.dynamic.Resource = client.resources.get(
+ api_version="authentication.k8s.io/v1", kind="SelfSubjectReview"
+ )
+ self_subject_review: kubernetes.dynamic.ResourceInstance = client.create(self_subject_review_resource)
+ username: str = self_subject_review.status.userInfo.username
+ return username
+
+
+class TestKubernetesUtils:
+ def test_get_username(self):
+ client = get_client()
+ username = get_username(client)
+ assert username is not None and len(username) > 0
+
+
+class TestFrame:
+ def __init__(self):
+ # the stack holds heterogeneous resources, so the element type is Any
+ self.stack: list[tuple[Any, Callable[[Any], None] | None]] = []
+
+ def defer_resource[T: ocp_resources.resource.Resource](self, resource: T, wait=False,
+ destructor: Callable[[T], None] | None = None) -> T:
+ result = resource.deploy(wait=wait)
+ self.defer(resource, destructor)
+ return result
+
+ def add[T](self, resource: T, destructor: Callable[[T], None] | None = None) -> T:
+ self.defer(resource, destructor)
+ return resource
+
+ def defer[T](self, resource: T, destructor: Callable[[T], None] | None = None) -> None:
+ self.stack.append((resource, destructor))
+
+ def destroy(self, wait=False):
+ while self.stack:
+ resource, destructor = self.stack.pop()
+ if destructor is not None:
+ destructor(resource)
+ else:
+ resource.clean_up(wait=wait)
+
+ def __enter__(self) -> TestFrame:
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.destroy(wait=True)
+
+
+class ImageDeployment:
+ def __init__(self, client: kubernetes.dynamic.DynamicClient, image: str):
+ self.client = client
+ self.image = image
+ self.tf = TestFrame()
+
+ def __enter__(self) -> ImageDeployment:
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.tf.destroy()
+
+ def deploy(self, container_name: str) -> None:
+ LOGGER.debug(f"Deploying {self.image}")
+ # custom namespace is necessary, because we cannot assign a SCC to pods created in one of the default namespaces:
+ # default, kube-system, kube-public, openshift-node, openshift-infra, openshift.
+ # https://docs.openshift.com/container-platform/4.17/authentication/managing-security-context-constraints.html#role-based-access-to-ssc_configuring-internal-oauth
+
+ # TODO(jdanek): sort out how we want to work with privileged/unprivileged client
+ # take inspiration from odh-tests
+ ns = create_namespace(privileged_client=True, name=f"test-ns-{container_name}")
+ self.tf.defer_resource(ns)
+
+ pvc = ocp_resources.persistent_volume_claim.PersistentVolumeClaim(
+ name=container_name,
+ namespace=ns.name,
+ accessmodes=ocp_resources.persistent_volume_claim.PersistentVolumeClaim.AccessMode.RWO,
+ volume_mode=ocp_resources.persistent_volume_claim.PersistentVolumeClaim.VolumeMode.FILE,
+ size="1Gi",
+ )
+ self.tf.defer_resource(pvc, wait=True)
+ deployment = ocp_resources.deployment.Deployment(
+ client=self.client,
+ name=container_name,
+ namespace=ns.name,
+ selector={"matchLabels": {"app": container_name}},
+ replicas=1,
+ template={
+ "metadata": {
+ "annotations": {
+ # This will result in the container spec having something like below,
+ # regardless of what kind of namespace this is being run in.
+ # Keep in mind that `default` is a privileged namespace and this annotation has no effect there.
+ # ```
+ # spec:
+ # securityContext:
+ # seLinuxOptions:
+ # level: 's0:c34,c4'
+ # fsGroup: 1001130000
+ # seccompProfile:
+ # type: RuntimeDefault
+ # ```
+ "openshift.io/scc": "restricted-v2"
+ },
+ "labels": {
+ "app": container_name,
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "name": container_name,
+ "image": self.image,
+ # "command": ["/bin/sh", "-c", "while true ; do date; sleep 5; done;"],
+ "ports": [
+ {
+ "containerPort": 8888,
+ "name": "notebook-port",
+ "protocol": "TCP",
+ }
+ ],
+ # rstudio will not start without its volume mount and it does not log the error for it
+ # See the testcontainers implementation of this (the tty=True part)
+ "volumeMounts": [
+ {
+ "mountPath": "/opt/app-root/src",
+ "name": "my-workbench"
+ }
+ ],
+ },
+ ],
+ "volumes": [
+ {
+ "name": "my-workbench",
+ "persistentVolumeClaim": {
+ "claimName": container_name,
+ }
+ }
+ ]
+ }
+ }
+ )
+ self.tf.defer_resource(deployment)
+ LOGGER.debug(f"Waiting for pods to become ready...")
+ PodUtils.wait_for_pods_ready(self.client, namespace_name=ns.name, label_selector=f"app={container_name}",
+ expect_pods_count=1)
+
+ core_v1_api = kubernetes.client.api.core_v1_api.CoreV1Api(api_client=self.client.client)
+ pod_list: kubernetes.client.models.v1_pod_list.V1PodList = core_v1_api.list_namespaced_pod(
+ namespace=ns.name,
+ label_selector=f"app={container_name}"
+ )
+ assert len(pod_list.items) == 1
+ pod: kubernetes.client.models.v1_pod.V1Pod = pod_list.items[0]
+
+ p = socket_proxy.SocketProxy(exposing_contextmanager(core_v1_api, pod), "localhost", 0)
+ t = threading.Thread(target=p.listen_and_serve_until_canceled)
+ t.start()
+ self.tf.defer(t, lambda thread: thread.join())
+ self.tf.defer(p.cancellation_token, lambda token: token.cancel())
+
+ self.port = p.get_actual_port()
+ LOGGER.debug(f"Listening on port {self.port}")
+ resp = requests.get(f"http://localhost:{self.port}")
+ assert resp.status_code == 200
+ LOGGER.debug(f"Done with portforward")
+
+
+class PodUtils:
+ READINESS_TIMEOUT = TestFrameConstants.TIMEOUT_2MIN
+
+ # consider using timeout_sampler
+ @staticmethod
+ def wait_for_pods_ready(
+ client: DynamicClient, namespace_name: str, label_selector: str, expect_pods_count: int
+ ) -> None:
+ """Wait for all pods in namespace to be ready
+ :param client:
+ :param namespace_name: name of the namespace
+ :param label_selector:
+ :param expect_pods_count:
+ """
+
+ # it's a dynamic client with the `resource` parameter already filled in
+ class ResourceType(kubernetes.dynamic.Resource, kubernetes.dynamic.DynamicClient):
+ pass
+
+ resource: ResourceType = client.resources.get(
+ kind=ocp_resources.pod.Pod.kind,
+ api_version=ocp_resources.pod.Pod.api_version,
+ )
+
+ def ready() -> bool:
+ pods = resource.get(namespace=namespace_name, label_selector=label_selector).items
+ if not pods and expect_pods_count == 0:
+ logging.debug("All expected Pods %s in Namespace %s are ready", label_selector, namespace_name)
+ return True
+ if not pods:
+ logging.debug("Pods matching %s/%s are not ready", namespace_name, label_selector)
+ return False
+ if len(pods) != expect_pods_count:
+ logging.debug("Expected Pods %s/%s are not ready", namespace_name, label_selector)
+ return False
+ pod: ResourceField
+ for pod in pods:
+ if not Readiness.is_pod_ready(pod) and not Readiness.is_pod_succeeded(pod):
+ if not pod.status.containerStatuses:
+ pod_status = pod.status
+ else:
+ pod_status = {cs.name: cs.state for cs in pod.status.containerStatuses}
+
+ logging.debug("Pod is not ready: %s/%s (%s)",
+ namespace_name, pod.metadata.name, pod_status)
+ return False
+ else:
+ # check all containers in pods are ready
+ for cs in pod.status.containerStatuses:
+ if not (cs.ready or cs.state.get("terminated", {}).get("reason", "") == "Completed"):
+ logging.debug(
+ f"Container {cs.getName()} of Pod {namespace_name}/{pod.metadata.name} not ready ({cs.state=})"
+ )
+ return False
+ logging.info("Pods matching %s/%s are ready", namespace_name, label_selector)
+ return True
+
+ Wait.until(
+ description=f"readiness of all Pods matching {label_selector} in Namespace {namespace_name}",
+ poll_interval=TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
+ timeout=PodUtils.READINESS_TIMEOUT,
+ ready=ready,
+ )
+
+
+class Wait:
+ @staticmethod
+ def until(
+ description: str,
+ poll_interval: float,
+ timeout: float,
+ ready: Callable[[], bool],
+ on_timeout: Callable[[], None] | None = None,
+ ) -> None:
+ """For every poll (happening once each {@code pollIntervalMs}) checks if supplier {@code ready} is true.
+
+ If yes, the wait is closed. Otherwise, waits another {@code pollIntervalMs} and tries again.
+ Once the wait timeout (specified by {@code timeoutMs} is reached and supplier wasn't true until that time,
+ runs the {@code onTimeout} (f.e. print of logs, showing the actual value that was checked inside {@code ready}),
+ and finally throws {@link WaitException}.
+ @param description information about on what we are waiting
+ @param pollIntervalMs poll interval in milliseconds
+ @param timeoutMs timeout specified in milliseconds
+ @param ready {@link BooleanSupplier} containing code, which should be executed each poll,
+ verifying readiness of the particular thing
+ @param onTimeout {@link Runnable} executed once timeout is reached and
+ before the {@link WaitException} is thrown."""
+ logging.info("Waiting for: %s", description)
+ deadline = time.monotonic() + timeout
+
+ exception_message: str | None = None
+ previous_exception_message: str | None = None
+
+ # when polling every 1s, we want to print the exception only after several tries, not on the first one;
+ # for poll intervals measured in minutes, 2 tries will be enough
+ exception_appearance_count: int = 2 if (poll_interval // 60) > 0 else max(int(timeout // poll_interval // 4), 2)
+ exception_count: int = 0
+ new_exception_appearance: int = 0
+
+ stack_trace_error: str | None = None
+
+ while True:
+ try:
+ result: bool = ready()
+ except KeyboardInterrupt:
+ raise # quick exit if the user gets tired of waiting
+ except Exception as e:
+ exception_message = str(e)
+
+ exception_count += 1
+ new_exception_appearance += 1
+ if (
+ exception_count == exception_appearance_count
+ and exception_message is not None
+ and exception_message == previous_exception_message
+ ):
+ logging.info(f"While waiting for: {description} exception occurred: {exception_message}")
+ # log the stacktrace
+ stack_trace_error = traceback.format_exc()
+ elif (
+ exception_message is not None
+ and exception_message != previous_exception_message
+ and new_exception_appearance == 2
+ ):
+ previous_exception_message = exception_message
+
+ result = False
+
+ time_left: float = deadline - time.monotonic()
+ if result:
+ return
+ if time_left <= 0:
+ if exception_count > 1:
+ logging.error("Exception waiting for: %s, %s", description, exception_message)
+
+ if stack_trace_error is not None:
+ # printing handled stacktrace
+ logging.error(stack_trace_error)
+ if on_timeout is not None:
+ on_timeout()
+ wait_exception: WaitException = WaitException(f"Timeout after {timeout} s waiting for {description}")
+ logging.error(wait_exception)
+ raise wait_exception
+
+ sleep_time: float = min(poll_interval, time_left)
+ time.sleep(sleep_time) # noqa: FCN001
+
+
+class WaitException(Exception):
+ pass
+
+
+class Readiness:
+ @staticmethod
+ def is_pod_ready(pod: ResourceField) -> bool:
+ Utils.check_not_none(value=pod, message="Pod can't be null.")
+
+ condition = ocp_resources.pod.Pod.Condition.READY
+ status = ocp_resources.pod.Pod.Condition.Status.TRUE
+ for cond in pod.get("status", {}).get("conditions", []):
+ if cond["type"] == condition and cond["status"].casefold() == status.casefold():
+ return True
+ return False
+
+ @staticmethod
+ def is_pod_succeeded(pod: ResourceField) -> bool:
+ Utils.check_not_none(value=pod, message="Pod can't be null.")
+ return pod.status is not None and pod.status.phase == "Succeeded"
+
+
+class Utils:
+ @staticmethod
+ def check_not_none(value: Any, message: str) -> None:
+ if value is None:
+ raise ValueError(message)
+
+
+@contextlib.contextmanager
+def exposing_contextmanager(
+ core_v1_api: kubernetes.client.CoreV1Api,
+ pod: kubernetes.client.models.V1Pod
+) -> Generator[socket.socket, None, None]:
+ # If we e.g., specify the wrong port, the pf = portforward() call succeeds,
+ # but pf.connected will later flip to False
+ # we need to check that _everything_ works before moving on
+ pf = None
+ s = None
+ while not pf or not pf.connected or not s:
+ pf: kubernetes.stream.ws_client.PortForward = kubernetes.stream.portforward(
+ api_method=core_v1_api.connect_get_namespaced_pod_portforward,
+ name=pod.metadata.name,
+ namespace=pod.metadata.namespace,
+ ports=",".join(str(p) for p in [8888]),
+ )
+ s: kubernetes.stream.ws_client.PortForward._Port._Socket | socket.socket | None = pf.socket(8888)
+ assert s, "Failed to establish connection"
+
+ try:
+ yield s
+ finally:
+ s.close()
+ pf.close()
+
+
+@functools.wraps(ocp_resources.namespace.Namespace.__init__)
+def create_namespace(privileged_client: bool = False, *args,
+ **kwargs) -> ocp_resources.namespace.Namespace | ocp_resources.project_project_openshift_io.Project:
+ if not privileged_client:
+ with ocp_resources.project_request.ProjectRequest(*args, **kwargs):
+ project = ocp_resources.project_project_openshift_io.Project(*args, **kwargs)
+ project.wait_for_status(status=project.Status.ACTIVE, timeout=TestFrameConstants.TIMEOUT_2MIN)
+ return project
+ else:
+ with ocp_resources.namespace.Namespace(*args, **kwargs) as ns:
+ ns.wait_for_status(status=ocp_resources.namespace.Namespace.Status.ACTIVE,
+ timeout=TestFrameConstants.TIMEOUT_2MIN)
+ return ns
+
+
+__all__ = [
+ "get_client",
+ "get_username",
+ "exposing_contextmanager",
+ "create_namespace",
+ "PodUtils",
+ "TestFrame",
+ "TestFrameConstants",
+ "ImageDeployment",
+]
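
`Wait.until` is the generic polling primitive underneath `PodUtils.wait_for_pods_ready`. Driving it directly looks like this (a sketch with a toy condition, not taken from the commit):

```python
from tests.containers.kubernetes_utils import Wait, WaitException

attempts = 0

def ready() -> bool:
    global attempts
    attempts += 1
    return attempts >= 3  # becomes True on the third poll

try:
    Wait.until(
        description="toy condition",
        poll_interval=0.1,
        timeout=5.0,
        ready=ready,
        on_timeout=lambda: print("condition never became true"),
    )
except WaitException:
    raise  # only reached if the deadline passes first
```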
diff --git c/tests/containers/socket_proxy.py i/tests/containers/socket_proxy.py
new file mode 100644
index 00000000..77fc348
--- /dev/null
+++ i/tests/containers/socket_proxy.py
@@ -0,0 +1,173 @@
+from __future__ import annotations
+
+import contextlib
+import logging
+import socket
+import select
+import threading
+import subprocess
+import typing
+
+from tests.containers.cancellation_token import CancellationToken
+
+"""Proxies kubernetes portforwards to a local port.
+
+This is implemented as a thread running select() loop and managing the sockets.
+
+There are alternative implementations for this.
+
+1) Run oc port-forward in a subprocess
+* There isn't a nice way where kubectl would report in machine-readable way the
+ port number, kubernetes/kubectl#1190 (comment)
+2) Use the socket as is, mount a custom adaptor to the requests library
+* The code to do this is weird. This is what docker-py does w.r.t. the docker socket.
+ It defines a custom 'http+docker://' protocol, and an adaptor for it, that uses the docker socket.
+3) Implement proxy using asyncio
+* There are advantages to asyncio, but since we don't have Python asyncio anywhere else yet,
+ it is probably best to avoid using asyncio.
+
+Out of these, the oc port-forward subprocess is a decent alternative solution.
+"""
+
+class SubprocessProxy:
+ """Alternative 1 from the module docstring: delegate forwarding to a `kubectl port-forward` subprocess."""
+
+ def __init__(self, namespace: str, name: str, port: int):
+ self.namespace = namespace
+ self.name = name
+ self.port = port
+
+ def start(self):
+ # `:{port}` lets kubectl choose a random free local port;
+ # there is no machine-readable way to learn which one it picked (see module docstring)
+ self.forwarder = subprocess.Popen(
+ ["kubectl", "port-forward", "--namespace", self.namespace, self.name, f":{self.port}"],
+ text=True,
+ )
+
+ def stop(self):
+ self.forwarder.terminate()
+
+
+class SocketProxy:
+ def __init__(
+ self,
+ remote_socket_factory: typing.ContextManager[socket.socket],
+ local_host: str = "localhost",
+ local_port: int = 0,
+ buffer_size: int = 4096
+ ) -> None:
+ """
+
+ :param local_host: probably "localhost" would make most sense here
+ :param local_port: usually leave as to 0, which will make the OS choose a free port
+ :param remote_socket_factory: this is a context manager for kubernetes port forwarding
+ :param buffer_size: do not poke it, leave this at the default value
+ """
+ self.local_host = local_host
+ self.local_port = local_port
+ self.buffer_size = buffer_size
+ self.remote_socket_factory = remote_socket_factory
+
+ self.cancellation_token = CancellationToken()
+
+ self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.server_socket.bind((self.local_host, self.local_port))
+ self.server_socket.listen(1)
+ logging.info(f"Proxy listening on {self.local_host}:{self.local_port}")
+
+ def listen_and_serve_until_canceled(self):
+ """Accepts the client, creates a new socket to the remote, and proxies the data.
+
+ Handles at most one client at a time. """
+ try:
+ while not self.cancellation_token.cancelled:
+ client_socket, addr = self.server_socket.accept()
+ logging.info(f"Accepted connection from {addr[0]}:{addr[1]}")
+ self._handle_client(client_socket)
+ except Exception as e:
+ logging.exception("Proxying failed to listen")
+ raise
+ finally:
+ self.server_socket.close()
+
+ def get_actual_port(self) -> int:
+ """Returns the port that the proxy is listening on.
+ When port number 0 was passed in, this will return the actual randomly assigned port."""
+ return self.server_socket.getsockname()[1]
+
+ def _handle_client(self, client_socket):
+ with client_socket as _, self.remote_socket_factory as remote_socket:
+ while True:
+ readable, _, _ = select.select([client_socket, remote_socket, self.cancellation_token], [], [])
+
+ if self.cancellation_token.cancelled:
+ break
+
+ if client_socket in readable:
+ data = client_socket.recv(self.buffer_size)
+ if not data:
+ break
+ remote_socket.send(data)
+
+ if remote_socket in readable:
+ data = remote_socket.recv(self.buffer_size)
+ if not data:
+ break
+ client_socket.send(data)
+
+
+if __name__ == "__main__":
+ """Sample application to show how this can work."""
+
+
+ @contextlib.contextmanager
+ def remote_socket_factory():
+ class MockServer(threading.Thread):
+ def __init__(self, local_host: str = "localhost", local_port: int = 0):
+ self.local_host = local_host
+ self.local_port = local_port
+
+ self.is_socket_bound = threading.Event()
+
+ super().__init__()
+
+ def run(self):
+ self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.server_socket.bind((self.local_host, self.local_port))
+ self.server_socket.listen(1)
+ print(f"MockServer listening on {self.local_host}:{self.local_port}")
+ self.is_socket_bound.set()
+
+ client_socket, addr = self.server_socket.accept()
+ logging.info(f"MockServer accepted connection from {addr[0]}:{addr[1]}")
+
+ client_socket.send(b"Hello World\n")
+ client_socket.close()
+
+ def get_actual_port(self):
+ self.is_socket_bound.wait()
+ return self.server_socket.getsockname()[1]
+
+ server = MockServer()
+ server.start()
+
+ client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ client_socket.connect(("localhost", server.get_actual_port()))
+
+ yield client_socket
+
+ client_socket.close()
+ server.join()
+
+
+ proxy = SocketProxy(remote_socket_factory(), "localhost", 0)
+ thread = threading.Thread(target=proxy.listen_and_serve_until_canceled)
+ thread.start()
+
+ client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ client_socket.connect(("localhost", proxy.get_actual_port()))
+
+ print(client_socket.recv(1024)) # prints b'Hello World\n'
+
+ client_socket.close()
+ proxy.cancellation_token.cancel() # otherwise the proxy keeps accepting and join() blocks forever
+ thread.join()
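
For comparison, alternative 1 from the module docstring has to scrape the chosen local port out of kubectl's human-readable output, since kubernetes/kubectl#1190 means there is no machine-readable flag for it. A hedged sketch (namespace and pod name are placeholders, and the output format is observed behavior, not a contract):

```python
import re
import subprocess

# `:8888` asks kubectl to bind a random free local port for remote port 8888
proc = subprocess.Popen(
    ["kubectl", "port-forward", "--namespace", "test-ns", "pod/my-pod", ":8888"],
    stdout=subprocess.PIPE,
    text=True,
)
line = proc.stdout.readline()  # e.g. "Forwarding from 127.0.0.1:38137 -> 8888"
match = re.search(r"Forwarding from 127\.0\.0\.1:(\d+)", line)
assert match, f"unexpected kubectl output: {line!r}"
local_port = int(match.group(1))
print(f"kubectl chose local port {local_port}")
proc.terminate()
```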
diff --git c/tests/containers/workbenches/workbench_image_test.py i/tests/containers/workbenches/workbench_image_test.py
index cbfb7da..892f775 100644
--- c/tests/containers/workbenches/workbench_image_test.py
+++ i/tests/containers/workbenches/workbench_image_test.py
@@ -21,7 +21,7 @@ import testcontainers.core.waiting_utils
import pytest
import pytest_subtests
-from tests.containers import docker_utils, podman_machine_utils
+from tests.containers import docker_utils, podman_machine_utils, kubernetes_utils
class TestWorkbenchImage:
@@ -108,6 +108,18 @@ class TestWorkbenchImage:
finally:
docker_utils.NotebookContainer(container).stop(timeout=0)
+ @pytest.mark.openshift
+ def test_image_run_on_openshift(self, image: str):
+ skip_if_not_workbench_image(image)
+
+ client = kubernetes_utils.get_client()
+ print(client)
+
+ username = kubernetes_utils.get_username(client)
+ print(username)
+
+ with kubernetes_utils.ImageDeployment(client, image) as deployment:
+ deployment.deploy(container_name="notebook-tests-pod")
class WorkbenchContainer(testcontainers.core.container.DockerContainer):
@functools.wraps(testcontainers.core.container.DockerContainer.__init__)
* fixup, remove the add function in TestFrame
* fixup, adjust to changes after rebase
* fixup, restore test_oc_command_runs
* fixup, undo intellij reformat changes
10 files changed, +2119 −60 lines