@@ -157,7 +157,7 @@ pub(crate) struct LdkLiteEventQueue<K: Deref>
157157where
158158 K :: Target : KVStorePersister ,
159159{
160- queue : Mutex < EventQueueSerWrapper > ,
160+ queue : Mutex < VecDeque < Arc < Event > > > ,
161161 notifier : Condvar ,
162162 persister : K ,
163163}
@@ -167,15 +167,15 @@ where
167167 K :: Target : KVStorePersister ,
168168{
169169 pub ( crate ) fn new ( persister : K ) -> Self {
170- let queue: Mutex < EventQueueSerWrapper > = Mutex :: new ( EventQueueSerWrapper ( VecDeque :: new ( ) ) ) ;
170+ let queue: Mutex < VecDeque < Arc < Event > > > = Mutex :: new ( VecDeque :: new ( ) ) ;
171171 let notifier = Condvar :: new ( ) ;
172172 Self { queue, notifier, persister }
173173 }
174174 pub ( crate ) fn add_event ( & self , event : LdkLiteEvent ) -> Result < ( ) , Error > {
175175 {
176176 let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
177- locked_queue. 0 . push_back ( Arc :: new ( event) ) ;
178- self . persister . persist ( EVENTS_PERSISTENCE_KEY , & * locked_queue) ?;
177+ locked_queue. push_back ( Arc :: new ( event) ) ;
178+ self . persist_queue ( & * locked_queue) ?;
179179 }
180180
181181 self . notifier . notify_one ( ) ;
@@ -185,20 +185,27 @@ where
185185 pub ( crate ) fn next_event ( & self ) -> Arc < LdkLiteEvent > {
186186 let locked_queue = self
187187 . notifier
188- . wait_while ( self . queue . lock ( ) . unwrap ( ) , |queue| queue. 0 . is_empty ( ) )
188+ . wait_while ( self . queue . lock ( ) . unwrap ( ) , |queue| queue. is_empty ( ) )
189189 . unwrap ( ) ;
190- Arc :: clone ( & locked_queue. 0 . front ( ) . unwrap ( ) )
190+ Arc :: clone ( & locked_queue. front ( ) . unwrap ( ) )
191191 }
192192
193193 pub ( crate ) fn event_handled ( & self ) -> Result < ( ) , Error > {
194194 {
195195 let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
196- locked_queue. 0 . pop_front ( ) ;
197- self . persister . persist ( EVENTS_PERSISTENCE_KEY , & * locked_queue) ?;
196+ locked_queue. pop_front ( ) ;
197+ self . persist_queue ( & * locked_queue) ?;
198198 }
199199 self . notifier . notify_one ( ) ;
200200 Ok ( ( ) )
201201 }
202+
203+ fn persist_queue ( & self , locked_queue : & VecDeque < Arc < Event > > ) -> Result < ( ) , Error > {
204+ self . persister
205+ . persist ( EVENTS_PERSISTENCE_KEY , & EventQueueSerWrapper ( locked_queue) )
206+ . map_err ( |_| Error :: PersistenceFailed ) ?;
207+ Ok ( ( ) )
208+ }
202209}
203210
204211impl < K : Deref > ReadableArgs < K > for LdkLiteEventQueue < K >
@@ -209,15 +216,16 @@ where
209216 fn read < R : lightning:: io:: Read > (
210217 reader : & mut R , persister : K ,
211218 ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
212- let queue: Mutex < EventQueueSerWrapper > = Mutex :: new ( Readable :: read ( reader) ?) ;
219+ let read_queue: EventQueueDeserWrapper = Readable :: read ( reader) ?;
220+ let queue: Mutex < VecDeque < Arc < Event > > > = Mutex :: new ( read_queue. 0 ) ;
213221 let notifier = Condvar :: new ( ) ;
214222 Ok ( Self { queue, notifier, persister } )
215223 }
216224}
217225
218- struct EventQueueSerWrapper ( VecDeque < Arc < LdkLiteEvent > > ) ;
226+ struct EventQueueDeserWrapper ( VecDeque < Arc < LdkLiteEvent > > ) ;
219227
220- impl Readable for EventQueueSerWrapper {
228+ impl Readable for EventQueueDeserWrapper {
221229 fn read < R : lightning:: io:: Read > (
222230 reader : & mut R ,
223231 ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
@@ -226,11 +234,13 @@ impl Readable for EventQueueSerWrapper {
226234 for _ in 0 ..len {
227235 queue. push_back ( Arc :: new ( Readable :: read ( reader) ?) ) ;
228236 }
229- Ok ( EventQueueSerWrapper ( queue) )
237+ Ok ( Self ( queue) )
230238 }
231239}
232240
233- impl Writeable for EventQueueSerWrapper {
241+ struct EventQueueSerWrapper < ' a > ( & ' a VecDeque < Arc < Event > > ) ;
242+
243+ impl Writeable for EventQueueSerWrapper < ' _ > {
234244 fn write < W : Writer > ( & self , writer : & mut W ) -> Result < ( ) , lightning:: io:: Error > {
235245 ( self . 0 . len ( ) as u16 ) . write ( writer) ?;
236246 for e in self . 0 . iter ( ) {
@@ -601,7 +611,7 @@ mod tests {
601611
602612 // Check we get the expected event and that it is returned until we mark it handled.
603613 for _ in 0 ..5 {
604- assert_eq ! ( event_queue. next_event( ) , expected_event) ;
614+ assert_eq ! ( * event_queue. next_event( ) , expected_event) ;
605615 assert_eq ! ( false , test_persister. get_and_clear_pending_persist( ) ) ;
606616 }
607617
0 commit comments