@@ -43,6 +43,7 @@ use lightning::util::persist::{KVStore, Persister};
4343use lightning:: util:: sweep:: OutputSweeper ;
4444#[ cfg( feature = "std" ) ]
4545use lightning:: util:: sweep:: OutputSweeperSync ;
46+ use lightning:: util:: wakers:: Sleep ;
4647#[ cfg( feature = "std" ) ]
4748use lightning:: util:: wakers:: Sleeper ;
4849use lightning_rapid_gossip_sync:: RapidGossipSync ;
@@ -991,7 +992,7 @@ impl BackgroundProcessor {
991992 D : ' static + Deref ,
992993 O : ' static + Deref ,
993994 K : ' static + Deref ,
994- OS : ' static + Deref < Target = OutputSweeperSync < T , D , F , CF , K , L , O > > + Send ,
995+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > + Send ,
995996 > (
996997 persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
997998 onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
@@ -1009,79 +1010,33 @@ impl BackgroundProcessor {
10091010 OM :: Target : AOnionMessenger ,
10101011 PM :: Target : APeerManager ,
10111012 LM :: Target : ALiquidityManager ,
1012- D :: Target : ChangeDestinationSourceSync ,
1013+ D :: Target : ChangeDestinationSource ,
10131014 O :: Target : ' static + OutputSpender ,
10141015 K :: Target : ' static + KVStore ,
10151016 {
10161017 let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
10171018 let stop_thread_clone = stop_thread. clone ( ) ;
10181019 let handle = thread:: spawn ( move || -> Result < ( ) , std:: io:: Error > {
1019- let event_handler = |event| {
1020- let network_graph = gossip_sync. network_graph ( ) ;
1021- if let Some ( network_graph) = network_graph {
1022- handle_network_graph_update ( network_graph, & event)
1023- }
1024- if let Some ( ref scorer) = scorer {
1025- use std:: time:: SystemTime ;
1026- let duration_since_epoch = SystemTime :: now ( )
1027- . duration_since ( SystemTime :: UNIX_EPOCH )
1028- . expect ( "Time should be sometime after 1970" ) ;
1029- if update_scorer ( scorer, & event, duration_since_epoch) {
1030- log_trace ! ( logger, "Persisting scorer after update" ) ;
1031- if let Err ( e) = persister. persist_scorer ( & scorer) {
1032- log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
1033- }
1034- }
1035- }
1036- event_handler. handle_event ( event)
1037- } ;
1038- define_run_body ! (
1020+ let fut = process_events_async (
10391021 persister,
1022+ |event| async { event_handler. handle_event ( event) } ,
10401023 chain_monitor,
1041- chain_monitor. process_pending_events( & event_handler) ,
10421024 channel_manager,
1043- channel_manager. get_cm( ) . process_pending_events( & event_handler) ,
10441025 onion_messenger,
1045- if let Some ( om) = & onion_messenger {
1046- om. get_om( ) . process_pending_events( & event_handler)
1047- } ,
1048- peer_manager,
10491026 gossip_sync,
1050- {
1051- if let Some ( ref sweeper) = sweeper {
1052- let _ = sweeper. regenerate_and_broadcast_spend_if_necessary( ) ;
1053- }
1054- } ,
1027+ peer_manager,
1028+ liquidity_manager,
1029+ sweeper,
10551030 logger,
10561031 scorer,
1057- stop_thread. load( Ordering :: Acquire ) ,
1058- {
1059- let sleeper = match ( onion_messenger. as_ref( ) , liquidity_manager. as_ref( ) ) {
1060- ( Some ( om) , Some ( lm) ) => Sleeper :: from_four_futures(
1061- & channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
1062- & chain_monitor. get_update_future( ) ,
1063- & om. get_om( ) . get_update_future( ) ,
1064- & lm. get_lm( ) . get_pending_msgs_future( ) ,
1065- ) ,
1066- ( Some ( om) , None ) => Sleeper :: from_three_futures(
1067- & channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
1068- & chain_monitor. get_update_future( ) ,
1069- & om. get_om( ) . get_update_future( ) ,
1070- ) ,
1071- ( None , Some ( lm) ) => Sleeper :: from_three_futures(
1072- & channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
1073- & chain_monitor. get_update_future( ) ,
1074- & lm. get_lm( ) . get_pending_msgs_future( ) ,
1075- ) ,
1076- ( None , None ) => Sleeper :: from_two_futures(
1077- & channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
1078- & chain_monitor. get_update_future( ) ,
1079- ) ,
1080- } ;
1081- sleeper. wait_timeout( Duration :: from_millis( 100 ) ) ;
1032+ move |dur : Duration | {
1033+ let stop_thread_clone = stop_thread. clone ( ) ;
1034+
1035+ Box :: pin ( async move {
1036+ Sleep :: new ( dur) . await ;
1037+ stop_thread_clone. load ( Ordering :: Acquire )
1038+ } )
10821039 } ,
1083- |_| Instant :: now( ) ,
1084- |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur,
10851040 false ,
10861041 || {
10871042 use std:: time:: SystemTime ;
@@ -1091,7 +1046,10 @@ impl BackgroundProcessor {
10911046 . expect ( "Time should be sometime after 1970" ) ,
10921047 )
10931048 } ,
1094- )
1049+ ) ;
1050+
1051+ // TODO: Implement simple executor in utils.
1052+ futures:: executor:: block_on ( fut) . map_err ( Into :: into)
10951053 } ) ;
10961054 Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
10971055 }
@@ -1935,7 +1893,7 @@ mod tests {
19351893 nodes[ 0 ] . p2p_gossip_sync ( ) ,
19361894 nodes[ 0 ] . peer_manager . clone ( ) ,
19371895 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
1938- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
1896+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
19391897 nodes[ 0 ] . logger . clone ( ) ,
19401898 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
19411899 ) ;
@@ -2030,7 +1988,7 @@ mod tests {
20301988 nodes[ 0 ] . no_gossip_sync ( ) ,
20311989 nodes[ 0 ] . peer_manager . clone ( ) ,
20321990 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2033- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
1991+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
20341992 nodes[ 0 ] . logger . clone ( ) ,
20351993 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20361994 ) ;
@@ -2074,7 +2032,7 @@ mod tests {
20742032 nodes[ 0 ] . no_gossip_sync ( ) ,
20752033 nodes[ 0 ] . peer_manager . clone ( ) ,
20762034 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2077- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2035+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
20782036 nodes[ 0 ] . logger . clone ( ) ,
20792037 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20802038 ) ;
@@ -2145,7 +2103,7 @@ mod tests {
21452103 nodes[ 0 ] . p2p_gossip_sync ( ) ,
21462104 nodes[ 0 ] . peer_manager . clone ( ) ,
21472105 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2148- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2106+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
21492107 nodes[ 0 ] . logger . clone ( ) ,
21502108 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21512109 ) ;
@@ -2176,7 +2134,7 @@ mod tests {
21762134 nodes[ 0 ] . no_gossip_sync ( ) ,
21772135 nodes[ 0 ] . peer_manager . clone ( ) ,
21782136 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2179- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2137+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
21802138 nodes[ 0 ] . logger . clone ( ) ,
21812139 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21822140 ) ;
@@ -2224,7 +2182,7 @@ mod tests {
22242182 nodes[ 0 ] . no_gossip_sync ( ) ,
22252183 nodes[ 0 ] . peer_manager . clone ( ) ,
22262184 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2227- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2185+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
22282186 nodes[ 0 ] . logger . clone ( ) ,
22292187 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
22302188 ) ;
@@ -2288,7 +2246,7 @@ mod tests {
22882246 nodes[ 0 ] . no_gossip_sync ( ) ,
22892247 nodes[ 0 ] . peer_manager . clone ( ) ,
22902248 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2291- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2249+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
22922250 nodes[ 0 ] . logger . clone ( ) ,
22932251 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
22942252 ) ;
@@ -2453,7 +2411,7 @@ mod tests {
24532411 nodes[ 0 ] . no_gossip_sync ( ) ,
24542412 nodes[ 0 ] . peer_manager . clone ( ) ,
24552413 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2456- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2414+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
24572415 nodes[ 0 ] . logger . clone ( ) ,
24582416 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
24592417 ) ;
@@ -2484,7 +2442,7 @@ mod tests {
24842442 nodes[ 0 ] . no_gossip_sync ( ) ,
24852443 nodes[ 0 ] . peer_manager . clone ( ) ,
24862444 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2487- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2445+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
24882446 nodes[ 0 ] . logger . clone ( ) ,
24892447 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
24902448 ) ;
@@ -2581,7 +2539,7 @@ mod tests {
25812539 nodes[ 0 ] . rapid_gossip_sync ( ) ,
25822540 nodes[ 0 ] . peer_manager . clone ( ) ,
25832541 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2584- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2542+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
25852543 nodes[ 0 ] . logger . clone ( ) ,
25862544 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
25872545 ) ;
@@ -2778,7 +2736,7 @@ mod tests {
27782736 nodes[ 0 ] . no_gossip_sync ( ) ,
27792737 nodes[ 0 ] . peer_manager . clone ( ) ,
27802738 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2781- Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
2739+ Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) . clone ( ) ) ,
27822740 nodes[ 0 ] . logger . clone ( ) ,
27832741 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
27842742 ) ;
0 commit comments