Skip to content

Commit b889ca6

Browse files
committed
Add EventQueue persistence
We add simple `persist` call to `EventQueue` that persists it under a `event_queue` key.
1 parent e90e128 commit b889ca6

File tree

16 files changed

+343
-68
lines changed

16 files changed

+343
-68
lines changed

lightning-liquidity/src/events/event_queue.rs

Lines changed: 141 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,53 @@
11
use super::LiquidityEvent;
2+
3+
use crate::lsps2::event::LSPS2ServiceEvent;
4+
use crate::persist::{
5+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
6+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
7+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
8+
};
29
use crate::sync::{Arc, Mutex};
310

411
use alloc::collections::VecDeque;
512
use alloc::vec::Vec;
613

714
use core::future::Future;
15+
use core::ops::Deref;
816
use core::task::{Poll, Waker};
917

18+
use lightning::ln::msgs::DecodeError;
19+
use lightning::util::persist::KVStore;
20+
use lightning::util::ser::{
21+
BigSize, CollectionLength, FixedLengthReader, Readable, Writeable, Writer,
22+
};
23+
1024
/// The maximum queue size we allow before starting to drop events.
1125
pub const MAX_EVENT_QUEUE_SIZE: usize = 1000;
1226

13-
pub(crate) struct EventQueue {
27+
pub(crate) struct EventQueue<K: Deref + Clone>
28+
where
29+
K::Target: KVStore,
30+
{
1431
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
1532
waker: Arc<Mutex<Option<Waker>>>,
1633
#[cfg(feature = "std")]
1734
condvar: Arc<crate::sync::Condvar>,
35+
kv_store: K,
1836
}
1937

20-
impl EventQueue {
21-
pub fn new() -> Self {
38+
impl<K: Deref + Clone> EventQueue<K>
39+
where
40+
K::Target: KVStore,
41+
{
42+
pub fn new(kv_store: K) -> Self {
2243
let queue = Arc::new(Mutex::new(VecDeque::new()));
2344
let waker = Arc::new(Mutex::new(None));
2445
Self {
2546
queue,
2647
waker,
2748
#[cfg(feature = "std")]
2849
condvar: Arc::new(crate::sync::Condvar::new()),
50+
kv_store,
2951
}
3052
}
3153

@@ -67,16 +89,35 @@ impl EventQueue {
6789
}
6890

6991
// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
70-
pub fn notifier(&self) -> EventQueueNotifierGuard<'_> {
92+
pub fn notifier(&self) -> EventQueueNotifierGuard<'_, K> {
7193
EventQueueNotifierGuard(self)
7294
}
95+
96+
pub async fn persist(&self) -> Result<(), lightning::io::Error> {
97+
let queue = self.queue.lock().unwrap();
98+
let encoded = EventQueueSerWrapper(&queue).encode();
99+
100+
self.kv_store
101+
.write(
102+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
103+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
104+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
105+
encoded,
106+
)
107+
.await
108+
}
73109
}
74110

75111
// A guard type that will notify about new events when dropped.
76112
#[must_use]
77-
pub(crate) struct EventQueueNotifierGuard<'a>(&'a EventQueue);
78-
79-
impl<'a> EventQueueNotifierGuard<'a> {
113+
pub(crate) struct EventQueueNotifierGuard<'a, K: Deref + Clone>(&'a EventQueue<K>)
114+
where
115+
K::Target: KVStore;
116+
117+
impl<'a, K: Deref + Clone> EventQueueNotifierGuard<'a, K>
118+
where
119+
K::Target: KVStore,
120+
{
80121
pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
81122
let mut queue = self.0.queue.lock().unwrap();
82123
if queue.len() < MAX_EVENT_QUEUE_SIZE {
@@ -87,7 +128,10 @@ impl<'a> EventQueueNotifierGuard<'a> {
87128
}
88129
}
89130

90-
impl<'a> Drop for EventQueueNotifierGuard<'a> {
131+
impl<'a, K: Deref + Clone> Drop for EventQueueNotifierGuard<'a, K>
132+
where
133+
K::Target: KVStore,
134+
{
91135
fn drop(&mut self) {
92136
let should_notify = !self.0.queue.lock().unwrap().is_empty();
93137

@@ -122,6 +166,91 @@ impl Future for EventFuture {
122166
}
123167
}
124168

169+
pub(crate) struct EventQueueDeserWrapper(pub VecDeque<LiquidityEvent>);
170+
171+
impl Readable for EventQueueDeserWrapper {
172+
fn read<R: lightning::io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
173+
let len: CollectionLength = Readable::read(reader)?;
174+
let mut queue = VecDeque::with_capacity(len.0 as usize);
175+
for _ in 0..len.0 {
176+
let event = match Readable::read(reader)? {
177+
0u8 => {
178+
let ev = Readable::read(reader)?;
179+
LiquidityEvent::LSPS2Service(ev)
180+
},
181+
2u8 => {
182+
let ev = Readable::read(reader)?;
183+
LiquidityEvent::LSPS5Service(ev)
184+
},
185+
x if x % 2 == 1 => {
186+
// If the event is of unknown type, assume it was written with `write_tlv_fields`,
187+
// which prefixes the whole thing with a length BigSize. Because the event is
188+
// odd-type unknown, we should treat it as `Ok(None)` even if it has some TLV
189+
// fields that are even. Thus, we avoid using `read_tlv_fields` and simply read
190+
// exactly the number of bytes specified, ignoring them entirely.
191+
let tlv_len: BigSize = Readable::read(reader)?;
192+
FixedLengthReader::new(reader, tlv_len.0)
193+
.eat_remaining()
194+
.map_err(|_| DecodeError::ShortRead)?;
195+
continue;
196+
},
197+
_ => return Err(DecodeError::InvalidValue),
198+
};
199+
queue.push_back(event);
200+
}
201+
Ok(Self(queue))
202+
}
203+
}
204+
205+
struct EventQueueSerWrapper<'a>(&'a VecDeque<LiquidityEvent>);
206+
207+
impl Writeable for EventQueueSerWrapper<'_> {
208+
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
209+
let maybe_process_event = |event: &LiquidityEvent,
210+
writer: Option<&mut W>|
211+
-> Result<bool, lightning::io::Error> {
212+
match event {
213+
LiquidityEvent::LSPS2Service(event) => {
214+
if matches!(event, LSPS2ServiceEvent::GetInfo { .. })
215+
|| matches!(event, LSPS2ServiceEvent::BuyRequest { .. })
216+
{
217+
// Skip persisting GetInfoRequest and BuyRequest events as we prune the pending
218+
// request state currently anyways.
219+
Ok(false)
220+
} else {
221+
if let Some(writer) = writer {
222+
0u8.write(writer)?;
223+
event.write(writer)?;
224+
}
225+
Ok(true)
226+
}
227+
},
228+
LiquidityEvent::LSPS5Service(event) => {
229+
if let Some(writer) = writer {
230+
2u8.write(writer)?;
231+
event.write(writer)?;
232+
}
233+
Ok(true)
234+
},
235+
_ => Ok(false),
236+
}
237+
};
238+
239+
let mut persisted_events_len = 0;
240+
for e in self.0.iter() {
241+
if maybe_process_event(e, None)? {
242+
persisted_events_len += 1;
243+
}
244+
}
245+
246+
CollectionLength(persisted_events_len).write(writer)?;
247+
for e in self.0.iter() {
248+
maybe_process_event(e, Some(writer))?;
249+
}
250+
Ok(())
251+
}
252+
}
253+
125254
#[cfg(test)]
126255
mod tests {
127256
#[tokio::test]
@@ -131,10 +260,13 @@ mod tests {
131260
use crate::lsps0::event::LSPS0ClientEvent;
132261
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
133262
use core::sync::atomic::{AtomicU16, Ordering};
263+
use lightning::util::persist::KVStoreSyncWrapper;
264+
use lightning::util::test_utils::TestStore;
134265
use std::sync::Arc;
135266
use std::time::Duration;
136267

137-
let event_queue = Arc::new(EventQueue::new());
268+
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
269+
let event_queue = Arc::new(EventQueue::new(kv_store));
138270
assert_eq!(event_queue.next_event(), None);
139271

140272
let secp_ctx = Secp256k1::new();

lightning-liquidity/src/lsps0/client.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,31 @@ use crate::utils;
1818
use lightning::ln::msgs::{ErrorAction, LightningError};
1919
use lightning::sign::EntropySource;
2020
use lightning::util::logger::Level;
21+
use lightning::util::persist::KVStore;
2122

2223
use bitcoin::secp256k1::PublicKey;
2324

2425
use core::ops::Deref;
2526

2627
/// A message handler capable of sending and handling bLIP-50 / LSPS0 messages.
27-
pub struct LSPS0ClientHandler<ES: Deref>
28+
pub struct LSPS0ClientHandler<ES: Deref, K: Deref + Clone>
2829
where
2930
ES::Target: EntropySource,
31+
K::Target: KVStore,
3032
{
3133
entropy_source: ES,
3234
pending_messages: Arc<MessageQueue>,
33-
pending_events: Arc<EventQueue>,
35+
pending_events: Arc<EventQueue<K>>,
3436
}
3537

36-
impl<ES: Deref> LSPS0ClientHandler<ES>
38+
impl<ES: Deref, K: Deref + Clone> LSPS0ClientHandler<ES, K>
3739
where
3840
ES::Target: EntropySource,
41+
K::Target: KVStore,
3942
{
4043
/// Returns a new instance of [`LSPS0ClientHandler`].
4144
pub(crate) fn new(
42-
entropy_source: ES, pending_messages: Arc<MessageQueue>, pending_events: Arc<EventQueue>,
45+
entropy_source: ES, pending_messages: Arc<MessageQueue>, pending_events: Arc<EventQueue<K>>,
4346
) -> Self {
4447
Self { entropy_source, pending_messages, pending_events }
4548
}
@@ -86,9 +89,10 @@ where
8689
}
8790
}
8891

89-
impl<ES: Deref> LSPSProtocolMessageHandler for LSPS0ClientHandler<ES>
92+
impl<ES: Deref, K: Deref + Clone> LSPSProtocolMessageHandler for LSPS0ClientHandler<ES, K>
9093
where
9194
ES::Target: EntropySource,
95+
K::Target: KVStore,
9296
{
9397
type ProtocolMessage = LSPS0Message;
9498
const PROTOCOL_NUMBER: Option<u16> = None;
@@ -113,10 +117,12 @@ where
113117

114118
#[cfg(test)]
115119
mod tests {
116-
117120
use alloc::string::ToString;
118121
use alloc::sync::Arc;
119122

123+
use lightning::util::persist::KVStoreSyncWrapper;
124+
use lightning::util::test_utils::TestStore;
125+
120126
use crate::lsps0::ser::{LSPSMessage, LSPSRequestId};
121127
use crate::tests::utils::{self, TestEntropy};
122128

@@ -126,7 +132,8 @@ mod tests {
126132
fn test_list_protocols() {
127133
let pending_messages = Arc::new(MessageQueue::new());
128134
let entropy_source = Arc::new(TestEntropy {});
129-
let event_queue = Arc::new(EventQueue::new());
135+
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
136+
let event_queue = Arc::new(EventQueue::new(kv_store));
130137

131138
let lsps0_handler = Arc::new(LSPS0ClientHandler::new(
132139
entropy_source,

lightning-liquidity/src/lsps0/event.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use alloc::vec::Vec;
1414
use bitcoin::secp256k1::PublicKey;
1515

1616
/// An event which an bLIP-50 / LSPS0 client may want to take some action in response to.
17+
///
18+
/// **Note: ** This event will *not* be persisted across restarts.
1719
#[derive(Clone, Debug, PartialEq, Eq)]
1820
pub enum LSPS0ClientEvent {
1921
/// Information from the LSP about the protocols they support.

lightning-liquidity/src/lsps1/client.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::sync::{Arc, Mutex, RwLock};
2525
use lightning::ln::msgs::{ErrorAction, LightningError};
2626
use lightning::sign::EntropySource;
2727
use lightning::util::logger::Level;
28+
use lightning::util::persist::KVStore;
2829

2930
use bitcoin::secp256k1::PublicKey;
3031
use bitcoin::Address;
@@ -46,25 +47,27 @@ struct PeerState {
4647
}
4748

4849
/// The main object allowing to send and receive bLIP-51 / LSPS1 messages.
49-
pub struct LSPS1ClientHandler<ES: Deref>
50+
pub struct LSPS1ClientHandler<ES: Deref, K: Deref + Clone>
5051
where
5152
ES::Target: EntropySource,
53+
K::Target: KVStore,
5254
{
5355
entropy_source: ES,
5456
pending_messages: Arc<MessageQueue>,
55-
pending_events: Arc<EventQueue>,
57+
pending_events: Arc<EventQueue<K>>,
5658
per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
5759
config: LSPS1ClientConfig,
5860
}
5961

60-
impl<ES: Deref> LSPS1ClientHandler<ES>
62+
impl<ES: Deref, K: Deref + Clone> LSPS1ClientHandler<ES, K>
6163
where
6264
ES::Target: EntropySource,
65+
K::Target: KVStore,
6366
{
6467
/// Constructs an `LSPS1ClientHandler`.
6568
pub(crate) fn new(
66-
entropy_source: ES, pending_messages: Arc<MessageQueue>, pending_events: Arc<EventQueue>,
67-
config: LSPS1ClientConfig,
69+
entropy_source: ES, pending_messages: Arc<MessageQueue>,
70+
pending_events: Arc<EventQueue<K>>, config: LSPS1ClientConfig,
6871
) -> Self {
6972
Self {
7073
entropy_source,
@@ -429,9 +432,10 @@ where
429432
}
430433
}
431434

432-
impl<ES: Deref> LSPSProtocolMessageHandler for LSPS1ClientHandler<ES>
435+
impl<ES: Deref, K: Deref + Clone> LSPSProtocolMessageHandler for LSPS1ClientHandler<ES, K>
433436
where
434437
ES::Target: EntropySource,
438+
K::Target: KVStore,
435439
{
436440
type ProtocolMessage = LSPS1Message;
437441
const PROTOCOL_NUMBER: Option<u16> = Some(1);

0 commit comments

Comments
 (0)