Skip to content

Commit 6a01d75

Browse files
committed
Read persisted event queue state in LiquidityManager::new
We read any previously-persisted state upon construction of `LiquidityManager`.
1 parent aceb3c1 commit 6a01d75

File tree

6 files changed

+49
-8
lines changed

6 files changed

+49
-8
lines changed

lightning-liquidity/src/events/event_queue.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ impl<K: Deref + Clone> EventQueue<K>
3939
where
4040
K::Target: KVStore,
4141
{
42-
pub fn new(kv_store: K) -> Self {
43-
let queue = Arc::new(Mutex::new(VecDeque::new()));
42+
pub fn new(queue: VecDeque<LiquidityEvent>, kv_store: K) -> Self {
43+
let queue = Arc::new(Mutex::new(queue));
4444
let waker = Arc::new(Mutex::new(None));
4545
Self {
4646
queue,
@@ -266,7 +266,7 @@ mod tests {
266266
use std::time::Duration;
267267

268268
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
269-
let event_queue = Arc::new(EventQueue::new(kv_store));
269+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
270270
assert_eq!(event_queue.next_event(), None);
271271

272272
let secp_ctx = Secp256k1::new();

lightning-liquidity/src/events/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
1818
mod event_queue;
1919

20-
pub(crate) use event_queue::EventQueue;
2120
pub use event_queue::MAX_EVENT_QUEUE_SIZE;
21+
pub(crate) use event_queue::{EventQueue, EventQueueDeserWrapper};
2222

2323
use crate::lsps0;
2424
use crate::lsps1;

lightning-liquidity/src/lsps0/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ where
117117

118118
#[cfg(test)]
119119
mod tests {
120+
use alloc::collections::VecDeque;
120121
use alloc::string::ToString;
121122
use alloc::sync::Arc;
122123

@@ -133,7 +134,7 @@ mod tests {
133134
let pending_messages = Arc::new(MessageQueue::new());
134135
let entropy_source = Arc::new(TestEntropy {});
135136
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
136-
let event_queue = Arc::new(EventQueue::new(kv_store));
137+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
137138

138139
let lsps0_handler = Arc::new(LSPS0ClientHandler::new(
139140
entropy_source,

lightning-liquidity/src/lsps5/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ mod tests {
475475
let message_queue = Arc::new(MessageQueue::new());
476476

477477
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
478-
let event_queue = Arc::new(EventQueue::new(kv_store));
478+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
479479
let client = LSPS5ClientHandler::new(
480480
test_entropy_source,
481481
Arc::clone(&message_queue),

lightning-liquidity/src/manager.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use crate::lsps5::client::{LSPS5ClientConfig, LSPS5ClientHandler};
2424
use crate::lsps5::msgs::LSPS5Message;
2525
use crate::lsps5::service::{LSPS5ServiceConfig, LSPS5ServiceHandler};
2626
use crate::message_queue::MessageQueue;
27-
use crate::persist::{read_lsps2_service_peer_states, read_lsps5_service_peer_states};
27+
use crate::persist::{
28+
read_event_queue, read_lsps2_service_peer_states, read_lsps5_service_peer_states,
29+
};
2830

2931
use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
3032
use crate::lsps1::msgs::LSPS1Message;
@@ -383,7 +385,8 @@ where
383385
client_config: Option<LiquidityClientConfig>, time_provider: TP,
384386
) -> Result<Self, lightning::io::Error> {
385387
let pending_messages = Arc::new(MessageQueue::new());
386-
let pending_events = Arc::new(EventQueue::new(kv_store.clone()));
388+
let persisted_queue = read_event_queue(kv_store.clone()).await?.unwrap_or_default();
389+
let pending_events = Arc::new(EventQueue::new(persisted_queue, kv_store.clone()));
387390
let ignored_peers = RwLock::new(new_hash_set());
388391

389392
let mut supported_protocols = Vec::new();

lightning-liquidity/src/persist.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
//! Types and utils for persistence.
1111
12+
use crate::events::{EventQueueDeserWrapper, LiquidityEvent};
1213
use crate::lsps2::service::PeerState as LSPS2ServicePeerState;
1314
use crate::lsps5::service::PeerState as LSPS5ServicePeerState;
1415
use crate::prelude::{new_hash_map, HashMap};
@@ -20,6 +21,8 @@ use lightning::util::ser::Readable;
2021

2122
use bitcoin::secp256k1::PublicKey;
2223

24+
use alloc::collections::VecDeque;
25+
2326
use core::ops::Deref;
2427
use core::str::FromStr;
2528

@@ -48,6 +51,40 @@ pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";
4851
/// [`LSPS5ServiceHandler`]: crate::lsps5::service::LSPS5ServiceHandler
4952
pub const LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps5_service";
5053

54+
pub(crate) async fn read_event_queue<K: Deref>(
55+
kv_store: K,
56+
) -> Result<Option<VecDeque<LiquidityEvent>>, lightning::io::Error>
57+
where
58+
K::Target: KVStore,
59+
{
60+
let read_fut = kv_store.read(
61+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
62+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
63+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
64+
);
65+
66+
let mut reader = match read_fut.await {
67+
Ok(r) => Cursor::new(r),
68+
Err(e) => {
69+
if e.kind() == lightning::io::ErrorKind::NotFound {
70+
// Key wasn't found, no error but first time running.
71+
return Ok(None);
72+
} else {
73+
return Err(e);
74+
}
75+
},
76+
};
77+
78+
let queue: EventQueueDeserWrapper = Readable::read(&mut reader).map_err(|_| {
79+
lightning::io::Error::new(
80+
lightning::io::ErrorKind::InvalidData,
81+
"Failed to deserialize liquidity event queue",
82+
)
83+
})?;
84+
85+
Ok(Some(queue.0))
86+
}
87+
5188
pub(crate) async fn read_lsps2_service_peer_states<K: Deref>(
5289
kv_store: K,
5390
) -> Result<HashMap<PublicKey, Mutex<LSPS2ServicePeerState>>, lightning::io::Error>

0 commit comments

Comments
 (0)