Skip to content

Commit 44f59c0

Browse files
committed
Reformat ActorSupervisionEvent
Differential Revision: [D86925582](https://our.internmc.facebook.com/intern/diff/D86925582/) ghstack-source-id: 323132637 Pull Request resolved: #1881
1 parent 5e9f0a2 commit 44f59c0

File tree

14 files changed

+302
-184
lines changed

14 files changed

+302
-184
lines changed

hyperactor/src/actor.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ pub trait Actor: Sized + Send + Debug + 'static {
131131
) -> Result<(), anyhow::Error> {
132132
handle_undeliverable_message(cx, envelope)
133133
}
134+
135+
/// If overridden, we will use this name in place of the
136+
/// ActorId for talking about this actor in supervision error
137+
/// messages.
138+
fn display_name(&self) -> Option<String> {
139+
None
140+
}
134141
}
135142

136143
/// Default implementation of [`Actor::handle_undeliverable_message`]. Defined
@@ -321,16 +328,24 @@ pub enum ActorErrorKind {
321328
#[error("{0}")]
322329
Generic(String),
323330

324-
/// An actor supervision event was not handled.
325-
#[error("supervision: {0}")]
331+
/// An error that occurred while trying to handle a supervision event.
332+
#[error("{0} while handling {1}")]
333+
ErrorDuringHandlingSupervision(String, Box<ActorSupervisionEvent>),
334+
/// The actor did not attempt to handle
335+
#[error("{0}")]
326336
UnhandledSupervisionEvent(Box<ActorSupervisionEvent>),
327337
}
328338

329339
impl ActorErrorKind {
330340
/// Error while processing actor, i.e., returned by the actor's
331341
/// processing method.
332342
pub fn processing(err: anyhow::Error) -> Self {
333-
Self::Generic(format!("processing error: {}", err))
343+
// Unbox err from the anyhow err. Check if it is an ActorErrorKind object.
344+
// If it is directly use it as the new ActorError's ActorErrorKind.
345+
// This lets us directly pass the ActorErrorKind::UnhandledSupervisionEvent
346+
// up the handling infrastructure.
347+
err.downcast::<ActorErrorKind>()
348+
.unwrap_or_else(|err| Self::Generic(err.to_string()))
334349
}
335350

336351
/// Unwound stracktrace of a panic.
@@ -410,15 +425,6 @@ impl From<MailboxSenderError> for ActorError {
410425
}
411426
}
412427

413-
impl From<ActorSupervisionEvent> for ActorError {
414-
fn from(inner: ActorSupervisionEvent) -> Self {
415-
Self {
416-
actor_id: Box::new(inner.actor_id.clone()),
417-
kind: Box::new(ActorErrorKind::UnhandledSupervisionEvent(Box::new(inner))),
418-
}
419-
}
420-
}
421-
422428
/// A collection of signals to control the behavior of the actor.
423429
/// Signals are internal runtime control plane messages and should not be
424430
/// sent outside of the runtime.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ pub fn supervise_undeliverable_messages_with<R, F>(
165165

166166
if let Err(e) = sink.send(ActorSupervisionEvent::new(
167167
actor_id,
168+
None,
168169
ActorStatus::generic_failure(format!("message not delivered: {}", env)),
169170
Some(headers),
170-
None,
171171
)) {
172172
tracing::warn!(
173173
%e,

hyperactor/src/proc.rs

Lines changed: 12 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,7 +1118,6 @@ impl<A: Actor> Instance<A> {
11181118
let result = self
11191119
.run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
11201120
.await;
1121-
11221121
let (actor_status, event) = match result {
11231122
Ok(_) => (ActorStatus::Stopped, None),
11241123
Err(ActorError {
@@ -1131,9 +1130,9 @@ impl<A: Actor> Instance<A> {
11311130
ActorStatus::Failed(error_kind.clone()),
11321131
Some(ActorSupervisionEvent::new(
11331132
self.cell.actor_id().clone(),
1133+
actor.display_name(),
11341134
ActorStatus::Failed(error_kind),
11351135
None,
1136-
None,
11371136
)),
11381137
)
11391138
}
@@ -1286,7 +1285,11 @@ impl<A: Actor> Instance<A> {
12861285
for supervision_event in supervision_event_receiver.drain() {
12871286
self.handle_supervision_event(actor, supervision_event).await?;
12881287
}
1289-
return Err(ActorError::new(self.self_id(), ActorErrorKind::processing(err)));
1288+
let kind = ActorErrorKind::processing(err);
1289+
return Err(ActorError {
1290+
actor_id: Box::new(self.self_id().clone()),
1291+
kind: Box::new(kind),
1292+
});
12901293
}
12911294
}
12921295
signal = signal_receiver.recv() => {
@@ -1346,28 +1349,17 @@ impl<A: Actor> Instance<A> {
13461349
Ok(())
13471350
}
13481351
Ok(false) => {
1349-
// The supervision event wasn't handled by this actor, chain it and bubble it up.
1350-
let supervision_event = ActorSupervisionEvent::new(
1351-
self.self_id().clone(),
1352-
ActorStatus::generic_failure("did not handle supervision event"),
1353-
None,
1354-
Some(Box::new(supervision_event)),
1355-
);
1356-
Err(supervision_event.into())
1352+
let kind = ActorErrorKind::UnhandledSupervisionEvent(Box::new(supervision_event));
1353+
Err(ActorError::new(self.self_id(), kind))
13571354
}
13581355
Err(err) => {
13591356
// The actor failed to handle the supervision event, it should die.
13601357
// Create a new supervision event for this failure and propagate it.
1361-
let supervision_event = ActorSupervisionEvent::new(
1362-
self.self_id().clone(),
1363-
ActorStatus::generic_failure(format!(
1364-
"failed to handle supervision event: {}",
1365-
err
1366-
)),
1367-
None,
1368-
Some(Box::new(supervision_event)),
1358+
let kind = ActorErrorKind::ErrorDuringHandlingSupervision(
1359+
err.to_string(),
1360+
Box::new(supervision_event),
13691361
);
1370-
Err(supervision_event.into())
1362+
Err(ActorError::new(self.self_id(), kind))
13711363
}
13721364
}
13731365
}
@@ -2857,107 +2849,6 @@ mod tests {
28572849
);
28582850
}
28592851

2860-
#[tokio::test]
2861-
async fn test_supervision_event_handler_propagates() {
2862-
#[derive(Debug)]
2863-
struct FailingSupervisionActor;
2864-
2865-
#[async_trait]
2866-
impl Actor for FailingSupervisionActor {
2867-
type Params = ();
2868-
2869-
async fn new(_: ()) -> Result<Self, anyhow::Error> {
2870-
Ok(Self)
2871-
}
2872-
2873-
async fn handle_supervision_event(
2874-
&mut self,
2875-
_this: &Instance<Self>,
2876-
_event: &ActorSupervisionEvent,
2877-
) -> Result<bool, anyhow::Error> {
2878-
anyhow::bail!("failed to handle supervision event!")
2879-
}
2880-
}
2881-
2882-
#[async_trait]
2883-
impl Handler<String> for FailingSupervisionActor {
2884-
async fn handle(
2885-
&mut self,
2886-
_cx: &crate::Context<Self>,
2887-
message: String,
2888-
) -> anyhow::Result<()> {
2889-
Err(anyhow::anyhow!(message))
2890-
}
2891-
}
2892-
2893-
#[derive(Debug)]
2894-
struct ParentActor(tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>);
2895-
2896-
#[async_trait]
2897-
impl Actor for ParentActor {
2898-
type Params = tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>;
2899-
2900-
async fn new(
2901-
supervision_events: tokio::sync::mpsc::UnboundedSender<ActorSupervisionEvent>,
2902-
) -> Result<Self, anyhow::Error> {
2903-
Ok(Self(supervision_events))
2904-
}
2905-
2906-
async fn handle_supervision_event(
2907-
&mut self,
2908-
_this: &Instance<Self>,
2909-
event: &ActorSupervisionEvent,
2910-
) -> Result<bool, anyhow::Error> {
2911-
self.0.send(event.clone()).unwrap();
2912-
Ok(true)
2913-
}
2914-
}
2915-
2916-
let proc = Proc::local();
2917-
2918-
let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
2919-
2920-
let parent = proc.spawn::<ParentActor>("parent", event_tx).await.unwrap();
2921-
let child = proc
2922-
.spawn_child::<FailingSupervisionActor>(parent.cell().clone(), ())
2923-
.await
2924-
.unwrap();
2925-
let grandchild = proc
2926-
.spawn_child::<FailingSupervisionActor>(child.cell().clone(), ())
2927-
.await
2928-
.unwrap();
2929-
2930-
let child_actor_id = child.actor_id().clone();
2931-
let grandchild_actor_id = grandchild.actor_id().clone();
2932-
2933-
// Grandchild fails, triggering failure up the tree, finally receiving
2934-
// the event at the root.
2935-
grandchild.send("trigger failure".to_string()).unwrap();
2936-
2937-
assert!(grandchild.await.is_failed());
2938-
assert!(child.await.is_failed());
2939-
2940-
assert_eq!(
2941-
event_rx.recv().await.unwrap(),
2942-
// The time field is ignored for Eq and PartialEq.
2943-
ActorSupervisionEvent::new(
2944-
child_actor_id,
2945-
ActorStatus::generic_failure(
2946-
"failed to handle supervision event: failed to handle supervision event!"
2947-
),
2948-
None,
2949-
Some(Box::new(ActorSupervisionEvent::new(
2950-
grandchild_actor_id,
2951-
ActorStatus::generic_failure("processing error: trigger failure"),
2952-
None,
2953-
None,
2954-
))),
2955-
)
2956-
);
2957-
2958-
assert!(event_rx.try_recv().is_err());
2959-
}
2960-
29612852
#[tokio::test]
29622853
async fn test_instance() {
29632854
#[derive(Debug, Default, Actor)]

0 commit comments

Comments
 (0)