@@ -20,6 +20,7 @@ use lightning::util::persist::KVStore;
2020use lightning:: util:: ser:: {
2121 BigSize , CollectionLength , FixedLengthReader , Readable , Writeable , Writer ,
2222} ;
23+ use lightning:: util:: wakers:: Notifier ;
2324
2425/// The maximum queue size we allow before starting to drop events.
2526pub const MAX_EVENT_QUEUE_SIZE : usize = 1000 ;
@@ -28,50 +29,73 @@ pub(crate) struct EventQueue<K: Deref + Clone>
2829where
2930 K :: Target : KVStore ,
3031{
31- queue : Arc < Mutex < VecDeque < LiquidityEvent > > > ,
32+ state : Arc < Mutex < QueueState > > ,
3233 waker : Arc < Mutex < Option < Waker > > > ,
3334 #[ cfg( feature = "std" ) ]
3435 condvar : Arc < crate :: sync:: Condvar > ,
3536 kv_store : K ,
37+ persist_notifier : Arc < Notifier > ,
3638}
3739
3840impl < K : Deref + Clone > EventQueue < K >
3941where
4042 K :: Target : KVStore ,
4143{
42- pub fn new ( queue : VecDeque < LiquidityEvent > , kv_store : K ) -> Self {
43- let queue = Arc :: new ( Mutex :: new ( queue) ) ;
44+ pub fn new (
45+ queue : VecDeque < LiquidityEvent > , kv_store : K , persist_notifier : Arc < Notifier > ,
46+ ) -> Self {
47+ let state = Arc :: new ( Mutex :: new ( QueueState { queue, needs_persist : false } ) ) ;
4448 let waker = Arc :: new ( Mutex :: new ( None ) ) ;
4549 Self {
46- queue ,
50+ state ,
4751 waker,
4852 #[ cfg( feature = "std" ) ]
4953 condvar : Arc :: new ( crate :: sync:: Condvar :: new ( ) ) ,
5054 kv_store,
55+ persist_notifier,
5156 }
5257 }
5358
5459 pub fn next_event ( & self ) -> Option < LiquidityEvent > {
55- self . queue . lock ( ) . unwrap ( ) . pop_front ( )
60+ let event_opt = {
61+ let mut state_lock = self . state . lock ( ) . unwrap ( ) ;
62+ if state_lock. queue . is_empty ( ) {
63+ // Skip notifying below if nothing changed.
64+ return None ;
65+ }
66+
67+ state_lock. needs_persist = true ;
68+ state_lock. queue . pop_front ( )
69+ } ;
70+
71+ self . persist_notifier . notify ( ) ;
72+
73+ event_opt
5674 }
5775
5876 pub async fn next_event_async ( & self ) -> LiquidityEvent {
59- EventFuture { event_queue : Arc :: clone ( & self . queue ) , waker : Arc :: clone ( & self . waker ) } . await
77+ EventFuture {
78+ queue_state : Arc :: clone ( & self . state ) ,
79+ waker : Arc :: clone ( & self . waker ) ,
80+ persist_notifier : Arc :: clone ( & self . persist_notifier ) ,
81+ }
82+ . await
6083 }
6184
6285 #[ cfg( feature = "std" ) ]
6386 pub fn wait_next_event ( & self ) -> LiquidityEvent {
64- let mut queue = self
87+ let mut state_lock = self
6588 . condvar
66- . wait_while ( self . queue . lock ( ) . unwrap ( ) , |queue : & mut VecDeque < LiquidityEvent > | {
67- queue. is_empty ( )
89+ . wait_while ( self . state . lock ( ) . unwrap ( ) , |state_lock : & mut QueueState | {
90+ state_lock . queue . is_empty ( )
6891 } )
6992 . unwrap ( ) ;
7093
71- let event = queue. pop_front ( ) . expect ( "non-empty queue" ) ;
72- let should_notify = !queue. is_empty ( ) ;
94+ let event = state_lock. queue . pop_front ( ) . expect ( "non-empty queue" ) ;
95+ let should_notify = !state_lock. queue . is_empty ( ) ;
96+ state_lock. needs_persist = true ;
7397
74- drop ( queue ) ;
98+ drop ( state_lock ) ;
7599
76100 if should_notify {
77101 if let Some ( waker) = self . waker . lock ( ) . unwrap ( ) . take ( ) {
@@ -81,11 +105,28 @@ where
81105 self . condvar . notify_one ( ) ;
82106 }
83107
108+ self . persist_notifier . notify ( ) ;
109+
84110 event
85111 }
86112
87113 pub fn get_and_clear_pending_events ( & self ) -> Vec < LiquidityEvent > {
88- self . queue . lock ( ) . unwrap ( ) . split_off ( 0 ) . into ( )
114+ let mut state_lock = self . state . lock ( ) . unwrap ( ) ;
115+
116+ let needs_persist = !state_lock. queue . is_empty ( ) ;
117+ let events = state_lock. queue . split_off ( 0 ) . into ( ) ;
118+
119+ if needs_persist {
120+ state_lock. needs_persist = true ;
121+ }
122+
123+ drop ( state_lock) ;
124+
125+ if needs_persist {
126+ self . persist_notifier . notify ( ) ;
127+ }
128+
129+ events
89130 }
90131
91132 // Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
@@ -94,20 +135,38 @@ where
94135 }
95136
96137 pub async fn persist ( & self ) -> Result < ( ) , lightning:: io:: Error > {
97- let queue = self . queue . lock ( ) . unwrap ( ) ;
98- let encoded = EventQueueSerWrapper ( & queue) . encode ( ) ;
138+ let fut = {
139+ let mut state_lock = self . state . lock ( ) . unwrap ( ) ;
140+
141+ if !state_lock. needs_persist {
142+ return Ok ( ( ) ) ;
143+ }
99144
100- self . kv_store
101- . write (
145+ state_lock. needs_persist = false ;
146+ let encoded = EventQueueSerWrapper ( & state_lock. queue ) . encode ( ) ;
147+
148+ self . kv_store . write (
102149 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
103150 LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE ,
104151 LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY ,
105152 encoded,
106153 )
107- . await
154+ } ;
155+
156+ fut. await . map_err ( |e| {
157+ self . state . lock ( ) . unwrap ( ) . needs_persist = true ;
158+ e
159+ } ) ?;
160+
161+ Ok ( ( ) )
108162 }
109163}
110164
165+ struct QueueState {
166+ queue : VecDeque < LiquidityEvent > ,
167+ needs_persist : bool ,
168+ }
169+
111170// A guard type that will notify about new events when dropped.
112171#[ must_use]
113172pub ( crate ) struct EventQueueNotifierGuard < ' a , K : Deref + Clone > ( & ' a EventQueue < K > )
@@ -119,9 +178,10 @@ where
119178 K :: Target : KVStore ,
120179{
121180 pub fn enqueue < E : Into < LiquidityEvent > > ( & self , event : E ) {
122- let mut queue = self . 0 . queue . lock ( ) . unwrap ( ) ;
123- if queue. len ( ) < MAX_EVENT_QUEUE_SIZE {
124- queue. push_back ( event. into ( ) ) ;
181+ let mut state_lock = self . 0 . state . lock ( ) . unwrap ( ) ;
182+ if state_lock. queue . len ( ) < MAX_EVENT_QUEUE_SIZE {
183+ state_lock. queue . push_back ( event. into ( ) ) ;
184+ state_lock. needs_persist = true ;
125185 } else {
126186 return ;
127187 }
@@ -133,7 +193,10 @@ where
133193 K :: Target : KVStore ,
134194{
135195 fn drop ( & mut self ) {
136- let should_notify = !self . 0 . queue . lock ( ) . unwrap ( ) . is_empty ( ) ;
196+ let ( should_notify, should_persist_notify) = {
197+ let state_lock = self . 0 . state . lock ( ) . unwrap ( ) ;
198+ ( !state_lock. queue . is_empty ( ) , state_lock. needs_persist )
199+ } ;
137200
138201 if should_notify {
139202 if let Some ( waker) = self . 0 . waker . lock ( ) . unwrap ( ) . take ( ) {
@@ -143,12 +206,17 @@ where
143206 #[ cfg( feature = "std" ) ]
144207 self . 0 . condvar . notify_one ( ) ;
145208 }
209+
210+ if should_persist_notify {
211+ self . 0 . persist_notifier . notify ( ) ;
212+ }
146213 }
147214}
148215
149216struct EventFuture {
150- event_queue : Arc < Mutex < VecDeque < LiquidityEvent > > > ,
217+ queue_state : Arc < Mutex < QueueState > > ,
151218 waker : Arc < Mutex < Option < Waker > > > ,
219+ persist_notifier : Arc < Notifier > ,
152220}
153221
154222impl Future for EventFuture {
@@ -157,12 +225,22 @@ impl Future for EventFuture {
157225 fn poll (
158226 self : core:: pin:: Pin < & mut Self > , cx : & mut core:: task:: Context < ' _ > ,
159227 ) -> core:: task:: Poll < Self :: Output > {
160- if let Some ( event) = self . event_queue . lock ( ) . unwrap ( ) . pop_front ( ) {
161- Poll :: Ready ( event)
162- } else {
163- * self . waker . lock ( ) . unwrap ( ) = Some ( cx. waker ( ) . clone ( ) ) ;
164- Poll :: Pending
228+ let ( res, should_persist_notify) = {
229+ let mut state_lock = self . queue_state . lock ( ) . unwrap ( ) ;
230+ if let Some ( event) = state_lock. queue . pop_front ( ) {
231+ state_lock. needs_persist = true ;
232+ ( Poll :: Ready ( event) , true )
233+ } else {
234+ * self . waker . lock ( ) . unwrap ( ) = Some ( cx. waker ( ) . clone ( ) ) ;
235+ ( Poll :: Pending , false )
236+ }
237+ } ;
238+
239+ if should_persist_notify {
240+ self . persist_notifier . notify ( ) ;
165241 }
242+
243+ res
166244 }
167245}
168246
@@ -266,7 +344,8 @@ mod tests {
266344 use std:: time:: Duration ;
267345
268346 let kv_store = Arc :: new ( KVStoreSyncWrapper ( Arc :: new ( TestStore :: new ( false ) ) ) ) ;
269- let event_queue = Arc :: new ( EventQueue :: new ( VecDeque :: new ( ) , kv_store) ) ;
347+ let persist_notifier = Arc :: new ( Notifier :: new ( ) ) ;
348+ let event_queue = Arc :: new ( EventQueue :: new ( VecDeque :: new ( ) , kv_store, persist_notifier) ) ;
270349 assert_eq ! ( event_queue. next_event( ) , None ) ;
271350
272351 let secp_ctx = Secp256k1 :: new ( ) ;
0 commit comments