@@ -7,9 +7,7 @@ use crate::payment_store::{
77 PaymentDetails , PaymentDetailsUpdate , PaymentDirection , PaymentStatus , PaymentStore ,
88} ;
99
10- use crate :: io:: {
11- KVStore , TransactionalWrite , EVENT_QUEUE_PERSISTENCE_KEY , EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
12- } ;
10+ use crate :: io:: { KVStore , EVENT_QUEUE_PERSISTENCE_KEY , EVENT_QUEUE_PERSISTENCE_NAMESPACE } ;
1311use crate :: logger:: { log_error, log_info, Logger } ;
1412
1513use lightning:: chain:: chaininterface:: { BroadcasterInterface , ConfirmationTarget , FeeEstimator } ;
@@ -108,23 +106,21 @@ impl_writeable_tlv_based_enum!(Event,
108106 } ;
109107) ;
110108
111- pub struct EventQueue < K : Deref , L : Deref >
109+ pub struct EventQueue < K : KVStore + Sync + Send , L : Deref >
112110where
113- K :: Target : KVStore ,
114111 L :: Target : Logger ,
115112{
116113 queue : Mutex < VecDeque < Event > > ,
117114 notifier : Condvar ,
118- kv_store : K ,
115+ kv_store : Arc < K > ,
119116 logger : L ,
120117}
121118
122- impl < K : Deref , L : Deref > EventQueue < K , L >
119+ impl < K : KVStore + Sync + Send , L : Deref > EventQueue < K , L >
123120where
124- K :: Target : KVStore ,
125121 L :: Target : Logger ,
126122{
127- pub ( crate ) fn new ( kv_store : K , logger : L ) -> Self {
123+ pub ( crate ) fn new ( kv_store : Arc < K > , logger : L ) -> Self {
128124 let queue: Mutex < VecDeque < Event > > = Mutex :: new ( VecDeque :: new ( ) ) ;
129125 let notifier = Condvar :: new ( ) ;
130126 Self { queue, notifier, kv_store, logger }
@@ -134,7 +130,7 @@ where
134130 {
135131 let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
136132 locked_queue. push_back ( event) ;
137- self . write_queue_and_commit ( & locked_queue) ?;
133+ self . persist_queue ( & locked_queue) ?;
138134 }
139135
140136 self . notifier . notify_one ( ) ;
@@ -156,58 +152,37 @@ where
156152 {
157153 let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
158154 locked_queue. pop_front ( ) ;
159- self . write_queue_and_commit ( & locked_queue) ?;
155+ self . persist_queue ( & locked_queue) ?;
160156 }
161157 self . notifier . notify_one ( ) ;
162158 Ok ( ( ) )
163159 }
164160
165- fn write_queue_and_commit ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
166- let mut writer = self
167- . kv_store
168- . write ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
161+ fn persist_queue ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
162+ let data = EventQueueSerWrapper ( locked_queue ) . encode ( ) ;
163+ self . kv_store
164+ . write ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY , & data )
169165 . map_err ( |e| {
170166 log_error ! (
171167 self . logger,
172- "Getting writer for key {}/{} failed due to: {}" ,
168+ "Write for key {}/{} failed due to: {}" ,
173169 EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
174170 EVENT_QUEUE_PERSISTENCE_KEY ,
175171 e
176172 ) ;
177173 Error :: PersistenceFailed
178174 } ) ?;
179- EventQueueSerWrapper ( locked_queue) . write ( & mut writer) . map_err ( |e| {
180- log_error ! (
181- self . logger,
182- "Writing event queue data to key {}/{} failed due to: {}" ,
183- EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
184- EVENT_QUEUE_PERSISTENCE_KEY ,
185- e
186- ) ;
187- Error :: PersistenceFailed
188- } ) ?;
189- writer. commit ( ) . map_err ( |e| {
190- log_error ! (
191- self . logger,
192- "Committing event queue data to key {}/{} failed due to: {}" ,
193- EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
194- EVENT_QUEUE_PERSISTENCE_KEY ,
195- e
196- ) ;
197- Error :: PersistenceFailed
198- } ) ?;
199175 Ok ( ( ) )
200176 }
201177}
202178
203- impl < K : Deref , L : Deref > ReadableArgs < ( K , L ) > for EventQueue < K , L >
179+ impl < K : KVStore + Sync + Send , L : Deref > ReadableArgs < ( Arc < K > , L ) > for EventQueue < K , L >
204180where
205- K :: Target : KVStore ,
206181 L :: Target : Logger ,
207182{
208183 #[ inline]
209184 fn read < R : lightning:: io:: Read > (
210- reader : & mut R , args : ( K , L ) ,
185+ reader : & mut R , args : ( Arc < K > , L ) ,
211186 ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
212187 let ( kv_store, logger) = args;
213188 let read_queue: EventQueueDeserWrapper = Readable :: read ( reader) ?;
@@ -244,14 +219,13 @@ impl Writeable for EventQueueSerWrapper<'_> {
244219 }
245220}
246221
247- pub ( crate ) struct EventHandler < K : Deref + Clone , L : Deref >
222+ pub ( crate ) struct EventHandler < K : KVStore + Sync + Send , L : Deref >
248223where
249- K :: Target : KVStore ,
250224 L :: Target : Logger ,
251225{
252226 wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
253227 event_queue : Arc < EventQueue < K , L > > ,
254- channel_manager : Arc < ChannelManager > ,
228+ channel_manager : Arc < ChannelManager < K > > ,
255229 network_graph : Arc < NetworkGraph > ,
256230 keys_manager : Arc < KeysManager > ,
257231 payment_store : Arc < PaymentStore < K , L > > ,
@@ -260,14 +234,13 @@ where
260234 _config : Arc < Config > ,
261235}
262236
263- impl < K : Deref + Clone , L : Deref > EventHandler < K , L >
237+ impl < K : KVStore + Sync + Send + ' static , L : Deref > EventHandler < K , L >
264238where
265- K :: Target : KVStore ,
266239 L :: Target : Logger ,
267240{
268241 pub fn new (
269242 wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > , event_queue : Arc < EventQueue < K , L > > ,
270- channel_manager : Arc < ChannelManager > , network_graph : Arc < NetworkGraph > ,
243+ channel_manager : Arc < ChannelManager < K > > , network_graph : Arc < NetworkGraph > ,
271244 keys_manager : Arc < KeysManager > , payment_store : Arc < PaymentStore < K , L > > ,
272245 runtime : Arc < RwLock < Option < tokio:: runtime:: Runtime > > > , logger : L , _config : Arc < Config > ,
273246 ) -> Self {
0 commit comments