@@ -17,6 +17,9 @@ use crate::lsps5::msgs::{
 	SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod,
 };
 use crate::message_queue::MessageQueue;
+use crate::persist::{
+	LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+};
 use crate::prelude::*;
 use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
 use crate::utils::time::TimeProvider;
@@ -28,10 +31,15 @@ use lightning::ln::channelmanager::AChannelManager;
 use lightning::ln::msgs::{ErrorAction, LightningError};
 use lightning::sign::NodeSigner;
 use lightning::util::logger::Level;
+use lightning::util::persist::KVStore;
+use lightning::util::ser::Writeable;
 
+use core::future::Future;
 use core::ops::Deref;
+use core::pin::Pin;
 use core::time::Duration;
 
+use alloc::boxed::Box;
 use alloc::string::String;
 use alloc::vec::Vec;
 
@@ -131,6 +139,7 @@ where
 	time_provider: TP,
 	channel_manager: CM,
 	node_signer: NS,
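+	// Store backing persisted per-peer webhook state.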
+	kv_store: Arc<dyn KVStore + Send + Sync>,
 	last_pruning: Mutex<Option<LSPSDateTime>>,
 }
 
@@ -143,7 +152,8 @@ where
 	/// Constructs a `LSPS5ServiceHandler` using the given time provider.
 	pub(crate) fn new_with_time_provider(
 		event_queue: Arc<EventQueue>, pending_messages: Arc<MessageQueue>, channel_manager: CM,
-		node_signer: NS, config: LSPS5ServiceConfig, time_provider: TP,
+		kv_store: Arc<dyn KVStore + Send + Sync>, node_signer: NS, config: LSPS5ServiceConfig,
+		time_provider: TP,
 	) -> Self {
 		assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0");
 		Self {
@@ -154,6 +164,7 @@ where
 			time_provider,
 			channel_manager,
 			node_signer,
+			kv_store,
 			last_pruning: Mutex::new(None),
 		}
 	}
@@ -186,6 +197,44 @@ where
 		}
 	}
 
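+	/// Persists the given peer's webhook state, keyed by the counterparty's node id.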
+	fn persist_peer_state(
+		&self, counterparty_node_id: PublicKey, peer_state: &PeerState,
+	) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
+		let key = counterparty_node_id.to_string();
+		let encoded = peer_state.encode();
+		self.kv_store.write(
+			LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+			LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+			&key,
+			encoded,
+		)
+	}
+
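+	/// Persists the webhook state of all peers.
+	///
+	/// Each peer's state is encoded synchronously while the per-peer read lock is
+	/// held; the returned future then runs the resulting writes one after another.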
+	pub(crate) fn persist(
+		&self,
+	) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
+		let outer_state_lock = self.per_peer_state.read().unwrap();
+		let mut futures = Vec::new();
+		for (counterparty_node_id, peer_state) in outer_state_lock.iter() {
+			let fut = self.persist_peer_state(*counterparty_node_id, peer_state);
+			futures.push(fut);
+		}
+
+		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
+		// introduce some batching to upper-bound the number of requests inflight at any given
+		// time.
+		Box::pin(async move {
+			let mut ret = Ok(());
+			for fut in futures {
+				let res = fut.await;
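+				// Keep persisting the remaining peers on failure, but remember the
+				// last error so the caller still sees that something went wrong.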
+				if res.is_err() {
+					ret = res;
+				}
+			}
+			ret
+		})
+	}
+
 	fn check_prune_stale_webhooks<'a>(
 		&self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap<PublicKey, PeerState>>,
 	) {