Skip to content

Commit 6976258

Browse files
samluryemeta-codesync[bot]
authored and committed
Add return_undeliverable property to PortRef and MessageEnvelope, and fix race in tensor engine shutdown (#1777)
Summary: Pull Request resolved: #1777 Sometimes when we send a message, we want it to be fully fire-and-forget, including if the destination is not even reachable. This is typically only used in scenarios like: * When shutting down the system, we try to ask a process to nicely shut itself down before ungracefully killing it. If the message is undeliverable, we can just proceed with killing the process (it's probably already dead anyways). * Replying to a message. If the sender is down, there's nothing the current actor can do about it. This should be used sparingly as it could hide real errors, like your messages not getting sent. This diff adds a `return_undeliverable: bool` property on `MessageEnvelope` and `PortRef`. When the property is set on `PortRef`, any `MessageEnvelope` sent via that `PortRef` will have an equivalent value for `return_undeliverable`. Any envelope with `return_undeliverable == false` will not be returned to its sender on delivery failure. This is useful for messages like `GetRankStatus` and `GetState`, where the receiver shouldn't fail if its reply fails to be delivered. It is also useful during proc termination, when the host mesh agent sends `StopAll` to the proc mesh agent; if the proc mesh agent is already dead, the message won't be delivered, but that shouldn't crash the host mesh agent. Unrelatedly, this diff also fixes a race condition with host/proc mesh shutdown vs. tensor engine shutdown. Basically, `DeviceMesh.exit` was sending a fire-and-forget `WorkerMessage::Exit` via `Controller.drain_and_stop()`. But if you simultaneously try to shut down the host/proc mesh, then the worker exit message might fail to deliver, crashing the process. With this diff, `Controller.drain_and_stop()` synchronously calls `ActorMesh::stop` on the worker actor mesh so that there can't be a race with host/proc mesh shutdown (at least not from the same thread).
ghstack-source-id: 322545046 exported-using-ghexport Reviewed By: mariusae, dulinriley, shayne-fletcher Differential Revision: D86315780 fbshipit-source-id: 06c4aa92331e7f11c64f1ea8b13c52c2e7f0c153
1 parent b72dbe4 commit 6976258

File tree

14 files changed

+183
-68
lines changed

14 files changed

+183
-68
lines changed

hyperactor/src/context.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,15 @@ pub trait Actor: Mailbox {
6161
pub(crate) trait MailboxExt: Mailbox {
6262
/// Post a message to the provided destination with the provided headers, and data.
6363
/// All messages posted from actors should use this implementation.
64-
fn post(&self, dest: PortId, headers: Attrs, data: Serialized);
64+
fn post(&self, dest: PortId, headers: Attrs, data: Serialized, return_undeliverable: bool);
6565

6666
/// Split a port, using a provided reducer spec, if provided.
6767
fn split(
6868
&self,
6969
port_id: PortId,
7070
reducer_spec: Option<ReducerSpec>,
7171
reducer_opts: Option<ReducerOpts>,
72+
return_undeliverable: bool,
7273
) -> anyhow::Result<PortId>;
7374
}
7475

@@ -80,7 +81,7 @@ static CAN_SEND_WARNED_MAILBOXES: OnceLock<DashSet<ActorId>> = OnceLock::new();
8081

8182
/// Only actors CanSend because they need a return port.
8283
impl<T: Actor + Send + Sync> MailboxExt for T {
83-
fn post(&self, dest: PortId, headers: Attrs, data: Serialized) {
84+
fn post(&self, dest: PortId, headers: Attrs, data: Serialized, return_undeliverable: bool) {
8485
let return_handle = self.mailbox().bound_return_handle().unwrap_or_else(|| {
8586
let actor_id = self.mailbox().actor_id();
8687
if CAN_SEND_WARNED_MAILBOXES
@@ -97,7 +98,9 @@ impl<T: Actor + Send + Sync> MailboxExt for T {
9798
mailbox::monitored_return_handle()
9899
});
99100

100-
let envelope = MessageEnvelope::new(self.mailbox().actor_id().clone(), dest, data, headers);
101+
let mut envelope =
102+
MessageEnvelope::new(self.mailbox().actor_id().clone(), dest, data, headers);
103+
envelope.set_return_undeliverable(return_undeliverable);
101104
MailboxSender::post(self.mailbox(), envelope, return_handle);
102105
}
103106

@@ -106,11 +109,20 @@ impl<T: Actor + Send + Sync> MailboxExt for T {
106109
port_id: PortId,
107110
reducer_spec: Option<ReducerSpec>,
108111
reducer_opts: Option<ReducerOpts>,
112+
return_undeliverable: bool,
109113
) -> anyhow::Result<PortId> {
110-
fn post(mailbox: &mailbox::Mailbox, port_id: PortId, msg: Serialized) {
114+
fn post(
115+
mailbox: &mailbox::Mailbox,
116+
port_id: PortId,
117+
msg: Serialized,
118+
return_undeliverable: bool,
119+
) {
120+
let mut envelope =
121+
MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new());
122+
envelope.set_return_undeliverable(return_undeliverable);
111123
mailbox::MailboxSender::post(
112124
mailbox,
113-
MessageEnvelope::new(mailbox.actor_id().clone(), port_id, msg, Attrs::new()),
125+
envelope,
114126
// TODO(pzhang) figure out how to use upstream's return handle,
115127
// instead of getting a new one like this.
116128
// This is okay for now because upstream is currently also using
@@ -135,7 +147,7 @@ impl<T: Actor + Send + Sync> MailboxExt for T {
135147
dyn Fn(Serialized) -> Result<(), (Serialized, anyhow::Error)> + Send + Sync,
136148
> = match reducer {
137149
None => Box::new(move |serialized: Serialized| {
138-
post(&mailbox, port_id.clone(), serialized);
150+
post(&mailbox, port_id.clone(), serialized, return_undeliverable);
139151
Ok(())
140152
}),
141153
Some(reducer) => {
@@ -154,7 +166,9 @@ impl<T: Actor + Send + Sync> MailboxExt for T {
154166
let mut buf = buffer.lock().unwrap();
155167
match buf.reduce() {
156168
None => (),
157-
Some(Ok(reduced)) => post(&mailbox, port_id.clone(), reduced),
169+
Some(Ok(reduced)) => {
170+
post(&mailbox, port_id.clone(), reduced, return_undeliverable)
171+
}
158172
// We simply ignore errors here, and let them be propagated
159173
// later in the enqueueing function.
160174
//
@@ -197,7 +211,7 @@ impl<T: Actor + Send + Sync> MailboxExt for T {
197211
None => Ok(()),
198212
Some(Ok(reduced)) => {
199213
alarm.lock().unwrap().disarm();
200-
post(&mailbox, port_id.clone(), reduced);
214+
post(&mailbox, port_id.clone(), reduced, return_undeliverable);
201215
Ok(())
202216
}
203217
Some(Err(e)) => Err((buf.pop().unwrap(), e)),

hyperactor/src/mailbox.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ pub struct MessageEnvelope {
213213

214214
/// Decremented at every `MailboxSender` hop.
215215
ttl: u8,
216+
217+
/// If true, undeliverable messages should be returned to sender. Else, they
218+
/// are dropped.
219+
return_undeliverable: bool,
216220
// TODO: add typename, source, seq, etc.
217221
}
218222

@@ -226,6 +230,8 @@ impl MessageEnvelope {
226230
errors: Vec::new(),
227231
headers,
228232
ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
233+
// By default, all undeliverable messages should be returned to the sender.
234+
return_undeliverable: true,
229235
}
230236
}
231237

@@ -248,6 +254,8 @@ impl MessageEnvelope {
248254
dest,
249255
errors: Vec::new(),
250256
ttl: crate::config::global::get(crate::config::MESSAGE_TTL_DEFAULT),
257+
// By default, all undeliverable messages should be returned to the sender.
258+
return_undeliverable: true,
251259
})
252260
}
253261

@@ -333,6 +341,13 @@ impl MessageEnvelope {
333341
self.sender = sender;
334342
}
335343

344+
/// Set to true if you want this message to be returned to sender if it cannot
345+
/// reach dest. This is the default.
346+
/// Set to false if you want the message to be dropped instead.
347+
pub fn set_return_undeliverable(&mut self, return_undeliverable: bool) {
348+
self.return_undeliverable = return_undeliverable;
349+
}
350+
336351
/// The message has been determined to be undeliverable with the
337352
/// provided error. Mark the envelope with the error and return to
338353
/// sender.
@@ -393,6 +408,7 @@ impl MessageEnvelope {
393408
errors,
394409
headers,
395410
ttl,
411+
return_undeliverable,
396412
} = self;
397413

398414
(
@@ -402,6 +418,7 @@ impl MessageEnvelope {
402418
errors,
403419
headers,
404420
ttl,
421+
return_undeliverable,
405422
},
406423
data,
407424
)
@@ -414,6 +431,7 @@ impl MessageEnvelope {
414431
errors,
415432
headers,
416433
ttl,
434+
return_undeliverable,
417435
} = metadata;
418436

419437
Self {
@@ -423,8 +441,13 @@ impl MessageEnvelope {
423441
errors,
424442
headers,
425443
ttl,
444+
return_undeliverable,
426445
}
427446
}
447+
448+
fn return_undeliverable(&self) -> bool {
449+
self.return_undeliverable
450+
}
428451
}
429452

430453
impl fmt::Display for MessageEnvelope {
@@ -452,6 +475,7 @@ pub struct MessageMetadata {
452475
errors: Vec<DeliveryError>,
453476
headers: Attrs,
454477
ttl: u8,
478+
return_undeliverable: bool,
455479
}
456480

457481
/// Errors that occur during mailbox operations. Each error is associated
@@ -1534,6 +1558,7 @@ impl MailboxSender for Mailbox {
15341558
dest,
15351559
errors: metadata_errors,
15361560
ttl,
1561+
return_undeliverable,
15371562
} = metadata;
15381563

15391564
// We use the entry API here so that we can remove the
@@ -1562,6 +1587,7 @@ impl MailboxSender for Mailbox {
15621587
dest,
15631588
errors: metadata_errors,
15641589
ttl,
1590+
return_undeliverable,
15651591
},
15661592
data,
15671593
)
@@ -3340,15 +3366,15 @@ mod tests {
33403366

33413367
// Split it twice on actor1
33423368
let port_id1 = port_id
3343-
.split(&actor1, reducer_spec.clone(), reducer_opts.clone())
3369+
.split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
33443370
.unwrap();
33453371
let port_id2 = port_id
3346-
.split(&actor1, reducer_spec.clone(), reducer_opts.clone())
3372+
.split(&actor1, reducer_spec.clone(), reducer_opts.clone(), true)
33473373
.unwrap();
33483374

33493375
// A split port id can also be split
33503376
let port_id2_1 = port_id2
3351-
.split(&actor1, reducer_spec, reducer_opts.clone())
3377+
.split(&actor1, reducer_spec, reducer_opts.clone(), true)
33523378
.unwrap();
33533379

33543380
Setup {
@@ -3470,7 +3496,7 @@ mod tests {
34703496
let port_id = port_handle.bind().port_id().clone();
34713497
// Split it
34723498
let reducer_spec = accum::sum::<u64>().reducer_spec();
3473-
let split_port_id = port_id.split(&actor, reducer_spec, None).unwrap();
3499+
let split_port_id = port_id.split(&actor, reducer_spec, None, true).unwrap();
34743500

34753501
// Send 9 messages.
34763502
for msg in [1, 5, 3, 4, 2, 91, 92, 93, 94] {

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,11 @@ pub(crate) fn return_undeliverable(
9898
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
9999
envelope: MessageEnvelope,
100100
) {
101-
let envelope_copy = envelope.clone();
102-
if (return_handle.send(Undeliverable(envelope))).is_err() {
103-
UndeliverableMailboxSender.post(envelope_copy, /*unsued*/ return_handle)
101+
if envelope.return_undeliverable() {
102+
let envelope_copy = envelope.clone();
103+
if (return_handle.send(Undeliverable(envelope))).is_err() {
104+
UndeliverableMailboxSender.post(envelope_copy, /*unused*/ return_handle)
105+
}
104106
}
105107
}
106108

@@ -109,7 +111,7 @@ pub(crate) fn return_undeliverable(
109111
pub enum UndeliverableMessageError {
110112
/// Delivery of a message to its destination failed.
111113
#[error(
112-
"a message from {} to {} was undeliverable and returned: {:?}: {envelope}",
114+
"a message from {} to {} was undeliverable and returned: {:?}: {envelope}",
113115
.envelope.sender(),
114116
.envelope.dest(),
115117
.envelope.error_msg()

hyperactor/src/proc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1036,7 +1036,7 @@ impl<A: Actor> Instance<A> {
10361036

10371037
/// Send a message to the actor running on the proc.
10381038
pub fn post(&self, port_id: PortId, headers: Attrs, message: Serialized) {
1039-
<Self as context::MailboxExt>::post(self, port_id, headers, message)
1039+
<Self as context::MailboxExt>::post(self, port_id, headers, message, true)
10401040
}
10411041

10421042
/// Send a message to the actor itself with a delay usually to trigger some event.

hyperactor/src/reference.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,7 @@ impl PortId {
891891
pub fn send(&self, cx: &impl context::Actor, serialized: Serialized) {
892892
let mut headers = Attrs::new();
893893
crate::mailbox::headers::set_send_timestamp(&mut headers);
894-
cx.post(self.clone(), headers, serialized);
894+
cx.post(self.clone(), headers, serialized, true);
895895
}
896896

897897
/// Send a serialized message to this port, provided a sending capability,
@@ -904,7 +904,7 @@ impl PortId {
904904
mut headers: Attrs,
905905
) {
906906
crate::mailbox::headers::set_send_timestamp(&mut headers);
907-
cx.post(self.clone(), headers, serialized);
907+
cx.post(self.clone(), headers, serialized, true);
908908
}
909909

910910
/// Split this port, returning a new port that relays messages to the port
@@ -914,8 +914,14 @@ impl PortId {
914914
cx: &impl context::Actor,
915915
reducer_spec: Option<ReducerSpec>,
916916
reducer_opts: Option<ReducerOpts>,
917+
return_undeliverable: bool,
917918
) -> anyhow::Result<PortId> {
918-
cx.split(self.clone(), reducer_spec, reducer_opts)
919+
cx.split(
920+
self.clone(),
921+
reducer_spec,
922+
reducer_opts,
923+
return_undeliverable,
924+
)
919925
}
920926
}
921927

@@ -964,6 +970,7 @@ pub struct PortRef<M> {
964970
)]
965971
reducer_opts: Option<ReducerOpts>,
966972
phantom: PhantomData<M>,
973+
return_undeliverable: bool,
967974
}
968975

969976
impl<M: RemoteMessage> PortRef<M> {
@@ -975,6 +982,7 @@ impl<M: RemoteMessage> PortRef<M> {
975982
reducer_spec: None,
976983
reducer_opts: None,
977984
phantom: PhantomData,
985+
return_undeliverable: true,
978986
}
979987
}
980988

@@ -986,6 +994,7 @@ impl<M: RemoteMessage> PortRef<M> {
986994
reducer_spec,
987995
reducer_opts: None, // TODO: provide attest_reducible_opts
988996
phantom: PhantomData,
997+
return_undeliverable: true,
989998
}
990999
}
9911000

@@ -1052,13 +1061,24 @@ impl<M: RemoteMessage> PortRef<M> {
10521061
) {
10531062
crate::mailbox::headers::set_send_timestamp(&mut headers);
10541063
crate::mailbox::headers::set_rust_message_type::<M>(&mut headers);
1055-
cx.post(self.port_id.clone(), headers, message);
1064+
cx.post(
1065+
self.port_id.clone(),
1066+
headers,
1067+
message,
1068+
self.return_undeliverable,
1069+
);
10561070
}
10571071

10581072
/// Convert this port into a sink that can be used to send messages using the given capability.
10591073
pub fn into_sink<C: context::Actor>(self, cx: C) -> PortSink<C, M> {
10601074
PortSink::new(cx, self)
10611075
}
1076+
1077+
/// Set whether or not messages sent to this port that are undeliverable
1078+
/// should be returned to the sender.
1079+
pub fn return_undeliverable(&mut self, return_undeliverable: bool) {
1080+
self.return_undeliverable = return_undeliverable;
1081+
}
10621082
}
10631083

10641084
impl<M: RemoteMessage> Clone for PortRef<M> {
@@ -1068,6 +1088,7 @@ impl<M: RemoteMessage> Clone for PortRef<M> {
10681088
reducer_spec: self.reducer_spec.clone(),
10691089
reducer_opts: self.reducer_opts.clone(),
10701090
phantom: PhantomData,
1091+
return_undeliverable: self.return_undeliverable,
10711092
}
10721093
}
10731094
}
@@ -1080,7 +1101,12 @@ impl<M: RemoteMessage> fmt::Display for PortRef<M> {
10801101

10811102
/// The parameters extracted from [`PortRef`] to [`Bindings`].
10821103
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
1083-
pub struct UnboundPort(pub PortId, pub Option<ReducerSpec>, pub Option<ReducerOpts>);
1104+
pub struct UnboundPort(
1105+
pub PortId,
1106+
pub Option<ReducerSpec>,
1107+
pub Option<ReducerOpts>,
1108+
pub bool, // return_undeliverable
1109+
);
10841110

10851111
impl UnboundPort {
10861112
/// Update the port id of this binding.
@@ -1095,6 +1121,7 @@ impl<M: RemoteMessage> From<&PortRef<M>> for UnboundPort {
10951121
port_ref.port_id.clone(),
10961122
port_ref.reducer_spec.clone(),
10971123
port_ref.reducer_opts.clone(),
1124+
port_ref.return_undeliverable,
10981125
)
10991126
}
11001127
}
@@ -1111,6 +1138,7 @@ impl<M: RemoteMessage> Bind for PortRef<M> {
11111138
self.port_id = bound.0;
11121139
self.reducer_spec = bound.1;
11131140
self.reducer_opts = bound.2;
1141+
self.return_undeliverable = bound.3;
11141142
Ok(())
11151143
}
11161144
}
@@ -1163,7 +1191,7 @@ impl<M: RemoteMessage> OncePortRef<M> {
11631191
MailboxSenderErrorKind::Serialize(err.into()),
11641192
)
11651193
})?;
1166-
cx.post(self.port_id.clone(), headers, serialized);
1194+
cx.post(self.port_id.clone(), headers, serialized, true);
11671195
Ok(())
11681196
}
11691197
}

0 commit comments

Comments
 (0)