Skip to content

Commit a283e23

Browse files
sanitygithub-actions[bot]iduartgomez
authored
fix: guard op-state timeout notifications (#2043)
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: nacho.d.g <iduartgomez@users.noreply.github.com>
1 parent 538d0ef commit a283e23

File tree

1 file changed

+75
-2
lines changed

1 file changed

+75
-2
lines changed

crates/core/src/node/op_state_manager.rs

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,27 @@ impl OpManager {
331331
}
332332
}
333333

334+
async fn notify_transaction_timeout(
335+
event_loop_notifier: &EventLoopNotificationsSender,
336+
tx: Transaction,
337+
) -> bool {
338+
match event_loop_notifier
339+
.notifications_sender
340+
.send(Either::Right(NodeEvent::TransactionTimedOut(tx)))
341+
.await
342+
{
343+
Ok(()) => true,
344+
Err(err) => {
345+
tracing::warn!(
346+
tx = %tx,
347+
error = ?err,
348+
"Failed to notify event loop about timed out transaction; receiver likely dropped"
349+
);
350+
false
351+
}
352+
}
353+
}
354+
334355
async fn garbage_cleanup_task<ER: NetEventRegister>(
335356
mut new_transactions: tokio::sync::mpsc::Receiver<Transaction>,
336357
ops: Arc<Ops>,
@@ -376,7 +397,7 @@ async fn garbage_cleanup_task<ER: NetEventRegister>(
376397
ops.under_progress.remove(&tx);
377398
ops.completed.remove(&tx);
378399
tracing::debug!("Transaction timed out: {tx}");
379-
event_loop_notifier.notifications_sender.send(Either::Right(NodeEvent::TransactionTimedOut(tx))).await.unwrap();
400+
notify_transaction_timeout(&event_loop_notifier, tx).await;
380401
live_tx_tracker.remove_finished_transaction(tx);
381402
}
382403
}
@@ -405,11 +426,63 @@ async fn garbage_cleanup_task<ER: NetEventRegister>(
405426
};
406427
if removed {
407428
tracing::debug!("Transaction timed out: {tx}");
408-
event_loop_notifier.notifications_sender.send(Either::Right(NodeEvent::TransactionTimedOut(tx))).await.unwrap();
429+
notify_transaction_timeout(&event_loop_notifier, tx).await;
409430
live_tx_tracker.remove_finished_transaction(tx);
410431
}
411432
}
412433
}
413434
}
414435
}
415436
}
437+
438+
#[cfg(test)]
439+
mod tests {
440+
use super::super::network_bridge::event_loop_notification_channel;
441+
use super::*;
442+
use crate::node::network_bridge::EventLoopNotificationsReceiver;
443+
use either::Either;
444+
use tokio::time::{timeout, Duration};
445+
446+
#[tokio::test]
447+
async fn notify_timeout_succeeds_when_receiver_alive() {
448+
let (receiver, notifier) = event_loop_notification_channel();
449+
let EventLoopNotificationsReceiver {
450+
mut notifications_receiver,
451+
..
452+
} = receiver;
453+
454+
let tx = Transaction::ttl_transaction();
455+
456+
let delivered = notify_transaction_timeout(&notifier, tx).await;
457+
assert!(
458+
delivered,
459+
"notification should be delivered while receiver is alive"
460+
);
461+
462+
let received = timeout(Duration::from_millis(100), notifications_receiver.recv())
463+
.await
464+
.expect("timed out waiting for notification")
465+
.expect("notification channel closed");
466+
467+
match received {
468+
Either::Right(NodeEvent::TransactionTimedOut(observed)) => {
469+
assert_eq!(observed, tx, "unexpected transaction in notification");
470+
}
471+
other => panic!("unexpected notification: {other:?}"),
472+
}
473+
}
474+
475+
#[tokio::test]
476+
async fn notify_timeout_handles_dropped_receiver() {
477+
let (receiver, notifier) = event_loop_notification_channel();
478+
drop(receiver);
479+
480+
let tx = Transaction::ttl_transaction();
481+
482+
let delivered = notify_transaction_timeout(&notifier, tx).await;
483+
assert!(
484+
!delivered,
485+
"notification delivery should fail once receiver is dropped"
486+
);
487+
}
488+
}

0 commit comments

Comments
 (0)