Skip to content

Commit 8480a57

Browse files
committed
Reformat ActorSupervisionEvent
Pull Request resolved: #1881 This changes the ActorSupervisionEvent structure so that we preserve enough information to give a good error message when an actor fails. The major changes are * removing jargon `processing error: superivision: `. * Adding user-understandable actor names. * identifying the actual actor that failed, and summarizing the default chain handling so that there are almost no wrappers around the error. Here are some examples of what it looks like now: When an actor directly errors: ``` I AM ABOUT TO ERROR!!!! Unhandled monarch error on the top-level client: The actor <root>.<tests.test_supervision_hierarchy.Lambda actor> and all its descendants have failed. This occurred because the actor itself failed. The error was: Traceback (most recent call last): File "/data/users/zdevito/fbsource/fbcode/monarch/python/monarch/_src/actor/actor_mesh.py", line 1068, in handle response_port.exception(ActorError(e)) File "/data/users/zdevito/fbsource/fbcode/monarch/python/monarch/_src/actor/actor_mesh.py", line 828, in exception raise obj from None File "/data/users/zdevito/fbsource/fbcode/monarch/python/monarch/_src/actor/actor_mesh.py", line 1062, in handle result = the_method(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/data/users/zdevito/fbsource/fbcode/monarch/python/tests/test_supervision_hierarchy.py", line 19, in run return l() ^^^ File "/data/users/zdevito/fbsource/fbcode/monarch/python/tests/test_supervision_hierarchy.py", line 46, in error raise ValueError("Error.") ValueError: Error. ``` When a nested actor errors: ``` python/tests/test_supervision_hierarchy.py::test_nested_mesh_kills_actor_actor_error Monarch internal logs are being written to /tmp/zdevito/monarch_log.log ERRORED THE ACTOR I AM ABOUT TO ERROR!!!! Nest still alive 0 Nest still alive 1 Nest still alive 2 Unhandled monarch error on the top-level client: The actor <root>.<tests.test_supervision_hierarchy.Nest actor> and all its descendants have failed. This occurred because the actor <root>.<tests.test_supervision_hierarchy.Nest actor>.<tests.test_supervision_hierarchy.Lambda nested> failed. The error was: Traceback (most recent call last): File "/data/users/zdevito/fbsource/fbcode/monarch/python/monarch/_src/actor/actor_mesh.py", line 1068, in handle response_port.exception(ActorError(e)) File "/data/users/zdevito/fbsource/fbcode/monarch/python/monarch/_src/actor/actor_mesh.py", line 828, in exception raise obj from None File "/data/users/zdevito/fbsource/fbcode/monarch/python/monarch/_src/actor/actor_mesh.py", line 1062, in handle result = the_method(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/data/users/zdevito/fbsource/fbcode/monarch/python/tests/test_supervision_hierarchy.py", line 19, in run return l() ^^^ File "/data/users/zdevito/fbsource/fbcode/monarch/python/tests/test_supervision_hierarchy.py", line 46, in error raise ValueError("Error.") ValueError: Error. ``` When a proc errors: ``` Unhandled monarch error on the top-level client: The actor <root>.<tests.test_supervision_hierarchy.Nest actor> and all its descendants have failed. This occurred because the actor unix:@eRu5gzLrP1kdciNpAErvY1Q9,anon_0_16inPfUmdpwZ,agent[0] failed. The error was: The process unix:@eRu5gzLrP1kdciNpAErvY1Q9 owned by this actor became unresponsive and is assumed dead, check the log on the host for details ``` The proc error includes the changes added in D86984496 to make agent failures more clean. We should eventually further improve this by making sure we generate a supervision event specific to process failure as noticed by the host agent. That should include a friendly name for the process (the processes name given during spawn, and its owning actor). . ghstack-source-id: 323426577 Differential Revision: [D86925582](https://our.internmc.facebook.com/intern/diff/D86925582/)
1 parent 785b9a4 commit 8480a57

File tree

18 files changed

+316
-191
lines changed

18 files changed

+316
-191
lines changed

hyperactor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ hostname = "0.3"
5757
humantime = "2.1"
5858
hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" }
5959
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
60+
indenter = "0.3.3"
6061
inventory = "0.3.8"
6162
lazy_static = "1.5"
6263
local-ip-address = "0.5.3"

hyperactor/src/actor.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ pub trait Actor: Sized + Send + Debug + 'static {
151151
) -> Result<(), anyhow::Error> {
152152
handle_undeliverable_message(cx, envelope)
153153
}
154+
155+
/// If overridden, we will use this name in place of the
156+
/// ActorId for talking about this actor in supervision error
157+
/// messages.
158+
fn display_name(&self) -> Option<String> {
159+
None
160+
}
154161
}
155162

156163
/// Default implementation of [`Actor::handle_undeliverable_message`]. Defined
@@ -341,16 +348,24 @@ pub enum ActorErrorKind {
341348
#[error("{0}")]
342349
Generic(String),
343350

344-
/// An actor supervision event was not handled.
345-
#[error("supervision: {0}")]
351+
/// An error that occurred while trying to handle a supervision event.
352+
#[error("{0} while handling {1}")]
353+
ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
354+
/// The actor did not attempt to handle
355+
#[error("{0}")]
346356
UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
347357
}
348358

349359
impl ActorErrorKind {
350360
/// Error while processing actor, i.e., returned by the actor's
351361
/// processing method.
352362
pub fn processing(err: anyhow::Error) -> Self {
353-
Self::Generic(format!("processing error: {}", err))
363+
// Unbox err from the anyhow err. Check if it is an ActorErrorKind object.
364+
// If it is directly use it as the new ActorError's ActorErrorKind.
365+
// This lets us directly pass the ActorErrorKind::UnhandledSupervisionEvent
366+
// up the handling infrastructure.
367+
err.downcast::<ActorErrorKind>()
368+
.unwrap_or_else(|err| Self::Generic(err.to_string()))
354369
}
355370

356371
/// Unwound stracktrace of a panic.
@@ -435,15 +450,6 @@ impl From<MailboxSenderError> for ActorError {
435450
}
436451
}
437452

438-
impl From<ActorSupervisionEvent> for ActorError {
439-
fn from(inner: ActorSupervisionEvent) -> Self {
440-
Self {
441-
actor_id: Box::new(inner.actor_id.clone()),
442-
kind: Box::new(ActorErrorKind::UnhandledSupervisionEvent(Box::new(inner))),
443-
}
444-
}
445-
}
446-
447453
/// A collection of signals to control the behavior of the actor.
448454
/// Signals are internal runtime control plane messages and should not be
449455
/// sent outside of the runtime.

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3143,7 +3143,7 @@ mod tests {
31433143
unreachable!()
31443144
};
31453145
assert!(msg.to_string().contains(
3146-
"processing error: a message from \
3146+
"a message from \
31473147
quux[0].foo[0] to corge[0].bar[0][9999] was undeliverable and returned"
31483148
));
31493149

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ pub fn supervise_undeliverable_messages_with<R, F>(
165165

166166
if let Err(e) = sink.send(ActorSupervisionEvent::new(
167167
actor_id,
168+
None,
168169
ActorStatus::generic_failure(format!("message not delivered: {}", env)),
169170
Some(headers),
170-
None,
171171
)) {
172172
tracing::warn!(
173173
%e,

hyperactor/src/proc.rs

Lines changed: 15 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,7 +1169,6 @@ impl<A: Actor> Instance<A> {
11691169
let result = self
11701170
.run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
11711171
.await;
1172-
11731172
let event = match result {
11741173
Ok(_) => {
11751174
// actor should have been stopped by run_actor_tree
@@ -1189,11 +1188,12 @@ impl<A: Actor> Instance<A> {
11891188
}
11901189
_ => {
11911190
let error_kind = ActorErrorKind::Generic(err.kind.to_string());
1192-
self.change_status(ActorStatus::Failed(error_kind.clone()));
1191+
let status = ActorStatus::Failed(error_kind);
1192+
self.change_status(status.clone());
11931193
Some(ActorSupervisionEvent::new(
11941194
self.cell.actor_id().clone(),
1195-
ActorStatus::Failed(error_kind),
1196-
None,
1195+
actor.display_name(),
1196+
status,
11971197
None,
11981198
))
11991199
}
@@ -1387,7 +1387,11 @@ impl<A: Actor> Instance<A> {
13871387
for supervision_event in supervision_event_receiver.drain() {
13881388
self.handle_supervision_event(actor, supervision_event).await?;
13891389
}
1390-
return Err(ActorError::new(self.self_id(), ActorErrorKind::processing(err)));
1390+
let kind = ActorErrorKind::processing(err);
1391+
return Err(ActorError {
1392+
actor_id: Box::new(self.self_id().clone()),
1393+
kind: Box::new(kind),
1394+
});
13911395
}
13921396
}
13931397
signal = signal_receiver.recv() => {
@@ -1447,28 +1451,17 @@ impl<A: Actor> Instance<A> {
14471451
Ok(())
14481452
}
14491453
Ok(false) => {
1450-
// The supervision event wasn't handled by this actor, chain it and bubble it up.
1451-
let supervision_event = ActorSupervisionEvent::new(
1452-
self.self_id().clone(),
1453-
ActorStatus::generic_failure("did not handle supervision event"),
1454-
None,
1455-
Some(Box::new(supervision_event)),
1456-
);
1457-
Err(supervision_event.into())
1454+
let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1455+
Err(ActorError::new(self.self_id(), kind))
14581456
}
14591457
Err(err) => {
14601458
// The actor failed to handle the supervision event, it should die.
14611459
// Create a new supervision event for this failure and propagate it.
1462-
let supervision_event = ActorSupervisionEvent::new(
1463-
self.self_id().clone(),
1464-
ActorStatus::generic_failure(format!(
1465-
"failed to handle supervision event: {}",
1466-
err
1467-
)),
1468-
None,
1469-
Some(Box::new(supervision_event)),
1460+
let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1461+
err.to_string(),
1462+
Box::new(supervision_event),
14701463
);
1471-
Err(supervision_event.into())
1464+
Err(ActorError::new(self.self_id(), kind))
14721465
}
14731466
}
14741467
}
@@ -2962,107 +2955,6 @@ mod tests {
29622955
);
29632956
}
29642957

2965-
#[tokio::test]
2966-
async fn test_supervision_event_handler_propagates() {
2967-
#[derive(Debug)]
2968-
struct FailingSupervisionActor;
2969-
2970-
#[async_trait]
2971-
impl Actor for FailingSupervisionActor {
2972-
type Params = ();
2973-
2974-
async fn new(_: ()) -> Result<Self, anyhow::Error> {
2975-
Ok(Self)
2976-
}
2977-
2978-
async fn handle_supervision_event(
2979-
&mut self,
2980-
_this: &Instance<Self>,
2981-
_event: &ActorSupervisionEvent,
2982-
) -> Result<bool, anyhow::Error> {
2983-
anyhow::bail!("failed to handle supervision event!")
2984-
}
2985-
}
2986-
2987-
#[async_trait]
2988-
impl Handler<String> for FailingSupervisionActor {
2989-
async fn handle(
2990-
&mut self,
2991-
_cx: &crate::Context<Self>,
2992-
message: String,
2993-
) -> anyhow::Result<()> {
2994-
Err(anyhow::anyhow!(message))
2995-
}
2996-
}
2997-
2998-
#[derive(Debug)]
2999-
struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);
3000-
3001-
#[async_trait]
3002-
impl Actor for ParentActor {
3003-
type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;
3004-
3005-
async fn new(
3006-
supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
3007-
) -> Result<Self, anyhow::Error> {
3008-
Ok(Self(supervision_events))
3009-
}
3010-
3011-
async fn handle_supervision_event(
3012-
&mut self,
3013-
_this: &Instance<Self>,
3014-
event: &ActorSupervisionEvent,
3015-
) -> Result<bool, anyhow::Error> {
3016-
self.0.send(event.clone()).unwrap();
3017-
Ok(true)
3018-
}
3019-
}
3020-
3021-
let proc = Proc::local();
3022-
3023-
let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
3024-
3025-
let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
3026-
let child = proc
3027-
.spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
3028-
.await
3029-
.unwrap();
3030-
let grandchild = proc
3031-
.spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
3032-
.await
3033-
.unwrap();
3034-
3035-
let child_actor_id = child.actor_id().clone();
3036-
let grandchild_actor_id = grandchild.actor_id().clone();
3037-
3038-
// Grandchild fails, triggering failure up the tree, finally receiving
3039-
// the event at the root.
3040-
grandchild.send("trigger failure".to_string()).unwrap();
3041-
3042-
assert!(grandchild.await.is_failed());
3043-
assert!(child.await.is_failed());
3044-
3045-
assert_eq!(
3046-
event_rx.recv().await.unwrap(),
3047-
// The time field is ignored for Eq and PartialEq.
3048-
ActorSupervisionEvent::new(
3049-
child_actor_id,
3050-
ActorStatus::generic_failure(
3051-
"failed to handle supervision event: failed to handle supervision event!"
3052-
),
3053-
None,
3054-
Some(Box::new(ActorSupervisionEvent::new(
3055-
grandchild_actor_id,
3056-
ActorStatus::generic_failure("processing error: trigger failure"),
3057-
None,
3058-
None,
3059-
))),
3060-
)
3061-
);
3062-
3063-
assert!(event_rx.try_recv().is_err());
3064-
}
3065-
30662958
#[tokio::test]
30672959
async fn test_instance() {
30682960
#[derive(Debug, Default, Actor)]

0 commit comments

Comments
 (0)