Skip to content

Commit 6b7e0e6

Browse files
pzhan9 authored and facebook-github-bot committed
Add asserts for actor status transition (#1825)
Summary: Adding asserts so we can be more certain about the actor status transitions. Reviewed By: mariusae. Differential Revision: D86772033
1 parent adcdb8e commit 6b7e0e6

File tree

2 files changed

+56
-27
lines changed

2 files changed

+56
-27
lines changed

hyperactor/src/actor.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Arc;
2020
use std::time::SystemTime;
2121

2222
use async_trait::async_trait;
23+
use enum_as_inner::EnumAsInner;
2324
use futures::FutureExt;
2425
use futures::future::BoxFuture;
2526
use serde::Deserialize;
@@ -472,7 +473,16 @@ impl fmt::Display for Signal {
472473
}
473474

474475
/// The runtime status of an actor.
475-
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Named)]
476+
#[derive(
477+
Debug,
478+
Serialize,
479+
Deserialize,
480+
PartialEq,
481+
Eq,
482+
Clone,
483+
Named,
484+
EnumAsInner
485+
)]
476486
pub enum ActorStatus {
477487
/// The actor status is unknown.
478488
Unknown,
@@ -505,12 +515,7 @@ pub enum ActorStatus {
505515
impl ActorStatus {
506516
/// Tells whether the status is a terminal state.
507517
pub fn is_terminal(&self) -> bool {
508-
matches!(self, Self::Stopped | Self::Failed(_))
509-
}
510-
511-
/// Tells whether the status represents a failure.
512-
pub fn is_failed(&self) -> bool {
513-
matches!(self, Self::Failed(_))
518+
self.is_stopped() || self.is_failed()
514519
}
515520

516521
/// Create a generic failure status with the provided error message.

hyperactor/src/proc.rs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,14 @@ impl<A: Actor> Instance<A> {
10281028
self.status_tx.send_replace(new.clone());
10291029
}
10301030

1031+
fn is_terminal(&self) -> bool {
1032+
self.status_tx.borrow().is_terminal()
1033+
}
1034+
1035+
fn is_stopped(&self) -> bool {
1036+
self.status_tx.borrow().is_stopped()
1037+
}
1038+
10311039
/// This instance's actor ID.
10321040
pub fn self_id(&self) -> &ActorId {
10331041
self.mailbox.actor_id()
@@ -1119,23 +1127,34 @@ impl<A: Actor> Instance<A> {
11191127
.run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
11201128
.await;
11211129

1122-
let (actor_status, event) = match result {
1123-
Ok(_) => (ActorStatus::Stopped, None),
1124-
Err(ActorError {
1125-
kind: box ActorErrorKind::UnhandledSupervisionEvent(box event),
1126-
..
1127-
}) => (event.actor_status.clone(), Some(event)),
1130+
let event = match result {
1131+
Ok(_) => {
1132+
// actor should have been stopped by run_actor_tree
1133+
assert!(self.is_stopped());
1134+
None
1135+
}
11281136
Err(err) => {
1129-
let error_kind = ActorErrorKind::Generic(err.kind.to_string());
1130-
(
1131-
ActorStatus::Failed(error_kind.clone()),
1132-
Some(ActorSupervisionEvent::new(
1133-
self.cell.actor_id().clone(),
1134-
ActorStatus::Failed(error_kind),
1135-
None,
1136-
None,
1137-
)),
1138-
)
1137+
match *err.kind {
1138+
ActorErrorKind::UnhandledSupervisionEvent(box event) => {
1139+
// Currently only terminated actors are allowed to raise supervision events.
1140+
// If we want to change that in the future, we need to modify the exit
1141+
// status here too, because we use event's actor_status as this actor's
1142+
// terminal status.
1143+
assert!(event.actor_status.is_terminal());
1144+
self.change_status(event.actor_status.clone());
1145+
Some(event)
1146+
}
1147+
_ => {
1148+
let error_kind = ActorErrorKind::Generic(err.kind.to_string());
1149+
self.change_status(ActorStatus::Failed(error_kind.clone()));
1150+
Some(ActorSupervisionEvent::new(
1151+
self.cell.actor_id().clone(),
1152+
ActorStatus::Failed(error_kind),
1153+
None,
1154+
None,
1155+
))
1156+
}
1157+
}
11391158
}
11401159
};
11411160

@@ -1164,7 +1183,6 @@ impl<A: Actor> Instance<A> {
11641183
self.proc.handle_supervision_event(event);
11651184
}
11661185
}
1167-
self.change_status(actor_status);
11681186
}
11691187

11701188
/// Runs the actor, and manages its supervision tree. When the function returns,
@@ -1207,10 +1225,16 @@ impl<A: Actor> Instance<A> {
12071225
}
12081226
};
12091227

1210-
if let Err(ref err) = result {
1211-
tracing::error!("{}: actor failure: {}", self.self_id(), err);
1228+
match &result {
1229+
Ok(_) => assert!(self.is_stopped()),
1230+
Err(err) => {
1231+
tracing::error!("{}: actor failure: {}", self.self_id(), err);
1232+
assert!(!self.is_terminal());
1233+
// Send Stopping instead of Failed, because we still need to
1234+
// unlink child actors.
1235+
self.change_status(ActorStatus::Stopping);
1236+
}
12121237
}
1213-
self.change_status(ActorStatus::Stopping);
12141238

12151239
// After this point, we know we won't spawn any more children,
12161240
// so we can safely read the current child keys.

0 commit comments

Comments
 (0)