@@ -126,8 +126,9 @@ pub use builder::NodeBuilder as Builder;
126126
127127use chain:: ChainSource ;
128128use config:: {
129- default_user_config, may_announce_channel, ChannelConfig , Config , NODE_ANN_BCAST_INTERVAL ,
130- PEER_RECONNECTION_INTERVAL , RGS_SYNC_INTERVAL ,
129+ default_user_config, may_announce_channel, ChannelConfig , Config ,
130+ LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS , NODE_ANN_BCAST_INTERVAL , PEER_RECONNECTION_INTERVAL ,
131+ RGS_SYNC_INTERVAL ,
131132} ;
132133use connection:: ConnectionManager ;
133134use event:: { EventHandler , EventQueue } ;
@@ -146,9 +147,7 @@ use types::{
146147} ;
147148pub use types:: { ChannelDetails , CustomTlvRecord , PeerDetails , UserChannelId } ;
148149
149- #[ cfg( tokio_unstable) ]
150- use logger:: log_trace;
151- use logger:: { log_debug, log_error, log_info, LdkLogger , Logger } ;
150+ use logger:: { log_debug, log_error, log_info, log_trace, LdkLogger , Logger } ;
152151
153152use lightning:: chain:: BestBlock ;
154153use lightning:: events:: bump_transaction:: Wallet as LdkWallet ;
@@ -179,7 +178,7 @@ uniffi::include_scaffolding!("ldk_node");
179178pub struct Node {
180179 runtime : Arc < RwLock < Option < Arc < tokio:: runtime:: Runtime > > > > ,
181180 stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
182- event_handling_stopped_sender : tokio:: sync :: watch :: Sender < ( ) > ,
181+ background_processor_task : Mutex < Option < tokio:: task :: JoinHandle < ( ) > > > ,
183182 config : Arc < Config > ,
184183 wallet : Arc < Wallet > ,
185184 chain_source : Arc < ChainSource > ,
@@ -577,9 +576,7 @@ impl Node {
577576 } )
578577 } ;
579578
580- let background_stop_logger = Arc :: clone ( & self . logger ) ;
581- let event_handling_stopped_sender = self . event_handling_stopped_sender . clone ( ) ;
582- runtime. spawn ( async move {
579+ let handle = runtime. spawn ( async move {
583580 process_events_async (
584581 background_persister,
585582 |e| background_event_handler. handle_event ( e) ,
@@ -599,20 +596,9 @@ impl Node {
599596 log_error ! ( background_error_logger, "Failed to process events: {}" , e) ;
600597 panic ! ( "Failed to process events" ) ;
601598 } ) ;
602- log_debug ! ( background_stop_logger, "Events processing stopped." , ) ;
603-
604- match event_handling_stopped_sender. send ( ( ) ) {
605- Ok ( _) => ( ) ,
606- Err ( e) => {
607- log_error ! (
608- background_stop_logger,
609- "Failed to send 'events handling stopped' signal. This should never happen: {}" ,
610- e
611- ) ;
612- debug_assert ! ( false ) ;
613- } ,
614- }
615599 } ) ;
600+ debug_assert ! ( self . background_processor_task. lock( ) . unwrap( ) . is_none( ) ) ;
601+ * self . background_processor_task . lock ( ) . unwrap ( ) = Some ( handle) ;
616602
617603 if let Some ( liquidity_source) = self . liquidity_source . as_ref ( ) {
618604 let mut stop_liquidity_handler = self . stop_sender . subscribe ( ) ;
@@ -655,7 +641,7 @@ impl Node {
655641
656642 // Stop the runtime.
657643 match self . stop_sender . send ( ( ) ) {
658- Ok ( _) => ( ) ,
644+ Ok ( _) => log_trace ! ( self . logger , "Sent shutdown signal to background tasks." ) ,
659645 Err ( e) => {
660646 log_error ! (
661647 self . logger,
@@ -668,43 +654,45 @@ impl Node {
668654
669655 // Disconnect all peers.
670656 self . peer_manager . disconnect_all_peers ( ) ;
657+ log_debug ! ( self . logger, "Disconnected all network peers." ) ;
671658
672- // Wait until event handling stopped, at least until a timeout is reached.
673- let event_handling_stopped_logger = Arc :: clone ( & self . logger ) ;
674- let mut event_handling_stopped_receiver = self . event_handling_stopped_sender . subscribe ( ) ;
675-
676- // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
677- // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
678- // should drop this considerably post upgrading to BDK 1.0.
679- let timeout_res = tokio:: task:: block_in_place ( move || {
680- runtime. block_on ( async {
681- tokio:: time:: timeout (
682- Duration :: from_secs ( 100 ) ,
683- event_handling_stopped_receiver. changed ( ) ,
684- )
685- . await
686- } )
687- } ) ;
659+ // Stop any runtime-dependant chain sources.
660+ self . chain_source . stop ( ) ;
661+ log_debug ! ( self . logger, "Stopped chain sources." ) ;
662+
663+ // Wait until background processing stopped, at least until a timeout is reached.
664+ if let Some ( background_processor_task) =
665+ self . background_processor_task . lock ( ) . unwrap ( ) . take ( )
666+ {
667+ let abort_handle = background_processor_task. abort_handle ( ) ;
668+ let timeout_res = tokio:: task:: block_in_place ( move || {
669+ runtime. block_on ( async {
670+ tokio:: time:: timeout (
671+ Duration :: from_secs ( LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS ) ,
672+ background_processor_task,
673+ )
674+ . await
675+ } )
676+ } ) ;
688677
689- match timeout_res {
690- Ok ( stop_res) => match stop_res {
691- Ok ( ( ) ) => { } ,
678+ match timeout_res {
679+ Ok ( stop_res) => match stop_res {
680+ Ok ( ( ) ) => log_debug ! ( self . logger, "Stopped background processing of events." ) ,
681+ Err ( e) => {
682+ abort_handle. abort ( ) ;
683+ log_error ! (
684+ self . logger,
685+ "Stopping event handling failed. This should never happen: {}" ,
686+ e
687+ ) ;
688+ panic ! ( "Stopping event handling failed. This should never happen." ) ;
689+ } ,
690+ } ,
692691 Err ( e) => {
693- log_error ! (
694- event_handling_stopped_logger,
695- "Stopping event handling failed. This should never happen: {}" ,
696- e
697- ) ;
698- panic ! ( "Stopping event handling failed. This should never happen." ) ;
692+ abort_handle. abort ( ) ;
693+ log_error ! ( self . logger, "Stopping event handling timed out: {}" , e) ;
699694 } ,
700- } ,
701- Err ( e) => {
702- log_error ! (
703- event_handling_stopped_logger,
704- "Stopping event handling timed out: {}" ,
705- e
706- ) ;
707- } ,
695+ }
708696 }
709697
710698 #[ cfg( tokio_unstable) ]
0 commit comments