Skip to content

Commit da932f6

Browse files
committed
Add support for native async KVStore persist to ChainMonitor
This finally adds support for full native Rust `async` persistence to `ChainMonitor`. Way back when, before we had any other persistence, we added the `Persist` trait to persist `ChannelMonitor`s. It eventualy grew homegrown async persistence support via a simple immediate return and callback upon completion. We later added a persistence trait in `lightning-background-processor` to persist the few fields that it needed to drive writes for. Over time, we found more places where persistence was useful, and we eventually added a generic `KVStore` trait. In dc75436 we removed the `lightning-background-processor` `Persister` in favor of simply using the native `KVStore` directly. Here we continue that trend, building native `async` `ChannelMonitor` persistence on top of our native `KVStore` rather than hacking support for it into the `chain::Persist` trait. Because `MonitorUpdatingPersister` already exists as a common way to wrap a `KVStore` into a `ChannelMonitor` persister, we build exclusively on that (though note that the "monitor updating" part is now optional), utilizing its new async option as our native async driver. Thus, we end up with a `ChainMonitor::new_async_beta` which takes a `MonitorUpdatingPersisterAsync` rather than a classic `chain::Persist` and then operates the same as a normal `ChainMonitor`. While the requirement that users now use a `MonitorUpdatingPersister` to wrap their `KVStore` before providing it to `ChainMonitor` is somewhat awkward, as we move towards a `KVStore`-only world it seems like `MonitorUpdatingPersister` should eventually merge into `ChainMonitor`.
1 parent c0fc36d commit da932f6

File tree

2 files changed

+148
-5
lines changed

2 files changed

+148
-5
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,13 @@ use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHol
4646
use crate::ln::types::ChannelId;
4747
use crate::prelude::*;
4848
use crate::sign::ecdsa::EcdsaChannelSigner;
49-
use crate::sign::{EntropySource, PeerStorageKey};
49+
use crate::sign::{EntropySource, PeerStorageKey, SignerProvider};
5050
use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
5151
use crate::types::features::{InitFeatures, NodeFeatures};
52+
use crate::util::async_poll::{MaybeSend, MaybeSync};
5253
use crate::util::errors::APIError;
5354
use crate::util::logger::{Logger, WithContext};
54-
use crate::util::persist::MonitorName;
55+
use crate::util::persist::{FutureSpawner, MonitorName, MonitorUpdatingPersisterAsync, KVStore};
5556
#[cfg(peer_storage)]
5657
use crate::util::ser::{VecWriter, Writeable};
5758
use crate::util::wakers::{Future, Notifier};
@@ -192,6 +193,15 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
192193
/// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
193194
/// the monitor already exists in the archive.
194195
fn archive_persisted_channel(&self, monitor_name: MonitorName);
196+
197+
/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
198+
/// [`Self::update_persisted_channel`], which have completed.
199+
///
200+
/// Returning an update here is equivalent to calling
201+
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
202+
/// hidden in the docs.
203+
#[doc(hidden)]
204+
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { Vec::new() }
195205
}
196206

197207
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -235,6 +245,73 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
235245
}
236246
}
237247

248+
249+
/// An unconstructable [`Persist`]er which is used under the hood when you call
250+
/// [`ChainMonitor::new_async_beta`].
251+
pub struct AsyncPersister<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
252+
where
253+
K::Target: KVStore + MaybeSync,
254+
L::Target: Logger,
255+
ES::Target: EntropySource + Sized,
256+
SP::Target: SignerProvider + Sized,
257+
BI::Target: BroadcasterInterface,
258+
FE::Target: FeeEstimator
259+
{
260+
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
261+
}
262+
263+
impl<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
264+
Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
265+
where
266+
K::Target: KVStore + MaybeSync,
267+
L::Target: Logger,
268+
ES::Target: EntropySource + Sized,
269+
SP::Target: SignerProvider + Sized,
270+
BI::Target: BroadcasterInterface,
271+
FE::Target: FeeEstimator
272+
{
273+
type Target = Self;
274+
fn deref(&self) -> &Self {
275+
self
276+
}
277+
}
278+
279+
impl<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
280+
Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
281+
where
282+
K::Target: KVStore + MaybeSync,
283+
L::Target: Logger,
284+
ES::Target: EntropySource + Sized,
285+
SP::Target: SignerProvider + Sized,
286+
BI::Target: BroadcasterInterface,
287+
FE::Target: FeeEstimator,
288+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
289+
{
290+
fn persist_new_channel(
291+
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
292+
) -> ChannelMonitorUpdateStatus {
293+
self.persister.spawn_async_persist_new_channel(monitor_name, monitor);
294+
ChannelMonitorUpdateStatus::InProgress
295+
}
296+
297+
fn update_persisted_channel(
298+
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
299+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
300+
) -> ChannelMonitorUpdateStatus {
301+
self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor);
302+
ChannelMonitorUpdateStatus::InProgress
303+
}
304+
305+
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
306+
self.persister.spawn_async_archive_persisted_channel(monitor_name);
307+
}
308+
309+
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
310+
self.persister.get_and_clear_completed_updates()
311+
}
312+
}
313+
314+
238315
/// An implementation of [`chain::Watch`] for monitoring channels.
239316
///
240317
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
@@ -291,6 +368,55 @@ pub struct ChainMonitor<
291368
our_peerstorage_encryption_key: PeerStorageKey,
292369
}
293370

371+
impl<
372+
K: Deref + MaybeSend + MaybeSync + 'static,
373+
S: FutureSpawner,
374+
SP: Deref + MaybeSend + MaybeSync + 'static,
375+
C: Deref,
376+
T: Deref + MaybeSend + MaybeSync + 'static,
377+
F: Deref + MaybeSend + MaybeSync + 'static,
378+
L: Deref + MaybeSend + MaybeSync + 'static,
379+
ES: Deref + MaybeSend + MaybeSync + 'static,
380+
> ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, C, T, F, L, AsyncPersister<K, S, L, ES, SP, T, F>, ES>
381+
where
382+
K::Target: KVStore + MaybeSync,
383+
SP::Target: SignerProvider + Sized,
384+
C::Target: chain::Filter,
385+
T::Target: BroadcasterInterface,
386+
F::Target: FeeEstimator,
387+
L::Target: Logger,
388+
ES::Target: EntropySource + Sized,
389+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
390+
{
391+
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
392+
///
393+
/// This behaves the same as [`ChainMonitor::new`] except that it relies on
394+
/// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async.
395+
///
396+
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
397+
pub fn new_async_beta(
398+
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
399+
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
400+
_our_peerstorage_encryption_key: PeerStorageKey,
401+
) -> Self {
402+
Self {
403+
monitors: RwLock::new(new_hash_map()),
404+
chain_source,
405+
broadcaster,
406+
logger,
407+
fee_estimator: feeest,
408+
persister: AsyncPersister { persister },
409+
_entropy_source,
410+
pending_monitor_events: Mutex::new(Vec::new()),
411+
highest_chain_height: AtomicUsize::new(0),
412+
event_notifier: Notifier::new(),
413+
pending_send_only_events: Mutex::new(Vec::new()),
414+
#[cfg(peer_storage)]
415+
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
416+
}
417+
}
418+
}
419+
294420
impl<
295421
ChannelSigner: EcdsaChannelSigner,
296422
C: Deref,
@@ -1357,6 +1483,9 @@ where
13571483
fn release_pending_monitor_events(
13581484
&self,
13591485
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1486+
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1487+
self.channel_monitor_updated(channel_id, update_id);
1488+
}
13601489
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
13611490
for monitor_state in self.monitors.read().unwrap().values() {
13621491
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();

lightning/src/util/persist.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ where
703703
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
704704
///
705705
/// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
706-
/// directly by the [`ChainMonitor`].
706+
/// directly by the [`ChainMonitor`] via [`ChainMonitor::new_async_beta`].
707707
///
708708
/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
709709
pub struct MonitorUpdatingPersisterAsync<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
@@ -726,6 +726,7 @@ where
726726
FE::Target: FeeEstimator,
727727
{
728728
kv_store: K,
729+
async_completed_updates: Mutex<Vec<(ChannelId, u64)>>,
729730
future_spawner: S,
730731
logger: L,
731732
maximum_pending_updates: u64,
@@ -754,6 +755,7 @@ where
754755
) -> Self {
755756
MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
756757
kv_store,
758+
async_completed_updates: Mutex::new(Vec::new()),
757759
future_spawner,
758760
logger,
759761
maximum_pending_updates,
@@ -841,9 +843,10 @@ where
841843
let inner = Arc::clone(&self.0);
842844
let future = inner.persist_new_channel(monitor_name, monitor);
843845
let channel_id = monitor.channel_id();
846+
let completion = (monitor.channel_id(), monitor.get_latest_update_id());
844847
self.0.future_spawner.spawn(async move {
845848
match future.await {
846-
Ok(()) => {}, // TODO: expose completions
849+
Ok(()) => inner.async_completed_updates.lock().unwrap().push(completion),
847850
Err(e) => {
848851
log_error!(
849852
inner.logger,
@@ -861,10 +864,17 @@ where
861864
let inner = Arc::clone(&self.0);
862865
let future = inner.update_persisted_channel(monitor_name, update, monitor);
863866
let channel_id = monitor.channel_id();
867+
let completion = if let Some(update) = update {
868+
Some((monitor.channel_id(), update.update_id))
869+
} else {
870+
None
871+
};
864872
let inner = Arc::clone(&self.0);
865873
self.0.future_spawner.spawn(async move {
866874
match future.await {
867-
Ok(()) => {}, // TODO: expose completions
875+
Ok(()) => if let Some(completion) = completion {
876+
inner.async_completed_updates.lock().unwrap().push(completion);
877+
},
868878
Err(e) => {
869879
log_error!(
870880
inner.logger,
@@ -883,6 +893,10 @@ where
883893
inner.archive_persisted_channel(monitor_name).await;
884894
});
885895
}
896+
897+
pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
898+
mem::take(&mut *self.0.async_completed_updates.lock().unwrap())
899+
}
886900
}
887901

888902

0 commit comments

Comments
 (0)