@@ -26,6 +26,7 @@ use tokio::{
2626 self ,
2727 io:: { split, AsyncRead , AsyncWrite , AsyncWriteExt } ,
2828 net:: UnixListener ,
29+ select, spawn,
2930 sync:: mpsc:: { channel, Receiver , Sender } ,
3031 sync:: watch,
3132} ;
@@ -155,13 +156,12 @@ impl Server {
155156 let ( stop_listen_tx, mut stop_listen_rx) = channel ( 1 ) ;
156157 self . stop_listen_tx = Some ( stop_listen_tx) ;
157158
158- tokio :: spawn ( async move {
159+ spawn ( async move {
159160 loop {
160- tokio :: select! {
161+ select ! {
161162 conn = incoming. next( ) => {
162163 if let Some ( conn) = conn {
163164 // Accept a new connection
164- let methods = methods. clone( ) ;
165165 match conn {
166166 Ok ( stream) => {
167167 let fd = stream. as_raw_fd( ) ;
@@ -170,59 +170,14 @@ impl Server {
170170 continue ;
171171 }
172172
173- let mut close_conn_rx = close_conn_rx. clone( ) ;
174-
175- let ( req_done_tx, mut all_req_done_rx) = channel:: <i32 >( 1 ) ;
176- let conn_done_tx2 = conn_done_tx. clone( ) ;
177-
178- // The connection handler
179- tokio:: spawn( async move {
180- let ( mut reader, mut writer) = split( stream) ;
181- let ( tx, mut rx) : ( Sender <Vec <u8 >>, Receiver <Vec <u8 >>) = channel( 100 ) ;
182-
183- tokio:: spawn( async move {
184- while let Some ( buf) = rx. recv( ) . await {
185- if let Err ( e) = writer. write_all( & buf) . await {
186- error!( "write_message got error: {:?}" , e) ;
187- }
188- }
189- } ) ;
190-
191- loop {
192- let tx = tx. clone( ) ;
193- let methods = methods. clone( ) ;
194- let req_done_tx2 = req_done_tx. clone( ) ;
195-
196- tokio:: select! {
197- resp = receive( & mut reader) => {
198- match resp {
199- Ok ( message) => {
200- tokio:: spawn( async move {
201- handle_request( tx, listenfd, methods, message) . await ;
202- drop( req_done_tx2) ;
203- } ) ;
204- }
205- Err ( e) => {
206- trace!( "error {:?}" , e) ;
207- break ;
208- }
209- }
210- }
211- v = close_conn_rx. changed( ) => {
212- // 0 is the init value of this watch, not a valid signal
213- // is_err means the tx was dropped.
214- if v. is_err( ) || * close_conn_rx. borrow( ) != 0 {
215- info!( "Stop accepting new connections." ) ;
216- break ;
217- }
218- }
219- }
220- }
221-
222- drop( req_done_tx) ;
223- all_req_done_rx. recv( ) . await ;
224- drop( conn_done_tx2) ;
225- } ) ;
173+ // spawn a connection handler, would not block
174+ spawn_connection_handler(
175+ listenfd,
176+ stream,
177+ methods. clone( ) ,
178+ close_conn_rx. clone( ) ,
179+ conn_done_tx. clone( )
180+ ) . await ;
226181 }
227182 Err ( e) => {
228183 error!( "{:?}" , e)
@@ -281,6 +236,73 @@ impl Server {
281236 }
282237}
283238
239+ async fn spawn_connection_handler < S > (
240+ listenfd : RawFd ,
241+ stream : S ,
242+ methods : Arc < HashMap < String , Box < dyn MethodHandler + Send + Sync > > > ,
243+ mut close_conn_rx : watch:: Receiver < i32 > ,
244+ conn_done_tx : Sender < i32 > ,
245+ ) where
246+ S : AsyncRead + AsyncWrite + AsRawFd + Send + ' static ,
247+ {
248+ let ( req_done_tx, mut all_req_done_rx) = channel :: < i32 > ( 1 ) ;
249+
250+ spawn ( async move {
251+ let ( mut reader, mut writer) = split ( stream) ;
252+ let ( tx, mut rx) : ( Sender < Vec < u8 > > , Receiver < Vec < u8 > > ) = channel ( 100 ) ;
253+ let ( client_disconnected_tx, client_disconnected_rx) = watch:: channel ( false ) ;
254+
255+ spawn ( async move {
256+ while let Some ( buf) = rx. recv ( ) . await {
257+ if let Err ( e) = writer. write_all ( & buf) . await {
258+ error ! ( "write_message got error: {:?}" , e) ;
259+ }
260+ }
261+ } ) ;
262+
263+ loop {
264+ let tx = tx. clone ( ) ;
265+ let methods = methods. clone ( ) ;
266+ let req_done_tx2 = req_done_tx. clone ( ) ;
267+ let mut client_disconnected_rx2 = client_disconnected_rx. clone ( ) ;
268+
269+ select ! {
270+ resp = receive( & mut reader) => {
271+ match resp {
272+ Ok ( message) => {
273+ spawn( async move {
274+ select! {
275+ _ = handle_request( tx, listenfd, methods, message) => { }
276+ _ = client_disconnected_rx2. changed( ) => { }
277+ }
278+
279+ drop( req_done_tx2) ;
280+ } ) ;
281+ }
282+ Err ( e) => {
283+ let _ = client_disconnected_tx. send( true ) ;
284+ trace!( "error {:?}" , e) ;
285+ break ;
286+ }
287+ }
288+ }
289+ v = close_conn_rx. changed( ) => {
290+ // 0 is the init value of this watch, not a valid signal
291+ // is_err means the tx was dropped.
292+ if v. is_err( ) || * close_conn_rx. borrow( ) != 0 {
293+ info!( "Stop accepting new connections." ) ;
294+ break ;
295+ }
296+ }
297+ }
298+ }
299+
300+ drop ( req_done_tx) ;
301+ all_req_done_rx. recv ( ) . await ;
302+ drop ( conn_done_tx) ;
303+ } ) ;
304+ }
305+
284306async fn handle_request (
285307 tx : Sender < Vec < u8 > > ,
286308 fd : RawFd ,
0 commit comments