From 552abe7c23342f1d4f10150c4e6256d9acc6d7a8 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Thu, 13 Nov 2025 13:48:42 -0800 Subject: [PATCH] Close channel first before return undeliverable and remove log Summary: Two changes: * Close NetTx's `mspc::Receiver` before returning the message. Otherwise, we might get stuck on this loop if there is another thread actively writing to this NetTx: https://www.internalfb.com/code/fbsource/[5ee448cb323e2539ea2cf169644b7d829d715040]/fbcode/monarch/hyperactor/src/channel/net/client.rs?lines=586-590 New messages are still returned after `mspc::Receiver` is closed: https://www.internalfb.com/code/fbsource/[5ee448cb323e2539ea2cf169644b7d829d715040]/fbcode/monarch/hyperactor/src/channel/net.rs?lines=156-161 * Remove the log when the return_channel is closed. This log does not seem to be helpful, and also we do not log it in other places for closed `return_channel`. Differential Revision: D87002126 --- hyperactor/src/channel/net/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hyperactor/src/channel/net/client.rs b/hyperactor/src/channel/net/client.rs index d3767db8f..83ee28f46 100644 --- a/hyperactor/src/channel/net/client.rs +++ b/hyperactor/src/channel/net/client.rs @@ -576,6 +576,8 @@ async fn run( // TODO(T233029051): Return reason through return_channel too. reason: _, } => { + // Close the channel to prevent any further messages from being sent. + receiver.close(); // Return in order from oldest to newest, messages // either not acknowledged or not sent. unacked @@ -584,9 +586,7 @@ async fn run( .chain(outbox.deque.drain(..)) .for_each(|queued| queued.try_return()); while let Ok((msg, return_channel, _)) = receiver.try_recv() { - if let Err(m) = return_channel.send(SendError(ChannelError::Closed, msg)) { - tracing::warn!("failed to deliver SendError: {}", m); - } + let _ = return_channel.send(SendError(ChannelError::Closed, msg)); } } _ => (),