diff --git a/lib/runtime/src/discovery/kube.rs b/lib/runtime/src/discovery/kube.rs index 5fd65c9f30..c39439212b 100644 --- a/lib/runtime/src/discovery/kube.rs +++ b/lib/runtime/src/discovery/kube.rs @@ -19,14 +19,30 @@ use async_trait::async_trait; use kube::Client as KubeClient; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{OnceCell, RwLock}; +use tokio::task::JoinHandle; /// Kubernetes-based discovery client #[derive(Clone)] pub struct KubeDiscoveryClient { instance_id: u64, metadata: Arc>, + kube_client: KubeClient, + pod_info: PodInfo, + cancel_token: CancellationToken, + daemon_state: Arc>>, +} + +struct DaemonHandles { metadata_watch: tokio::sync::watch::Receiver>, + #[allow(dead_code)] + daemon_handle: JoinHandle<()>, +} + +impl DaemonHandles { + fn receiver(&self) -> tokio::sync::watch::Receiver> { + self.metadata_watch.clone() + } } impl KubeDiscoveryClient { @@ -53,26 +69,51 @@ impl KubeDiscoveryClient { .await .map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?; - // Create watch channel with initial empty snapshot - let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty())); - - // Create and spawn daemon - let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?; - - tokio::spawn(async move { - if let Err(e) = daemon.run(watch_tx).await { - tracing::error!("Discovery daemon failed: {}", e); - } - }); - - tracing::info!("Discovery daemon started"); - Ok(Self { instance_id, metadata, - metadata_watch: watch_rx, + kube_client, + pod_info, + cancel_token, + daemon_state: Arc::new(OnceCell::new()), }) } + + async fn metadata_watch(&self) -> Result>> { + let handles = self + .daemon_state + .get_or_try_init(|| { + let instance_id = self.instance_id; + let kube_client = self.kube_client.clone(); + let pod_info = self.pod_info.clone(); + let cancel_token = self.cancel_token.clone(); + + async move { + let (watch_tx, watch_rx) = + tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty())); + let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?; + + let daemon_handle = tokio::spawn(async move { + if let Err(e) = daemon.run(watch_tx).await { + tracing::error!("Discovery daemon failed: {}", e); + } + }); + + tracing::info!( + "Discovery daemon started lazily for instance_id={:x}", + instance_id + ); + + Ok::, anyhow::Error>(Arc::new(DaemonHandles { + metadata_watch: watch_rx, + daemon_handle, + })) + } + }) + .await?; + + Ok(handles.receiver()) + } } #[async_trait] @@ -144,8 +185,31 @@ impl Discovery for KubeDiscoveryClient { async fn list(&self, query: DiscoveryQuery) -> Result> { tracing::debug!("KubeDiscoveryClient::list called with query={:?}", query); - // Get current snapshot (may be empty if daemon hasn't fetched yet) - let snapshot = self.metadata_watch.borrow().clone(); + // Ensure the daemon is running before accessing the snapshot + let mut metadata_watch = self.metadata_watch().await?; + + // Check if we need to wait for initial snapshot. + // With lazy daemon start, the first call to list() triggers the daemon which needs time + // to poll other pods and aggregate their metadata before we have meaningful results. + let needs_wait = { + let snapshot = metadata_watch.borrow(); + snapshot.sequence == 0 && snapshot.instances.is_empty() + }; + + // Wait for daemon to fetch at least one snapshot if this is the initial empty state + if needs_wait { + tracing::debug!("Waiting for initial discovery snapshot..."); + // Wait for first update with a timeout + tokio::time::timeout(std::time::Duration::from_secs(10), metadata_watch.changed()) + .await + .map_err(|_| anyhow::anyhow!("Timeout waiting for initial discovery snapshot"))? + .map_err(|_| { + anyhow::anyhow!("Discovery daemon stopped before providing snapshot") + })?; + } + + // Get current snapshot + let snapshot = metadata_watch.borrow().clone(); tracing::debug!( "List using snapshot seq={} with {} instances", @@ -177,8 +241,8 @@ impl Discovery for KubeDiscoveryClient { query ); - // Clone the watch receiver - let mut watch_rx = self.metadata_watch.clone(); + // Clone the watch receiver (starts daemon lazily on demand) + let mut watch_rx = self.metadata_watch().await?; // Create output stream let (event_tx, event_rx) = mpsc::unbounded_channel();