@@ -453,7 +453,8 @@ impl Dispatcher {
453453 }
454454
455455 fn on_grpc_receive ( & self , token_id : u32 , response_size : usize ) {
456- if let Some ( context_id) = self . grpc_callouts . borrow_mut ( ) . remove ( & token_id) {
456+ let context_id = self . grpc_callouts . borrow_mut ( ) . remove ( & token_id) ;
457+ if let Some ( context_id) = context_id {
457458 if let Some ( http_stream) = self . http_streams . borrow_mut ( ) . get_mut ( & context_id) {
458459 self . active_id . set ( context_id) ;
459460 hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
@@ -467,24 +468,26 @@ impl Dispatcher {
467468 hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
468469 root. on_grpc_call_response ( token_id, 0 , response_size) ;
469470 }
470- } else if let Some ( context_id) = self . grpc_streams . borrow_mut ( ) . get ( & token_id) {
471- let context_id = * context_id;
472- if let Some ( http_stream) = self . http_streams . borrow_mut ( ) . get_mut ( & context_id) {
473- self . active_id . set ( context_id) ;
474- hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
475- http_stream. on_grpc_stream_message ( token_id, response_size) ;
476- } else if let Some ( stream) = self . streams . borrow_mut ( ) . get_mut ( & context_id) {
477- self . active_id . set ( context_id) ;
478- hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
479- stream. on_grpc_stream_message ( token_id, response_size) ;
480- } else if let Some ( root) = self . roots . borrow_mut ( ) . get_mut ( & context_id) {
481- self . active_id . set ( context_id) ;
482- hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
483- root. on_grpc_stream_message ( token_id, response_size) ;
484- }
485471 } else {
486- // TODO: change back to a panic once underlying issue is fixed.
487- trace ! ( "on_grpc_receive_initial_metadata: invalid token_id" ) ;
472+ let context_id = self . grpc_streams . borrow ( ) . get ( & token_id) . cloned ( ) ;
473+ if let Some ( context_id) = context_id {
474+ if let Some ( http_stream) = self . http_streams . borrow_mut ( ) . get_mut ( & context_id) {
475+ self . active_id . set ( context_id) ;
476+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
477+ http_stream. on_grpc_stream_message ( token_id, response_size) ;
478+ } else if let Some ( stream) = self . streams . borrow_mut ( ) . get_mut ( & context_id) {
479+ self . active_id . set ( context_id) ;
480+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
481+ stream. on_grpc_stream_message ( token_id, response_size) ;
482+ } else if let Some ( root) = self . roots . borrow_mut ( ) . get_mut ( & context_id) {
483+ self . active_id . set ( context_id) ;
484+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
485+ root. on_grpc_stream_message ( token_id, response_size) ;
486+ }
487+ } else {
488+ // TODO: change back to a panic once underlying issue is fixed.
489+ trace ! ( "on_grpc_receive_initial_metadata: invalid token_id" ) ;
490+ }
488491 }
489492 }
490493
@@ -514,7 +517,8 @@ impl Dispatcher {
514517 }
515518
516519 fn on_grpc_close ( & self , token_id : u32 , status_code : u32 ) {
517- if let Some ( context_id) = self . grpc_callouts . borrow_mut ( ) . remove ( & token_id) {
520+ let context_id = self . grpc_callouts . borrow_mut ( ) . remove ( & token_id) ;
521+ if let Some ( context_id) = context_id {
518522 if let Some ( http_stream) = self . http_streams . borrow_mut ( ) . get_mut ( & context_id) {
519523 self . active_id . set ( context_id) ;
520524 hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
@@ -528,23 +532,26 @@ impl Dispatcher {
528532 hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
529533 root. on_grpc_call_response ( token_id, status_code, 0 ) ;
530534 }
531- } else if let Some ( context_id) = self . grpc_streams . borrow_mut ( ) . remove ( & token_id) {
532- if let Some ( http_stream) = self . http_streams . borrow_mut ( ) . get_mut ( & context_id) {
533- self . active_id . set ( context_id) ;
534- hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
535- http_stream. on_grpc_stream_close ( token_id, status_code)
536- } else if let Some ( stream) = self . streams . borrow_mut ( ) . get_mut ( & context_id) {
537- self . active_id . set ( context_id) ;
538- hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
539- stream. on_grpc_stream_close ( token_id, status_code)
540- } else if let Some ( root) = self . roots . borrow_mut ( ) . get_mut ( & context_id) {
541- self . active_id . set ( context_id) ;
542- hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
543- root. on_grpc_stream_close ( token_id, status_code)
544- }
545535 } else {
546- // TODO: change back to a panic once underlying issue is fixed.
547- trace ! ( "on_grpc_close: invalid token_id, a non-connected stream has closed" ) ;
536+ let context_id = self . grpc_streams . borrow_mut ( ) . remove ( & token_id) ;
537+ if let Some ( context_id) = context_id {
538+ if let Some ( http_stream) = self . http_streams . borrow_mut ( ) . get_mut ( & context_id) {
539+ self . active_id . set ( context_id) ;
540+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
541+ http_stream. on_grpc_stream_close ( token_id, status_code)
542+ } else if let Some ( stream) = self . streams . borrow_mut ( ) . get_mut ( & context_id) {
543+ self . active_id . set ( context_id) ;
544+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
545+ stream. on_grpc_stream_close ( token_id, status_code)
546+ } else if let Some ( root) = self . roots . borrow_mut ( ) . get_mut ( & context_id) {
547+ self . active_id . set ( context_id) ;
548+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
549+ root. on_grpc_stream_close ( token_id, status_code)
550+ }
551+ } else {
552+ // TODO: change back to a panic once underlying issue is fixed.
553+ trace ! ( "on_grpc_close: invalid token_id, a non-connected stream has closed" ) ;
554+ }
548555 }
549556 }
550557}
0 commit comments