@@ -2720,6 +2720,10 @@ pub struct ChannelManager<
27202720 /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
27212721 pending_events_processor: AtomicBool,
27222722
2723+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2724+ /// [`Self::process_pending_htlc_forwards`].
2725+ pending_htlc_forwards_processor: AtomicBool,
2726+
27232727 /// If we are running during init (either directly during the deserialization method or in
27242728 /// block connection methods which run after deserialization but before normal operation) we
27252729 /// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3796,6 +3800,7 @@ where
37963800
37973801 pending_events: Mutex::new(VecDeque::new()),
37983802 pending_events_processor: AtomicBool::new(false),
3803+ pending_htlc_forwards_processor: AtomicBool::new(false),
37993804 pending_background_events: Mutex::new(Vec::new()),
38003805 total_consistency_lock: RwLock::new(()),
38013806 background_events_processed_since_startup: AtomicBool::new(false),
@@ -6338,9 +6343,19 @@ where
63386343 /// Users implementing their own background processing logic should call this in irregular,
63396344 /// randomly-distributed intervals.
63406345 pub fn process_pending_htlc_forwards(&self) {
6346+ if self
6347+ .pending_htlc_forwards_processor
6348+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6349+ .is_err()
6350+ {
6351+ return;
6352+ }
6353+
63416354 let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
63426355 self.internal_process_pending_htlc_forwards()
63436356 });
6357+
6358+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
63446359 }
63456360
63466361 // Returns whether or not we need to re-persist.
@@ -16474,6 +16489,7 @@ where
1647416489
1647516490 pending_events: Mutex::new(pending_events_read),
1647616491 pending_events_processor: AtomicBool::new(false),
16492+ pending_htlc_forwards_processor: AtomicBool::new(false),
1647716493 pending_background_events: Mutex::new(pending_background_events),
1647816494 total_consistency_lock: RwLock::new(()),
1647916495 background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments