Skip to content

Commit 42bcf98

Browse files
Port Forward Dashboard in experimental and classic KubeCluster (#489)
* Port Forward Dashboard in experimental and classic KubeCluster
* flake8 and black
* updated dashboard_link
* overriding dashboard_link property to update port
* black formatting

Co-authored-by: Jacob Tomlinson <jacobtomlinson@users.noreply.github.com>
1 parent bf1b6a4 commit 42bcf98

File tree

3 files changed

+45
-10
lines changed

3 files changed

+45
-10
lines changed

dask_kubernetes/classic/kubecluster.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import dask.distributed
1313
import distributed.security
1414
from distributed.deploy import SpecCluster, ProcessInterface
15-
from distributed.utils import Log, Logs
15+
from distributed.utils import format_dashboard_link, Log, Logs
1616
import kubernetes_asyncio as kubernetes
1717
from kubernetes_asyncio.client.rest import ApiException
1818

@@ -30,7 +30,10 @@
3030
namespace_default,
3131
escape,
3232
)
33-
from ..common.networking import get_external_address_for_scheduler_service
33+
from ..common.networking import (
34+
get_external_address_for_scheduler_service,
35+
port_forward_dashboard,
36+
)
3437

3538
logger = logging.getLogger(__name__)
3639

@@ -495,6 +498,11 @@ def __init__(
495498
self.kwargs = kwargs
496499
super().__init__(**self.kwargs)
497500

501+
@property
def dashboard_link(self):
    """Return the dashboard URL for the locally forwarded dashboard port.

    The host is taken from ``self.scheduler_address`` (scheme, path and
    port stripped); the port is ``self.forwarded_dashboard_port``, which
    ``_start`` assigns from ``port_forward_dashboard``.
    """
    # "<scheme>://<host>:<port>/<path>" -> "<host>:<port>/<path>"
    without_scheme = self.scheduler_address.split("://")[1]
    # Drop any trailing path component.
    host_and_port = without_scheme.split("/")[0]
    # Keep only the host; the scheduler's own port is not the dashboard's.
    hostname = host_and_port.split(":")[0]
    return format_dashboard_link(hostname, self.forwarded_dashboard_port)
505+
498506
def _get_pod_template(self, pod_template, pod_type):
499507
if not pod_template and dask.config.get(
500508
"kubernetes.{}-template".format(pod_type), None
@@ -626,6 +634,10 @@ async def _start(self):
626634

627635
await super()._start()
628636

637+
self.forwarded_dashboard_port = await port_forward_dashboard(
638+
self.name, self.namespace
639+
)
640+
629641
@classmethod
630642
def from_dict(cls, pod_spec, **kwargs):
631643
"""Create cluster with worker pod spec defined by Python dictionary

dask_kubernetes/common/networking.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import time
66
from weakref import finalize
77

8-
from dask.distributed import Client
98
import kubernetes_asyncio as kubernetes
109

1110
from .utils import check_dependency
@@ -89,16 +88,22 @@ async def port_forward_service(service_name, namespace, remote_port, local_port=
8988

9089

9190
async def is_comm_open(ip, port, retries=10):
    """Report whether a TCP connection to ``ip:port`` can be established.

    Makes up to ``retries`` connection attempts, pausing two seconds after
    each failed attempt.

    Parameters
    ----------
    ip : str
        Host to connect to.
    port : int
        TCP port to connect to.
    retries : int
        Maximum number of connection attempts. With ``retries <= 0`` no
        attempt is made and ``False`` is returned.

    Returns
    -------
    bool
        ``True`` as soon as one attempt succeeds, ``False`` once all
        attempts have failed.
    """
    # Function-scope import: the file's top-level imports are not fully
    # visible from here, so do not rely on ``asyncio`` being imported there.
    import asyncio

    while retries > 0:
        # Use a fresh socket per attempt (a socket is not reliably reusable
        # after a failed connect) and always close it, success or not.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            if sock.connect_ex((ip, port)) == 0:
                return True
        # Yield to the event loop while waiting instead of blocking it
        # with ``time.sleep``.
        await asyncio.sleep(2)
        retries -= 1
    return False
100100

101101

102+
async def port_forward_dashboard(service_name, namespace):
    """Port-forward remote port 8787 of ``service_name`` in ``namespace``.

    Delegates to ``port_forward_service`` and returns its result unchanged;
    callers in this change store it as ``forwarded_dashboard_port``.
    """
    return await port_forward_service(service_name, namespace, 8787)
105+
106+
102107
async def get_scheduler_address(service_name, namespace):
103108
async with kubernetes.client.api_client.ApiClient() as api_client:
104109
api = kubernetes.client.CoreV1Api(api_client)

dask_kubernetes/experimental/kubecluster.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,14 @@
1212

1313
from distributed.core import Status, rpc
1414
from distributed.deploy import Cluster
15-
16-
from distributed.utils import Log, Logs, LoopRunner, TimeoutError
15+
from distributed.utils import (
16+
Log,
17+
Logs,
18+
LoopRunner,
19+
TimeoutError,
20+
LoopRunner,
21+
format_dashboard_link,
22+
)
1723

1824
from dask_kubernetes.common.auth import ClusterAuth
1925
from dask_kubernetes.operator import (
@@ -23,6 +29,7 @@
2329

2430
from dask_kubernetes.common.networking import (
2531
get_scheduler_address,
32+
port_forward_dashboard,
2633
wait_for_scheduler,
2734
)
2835

@@ -154,6 +161,11 @@ def __init__(
154161
def cluster_name(self):
    """Return this cluster's name: ``self.name`` with a ``-cluster`` suffix."""
    suffixed_name = f"{self.name}-cluster"
    return suffixed_name
156163

164+
@property
def dashboard_link(self):
    """Return the dashboard URL for the locally forwarded dashboard port.

    Combines the bare host extracted from ``self.scheduler_address`` with
    ``self.forwarded_dashboard_port``, which ``_create_cluster`` and
    ``_connect_cluster`` assign from ``port_forward_dashboard``.
    """
    # Strip the "<scheme>://" prefix, then any path, then the port.
    remainder = self.scheduler_address.split("://")[1]
    authority = remainder.split("/")[0]
    hostname = authority.split(":")[0]
    return format_dashboard_link(hostname, self.forwarded_dashboard_port)
168+
157169
async def _start(self):
158170
await ClusterAuth.load_first(self.auth)
159171
cluster_exists = (await self._get_cluster()) is not None
@@ -203,6 +215,9 @@ async def _create_cluster(self):
203215
await wait_for_scheduler(cluster_name, self.namespace)
204216
await wait_for_service(core_api, f"{cluster_name}-service", self.namespace)
205217
self.scheduler_comm = rpc(await self._get_scheduler_address())
218+
self.forwarded_dashboard_port = await port_forward_dashboard(
219+
f"{self.name}-cluster-service", self.namespace
220+
)
206221

207222
async def _connect_cluster(self):
208223
if self.shutdown_on_close is None:
@@ -222,6 +237,9 @@ async def _connect_cluster(self):
222237
await wait_for_scheduler(self.cluster_name, self.namespace)
223238
await wait_for_service(core_api, service_name, self.namespace)
224239
self.scheduler_comm = rpc(await self._get_scheduler_address())
240+
self.forwarded_dashboard_port = await port_forward_dashboard(
241+
f"{self.name}-cluster-service", self.namespace
242+
)
225243

226244
async def _get_cluster(self):
227245
async with kubernetes.client.api_client.ApiClient() as api_client:

0 commit comments

Comments
 (0)