Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions hyperactor/src/supervision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,49 @@ impl ActorSupervisionEvent {
status => status,
}
}

/// Returns true if this is a user actor event, false if it's a system actor event.
/// System actors (like "agent") are internal to Monarch and should not be exposed to users.
pub fn is_user_actor_event(&self) -> bool {
self.actor_id.name() != "agent"
}
}

impl std::error::Error for ActorSupervisionEvent {}

impl fmt::Display for ActorSupervisionEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}: {} at {}",
self.actor_id,
self.actor_status,
DateTime::<Local>::from(self.occurred_at),
)?;
if let Some(message_headers) = &self.message_headers {
let headers = serde_json::to_string(&message_headers)
.expect("could not serialize message headers");
write!(f, " (headers: {})", headers)?;
if self.is_user_actor_event() {
write!(
f,
"{}: {} at {}",
self.actor_id,
self.actor_status,
DateTime::<Local>::from(self.occurred_at),
)?;
if let Some(message_headers) = &self.message_headers {
let headers = serde_json::to_string(&message_headers)
.expect("could not serialize message headers");
write!(f, " (headers: {})", headers)?;
}
} else {
// System actor event - show simplified message
match self.actor_id.proc_id() {
crate::reference::ProcId::Direct(addr, _) => {
write!(
f,
"{} is not reacheable, check the log on the host for details",
addr
)?;
}
crate::reference::ProcId::Ranked(_, _) => {
write!(
f,
"{} is not reacheable, check the log on the host for details",
self.actor_id.proc_id()
)?;
}
}
}
if let Some(caused_by) = &self.caused_by {
write!(f, ": (caused by: {})", caused_by)?;
Expand Down
23 changes: 15 additions & 8 deletions monarch_hyperactor/src/v1/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,15 +442,22 @@ fn send_state_change<F>(
} else {
Unhealthy::Crashed(event.clone())
};
let event_actor_id = event.actor_id.clone();
let py_event = PyActorSupervisionEvent::from(event.clone());
let pyerr = PyErr::new::<SupervisionError, _>(format!(
"Actor {} exited because of the following reason: {}",
event_actor_id,
py_event
.__repr__()
.expect("repr failed on PyActorSupervisionEvent")
));
let pyerr = if event.is_user_actor_event() {
PyErr::new::<SupervisionError, _>(format!(
"Actor {} exited because of the following reason: {}",
event.actor_id,
py_event
.__repr__()
.expect("repr failed on PyActorSupervisionEvent")
))
} else {
PyErr::new::<SupervisionError, _>(
py_event
.__repr__()
.expect("repr failed on PyActorSupervisionEvent"),
)
};
sender.send(Some(pyerr)).expect("receiver closed");
}

Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_actor_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ async def test_supervision_with_proc_mesh_stopped(mesh) -> None:
# new call should fail with check of health state of actor mesh
with pytest.raises(
SupervisionError,
match="actor mesh is stopped due to proc mesh shutdown|Actor .* exited because of the following reason.*stopped",
match="actor mesh is stopped due to proc mesh shutdown|.* is not reacheable, check the log on the host for details",
):
await actor_mesh.check.call()

Expand Down
Loading