@@ -39,8 +39,11 @@ use lightning::sign::ChangeDestinationSource;
3939use lightning:: sign:: ChangeDestinationSourceSync ;
4040use lightning:: sign:: EntropySource ;
4141use lightning:: sign:: OutputSpender ;
42+ use lightning:: util:: async_poll:: FutureSpawner ;
4243use lightning:: util:: logger:: Logger ;
43- use lightning:: util:: persist:: { KVStore , Persister } ;
44+ use lightning:: util:: persist:: {
45+ KVStore , KVStoreSync , KVStoreSyncWrapper , Persister , PersisterSync ,
46+ } ;
4447use lightning:: util:: sweep:: OutputSweeper ;
4548#[ cfg( feature = "std" ) ]
4649use lightning:: util:: sweep:: OutputSweeperSync ;
@@ -311,6 +314,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311314 true
312315}
313316
317+ macro_rules! maybe_await {
318+ ( true , $e: expr) => {
319+ $e. await
320+ } ;
321+ ( false , $e: expr) => {
322+ $e
323+ } ;
324+ }
325+
314326macro_rules! define_run_body {
315327 (
316328 $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +331,7 @@ macro_rules! define_run_body {
319331 $peer_manager: ident, $gossip_sync: ident,
320332 $process_sweeper: expr,
321333 $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322- $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
334+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async : tt ,
323335 ) => { {
324336 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
325337 $channel_manager. get_cm( ) . timer_tick_occurred( ) ;
@@ -375,7 +387,7 @@ macro_rules! define_run_body {
375387
376388 if $channel_manager. get_cm( ) . get_and_clear_needs_persistence( ) {
377389 log_trace!( $logger, "Persisting ChannelManager..." ) ;
378- $ persister. persist_manager( & $channel_manager) ?;
390+ maybe_await! ( $async , $ persister. persist_manager( & $channel_manager) ) ?;
379391 log_trace!( $logger, "Done persisting ChannelManager." ) ;
380392 }
381393 if $timer_elapsed( & mut last_freshness_call, FRESHNESS_TIMER ) {
@@ -436,7 +448,7 @@ macro_rules! define_run_body {
436448 log_trace!( $logger, "Persisting network graph." ) ;
437449 }
438450
439- if let Err ( e) = $ persister. persist_graph( network_graph) {
451+ if let Err ( e) = maybe_await! ( $async , $ persister. persist_graph( network_graph) ) {
440452 log_error!( $logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e)
441453 }
442454
@@ -464,7 +476,7 @@ macro_rules! define_run_body {
464476 } else {
465477 log_trace!( $logger, "Persisting scorer" ) ;
466478 }
467- if let Err ( e) = $ persister. persist_scorer( & scorer) {
479+ if let Err ( e) = maybe_await! ( $async , $ persister. persist_scorer( & scorer) ) {
468480 log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
469481 }
470482 }
@@ -487,16 +499,16 @@ macro_rules! define_run_body {
487499 // After we exit, ensure we persist the ChannelManager one final time - this avoids
488500 // some races where users quit while channel updates were in-flight, with
489501 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490- $ persister. persist_manager( & $channel_manager) ?;
502+ maybe_await! ( $async , $ persister. persist_manager( & $channel_manager) ) ?;
491503
492504 // Persist Scorer on exit
493505 if let Some ( ref scorer) = $scorer {
494- $ persister. persist_scorer( & scorer) ?;
506+ maybe_await! ( $async , $ persister. persist_scorer( & scorer) ) ?;
495507 }
496508
497509 // Persist NetworkGraph on exit
498510 if let Some ( network_graph) = $gossip_sync. network_graph( ) {
499- $ persister. persist_graph( network_graph) ?;
511+ maybe_await! ( $async , $ persister. persist_graph( network_graph) ) ?;
500512 }
501513
502514 Ok ( ( ) )
@@ -782,8 +794,11 @@ pub async fn process_events_async<
782794 EventHandler : Fn ( Event ) -> EventHandlerFuture ,
783795 PS : ' static + Deref + Send ,
784796 ES : ' static + Deref + Send ,
797+ FS : FutureSpawner ,
785798 M : ' static
786- + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P , ES > >
799+ + Deref <
800+ Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P , ES , FS > ,
801+ >
787802 + Send
788803 + Sync ,
789804 CM : ' static + Deref ,
@@ -841,7 +856,7 @@ where
841856 if let Some ( duration_since_epoch) = fetch_time ( ) {
842857 if update_scorer ( scorer, & event, duration_since_epoch) {
843858 log_trace ! ( logger, "Persisting scorer after update" ) ;
844- if let Err ( e) = persister. persist_scorer ( & * scorer) {
859+ if let Err ( e) = persister. persist_scorer ( & * scorer) . await {
845860 log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e) ;
846861 // We opt not to abort early on persistence failure here as persisting
847862 // the scorer is non-critical and we still hope that it will have
@@ -919,6 +934,7 @@ where
919934 } ,
920935 mobile_interruptable_platform,
921936 fetch_time,
937+ true ,
922938 )
923939}
924940
@@ -982,7 +998,16 @@ impl BackgroundProcessor {
982998 ES : ' static + Deref + Send ,
983999 M : ' static
9841000 + Deref <
985- Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P , ES > ,
1001+ Target = ChainMonitor <
1002+ <CM :: Target as AChannelManager >:: Signer ,
1003+ CF ,
1004+ T ,
1005+ F ,
1006+ L ,
1007+ P ,
1008+ ES ,
1009+ FS ,
1010+ > ,
9861011 >
9871012 + Send
9881013 + Sync ,
@@ -998,6 +1023,7 @@ impl BackgroundProcessor {
9981023 O : ' static + Deref ,
9991024 K : ' static + Deref ,
10001025 OS : ' static + Deref < Target = OutputSweeperSync < T , D , F , CF , K , L , O > > + Send ,
1026+ FS : FutureSpawner ,
10011027 > (
10021028 persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
10031029 onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
@@ -1010,15 +1036,15 @@ impl BackgroundProcessor {
10101036 F :: Target : ' static + FeeEstimator ,
10111037 L :: Target : ' static + Logger ,
10121038 P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1013- PS :: Target : ' static + Persister < ' a , CM , L , S > ,
1039+ PS :: Target : ' static + PersisterSync < ' a , CM , L , S > ,
10141040 ES :: Target : ' static + EntropySource ,
10151041 CM :: Target : AChannelManager ,
10161042 OM :: Target : AOnionMessenger ,
10171043 PM :: Target : APeerManager ,
10181044 LM :: Target : ALiquidityManager ,
10191045 D :: Target : ChangeDestinationSourceSync ,
10201046 O :: Target : ' static + OutputSpender ,
1021- K :: Target : ' static + KVStore ,
1047+ K :: Target : ' static + KVStoreSync ,
10221048 {
10231049 let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
10241050 let stop_thread_clone = stop_thread. clone ( ) ;
@@ -1098,6 +1124,7 @@ impl BackgroundProcessor {
10981124 . expect( "Time should be sometime after 1970" ) ,
10991125 )
11001126 } ,
1127+ false ,
11011128 )
11021129 } ) ;
11031130 Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
0 commit comments