Skip to content

Commit f1ff935

Browse files
committed
changes mpsc to broadcast for mcp_main_loop_to_handle_server_event_rx
1 parent f8afffe commit f1ff935

File tree

2 files changed

+38
-23
lines changed

2 files changed

+38
-23
lines changed

crates/agent/src/agent/mcp/mod.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ pub mod types;
110110

111111
use std::collections::HashMap;
112112
use std::path::PathBuf;
113-
use std::sync::Arc;
114113

115114
use actor::{
116115
McpServerActor,
@@ -124,8 +123,9 @@ use serde::{
124123
Serialize,
125124
};
126125
use serde_json::Value;
126+
use tokio::sync::broadcast::error::RecvError;
127127
use tokio::sync::{
128-
Mutex,
128+
broadcast,
129129
mpsc,
130130
oneshot,
131131
};
@@ -138,6 +138,8 @@ use types::Prompt;
138138

139139
use super::agent_loop::types::ToolSpec;
140140
use super::consts::DEFAULT_MCP_CREDENTIAL_PATH;
141+
use super::util::path::expand_path;
142+
use super::util::providers::RealProvider;
141143
use super::util::request_channel::{
142144
RequestReceiver,
143145
new_request_channel,
@@ -148,22 +150,30 @@ use crate::agent::util::request_channel::{
148150
respond,
149151
};
150152

151-
#[derive(Debug, Clone)]
153+
#[derive(Debug)]
152154
pub struct McpManagerHandle {
153155
/// Sender for sending requests to the tool manager task
154156
request_tx: RequestSender<McpManagerRequest, McpManagerResponse, McpManagerError>,
155157
server_to_handle_server_event_tx: mpsc::Sender<McpServerActorEvent>,
156-
mcp_main_loop_to_handle_server_event_rx: Arc<Mutex<mpsc::Receiver<McpServerActorEvent>>>,
158+
mcp_main_loop_to_handle_server_event_rx: broadcast::Receiver<McpServerActorEvent>,
159+
}
160+
161+
impl Clone for McpManagerHandle {
162+
fn clone(&self) -> Self {
163+
Self {
164+
request_tx: self.request_tx.clone(),
165+
server_to_handle_server_event_tx: self.server_to_handle_server_event_tx.clone(),
166+
mcp_main_loop_to_handle_server_event_rx: self.mcp_main_loop_to_handle_server_event_rx.resubscribe(),
167+
}
168+
}
157169
}
158170

159171
impl McpManagerHandle {
160172
fn new(
161173
request_tx: RequestSender<McpManagerRequest, McpManagerResponse, McpManagerError>,
162174
server_to_handle_server_event_tx: mpsc::Sender<McpServerActorEvent>,
163-
mcp_main_loop_to_handle_server_event_rx: mpsc::Receiver<McpServerActorEvent>,
175+
mcp_main_loop_to_handle_server_event_rx: broadcast::Receiver<McpServerActorEvent>,
164176
) -> Self {
165-
let mcp_main_loop_to_handle_server_event_rx = Arc::new(Mutex::new(mcp_main_loop_to_handle_server_event_rx));
166-
167177
Self {
168178
request_tx,
169179
server_to_handle_server_event_tx,
@@ -239,9 +249,8 @@ impl McpManagerHandle {
239249
}
240250
}
241251

242-
pub async fn recv(&self) -> Option<McpServerActorEvent> {
243-
let mut rx = self.mcp_main_loop_to_handle_server_event_rx.lock().await;
244-
rx.recv().await
252+
pub async fn recv(&mut self) -> Result<McpServerActorEvent, RecvError> {
253+
self.mcp_main_loop_to_handle_server_event_rx.recv().await
245254
}
246255
}
247256

@@ -278,7 +287,7 @@ impl McpManager {
278287
let request_tx = self.request_tx.clone();
279288
let server_to_handle_server_event_tx = self.server_event_tx.clone();
280289
let (mcp_main_loop_to_handle_server_event_tx, mcp_main_loop_to_handle_server_event_rx) =
281-
mpsc::channel::<McpServerActorEvent>(100);
290+
broadcast::channel::<McpServerActorEvent>(100);
282291

283292
tokio::spawn(async move {
284293
self.main_loop(mcp_main_loop_to_handle_server_event_tx).await;
@@ -291,7 +300,7 @@ impl McpManager {
291300
)
292301
}
293302

294-
async fn main_loop(mut self, mcp_main_loop_to_handle_server_event_tx: mpsc::Sender<McpServerActorEvent>) {
303+
async fn main_loop(mut self, mcp_main_loop_to_handle_server_event_tx: broadcast::Sender<McpServerActorEvent>) {
295304
loop {
296305
tokio::select! {
297306
req = self.request_rx.recv() => {
@@ -304,7 +313,7 @@ impl McpManager {
304313
},
305314
res = self.server_event_rx.recv() => {
306315
if let Some(evt) = res {
307-
self.handle_mcp_actor_event(evt, &mcp_main_loop_to_handle_server_event_tx).await;
316+
self.handle_mcp_actor_event(evt, &mcp_main_loop_to_handle_server_event_tx);
308317
}
309318
}
310319
}
@@ -352,10 +361,10 @@ impl McpManager {
352361
}
353362
}
354363

355-
async fn handle_mcp_actor_event(
364+
fn handle_mcp_actor_event(
356365
&mut self,
357366
evt: McpServerActorEvent,
358-
mcp_main_loop_to_handle_server_event_tx: &mpsc::Sender<McpServerActorEvent>,
367+
mcp_main_loop_to_handle_server_event_tx: &broadcast::Sender<McpServerActorEvent>,
359368
) {
360369
// TODO: keep a record of all the different server events received in this layer?
361370
match &evt {
@@ -381,7 +390,7 @@ impl McpManager {
381390
tracing::info!(?server_name, ?oauth_url, "received oauth request");
382391
},
383392
}
384-
let _ = mcp_main_loop_to_handle_server_event_tx.send(evt).await;
393+
let _ = mcp_main_loop_to_handle_server_event_tx.send(evt);
385394
}
386395
}
387396

crates/agent/src/agent/mod.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -367,9 +367,12 @@ impl Agent {
367367
loop {
368368
tokio::select! {
369369
evt = self.mcp_manager_handle.recv() => {
370-
let Some(evt) = evt else {
371-
error!("mcp manager handle channel closed");
372-
break;
370+
let evt = match evt {
371+
Ok(evt) => evt,
372+
Err(e) => {
373+
error!(?e, "mcp manager handle channel closed");
374+
break;
375+
}
373376
};
374377

375378
if matches!(evt, McpServerActorEvent::Initialized{ .. } | McpServerActorEvent::InitializeError { .. }) {
@@ -462,10 +465,13 @@ impl Agent {
462465
},
463466

464467
evt = self.mcp_manager_handle.recv() => {
465-
if let Some(evt) = evt {
466-
self.handle_mcp_server_actor_events(evt).await;
467-
} else {
468-
error!("mcp manager handle closed");
468+
match evt {
469+
Ok(evt) => {
470+
self.handle_mcp_server_actor_events(evt).await;
471+
},
472+
Err(e) => {
473+
error!(?e, "mcp manager handle closed");
474+
}
469475
}
470476
},
471477
}

0 commit comments

Comments
 (0)