@@ -55,11 +55,11 @@ use lightning::sign::EntropySource;
5555use lightning:: sign:: OutputSpender ;
5656use lightning:: util:: logger:: Logger ;
5757use lightning:: util:: persist:: {
58- KVStoreSync , CHANNEL_MANAGER_PERSISTENCE_KEY , CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
59- CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_KEY ,
60- NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
61- SCORER_PERSISTENCE_KEY , SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
62- SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
58+ KVStore , KVStoreSync , KVStoreSyncWrapper , CHANNEL_MANAGER_PERSISTENCE_KEY ,
59+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE , CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
60+ NETWORK_GRAPH_PERSISTENCE_KEY , NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
61+ NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE , SCORER_PERSISTENCE_KEY ,
62+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE , SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
6363} ;
6464use lightning:: util:: sweep:: OutputSweeper ;
6565#[ cfg( feature = "std" ) ]
@@ -331,6 +331,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
331331 true
332332}
333333
334+ macro_rules! maybe_await {
335+ ( true , $e: expr) => {
336+ $e. await
337+ } ;
338+ ( false , $e: expr) => {
339+ $e
340+ } ;
341+ }
342+
334343macro_rules! define_run_body {
335344 (
336345 $kv_store: ident,
@@ -340,7 +349,7 @@ macro_rules! define_run_body {
340349 $peer_manager: ident, $gossip_sync: ident,
341350 $process_sweeper: expr,
342351 $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
343- $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr,
352+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr, $async_persist : tt ,
344353 ) => { {
345354 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
346355 $channel_manager. get_cm( ) . timer_tick_occurred( ) ;
@@ -412,12 +421,12 @@ macro_rules! define_run_body {
412421
413422 if $channel_manager. get_cm( ) . get_and_clear_needs_persistence( ) {
414423 log_trace!( $logger, "Persisting ChannelManager..." ) ;
415- $kv_store. write(
424+ maybe_await! ( $async_persist , $kv_store. write(
416425 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
417426 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
418427 CHANNEL_MANAGER_PERSISTENCE_KEY ,
419428 & $channel_manager. get_cm( ) . encode( ) ,
420- ) ?;
429+ ) ) ?;
421430 log_trace!( $logger, "Done persisting ChannelManager." ) ;
422431 }
423432 if $timer_elapsed( & mut last_freshness_call, FRESHNESS_TIMER ) {
@@ -478,12 +487,12 @@ macro_rules! define_run_body {
478487 log_trace!( $logger, "Persisting network graph." ) ;
479488 }
480489
481- if let Err ( e) = $kv_store. write(
490+ if let Err ( e) = maybe_await! ( $async_persist , $kv_store. write(
482491 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
483492 NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
484493 NETWORK_GRAPH_PERSISTENCE_KEY ,
485494 & network_graph. encode( ) ,
486- ) {
495+ ) ) {
487496 log_error!( $logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e)
488497 }
489498
@@ -511,12 +520,12 @@ macro_rules! define_run_body {
511520 } else {
512521 log_trace!( $logger, "Persisting scorer" ) ;
513522 }
514- if let Err ( e) = $kv_store. write(
523+ if let Err ( e) = maybe_await! ( $async_persist , $kv_store. write(
515524 SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
516525 SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
517526 SCORER_PERSISTENCE_KEY ,
518527 & scorer. encode( ) ,
519- ) {
528+ ) ) {
520529 log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
521530 }
522531 }
@@ -539,31 +548,31 @@ macro_rules! define_run_body {
539548 // After we exit, ensure we persist the ChannelManager one final time - this avoids
540549 // some races where users quit while channel updates were in-flight, with
541550 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
542- $kv_store. write(
551+ maybe_await! ( $async_persist , $kv_store. write(
543552 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
544553 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
545554 CHANNEL_MANAGER_PERSISTENCE_KEY ,
546555 & $channel_manager. get_cm( ) . encode( ) ,
547- ) ?;
556+ ) ) ?;
548557
549558 // Persist Scorer on exit
550559 if let Some ( ref scorer) = $scorer {
551- $kv_store. write(
560+ maybe_await! ( $async_persist , $kv_store. write(
552561 SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
553562 SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
554563 SCORER_PERSISTENCE_KEY ,
555564 & scorer. encode( ) ,
556- ) ?;
565+ ) ) ?;
557566 }
558567
559568 // Persist NetworkGraph on exit
560569 if let Some ( network_graph) = $gossip_sync. network_graph( ) {
561- $kv_store. write(
570+ maybe_await! ( $async_persist , $kv_store. write(
562571 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
563572 NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
564573 NETWORK_GRAPH_PERSISTENCE_KEY ,
565574 & network_graph. encode( ) ,
566- ) ?;
575+ ) ) ?;
567576 }
568577
569578 Ok ( ( ) )
@@ -720,11 +729,12 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
720729/// ```
721730/// # use lightning::io;
722731/// # use lightning::events::ReplayEvent;
723- /// # use lightning::util::sweep::OutputSweeper;
724732/// # use std::sync::{Arc, RwLock};
725733/// # use std::sync::atomic::{AtomicBool, Ordering};
726734/// # use std::time::SystemTime;
727- /// # use lightning_background_processor::{process_events_async, GossipSync};
735+ /// # use lightning_background_processor::{process_events_full_async, GossipSync};
736+ /// # use core::future::Future;
737+ /// # use core::pin::Pin;
728738/// # struct Logger {}
729739/// # impl lightning::util::logger::Logger for Logger {
730740/// # fn log(&self, _record: lightning::util::logger::Record) {}
@@ -736,6 +746,13 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
736746/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
737747/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
738748/// # }
749+ /// # struct Store {}
750+ /// # impl lightning::util::persist::KVStore for Store {
751+ /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
752+ /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
753+ /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
754+ /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
755+ /// # }
739756/// # struct EventHandler {}
740757/// # impl EventHandler {
741758/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -754,7 +771,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
754771/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
755772/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
756773/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
757- /// #
774+ /// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
775+ ///
758776/// # struct Node<
759777/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
760778/// # F: lightning::chain::Filter + Send + Sync + 'static,
@@ -770,10 +788,10 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
770788/// # liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
771789/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
772790/// # gossip_sync: Arc<P2PGossipSync<UL>>,
773- /// # persister: Arc<StoreSync >,
791+ /// # persister: Arc<Store >,
774792/// # logger: Arc<Logger>,
775793/// # scorer: Arc<Scorer>,
776- /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O> >>,
794+ /// # sweeper: Arc<OutputSweeper<B, D, FE, F, O >>,
777795/// # }
778796/// #
779797/// # async fn setup_background_processing<
@@ -819,7 +837,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
819837 doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
820838) ]
821839#[ cfg_attr( not( feature = "std" ) , doc = " rt.block_on(async move {" ) ]
822- /// process_events_async (
840+ /// process_events_full_async (
823841/// background_persister,
824842/// |e| background_event_handler.handle_event(e),
825843/// background_chain_mon,
@@ -844,7 +862,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
844862#[ cfg_attr( feature = "std" , doc = " handle.await.unwrap()" ) ]
845863/// # }
846864///```
847- pub async fn process_events_async <
865+ pub async fn process_events_full_async <
848866 ' a ,
849867 UL : ' static + Deref ,
850868 CF : ' static + Deref ,
@@ -895,7 +913,7 @@ where
895913 LM :: Target : ALiquidityManager ,
896914 O :: Target : ' static + OutputSpender ,
897915 D :: Target : ' static + ChangeDestinationSource ,
898- K :: Target : ' static + KVStoreSync ,
916+ K :: Target : ' static + KVStore ,
899917{
900918 let mut should_break = false ;
901919 let async_event_handler = |event| {
@@ -914,12 +932,15 @@ where
914932 if let Some ( duration_since_epoch) = fetch_time ( ) {
915933 if update_scorer ( scorer, & event, duration_since_epoch) {
916934 log_trace ! ( logger, "Persisting scorer after update" ) ;
917- if let Err ( e) = kv_store. write (
918- SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
919- SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
920- SCORER_PERSISTENCE_KEY ,
921- & scorer. encode ( ) ,
922- ) {
935+ if let Err ( e) = kv_store
936+ . write (
937+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
938+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
939+ SCORER_PERSISTENCE_KEY ,
940+ & scorer. encode ( ) ,
941+ )
942+ . await
943+ {
923944 log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e) ;
924945 // We opt not to abort early on persistence failure here as persisting
925946 // the scorer is non-critical and we still hope that it will have
@@ -1007,7 +1028,82 @@ where
10071028 mobile_interruptable_platform,
10081029 fetch_time,
10091030 batch_delay,
1031+ true ,
1032+ )
1033+ }
1034+
1035+ /// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
1036+ /// synchronous background persistence.
1037+ pub async fn process_events_async <
1038+ UL : ' static + Deref ,
1039+ CF : ' static + Deref ,
1040+ T : ' static + Deref ,
1041+ F : ' static + Deref ,
1042+ G : ' static + Deref < Target = NetworkGraph < L > > ,
1043+ L : ' static + Deref + Send + Sync ,
1044+ P : ' static + Deref ,
1045+ EventHandlerFuture : core:: future:: Future < Output = Result < ( ) , ReplayEvent > > ,
1046+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
1047+ ES : ' static + Deref + Send ,
1048+ M : ' static
1049+ + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P , ES > >
1050+ + Send
1051+ + Sync ,
1052+ CM : ' static + Deref + Send + Sync ,
1053+ OM : ' static + Deref ,
1054+ PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > ,
1055+ RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
1056+ PM : ' static + Deref ,
1057+ LM : ' static + Deref ,
1058+ D : ' static + Deref ,
1059+ O : ' static + Deref ,
1060+ K : ' static + Deref ,
1061+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , KVStoreSyncWrapper < K > , L , O > > ,
1062+ S : ' static + Deref < Target = SC > + Send + Sync ,
1063+ SC : for < ' b > WriteableScore < ' b > ,
1064+ SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
1065+ Sleeper : Fn ( Duration ) -> SleepFuture ,
1066+ FetchTime : Fn ( ) -> Option < Duration > ,
1067+ > (
1068+ kv_store : K , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
1069+ onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
1070+ liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
1071+ sleeper : Sleeper , mobile_interruptable_platform : bool , fetch_time : FetchTime ,
1072+ ) -> Result < ( ) , lightning:: io:: Error >
1073+ where
1074+ UL :: Target : ' static + UtxoLookup ,
1075+ CF :: Target : ' static + chain:: Filter ,
1076+ T :: Target : ' static + BroadcasterInterface ,
1077+ F :: Target : ' static + FeeEstimator ,
1078+ L :: Target : ' static + Logger ,
1079+ P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1080+ ES :: Target : ' static + EntropySource ,
1081+ CM :: Target : AChannelManager ,
1082+ OM :: Target : AOnionMessenger ,
1083+ PM :: Target : APeerManager ,
1084+ LM :: Target : ALiquidityManager ,
1085+ O :: Target : ' static + OutputSpender ,
1086+ D :: Target : ' static + ChangeDestinationSource ,
1087+ K :: Target : ' static + KVStoreSync ,
1088+ {
1089+ let kv_store = KVStoreSyncWrapper ( kv_store) ;
1090+ process_events_full_async (
1091+ kv_store,
1092+ event_handler,
1093+ chain_monitor,
1094+ channel_manager,
1095+ onion_messenger,
1096+ gossip_sync,
1097+ peer_manager,
1098+ liquidity_manager,
1099+ sweeper,
1100+ logger,
1101+ scorer,
1102+ sleeper,
1103+ mobile_interruptable_platform,
1104+ fetch_time,
10101105 )
1106+ . await
10111107}
10121108
10131109#[ cfg( feature = "std" ) ]
@@ -1195,6 +1291,7 @@ impl BackgroundProcessor {
11951291 )
11961292 } ,
11971293 batch_delay,
1294+ false ,
11981295 )
11991296 } ) ;
12001297 Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
@@ -1283,7 +1380,7 @@ mod tests {
12831380 use lightning:: types:: payment:: PaymentHash ;
12841381 use lightning:: util:: config:: UserConfig ;
12851382 use lightning:: util:: persist:: {
1286- KVStoreSync , CHANNEL_MANAGER_PERSISTENCE_KEY ,
1383+ KVStoreSync , KVStoreSyncWrapper , CHANNEL_MANAGER_PERSISTENCE_KEY ,
12871384 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
12881385 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_KEY ,
12891386 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
@@ -2209,12 +2306,13 @@ mod tests {
22092306 open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
22102307
22112308 let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2212- let persister = Arc :: new (
2309+ let kv_store_sync = Arc :: new (
22132310 Persister :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ,
22142311 ) ;
2312+ let kv_store = Arc :: new ( KVStoreSyncWrapper ( kv_store_sync) ) ;
22152313
2216- let bp_future = super :: process_events_async (
2217- persister ,
2314+ let bp_future = super :: process_events_full_async (
2315+ kv_store ,
22182316 |_: _ | async { Ok ( ( ) ) } ,
22192317 Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
22202318 Arc :: clone ( & nodes[ 0 ] . node ) ,
@@ -2717,11 +2815,13 @@ mod tests {
27172815 let ( _, nodes) =
27182816 create_nodes ( 2 , "test_not_pruning_network_graph_until_graph_sync_completion_async" ) ;
27192817 let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2720- let persister = Arc :: new ( Persister :: new ( data_dir) . with_graph_persistence_notifier ( sender) ) ;
2818+ let kv_store_sync =
2819+ Arc :: new ( Persister :: new ( data_dir) . with_graph_persistence_notifier ( sender) ) ;
2820+ let kv_store = Arc :: new ( KVStoreSyncWrapper ( kv_store_sync) ) ;
27212821
27222822 let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
2723- let bp_future = super :: process_events_async (
2724- persister ,
2823+ let bp_future = super :: process_events_full_async (
2824+ kv_store ,
27252825 |_: _ | async { Ok ( ( ) ) } ,
27262826 Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
27272827 Arc :: clone ( & nodes[ 0 ] . node ) ,
@@ -2930,12 +3030,13 @@ mod tests {
29303030
29313031 let ( _, nodes) = create_nodes ( 1 , "test_payment_path_scoring_async" ) ;
29323032 let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2933- let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
3033+ let kv_store_sync = Arc :: new ( Persister :: new ( data_dir) ) ;
3034+ let kv_store = Arc :: new ( KVStoreSyncWrapper ( kv_store_sync) ) ;
29343035
29353036 let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
29363037
2937- let bp_future = super :: process_events_async (
2938- persister ,
3038+ let bp_future = super :: process_events_full_async (
3039+ kv_store ,
29393040 event_handler,
29403041 Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
29413042 Arc :: clone ( & nodes[ 0 ] . node ) ,
0 commit comments