Skip to content

Commit a5d7a0b

Browse files
committed
changes mpsc to broadcast for mcp_main_loop_to_handle_server_event_rx
1 parent bb3eb4a commit a5d7a0b

File tree

2 files changed

+38
-23
lines changed

2 files changed

+38
-23
lines changed

crates/agent/src/agent/mcp/mod.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ pub mod types;
110110

111111
use std::collections::HashMap;
112112
use std::path::PathBuf;
113-
use std::sync::Arc;
114113

115114
use actor::{
116115
McpServerActor,
@@ -124,8 +123,9 @@ use serde::{
124123
Serialize,
125124
};
126125
use serde_json::Value;
126+
use tokio::sync::broadcast::error::RecvError;
127127
use tokio::sync::{
128-
Mutex,
128+
broadcast,
129129
mpsc,
130130
oneshot,
131131
};
@@ -138,6 +138,8 @@ use types::Prompt;
138138

139139
use super::agent_loop::types::ToolSpec;
140140
use super::consts::DEFAULT_MCP_CREDENTIAL_PATH;
141+
use super::util::path::expand_path;
142+
use super::util::providers::RealProvider;
141143
use super::util::request_channel::{
142144
RequestReceiver,
143145
new_request_channel,
@@ -148,22 +150,30 @@ use crate::agent::util::request_channel::{
148150
respond,
149151
};
150152

151-
#[derive(Debug, Clone)]
153+
#[derive(Debug)]
152154
pub struct McpManagerHandle {
153155
/// Sender for sending requests to the tool manager task
154156
request_tx: RequestSender<McpManagerRequest, McpManagerResponse, McpManagerError>,
155157
server_to_handle_server_event_tx: mpsc::Sender<McpServerActorEvent>,
156-
mcp_main_loop_to_handle_server_event_rx: Arc<Mutex<mpsc::Receiver<McpServerActorEvent>>>,
158+
mcp_main_loop_to_handle_server_event_rx: broadcast::Receiver<McpServerActorEvent>,
159+
}
160+
161+
impl Clone for McpManagerHandle {
162+
fn clone(&self) -> Self {
163+
Self {
164+
request_tx: self.request_tx.clone(),
165+
server_to_handle_server_event_tx: self.server_to_handle_server_event_tx.clone(),
166+
mcp_main_loop_to_handle_server_event_rx: self.mcp_main_loop_to_handle_server_event_rx.resubscribe(),
167+
}
168+
}
157169
}
158170

159171
impl McpManagerHandle {
160172
fn new(
161173
request_tx: RequestSender<McpManagerRequest, McpManagerResponse, McpManagerError>,
162174
server_to_handle_server_event_tx: mpsc::Sender<McpServerActorEvent>,
163-
mcp_main_loop_to_handle_server_event_rx: mpsc::Receiver<McpServerActorEvent>,
175+
mcp_main_loop_to_handle_server_event_rx: broadcast::Receiver<McpServerActorEvent>,
164176
) -> Self {
165-
let mcp_main_loop_to_handle_server_event_rx = Arc::new(Mutex::new(mcp_main_loop_to_handle_server_event_rx));
166-
167177
Self {
168178
request_tx,
169179
server_to_handle_server_event_tx,
@@ -242,9 +252,8 @@ impl McpManagerHandle {
242252
}
243253
}
244254

245-
pub async fn recv(&self) -> Option<McpServerActorEvent> {
246-
let mut rx = self.mcp_main_loop_to_handle_server_event_rx.lock().await;
247-
rx.recv().await
255+
pub async fn recv(&mut self) -> Result<McpServerActorEvent, RecvError> {
256+
self.mcp_main_loop_to_handle_server_event_rx.recv().await
248257
}
249258
}
250259

@@ -281,7 +290,7 @@ impl McpManager {
281290
let request_tx = self.request_tx.clone();
282291
let server_to_handle_server_event_tx = self.server_event_tx.clone();
283292
let (mcp_main_loop_to_handle_server_event_tx, mcp_main_loop_to_handle_server_event_rx) =
284-
mpsc::channel::<McpServerActorEvent>(100);
293+
broadcast::channel::<McpServerActorEvent>(100);
285294

286295
tokio::spawn(async move {
287296
self.main_loop(mcp_main_loop_to_handle_server_event_tx).await;
@@ -294,7 +303,7 @@ impl McpManager {
294303
)
295304
}
296305

297-
async fn main_loop(mut self, mcp_main_loop_to_handle_server_event_tx: mpsc::Sender<McpServerActorEvent>) {
306+
async fn main_loop(mut self, mcp_main_loop_to_handle_server_event_tx: broadcast::Sender<McpServerActorEvent>) {
298307
loop {
299308
tokio::select! {
300309
req = self.request_rx.recv() => {
@@ -307,7 +316,7 @@ impl McpManager {
307316
},
308317
res = self.server_event_rx.recv() => {
309318
if let Some(evt) = res {
310-
self.handle_mcp_actor_event(evt, &mcp_main_loop_to_handle_server_event_tx).await;
319+
self.handle_mcp_actor_event(evt, &mcp_main_loop_to_handle_server_event_tx);
311320
}
312321
}
313322
}
@@ -355,10 +364,10 @@ impl McpManager {
355364
}
356365
}
357366

358-
async fn handle_mcp_actor_event(
367+
fn handle_mcp_actor_event(
359368
&mut self,
360369
evt: McpServerActorEvent,
361-
mcp_main_loop_to_handle_server_event_tx: &mpsc::Sender<McpServerActorEvent>,
370+
mcp_main_loop_to_handle_server_event_tx: &broadcast::Sender<McpServerActorEvent>,
362371
) {
363372
// TODO: keep a record of all the different server events received in this layer?
364373
match &evt {
@@ -384,7 +393,7 @@ impl McpManager {
384393
tracing::info!(?server_name, ?oauth_url, "received oauth request");
385394
},
386395
}
387-
let _ = mcp_main_loop_to_handle_server_event_tx.send(evt).await;
396+
let _ = mcp_main_loop_to_handle_server_event_tx.send(evt);
388397
}
389398
}
390399

crates/agent/src/agent/mod.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -367,9 +367,12 @@ impl Agent {
367367
loop {
368368
tokio::select! {
369369
evt = self.mcp_manager_handle.recv() => {
370-
let Some(evt) = evt else {
371-
error!("mcp manager handle channel closed");
372-
break;
370+
let evt = match evt {
371+
Ok(evt) => evt,
372+
Err(e) => {
373+
error!(?e, "mcp manager handle channel closed");
374+
break;
375+
}
373376
};
374377

375378
if matches!(evt, McpServerActorEvent::Initialized{ .. } | McpServerActorEvent::InitializeError { .. }) {
@@ -462,10 +465,13 @@ impl Agent {
462465
},
463466

464467
evt = self.mcp_manager_handle.recv() => {
465-
if let Some(evt) = evt {
466-
self.handle_mcp_server_actor_events(evt).await;
467-
} else {
468-
error!("mcp manager handle closed");
468+
match evt {
469+
Ok(evt) => {
470+
self.handle_mcp_server_actor_events(evt).await;
471+
},
472+
Err(e) => {
473+
error!(?e, "mcp manager handle closed");
474+
}
469475
}
470476
},
471477
}

0 commit comments

Comments
 (0)