Skip to content

Commit 03ab5cf

Browse files
committed
Have background processor task drive LiquidityManger persistence
We let the background processor task regularly call `LiquidityManger::persist`. We also change the semantics of the `Future` for waking the background processor to also be used when we need repersisting (which we'll do in the next commit).
1 parent 4772882 commit 03ab5cf

File tree

6 files changed

+87
-52
lines changed

6 files changed

+87
-52
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 61 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -571,45 +571,49 @@ pub(crate) mod futures_util {
571571
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
572572
}
573573

574-
enum JoinerResult<E, F: Future<Output = Result<(), E>> + Unpin> {
574+
enum JoinerResult<ERR, F: Future<Output = Result<(), ERR>> + Unpin> {
575575
Pending(Option<F>),
576-
Ready(Result<(), E>),
576+
Ready(Result<(), ERR>),
577577
}
578578

579579
pub(crate) struct Joiner<
580-
E,
581-
A: Future<Output = Result<(), E>> + Unpin,
582-
B: Future<Output = Result<(), E>> + Unpin,
583-
C: Future<Output = Result<(), E>> + Unpin,
584-
D: Future<Output = Result<(), E>> + Unpin,
580+
ERR,
581+
A: Future<Output = Result<(), ERR>> + Unpin,
582+
B: Future<Output = Result<(), ERR>> + Unpin,
583+
C: Future<Output = Result<(), ERR>> + Unpin,
584+
D: Future<Output = Result<(), ERR>> + Unpin,
585+
E: Future<Output = Result<(), ERR>> + Unpin,
585586
> {
586-
a: JoinerResult<E, A>,
587-
b: JoinerResult<E, B>,
588-
c: JoinerResult<E, C>,
589-
d: JoinerResult<E, D>,
587+
a: JoinerResult<ERR, A>,
588+
b: JoinerResult<ERR, B>,
589+
c: JoinerResult<ERR, C>,
590+
d: JoinerResult<ERR, D>,
591+
e: JoinerResult<ERR, E>,
590592
}
591593

592594
impl<
593-
E,
594-
A: Future<Output = Result<(), E>> + Unpin,
595-
B: Future<Output = Result<(), E>> + Unpin,
596-
C: Future<Output = Result<(), E>> + Unpin,
597-
D: Future<Output = Result<(), E>> + Unpin,
598-
> Joiner<E, A, B, C, D>
595+
ERR,
596+
A: Future<Output = Result<(), ERR>> + Unpin,
597+
B: Future<Output = Result<(), ERR>> + Unpin,
598+
C: Future<Output = Result<(), ERR>> + Unpin,
599+
D: Future<Output = Result<(), ERR>> + Unpin,
600+
E: Future<Output = Result<(), ERR>> + Unpin,
601+
> Joiner<ERR, A, B, C, D, E>
599602
{
600603
pub(crate) fn new() -> Self {
601604
Self {
602605
a: JoinerResult::Pending(None),
603606
b: JoinerResult::Pending(None),
604607
c: JoinerResult::Pending(None),
605608
d: JoinerResult::Pending(None),
609+
e: JoinerResult::Pending(None),
606610
}
607611
}
608612

609613
pub(crate) fn set_a(&mut self, fut: A) {
610614
self.a = JoinerResult::Pending(Some(fut));
611615
}
612-
pub(crate) fn set_a_res(&mut self, res: Result<(), E>) {
616+
pub(crate) fn set_a_res(&mut self, res: Result<(), ERR>) {
613617
self.a = JoinerResult::Ready(res);
614618
}
615619
pub(crate) fn set_b(&mut self, fut: B) {
@@ -621,19 +625,23 @@ pub(crate) mod futures_util {
621625
pub(crate) fn set_d(&mut self, fut: D) {
622626
self.d = JoinerResult::Pending(Some(fut));
623627
}
628+
pub(crate) fn set_e(&mut self, fut: E) {
629+
self.e = JoinerResult::Pending(Some(fut));
630+
}
624631
}
625632

626633
impl<
627-
E,
628-
A: Future<Output = Result<(), E>> + Unpin,
629-
B: Future<Output = Result<(), E>> + Unpin,
630-
C: Future<Output = Result<(), E>> + Unpin,
631-
D: Future<Output = Result<(), E>> + Unpin,
632-
> Future for Joiner<E, A, B, C, D>
634+
ERR,
635+
A: Future<Output = Result<(), ERR>> + Unpin,
636+
B: Future<Output = Result<(), ERR>> + Unpin,
637+
C: Future<Output = Result<(), ERR>> + Unpin,
638+
D: Future<Output = Result<(), ERR>> + Unpin,
639+
E: Future<Output = Result<(), ERR>> + Unpin,
640+
> Future for Joiner<ERR, A, B, C, D, E>
633641
where
634-
Joiner<E, A, B, C, D>: Unpin,
642+
Joiner<ERR, A, B, C, D, E>: Unpin,
635643
{
636-
type Output = [Result<(), E>; 4];
644+
type Output = [Result<(), ERR>; 5];
637645
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
638646
let mut all_complete = true;
639647
macro_rules! handle {
@@ -642,7 +650,7 @@ pub(crate) mod futures_util {
642650
JoinerResult::Pending(None) => {
643651
self.$val = JoinerResult::Ready(Ok(()));
644652
},
645-
JoinerResult::<E, _>::Pending(Some(ref mut val)) => {
653+
JoinerResult::<ERR, _>::Pending(Some(ref mut val)) => {
646654
match Pin::new(val).poll(ctx) {
647655
Poll::Ready(res) => {
648656
self.$val = JoinerResult::Ready(res);
@@ -660,9 +668,10 @@ pub(crate) mod futures_util {
660668
handle!(b);
661669
handle!(c);
662670
handle!(d);
671+
handle!(e);
663672

664673
if all_complete {
665-
let mut res = [Ok(()), Ok(()), Ok(()), Ok(())];
674+
let mut res = [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())];
666675
if let JoinerResult::Ready(ref mut val) = &mut self.a {
667676
core::mem::swap(&mut res[0], val);
668677
}
@@ -675,6 +684,9 @@ pub(crate) mod futures_util {
675684
if let JoinerResult::Ready(ref mut val) = &mut self.d {
676685
core::mem::swap(&mut res[3], val);
677686
}
687+
if let JoinerResult::Ready(ref mut val) = &mut self.e {
688+
core::mem::swap(&mut res[4], val);
689+
}
678690
Poll::Ready(res)
679691
} else {
680692
Poll::Pending
@@ -1003,7 +1015,7 @@ where
10031015
OptionalSelector { optional_future: None }
10041016
};
10051017
let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
1006-
let fut = lm.get_lm().get_pending_msgs_future();
1018+
let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
10071019
OptionalSelector { optional_future: Some(fut) }
10081020
} else {
10091021
OptionalSelector { optional_future: None }
@@ -1206,6 +1218,17 @@ where
12061218
None => {},
12071219
}
12081220

1221+
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1222+
log_trace!(logger, "Persisting LiquidityManager...");
1223+
let fut = async {
1224+
liquidity_manager.get_lm().persist().await.map_err(|e| {
1225+
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
1226+
e
1227+
})
1228+
};
1229+
futures.set_e(Box::pin(fut));
1230+
}
1231+
12091232
// Run persistence tasks in parallel and exit if any of them returns an error.
12101233
for res in futures.await {
12111234
res?;
@@ -1562,7 +1585,7 @@ impl BackgroundProcessor {
15621585
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
15631586
&chain_monitor.get_update_future(),
15641587
&om.get_om().get_update_future(),
1565-
&lm.get_lm().get_pending_msgs_future(),
1588+
&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
15661589
),
15671590
(Some(om), None) => Sleeper::from_three_futures(
15681591
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
@@ -1572,7 +1595,7 @@ impl BackgroundProcessor {
15721595
(None, Some(lm)) => Sleeper::from_three_futures(
15731596
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
15741597
&chain_monitor.get_update_future(),
1575-
&lm.get_lm().get_pending_msgs_future(),
1598+
&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
15761599
),
15771600
(None, None) => Sleeper::from_two_futures(
15781601
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
@@ -1606,6 +1629,13 @@ impl BackgroundProcessor {
16061629
log_trace!(logger, "Done persisting ChannelManager.");
16071630
}
16081631

1632+
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1633+
log_trace!(logger, "Persisting LiquidityManager...");
1634+
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
1635+
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
1636+
});
1637+
}
1638+
16091639
// Note that we want to run a graph prune once not long after startup before
16101640
// falling back to our usual hourly prunes. This avoids short-lived clients never
16111641
// pruning their network graph. We run once 60 seconds after startup before

lightning-liquidity/src/lsps0/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ mod tests {
123123

124124
use lightning::util::persist::KVStoreSyncWrapper;
125125
use lightning::util::test_utils::TestStore;
126+
use lightning::util::wakers::Notifier;
126127

127128
use crate::lsps0::ser::{LSPSMessage, LSPSRequestId};
128129
use crate::tests::utils::{self, TestEntropy};
@@ -131,7 +132,8 @@ mod tests {
131132

132133
#[test]
133134
fn test_list_protocols() {
134-
let pending_messages = Arc::new(MessageQueue::new());
135+
let notifier = Arc::new(Notifier::new());
136+
let pending_messages = Arc::new(MessageQueue::new(notifier));
135137
let entropy_source = Arc::new(TestEntropy {});
136138
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
137139
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));

lightning-liquidity/src/lsps0/service.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,15 @@ mod tests {
8787
use crate::tests::utils;
8888
use alloc::string::ToString;
8989
use alloc::sync::Arc;
90+
use lightning::util::wakers::Notifier;
9091

9192
use super::*;
9293

9394
#[test]
9495
fn test_handle_list_protocols_request() {
9596
let protocols: Vec<u16> = vec![];
96-
let pending_messages = Arc::new(MessageQueue::new());
97+
let notifier = Arc::new(Notifier::new());
98+
let pending_messages = Arc::new(MessageQueue::new(notifier));
9799

98100
let lsps0_handler =
99101
Arc::new(LSPS0ServiceHandler::new(protocols, Arc::clone(&pending_messages)));

lightning-liquidity/src/lsps5/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ mod tests {
450450
use core::sync::atomic::{AtomicU64, Ordering};
451451
use lightning::util::persist::KVStoreSyncWrapper;
452452
use lightning::util::test_utils::TestStore;
453+
use lightning::util::wakers::Notifier;
453454

454455
struct UniqueTestEntropy {
455456
counter: AtomicU64,
@@ -472,7 +473,8 @@ mod tests {
472473
PublicKey,
473474
) {
474475
let test_entropy_source = Arc::new(UniqueTestEntropy { counter: AtomicU64::new(2) });
475-
let message_queue = Arc::new(MessageQueue::new());
476+
let notifier = Arc::new(Notifier::new());
477+
let message_queue = Arc::new(MessageQueue::new(notifier));
476478

477479
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
478480
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));

lightning-liquidity/src/manager.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use lightning::sign::{EntropySource, NodeSigner};
5252
use lightning::util::logger::Level;
5353
use lightning::util::persist::{KVStore, KVStoreSync, KVStoreSyncWrapper};
5454
use lightning::util::ser::{LengthLimitedRead, LengthReadable};
55-
use lightning::util::wakers::Future;
55+
use lightning::util::wakers::{Future, Notifier};
5656

5757
use lightning_types::features::{InitFeatures, NodeFeatures};
5858

@@ -312,6 +312,7 @@ pub struct LiquidityManager<
312312
_client_config: Option<LiquidityClientConfig>,
313313
best_block: RwLock<Option<BestBlock>>,
314314
_chain_source: Option<C>,
315+
pending_msgs_or_needs_persist_notifier: Arc<Notifier>,
315316
}
316317

317318
#[cfg(feature = "time")]
@@ -384,7 +385,9 @@ where
384385
service_config: Option<LiquidityServiceConfig>,
385386
client_config: Option<LiquidityClientConfig>, time_provider: TP,
386387
) -> Result<Self, lightning::io::Error> {
387-
let pending_messages = Arc::new(MessageQueue::new());
388+
let pending_msgs_or_needs_persist_notifier = Arc::new(Notifier::new());
389+
let pending_messages =
390+
Arc::new(MessageQueue::new(Arc::clone(&pending_msgs_or_needs_persist_notifier)));
388391
let persisted_queue = read_event_queue(kv_store.clone()).await?.unwrap_or_default();
389392
let pending_events = Arc::new(EventQueue::new(persisted_queue, kv_store.clone()));
390393
let ignored_peers = RwLock::new(new_hash_set());
@@ -523,6 +526,7 @@ where
523526
_client_config: client_config,
524527
best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
525528
_chain_source: chain_source,
529+
pending_msgs_or_needs_persist_notifier,
526530
})
527531
}
528532

@@ -581,12 +585,12 @@ where
581585
}
582586

583587
/// Returns a [`Future`] that will complete when the next batch of pending messages is ready to
584-
/// be processed.
588+
/// be processed *or* we need to be repersisted.
585589
///
586590
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
587591
/// [`LiquidityManager`] and should instead register actions to be taken later.
588-
pub fn get_pending_msgs_future(&self) -> Future {
589-
self.pending_messages.get_pending_msgs_future()
592+
pub fn get_pending_msgs_or_needs_persist_future(&self) -> Future {
593+
self.pending_msgs_or_needs_persist_notifier.get_future()
590594
}
591595

592596
/// Blocks the current thread until next event is ready and returns it.
@@ -1208,11 +1212,11 @@ where
12081212
}
12091213

12101214
/// Returns a [`Future`] that will complete when the next batch of pending messages is ready to
1211-
/// be processed.
1215+
/// be processed *or* we need to be repersisted.
12121216
///
1213-
/// Wraps [`LiquidityManager::get_pending_msgs_future`].
1214-
pub fn get_pending_msgs_future(&self) -> Future {
1215-
self.inner.get_pending_msgs_future()
1217+
/// Wraps [`LiquidityManager::get_pending_msgs_or_needs_persist_future`].
1218+
pub fn get_pending_msgs_or_needs_persist_future(&self) -> Future {
1219+
self.inner.get_pending_msgs_or_needs_persist_future()
12161220
}
12171221

12181222
/// Blocks the current thread until next event is ready and returns it.

lightning-liquidity/src/message_queue.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ use alloc::collections::VecDeque;
1313
use alloc::vec::Vec;
1414

1515
use crate::lsps0::ser::LSPSMessage;
16-
use crate::sync::Mutex;
16+
use crate::sync::{Arc, Mutex};
1717

18-
use lightning::util::wakers::{Future, Notifier};
18+
use lightning::util::wakers::Notifier;
1919

2020
use bitcoin::secp256k1::PublicKey;
2121

@@ -24,24 +24,19 @@ use bitcoin::secp256k1::PublicKey;
2424
/// [`LiquidityManager`]: crate::LiquidityManager
2525
pub struct MessageQueue {
2626
queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
27-
pending_msgs_notifier: Notifier,
27+
pending_msgs_notifier: Arc<Notifier>,
2828
}
2929

3030
impl MessageQueue {
31-
pub(crate) fn new() -> Self {
31+
pub(crate) fn new(pending_msgs_notifier: Arc<Notifier>) -> Self {
3232
let queue = Mutex::new(VecDeque::new());
33-
let pending_msgs_notifier = Notifier::new();
3433
Self { queue, pending_msgs_notifier }
3534
}
3635

3736
pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
3837
self.queue.lock().unwrap().drain(..).collect()
3938
}
4039

41-
pub(crate) fn get_pending_msgs_future(&self) -> Future {
42-
self.pending_msgs_notifier.get_future()
43-
}
44-
4540
pub(crate) fn notifier(&self) -> MessageQueueNotifierGuard<'_> {
4641
MessageQueueNotifierGuard { msg_queue: self, buffer: VecDeque::new() }
4742
}

0 commit comments

Comments
 (0)