Skip to content

Commit dea8c88

Browse files
committed
Read persisted LSPS2 service state in LiquidityManager::new
We read any previously-persisted state upon construction of `LiquidityManager`.
1 parent b889ca6 commit dea8c88

File tree

6 files changed

+166
-41
lines changed

6 files changed

+166
-41
lines changed

fuzz/src/lsps_message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub fn do_test(data: &[u8]) {
8686
kv_store,
8787
None,
8888
None,
89-
));
89+
).unwrap());
9090
let mut reader = data;
9191
if let Ok(Some(msg)) = liquidity_manager.read(LSPS_MESSAGE_TYPE_ID, &mut reader) {
9292
let secp = Secp256k1::signing_only();

lightning-background-processor/src/lib.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2370,16 +2370,19 @@ mod tests {
23702370
Arc::clone(&logger),
23712371
Arc::clone(&keys_manager),
23722372
));
2373-
let liquidity_manager = Arc::new(LiquidityManagerSync::new(
2374-
Arc::clone(&keys_manager),
2375-
Arc::clone(&keys_manager),
2376-
Arc::clone(&manager),
2377-
None,
2378-
None,
2379-
Arc::clone(&kv_store),
2380-
None,
2381-
None,
2382-
));
2373+
let liquidity_manager = Arc::new(
2374+
LiquidityManagerSync::new(
2375+
Arc::clone(&keys_manager),
2376+
Arc::clone(&keys_manager),
2377+
Arc::clone(&manager),
2378+
None,
2379+
None,
2380+
Arc::clone(&kv_store),
2381+
None,
2382+
None,
2383+
)
2384+
.unwrap(),
2385+
);
23832386
let node = Node {
23842387
node: manager,
23852388
p2p_gossip_sync,

lightning-liquidity/src/lsps2/service.rs

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ impl OutboundJITChannel {
465465
}
466466
}
467467

468-
struct PeerState {
468+
pub(crate) struct PeerState {
469469
outbound_channels_by_intercept_scid: HashMap<u64, OutboundJITChannel>,
470470
intercept_scid_by_user_channel_id: HashMap<u128, u64>,
471471
intercept_scid_by_channel_id: HashMap<ChannelId, u64>,
@@ -592,20 +592,48 @@ where
592592
{
593593
/// Constructs a `LSPS2ServiceHandler`.
594594
pub(crate) fn new(
595-
pending_messages: Arc<MessageQueue>, pending_events: Arc<EventQueue<K>>,
596-
channel_manager: CM, kv_store: K, config: LSPS2ServiceConfig,
597-
) -> Self {
598-
Self {
595+
per_peer_state: HashMap<PublicKey, Mutex<PeerState>>, pending_messages: Arc<MessageQueue>,
596+
pending_events: Arc<EventQueue<K>>, channel_manager: CM, kv_store: K,
597+
config: LSPS2ServiceConfig,
598+
) -> Result<Self, lightning::io::Error> {
599+
let mut peer_by_intercept_scid = new_hash_map();
600+
let mut peer_by_channel_id = new_hash_map();
601+
for (node_id, peer_state) in per_peer_state.iter() {
602+
let peer_state_lock = peer_state.lock().unwrap();
603+
for (intercept_scid, _) in peer_state_lock.outbound_channels_by_intercept_scid.iter() {
604+
let res = peer_by_intercept_scid.insert(*intercept_scid, *node_id);
605+
debug_assert!(res.is_none(), "Intercept SCIDs should never collide");
606+
if res.is_some() {
607+
return Err(lightning::io::Error::new(
608+
lightning::io::ErrorKind::InvalidData,
609+
"Failed to read LSPS2 peer state due to data inconsistencies: Intercept SCIDs should never collide",
610+
));
611+
}
612+
}
613+
614+
for (channel_id, _) in peer_state_lock.intercept_scid_by_channel_id.iter() {
615+
let res = peer_by_channel_id.insert(*channel_id, *node_id);
616+
debug_assert!(res.is_none(), "Channel IDs should never collide");
617+
if res.is_some() {
618+
return Err(lightning::io::Error::new(
619+
lightning::io::ErrorKind::InvalidData,
620+
"Failed to read LSPS2 peer state due to data inconsistencies: Channel IDs should never collide",
621+
));
622+
}
623+
}
624+
}
625+
626+
Ok(Self {
599627
pending_messages,
600628
pending_events,
601-
per_peer_state: RwLock::new(new_hash_map()),
602-
peer_by_intercept_scid: RwLock::new(new_hash_map()),
603-
peer_by_channel_id: RwLock::new(new_hash_map()),
629+
per_peer_state: RwLock::new(per_peer_state),
630+
peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid),
631+
peer_by_channel_id: RwLock::new(peer_by_channel_id),
604632
total_pending_requests: AtomicUsize::new(0),
605633
channel_manager,
606634
kv_store,
607635
config,
608-
}
636+
})
609637
}
610638

611639
/// Returns a reference to the used config.

lightning-liquidity/src/manager.rs

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::lsps5::client::{LSPS5ClientConfig, LSPS5ClientHandler};
2424
use crate::lsps5::msgs::LSPS5Message;
2525
use crate::lsps5::service::{LSPS5ServiceConfig, LSPS5ServiceHandler};
2626
use crate::message_queue::MessageQueue;
27+
use crate::persist::read_lsps2_service_peer_states;
2728

2829
use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
2930
use crate::lsps1::msgs::LSPS1Message;
@@ -327,12 +328,14 @@ where
327328
K::Target: KVStore,
328329
{
329330
/// Constructor for the [`LiquidityManager`] using the default system clock
330-
pub fn new(
331+
///
332+
/// Will read persisted service states from the given [`KVStore`].
333+
pub async fn new(
331334
entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
332335
chain_params: Option<ChainParameters>, kv_store: K,
333336
service_config: Option<LiquidityServiceConfig>,
334337
client_config: Option<LiquidityClientConfig>,
335-
) -> Self {
338+
) -> Result<Self, lightning::io::Error> {
336339
let time_provider = Arc::new(DefaultTimeProvider);
337340
Self::new_with_custom_time_provider(
338341
entropy_source,
@@ -345,6 +348,7 @@ where
345348
client_config,
346349
time_provider,
347350
)
351+
.await
348352
}
349353
}
350354

@@ -366,16 +370,18 @@ where
366370
{
367371
/// Constructor for the [`LiquidityManager`] with a custom time provider.
368372
///
373+
/// Will read persisted service states from the given [`KVStore`].
374+
///
369375
/// This should be used on non-std platforms where access to the system time is not
370376
/// available.
371377
/// Sets up the required protocol message handlers based on the given
372378
/// [`LiquidityClientConfig`] and [`LiquidityServiceConfig`].
373-
pub fn new_with_custom_time_provider(
379+
pub async fn new_with_custom_time_provider(
374380
entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
375381
chain_params: Option<ChainParameters>, kv_store: K,
376382
service_config: Option<LiquidityServiceConfig>,
377383
client_config: Option<LiquidityClientConfig>, time_provider: TP,
378-
) -> Self {
384+
) -> Result<Self, lightning::io::Error> {
379385
let pending_messages = Arc::new(MessageQueue::new());
380386
let pending_events = Arc::new(EventQueue::new(kv_store.clone()));
381387
let ignored_peers = RwLock::new(new_hash_set());
@@ -392,22 +398,30 @@ where
392398
)
393399
})
394400
});
395-
let lsps2_service_handler = service_config.as_ref().and_then(|config| {
396-
config.lsps2_service_config.as_ref().map(|config| {
401+
402+
let lsps2_service_handler = if let Some(service_config) = service_config.as_ref() {
403+
if let Some(lsps2_service_config) = service_config.lsps2_service_config.as_ref() {
397404
if let Some(number) =
398405
<LSPS2ServiceHandler<CM, K> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
399406
{
400407
supported_protocols.push(number);
401408
}
402-
LSPS2ServiceHandler::new(
409+
410+
let peer_states = read_lsps2_service_peer_states(kv_store.clone()).await?;
411+
Some(LSPS2ServiceHandler::new(
412+
peer_states,
403413
Arc::clone(&pending_messages),
404414
Arc::clone(&pending_events),
405415
channel_manager.clone(),
406416
kv_store.clone(),
407-
config.clone(),
408-
)
409-
})
410-
});
417+
lsps2_service_config.clone(),
418+
)?)
419+
} else {
420+
None
421+
}
422+
} else {
423+
None
424+
};
411425

412426
let lsps5_client_handler = client_config.as_ref().and_then(|config| {
413427
config.lsps5_client_config.as_ref().map(|config| {
@@ -482,7 +496,7 @@ where
482496
None
483497
};
484498

485-
Self {
499+
Ok(Self {
486500
pending_messages,
487501
pending_events,
488502
request_id_to_method_map: Mutex::new(new_hash_map()),
@@ -500,7 +514,7 @@ where
500514
_client_config: client_config,
501515
best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
502516
_chain_source: chain_source,
503-
}
517+
})
504518
}
505519

506520
/// Returns a reference to the LSPS0 client-side handler.
@@ -1038,9 +1052,10 @@ where
10381052
chain_params: Option<ChainParameters>, kv_store_sync: KS,
10391053
service_config: Option<LiquidityServiceConfig>,
10401054
client_config: Option<LiquidityClientConfig>,
1041-
) -> Self {
1055+
) -> Result<Self, lightning::io::Error> {
10421056
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
1043-
let inner = Arc::new(LiquidityManager::new(
1057+
1058+
let mut fut = Box::pin(LiquidityManager::new(
10441059
entropy_source,
10451060
node_signer,
10461061
channel_manager,
@@ -1050,7 +1065,17 @@ where
10501065
service_config,
10511066
client_config,
10521067
));
1053-
Self { inner }
1068+
1069+
let mut waker = dummy_waker();
1070+
let mut ctx = task::Context::from_waker(&mut waker);
1071+
let inner = match fut.as_mut().poll(&mut ctx) {
1072+
task::Poll::Ready(result) => result,
1073+
task::Poll::Pending => {
1074+
// In a sync context, we can't wait for the future to complete.
1075+
unreachable!("LiquidityManager::new should not be pending in a sync context");
1076+
},
1077+
}?;
1078+
Ok(Self { inner: Arc::new(inner) })
10541079
}
10551080
}
10561081

@@ -1078,9 +1103,9 @@ where
10781103
chain_params: Option<ChainParameters>, kv_store_sync: KS,
10791104
service_config: Option<LiquidityServiceConfig>,
10801105
client_config: Option<LiquidityClientConfig>, time_provider: TP,
1081-
) -> Self {
1106+
) -> Result<Self, lightning::io::Error> {
10821107
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
1083-
let inner = Arc::new(LiquidityManager::new_with_custom_time_provider(
1108+
let mut fut = Box::pin(LiquidityManager::new_with_custom_time_provider(
10841109
entropy_source,
10851110
node_signer,
10861111
channel_manager,
@@ -1091,7 +1116,17 @@ where
10911116
client_config,
10921117
time_provider,
10931118
));
1094-
Self { inner }
1119+
1120+
let mut waker = dummy_waker();
1121+
let mut ctx = task::Context::from_waker(&mut waker);
1122+
let inner = match fut.as_mut().poll(&mut ctx) {
1123+
task::Poll::Ready(result) => result,
1124+
task::Poll::Pending => {
1125+
// In a sync context, we can't wait for the future to complete.
1126+
unreachable!("LiquidityManager::new should not be pending in a sync context");
1127+
},
1128+
}?;
1129+
Ok(Self { inner: Arc::new(inner) })
10951130
}
10961131

10971132
/// Returns a reference to the LSPS0 client-side handler.

lightning-liquidity/src/persist.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,19 @@
99

1010
//! Types and utils for persistence.
1111
12+
use crate::lsps2::service::PeerState as LSPS2ServicePeerState;
13+
use crate::prelude::{new_hash_map, HashMap};
14+
use crate::sync::Mutex;
15+
16+
use lightning::io::Cursor;
17+
use lightning::util::persist::KVStore;
18+
use lightning::util::ser::Readable;
19+
20+
use bitcoin::secp256k1::PublicKey;
21+
22+
use core::ops::Deref;
23+
use core::str::FromStr;
24+
1225
/// The primary namespace under which the [`LiquidityManager`] will be persisted.
1326
///
1427
/// [`LiquidityManager`]: crate::LiquidityManager
@@ -33,3 +46,47 @@ pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";
3346
///
3447
/// [`LSPS5ServiceHandler`]: crate::lsps5::service::LSPS5ServiceHandler
3548
pub const LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps5_service";
49+
50+
pub(crate) async fn read_lsps2_service_peer_states<K: Deref>(
51+
kv_store: K,
52+
) -> Result<HashMap<PublicKey, Mutex<LSPS2ServicePeerState>>, lightning::io::Error>
53+
where
54+
K::Target: KVStore,
55+
{
56+
let mut res = new_hash_map();
57+
58+
for stored_key in kv_store
59+
.list(
60+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
61+
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
62+
)
63+
.await?
64+
{
65+
let mut reader = Cursor::new(
66+
kv_store
67+
.read(
68+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
69+
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
70+
&stored_key,
71+
)
72+
.await?,
73+
);
74+
75+
let peer_state = LSPS2ServicePeerState::read(&mut reader).map_err(|_| {
76+
lightning::io::Error::new(
77+
lightning::io::ErrorKind::InvalidData,
78+
"Failed to deserialize LSPS2 peer state",
79+
)
80+
})?;
81+
82+
let key = PublicKey::from_str(&stored_key).map_err(|_| {
83+
lightning::io::Error::new(
84+
lightning::io::ErrorKind::InvalidData,
85+
"Failed to deserialize stored key entry",
86+
)
87+
})?;
88+
89+
res.insert(key, Mutex::new(peer_state));
90+
}
91+
Ok(res)
92+
}

lightning-liquidity/tests/common/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ pub(crate) fn create_service_and_client_nodes<'a, 'b, 'c>(
3838
Some(service_config),
3939
None,
4040
Arc::clone(&time_provider),
41-
);
41+
)
42+
.unwrap();
4243

4344
let client_kv_store = Arc::new(TestStore::new(false));
4445
let client_lm = LiquidityManagerSync::new_with_custom_time_provider(
@@ -51,7 +52,8 @@ pub(crate) fn create_service_and_client_nodes<'a, 'b, 'c>(
5152
None,
5253
Some(client_config),
5354
time_provider,
54-
);
55+
)
56+
.unwrap();
5557

5658
let mut iter = nodes.into_iter();
5759
let service_node = LiquidityNode::new(iter.next().unwrap(), service_lm);

0 commit comments

Comments
 (0)