@@ -36,7 +36,7 @@ use crate::ln::types::ChannelId;
3636use crate :: sign:: { ecdsa:: EcdsaChannelSigner , EntropySource , SignerProvider } ;
3737use crate :: sync:: Mutex ;
3838use crate :: util:: async_poll:: {
39- dummy_waker, MaybeSend , MaybeSync , MultiResultFuturePoller , ResultFuture ,
39+ dummy_waker, MaybeSend , MaybeSync , MultiResultFuturePoller , ResultFuture , TwoFutureJoiner ,
4040} ;
4141use crate :: util:: logger:: Logger ;
4242use crate :: util:: native_async:: FutureSpawner ;
@@ -576,15 +576,6 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
576576/// list channel monitors themselves and load channels individually using
577577/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
578578///
579- /// ## EXTREMELY IMPORTANT
580- ///
581- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
582- /// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
583- /// that circumstance (not when there is really a permissions error, for example). This is because
584- /// neither channel monitor reading function lists updates. Instead, either reads the monitor, and
585- /// using its stored `update_id`, synthesizes update storage keys, and tries them in sequence until
586- /// one is not found. All _other_ errors will be bubbled up in the function's [`Result`].
587- ///
588579/// # Pruning stale channel updates
589580///
590581/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
@@ -658,10 +649,6 @@ where
658649 }
659650
660651 /// Reads all stored channel monitors, along with any stored updates for them.
661- ///
662- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
663- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
664- /// documentation for [`MonitorUpdatingPersister`].
665652 pub fn read_all_channel_monitors_with_updates (
666653 & self ,
667654 ) -> Result <
@@ -673,10 +660,6 @@ where
673660
674661 /// Read a single channel monitor, along with any stored updates for it.
675662 ///
676- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
677- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
678- /// documentation for [`MonitorUpdatingPersister`].
679- ///
680663 /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
681664 /// underscore `_` between txid and index for v1 channels. For example, given:
682665 ///
@@ -873,10 +856,6 @@ where
873856 /// While the reads themselves are performend in parallel, deserializing the
874857 /// [`ChannelMonitor`]s is not. For large [`ChannelMonitor`]s actively used for forwarding,
875858 /// this may substantially limit the parallelism of this method.
876- ///
877- /// It is extremely important that your [`KVStore::read`] implementation uses the
878- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
879- /// documentation for [`MonitorUpdatingPersister`].
880859 pub async fn read_all_channel_monitors_with_updates (
881860 & self ,
882861 ) -> Result <
@@ -911,10 +890,6 @@ where
911890 /// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
912891 /// and other multi-threaded runtime requirements), this method requires that `self` be an
913892 /// `Arc` that can live for `'static` and be sent and accessed across threads.
914- ///
915- /// It is extremely important that your [`KVStore::read`] implementation uses the
916- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
917- /// documentation for [`MonitorUpdatingPersister`].
918893 pub async fn read_all_channel_monitors_with_updates_parallel (
919894 self : & Arc < Self > ,
920895 ) -> Result <
@@ -954,10 +929,6 @@ where
954929
955930 /// Read a single channel monitor, along with any stored updates for it.
956931 ///
957- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
958- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
959- /// documentation for [`MonitorUpdatingPersister`].
960- ///
961932 /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
962933 /// underscore `_` between txid and index for v1 channels. For example, given:
963934 ///
@@ -1116,40 +1087,37 @@ where
11161087 io:: Error ,
11171088 > {
11181089 let monitor_name = MonitorName :: from_str ( monitor_key) ?;
1119- let read_res = self . maybe_read_monitor ( & monitor_name, monitor_key) . await ?;
1120- let ( block_hash, monitor) = match read_res {
1090+ let read_future = pin ! ( self . maybe_read_monitor( & monitor_name, monitor_key) ) ;
1091+ let list_future = pin ! ( self
1092+ . kv_store
1093+ . list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , monitor_key) ) ;
1094+ let ( read_res, list_res) = TwoFutureJoiner :: new ( read_future, list_future) . await ;
1095+ let ( block_hash, monitor) = match read_res? {
11211096 Some ( res) => res,
11221097 None => return Ok ( None ) ,
11231098 } ;
11241099 let mut current_update_id = monitor. get_latest_update_id ( ) ;
1125- // TODO: Parallelize this loop by speculatively reading a batch of updates
1126- loop {
1127- current_update_id = match current_update_id. checked_add ( 1 ) {
1128- Some ( next_update_id) => next_update_id,
1129- None => break ,
1130- } ;
1131- let update_name = UpdateName :: from ( current_update_id) ;
1132- let update = match self . read_monitor_update ( monitor_key, & update_name) . await {
1133- Ok ( update) => update,
1134- Err ( err) if err. kind ( ) == io:: ErrorKind :: NotFound => {
1135- // We can't find any more updates, so we are done.
1136- break ;
1137- } ,
1138- Err ( err) => return Err ( err) ,
1139- } ;
1140-
1141- monitor
1142- . update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
1143- . map_err ( |e| {
1144- log_error ! (
1145- self . logger,
1146- "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
1147- monitor_key,
1148- update_name. as_str( ) ,
1149- e
1150- ) ;
1151- io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
1152- } ) ?;
1100+ let updates: Result < Vec < _ > , _ > =
1101+ list_res?. into_iter ( ) . map ( |name| UpdateName :: new ( name) ) . collect ( ) ;
1102+ let mut updates = updates?;
1103+ updates. sort_unstable ( ) ;
1104+ // TODO: Parallelize this loop
1105+ for update_name in updates {
1106+ if update_name. 0 > current_update_id {
1107+ let update = self . read_monitor_update ( monitor_key, & update_name) . await ?;
1108+ monitor
1109+ . update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
1110+ . map_err ( |e| {
1111+ log_error ! (
1112+ self . logger,
1113+ "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
1114+ monitor_key,
1115+ update_name. as_str( ) ,
1116+ e
1117+ ) ;
1118+ io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
1119+ } ) ?;
1120+ }
11531121 }
11541122 Ok ( Some ( ( block_hash, monitor) ) )
11551123 }
@@ -1524,7 +1492,7 @@ impl core::fmt::Display for MonitorName {
15241492/// let monitor_name = "some_monitor_name";
15251493/// let storage_key = format!("channel_monitor_updates/{}/{}", monitor_name, update_name.as_str());
15261494/// ```
1527- #[ derive( Debug ) ]
1495+ #[ derive( Debug , PartialEq , Eq , PartialOrd , Ord ) ]
15281496pub struct UpdateName ( pub u64 , String ) ;
15291497
15301498impl UpdateName {
0 commit comments