Skip to content

Commit 9087bde

Browse files
thedavekwonmeta-codesync[bot]
authored andcommitted
Introduce do_post channel API to implement both try_post and post (#1762)
Summary: Pull Request resolved: #1762 So that the senderror log can be dropped in fire-or-forget (`post`) Reviewed By: mariusae Differential Revision: D86369587 fbshipit-source-id: 187a7d5a84e1ce4d521fbdc07f1622165937c41a
1 parent 0cf8d13 commit 9087bde

File tree

4 files changed

+53
-27
lines changed

4 files changed

+53
-27
lines changed

hyperactor/src/channel.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,16 +108,24 @@ pub enum TxStatus {
108108
/// The transmit end of an M-typed channel.
109109
#[async_trait]
110110
pub trait Tx<M: RemoteMessage>: std::fmt::Debug {
111+
/// Post a message; returning failed deliveries on the return channel, if provided.
112+
/// If provided, the sender is dropped when the message has been
113+
/// enqueued at the channel endpoint.
114+
///
115+
/// Users should use the `try_post`, and `post` variants directly.
116+
fn do_post(&self, message: M, return_channel: Option<oneshot::Sender<SendError<M>>>);
117+
111118
/// Enqueue a `message` on the local end of the channel. The
112119
/// message is either delivered, or we eventually discover that
113120
/// the channel has failed and it will be sent back on `return_channel`.
114121
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SendError`.
115-
// TODO: Consider making return channel optional to indicate that the log can be dropped.
116-
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>);
122+
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
123+
self.do_post(message, Some(return_channel));
124+
}
117125

118126
/// Enqueue a message to be sent on the channel.
119127
fn post(&self, message: M) {
120-
self.try_post(message, oneshot::channel().0);
128+
self.do_post(message, None);
121129
}
122130

123131
/// Send a message synchronously, returning when the messsage has
@@ -176,10 +184,12 @@ impl<M: RemoteMessage> MpscTx<M> {
176184

177185
#[async_trait]
178186
impl<M: RemoteMessage> Tx<M> for MpscTx<M> {
179-
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
187+
fn do_post(&self, message: M, return_channel: Option<oneshot::Sender<SendError<M>>>) {
180188
if let Err(mpsc::error::SendError(message)) = self.tx.send(message) {
181-
if let Err(m) = return_channel.send(SendError(ChannelError::Closed, message)) {
182-
tracing::warn!("failed to deliver SendError: {}", m);
189+
if let Some(return_channel) = return_channel {
190+
return_channel
191+
.send(SendError(ChannelError::Closed, message))
192+
.unwrap_or_else(|m| tracing::warn!("failed to deliver SendError: {}", m));
183193
}
184194
}
185195
}
@@ -744,13 +754,13 @@ enum ChannelTxKind<M: RemoteMessage> {
744754

745755
#[async_trait]
746756
impl<M: RemoteMessage> Tx<M> for ChannelTx<M> {
747-
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
757+
fn do_post(&self, message: M, return_channel: Option<oneshot::Sender<SendError<M>>>) {
748758
match &self.inner {
749-
ChannelTxKind::Local(tx) => tx.try_post(message, return_channel),
750-
ChannelTxKind::Tcp(tx) => tx.try_post(message, return_channel),
751-
ChannelTxKind::MetaTls(tx) => tx.try_post(message, return_channel),
752-
ChannelTxKind::Sim(tx) => tx.try_post(message, return_channel),
753-
ChannelTxKind::Unix(tx) => tx.try_post(message, return_channel),
759+
ChannelTxKind::Local(tx) => tx.do_post(message, return_channel),
760+
ChannelTxKind::Tcp(tx) => tx.do_post(message, return_channel),
761+
ChannelTxKind::MetaTls(tx) => tx.do_post(message, return_channel),
762+
ChannelTxKind::Sim(tx) => tx.do_post(message, return_channel),
763+
ChannelTxKind::Unix(tx) => tx.do_post(message, return_channel),
754764
}
755765
}
756766

hyperactor/src/channel/local.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,23 @@ pub struct LocalTx<M: RemoteMessage> {
7272

7373
#[async_trait]
7474
impl<M: RemoteMessage> Tx<M> for LocalTx<M> {
75-
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
75+
fn do_post(&self, message: M, return_channel: Option<oneshot::Sender<SendError<M>>>) {
7676
let data: Data = match bincode::serialize(&message) {
7777
Ok(data) => data,
7878
Err(err) => {
79-
if let Err(m) = return_channel.send(SendError(err.into(), message)) {
80-
tracing::warn!("failed to deliver SendError: {}", m);
79+
if let Some(return_channel) = return_channel {
80+
return_channel
81+
.send(SendError(err.into(), message))
82+
.unwrap_or_else(|m| tracing::warn!("failed to deliver SendError: {}", m));
8183
}
8284
return;
8385
}
8486
};
8587
if self.tx.send(data).is_err() {
86-
if let Err(m) = return_channel.send(SendError(ChannelError::Closed, message)) {
87-
tracing::warn!("failed to deliver SendError: {}", m);
88+
if let Some(return_channel) = return_channel {
89+
return_channel
90+
.send(SendError(ChannelError::Closed, message))
91+
.unwrap_or_else(|m| tracing::warn!("failed to deliver SendError: {}", m));
8892
}
8993
}
9094
}

hyperactor/src/channel/net.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,10 +1030,14 @@ impl<M: RemoteMessage> Tx<M> for NetTx<M> {
10301030
&self.status
10311031
}
10321032

1033-
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
1033+
fn do_post(&self, message: M, return_channel: Option<oneshot::Sender<SendError<M>>>) {
10341034
tracing::trace!(name = "post", "sending message to {}", self.dest);
1035-
if let Err(err) = self.sender.send((message, return_channel, RealClock.now())) {
1036-
let _ = err.0.1.send(SendError(ChannelError::Closed, err.0.0));
1035+
1036+
let return_channel = return_channel.unwrap_or_else(|| oneshot::channel().0);
1037+
if let Err(mpsc::error::SendError((message, return_channel, _))) =
1038+
self.sender.send((message, return_channel, RealClock.now()))
1039+
{
1040+
let _ = return_channel.send(SendError(ChannelError::Closed, message));
10371041
}
10381042
}
10391043
}

hyperactor/src/channel/sim.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,14 @@ pub(crate) struct SimRx<M: RemoteMessage> {
277277

278278
#[async_trait]
279279
impl<M: RemoteMessage + Any> Tx<M> for SimTx<M> {
280-
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
280+
fn do_post(&self, message: M, return_channel: Option<oneshot::Sender<SendError<M>>>) {
281281
let data = match Serialized::serialize(&message) {
282282
Ok(data) => data,
283283
Err(err) => {
284-
if let Err(m) = return_channel.send(SendError(err.into(), message)) {
285-
tracing::warn!("failed to deliver SendError: {}", m);
284+
if let Some(return_channel) = return_channel {
285+
return_channel
286+
.send(SendError(err.into(), message))
287+
.unwrap_or_else(|m| tracing::warn!("failed to deliver SendError: {}", m));
286288
}
287289

288290
return;
@@ -311,14 +313,20 @@ impl<M: RemoteMessage + Any> Tx<M> for SimTx<M> {
311313
_ => handle.send_event(event),
312314
};
313315
if let Err(err) = result {
314-
if let Err(m) = return_channel.send(SendError(err.into(), message)) {
315-
tracing::warn!("failed to deliver SendError: {}", m);
316+
if let Some(return_channel) = return_channel {
317+
return_channel
318+
.send(SendError(err.into(), message))
319+
.unwrap_or_else(|m| {
320+
tracing::warn!("failed to deliver SendError: {}", m)
321+
});
316322
}
317323
}
318324
}
319325
Err(err) => {
320-
if let Err(m) = return_channel.send(SendError(err.into(), message)) {
321-
tracing::warn!("failed to deliver SendError: {}", m);
326+
if let Some(return_channel) = return_channel {
327+
return_channel
328+
.send(SendError(err.into(), message))
329+
.unwrap_or_else(|m| tracing::warn!("failed to deliver SendError: {}", m));
322330
}
323331
}
324332
}

0 commit comments

Comments
 (0)