Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 73 additions & 20 deletions lib/runtime/src/discovery/kube.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@ use async_trait::async_trait;
use kube::Client as KubeClient;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{OnceCell, RwLock};

/// Kubernetes-based discovery client.
///
/// `Clone` is intentionally NOT derived here: this file already contains an
/// explicit `impl Clone for KubeDiscoveryClient`, and having both would be a
/// conflicting-implementation error. Clones share `metadata` and
/// `daemon_state` through their `Arc`s.
pub struct KubeDiscoveryClient {
    /// Identifier of this client instance.
    instance_id: u64,
    /// Discovery metadata, shared between clones behind an async `RwLock`.
    metadata: Arc<RwLock<DiscoveryMetadata>>,
    /// Kubernetes API client; a clone is handed to the discovery daemon when
    /// the daemon is started.
    kube_client: KubeClient,
    /// Pod information handed to the discovery daemon when it is started.
    pod_info: PodInfo,
    /// Cancellation token handed to the discovery daemon when it is started.
    cancel_token: CancellationToken,
    /// Handles of the discovery daemon. The cell is filled at most once, on
    /// the first call to `metadata_watch`, so the daemon is started lazily.
    daemon_state: Arc<OnceCell<Arc<DaemonHandles>>>,
}

/// Handles kept for a started discovery daemon.
#[derive(Debug)]
struct DaemonHandles {
    /// Receiving side of the watch channel whose sender is given to the
    /// daemon; carries the latest metadata snapshot.
    metadata_watch: tokio::sync::watch::Receiver<Arc<MetadataSnapshot>>,
}

Expand Down Expand Up @@ -53,28 +60,36 @@ impl KubeDiscoveryClient {
.await
.map_err(|e| anyhow::anyhow!("Failed to create Kubernetes client: {}", e))?;

// Create watch channel with initial empty snapshot
let (watch_tx, watch_rx) = tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));

// Create and spawn daemon
let daemon = DiscoveryDaemon::new(kube_client, pod_info, cancel_token)?;

tokio::spawn(async move {
if let Err(e) = daemon.run(watch_tx).await {
tracing::error!("Discovery daemon failed: {}", e);
}
});

tracing::info!("Discovery daemon started");

Ok(Self {
instance_id,
metadata,
metadata_watch: watch_rx,
kube_client,
pod_info,
cancel_token,
daemon_state: Arc::new(OnceCell::new()),
})
}
}

impl Clone for KubeDiscoveryClient {
fn clone(&self) -> Self {
Self {
instance_id: self.instance_id,
metadata: self.metadata.clone(),
kube_client: self.kube_client.clone(),
pod_info: self.pod_info.clone(),
cancel_token: self.cancel_token.clone(),
daemon_state: self.daemon_state.clone(),
}
}
}

impl DaemonHandles {
    /// Returns a new receiver cloned from the stored metadata watch receiver.
    fn receiver(&self) -> tokio::sync::watch::Receiver<Arc<MetadataSnapshot>> {
        let rx = &self.metadata_watch;
        rx.clone()
    }
}

#[async_trait]
impl Discovery for KubeDiscoveryClient {
fn instance_id(&self) -> u64 {
Expand Down Expand Up @@ -144,8 +159,11 @@ impl Discovery for KubeDiscoveryClient {
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>> {
tracing::debug!("KubeDiscoveryClient::list called with query={:?}", query);

// Ensure the daemon is running before accessing the snapshot
let metadata_watch = self.metadata_watch().await?;

// Get current snapshot (may be empty if daemon hasn't fetched yet)
let snapshot = self.metadata_watch.borrow().clone();
let snapshot = metadata_watch.borrow().clone();

tracing::debug!(
"List using snapshot seq={} with {} instances",
Expand Down Expand Up @@ -177,8 +195,8 @@ impl Discovery for KubeDiscoveryClient {
query
);

// Clone the watch receiver
let mut watch_rx = self.metadata_watch.clone();
// Clone the watch receiver (starts daemon lazily on demand)
let mut watch_rx = self.metadata_watch().await?;

// Create output stream
let (event_tx, event_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -311,3 +329,38 @@ impl Discovery for KubeDiscoveryClient {
Ok(Box::pin(stream))
}
}

impl KubeDiscoveryClient {
    /// Returns a receiver for the daemon's metadata snapshots, starting the
    /// discovery daemon the first time it is needed.
    ///
    /// The daemon handles live in `daemon_state`, so the initialisation
    /// closure runs at most once per set of clones; later calls only clone the
    /// stored receiver. The Kubernetes client, pod info and cancellation token
    /// are cloned inside the closure, so calls made after the daemon has been
    /// started no longer pay for clones that would be thrown away.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `DiscoveryDaemon::new`.
    async fn metadata_watch(&self) -> Result<tokio::sync::watch::Receiver<Arc<MetadataSnapshot>>> {
        let handles = self
            .daemon_state
            .get_or_try_init(|| async move {
                // The channel starts with an empty snapshot; the daemon owns
                // the sender.
                let (watch_tx, watch_rx) =
                    tokio::sync::watch::channel(Arc::new(MetadataSnapshot::empty()));
                let daemon = DiscoveryDaemon::new(
                    self.kube_client.clone(),
                    self.pod_info.clone(),
                    self.cancel_token.clone(),
                )?;

                // Detached task: a failure of the daemon is only logged here,
                // nothing in this method restarts it.
                tokio::spawn(async move {
                    if let Err(e) = daemon.run(watch_tx).await {
                        tracing::error!("Discovery daemon failed: {}", e);
                    }
                });

                tracing::info!(
                    "Discovery daemon started lazily for instance_id={:x}",
                    self.instance_id
                );

                Ok::<Arc<DaemonHandles>, anyhow::Error>(Arc::new(DaemonHandles {
                    metadata_watch: watch_rx,
                }))
            })
            .await?;

        Ok(handles.receiver())
    }
}
Loading