Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl Handler<resource::GetState<ProcState>> for HostMeshAgent {
cx: &Context<Self>,
get_state: resource::GetState<ProcState>,
) -> anyhow::Result<()> {
let manager = self
let manager: Option<&BootstrapProcManager> = self
.host
.as_mut()
.expect("host")
Expand Down
46 changes: 36 additions & 10 deletions monarch_hyperactor/src/v1/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use hyperactor::Actor;
use hyperactor::ActorHandle;
use hyperactor::ActorRef;
use hyperactor::RemoteMessage;
use hyperactor::actor::ActorErrorKind;
use hyperactor::actor::ActorStatus;
use hyperactor::actor::Referable;
use hyperactor::actor::RemotableActor;
use hyperactor::clock::Clock;
use hyperactor::clock::RealClock;
use hyperactor::context;
use hyperactor::supervision::ActorSupervisionEvent;
use hyperactor_mesh::bootstrap::ProcStatus;
use hyperactor_mesh::dashmap::DashMap;
use hyperactor_mesh::proc_mesh::mesh_agent::ActorState;
use hyperactor_mesh::resource;
Expand Down Expand Up @@ -466,7 +468,7 @@ fn send_state_change<F>(
/// * is_owned is true if this monitor is running on the owning instance. When true,
/// a message will be sent to "owner" if it is not None. If owner is None,
/// then a panic will be raised instead to crash the client.
/// * time_between_tasks controls how frequently to poll.
/// * time_between_tasks controls how frequently to poll.
async fn actor_states_monitor<A, F>(
cx: &impl context::Actor,
mesh: ActorMeshRef<A>,
Expand Down Expand Up @@ -502,7 +504,7 @@ async fn actor_states_monitor<A, F>(
ActorSupervisionEvent::new(
cx.instance().self_id().clone(),
ActorStatus::generic_failure(format!(
"Unable to query for proc states: {:?}",
"unable to query for proc states: {:?}",
e
)),
None,
Expand All @@ -520,18 +522,42 @@ async fn actor_states_monitor<A, F>(
}
if let Some(proc_states) = proc_states.unwrap() {
// Check if the proc mesh is still alive.
if let Some((rank, state)) = proc_states
if let Some((point, state)) = proc_states
.iter()
.find(|(_rank, state)| state.status.is_terminating())
{
// TODO: allow "actor supervision event" to be general, and
// make the proc failure the cause. It is a hack to try to determine
// the correct status based on process exit status.
let actor_status = match state.state.and_then(|s| s.proc_status) {
Some(ProcStatus::Stopped { .. })
// SIGTERM
| Some(ProcStatus::Killed { signal: 15, .. })
// Conservatively treat lack of status as stopped
| None => ActorStatus::Stopped,

Some(status) => ActorStatus::Failed(ActorErrorKind::Generic(format!(
"process failure: {}",
status
))),
};

send_state_change(
rank.rank(),
point.rank(),
ActorSupervisionEvent::new(
state
.state
.map(|s| s.mesh_agent.actor_id().clone())
.unwrap_or(cx.instance().self_id().clone()),
ActorStatus::Stopped,
// Attribute this to the monitored actor, even if the underlying
// cause is a proc_failure. We propagate the cause explicitly.
mesh.get(point.rank()).unwrap().actor_id().clone(),
actor_status,
// ActorStatus::Failed(ActorErrorKind::Generic(format!(
// "process failure: {}",
// state
// .state
// .and_then(|state| state.proc_status)
// .unwrap_or_else(|| ProcStatus::Failed {
// reason: "unknown".to_string()
// })
// ))),
None,
None,
),
Expand All @@ -555,7 +581,7 @@ async fn actor_states_monitor<A, F>(
ActorSupervisionEvent::new(
cx.instance().self_id().clone(),
ActorStatus::generic_failure(format!(
"Unable to query for actor states: {:?}",
"unable to query for actor states: {:?}",
e
)),
None,
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_actor_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ async def test_sigsegv_handling():

# Depending on the timing, any of these messages could come back first.
error_msg = (
"actor mesh is stopped due to proc mesh shutdown|"
'actor mesh is stopped due to proc mesh shutdown.*Failed\\("Killed\\(sig=11.*\\)"\\)|'
"Actor .* exited because of the following reason|"
"Actor .* is unhealthy with reason"
Expand Down Expand Up @@ -692,7 +693,8 @@ async def test_supervision_with_proc_mesh_stopped(mesh) -> None:
# new call should fail with check of health state of actor mesh
with pytest.raises(
SupervisionError,
match="actor mesh is stopped due to proc mesh shutdown|Actor .* exited because of the following reason.*stopped",
match="actor mesh is stopped due to proc mesh shutdown"
+ "|Actor .* exited because of the following reason.*stopped",
):
await actor_mesh.check.call()

Expand Down