@@ -131,7 +131,7 @@ pub trait KVStore {
131131 /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
132132 fn read (
133133 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
134- ) -> Result < Vec < u8 > , io:: Error > ;
134+ ) -> AsyncResultType < ' static , Vec < u8 > , io:: Error > ;
135135 /// Persists the given data under the given `key`.
136136 ///
137137 /// Will create the given `primary_namespace` and `secondary_namespace` if not already present
@@ -194,7 +194,7 @@ pub async fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
194194 let keys_to_migrate = source_store. list_all_keys ( ) ?;
195195
196196 for ( primary_namespace, secondary_namespace, key) in & keys_to_migrate {
197- let data = source_store. read ( primary_namespace, secondary_namespace, key) ?;
197+ let data = source_store. read ( primary_namespace, secondary_namespace, key) . await ?;
198198 target_store. write ( primary_namespace, secondary_namespace, key, & data) . await . map_err (
199199 |_| {
200200 io:: Error :: new (
@@ -327,11 +327,14 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized + Sync + Send + 'sta
327327
328328 Box :: pin ( async move {
329329 let monitor_key = monitor_name. to_string ( ) ;
330- let monitor = match kv_store. read (
331- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
332- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
333- monitor_key. as_str ( ) ,
334- ) {
330+ let monitor = match kv_store
331+ . read (
332+ CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
333+ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
334+ monitor_key. as_str ( ) ,
335+ )
336+ . await
337+ {
335338 Ok ( monitor) => monitor,
336339 Err ( _) => return ,
337340 } ;
@@ -358,7 +361,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized + Sync + Send + 'sta
358361}
359362
360363/// Read previously persisted [`ChannelMonitor`]s from the store.
361- pub fn read_channel_monitors < K : Deref , ES : Deref , SP : Deref > (
364+ pub async fn read_channel_monitors < K : Deref , ES : Deref , SP : Deref > (
362365 kv_store : K , entropy_source : ES , signer_provider : SP ,
363366) -> Result < Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) > , io:: Error >
364367where
@@ -373,11 +376,15 @@ where
373376 CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
374377 ) ? {
375378 match <( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) >:: read (
376- & mut io:: Cursor :: new ( kv_store. read (
377- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
378- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
379- & stored_key,
380- ) ?) ,
379+ & mut io:: Cursor :: new (
380+ kv_store
381+ . read (
382+ CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
383+ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
384+ & stored_key,
385+ )
386+ . await ?,
387+ ) ,
381388 ( & * entropy_source, & * signer_provider) ,
382389 ) {
383390 Ok ( ( block_hash, channel_monitor) ) => {
@@ -563,7 +570,7 @@ where
563570 /// It is extremely important that your [`KVStore::read`] implementation uses the
564571 /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
565572 /// documentation for [`MonitorUpdatingPersister`].
566- pub fn read_all_channel_monitors_with_updates (
573+ pub async fn read_all_channel_monitors_with_updates (
567574 & self ,
568575 ) -> Result <
569576 Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) > ,
@@ -575,7 +582,7 @@ where
575582 ) ?;
576583 let mut res = Vec :: with_capacity ( monitor_list. len ( ) ) ;
577584 for monitor_key in monitor_list {
578- res. push ( self . read_channel_monitor_with_updates ( monitor_key. as_str ( ) ) ?)
585+ res. push ( self . read_channel_monitor_with_updates ( monitor_key. as_str ( ) ) . await ?)
579586 }
580587 Ok ( res)
581588 }
@@ -599,20 +606,20 @@ where
599606 ///
600607 /// Loading a large number of monitors will be faster if done in parallel. You can use this
601608 /// function to accomplish this. Take care to limit the number of parallel readers.
602- pub fn read_channel_monitor_with_updates (
609+ pub async fn read_channel_monitor_with_updates (
603610 & self , monitor_key : & str ,
604611 ) -> Result < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) , io:: Error >
605612 {
606613 let monitor_name = MonitorName :: from_str ( monitor_key) ?;
607- let ( block_hash, monitor) = self . read_monitor ( & monitor_name, monitor_key) ?;
614+ let ( block_hash, monitor) = self . read_monitor ( & monitor_name, monitor_key) . await ?;
608615 let mut current_update_id = monitor. get_latest_update_id ( ) ;
609616 loop {
610617 current_update_id = match current_update_id. checked_add ( 1 ) {
611618 Some ( next_update_id) => next_update_id,
612619 None => break ,
613620 } ;
614621 let update_name = UpdateName :: from ( current_update_id) ;
615- let update = match self . read_monitor_update ( monitor_key, & update_name) {
622+ let update = match self . read_monitor_update ( monitor_key, & update_name) . await {
616623 Ok ( update) => update,
617624 Err ( err) if err. kind ( ) == io:: ErrorKind :: NotFound => {
618625 // We can't find any more updates, so we are done.
@@ -638,15 +645,19 @@ where
638645 }
639646
640647 /// Read a channel monitor.
641- fn read_monitor (
648+ async fn read_monitor (
642649 & self , monitor_name : & MonitorName , monitor_key : & str ,
643650 ) -> Result < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) , io:: Error >
644651 {
645- let mut monitor_cursor = io:: Cursor :: new ( self . kv_store . read (
646- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
647- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
648- monitor_key,
649- ) ?) ;
652+ let mut monitor_cursor = io:: Cursor :: new (
653+ self . kv_store
654+ . read (
655+ CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
656+ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
657+ monitor_key,
658+ )
659+ . await ?,
660+ ) ;
650661 // Discard the sentinel bytes if found.
651662 if monitor_cursor. get_ref ( ) . starts_with ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL ) {
652663 monitor_cursor. set_position ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL . len ( ) as u64 ) ;
@@ -683,14 +694,17 @@ where
683694 }
684695
685696 /// Read a channel monitor update.
686- fn read_monitor_update (
697+ async fn read_monitor_update (
687698 & self , monitor_key : & str , update_name : & UpdateName ,
688699 ) -> Result < ChannelMonitorUpdate , io:: Error > {
689- let update_bytes = self . kv_store . read (
690- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
691- monitor_key,
692- update_name. as_str ( ) ,
693- ) ?;
700+ let update_bytes = self
701+ . kv_store
702+ . read (
703+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
704+ monitor_key,
705+ update_name. as_str ( ) ,
706+ )
707+ . await ?;
694708 ChannelMonitorUpdate :: read ( & mut io:: Cursor :: new ( update_bytes) ) . map_err ( |e| {
695709 log_error ! (
696710 self . logger,
@@ -710,14 +724,14 @@ where
710724 /// updates. The updates that have an `update_id` less than or equal to than the stored monitor
711725 /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will
712726 /// be passed to [`KVStore::remove`].
713- pub fn cleanup_stale_updates ( & self , lazy : bool ) -> Result < ( ) , io:: Error > {
727+ pub async fn cleanup_stale_updates ( & self , lazy : bool ) -> Result < ( ) , io:: Error > {
714728 let monitor_keys = self . kv_store . list (
715729 CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
716730 CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
717731 ) ?;
718732 for monitor_key in monitor_keys {
719733 let monitor_name = MonitorName :: from_str ( & monitor_key) ?;
720- let ( _, current_monitor) = self . read_monitor ( & monitor_name, & monitor_key) ?;
734+ let ( _, current_monitor) = self . read_monitor ( & monitor_name, & monitor_key) . await ?;
721735 let updates = self
722736 . kv_store
723737 . list ( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , monitor_key. as_str ( ) ) ?;
@@ -909,7 +923,7 @@ where
909923 let maybe_old_monitor = match monitor_latest_update_id {
910924 LEGACY_CLOSED_CHANNEL_UPDATE_ID => {
911925 let monitor_key = monitor_name. to_string ( ) ;
912- self . read_monitor ( & monitor_name, & monitor_key) . ok ( )
926+ self . read_monitor ( & monitor_name, & monitor_key) . await . ok ( )
913927 } ,
914928 _ => None ,
915929 } ;
@@ -953,7 +967,7 @@ where
953967
954968 async fn archive_persisted_channel ( & self , monitor_name : MonitorName ) {
955969 let monitor_key = monitor_name. to_string ( ) ;
956- let monitor = match self . read_channel_monitor_with_updates ( & monitor_key) {
970+ let monitor = match self . read_channel_monitor_with_updates ( & monitor_key) . await {
957971 Ok ( ( _block_hash, monitor) ) => monitor,
958972 Err ( _) => return ,
959973 } ;
0 commit comments