11use super :: LiquidityEvent ;
2+
3+ use crate :: persist:: {
4+ LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY ,
5+ LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE ,
6+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
7+ } ;
28use crate :: sync:: { Arc , Mutex } ;
39
10+ use alloc:: boxed:: Box ;
411use alloc:: collections:: VecDeque ;
512use alloc:: vec:: Vec ;
613
714use core:: future:: Future ;
15+ use core:: pin:: Pin ;
816use core:: task:: { Poll , Waker } ;
917
18+ use lightning:: util:: persist:: KVStore ;
19+ use lightning:: util:: ser:: { CollectionLength , MaybeReadable , Readable , Writeable , Writer } ;
20+
1021/// The maximum queue size we allow before starting to drop events.
1122pub const MAX_EVENT_QUEUE_SIZE : usize = 1000 ;
1223
@@ -15,17 +26,19 @@ pub(crate) struct EventQueue {
1526 waker : Arc < Mutex < Option < Waker > > > ,
1627 #[ cfg( feature = "std" ) ]
1728 condvar : Arc < crate :: sync:: Condvar > ,
29+ kv_store : Arc < dyn KVStore + Send + Sync > ,
1830}
1931
2032impl EventQueue {
21- pub fn new ( ) -> Self {
33+ pub fn new ( kv_store : Arc < dyn KVStore + Send + Sync > ) -> Self {
2234 let queue = Arc :: new ( Mutex :: new ( VecDeque :: new ( ) ) ) ;
2335 let waker = Arc :: new ( Mutex :: new ( None ) ) ;
2436 Self {
2537 queue,
2638 waker,
2739 #[ cfg( feature = "std" ) ]
2840 condvar : Arc :: new ( crate :: sync:: Condvar :: new ( ) ) ,
41+ kv_store,
2942 }
3043 }
3144
@@ -70,6 +83,20 @@ impl EventQueue {
7083 pub fn notifier ( & self ) -> EventQueueNotifierGuard < ' _ > {
7184 EventQueueNotifierGuard ( self )
7285 }
86+
87+ pub fn persist (
88+ & self ,
89+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + Send > > {
90+ let queue = self . queue . lock ( ) . unwrap ( ) ;
91+ let encoded = EventQueueSerWrapper ( & queue) . encode ( ) ;
92+
93+ self . kv_store . write (
94+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
95+ LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE ,
96+ LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY ,
97+ encoded,
98+ )
99+ }
73100}
74101
75102// A guard type that will notify about new events when dropped.
@@ -122,6 +149,35 @@ impl Future for EventFuture {
122149 }
123150}
124151
152+ pub ( crate ) struct EventQueueDeserWrapper ( pub VecDeque < LiquidityEvent > ) ;
153+
154+ impl Readable for EventQueueDeserWrapper {
155+ fn read < R : lightning:: io:: Read > (
156+ reader : & mut R ,
157+ ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
158+ let len: CollectionLength = Readable :: read ( reader) ?;
159+ let mut queue = VecDeque :: with_capacity ( len. 0 as usize ) ;
160+ for _ in 0 ..len. 0 {
161+ if let Some ( event) = MaybeReadable :: read ( reader) ? {
162+ queue. push_back ( event) ;
163+ }
164+ }
165+ Ok ( Self ( queue) )
166+ }
167+ }
168+
169+ struct EventQueueSerWrapper < ' a > ( & ' a VecDeque < LiquidityEvent > ) ;
170+
171+ impl Writeable for EventQueueSerWrapper < ' _ > {
172+ fn write < W : Writer > ( & self , writer : & mut W ) -> Result < ( ) , lightning:: io:: Error > {
173+ CollectionLength ( self . 0 . len ( ) as u64 ) . write ( writer) ?;
174+ for e in self . 0 . iter ( ) {
175+ e. write ( writer) ?;
176+ }
177+ Ok ( ( ) )
178+ }
179+ }
180+
125181#[ cfg( test) ]
126182mod tests {
127183 #[ tokio:: test]
@@ -131,10 +187,13 @@ mod tests {
131187 use crate :: lsps0:: event:: LSPS0ClientEvent ;
132188 use bitcoin:: secp256k1:: { PublicKey , Secp256k1 , SecretKey } ;
133189 use core:: sync:: atomic:: { AtomicU16 , Ordering } ;
190+ use lightning:: util:: persist:: KVStoreSyncWrapper ;
191+ use lightning:: util:: test_utils:: TestStore ;
134192 use std:: sync:: Arc ;
135193 use std:: time:: Duration ;
136194
137- let event_queue = Arc :: new ( EventQueue :: new ( ) ) ;
195+ let kv_store = Arc :: new ( KVStoreSyncWrapper ( Arc :: new ( TestStore :: new ( false ) ) ) ) ;
196+ let event_queue = Arc :: new ( EventQueue :: new ( kv_store) ) ;
138197 assert_eq ! ( event_queue. next_event( ) , None ) ;
139198
140199 let secp_ctx = Secp256k1 :: new ( ) ;
0 commit comments