Skip to content

Commit 3e0e7bd

Browse files
committed
retores use of oneshot in server launch
1 parent 76ef3e2 commit 3e0e7bd

File tree

2 files changed

+81
-34
lines changed

2 files changed

+81
-34
lines changed

crates/agent/src/agent/mcp/mod.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,22 @@ impl McpManagerHandle {
182182
&mut self,
183183
name: String,
184184
config: McpServerConfig,
185-
) -> Result<McpManagerResponse, McpManagerError> {
186-
self.request_tx
185+
) -> Result<oneshot::Receiver<LaunchServerResult>, McpManagerError> {
186+
match self
187+
.request_tx
187188
.send_recv(McpManagerRequest::LaunchServer {
188189
server_name: name,
189190
config,
190191
})
191192
.await
192-
.unwrap_or(Err(McpManagerError::Channel))
193+
.unwrap_or(Err(McpManagerError::Channel))?
194+
{
195+
McpManagerResponse::LaunchServer(rx) => Ok(rx),
196+
other => Err(McpManagerError::Custom(format!(
197+
"received unexpected response: {:?}",
198+
other
199+
))),
200+
}
193201
}
194202

195203
pub async fn get_tool_specs(&self, server_name: String) -> Result<Vec<ToolSpec>, McpManagerError> {
@@ -257,7 +265,7 @@ pub struct McpManager {
257265

258266
cred_path: PathBuf,
259267

260-
initializing_servers: HashMap<String, McpServerActorHandle>,
268+
initializing_servers: HashMap<String, (McpServerActorHandle, oneshot::Sender<LaunchServerResult>)>,
261269
servers: HashMap<String, McpServerActorHandle>,
262270
event_buf: Vec<McpServerActorEvent>,
263271
}
@@ -332,8 +340,10 @@ impl McpManager {
332340
}
333341
let event_tx = self.server_event_tx.clone();
334342
let handle = McpServerActor::spawn(name.clone(), config, self.cred_path.clone(), event_tx);
335-
self.initializing_servers.insert(name, handle);
336-
Ok(McpManagerResponse::LaunchServer)
343+
let (tx, rx) = oneshot::channel();
344+
345+
self.initializing_servers.insert(name, (handle, tx));
346+
Ok(McpManagerResponse::LaunchServer(rx))
337347
},
338348
McpManagerRequest::GetToolSpecs { server_name } => match self.servers.get(&server_name) {
339349
Some(handle) => Ok(McpManagerResponse::ToolSpecs(handle.get_tool_specs().await?)),
@@ -365,17 +375,25 @@ impl McpManager {
365375
list_tools_duration: _,
366376
list_prompts_duration: _,
367377
} => {
368-
let Some(handle) = self.initializing_servers.remove(server_name) else {
378+
let Some((handle, result_tx)) = self.initializing_servers.remove(server_name) else {
369379
warn!(?server_name, ?evt, "event was not from an initializing MCP server");
370380
return;
371381
};
372382

383+
if let Err(e) = result_tx.send(Ok(())) {
384+
error!(?server_name, ?e, "failed to send server initialized message");
385+
}
386+
373387
if self.servers.insert(server_name.clone(), handle).is_some() {
374388
warn!(?server_name, "duplicated server. old server dropped");
375389
}
376390
},
377-
McpServerActorEvent::InitializeError { server_name, error: _ } => {
378-
self.initializing_servers.remove(server_name);
391+
McpServerActorEvent::InitializeError { server_name, error } => {
392+
if let Some((_, result_tx)) = self.initializing_servers.remove(server_name) {
393+
if let Err(e) = result_tx.send(Err(McpManagerError::Custom(error.clone()))) {
394+
error!(?server_name, ?e, "failed to send server initialized message");
395+
}
396+
}
379397
},
380398
McpServerActorEvent::OauthRequest { server_name, oauth_url } => {
381399
info!(?server_name, ?oauth_url, "received oauth request");
@@ -418,14 +436,16 @@ pub enum McpManagerRequest {
418436

419437
#[derive(Debug)]
420438
pub enum McpManagerResponse {
421-
LaunchServer,
439+
LaunchServer(oneshot::Receiver<LaunchServerResult>),
422440
ToolSpecs(Vec<ToolSpec>),
423441
Prompts(Vec<Prompt>),
424442
ExecuteTool(oneshot::Receiver<ExecuteToolResult>),
425443
}
426444

427445
pub type ExecuteToolResult = Result<CallToolResult, McpServerActorError>;
428446

447+
type LaunchServerResult = Result<(), McpManagerError>;
448+
429449
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
430450
pub enum McpManagerError {
431451
#[error("Server with the name {} is not initialized", .name)]

crates/agent/src/agent/mod.rs

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use agent_loop::{
5757
};
5858
use chrono::Utc;
5959
use consts::MAX_RESOURCE_FILE_LENGTH;
60+
use futures::stream::FuturesUnordered;
6061
use mcp::actor::McpServerActorEvent;
6162
use permissions::evaluate_tool_permission;
6263
use protocol::{
@@ -94,9 +95,11 @@ use task_executor::{
9495
};
9596
use tokio::sync::{
9697
broadcast,
98+
mpsc,
9799
oneshot,
98100
};
99101
use tokio::time::Instant;
102+
use tokio_stream::StreamExt as _;
100103
use tokio_util::sync::CancellationToken;
101104
use tool_utils::{
102105
SanitizedToolSpecs,
@@ -344,54 +347,78 @@ impl Agent {
344347
warn!(?self.cached_mcp_configs.overridden_configs, "ignoring overridden configs");
345348
}
346349

347-
let mut total_servers_to_be_loaded = 0_usize;
350+
let mut results = FuturesUnordered::new();
348351

349352
for config in self
350353
.cached_mcp_configs
351354
.configs
352355
.iter()
353356
.filter(|config| config.is_enabled())
354357
{
355-
if let Err(e) = self
358+
let Ok(rx) = self
356359
.mcp_manager_handle
357360
.launch_server(config.server_name.clone(), config.config.clone())
358361
.await
359-
{
360-
warn!(?config.server_name, ?e, "failed to launch MCP config, skipping");
361-
} else {
362-
total_servers_to_be_loaded += 1;
363-
}
362+
else {
363+
warn!(?config.server_name, "failed to launch MCP config, skipping");
364+
continue;
365+
};
366+
let name = config.server_name.clone();
367+
results.push(async move { (name, rx.await) });
364368
}
365369

370+
// Continually loop through the receivers until all have completed.
371+
let mut launched_servers = Vec::new();
372+
let (success_tx, mut success_rx) = mpsc::channel(8);
373+
let mut failed_servers = Vec::new();
374+
let (failed_tx, mut failed_rx) = mpsc::channel(8);
375+
let init_results_handle = tokio::spawn(async move {
376+
while let Some((name, res)) = results.next().await {
377+
debug!(?name, ?res, "received result from LaunchServer request");
378+
let Ok(res) = res else {
379+
warn!(?name, "channel unexpectedly dropped during MCP initialization");
380+
let _ = failed_tx.send(name).await;
381+
continue;
382+
};
383+
match res {
384+
Ok(_) => {
385+
let _ = success_tx.send(name).await;
386+
},
387+
Err(err) => {
388+
error!(?name, ?err, "failed to launch MCP server");
389+
let _ = failed_tx.send(name).await;
390+
},
391+
}
392+
}
393+
});
394+
366395
let timeout_at = Instant::now() + self.settings.mcp_init_timeout;
367396
loop {
368397
tokio::select! {
369-
evt = self.mcp_manager_handle.recv() => {
370-
let evt = match evt {
371-
Ok(evt) => evt,
372-
Err(e) => {
373-
error!(?e, "mcp manager handle channel closed");
374-
break;
375-
}
398+
name = success_rx.recv() => {
399+
let Some(name) = name else {
400+
// If None is returned in either success/failed receivers, then the
401+
// senders have dropped, meaning initialization has completed.
402+
break;
376403
};
377-
378-
if matches!(evt, McpServerActorEvent::Initialized{ .. } | McpServerActorEvent::InitializeError { .. }) {
379-
total_servers_to_be_loaded = total_servers_to_be_loaded.saturating_sub(1);
380-
}
381-
self.handle_mcp_server_actor_events(evt).await;
382-
383-
if total_servers_to_be_loaded == 0 {
384-
info!("all mcp servers loaded before timeout");
404+
debug!(?name, "MCP server successfully initialized");
405+
launched_servers.push(name);
406+
},
407+
name = failed_rx.recv() => {
408+
let Some(name) = name else {
385409
break;
386-
}
410+
};
411+
warn!(?name, "MCP server failed initialization");
412+
failed_servers.push(name);
387413
},
388-
389414
_ = tokio::time::sleep_until(timeout_at) => {
390415
warn!("timed out before all MCP servers could be initialized");
391416
break;
392417
},
393418
}
394419
}
420+
info!(?launched_servers, ?failed_servers, "MCP server initialization finished");
421+
init_results_handle.abort();
395422
}
396423

397424
// Next, run agent spawn hooks.

0 commit comments

Comments
 (0)