11//! Memory transport implementation using [tokio::sync::mpsc]
22
3- use futures_lite:: { Future , Stream } ;
3+ use futures_lite:: Stream ;
44use futures_sink:: Sink ;
55
66use crate :: {
77 transport:: { Connection , ConnectionErrors , LocalAddr , ServerEndpoint } ,
88 RpcMessage , Service ,
99} ;
1010use core:: fmt;
11- use std:: { error, fmt:: Display , marker :: PhantomData , pin:: Pin , result, sync:: Arc , task:: Poll } ;
11+ use std:: { error, fmt:: Display , pin:: Pin , result, sync:: Arc , task:: Poll } ;
1212use tokio:: sync:: { mpsc, Mutex } ;
1313
1414use super :: ConnectionCommon ;
@@ -131,81 +131,6 @@ impl<S: Service> ConnectionErrors for MpscServerEndpoint<S> {
131131
132132type Socket < In , Out > = ( self :: SendSink < Out > , self :: RecvStream < In > ) ;
133133
134- /// Future returned by [MpscConnection::open_bi]
135- pub struct OpenBiFuture < In : RpcMessage , Out : RpcMessage > {
136- inner : OpenBiFutureBox < In , Out > ,
137- res : Option < Socket < In , Out > > ,
138- }
139-
140- impl < In : RpcMessage , Out : RpcMessage > fmt:: Debug for OpenBiFuture < In , Out > {
141- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
142- f. debug_struct ( "OpenBiFuture" ) . finish ( )
143- }
144- }
145-
146- type OpenBiFutureBox < In , Out > = Pin <
147- Box <
148- dyn Future < Output = Result < ( ) , mpsc:: error:: SendError < ( SendSink < In > , RecvStream < Out > ) > > >
149- + Send
150- + ' static ,
151- > ,
152- > ;
153-
154- impl < In : RpcMessage , Out : RpcMessage > OpenBiFuture < In , Out > {
155- fn new ( inner : OpenBiFutureBox < In , Out > , res : Socket < In , Out > ) -> Self {
156- Self {
157- inner,
158- res : Some ( res) ,
159- }
160- }
161- }
162-
163- impl < In : RpcMessage , Out : RpcMessage > Future for OpenBiFuture < In , Out > {
164- type Output = result:: Result < Socket < In , Out > , self :: OpenBiError > ;
165-
166- fn poll (
167- mut self : Pin < & mut Self > ,
168- cx : & mut std:: task:: Context < ' _ > ,
169- ) -> std:: task:: Poll < Self :: Output > {
170- match Pin :: new ( & mut self . inner ) . poll ( cx) {
171- Poll :: Ready ( Ok ( ( ) ) ) => self
172- . res
173- . take ( )
174- . map ( |x| Poll :: Ready ( Ok ( x) ) )
175- . unwrap_or ( Poll :: Pending ) ,
176- Poll :: Ready ( Err ( _) ) => Poll :: Ready ( Err ( self :: OpenBiError :: RemoteDropped ) ) ,
177- Poll :: Pending => Poll :: Pending ,
178- }
179- }
180- }
181-
182- type AcceptBiFutureBox < In , Out > =
183- Pin < Box < dyn Future < Output = Option < ( SendSink < In > , RecvStream < Out > ) > > + Send + ' static > > ;
184-
185- /// Future returned by [MpscServerEndpoint::accept_bi]
186- pub struct AcceptBiFuture < In : RpcMessage , Out : RpcMessage > {
187- wrapped : AcceptBiFutureBox < Out , In > ,
188- _p : PhantomData < ( In , Out ) > ,
189- }
190-
191- impl < In : RpcMessage , Out : RpcMessage > fmt:: Debug for AcceptBiFuture < In , Out > {
192- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
193- f. debug_struct ( "AcceptBiFuture" ) . finish ( )
194- }
195- }
196-
197- impl < In : RpcMessage , Out : RpcMessage > Future for AcceptBiFuture < In , Out > {
198- type Output = result:: Result < ( SendSink < Out > , RecvStream < In > ) , AcceptBiError > ;
199-
200- fn poll ( mut self : Pin < & mut Self > , cx : & mut std:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
201- match Pin :: new ( & mut self . wrapped ) . poll ( cx) {
202- Poll :: Ready ( Some ( ( send, recv) ) ) => Poll :: Ready ( Ok ( ( send, recv) ) ) ,
203- Poll :: Ready ( None ) => Poll :: Ready ( Err ( AcceptBiError :: RemoteDropped ) ) ,
204- Poll :: Pending => Poll :: Pending ,
205- }
206- }
207- }
208-
209134impl < S : Service > ConnectionCommon < S :: Req , S :: Res > for MpscServerEndpoint < S > {
210135 type SendSink = SendSink < S :: Res > ;
211136 type RecvStream = RecvStream < S :: Req > ;
@@ -242,8 +167,7 @@ impl<S: Service> ConnectionCommon<S::Res, S::Req> for MpscConnection<S> {
242167}
243168
244169impl < S : Service > Connection < S :: Res , S :: Req > for MpscConnection < S > {
245- #[ allow( refining_impl_trait) ]
246- fn open_bi ( & self ) -> OpenBiFuture < S :: Res , S :: Req > {
170+ async fn open_bi ( & self ) -> result:: Result < Socket < S :: Res , S :: Req > , self :: OpenBiError > {
247171 let ( local_send, remote_recv) = mpsc:: channel :: < S :: Req > ( 128 ) ;
248172 let ( remote_send, local_recv) = mpsc:: channel :: < S :: Res > ( 128 ) ;
249173 let remote_chan = (
@@ -254,11 +178,11 @@ impl<S: Service> Connection<S::Res, S::Req> for MpscConnection<S> {
254178 SendSink ( tokio_util:: sync:: PollSender :: new ( local_send) ) ,
255179 RecvStream ( tokio_stream:: wrappers:: ReceiverStream :: new ( local_recv) ) ,
256180 ) ;
257- let sink = self . sink . clone ( ) ;
258- OpenBiFuture :: new (
259- Box :: pin ( async move { sink . send ( remote_chan ) . await } ) ,
260- local_chan ,
261- )
181+ self . sink
182+ . send ( remote_chan )
183+ . await
184+ . map_err ( |_| self :: OpenBiError :: RemoteDropped ) ? ;
185+ Ok ( local_chan )
262186 }
263187}
264188
0 commit comments