@@ -7,7 +7,6 @@ use async_trait::async_trait;
77use futures:: StreamExt ;
88use rust_mcp_transport:: { IoStream , McpDispatch , MessageDispatcher , Transport } ;
99use schema_utils:: ClientMessage ;
10- use std:: pin:: Pin ;
1110use std:: sync:: { Arc , RwLock } ;
1211use tokio:: io:: AsyncWriteExt ;
1312
@@ -27,9 +26,6 @@ pub struct ServerRuntime {
2726 server_details : Arc < InitializeResult > ,
2827 // Details about the connected client
2928 client_details : Arc < RwLock < Option < InitializeRequestParams > > > ,
30-
31- message_sender : tokio:: sync:: RwLock < Option < MessageDispatcher < ClientMessage > > > ,
32- error_stream : tokio:: sync:: RwLock < Option < Pin < Box < dyn tokio:: io:: AsyncWrite + Send + Sync > > > > ,
3329 #[ cfg( feature = "hyper-server" ) ]
3430 session_id : Option < SessionId > ,
3531}
@@ -70,24 +66,14 @@ impl McpServer for ServerRuntime {
7066 where
7167 MessageDispatcher < ClientMessage > : McpDispatch < ClientMessage , MessageFromServer > ,
7268 {
73- ( & self . message_sender ) as _
69+ ( self . transport . message_sender ( ) . await ) as _
7470 }
7571
7672 /// Main runtime loop, processes incoming messages and handles requests
7773 async fn start ( & self ) -> SdkResult < ( ) > {
78- // Start the transport layer to begin handling messages
79- // self.transport.start().await?;
80- // Open the transport stream
81- // let mut stream = self.transport.open();
82- let ( mut stream, sender, error_io) = self . transport . start ( ) . await ?;
83-
84- self . set_message_sender ( sender) . await ;
85-
86- if let IoStream :: Writable ( error_stream) = error_io {
87- self . set_error_stream ( error_stream) . await ;
88- }
74+ let mut stream = self . transport . start ( ) . await ?;
8975
90- let sender = self . sender ( ) . await . read ( ) . await ;
76+ let sender = self . transport . message_sender ( ) . await . read ( ) . await ;
9177 let sender = sender
9278 . as_ref ( )
9379 . ok_or ( schema_utils:: SdkError :: connection_closed ( ) ) ?;
@@ -138,8 +124,8 @@ impl McpServer for ServerRuntime {
138124 }
139125
140126 async fn stderr_message ( & self , message : String ) -> SdkResult < ( ) > {
141- let mut lock = self . error_stream . write ( ) . await ;
142- if let Some ( stderr) = lock. as_mut ( ) {
127+ let mut lock = self . transport . error_stream ( ) . await . write ( ) . await ;
128+ if let Some ( IoStream :: Writable ( stderr) ) = lock. as_mut ( ) {
143129 stderr. write_all ( message. as_bytes ( ) ) . await ?;
144130 stderr. write_all ( b"\n " ) . await ?;
145131 stderr. flush ( ) . await ?;
@@ -149,24 +135,11 @@ impl McpServer for ServerRuntime {
149135}
150136
151137impl ServerRuntime {
152- pub ( crate ) async fn set_message_sender ( & self , sender : MessageDispatcher < ClientMessage > ) {
153- let mut lock = self . message_sender . write ( ) . await ;
154- * lock = Some ( sender) ;
155- }
156-
157138 #[ cfg( feature = "hyper-server" ) ]
158139 pub ( crate ) async fn session_id ( & self ) -> Option < SessionId > {
159140 self . session_id . to_owned ( )
160141 }
161142
162- pub ( crate ) async fn set_error_stream (
163- & self ,
164- error_stream : Pin < Box < dyn tokio:: io:: AsyncWrite + Send + Sync > > ,
165- ) {
166- let mut lock = self . error_stream . write ( ) . await ;
167- * lock = Some ( error_stream) ;
168- }
169-
170143 #[ cfg( feature = "hyper-server" ) ]
171144 pub ( crate ) fn new_instance (
172145 server_details : Arc < InitializeResult > ,
@@ -179,8 +152,6 @@ impl ServerRuntime {
179152 client_details : Arc :: new ( RwLock :: new ( None ) ) ,
180153 transport : Box :: new ( transport) ,
181154 handler,
182- message_sender : tokio:: sync:: RwLock :: new ( None ) ,
183- error_stream : tokio:: sync:: RwLock :: new ( None ) ,
184155 session_id : Some ( session_id) ,
185156 }
186157 }
@@ -195,8 +166,6 @@ impl ServerRuntime {
195166 client_details : Arc :: new ( RwLock :: new ( None ) ) ,
196167 transport : Box :: new ( transport) ,
197168 handler,
198- message_sender : tokio:: sync:: RwLock :: new ( None ) ,
199- error_stream : tokio:: sync:: RwLock :: new ( None ) ,
200169 #[ cfg( feature = "hyper-server" ) ]
201170 session_id : None ,
202171 }
0 commit comments