diff --git a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs
index edd0e4076..9d89d64e0 100644
--- a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs
+++ b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs
@@ -355,7 +355,7 @@ impl Handler> for HostMeshAgent {
         cx: &Context,
         get_state: resource::GetState,
     ) -> anyhow::Result<()> {
-        let manager = self
+        let manager: Option<&BootstrapProcManager> = self
             .host
             .as_mut()
             .expect("host")
diff --git a/monarch_hyperactor/src/v1/actor_mesh.rs b/monarch_hyperactor/src/v1/actor_mesh.rs
index 85850ea0c..4fd885ca7 100644
--- a/monarch_hyperactor/src/v1/actor_mesh.rs
+++ b/monarch_hyperactor/src/v1/actor_mesh.rs
@@ -15,6 +15,7 @@ use hyperactor::Actor;
 use hyperactor::ActorHandle;
 use hyperactor::ActorRef;
 use hyperactor::RemoteMessage;
+use hyperactor::actor::ActorErrorKind;
 use hyperactor::actor::ActorStatus;
 use hyperactor::actor::Referable;
 use hyperactor::actor::RemotableActor;
@@ -22,6 +23,7 @@ use hyperactor::clock::Clock;
 use hyperactor::clock::RealClock;
 use hyperactor::context;
 use hyperactor::supervision::ActorSupervisionEvent;
+use hyperactor_mesh::bootstrap::ProcStatus;
 use hyperactor_mesh::dashmap::DashMap;
 use hyperactor_mesh::proc_mesh::mesh_agent::ActorState;
 use hyperactor_mesh::resource;
@@ -466,7 +468,7 @@ fn send_state_change(
 /// * is_owned is true if this monitor is running on the owning instance. When true,
 /// a message will be sent to "owner" if it is not None. If owner is None,
 /// then a panic will be raised instead to crash the client.
-/// * time_between_tasks controls how frequently to poll.
+/// * time_between_tasks controls how frequently to poll.
 async fn actor_states_monitor(
     cx: &impl context::Actor,
     mesh: ActorMeshRef,
@@ -502,7 +504,7 @@
                 ActorSupervisionEvent::new(
                     cx.instance().self_id().clone(),
                     ActorStatus::generic_failure(format!(
-                        "Unable to query for proc states: {:?}",
+                        "unable to query for proc states: {:?}",
                         e
                     )),
                     None,
@@ -520,18 +522,42 @@
         }
         if let Some(proc_states) = proc_states.unwrap() {
             // Check if the proc mesh is still alive.
-            if let Some((rank, state)) = proc_states
+            if let Some((point, state)) = proc_states
                 .iter()
                 .find(|(_rank, state)| state.status.is_terminating())
             {
+                // TODO: allow "actor supervision event" to be general, and
+                // make the proc failure the cause. It is a hack to try to determine
+                // the correct status based on process exit status.
+                let actor_status = match state.state.and_then(|s| s.proc_status) {
+                    Some(ProcStatus::Stopped { .. })
+                    // SIGTERM
+                    | Some(ProcStatus::Killed { signal: 15, .. })
+                    // Conservatively treat lack of status as stopped
+                    | None => ActorStatus::Stopped,
+
+                    Some(status) => ActorStatus::Failed(ActorErrorKind::Generic(format!(
+                        "process failure: {}",
+                        status
+                    ))),
+                };
+
                 send_state_change(
-                    rank.rank(),
+                    point.rank(),
                     ActorSupervisionEvent::new(
-                        state
-                            .state
-                            .map(|s| s.mesh_agent.actor_id().clone())
-                            .unwrap_or(cx.instance().self_id().clone()),
-                        ActorStatus::Stopped,
+                        // Attribute this to the monitored actor, even if the underlying
+                        // cause is a proc_failure. We propagate the cause explicitly.
+                        mesh.get(point.rank()).unwrap().actor_id().clone(),
+                        actor_status,
+                        // ActorStatus::Failed(ActorErrorKind::Generic(format!(
+                        //     "process failure: {}",
+                        //     state
+                        //         .state
+                        //         .and_then(|state| state.proc_status)
+                        //         .unwrap_or_else(|| ProcStatus::Failed {
+                        //             reason: "unknown".to_string()
+                        //         })
+                        //     ))),
                         None,
                         None,
                     ),
@@ -555,7 +581,7 @@
                 ActorSupervisionEvent::new(
                     cx.instance().self_id().clone(),
                     ActorStatus::generic_failure(format!(
-                        "Unable to query for actor states: {:?}",
+                        "unable to query for actor states: {:?}",
                         e
                     )),
                     None,
diff --git a/python/tests/test_actor_error.py b/python/tests/test_actor_error.py
index 144d105f5..1702a1985 100644
--- a/python/tests/test_actor_error.py
+++ b/python/tests/test_actor_error.py
@@ -652,6 +652,7 @@ async def test_sigsegv_handling():
     # Depending on the timing, any of these messages could come back first.
     error_msg = (
+        "actor mesh is stopped due to proc mesh shutdown|"
         'actor mesh is stopped due to proc mesh shutdown.*Failed\\("Killed\\(sig=11.*\\)"\\)|'
         "Actor .* exited because of the following reason|"
        "Actor .* is unhealthy with reason"
     )
@@ -692,7 +693,8 @@ async def test_supervision_with_proc_mesh_stopped(mesh) -> None:
     # new call should fail with check of health state of actor mesh
     with pytest.raises(
         SupervisionError,
-        match="actor mesh is stopped due to proc mesh shutdown|Actor .* exited because of the following reason.*stopped",
+        match="actor mesh is stopped due to proc mesh shutdown"
+        + "|Actor .* exited because of the following reason.*stopped",
     ):
         await actor_mesh.check.call()