@@ -236,15 +236,11 @@ where A::Target: chain::Access, L::Target: Logger {
236236}
237237
238238macro_rules! define_run_body {
239- ( $persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
239+ ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
240+ $channel_manager: ident, $process_channel_manager_events: expr,
240241 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
241242 $loop_exit_check: expr, $await: expr)
242243 => { {
243- let event_handler = DecoratingEventHandler {
244- event_handler: $event_handler,
245- gossip_sync: & $gossip_sync,
246- } ;
247-
248244 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
249245 $channel_manager. timer_tick_occurred( ) ;
250246
@@ -255,8 +251,8 @@ macro_rules! define_run_body {
255251 let mut have_pruned = false ;
256252
257253 loop {
258- $channel_manager . process_pending_events ( & event_handler ) ;
259- $chain_monitor . process_pending_events ( & event_handler ) ;
254+ $process_channel_manager_events ;
255+ $process_chain_monitor_events ;
260256
261257 // Note that the PeerManager::process_events may block on ChannelManager's locks,
262258 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -389,7 +385,8 @@ pub async fn process_events_async<
389385 CMH : ' static + Deref + Send + Sync ,
390386 RMH : ' static + Deref + Send + Sync ,
391387 OMH : ' static + Deref + Send + Sync ,
392- EH : ' static + EventHandler + Send ,
388+ EventHandlerFuture : core:: future:: Future < Output = ( ) > ,
389+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
393390 PS : ' static + Deref + Send ,
394391 M : ' static + Deref < Target = ChainMonitor < Signer , CF , T , F , L , P > > + Send + Sync ,
395392 CM : ' static + Deref < Target = ChannelManager < CW , T , K , F , L > > + Send + Sync ,
@@ -402,7 +399,7 @@ pub async fn process_events_async<
402399 SleepFuture : core:: future:: Future < Output = bool > ,
403400 Sleeper : Fn ( Duration ) -> SleepFuture
404401> (
405- persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
402+ persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
406403 gossip_sync : GossipSync < PGS , RGS , G , CA , L > , peer_manager : PM , logger : L , scorer : Option < S > ,
407404 sleeper : Sleeper ,
408405) -> Result < ( ) , std:: io:: Error >
@@ -422,7 +419,22 @@ where
422419 PS :: Target : ' static + Persister < ' a , Signer , CW , T , K , F , L , SC > ,
423420{
424421 let mut should_continue = true ;
425- define_run_body ! ( persister, event_handler, chain_monitor, channel_manager,
422+ // TODO: Change closure return type to `impl core::future::Future<Output = ()>` once we have a
423+ // more recent `rustc`. This is currently not supported by our MSRV:
424+ // `error[E0562]: `impl Trait` not allowed outside of function and method return types`
425+ let async_event_handler = |event| -> core:: pin:: Pin < Box < dyn core:: future:: Future < Output = ( ) > > > {
426+ let network_graph = gossip_sync. network_graph ( ) ;
427+ let event_handler = & event_handler;
428+ Box :: pin ( async move {
429+ if let Some ( network_graph) = network_graph {
430+ handle_network_graph_update ( network_graph, & event)
431+ }
432+ event_handler ( event) . await ;
433+ } )
434+ } ;
435+ define_run_body ! ( persister,
436+ chain_monitor, chain_monitor. process_pending_events_async( async_event_handler) . await ,
437+ channel_manager, channel_manager. process_pending_events_async( async_event_handler) . await ,
426438 gossip_sync, peer_manager, logger, scorer, should_continue, {
427439 select_biased! {
428440 _ = channel_manager. get_persistable_update_future( ) . fuse( ) => true ,
@@ -527,7 +539,12 @@ impl BackgroundProcessor {
527539 let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
528540 let stop_thread_clone = stop_thread. clone ( ) ;
529541 let handle = thread:: spawn ( move || -> Result < ( ) , std:: io:: Error > {
530- define_run_body ! ( persister, event_handler, chain_monitor, channel_manager,
542+ let event_handler = DecoratingEventHandler {
543+ event_handler,
544+ gossip_sync : & gossip_sync,
545+ } ;
546+ define_run_body ! ( persister, chain_monitor, chain_monitor. process_pending_events( & event_handler) ,
547+ channel_manager, channel_manager. process_pending_events( & event_handler) ,
531548 gossip_sync, peer_manager, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
532549 channel_manager. await_persistable_update_timeout( Duration :: from_millis( 100 ) ) )
533550 } ) ;
0 commit comments