@@ -117,8 +117,14 @@ where
117117 }
118118}
119119
120- impl < S : Service , C : ServiceEndpoint < S > > RpcServer < S , C > {
121- /// Accepts a new channel from a client, and reads the first request.
120+ /// The result of accepting a new connection.
121+ pub struct Accepting < S : Service , C : ServiceEndpoint < S > > {
122+ send : C :: SendSink ,
123+ recv : C :: RecvStream ,
124+ }
125+
126+ impl < S : Service , C : ServiceEndpoint < S > > Accepting < S , C > {
127+ /// Read the first message from the client.
122128 ///
123129 /// The return value is a tuple of `(request, channel)`. Here `request` is the
124130 /// first request which is already read from the stream. The `channel` is a
@@ -127,13 +133,8 @@ impl<S: Service, C: ServiceEndpoint<S>> RpcServer<S, C> {
127133 ///
128134 /// Often sink and stream will wrap an an underlying byte stream. In this case you can
129135 /// call into_inner() on them to get it back to perform byte level reads and writes.
130- pub async fn accept ( & self ) -> result:: Result < ( S :: Req , RpcChannel < S , C > ) , RpcServerError < C > > {
131- let ( send, mut recv) = self
132- . source
133- . accept_bi ( )
134- . await
135- . map_err ( RpcServerError :: Accept ) ?;
136-
136+ pub async fn read_first ( self ) -> result:: Result < ( S :: Req , RpcChannel < S , C > ) , RpcServerError < C > > {
137+ let Accepting { send, mut recv } = self ;
137138 // get the first message from the client. This will tell us what it wants to do.
138139 let request: S :: Req = recv
139140 . next ( )
@@ -144,6 +145,37 @@ impl<S: Service, C: ServiceEndpoint<S>> RpcServer<S, C> {
144145 . map_err ( RpcServerError :: RecvError ) ?;
145146 Ok ( ( request, RpcChannel :: new ( send, recv) ) )
146147 }
148+ }
149+
150+ impl < S : Service , C : ServiceEndpoint < S > > RpcServer < S , C > {
151+ /// Accepts a new channel from a client. The result is an [Accepting] object that
152+ /// can be used to read the first request.
153+ pub async fn accept ( & self ) -> result:: Result < Accepting < S , C > , RpcServerError < C > > {
154+ let ( send, recv) = self
155+ . source
156+ . accept_bi ( )
157+ . await
158+ . map_err ( RpcServerError :: Accept ) ?;
159+ Ok ( Accepting { send, recv } )
160+ }
161+
162+ /// Accepts a new channel from a client, and reads the first request.
163+ ///
164+ /// The return value is a tuple of `(request, channel)`. Here `request` is the
165+ /// first request which is already read from the stream. The `channel` is a
166+ /// [RpcChannel] that has `sink` and `stream` fields that can be used to send more
167+ /// requests and/or receive more responses.
168+ ///
169+ /// Often sink and stream will wrap an an underlying byte stream. In this case you can
170+ /// call into_inner() on them to get it back to perform byte level reads and writes.
171+ ///
172+ /// Note that this method is not cancellation safe. If the future is dropped, a request
173+ /// might be lost. So do not use this inside a select! block.
174+ pub async fn accept_and_read_first (
175+ & self ,
176+ ) -> result:: Result < ( S :: Req , RpcChannel < S , C > ) , RpcServerError < C > > {
177+ self . accept ( ) . await ?. read_first ( ) . await
178+ }
147179
148180 /// Get the underlying service endpoint
149181 pub fn into_inner ( self ) -> C {
@@ -309,7 +341,7 @@ where
309341{
310342 let server = RpcServer :: < S , C > :: new ( conn) ;
311343 loop {
312- let ( req, chan) = server. accept ( ) . await ?;
344+ let ( req, chan) = server. accept_and_read_first ( ) . await ?;
313345 let target = target. clone ( ) ;
314346 handler ( chan, req, target) . await ?;
315347 }
0 commit comments