Skip to content

Commit 179420c

Browse files
committed
Skip EventQueue persistence if unnecessary
.. we only persist the event queue if necessary and wake the BP to do so when something changes.
1 parent 5fb21bd commit 179420c

File tree

4 files changed

+117
-32
lines changed

4 files changed

+117
-32
lines changed

lightning-liquidity/src/events/event_queue.rs

Lines changed: 108 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use lightning::util::persist::KVStore;
2020
use lightning::util::ser::{
2121
BigSize, CollectionLength, FixedLengthReader, Readable, Writeable, Writer,
2222
};
23+
use lightning::util::wakers::Notifier;
2324

2425
/// The maximum queue size we allow before starting to drop events.
2526
pub const MAX_EVENT_QUEUE_SIZE: usize = 1000;
@@ -28,50 +29,73 @@ pub(crate) struct EventQueue<K: Deref + Clone>
2829
where
2930
K::Target: KVStore,
3031
{
31-
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
32+
state: Arc<Mutex<QueueState>>,
3233
waker: Arc<Mutex<Option<Waker>>>,
3334
#[cfg(feature = "std")]
3435
condvar: Arc<crate::sync::Condvar>,
3536
kv_store: K,
37+
persist_notifier: Arc<Notifier>,
3638
}
3739

3840
impl<K: Deref + Clone> EventQueue<K>
3941
where
4042
K::Target: KVStore,
4143
{
42-
pub fn new(queue: VecDeque<LiquidityEvent>, kv_store: K) -> Self {
43-
let queue = Arc::new(Mutex::new(queue));
44+
pub fn new(
45+
queue: VecDeque<LiquidityEvent>, kv_store: K, persist_notifier: Arc<Notifier>,
46+
) -> Self {
47+
let state = Arc::new(Mutex::new(QueueState { queue, needs_persist: false }));
4448
let waker = Arc::new(Mutex::new(None));
4549
Self {
46-
queue,
50+
state,
4751
waker,
4852
#[cfg(feature = "std")]
4953
condvar: Arc::new(crate::sync::Condvar::new()),
5054
kv_store,
55+
persist_notifier,
5156
}
5257
}
5358

5459
pub fn next_event(&self) -> Option<LiquidityEvent> {
55-
self.queue.lock().unwrap().pop_front()
60+
let event_opt = {
61+
let mut state_lock = self.state.lock().unwrap();
62+
if state_lock.queue.is_empty() {
63+
// Skip notifying below if nothing changed.
64+
return None;
65+
}
66+
67+
state_lock.needs_persist = true;
68+
state_lock.queue.pop_front()
69+
};
70+
71+
self.persist_notifier.notify();
72+
73+
event_opt
5674
}
5775

5876
pub async fn next_event_async(&self) -> LiquidityEvent {
59-
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
77+
EventFuture {
78+
queue_state: Arc::clone(&self.state),
79+
waker: Arc::clone(&self.waker),
80+
persist_notifier: Arc::clone(&self.persist_notifier),
81+
}
82+
.await
6083
}
6184

6285
#[cfg(feature = "std")]
6386
pub fn wait_next_event(&self) -> LiquidityEvent {
64-
let mut queue = self
87+
let mut state_lock = self
6588
.condvar
66-
.wait_while(self.queue.lock().unwrap(), |queue: &mut VecDeque<LiquidityEvent>| {
67-
queue.is_empty()
89+
.wait_while(self.state.lock().unwrap(), |state_lock: &mut QueueState| {
90+
state_lock.queue.is_empty()
6891
})
6992
.unwrap();
7093

71-
let event = queue.pop_front().expect("non-empty queue");
72-
let should_notify = !queue.is_empty();
94+
let event = state_lock.queue.pop_front().expect("non-empty queue");
95+
let should_notify = !state_lock.queue.is_empty();
96+
state_lock.needs_persist = true;
7397

74-
drop(queue);
98+
drop(state_lock);
7599

76100
if should_notify {
77101
if let Some(waker) = self.waker.lock().unwrap().take() {
@@ -81,11 +105,28 @@ where
81105
self.condvar.notify_one();
82106
}
83107

108+
self.persist_notifier.notify();
109+
84110
event
85111
}
86112

87113
pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
88-
self.queue.lock().unwrap().split_off(0).into()
114+
let mut state_lock = self.state.lock().unwrap();
115+
116+
let needs_persist = !state_lock.queue.is_empty();
117+
let events = state_lock.queue.split_off(0).into();
118+
119+
if needs_persist {
120+
state_lock.needs_persist = true;
121+
}
122+
123+
drop(state_lock);
124+
125+
if needs_persist {
126+
self.persist_notifier.notify();
127+
}
128+
129+
events
89130
}
90131

91132
// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
@@ -94,20 +135,38 @@ where
94135
}
95136

96137
pub async fn persist(&self) -> Result<(), lightning::io::Error> {
97-
let queue = self.queue.lock().unwrap();
98-
let encoded = EventQueueSerWrapper(&queue).encode();
138+
let fut = {
139+
let mut state_lock = self.state.lock().unwrap();
140+
141+
if !state_lock.needs_persist {
142+
return Ok(());
143+
}
99144

100-
self.kv_store
101-
.write(
145+
state_lock.needs_persist = false;
146+
let encoded = EventQueueSerWrapper(&state_lock.queue).encode();
147+
148+
self.kv_store.write(
102149
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
103150
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
104151
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
105152
encoded,
106153
)
107-
.await
154+
};
155+
156+
fut.await.map_err(|e| {
157+
self.state.lock().unwrap().needs_persist = true;
158+
e
159+
})?;
160+
161+
Ok(())
108162
}
109163
}
110164

165+
struct QueueState {
166+
queue: VecDeque<LiquidityEvent>,
167+
needs_persist: bool,
168+
}
169+
111170
// A guard type that will notify about new events when dropped.
112171
#[must_use]
113172
pub(crate) struct EventQueueNotifierGuard<'a, K: Deref + Clone>(&'a EventQueue<K>)
@@ -119,9 +178,10 @@ where
119178
K::Target: KVStore,
120179
{
121180
pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
122-
let mut queue = self.0.queue.lock().unwrap();
123-
if queue.len() < MAX_EVENT_QUEUE_SIZE {
124-
queue.push_back(event.into());
181+
let mut state_lock = self.0.state.lock().unwrap();
182+
if state_lock.queue.len() < MAX_EVENT_QUEUE_SIZE {
183+
state_lock.queue.push_back(event.into());
184+
state_lock.needs_persist = true;
125185
} else {
126186
return;
127187
}
@@ -133,7 +193,10 @@ where
133193
K::Target: KVStore,
134194
{
135195
fn drop(&mut self) {
136-
let should_notify = !self.0.queue.lock().unwrap().is_empty();
196+
let (should_notify, should_persist_notify) = {
197+
let state_lock = self.0.state.lock().unwrap();
198+
(!state_lock.queue.is_empty(), state_lock.needs_persist)
199+
};
137200

138201
if should_notify {
139202
if let Some(waker) = self.0.waker.lock().unwrap().take() {
@@ -143,12 +206,17 @@ where
143206
#[cfg(feature = "std")]
144207
self.0.condvar.notify_one();
145208
}
209+
210+
if should_persist_notify {
211+
self.0.persist_notifier.notify();
212+
}
146213
}
147214
}
148215

149216
struct EventFuture {
150-
event_queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
217+
queue_state: Arc<Mutex<QueueState>>,
151218
waker: Arc<Mutex<Option<Waker>>>,
219+
persist_notifier: Arc<Notifier>,
152220
}
153221

154222
impl Future for EventFuture {
@@ -157,12 +225,22 @@ impl Future for EventFuture {
157225
fn poll(
158226
self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
159227
) -> core::task::Poll<Self::Output> {
160-
if let Some(event) = self.event_queue.lock().unwrap().pop_front() {
161-
Poll::Ready(event)
162-
} else {
163-
*self.waker.lock().unwrap() = Some(cx.waker().clone());
164-
Poll::Pending
228+
let (res, should_persist_notify) = {
229+
let mut state_lock = self.queue_state.lock().unwrap();
230+
if let Some(event) = state_lock.queue.pop_front() {
231+
state_lock.needs_persist = true;
232+
(Poll::Ready(event), true)
233+
} else {
234+
*self.waker.lock().unwrap() = Some(cx.waker().clone());
235+
(Poll::Pending, false)
236+
}
237+
};
238+
239+
if should_persist_notify {
240+
self.persist_notifier.notify();
165241
}
242+
243+
res
166244
}
167245
}
168246

@@ -266,7 +344,8 @@ mod tests {
266344
use std::time::Duration;
267345

268346
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
269-
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
347+
let persist_notifier = Arc::new(Notifier::new());
348+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store, persist_notifier));
270349
assert_eq!(event_queue.next_event(), None);
271350

272351
let secp_ctx = Secp256k1::new();

lightning-liquidity/src/lsps0/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ mod tests {
136136
let pending_messages = Arc::new(MessageQueue::new(notifier));
137137
let entropy_source = Arc::new(TestEntropy {});
138138
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
139-
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
139+
let persist_notifier = Arc::new(Notifier::new());
140+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store, persist_notifier));
140141

141142
let lsps0_handler = Arc::new(LSPS0ClientHandler::new(
142143
entropy_source,

lightning-liquidity/src/lsps5/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,8 @@ mod tests {
477477
let message_queue = Arc::new(MessageQueue::new(notifier));
478478

479479
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
480-
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
480+
let persist_notifier = Arc::new(Notifier::new());
481+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store, persist_notifier));
481482
let client = LSPS5ClientHandler::new(
482483
test_entropy_source,
483484
Arc::clone(&message_queue),

lightning-liquidity/src/manager.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,11 @@ where
389389
let pending_messages =
390390
Arc::new(MessageQueue::new(Arc::clone(&pending_msgs_or_needs_persist_notifier)));
391391
let persisted_queue = read_event_queue(kv_store.clone()).await?.unwrap_or_default();
392-
let pending_events = Arc::new(EventQueue::new(persisted_queue, kv_store.clone()));
392+
let pending_events = Arc::new(EventQueue::new(
393+
persisted_queue,
394+
kv_store.clone(),
395+
Arc::clone(&pending_msgs_or_needs_persist_notifier),
396+
));
393397
let ignored_peers = RwLock::new(new_hash_set());
394398

395399
let mut supported_protocols = Vec::new();

0 commit comments

Comments
 (0)