Skip to content

Commit 0503f6b

Browse files
committed
Parallelize ChannelMonitorUpdate loading
When reading `ChannelMonitor`s from a `MonitorUpdatingPersister` on startup, we have to make sure to load any `ChannelMonitorUpdate`s and re-apply them as well. Now that we know which `ChannelMonitorUpdate`s to load from `list`ing the entries from the `KVStore` we can parallelize the reads themselves, which we do here. Now, loading all `ChannelMonitor`s from an async `KVStore` requires only three full RTTs - one to list the set of `ChannelMonitor`s, one to both fetch the `ChanelMonitor` and list the set of `ChannelMonitorUpdate`s, and one to fetch all the `ChannelMonitorUpdate`s (with the last one skipped when there are no `ChannelMonitorUpdate`s to read).
1 parent d93c809 commit 0503f6b

File tree

1 file changed

+22
-18
lines changed

1 file changed

+22
-18
lines changed

lightning/src/util/persist.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,28 +1096,32 @@ where
10961096
Some(res) => res,
10971097
None => return Ok(None),
10981098
};
1099-
let mut current_update_id = monitor.get_latest_update_id();
1099+
let current_update_id = monitor.get_latest_update_id();
11001100
let updates: Result<Vec<_>, _> =
11011101
list_res?.into_iter().map(|name| UpdateName::new(name)).collect();
11021102
let mut updates = updates?;
11031103
updates.sort_unstable();
1104-
// TODO: Parallelize this loop
1105-
for update_name in updates {
1106-
if update_name.0 > current_update_id {
1107-
let update = self.read_monitor_update(monitor_key, &update_name).await?;
1108-
monitor
1109-
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1110-
.map_err(|e| {
1111-
log_error!(
1112-
self.logger,
1113-
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1114-
monitor_key,
1115-
update_name.as_str(),
1116-
e
1117-
);
1118-
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1119-
})?;
1120-
}
1104+
let updates_to_load = updates.iter().filter(|update| update.0 > current_update_id);
1105+
let mut update_futures = Vec::with_capacity(updates_to_load.clone().count());
1106+
for update_name in updates_to_load {
1107+
update_futures.push(ResultFuture::Pending(Box::pin(async move {
1108+
(update_name, self.read_monitor_update(monitor_key, update_name).await)
1109+
})));
1110+
}
1111+
for (update_name, update_res) in MultiResultFuturePoller::new(update_futures).await {
1112+
let update = update_res?;
1113+
monitor
1114+
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1115+
.map_err(|e| {
1116+
log_error!(
1117+
self.logger,
1118+
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1119+
monitor_key,
1120+
update_name.as_str(),
1121+
e
1122+
);
1123+
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1124+
})?;
11211125
}
11221126
Ok(Some((block_hash, monitor)))
11231127
}

0 commit comments

Comments
 (0)