@@ -333,49 +333,70 @@ pub struct BidiStreamRunner {
333333
334334impl BidiStreamRunner {
335335 pub async fn handle (
336- mut self ,
337- mut input_rx : InputReceiver ,
336+ self ,
337+ input_rx : InputReceiver ,
338338 output_tx : OutputSender ,
339339 ) -> Result < ( ) , Error > {
340- Self :: init_loop_vm ( & mut self . vm , & mut input_rx) . await ?;
341-
342340 // Retrieve the service from the Arc
343341 let svc = self
344342 . endpoint
345343 . svcs
346344 . get ( & self . svc_name )
347345 . expect ( "service must exist at this point" ) ;
348346
349- // Initialize handler context
350- let ( handler_state_tx , handler_state_rx ) = HandlerStateNotifier :: new ( ) ;
351- let ctx = ContextInternal :: new (
347+ handle (
348+ input_rx ,
349+ output_tx ,
352350 self . vm ,
353351 self . svc_name ,
354352 self . handler_name ,
355- input_rx,
356- output_tx,
357- handler_state_tx,
358- ) ;
359-
360- // Start user code
361- let user_code_fut = InterceptErrorFuture :: new ( ctx. clone ( ) , svc. handle ( ctx. clone ( ) ) ) ;
362-
363- // Wrap it in handler state aware future
364- HandlerStateAwareFuture :: new ( ctx. clone ( ) , handler_state_rx, user_code_fut) . await
353+ svc,
354+ )
355+ . await
365356 }
357+ }
366358
367- async fn init_loop_vm ( vm : & mut CoreVM , input_rx : & mut InputReceiver ) -> Result < ( ) , ErrorInner > {
368- while !vm. is_ready_to_execute ( ) . map_err ( ErrorInner :: VM ) ? {
369- match input_rx. recv ( ) . await {
370- Some ( Ok ( b) ) => vm. notify_input ( b) ,
371- Some ( Err ( e) ) => vm. notify_error (
372- "Error when reading the body" . into ( ) ,
373- e. to_string ( ) . into ( ) ,
374- None ,
375- ) ,
376- None => vm. notify_input_closed ( ) ,
377- }
359+ #[ doc( hidden) ]
360+ pub async fn handle < S : Service < Future = BoxFuture < ' static , Result < ( ) , Error > > > + Send + Sync > (
361+ mut input_rx : InputReceiver ,
362+ output_tx : OutputSender ,
363+ vm : CoreVM ,
364+ svc_name : String ,
365+ handler_name : String ,
366+ svc : & S ,
367+ ) -> Result < ( ) , Error > {
368+ let mut vm = vm;
369+ init_loop_vm ( & mut vm, & mut input_rx) . await ?;
370+
371+ // Initialize handler context
372+ let ( handler_state_tx, handler_state_rx) = HandlerStateNotifier :: new ( ) ;
373+ let ctx = ContextInternal :: new (
374+ vm,
375+ svc_name,
376+ handler_name,
377+ input_rx,
378+ output_tx,
379+ handler_state_tx,
380+ ) ;
381+
382+ // Start user code
383+ let user_code_fut = InterceptErrorFuture :: new ( ctx. clone ( ) , svc. handle ( ctx. clone ( ) ) ) ;
384+
385+ // Wrap it in handler state aware future
386+ HandlerStateAwareFuture :: new ( ctx. clone ( ) , handler_state_rx, user_code_fut) . await
387+ }
388+
389+ async fn init_loop_vm ( vm : & mut CoreVM , input_rx : & mut InputReceiver ) -> Result < ( ) , ErrorInner > {
390+ while !vm. is_ready_to_execute ( ) . map_err ( ErrorInner :: VM ) ? {
391+ match input_rx. recv ( ) . await {
392+ Some ( Ok ( b) ) => vm. notify_input ( b) ,
393+ Some ( Err ( e) ) => vm. notify_error (
394+ "Error when reading the body" . into ( ) ,
395+ e. to_string ( ) . into ( ) ,
396+ None ,
397+ ) ,
398+ None => vm. notify_input_closed ( ) ,
378399 }
379- Ok ( ( ) )
380400 }
401+ Ok ( ( ) )
381402}
0 commit comments