Skip to content

Commit 33a5e58

Browse files
committed
Parallelize ChannelMonitor loading from async KVStores
Reading `ChannelMonitor`s on startup is one of the slowest parts of LDK initialization. Now that we have an async `KVStore`, there's no need for that, we can simply paralellize their loading, which we do here. Sadly, because Rust futures are pretty unergonomic, we have to add some `unsafe {}` here, but arguing its fine is relatively straightforward.
1 parent b1f1ee2 commit 33a5e58

File tree

2 files changed

+27
-15
lines changed

2 files changed

+27
-15
lines changed

lightning/src/util/async_poll.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,31 @@ use core::marker::Unpin;
1515
use core::pin::Pin;
1616
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

18-
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
18+
pub(crate) enum ResultFuture<F: Future<Output = O> + Unpin, O> {
1919
Pending(F),
20-
Ready(Result<(), E>),
20+
Ready(O),
2121
}
2222

23-
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
24-
futures_state: Vec<ResultFuture<F, E>>,
23+
pub(crate) struct MultiResultFuturePoller<F: Future<Output = O> + Unpin, O> {
24+
futures_state: Vec<ResultFuture<F, O>>,
2525
}
2626

27-
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> MultiResultFuturePoller<F, E> {
28-
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
27+
impl<F: Future<Output = O> + Unpin, O> MultiResultFuturePoller<F, O> {
28+
pub fn new(futures_state: Vec<ResultFuture<F, O>>) -> Self {
2929
Self { futures_state }
3030
}
3131
}
3232

33-
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> Future for MultiResultFuturePoller<F, E> {
34-
type Output = Vec<Result<(), E>>;
35-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
33+
impl<F: Future<Output = O> + Unpin, O> Future for MultiResultFuturePoller<F, O> {
34+
type Output = Vec<O>;
35+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<O>> {
3636
let mut have_pending_futures = false;
37-
let futures_state = &mut self.get_mut().futures_state;
37+
// SAFETY: While we are pinned, we can't get direct access to `futures_state` because we
38+
// aren't `Unpin`. However, we don't actually need the `Pin` - we only use it below on the
39+
// `Future` in the `ResultFuture::Pending` case, and the `Future` is bound by `Unpin`.
40+
// Thus, the `Pin` is not actually used, and its safe to bypass it and access the inner
41+
// reference directly.
42+
let futures_state = unsafe { &mut self.get_unchecked_mut().futures_state };
3843
for state in futures_state.iter_mut() {
3944
match state {
4045
ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {

lightning/src/util/persist.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ use crate::chain::transaction::OutPoint;
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
3636
use crate::sync::Mutex;
37-
use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync};
37+
use crate::util::async_poll::{
38+
dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture,
39+
};
3840
use crate::util::logger::Logger;
3941
use crate::util::native_async::FutureSpawner;
4042
use crate::util::ser::{Readable, ReadableArgs, Writeable};
@@ -875,11 +877,16 @@ where
875877
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
876878
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
877879
let monitor_list = self.0.kv_store.list(primary, secondary).await?;
878-
let mut res = Vec::with_capacity(monitor_list.len());
880+
let mut futures = Vec::with_capacity(monitor_list.len());
879881
for monitor_key in monitor_list {
880-
let result =
881-
self.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await?;
882-
if let Some(read_res) = result {
882+
futures.push(ResultFuture::Pending(Box::pin(async move {
883+
self.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await
884+
})));
885+
}
886+
let future_results = MultiResultFuturePoller::new(futures).await;
887+
let mut res = Vec::with_capacity(future_results.len());
888+
for result in future_results {
889+
if let Some(read_res) = result? {
883890
res.push(read_res);
884891
}
885892
}

0 commit comments

Comments
 (0)