@@ -667,7 +667,6 @@ where
667667 D :: Target : ' static + ChangeDestinationSource ,
668668 K :: Target : ' static + KVStore ,
669669{
670- let mut should_break = false ;
671670 let async_event_handler = |event| {
672671 let network_graph = gossip_sync. network_graph ( ) ;
673672 let event_handler = & event_handler;
@@ -744,24 +743,14 @@ where
744743 // generally, and as a fallback place such blocking only immediately before
745744 // persistence.
746745 peer_manager. as_ref ( ) . process_events ( ) ;
747- if ( |fut : & mut SleepFuture | {
748- let mut waker = dummy_waker ( ) ;
749- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
750- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
751- task:: Poll :: Ready ( exit) => {
752- should_break = exit;
753- true
754- } ,
755- task:: Poll :: Pending => false ,
756- }
757- } ) ( & mut last_forwards_processing_call)
758- {
759- channel_manager. get_cm ( ) . process_pending_htlc_forwards ( ) ;
760- cur_batch_delay = batch_delay. next ( ) ;
761- last_forwards_processing_call = sleeper ( cur_batch_delay) ;
762- }
763- if should_break {
764- break ;
746+ match check_sleeper ( & mut last_forwards_processing_call) {
747+ Some ( false ) => {
748+ channel_manager. get_cm ( ) . process_pending_htlc_forwards ( ) ;
749+ cur_batch_delay = batch_delay. next ( ) ;
750+ last_forwards_processing_call = sleeper ( cur_batch_delay) ;
751+ } ,
752+ Some ( true ) => break ,
753+ None => { } ,
765754 }
766755
767756 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
@@ -799,27 +788,21 @@ where
799788 match fut. await {
800789 SelectorOutput :: A | SelectorOutput :: B | SelectorOutput :: C | SelectorOutput :: D => { } ,
801790 SelectorOutput :: E ( exit) => {
802- should_break = exit;
791+ if exit {
792+ break ;
793+ }
803794 } ,
804795 }
796+
805797 let await_slow = if mobile_interruptable_platform {
806- ( |fut : & mut SleepFuture | {
807- let mut waker = dummy_waker ( ) ;
808- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
809- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
810- task:: Poll :: Ready ( exit) => {
811- should_break = exit;
812- true
813- } ,
814- task:: Poll :: Pending => false ,
815- }
816- } ) ( & mut await_start. unwrap ( ) )
798+ match check_sleeper ( & mut await_start. unwrap ( ) ) {
799+ Some ( true ) => break ,
800+ Some ( false ) => true ,
801+ None => false ,
802+ }
817803 } else {
818804 false
819805 } ;
820- if should_break {
821- break ;
822- }
823806 if channel_manager. get_cm ( ) . get_and_clear_needs_persistence ( ) {
824807 log_trace ! ( logger, "Persisting ChannelManager..." ) ;
825808 kv_store
@@ -832,39 +815,25 @@ where
832815 . await ?;
833816 log_trace ! ( logger, "Done persisting ChannelManager." ) ;
834817 }
835- if ( |fut : & mut SleepFuture | {
836- let mut waker = dummy_waker ( ) ;
837- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
838- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
839- task:: Poll :: Ready ( exit) => {
840- should_break = exit;
841- true
842- } ,
843- task:: Poll :: Pending => false ,
844- }
845- } ) ( & mut last_freshness_call)
846- {
847- log_trace ! ( logger, "Calling ChannelManager's timer_tick_occurred" ) ;
848- channel_manager. get_cm ( ) . timer_tick_occurred ( ) ;
849- last_freshness_call = sleeper ( FRESHNESS_TIMER ) ;
850- }
851- if ( |fut : & mut SleepFuture | {
852- let mut waker = dummy_waker ( ) ;
853- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
854- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
855- task:: Poll :: Ready ( exit) => {
856- should_break = exit;
857- true
858- } ,
859- task:: Poll :: Pending => false ,
860- }
861- } ) ( & mut last_onion_message_handler_call)
862- {
863- if let Some ( om) = & onion_messenger {
864- log_trace ! ( logger, "Calling OnionMessageHandler's timer_tick_occurred" ) ;
865- om. get_om ( ) . timer_tick_occurred ( ) ;
866- }
867- last_onion_message_handler_call = sleeper ( ONION_MESSAGE_HANDLER_TIMER ) ;
818+ match check_sleeper ( & mut last_freshness_call) {
819+ Some ( false ) => {
820+ log_trace ! ( logger, "Calling ChannelManager's timer_tick_occurred" ) ;
821+ channel_manager. get_cm ( ) . timer_tick_occurred ( ) ;
822+ last_freshness_call = sleeper ( FRESHNESS_TIMER ) ;
823+ } ,
824+ Some ( true ) => break ,
825+ None => { } ,
826+ }
827+ match check_sleeper ( & mut last_onion_message_handler_call) {
828+ Some ( false ) => {
829+ if let Some ( om) = & onion_messenger {
830+ log_trace ! ( logger, "Calling OnionMessageHandler's timer_tick_occurred" ) ;
831+ om. get_om ( ) . timer_tick_occurred ( ) ;
832+ }
833+ last_onion_message_handler_call = sleeper ( ONION_MESSAGE_HANDLER_TIMER ) ;
834+ } ,
835+ Some ( true ) => break ,
836+ None => { } ,
868837 }
869838 if await_slow {
870839 // On various platforms, we may be starved of CPU cycles for several reasons.
@@ -882,39 +851,31 @@ where
882851 log_trace ! ( logger, "100ms sleep took more than a second, disconnecting peers." ) ;
883852 peer_manager. as_ref ( ) . disconnect_all_peers ( ) ;
884853 last_ping_call = sleeper ( PING_TIMER ) ;
885- } else if ( |fut : & mut SleepFuture | {
886- let mut waker = dummy_waker ( ) ;
887- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
888- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
889- task:: Poll :: Ready ( exit) => {
890- should_break = exit;
891- true
854+ } else {
855+ match check_sleeper ( & mut last_ping_call) {
856+ Some ( false ) => {
857+ log_trace ! ( logger, "Calling PeerManager's timer_tick_occurred" ) ;
858+ peer_manager. as_ref ( ) . timer_tick_occurred ( ) ;
859+ last_ping_call = sleeper ( PING_TIMER ) ;
892860 } ,
893- task:: Poll :: Pending => false ,
861+ Some ( true ) => break ,
862+ _ => { } ,
894863 }
895- } ) ( & mut last_ping_call)
896- {
897- log_trace ! ( logger, "Calling PeerManager's timer_tick_occurred" ) ;
898- peer_manager. as_ref ( ) . timer_tick_occurred ( ) ;
899- last_ping_call = sleeper ( PING_TIMER ) ;
900864 }
901865
902866 // Note that we want to run a graph prune once not long after startup before
903867 // falling back to our usual hourly prunes. This avoids short-lived clients never
904868 // pruning their network graph. We run once 60 seconds after startup before
905869 // continuing our normal cadence. For RGS, since 60 seconds is likely too long,
906870 // we prune after an initial sync completes.
907- let prune_timer_elapsed = ( |fut : & mut SleepFuture | {
908- let mut waker = dummy_waker ( ) ;
909- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
910- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
911- task:: Poll :: Ready ( exit) => {
912- should_break = exit;
913- true
914- } ,
915- task:: Poll :: Pending => false ,
871+ let prune_timer_elapsed = {
872+ match check_sleeper ( & mut last_prune_call) {
873+ Some ( false ) => true ,
874+ Some ( true ) => break ,
875+ None => false ,
916876 }
917- } ) ( & mut last_prune_call) ;
877+ } ;
878+
918879 let should_prune = match gossip_sync {
919880 GossipSync :: Rapid ( _) => !have_pruned || prune_timer_elapsed,
920881 _ => prune_timer_elapsed,
@@ -957,76 +918,55 @@ where
957918 }
958919 have_decayed_scorer = true ;
959920 }
960- if ( |fut : & mut SleepFuture | {
961- let mut waker = dummy_waker ( ) ;
962- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
963- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
964- task:: Poll :: Ready ( exit) => {
965- should_break = exit;
966- true
967- } ,
968- task:: Poll :: Pending => false ,
969- }
970- } ) ( & mut last_scorer_persist_call)
971- {
972- if let Some ( ref scorer) = scorer {
973- if let Some ( duration_since_epoch) = fetch_time ( ) {
974- log_trace ! ( logger, "Calling time_passed and persisting scorer" ) ;
975- scorer. write_lock ( ) . time_passed ( duration_since_epoch) ;
976- } else {
977- log_trace ! ( logger, "Persisting scorer" ) ;
921+ match check_sleeper ( & mut last_scorer_persist_call) {
922+ Some ( false ) => {
923+ if let Some ( ref scorer) = scorer {
924+ if let Some ( duration_since_epoch) = fetch_time ( ) {
925+ log_trace ! ( logger, "Calling time_passed and persisting scorer" ) ;
926+ scorer. write_lock ( ) . time_passed ( duration_since_epoch) ;
927+ } else {
928+ log_trace ! ( logger, "Persisting scorer" ) ;
929+ }
930+ if let Err ( e) = kv_store
931+ . write (
932+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
933+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
934+ SCORER_PERSISTENCE_KEY ,
935+ & scorer. encode ( ) ,
936+ )
937+ . await
938+ {
939+ log_error ! (
940+ logger,
941+ "Error: Failed to persist scorer, check your disk and permissions {}" ,
942+ e
943+ ) ;
944+ }
978945 }
979- if let Err ( e) = kv_store
980- . write (
981- SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
982- SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
983- SCORER_PERSISTENCE_KEY ,
984- & scorer. encode ( ) ,
985- )
986- . await
987- {
988- log_error ! (
989- logger,
990- "Error: Failed to persist scorer, check your disk and permissions {}" ,
991- e
992- ) ;
946+ last_scorer_persist_call = sleeper ( SCORER_PERSIST_TIMER ) ;
947+ } ,
948+ Some ( true ) => break ,
949+ None => { } ,
950+ }
951+ match check_sleeper ( & mut last_rebroadcast_call) {
952+ Some ( false ) => {
953+ log_trace ! ( logger, "Rebroadcasting monitor's pending claims" ) ;
954+ chain_monitor. rebroadcast_pending_claims ( ) ;
955+ last_rebroadcast_call = sleeper ( REBROADCAST_TIMER ) ;
956+ } ,
957+ Some ( true ) => break ,
958+ None => { } ,
959+ }
960+ match check_sleeper ( & mut last_sweeper_call) {
961+ Some ( false ) => {
962+ log_trace ! ( logger, "Regenerating sweeper spends if necessary" ) ;
963+ if let Some ( ref sweeper) = sweeper {
964+ let _ = sweeper. regenerate_and_broadcast_spend_if_necessary ( ) . await ;
993965 }
994- }
995- last_scorer_persist_call = sleeper ( SCORER_PERSIST_TIMER ) ;
996- }
997- if ( |fut : & mut SleepFuture | {
998- let mut waker = dummy_waker ( ) ;
999- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
1000- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
1001- task:: Poll :: Ready ( exit) => {
1002- should_break = exit;
1003- true
1004- } ,
1005- task:: Poll :: Pending => false ,
1006- }
1007- } ) ( & mut last_rebroadcast_call)
1008- {
1009- log_trace ! ( logger, "Rebroadcasting monitor's pending claims" ) ;
1010- chain_monitor. rebroadcast_pending_claims ( ) ;
1011- last_rebroadcast_call = sleeper ( REBROADCAST_TIMER ) ;
1012- }
1013- if ( |fut : & mut SleepFuture | {
1014- let mut waker = dummy_waker ( ) ;
1015- let mut ctx = task:: Context :: from_waker ( & mut waker) ;
1016- match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
1017- task:: Poll :: Ready ( exit) => {
1018- should_break = exit;
1019- true
1020- } ,
1021- task:: Poll :: Pending => false ,
1022- }
1023- } ) ( & mut last_sweeper_call)
1024- {
1025- log_trace ! ( logger, "Regenerating sweeper spends if necessary" ) ;
1026- if let Some ( ref sweeper) = sweeper {
1027- let _ = sweeper. regenerate_and_broadcast_spend_if_necessary ( ) . await ;
1028- }
1029- last_sweeper_call = sleeper ( SWEEPER_TIMER ) ;
966+ last_sweeper_call = sleeper ( SWEEPER_TIMER ) ;
967+ } ,
968+ Some ( true ) => break ,
969+ None => { } ,
1030970 }
1031971 }
1032972 log_trace ! ( logger, "Terminating background processor." ) ;
@@ -1065,6 +1005,17 @@ where
10651005 Ok ( ( ) )
10661006}
10671007
1008+ fn check_sleeper < SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin > (
1009+ fut : & mut SleepFuture ,
1010+ ) -> Option < bool > {
1011+ let mut waker = dummy_waker ( ) ;
1012+ let mut ctx = task:: Context :: from_waker ( & mut waker) ;
1013+ match core:: pin:: Pin :: new ( fut) . poll ( & mut ctx) {
1014+ task:: Poll :: Ready ( exit) => Some ( exit) ,
1015+ task:: Poll :: Pending => None ,
1016+ }
1017+ }
1018+
10681019/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
10691020/// synchronous background persistence.
10701021pub async fn process_events_async_with_kv_store_sync <
0 commit comments