104 changes: 84 additions & 20 deletions lib/runtime/src/discovery/kube.rs
@@ -19,14 +19,30 @@ use async_trait::async_trait;
use kube::Client as KubeClient;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{OnceCell, RwLock};
use tokio::task::JoinHandle;

/// Kubernetes-based discovery client
#[derive(Clone)]
pub struct KubeDiscoveryClient {
instance_id: u64,
metadata: Arc<RwLock<DiscoveryMetadata>>,
kube_client: KubeClient,
pod_info: PodInfo,
cancel_token: CancellationToken,
daemon_state: Arc<OnceCell<Arc<DaemonHandles>>>,
}

struct DaemonHandles {
metadata_watch: tokio::sync::watch::Receiver<Arc<MetadataSnapshot>>,
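// Retained so the client keeps ownership of the daemon task's handle (it could be
// awaited or aborted later); it is not read today, hence the allow below.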
#[allow(dead_code)]
daemon_handle: JoinHandle<()>,
}

impl DaemonHandles {
fn receiver(&self) -> tokio::sync::watch::Receiver<Arc<MetadataSnapshot>> {
self.metadata_watch.clone()
}
}

impl KubeDiscoveryClient {
@@ -53,26 +69,51 @@ impl KubeDiscoveryClient {
.await
.map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?;

// Create watch channel with initial empty snapshot
Review comment from @mohammedabdulwahhab (Contributor, Dec 3, 2025):
Could we add an env var that spawns the metadata daemon anyway in the init. The operator could inject an env var on the frontend (where we know we need clients).

Review comment (Contributor):
Could we update the dgd and the docs here with this env var: https://github.com/ai-dynamo/dynamo/blob/main/deploy/discovery/dgd.yaml
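
A minimal sketch of what that env-var suggestion could look like at the end of KubeDiscoveryClient::new(), reusing the lazy metadata_watch() path added in this diff; the variable name DYN_DISCOVERY_EAGER_START and the `client` binding for the freshly constructed Self are illustrative assumptions, not part of this PR:

    // Hypothetical, operator-injected env var (name is illustrative only) that forces
    // the metadata daemon to start during init instead of on the first list()/watch().
    if std::env::var("DYN_DISCOVERY_EAGER_START")
        .is_ok_and(|v| v == "1" || v.eq_ignore_ascii_case("true"))
    {
        // Reuse the lazy path so eager and lazy startup share one code path; the
        // returned receiver is dropped here, only the daemon spawn side effect matters.
        let _ = client.metadata_watch().await?;
    }

    Ok(client)

Gating on the env var keeps the lazy default while letting the operator opt specific pods (e.g. the frontend) into eager startup.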

let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));

// Create and spawn daemon
let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?;

tokio::spawn(async move {
if let Err(e) = daemon.run(watch_tx).await {
tracing::error!("Discovery daemon failed: {}", e);
}
});

tracing::info!("Discovery daemon started");

Ok(Self {
instance_id,
metadata,
metadata_watch: watch_rx,
kube_client,
pod_info,
cancel_token,
daemon_state: Arc::new(OnceCell::new()),
})
}

async fn metadata_watch(&self) -> Result<tokio::sync::watch::Receiver<Arc<MetadataSnapshot>>> {
let handles = self
.daemon_state
.get_or_try_init(|| {
let instance_id = self.instance_id;
let kube_client = self.kube_client.clone();
let pod_info = self.pod_info.clone();
let cancel_token = self.cancel_token.clone();

async move {
let (watch_tx, watch_rx) =
tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));
let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?;

let daemon_handle = tokio::spawn(async move {
if let Err(e) = daemon.run(watch_tx).await {
tracing::error!("Discovery daemon failed: {}", e);
}
});

tracing::info!(
"Discovery daemon started lazily for instance_id={:x}",
instance_id
);

Ok::<Arc<DaemonHandles>, anyhow::Error>(Arc::new(DaemonHandles {
metadata_watch: watch_rx,
daemon_handle,
}))
}
})
.await?;

Ok(handles.receiver())
}
}

#[async_trait]
@@ -144,8 +185,31 @@ impl Discovery for KubeDiscoveryClient {
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>> {
tracing::debug!("KubeDiscoveryClient::list called with query={:?}", query);

// Get current snapshot (may be empty if daemon hasn't fetched yet)
let snapshot = self.metadata_watch.borrow().clone();
// Ensure the daemon is running before accessing the snapshot
let mut metadata_watch = self.metadata_watch().await?;

// Check if we need to wait for initial snapshot.
// With lazy daemon start, the first call to list() triggers the daemon which needs time
// to poll other pods and aggregate their metadata before we have meaningful results.
let needs_wait = {
let snapshot = metadata_watch.borrow();
snapshot.sequence == 0 && snapshot.instances.is_empty()
};

// Wait for daemon to fetch at least one snapshot if this is the initial empty state
if needs_wait {
tracing::debug!("Waiting for initial discovery snapshot...");
// Wait for first update with a timeout
tokio::time::timeout(std::time::Duration::from_secs(10), metadata_watch.changed())
.await
.map_err(|_| anyhow::anyhow!("Timeout waiting for initial discovery snapshot"))?
.map_err(|_| {
anyhow::anyhow!("Discovery daemon stopped before providing snapshot")
})?;
}

// Get current snapshot
let snapshot = metadata_watch.borrow().clone();

tracing::debug!(
"List using snapshot seq={} with {} instances",
@@ -177,8 +241,8 @@ impl Discovery for KubeDiscoveryClient {
query
);

// Clone the watch receiver
let mut watch_rx = self.metadata_watch.clone();
// Clone the watch receiver (starts daemon lazily on demand)
let mut watch_rx = self.metadata_watch().await?;

// Create output stream
let (event_tx, event_rx) = mpsc::unbounded_channel();