@@ -40,7 +40,7 @@ use lightning::sign::ChangeDestinationSourceSync;
4040use lightning:: sign:: EntropySource ;
4141use lightning:: sign:: OutputSpender ;
4242use lightning:: util:: logger:: Logger ;
43- use lightning:: util:: persist:: { KVStoreSync , PersisterSync } ;
43+ use lightning:: util:: persist:: { KVStoreSync , Persister , PersisterSync } ;
4444use lightning:: util:: sweep:: OutputSweeper ;
4545#[ cfg( feature = "std" ) ]
4646use lightning:: util:: sweep:: OutputSweeperSync ;
@@ -311,6 +311,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311311 true
312312}
313313
314+ macro_rules! maybe_await {
315+ ( true , $e: expr) => {
316+ $e. await
317+ } ;
318+ ( false , $e: expr) => {
319+ $e
320+ } ;
321+ }
322+
314323macro_rules! define_run_body {
315324 (
316325 $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +328,7 @@ macro_rules! define_run_body {
319328 $peer_manager: ident, $gossip_sync: ident,
320329 $process_sweeper: expr,
321330 $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322- $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
331+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async : tt ,
323332 ) => { {
324333 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
325334 $channel_manager. get_cm( ) . timer_tick_occurred( ) ;
@@ -375,7 +384,7 @@ macro_rules! define_run_body {
375384
376385 if $channel_manager. get_cm( ) . get_and_clear_needs_persistence( ) {
377386 log_trace!( $logger, "Persisting ChannelManager..." ) ;
378- $ persister. persist_manager( & $channel_manager) ?;
387+ maybe_await! ( $async , $ persister. persist_manager( & $channel_manager) ) ?;
379388 log_trace!( $logger, "Done persisting ChannelManager." ) ;
380389 }
381390 if $timer_elapsed( & mut last_freshness_call, FRESHNESS_TIMER ) {
@@ -436,7 +445,7 @@ macro_rules! define_run_body {
436445 log_trace!( $logger, "Persisting network graph." ) ;
437446 }
438447
439- if let Err ( e) = $ persister. persist_graph( network_graph) {
448+ if let Err ( e) = maybe_await! ( $async , $ persister. persist_graph( network_graph) ) {
440449 log_error!( $logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e)
441450 }
442451
@@ -464,8 +473,8 @@ macro_rules! define_run_body {
464473 } else {
465474 log_trace!( $logger, "Persisting scorer" ) ;
466475 }
467- if let Err ( e) = $ persister. persist_scorer( & scorer) {
468- log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
476+ if let Err ( e) = maybe_await! ( $async , $ persister. persist_scorer( & scorer) ) {
477+ log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
469478 }
470479 }
471480 last_scorer_persist_call = $get_timer( SCORER_PERSIST_TIMER ) ;
@@ -487,16 +496,16 @@ macro_rules! define_run_body {
487496 // After we exit, ensure we persist the ChannelManager one final time - this avoids
488497 // some races where users quit while channel updates were in-flight, with
489498 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490- $ persister. persist_manager( & $channel_manager) ?;
499+ maybe_await! ( $async , $ persister. persist_manager( & $channel_manager) ) ?;
491500
492501 // Persist Scorer on exit
493502 if let Some ( ref scorer) = $scorer {
494- $ persister. persist_scorer( & scorer) ?;
503+ maybe_await! ( $async , $ persister. persist_scorer( & scorer) ) ?;
495504 }
496505
497506 // Persist NetworkGraph on exit
498507 if let Some ( network_graph) = $gossip_sync. network_graph( ) {
499- $ persister. persist_graph( network_graph) ?;
508+ maybe_await! ( $async , $ persister. persist_graph( network_graph) ) ?;
500509 }
501510
502511 Ok ( ( ) )
@@ -643,22 +652,30 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
643652/// ```
644653/// # use lightning::io;
645654/// # use lightning::events::ReplayEvent;
646- /// # use lightning::util::sweep::OutputSweeper;
647655/// # use std::sync::{Arc, RwLock};
648656/// # use std::sync::atomic::{AtomicBool, Ordering};
649657/// # use std::time::SystemTime;
650658/// # use lightning_background_processor::{process_events_async, GossipSync};
659+ /// # use core::future::Future;
660+ /// # use core::pin::Pin;
651661/// # struct Logger {}
652662/// # impl lightning::util::logger::Logger for Logger {
653663/// # fn log(&self, _record: lightning::util::logger::Record) {}
654664/// # }
655- /// # struct Store {}
656- /// # impl lightning::util::persist::KVStore for Store {
665+ /// # struct StoreSync {}
666+ /// # impl lightning::util::persist::KVStoreSync for StoreSync {
657667/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
658668/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
659669/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
660670/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
661671/// # }
672+ /// # struct Store {}
673+ /// # impl lightning::util::persist::KVStore for Store {
674+ /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
675+ /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
676+ /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
677+ /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
678+ /// # }
662679/// # struct EventHandler {}
663680/// # impl EventHandler {
664681/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -669,22 +686,22 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
669686/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
670687/// # fn disconnect_socket(&mut self) {}
671688/// # }
672- /// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store >, Arc<lightning::sign::KeysManager>>;
689+ /// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync >, Arc<lightning::sign::KeysManager>>;
673690/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
674691/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
675692/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
676693/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
677694/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
678695/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
679- /// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, Store>;
680- /// #
696+ /// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
697+ /// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
698+ ///
681699/// # struct Node<
682700/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
683701/// # F: lightning::chain::Filter + Send + Sync + 'static,
684702/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685703/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686704/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687- /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
688705/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689706/// # > {
690707/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -697,7 +714,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697714/// # persister: Arc<Store>,
698715/// # logger: Arc<Logger>,
699716/// # scorer: Arc<Scorer>,
700- /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O> >>,
717+ /// # sweeper: Arc<OutputSweeper<B, D, FE, F, O >>,
701718/// # }
702719/// #
703720/// # async fn setup_background_processing<
@@ -706,10 +723,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706723/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707724/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708725/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709- /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
710726/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711- /// # >(node: Node<B, F, FE, UL, D, K, O>) {
712- /// let background_persister = Arc::clone(&node.persister);
727+ /// # >(node: Node<B, F, FE, UL, D, O>) {
728+ /// let background_persister = Arc::new(Arc:: clone(&node.persister) );
713729/// let background_event_handler = Arc::clone(&node.event_handler);
714730/// let background_chain_mon = Arc::clone(&node.chain_monitor);
715731/// let background_chan_man = Arc::clone(&node.channel_manager);
@@ -814,7 +830,7 @@ where
814830 F :: Target : ' static + FeeEstimator ,
815831 L :: Target : ' static + Logger ,
816832 P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
817- PS :: Target : ' static + PersisterSync < ' a , CM , L , S > ,
833+ PS :: Target : ' static + Persister < ' a , CM , L , S > ,
818834 ES :: Target : ' static + EntropySource ,
819835 CM :: Target : AChannelManager ,
820836 OM :: Target : AOnionMessenger ,
@@ -841,7 +857,7 @@ where
841857 if let Some ( duration_since_epoch) = fetch_time ( ) {
842858 if update_scorer ( scorer, & event, duration_since_epoch) {
843859 log_trace ! ( logger, "Persisting scorer after update" ) ;
844- if let Err ( e) = persister. persist_scorer ( & * scorer) {
860+ if let Err ( e) = persister. persist_scorer ( & * scorer) . await {
845861 log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e) ;
846862 // We opt not to abort early on persistence failure here as persisting
847863 // the scorer is non-critical and we still hope that it will have
@@ -919,6 +935,7 @@ where
919935 } ,
920936 mobile_interruptable_platform,
921937 fetch_time,
938+ true ,
922939 )
923940}
924941
@@ -1098,6 +1115,7 @@ impl BackgroundProcessor {
10981115 . expect( "Time should be sometime after 1970" ) ,
10991116 )
11001117 } ,
1118+ false ,
11011119 )
11021120 } ) ;
11031121 Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
@@ -1162,7 +1180,10 @@ mod tests {
11621180 use bitcoin:: transaction:: Version ;
11631181 use bitcoin:: transaction:: { Transaction , TxOut } ;
11641182 use bitcoin:: { Amount , ScriptBuf , Txid } ;
1183+ use core:: future:: Future ;
1184+ use core:: pin:: Pin ;
11651185 use core:: sync:: atomic:: { AtomicBool , Ordering } ;
1186+ use lightning;
11661187 use lightning:: chain:: channelmonitor:: ANTI_REORG_DELAY ;
11671188 use lightning:: chain:: transaction:: OutPoint ;
11681189 use lightning:: chain:: { chainmonitor, BestBlock , Confirm , Filter } ;
@@ -1186,7 +1207,7 @@ mod tests {
11861207 use lightning:: types:: payment:: PaymentHash ;
11871208 use lightning:: util:: config:: UserConfig ;
11881209 use lightning:: util:: persist:: {
1189- KVStoreSync , CHANNEL_MANAGER_PERSISTENCE_KEY ,
1210+ KVStore , KVStoreSync , CHANNEL_MANAGER_PERSISTENCE_KEY ,
11901211 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
11911212 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_KEY ,
11921213 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
@@ -1483,6 +1504,42 @@ mod tests {
14831504 }
14841505 }
14851506
1507+ struct PersisterSyncWrapper ( PersisterSync ) ;
1508+
1509+ impl KVStore for PersisterSyncWrapper {
1510+ fn read (
1511+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
1512+ ) -> Pin < Box < dyn Future < Output = Result < Vec < u8 > , lightning:: io:: Error > > + ' static + Send > > {
1513+ let res = self . 0 . read ( primary_namespace, secondary_namespace, key) ;
1514+
1515+ Box :: pin ( async move { res } )
1516+ }
1517+
1518+ fn write (
1519+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
1520+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send > > {
1521+ let res = self . 0 . write ( primary_namespace, secondary_namespace, key, buf) ;
1522+
1523+ Box :: pin ( async move { res } )
1524+ }
1525+
1526+ fn remove (
1527+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
1528+ ) -> Result < ( ) , lightning:: io:: Error > {
1529+ let res = self . 0 . remove ( primary_namespace, secondary_namespace, key, lazy) ;
1530+
1531+ return res;
1532+ }
1533+
1534+ fn list (
1535+ & self , primary_namespace : & str , secondary_namespace : & str ,
1536+ ) -> Result < Vec < String > , lightning:: io:: Error > {
1537+ let res = self . 0 . list ( primary_namespace, secondary_namespace) ;
1538+
1539+ return res;
1540+ }
1541+ }
1542+
14861543 struct TestScorer {
14871544 event_expectations : Option < VecDeque < TestResult > > ,
14881545 }
@@ -2107,9 +2164,9 @@ mod tests {
21072164 open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
21082165
21092166 let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2110- let persister = Arc :: new (
2111- PersisterSync :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ,
2112- ) ;
2167+ let persister_sync =
2168+ PersisterSync :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ;
2169+ let persister = Arc :: new ( Arc :: new ( PersisterSyncWrapper ( persister_sync ) ) ) ;
21132170
21142171 let bp_future = super :: process_events_async (
21152172 persister,
@@ -2618,8 +2675,8 @@ mod tests {
26182675 let ( _, nodes) =
26192676 create_nodes ( 2 , "test_not_pruning_network_graph_until_graph_sync_completion_async" ) ;
26202677 let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2621- let persister =
2622- Arc :: new ( PersisterSync :: new ( data_dir ) . with_graph_persistence_notifier ( sender ) ) ;
2678+ let persister_sync = PersisterSync :: new ( data_dir ) . with_graph_persistence_notifier ( sender ) ;
2679+ let persister = Arc :: new ( Arc :: new ( PersisterSyncWrapper ( persister_sync ) ) ) ;
26232680
26242681 let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
26252682 let bp_future = super :: process_events_async (
@@ -2835,7 +2892,8 @@ mod tests {
28352892
28362893 let ( _, nodes) = create_nodes ( 1 , "test_payment_path_scoring_async" ) ;
28372894 let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2838- let persister = Arc :: new ( PersisterSync :: new ( data_dir) ) ;
2895+ let persister_sync = PersisterSync :: new ( data_dir) ;
2896+ let persister = Arc :: new ( Arc :: new ( PersisterSyncWrapper ( persister_sync) ) ) ;
28392897
28402898 let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
28412899
0 commit comments