@@ -179,7 +179,6 @@ uniffi::include_scaffolding!("ldk_node");
179179pub struct Node < K : KVStore + Sync + Send + ' static > {
180180 runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > ,
181181 stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
182- stop_receiver : tokio:: sync:: watch:: Receiver < ( ) > ,
183182 config : Arc < Config > ,
184183 wallet : Arc < Wallet > ,
185184 tx_sync : Arc < EsploraSyncClient < Arc < FilesystemLogger > > > ,
@@ -247,7 +246,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
247246 // Setup wallet sync
248247 let wallet = Arc :: clone ( & self . wallet ) ;
249248 let sync_logger = Arc :: clone ( & self . logger ) ;
250- let mut stop_sync = self . stop_receiver . clone ( ) ;
249+ let mut stop_sync = self . stop_sender . subscribe ( ) ;
251250 let onchain_wallet_sync_interval_secs = self
252251 . config
253252 . onchain_wallet_sync_interval_secs
@@ -288,7 +287,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
288287 ) ;
289288 } ) ;
290289
291- let mut stop_fee_updates = self . stop_receiver . clone ( ) ;
290+ let mut stop_fee_updates = self . stop_sender . subscribe ( ) ;
292291 let fee_update_logger = Arc :: clone ( & self . logger ) ;
293292 let fee_estimator = Arc :: clone ( & self . fee_estimator ) ;
294293 let fee_rate_cache_update_interval_secs =
@@ -331,7 +330,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
331330 let sync_cmon = Arc :: clone ( & self . chain_monitor ) ;
332331 let sync_sweeper = Arc :: clone ( & self . output_sweeper ) ;
333332 let sync_logger = Arc :: clone ( & self . logger ) ;
334- let mut stop_sync = self . stop_receiver . clone ( ) ;
333+ let mut stop_sync = self . stop_sender . subscribe ( ) ;
335334 let wallet_sync_interval_secs =
336335 self . config . wallet_sync_interval_secs . max ( WALLET_SYNC_INTERVAL_MINIMUM_SECS ) ;
337336 runtime. spawn ( async move {
@@ -369,7 +368,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
369368 let gossip_source = Arc :: clone ( & self . gossip_source ) ;
370369 let gossip_sync_store = Arc :: clone ( & self . kv_store ) ;
371370 let gossip_sync_logger = Arc :: clone ( & self . logger ) ;
372- let mut stop_gossip_sync = self . stop_receiver . clone ( ) ;
371+ let mut stop_gossip_sync = self . stop_sender . subscribe ( ) ;
373372 runtime. spawn ( async move {
374373 let mut interval = tokio:: time:: interval ( RGS_SYNC_INTERVAL ) ;
375374 loop {
@@ -412,7 +411,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
412411 if let Some ( listening_addresses) = & self . config . listening_addresses {
413412 // Setup networking
414413 let peer_manager_connection_handler = Arc :: clone ( & self . peer_manager ) ;
415- let mut stop_listen = self . stop_receiver . clone ( ) ;
414+ let mut stop_listen = self . stop_sender . subscribe ( ) ;
416415 let listening_logger = Arc :: clone ( & self . logger ) ;
417416
418417 let mut bind_addrs = Vec :: with_capacity ( listening_addresses. len ( ) ) ;
@@ -462,12 +461,11 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
462461 } ) ;
463462 }
464463
465- // Regularly reconnect to channel peers.
466- let connect_cm = Arc :: clone ( & self . channel_manager ) ;
464+ // Regularly reconnect to persisted peers.
467465 let connect_pm = Arc :: clone ( & self . peer_manager ) ;
468466 let connect_logger = Arc :: clone ( & self . logger ) ;
469467 let connect_peer_store = Arc :: clone ( & self . peer_store ) ;
470- let mut stop_connect = self . stop_receiver . clone ( ) ;
468+ let mut stop_connect = self . stop_sender . subscribe ( ) ;
471469 runtime. spawn ( async move {
472470 let mut interval = tokio:: time:: interval ( PEER_RECONNECTION_INTERVAL ) ;
473471 interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
@@ -482,29 +480,23 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
482480 . iter( )
483481 . map( |( peer, _addr) | * peer)
484482 . collect:: <Vec <_>>( ) ;
485- for node_id in connect_cm
486- . list_channels( )
487- . iter( )
488- . map( |chan| chan. counterparty. node_id)
489- . filter( |id| !pm_peers. contains( id) )
490- {
491- if let Some ( peer_info) = connect_peer_store. get_peer( & node_id) {
492- let res = do_connect_peer(
493- peer_info. node_id,
494- peer_info. address,
495- Arc :: clone( & connect_pm) ,
496- Arc :: clone( & connect_logger) ,
497- ) . await ;
498- match res {
499- Ok ( _) => {
500- log_info!( connect_logger, "Successfully reconnected to peer {}" , node_id) ;
501- } ,
502- Err ( e) => {
503- log_error!( connect_logger, "Failed to reconnect to peer {}: {}" , node_id, e) ;
504- }
505- }
506- }
483+
484+ for peer_info in connect_peer_store. list_peers( ) . iter( ) . filter( |info| !pm_peers. contains( & info. node_id) ) {
485+ let res = do_connect_peer(
486+ peer_info. node_id,
487+ peer_info. address. clone( ) ,
488+ Arc :: clone( & connect_pm) ,
489+ Arc :: clone( & connect_logger) ,
490+ ) . await ;
491+ match res {
492+ Ok ( _) => {
493+ log_info!( connect_logger, "Successfully reconnected to peer {}" , peer_info. node_id) ;
494+ } ,
495+ Err ( e) => {
496+ log_error!( connect_logger, "Failed to reconnect to peer {}: {}" , peer_info. node_id, e) ;
507497 }
498+ }
499+ }
508500 }
509501 }
510502 }
@@ -516,7 +508,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
516508 let bcast_config = Arc :: clone ( & self . config ) ;
517509 let bcast_store = Arc :: clone ( & self . kv_store ) ;
518510 let bcast_logger = Arc :: clone ( & self . logger ) ;
519- let mut stop_bcast = self . stop_receiver . clone ( ) ;
511+ let mut stop_bcast = self . stop_sender . subscribe ( ) ;
520512 runtime. spawn ( async move {
521513 // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
522514 let mut interval = tokio:: time:: interval ( Duration :: from_secs ( 30 ) ) ;
@@ -572,7 +564,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
572564 }
573565 } ) ;
574566
575- let mut stop_tx_bcast = self . stop_receiver . clone ( ) ;
567+ let mut stop_tx_bcast = self . stop_sender . subscribe ( ) ;
576568 let tx_bcaster = Arc :: clone ( & self . tx_broadcaster ) ;
577569 runtime. spawn ( async move {
578570 // Every second we try to clear our broadcasting queue.
@@ -613,7 +605,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
613605 let background_logger = Arc :: clone ( & self . logger ) ;
614606 let background_error_logger = Arc :: clone ( & self . logger ) ;
615607 let background_scorer = Arc :: clone ( & self . scorer ) ;
616- let stop_bp = self . stop_receiver . clone ( ) ;
608+ let stop_bp = self . stop_sender . subscribe ( ) ;
617609 let sleeper = move |d| {
618610 let mut stop = stop_bp. clone ( ) ;
619611 Box :: pin ( async move {
@@ -650,7 +642,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
650642 } ) ;
651643
652644 if let Some ( liquidity_source) = self . liquidity_source . as_ref ( ) {
653- let mut stop_liquidity_handler = self . stop_receiver . clone ( ) ;
645+ let mut stop_liquidity_handler = self . stop_sender . subscribe ( ) ;
654646 let liquidity_handler = Arc :: clone ( & liquidity_source) ;
655647 runtime. spawn ( async move {
656648 loop {
0 commit comments