Skip to content

Commit 786750c

Browse files
committed
Avoid a storage RTT when loading ChannelMonitors without updates
When reading `ChannelMonitor`s from a `MonitorUpdatingPersister` on startup, we have to make sure to load any `ChannelMonitorUpdate`s and re-apply them as well. For users of async persistence who don't have any `ChannelMonitorUpdate`s (e.g. because they set `maximum_pending_updates` to 0 or, in the future, we avoid persisting updates for small `ChannelMonitor`s), this means two round-trips to the storage backend, one to load the `ChannelMonitor` and one to try to read the next `ChannelMonitorUpdate` only to have it fail. Instead, here, we use `KVStore::list` to fetch the list of stored `ChannelMonitorUpdate`s, which for async `KVStore` users allows us to parallelize the list of update fetching and the `ChannelMonitor` loading itself. Then we know exactly when to stop reading `ChannelMonitorUpdate`s, including reading none if there are none to read. This also avoids relying on `KVStore::read` correctly returning `NotFound` in order to correctly discover when to stop reading `ChannelMonitorUpdate`s.
1 parent 9499cef commit 786750c

File tree

2 files changed

+92
-61
lines changed

2 files changed

+92
-61
lines changed

lightning/src/util/async_poll.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,69 @@ pub(crate) enum ResultFuture<F: Future<Output = O> + Unpin, O> {
2020
Ready(O),
2121
}
2222

23+
pub(crate) struct TwoFutureJoiner<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> {
24+
a: Option<ResultFuture<AF, AO>>,
25+
b: Option<ResultFuture<BF, BO>>,
26+
}
27+
28+
impl<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> TwoFutureJoiner<AO, BO, AF, BF> {
29+
pub fn new(future_a: AF, future_b: BF) -> Self {
30+
Self {
31+
a: Some(ResultFuture::Pending(future_a)),
32+
b: Some(ResultFuture::Pending(future_b)),
33+
}
34+
}
35+
}
36+
37+
impl<AO, BO, AF: Future<Output = AO> + Unpin, BF: Future<Output = BO> + Unpin> Future for TwoFutureJoiner<AO, BO, AF, BF> {
38+
type Output = (AO, BO);
39+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(AO, BO)> {
40+
let mut have_pending_futures = false;
41+
// SAFETY: While we are pinned, we can't get direct access to our internal state because we
42+
// aren't `Unpin`. However, we don't actually need the `Pin` - we only use it below on the
43+
// `Future` in the `ResultFuture::Pending` case, and the `Future` is bound by `Unpin`.
44+
// Thus, the `Pin` is not actually used, and its safe to bypass it and access the inner
45+
// reference directly.
46+
let state = unsafe { &mut self.get_unchecked_mut() };
47+
macro_rules! poll_future {
48+
($future: ident) => {
49+
match state.$future {
50+
Some(ResultFuture::Pending(ref mut fut)) => match Pin::new(fut).poll(cx) {
51+
Poll::Ready(res) => {
52+
state.$future = Some(ResultFuture::Ready(res));
53+
},
54+
Poll::Pending => {
55+
have_pending_futures = true;
56+
},
57+
},
58+
Some(ResultFuture::Ready(_)) => {},
59+
None => {
60+
debug_assert!(false, "Future polled after Ready");
61+
return Poll::Pending;
62+
},
63+
}
64+
};
65+
}
66+
poll_future!(a);
67+
poll_future!(b);
68+
69+
if have_pending_futures {
70+
Poll::Pending
71+
} else {
72+
Poll::Ready((
73+
match state.a.take() {
74+
Some(ResultFuture::Ready(a)) => a,
75+
_ => unreachable!(),
76+
},
77+
match state.b.take() {
78+
Some(ResultFuture::Ready(b)) => b,
79+
_ => unreachable!(),
80+
}
81+
))
82+
}
83+
}
84+
}
85+
2386
pub(crate) struct MultiResultFuturePoller<F: Future<Output = O> + Unpin, O> {
2487
futures_state: Vec<ResultFuture<F, O>>,
2588
}

lightning/src/util/persist.rs

Lines changed: 29 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::ln::types::ChannelId;
3636
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
3737
use crate::sync::Mutex;
3838
use crate::util::async_poll::{
39-
dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture,
39+
dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner,
4040
};
4141
use crate::util::logger::Logger;
4242
use crate::util::native_async::FutureSpawner;
@@ -576,15 +576,6 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
576576
/// list channel monitors themselves and load channels individually using
577577
/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
578578
///
579-
/// ## EXTREMELY IMPORTANT
580-
///
581-
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
582-
/// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
583-
/// that circumstance (not when there is really a permissions error, for example). This is because
584-
/// neither channel monitor reading function lists updates. Instead, either reads the monitor, and
585-
/// using its stored `update_id`, synthesizes update storage keys, and tries them in sequence until
586-
/// one is not found. All _other_ errors will be bubbled up in the function's [`Result`].
587-
///
588579
/// # Pruning stale channel updates
589580
///
590581
/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
@@ -658,10 +649,6 @@ where
658649
}
659650

660651
/// Reads all stored channel monitors, along with any stored updates for them.
661-
///
662-
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
663-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
664-
/// documentation for [`MonitorUpdatingPersister`].
665652
pub fn read_all_channel_monitors_with_updates(
666653
&self,
667654
) -> Result<
@@ -673,10 +660,6 @@ where
673660

674661
/// Read a single channel monitor, along with any stored updates for it.
675662
///
676-
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
677-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
678-
/// documentation for [`MonitorUpdatingPersister`].
679-
///
680663
/// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
681664
/// underscore `_` between txid and index for v1 channels. For example, given:
682665
///
@@ -873,10 +856,6 @@ where
873856
/// While the reads themselves are performend in parallel, deserializing the
874857
/// [`ChannelMonitor`]s is not. For large [`ChannelMonitor`]s actively used for forwarding,
875858
/// this may substantially limit the parallelism of this method.
876-
///
877-
/// It is extremely important that your [`KVStore::read`] implementation uses the
878-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
879-
/// documentation for [`MonitorUpdatingPersister`].
880859
pub async fn read_all_channel_monitors_with_updates(
881860
&self,
882861
) -> Result<
@@ -911,10 +890,6 @@ where
911890
/// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
912891
/// and other multi-threaded runtime requirements), this method requires that `self` be an
913892
/// `Arc` that can live for `'static` and be sent and accessed across threads.
914-
///
915-
/// It is extremely important that your [`KVStore::read`] implementation uses the
916-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
917-
/// documentation for [`MonitorUpdatingPersister`].
918893
pub async fn read_all_channel_monitors_with_updates_parallel(
919894
self: &Arc<Self>,
920895
) -> Result<
@@ -954,10 +929,6 @@ where
954929

955930
/// Read a single channel monitor, along with any stored updates for it.
956931
///
957-
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the
958-
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
959-
/// documentation for [`MonitorUpdatingPersister`].
960-
///
961932
/// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
962933
/// underscore `_` between txid and index for v1 channels. For example, given:
963934
///
@@ -1116,40 +1087,37 @@ where
11161087
io::Error,
11171088
> {
11181089
let monitor_name = MonitorName::from_str(monitor_key)?;
1119-
let read_res = self.maybe_read_monitor(&monitor_name, monitor_key).await?;
1120-
let (block_hash, monitor) = match read_res {
1090+
// TODO: After an MSRV bump we should be able to use the pin macro rather than Box::pin
1091+
let read_future = Box::pin(self.maybe_read_monitor(&monitor_name, monitor_key));
1092+
let list_future =
1093+
Box::pin(self.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key));
1094+
let (read_res, list_res) = TwoFutureJoiner::new(read_future, list_future).await;
1095+
let (block_hash, monitor) = match read_res? {
11211096
Some(res) => res,
11221097
None => return Ok(None),
11231098
};
11241099
let mut current_update_id = monitor.get_latest_update_id();
1125-
// TODO: Parallelize this loop by speculatively reading a batch of updates
1126-
loop {
1127-
current_update_id = match current_update_id.checked_add(1) {
1128-
Some(next_update_id) => next_update_id,
1129-
None => break,
1130-
};
1131-
let update_name = UpdateName::from(current_update_id);
1132-
let update = match self.read_monitor_update(monitor_key, &update_name).await {
1133-
Ok(update) => update,
1134-
Err(err) if err.kind() == io::ErrorKind::NotFound => {
1135-
// We can't find any more updates, so we are done.
1136-
break;
1137-
},
1138-
Err(err) => return Err(err),
1139-
};
1140-
1141-
monitor
1142-
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1143-
.map_err(|e| {
1144-
log_error!(
1145-
self.logger,
1146-
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1147-
monitor_key,
1148-
update_name.as_str(),
1149-
e
1150-
);
1151-
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1152-
})?;
1100+
let updates: Result<Vec<_>, _> =
1101+
list_res?.into_iter().map(|name| UpdateName::new(name)).collect();
1102+
let mut updates = updates?;
1103+
updates.sort_unstable();
1104+
// TODO: Parallelize this loop
1105+
for update_name in updates {
1106+
if update_name.0 > current_update_id {
1107+
let update = self.read_monitor_update(monitor_key, &update_name).await?;
1108+
monitor
1109+
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1110+
.map_err(|e| {
1111+
log_error!(
1112+
self.logger,
1113+
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1114+
monitor_key,
1115+
update_name.as_str(),
1116+
e
1117+
);
1118+
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1119+
})?;
1120+
}
11531121
}
11541122
Ok(Some((block_hash, monitor)))
11551123
}
@@ -1524,7 +1492,7 @@ impl core::fmt::Display for MonitorName {
15241492
/// let monitor_name = "some_monitor_name";
15251493
/// let storage_key = format!("channel_monitor_updates/{}/{}", monitor_name, update_name.as_str());
15261494
/// ```
1527-
#[derive(Debug)]
1495+
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
15281496
pub struct UpdateName(pub u64, String);
15291497

15301498
impl UpdateName {

0 commit comments

Comments
 (0)