@@ -36,11 +36,13 @@ use crate::chain::transaction::{OutPoint, TransactionData};
3636use crate :: ln:: types:: ChannelId ;
3737use crate :: sign:: ecdsa:: EcdsaChannelSigner ;
3838use crate :: events:: { self , Event , EventHandler , ReplayEvent } ;
39+ use crate :: util:: async_poll:: { poll_or_spawn, AsyncResult , FutureSpawner } ;
3940use crate :: util:: logger:: { Logger , WithContext } ;
4041use crate :: util:: errors:: APIError ;
4142use crate :: util:: persist:: MonitorName ;
4243use crate :: util:: wakers:: { Future , Notifier } ;
4344use crate :: ln:: channel_state:: ChannelDetails ;
45+ use crate :: sync:: { Arc } ;
4446
4547use crate :: prelude:: * ;
4648use crate :: sync:: { RwLock , RwLockReadGuard , Mutex , MutexGuard } ;
@@ -122,7 +124,7 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
122124 ///
123125 /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
124126 /// [`Writeable::write`]: crate::util::ser::Writeable::write
125- fn persist_new_channel ( & self , monitor_name : MonitorName , monitor : & ChannelMonitor < ChannelSigner > ) -> ChannelMonitorUpdateStatus ;
127+ fn persist_new_channel ( & self , monitor_name : MonitorName , monitor : & ChannelMonitor < ChannelSigner > ) -> AsyncResult < ' static , ( ) > ;
126128
127129 /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
128130 /// update.
@@ -161,7 +163,7 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
161163 /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
162164 ///
163165 /// [`Writeable::write`]: crate::util::ser::Writeable::write
164- fn update_persisted_channel ( & self , monitor_name : MonitorName , monitor_update : Option < & ChannelMonitorUpdate > , monitor : & ChannelMonitor < ChannelSigner > ) -> ChannelMonitorUpdateStatus ;
166+ fn update_persisted_channel ( & self , monitor_name : MonitorName , monitor_update : Option < & ChannelMonitorUpdate > , monitor : & ChannelMonitor < ChannelSigner > ) -> AsyncResult < ' static , ( ) > ;
165167 /// Prevents the channel monitor from being loaded on startup.
166168 ///
167169 /// Archiving the data in a backup location (rather than deleting it fully) is useful for
@@ -233,31 +235,33 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
233235/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
234236/// [module-level documentation]: crate::chain::chainmonitor
235237/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims
236- pub struct ChainMonitor < ChannelSigner : EcdsaChannelSigner , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref >
238+ pub struct ChainMonitor < ChannelSigner : EcdsaChannelSigner , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref , FS : FutureSpawner >
237239 where C :: Target : chain:: Filter ,
238240 T :: Target : BroadcasterInterface ,
239241 F :: Target : FeeEstimator ,
240242 L :: Target : Logger ,
241243 P :: Target : Persist < ChannelSigner > ,
242244{
243- monitors : RwLock < HashMap < ChannelId , MonitorHolder < ChannelSigner > > > ,
245+ monitors : Arc < RwLock < HashMap < ChannelId , MonitorHolder < ChannelSigner > > > > ,
244246 chain_source : Option < C > ,
245247 broadcaster : T ,
246248 logger : L ,
247249 fee_estimator : F ,
248250 persister : P ,
249251 /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
250252 /// from the user and not from a [`ChannelMonitor`].
251- pending_monitor_events : Mutex < Vec < ( OutPoint , ChannelId , Vec < MonitorEvent > , PublicKey ) > > ,
253+ pending_monitor_events : Arc < Mutex < Vec < ( OutPoint , ChannelId , Vec < MonitorEvent > , PublicKey ) > > > ,
252254 /// The best block height seen, used as a proxy for the passage of time.
253255 highest_chain_height : AtomicUsize ,
254256
255257 /// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
256258 /// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process).
257- event_notifier : Notifier ,
259+ event_notifier : Arc < Notifier > ,
260+
261+ future_spawner : Arc < FS > ,
258262}
259263
260- impl < ChannelSigner : EcdsaChannelSigner , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref > ChainMonitor < ChannelSigner , C , T , F , L , P >
264+ impl < ChannelSigner : EcdsaChannelSigner + ' static , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref , FS : FutureSpawner > ChainMonitor < ChannelSigner , C , T , F , L , P , FS >
261265where C :: Target : chain:: Filter ,
262266 T :: Target : BroadcasterInterface ,
263267 F :: Target : FeeEstimator ,
@@ -347,18 +351,31 @@ where C::Target: chain::Filter,
347351 // `ChannelMonitorUpdate` after a channel persist for a channel with the same
348352 // `latest_update_id`.
349353 let _pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
350- match self . persister . update_persisted_channel ( monitor. persistence_key ( ) , None , monitor) {
351- ChannelMonitorUpdateStatus :: Completed =>
352- log_trace ! ( logger, "Finished syncing Channel Monitor for channel {} for block-data" ,
353- log_funding_info!( monitor)
354- ) ,
355- ChannelMonitorUpdateStatus :: InProgress => {
356- log_trace ! ( logger, "Channel Monitor sync for channel {} in progress." , log_funding_info!( monitor) ) ;
357- }
358- ChannelMonitorUpdateStatus :: UnrecoverableError => {
359- return Err ( ( ) ) ;
354+ let max_update_id = _pending_monitor_updates. iter ( ) . copied ( ) . max ( ) . unwrap_or ( 0 ) ;
355+
356+ let persist_res = self . persister . update_persisted_channel ( monitor. persistence_key ( ) , None , monitor) ;
357+
358+ let monitors = self . monitors . clone ( ) ;
359+ let pending_monitor_updates_cb = self . pending_monitor_events . clone ( ) ;
360+ let event_notifier = self . event_notifier . clone ( ) ;
361+ let future_spawner = self . future_spawner . clone ( ) ;
362+ let channel_id = * channel_id;
363+
364+ match poll_or_spawn ( persist_res, move || {
365+ // TODO: Log error if the monitor is not persisted.
366+ let _ = ChainMonitor :: < ChannelSigner , C , T , F , L , P , FS > :: channel_monitor_updated_internal ( & monitors, & pending_monitor_updates_cb, & event_notifier,
367+ channel_id, max_update_id) ;
368+ } , future_spawner. deref ( ) ) {
369+ Ok ( true ) => {
370+ // log
371+ } ,
372+ Ok ( false ) => {
373+ // log
374+ }
375+ Err ( _) => {
376+ return Err ( ( ) ) ;
377+ } ,
360378 }
361- }
362379 }
363380
364381 // Register any new outputs with the chain source for filtering, storing any dependent
@@ -388,17 +405,18 @@ where C::Target: chain::Filter,
388405 /// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
389406 /// always need to fetch full blocks absent another means for determining which blocks contain
390407 /// transactions relevant to the watched channels.
391- pub fn new ( chain_source : Option < C > , broadcaster : T , logger : L , feeest : F , persister : P ) -> Self {
408+ pub fn new ( chain_source : Option < C > , broadcaster : T , logger : L , feeest : F , persister : P , future_spawner : FS ) -> Self {
392409 Self {
393- monitors : RwLock :: new ( new_hash_map ( ) ) ,
410+ monitors : Arc :: new ( RwLock :: new ( new_hash_map ( ) ) ) ,
394411 chain_source,
395412 broadcaster,
396413 logger,
397414 fee_estimator : feeest,
398415 persister,
399- pending_monitor_events : Mutex :: new ( Vec :: new ( ) ) ,
416+ pending_monitor_events : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
400417 highest_chain_height : AtomicUsize :: new ( 0 ) ,
401- event_notifier : Notifier :: new ( ) ,
418+ event_notifier : Arc :: new ( Notifier :: new ( ) ) ,
419+ future_spawner : Arc :: new ( future_spawner) ,
402420 }
403421 }
404422
@@ -531,6 +549,40 @@ where C::Target: chain::Filter,
531549 Ok ( ( ) )
532550 }
533551
552+ fn channel_monitor_updated_internal (
553+ monitors : & RwLock < HashMap < ChannelId , MonitorHolder < ChannelSigner > > > ,
554+ pending_monitor_events : & Mutex < Vec < ( OutPoint , ChannelId , Vec < MonitorEvent > , PublicKey ) > > ,
555+ event_notifier : & Notifier ,
556+ channel_id : ChannelId , completed_update_id : u64 ) -> Result < ( ) , APIError > {
557+ let monitors = monitors. read ( ) . unwrap ( ) ;
558+ let monitor_data = if let Some ( mon) = monitors. get ( & channel_id) { mon } else {
559+ return Err ( APIError :: APIMisuseError { err : format ! ( "No ChannelMonitor matching channel ID {} found" , channel_id) } ) ;
560+ } ;
561+ let mut pending_monitor_updates = monitor_data. pending_monitor_updates . lock ( ) . unwrap ( ) ;
562+ pending_monitor_updates. retain ( |update_id| * update_id != completed_update_id) ;
563+
564+ // Note that we only check for pending non-chainsync monitor updates and we don't track monitor
565+ // updates resulting from chainsync in `pending_monitor_updates`.
566+ let monitor_is_pending_updates = monitor_data. has_pending_updates ( & pending_monitor_updates) ;
567+
568+ // TODO: Add logger
569+
570+ if monitor_is_pending_updates {
571+ // If there are still monitor updates pending, we cannot yet construct a
572+ // Completed event.
573+ return Ok ( ( ) ) ;
574+ }
575+ let funding_txo = monitor_data. monitor . get_funding_txo ( ) ;
576+ pending_monitor_events. lock ( ) . unwrap ( ) . push ( ( funding_txo, channel_id, vec ! [ MonitorEvent :: Completed {
577+ funding_txo,
578+ channel_id,
579+ monitor_update_id: monitor_data. monitor. get_latest_update_id( ) ,
580+ } ] , monitor_data. monitor . get_counterparty_node_id ( ) ) ) ;
581+
582+ event_notifier. notify ( ) ;
583+ Ok ( ( ) )
584+ }
585+
534586 /// This wrapper avoids having to update some of our tests for now as they assume the direct
535587 /// chain::Watch API wherein we mark a monitor fully-updated by just calling
536588 /// channel_monitor_updated once with the highest ID.
@@ -669,8 +721,8 @@ where C::Target: chain::Filter,
669721 }
670722}
671723
672- impl < ChannelSigner : EcdsaChannelSigner , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref >
673- chain:: Listen for ChainMonitor < ChannelSigner , C , T , F , L , P >
724+ impl < ChannelSigner : EcdsaChannelSigner + Send + Sync + ' static , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref , FS : FutureSpawner >
725+ chain:: Listen for ChainMonitor < ChannelSigner , C , T , F , L , P , FS >
674726where
675727 C :: Target : chain:: Filter ,
676728 T :: Target : BroadcasterInterface ,
@@ -698,8 +750,8 @@ where
698750 }
699751}
700752
701- impl < ChannelSigner : EcdsaChannelSigner , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref >
702- chain:: Confirm for ChainMonitor < ChannelSigner , C , T , F , L , P >
753+ impl < ChannelSigner : EcdsaChannelSigner + Sync + Send + ' static , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref , FS : FutureSpawner >
754+ chain:: Confirm for ChainMonitor < ChannelSigner , C , T , F , L , P , FS >
703755where
704756 C :: Target : chain:: Filter ,
705757 T :: Target : BroadcasterInterface ,
@@ -752,8 +804,8 @@ where
752804 }
753805}
754806
755- impl < ChannelSigner : EcdsaChannelSigner , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref >
756- chain:: Watch < ChannelSigner > for ChainMonitor < ChannelSigner , C , T , F , L , P >
807+ impl < ChannelSigner : EcdsaChannelSigner + Sync + Send + ' static , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref , FS : FutureSpawner + Clone >
808+ chain:: Watch < ChannelSigner > for ChainMonitor < ChannelSigner , C , T , F , L , P , FS >
757809where C :: Target : chain:: Filter ,
758810 T :: Target : BroadcasterInterface ,
759811 F :: Target : FeeEstimator ,
@@ -774,15 +826,28 @@ where C::Target: chain::Filter,
774826 let update_id = monitor. get_latest_update_id ( ) ;
775827 let mut pending_monitor_updates = Vec :: new ( ) ;
776828 let persist_res = self . persister . persist_new_channel ( monitor. persistence_key ( ) , & monitor) ;
777- match persist_res {
778- ChannelMonitorUpdateStatus :: InProgress => {
779- log_info ! ( logger, "Persistence of new ChannelMonitor for channel {} in progress" , log_funding_info!( monitor) ) ;
780- pending_monitor_updates. push ( update_id) ;
781- } ,
782- ChannelMonitorUpdateStatus :: Completed => {
829+
830+ let update_status;
831+ let monitors = self . monitors . clone ( ) ;
832+ let pending_monitor_updates_cb = self . pending_monitor_events . clone ( ) ;
833+ let event_notifier = self . event_notifier . clone ( ) ;
834+ let future_spawner = self . future_spawner . clone ( ) ;
835+
836+ match poll_or_spawn ( persist_res, move || {
837+ // TODO: Log error if the monitor is not persisted.
838+ let _ = ChainMonitor :: < ChannelSigner , C , T , F , L , P , FS > :: channel_monitor_updated_internal ( & monitors, & pending_monitor_updates_cb, & event_notifier,
839+ channel_id, update_id) ;
840+ } , future_spawner. deref ( ) ) {
841+ Ok ( true ) => {
783842 log_info ! ( logger, "Persistence of new ChannelMonitor for channel {} completed" , log_funding_info!( monitor) ) ;
843+ update_status = ChannelMonitorUpdateStatus :: Completed ;
784844 } ,
785- ChannelMonitorUpdateStatus :: UnrecoverableError => {
845+ Ok ( false ) => {
846+ log_info ! ( logger, "Persistence of new ChannelMonitor for channel {} in progress" , log_funding_info!( monitor) ) ;
847+ pending_monitor_updates. push ( update_id) ;
848+ update_status = ChannelMonitorUpdateStatus :: InProgress ;
849+ }
850+ Err ( _) => {
786851 let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
787852 log_error ! ( logger, "{}" , err_str) ;
788853 panic ! ( "{}" , err_str) ;
@@ -795,7 +860,7 @@ where C::Target: chain::Filter,
795860 monitor,
796861 pending_monitor_updates : Mutex :: new ( pending_monitor_updates) ,
797862 } ) ;
798- Ok ( persist_res )
863+ Ok ( update_status )
799864 }
800865
801866 fn update_channel ( & self , channel_id : ChannelId , update : & ChannelMonitorUpdate ) -> ChannelMonitorUpdateStatus {
@@ -840,27 +905,40 @@ where C::Target: chain::Filter,
840905 } else {
841906 self . persister . update_persisted_channel ( monitor. persistence_key ( ) , Some ( update) , monitor)
842907 } ;
843- match persist_res {
844- ChannelMonitorUpdateStatus :: InProgress => {
845- pending_monitor_updates. push ( update_id) ;
908+
909+ let monitors = self . monitors . clone ( ) ;
910+ let pending_monitor_updates_cb = self . pending_monitor_events . clone ( ) ;
911+ let event_notifier = self . event_notifier . clone ( ) ;
912+ let future_spawner = self . future_spawner . clone ( ) ;
913+
914+ let update_status;
915+ match poll_or_spawn ( persist_res, move || {
916+ // TODO: Log error if the monitor is not persisted.
917+ let _ = ChainMonitor :: < ChannelSigner , C , T , F , L , P , FS > :: channel_monitor_updated_internal ( & monitors, & pending_monitor_updates_cb, & event_notifier,
918+ channel_id, update_id) ;
919+ } , future_spawner. deref ( ) ) {
920+ Ok ( true ) => {
846921 log_debug ! ( logger,
847- "Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress " ,
922+ "Persistence of ChannelMonitorUpdate id {:?} for channel {} completed " ,
848923 update_id,
849924 log_funding_info!( monitor)
850925 ) ;
926+ update_status = ChannelMonitorUpdateStatus :: Completed ;
851927 } ,
852- ChannelMonitorUpdateStatus :: Completed => {
928+ Ok ( false ) => {
853929 log_debug ! ( logger,
854- "Persistence of ChannelMonitorUpdate id {:?} for channel {} completed " ,
930+ "Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress " ,
855931 update_id,
856932 log_funding_info!( monitor)
857933 ) ;
858- } ,
859- ChannelMonitorUpdateStatus :: UnrecoverableError => {
934+ pending_monitor_updates. push ( update_id) ;
935+ update_status = ChannelMonitorUpdateStatus :: InProgress ;
936+ }
937+ Err ( _) => {
860938 // Take the monitors lock for writing so that we poison it and any future
861939 // operations going forward fail immediately.
862940 core:: mem:: drop ( pending_monitor_updates) ;
863- core:: mem:: drop ( monitors) ;
941+ // core::mem::drop(monitors);
864942 let _poison = self . monitors . write ( ) . unwrap ( ) ;
865943 let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
866944 log_error ! ( logger, "{}" , err_str) ;
@@ -870,7 +948,7 @@ where C::Target: chain::Filter,
870948 if update_res. is_err ( ) {
871949 ChannelMonitorUpdateStatus :: InProgress
872950 } else {
873- persist_res
951+ update_status
874952 }
875953 }
876954 }
@@ -891,7 +969,7 @@ where C::Target: chain::Filter,
891969 }
892970}
893971
894- impl < ChannelSigner : EcdsaChannelSigner , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref > events:: EventsProvider for ChainMonitor < ChannelSigner , C , T , F , L , P >
972+ impl < ChannelSigner : EcdsaChannelSigner , C : Deref , T : Deref , F : Deref , L : Deref , P : Deref , FS : FutureSpawner > events:: EventsProvider for ChainMonitor < ChannelSigner , C , T , F , L , P , FS >
895973 where C :: Target : chain:: Filter ,
896974 T :: Target : BroadcasterInterface ,
897975 F :: Target : FeeEstimator ,
@@ -1138,4 +1216,3 @@ mod tests {
11381216 } ) . is_err( ) ) ;
11391217 }
11401218}
1141-
0 commit comments