Skip to content

Commit 15abec0

Browse files
committed
Add an option to deserialize monitors in parallel in async load
`MonitorUpdatingPersister::read_all_channel_monitors_with_updates` was made to do the IO operations in parallel in a previous commit, however in practice this doesn't provide material parallelism for large routing nodes. Because deserializing `ChannelMonitor`s is the bulk of the work (when IO operations are sufficiently fast), we end up blocked in single-threaded work nearly the entire time. Here, we add an alternative option - a new `read_all_channel_monitors_with_updates_parallel` method which uses the `FutureSpawner` to cause the deserialization operations to proceed in parallel.
1 parent 572007a commit 15abec0

File tree

1 file changed

+54
-0
lines changed

1 file changed

+54
-0
lines changed

lightning/src/util/persist.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,10 @@ where
870870

871871
/// Reads all stored channel monitors, along with any stored updates for them.
872872
///
873+
/// While the reads themselves are performend in parallel, deserializing the
874+
/// [`ChannelMonitor`]s is not. For large [`ChannelMonitor`]s actively used for forwarding,
875+
/// this may substantially limit the parallelism of this method.
876+
///
873877
/// It is extremely important that your [`KVStore::read`] implementation uses the
874878
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
875879
/// documentation for [`MonitorUpdatingPersister`].
@@ -898,6 +902,56 @@ where
898902
Ok(res)
899903
}
900904

905+
/// Reads all stored channel monitors, along with any stored updates for them, in parallel.
906+
///
907+
/// Because deserializing large [`ChannelMonitor`]s from forwarding nodes is often CPU-bound,
908+
/// this version of [`Self::read_all_channel_monitors_with_updates`] uses the [`FutureSpawner`]
909+
/// to parallelize deserialization as well as the IO operations.
910+
///
911+
/// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
912+
/// and other multi-threaded runtime requirements), this method requires that `self` be an
913+
/// `Arc` that can live for `'static` and be sent and accessed across threads.
914+
///
915+
/// It is extremely important that your [`KVStore::read`] implementation uses the
916+
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
917+
/// documentation for [`MonitorUpdatingPersister`].
918+
pub async fn read_all_channel_monitors_with_updates_parallel(
919+
self: &Arc<Self>,
920+
) -> Result<
921+
Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
922+
io::Error,
923+
> where
924+
K: MaybeSend + MaybeSync + 'static,
925+
L: MaybeSend + MaybeSync + 'static,
926+
ES: MaybeSend + MaybeSync + 'static,
927+
SP: MaybeSend + MaybeSync + 'static,
928+
BI: MaybeSend + MaybeSync + 'static,
929+
FE: MaybeSend + MaybeSync + 'static,
930+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend,
931+
{
932+
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
933+
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
934+
let monitor_list = self.0.kv_store.list(primary, secondary).await?;
935+
let mut futures = Vec::with_capacity(monitor_list.len());
936+
for monitor_key in monitor_list {
937+
let us = Arc::clone(&self);
938+
futures.push(ResultFuture::Pending(self.0.future_spawner.spawn(async move {
939+
us.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await
940+
})));
941+
}
942+
let future_results = MultiResultFuturePoller::new(futures).await;
943+
let mut res = Vec::with_capacity(future_results.len());
944+
for result in future_results {
945+
match result {
946+
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "Future was cancelled")),
947+
Ok(Err(e)) => return Err(e),
948+
Ok(Ok(Some(read_res))) => res.push(read_res),
949+
Ok(Ok(None)) => {},
950+
}
951+
}
952+
Ok(res)
953+
}
954+
901955
/// Read a single channel monitor, along with any stored updates for it.
902956
///
903957
/// It is extremely important that your [`KVStoreSync::read`] implementation uses the

0 commit comments

Comments
 (0)