Commit 40eca52
[monarch_hyperactor] propagate proc status in supervision events for proc failures
Pull Request resolved: #1877

Currently, we report the agent to be Stopped. This is accurate, but confusing, and could be better attributed. Here, we synthesize an actor failure by:

1) attributing the fault to the corresponding actor in the monitored actor mesh;
2) elevating the proc_status (which contains the mode of failure, exit code, etc.) into the actor failure, making it clear that it is a process failure.

In the future:

1) We will have a more general "Failure" struct, explicitly capturing host, proc, actor, etc., failures.
2) We will attribute the actor failure (it is the most proximate), but move the proc failure to the "cause" (i.e., the proc failure caused the actor to fail), which is the most correct and clear.

ghstack-source-id: 323362899
@exported-using-ghexport

Differential Revision: [D86993889](https://our.internmc.facebook.com/intern/diff/D86993889/)

**NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D86993889/)!
1 parent 7194160 commit 40eca52
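As a rough, self-contained illustration of the attribution change described in the commit message (the types below are simplified stand-ins, not the real hyperactor APIs), the monitor now folds a terminating proc's status into a failure attributed to the monitored actor instead of reporting the agent as Stopped:

```rust
use std::fmt;

// Simplified stand-in for hyperactor_mesh::bootstrap::ProcStatus.
#[derive(Debug, Clone)]
enum ProcStatus {
    Failed { reason: String },
}

impl fmt::Display for ProcStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProcStatus::Failed { reason } => write!(f, "Failed(\"{}\")", reason),
        }
    }
}

// Simplified stand-in for hyperactor::actor::ActorStatus.
#[derive(Debug)]
enum ActorStatus {
    Failed(String),
}

// Elevate the proc-level status (mode of failure, exit code, etc.) into the
// actor status carried by the supervision event, defaulting to an unknown
// failure when no proc status is available.
fn synthesize_actor_failure(proc_status: Option<ProcStatus>) -> ActorStatus {
    let status = proc_status.unwrap_or(ProcStatus::Failed {
        reason: "unknown".to_string(),
    });
    ActorStatus::Failed(format!("process failure: {}", status))
}

fn main() {
    let failure = synthesize_actor_failure(Some(ProcStatus::Failed {
        reason: "Killed(sig=11)".to_string(),
    }));
    // Prints: Failed("process failure: Failed(\"Killed(sig=11)\")")
    println!("{:?}", failure);
}
```

The real change (see actor_mesh.rs below) does the same thing with ActorErrorKind::Generic and the actual proc_status reported for the terminating proc.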

File tree

3 files changed: +25 -12 lines


hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs

Lines changed: 1 addition & 1 deletion

@@ -355,7 +355,7 @@ impl Handler<resource::GetState<ProcState>> for HostMeshAgent {
         cx: &Context<Self>,
         get_state: resource::GetState<ProcState>,
     ) -> anyhow::Result<()> {
-        let manager = self
+        let manager: Option<&BootstrapProcManager> = self
             .host
             .as_mut()
             .expect("host")

monarch_hyperactor/src/v1/actor_mesh.rs

Lines changed: 21 additions & 10 deletions

@@ -15,13 +15,15 @@ use hyperactor::Actor;
 use hyperactor::ActorHandle;
 use hyperactor::ActorRef;
 use hyperactor::RemoteMessage;
+use hyperactor::actor::ActorErrorKind;
 use hyperactor::actor::ActorStatus;
 use hyperactor::actor::Referable;
 use hyperactor::actor::RemotableActor;
 use hyperactor::clock::Clock;
 use hyperactor::clock::RealClock;
 use hyperactor::context;
 use hyperactor::supervision::ActorSupervisionEvent;
+use hyperactor_mesh::bootstrap::ProcStatus;
 use hyperactor_mesh::dashmap::DashMap;
 use hyperactor_mesh::proc_mesh::mesh_agent::ActorState;
 use hyperactor_mesh::resource;

@@ -466,7 +468,7 @@ fn send_state_change<F>(
 /// * is_owned is true if this monitor is running on the owning instance. When true,
 ///   a message will be sent to "owner" if it is not None. If owner is None,
 ///   then a panic will be raised instead to crash the client.
-/// * time_between_tasks controls how frequently to poll.
+/// * time_between_tasks controls how frequently to poll.
 async fn actor_states_monitor<A, F>(
     cx: &impl context::Actor,
     mesh: ActorMeshRef<A>,

@@ -502,7 +504,7 @@ async fn actor_states_monitor<A, F>(
             ActorSupervisionEvent::new(
                 cx.instance().self_id().clone(),
                 ActorStatus::generic_failure(format!(
-                    "Unable to query for proc states: {:?}",
+                    "unable to query for proc states: {:?}",
                     e
                 )),
                 None,

@@ -520,18 +522,27 @@
         }
         if let Some(proc_states) = proc_states.unwrap() {
             // Check if the proc mesh is still alive.
-            if let Some((rank, state)) = proc_states
+            if let Some((point, state)) = proc_states
                 .iter()
                 .find(|(_rank, state)| state.status.is_terminating())
             {
                 send_state_change(
-                    rank.rank(),
+                    point.rank(),
                     ActorSupervisionEvent::new(
-                        state
-                            .state
-                            .map(|s| s.mesh_agent.actor_id().clone())
-                            .unwrap_or(cx.instance().self_id().clone()),
-                        ActorStatus::Stopped,
+                        // Attribute this to the monitored actor, even if the underlying
+                        // cause is a proc_failure. We propagate the cause explicitly.
+                        mesh.get(point.rank()).unwrap().actor_id().clone(),
+                        // TODO: allow "actor supervision event" to be general, and
+                        // make the proc failure the cause.
+                        ActorStatus::Failed(ActorErrorKind::Generic(format!(
+                            "process failure: {}",
+                            state
+                                .state
+                                .and_then(|state| state.proc_status)
+                                .unwrap_or_else(|| ProcStatus::Failed {
+                                    reason: "unknown".to_string()
+                                })
+                        ))),
                         None,
                         None,
                     ),

@@ -555,7 +566,7 @@ async fn actor_states_monitor<A, F>(
             ActorSupervisionEvent::new(
                 cx.instance().self_id().clone(),
                 ActorStatus::generic_failure(format!(
-                    "Unable to query for actor states: {:?}",
+                    "unable to query for actor states: {:?}",
                     e
                 )),
                 None,

python/tests/test_actor_error.py

Lines changed: 3 additions & 1 deletion

@@ -652,6 +652,7 @@ async def test_sigsegv_handling():
 
     # Depending on the timing, any of these messages could come back first.
    error_msg = (
+        "actor mesh is stopped due to proc mesh shutdown|"
         'actor mesh is stopped due to proc mesh shutdown.*Failed\\("Killed\\(sig=11.*\\)"\\)|'
         "Actor .* exited because of the following reason|"
         "Actor .* is unhealthy with reason"

@@ -692,7 +693,8 @@ async def test_supervision_with_proc_mesh_stopped(mesh) -> None:
     # new call should fail with check of health state of actor mesh
     with pytest.raises(
         SupervisionError,
-        match="actor mesh is stopped due to proc mesh shutdown|Actor .* exited because of the following reason.*stopped",
+        match="actor mesh is stopped due to proc mesh shutdown"
+        + "|Actor .* exited because of the following reason.*process failure",
     ):
         await actor_mesh.check.call()
