@@ -10,14 +10,15 @@ use crate::schema::{
1010 } ,
1111 InitializeRequestParams , InitializeResult , RequestId , RpcError ,
1212} ;
13+ use crate :: utils:: AbortTaskOnDrop ;
1314use async_trait:: async_trait;
1415use futures:: future:: try_join_all;
1516use futures:: { StreamExt , TryFutureExt } ;
1617#[ cfg( feature = "hyper-server" ) ]
1718use rust_mcp_transport:: SessionId ;
1819use rust_mcp_transport:: { IoStream , TransportDispatcher } ;
1920use std:: collections:: HashMap ;
20- use std:: sync:: { Arc , RwLock } ;
21+ use std:: sync:: Arc ;
2122use std:: time:: Duration ;
2223use tokio:: io:: AsyncWriteExt ;
2324use tokio:: sync:: { oneshot, watch} ;
@@ -41,8 +42,6 @@ pub struct ServerRuntime {
4142 handler : Arc < dyn McpServerHandler > ,
4243 // Information about the server
4344 server_details : Arc < InitializeResult > ,
44- // Details about the connected client
45- client_details : Arc < RwLock < Option < InitializeRequestParams > > > ,
4645 #[ cfg( feature = "hyper-server" ) ]
4746 session_id : Option < SessionId > ,
4847 transport_map : tokio:: sync:: RwLock < HashMap < String , TransportType > > ,
@@ -123,12 +122,7 @@ impl McpServer for ServerRuntime {
123122
124123 /// Returns the client information if available, after successful initialization , otherwise returns None
125124 fn client_info ( & self ) -> Option < InitializeRequestParams > {
126- if let Ok ( details) = self . client_details . read ( ) {
127- details. clone ( )
128- } else {
129- // Failed to acquire read lock, likely due to PoisonError from a thread panic. Returning None.
130- None
131- }
125+ self . client_details_rx . borrow ( ) . clone ( )
132126 }
133127
134128 /// Main runtime loop, processes incoming messages and handles requests
@@ -404,6 +398,11 @@ impl ServerRuntime {
404398 . await ?
405399 . abort_handle ( ) ;
406400
401+ // ensure keep_alive task will be aborted
402+ let _abort_guard = AbortTaskOnDrop {
403+ handle : abort_alive_task,
404+ } ;
405+
407406 // in case there is a payload, we consume it by transport to get processed
408407 if let Some ( payload) = payload {
409408 transport. consume_string_payload ( & payload) . await ?;
@@ -439,13 +438,11 @@ impl ServerRuntime {
439438 }
440439 // close the stream after all messages are sent, unless it is a standalone stream
441440 if !stream_id. eq( DEFAULT_STREAM_ID ) {
442- abort_alive_task. abort( ) ;
443441 return Ok ( ( ) ) ;
444442 }
445443 }
446444 _ = & mut disconnect_rx => {
447445 self . remove_transport( stream_id) . await ?;
448- abort_alive_task. abort( ) ;
449446 // Disconnection detected by keep-alive task
450447 return Err ( SdkError :: connection_closed( ) . into( ) ) ;
451448
@@ -469,7 +466,6 @@ impl ServerRuntime {
469466 watch:: channel :: < Option < InitializeRequestParams > > ( None ) ;
470467 Self {
471468 server_details,
472- client_details : Arc :: new ( RwLock :: new ( None ) ) ,
473469 handler,
474470 session_id : Some ( session_id) ,
475471 transport_map : tokio:: sync:: RwLock :: new ( HashMap :: new ( ) ) ,
@@ -495,7 +491,6 @@ impl ServerRuntime {
495491 watch:: channel :: < Option < InitializeRequestParams > > ( None ) ;
496492 Self {
497493 server_details : Arc :: new ( server_details) ,
498- client_details : Arc :: new ( RwLock :: new ( None ) ) ,
499494 handler,
500495 #[ cfg( feature = "hyper-server" ) ]
501496 session_id : None ,
0 commit comments