Skip to content

Commit 56b58c8

Browse files
committed
Add async persistence logic in MonitorUpdatingPersister
In the next commit we'll add the ability to use an async `KVStore` as the backing for a `ChainMonitor`. Here we tee this up by adding an async API to `MonitorUpdatingPersisterAsync`. Its not intended for public use and is thus only `pub(crate)` but allows spawning all operations via a generic `FutureSpawner` trait, initiating the write via the `KVStore` before any `await`s (or async functions). Because we aren't going to make the `ChannelManager` (or `ChainMonitor`) fully async, we need a way to alert the `ChainMonitor` when a persistence completes, but we leave that for the next commit.
1 parent 0aa3c62 commit 56b58c8

File tree

1 file changed

+155
-42
lines changed

1 file changed

+155
-42
lines changed

lightning/src/util/persist.rs

Lines changed: 155 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use bitcoin::hashes::hex::FromHex;
1717
use bitcoin::{BlockHash, Txid};
1818

1919
use core::future::Future;
20+
use core::mem;
2021
use core::ops::Deref;
2122
use core::pin::Pin;
2223
use core::str::FromStr;
@@ -32,7 +33,8 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
3233
use crate::chain::transaction::OutPoint;
3334
use crate::ln::types::ChannelId;
3435
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
35-
use crate::util::async_poll::dummy_waker;
36+
use crate::sync::Mutex;
37+
use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync};
3638
use crate::util::logger::Logger;
3739
use crate::util::ser::{Readable, ReadableArgs, Writeable};
3840

@@ -409,6 +411,21 @@ where
409411
Ok(res)
410412
}
411413

414+
/// A generic trait which is able to spawn futures in the background.
415+
pub trait FutureSpawner: Send + Sync + 'static {
416+
/// Spawns the given future as a background task.
417+
///
418+
/// This method MUST NOT block on the given future immediately.
419+
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T);
420+
}
421+
422+
struct PanicingSpawner;
423+
impl FutureSpawner for PanicingSpawner {
424+
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, _: T) {
425+
unreachable!();
426+
}
427+
}
428+
412429
fn poll_sync_future<F: Future>(future: F) -> F::Output {
413430
let mut waker = dummy_waker();
414431
let mut ctx = task::Context::from_waker(&mut waker);
@@ -507,7 +524,7 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
507524
/// would like to get rid of them, consider using the
508525
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
509526
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
510-
MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, L, ES, SP, BI, FE>,
527+
MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, PanicingSpawner, L, ES, SP, BI, FE>,
511528
)
512529
where
513530
K::Target: KVStoreSync,
@@ -553,6 +570,7 @@ where
553570
) -> Self {
554571
MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new(
555572
KVStoreSyncWrapper(kv_store),
573+
PanicingSpawner,
556574
logger,
557575
maximum_pending_updates,
558576
entropy_source,
@@ -665,8 +683,8 @@ where
665683
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
666684
monitor: &ChannelMonitor<ChannelSigner>,
667685
) -> chain::ChannelMonitorUpdateStatus {
668-
let res =
669-
poll_sync_future(self.0 .0.update_persisted_channel(monitor_name, update, monitor));
686+
let inner = Arc::clone(&self.0.0);
687+
let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor));
670688
match res {
671689
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
672690
Err(e) => {
@@ -691,14 +709,18 @@ where
691709
/// async versions of the public accessors.
692710
///
693711
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
712+
///
713+
/// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
714+
/// directly by the [`ChainMonitor`].
694715
pub struct MonitorUpdatingPersisterAsync<
695716
K: Deref,
717+
S: FutureSpawner,
696718
L: Deref,
697719
ES: Deref,
698720
SP: Deref,
699721
BI: Deref,
700722
FE: Deref,
701-
>(Arc<MonitorUpdatingPersisterAsyncInner<K, L, ES, SP, BI, FE>>)
723+
>(Arc<MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>>)
702724
where
703725
K::Target: KVStore,
704726
L::Target: Logger,
@@ -709,12 +731,14 @@ where
709731

710732
struct MonitorUpdatingPersisterAsyncInner<
711733
K: Deref,
734+
S: FutureSpawner,
712735
L: Deref,
713736
ES: Deref,
714737
SP: Deref,
715738
BI: Deref,
716739
FE: Deref,
717-
> where
740+
>
741+
where
718742
K::Target: KVStore,
719743
L::Target: Logger,
720744
ES::Target: EntropySource + Sized,
@@ -723,6 +747,7 @@ struct MonitorUpdatingPersisterAsyncInner<
723747
FE::Target: FeeEstimator,
724748
{
725749
kv_store: K,
750+
future_spawner: S,
726751
logger: L,
727752
maximum_pending_updates: u64,
728753
entropy_source: ES,
@@ -731,8 +756,8 @@ struct MonitorUpdatingPersisterAsyncInner<
731756
fee_estimator: FE,
732757
}
733758

734-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
735-
MonitorUpdatingPersisterAsync<K, L, ES, SP, BI, FE>
759+
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
760+
MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
736761
where
737762
K::Target: KVStore,
738763
L::Target: Logger,
@@ -745,11 +770,12 @@ where
745770
///
746771
/// See [`MonitorUpdatingPersister::new`] for more info.
747772
pub fn new(
748-
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
749-
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
773+
kv_store: K, future_spawner: S, logger: L, maximum_pending_updates: u64,
774+
entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE,
750775
) -> Self {
751776
MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
752777
kv_store,
778+
future_spawner,
753779
logger,
754780
maximum_pending_updates,
755781
entropy_source,
@@ -818,8 +844,78 @@ where
818844
}
819845
}
820846

821-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
822-
MonitorUpdatingPersisterAsyncInner<K, L, ES, SP, BI, FE>
847+
impl<
848+
K: Deref + MaybeSend + MaybeSync + 'static,
849+
S: FutureSpawner,
850+
L: Deref + MaybeSend + MaybeSync + 'static,
851+
ES: Deref + MaybeSend + MaybeSync + 'static,
852+
SP: Deref + MaybeSend + MaybeSync + 'static,
853+
BI: Deref + MaybeSend + MaybeSync + 'static,
854+
FE: Deref + MaybeSend + MaybeSync + 'static,
855+
>
856+
MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
857+
where
858+
K::Target: KVStore + MaybeSync,
859+
L::Target: Logger,
860+
ES::Target: EntropySource + Sized,
861+
SP::Target: SignerProvider + Sized,
862+
BI::Target: BroadcasterInterface,
863+
FE::Target: FeeEstimator,
864+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
865+
{
866+
pub(crate) fn spawn_async_persist_new_channel(
867+
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
868+
) {
869+
let inner = Arc::clone(&self.0);
870+
let future = inner.persist_new_channel(monitor_name, monitor);
871+
let channel_id = monitor.channel_id();
872+
self.0.future_spawner.spawn(async move {
873+
match future.await {
874+
Ok(()) => {}, // TODO: expose completions
875+
Err(e) => {
876+
log_error!(
877+
inner.logger,
878+
"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
879+
);
880+
},
881+
}
882+
});
883+
}
884+
885+
pub(crate) fn spawn_async_update_persisted_channel(
886+
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
887+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
888+
) {
889+
let inner = Arc::clone(&self.0);
890+
let future = inner.update_persisted_channel(monitor_name, update, monitor);
891+
let channel_id = monitor.channel_id();
892+
let inner = Arc::clone(&self.0);
893+
self.0.future_spawner.spawn(async move {
894+
match future.await {
895+
Ok(()) => {}, // TODO: expose completions
896+
Err(e) => {
897+
log_error!(
898+
inner.logger,
899+
"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
900+
);
901+
},
902+
}
903+
});
904+
}
905+
906+
pub(crate) fn spawn_async_archive_persisted_channel(
907+
&self, monitor_name: MonitorName,
908+
) {
909+
let inner = Arc::clone(&self.0);
910+
self.0.future_spawner.spawn(async move {
911+
inner.archive_persisted_channel(monitor_name).await;
912+
});
913+
}
914+
}
915+
916+
917+
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
918+
MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>
823919
where
824920
K::Target: KVStore,
825921
L::Target: Logger,
@@ -938,7 +1034,7 @@ where
9381034
let monitor_name = MonitorName::from_str(&monitor_key)?;
9391035
let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?;
9401036
let latest_update_id = current_monitor.get_latest_update_id();
941-
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await;
1037+
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await?;
9421038
}
9431039
Ok(())
9441040
}
@@ -958,9 +1054,9 @@ where
9581054
Ok(())
9591055
}
9601056

961-
async fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
1057+
fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
9621058
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
963-
) -> Result<(), io::Error> {
1059+
) -> impl Future<Output = Result<(), io::Error>> {
9641060
// Determine the proper key for this monitor
9651061
let monitor_key = monitor_name.to_string();
9661062
// Serialize and write the new monitor
@@ -974,16 +1070,22 @@ where
9741070
monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
9751071
}
9761072
monitor.write(&mut monitor_bytes).unwrap();
1073+
// Note that this is NOT an async function, but rather calls the *sync* KVStore write
1074+
// method, allowing it to do its queueing immediately, and then return a future for the
1075+
// completion of the write. This ensures monitor persistence ordering is preserved.
9771076
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
9781077
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
979-
self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes).await
1078+
self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)
9801079
}
9811080

982-
async fn update_persisted_channel<ChannelSigner: EcdsaChannelSigner>(
983-
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
1081+
fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>(
1082+
self: Arc<Self>, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
9841083
monitor: &ChannelMonitor<ChannelSigner>,
985-
) -> Result<(), io::Error> {
1084+
) -> impl Future<Output = Result<(), io::Error>> + 'a where Self: 'a {
9861085
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
1086+
let mut res_a = None;
1087+
let mut res_b = None;
1088+
let mut res_c = None;
9871089
if let Some(update) = update {
9881090
let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
9891091
&& self.maximum_pending_updates != 0
@@ -992,37 +1094,48 @@ where
9921094
let monitor_key = monitor_name.to_string();
9931095
let update_name = UpdateName::from(update.update_id);
9941096
let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
995-
self.kv_store
1097+
res_a = Some(self.kv_store
9961098
.write(primary, &monitor_key, update_name.as_str(), update.encode())
997-
.await
1099+
);
9981100
} else {
9991101
// We could write this update, but it meets criteria of our design that calls for a full monitor write.
1000-
let write_status = self.persist_new_channel(monitor_name, monitor).await;
1001-
1002-
if let Ok(()) = write_status {
1003-
let channel_closed_legacy =
1004-
monitor.get_latest_update_id() == LEGACY_CLOSED_CHANNEL_UPDATE_ID;
1005-
let latest_update_id = monitor.get_latest_update_id();
1006-
if channel_closed_legacy {
1007-
let monitor_key = monitor_name.to_string();
1008-
self.cleanup_stale_updates_for_monitor_to(
1009-
&monitor_key,
1010-
latest_update_id,
1011-
true,
1012-
)
1013-
.await;
1014-
} else {
1015-
let end = latest_update_id;
1016-
let start = end.saturating_sub(self.maximum_pending_updates);
1017-
self.cleanup_in_range(monitor_name, start, end).await;
1102+
let write_fut = self.persist_new_channel(monitor_name, monitor);
1103+
let latest_update_id = monitor.get_latest_update_id();
1104+
1105+
res_b = Some(async move {
1106+
let write_status = write_fut.await;
1107+
if let Ok(()) = write_status {
1108+
if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID {
1109+
let monitor_key = monitor_name.to_string();
1110+
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, true).await?;
1111+
} else {
1112+
let end = latest_update_id;
1113+
let start = end.saturating_sub(self.maximum_pending_updates);
1114+
self.cleanup_in_range(monitor_name, start, end).await;
1115+
}
10181116
}
1019-
}
10201117

1021-
write_status
1118+
write_status
1119+
});
10221120
}
10231121
} else {
10241122
// There is no update given, so we must persist a new monitor.
1025-
self.persist_new_channel(monitor_name, monitor).await
1123+
// Note that this is NOT an async function, but rather calls the *sync* KVStore write
1124+
// method, allowing it to do its queueing immediately, and then return a future for the
1125+
// completion of the write. This ensures monitor persistence ordering is preserved.
1126+
res_c = Some(self.persist_new_channel(monitor_name, monitor));
1127+
}
1128+
async move {
1129+
if let Some(a) = res_a {
1130+
a.await?;
1131+
}
1132+
if let Some(b) = res_b {
1133+
b.await?;
1134+
}
1135+
if let Some(c) = res_c {
1136+
c.await?;
1137+
}
1138+
Ok(())
10261139
}
10271140
}
10281141

0 commit comments

Comments
 (0)