@@ -180,7 +180,7 @@ uniffi::include_scaffolding!("ldk_node");
180180pub struct Node {
181181 runtime : Arc < RwLock < Option < Arc < tokio:: runtime:: Runtime > > > > ,
182182 stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
183- event_handling_stopped_sender : tokio:: sync :: watch :: Sender < ( ) > ,
183+ background_processor_task : Mutex < Option < tokio:: task :: JoinHandle < ( ) > > > ,
184184 config : Arc < Config > ,
185185 wallet : Arc < Wallet > ,
186186 chain_source : Arc < ChainSource > ,
@@ -579,8 +579,7 @@ impl Node {
579579 } ;
580580
581581 let background_stop_logger = Arc :: clone ( & self . logger ) ;
582- let event_handling_stopped_sender = self . event_handling_stopped_sender . clone ( ) ;
583- runtime. spawn ( async move {
582+ let handle = runtime. spawn ( async move {
584583 process_events_async (
585584 background_persister,
586585 |e| background_event_handler. handle_event ( e) ,
@@ -601,19 +600,9 @@ impl Node {
601600 panic ! ( "Failed to process events" ) ;
602601 } ) ;
603602 log_debug ! ( background_stop_logger, "Events processing stopped." , ) ;
604-
605- match event_handling_stopped_sender. send ( ( ) ) {
606- Ok ( _) => ( ) ,
607- Err ( e) => {
608- log_error ! (
609- background_stop_logger,
610- "Failed to send 'events handling stopped' signal. This should never happen: {}" ,
611- e
612- ) ;
613- debug_assert ! ( false ) ;
614- } ,
615- }
616603 } ) ;
604+ debug_assert ! ( self . background_processor_task. lock( ) . unwrap( ) . is_none( ) ) ;
605+ * self . background_processor_task . lock ( ) . unwrap ( ) = Some ( handle) ;
617606
618607 if let Some ( liquidity_source) = self . liquidity_source . as_ref ( ) {
619608 let mut stop_liquidity_handler = self . stop_sender . subscribe ( ) ;
@@ -670,39 +659,42 @@ impl Node {
670659 // Disconnect all peers.
671660 self . peer_manager . disconnect_all_peers ( ) ;
672661
673- // Wait until event handling stopped, at least until a timeout is reached.
674- let event_handling_stopped_logger = Arc :: clone ( & self . logger ) ;
675- let mut event_handling_stopped_receiver = self . event_handling_stopped_sender . subscribe ( ) ;
662+ // Stop any runtime-dependant chain sources.
663+ self . chain_source . stop ( ) ;
676664
677- let timeout_res = tokio:: task:: block_in_place ( move || {
678- runtime. block_on ( async {
679- tokio:: time:: timeout (
680- Duration :: from_secs ( LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS ) ,
681- event_handling_stopped_receiver. changed ( ) ,
682- )
683- . await
684- } )
685- } ) ;
665+ // Wait until background processing stopped, at least until a timeout is reached.
666+ if let Some ( background_processor_task) =
667+ self . background_processor_task . lock ( ) . unwrap ( ) . take ( )
668+ {
669+ let abort_handle = background_processor_task. abort_handle ( ) ;
670+ let timeout_res = tokio:: task:: block_in_place ( move || {
671+ runtime. block_on ( async {
672+ tokio:: time:: timeout (
673+ Duration :: from_secs ( LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS ) ,
674+ background_processor_task,
675+ )
676+ . await
677+ } )
678+ } ) ;
686679
687- match timeout_res {
688- Ok ( stop_res) => match stop_res {
689- Ok ( ( ) ) => { } ,
680+ match timeout_res {
681+ Ok ( stop_res) => match stop_res {
682+ Ok ( ( ) ) => { } ,
683+ Err ( e) => {
684+ abort_handle. abort ( ) ;
685+ log_error ! (
686+ self . logger,
687+ "Stopping event handling failed. This should never happen: {}" ,
688+ e
689+ ) ;
690+ panic ! ( "Stopping event handling failed. This should never happen." ) ;
691+ } ,
692+ } ,
690693 Err ( e) => {
691- log_error ! (
692- event_handling_stopped_logger,
693- "Stopping event handling failed. This should never happen: {}" ,
694- e
695- ) ;
696- panic ! ( "Stopping event handling failed. This should never happen." ) ;
694+ abort_handle. abort ( ) ;
695+ log_error ! ( self . logger, "Stopping event handling timed out: {}" , e) ;
697696 } ,
698- } ,
699- Err ( e) => {
700- log_error ! (
701- event_handling_stopped_logger,
702- "Stopping event handling timed out: {}" ,
703- e
704- ) ;
705- } ,
697+ }
706698 }
707699
708700 #[ cfg( tokio_unstable) ]
0 commit comments