
Commit 8b5f851

dulinriley authored and meta-codesync[bot] committed
After Stop and StopAll messages, use GetRankStatus to query for success (#1838)
Summary:
Pull Request resolved: #1838

The StopAll message didn't have a reply, so there could be a race between asking all actors to stop and sending a SIGTERM to kill them. Avoid this by adding a new GetAllRankStatus message, a parallel of GetRankStatus without a particular name.

In order to cut down on the proliferation of rank replies, we removed the reply from Stop. The pre-existing GetRankStatus is now used to fetch whether the actor did successfully stop.

In order for a second message to work, we need to *not* destroy the agent actor on the proc, so we add a new argument to `destroy_and_wait`. This way you can send a StopAll and then actually spawn new actors after that point, making the system a bit more flexible.

Reviewed By: mariusae

Differential Revision: D86137850

fbshipit-source-id: 08059caba4cdf741b205fe14192afadb298b6867
1 parent e0920bb commit 8b5f851
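
To make the new ordering concrete, here is a minimal sketch of the stop-then-query flow this commit introduces, condensed from the send_stop_all helper in hyperactor_mesh/src/bootstrap.rs below. The function name stop_then_query is illustrative; the ports, the Vec<(usize, Status)> reply type, and is_failure() are taken from the diff.

    // Sketch only: condensed from BootstrapProcHandle::send_stop_all in this commit.
    // Any error (undeliverable message, reply timeout) means "proceed to SIGTERM".
    async fn stop_then_query(
        cx: &impl context::Actor,
        agent: ActorRef<ProcMeshAgent>,
        timeout: Duration,
    ) -> anyhow::Result<()> {
        // 1. Ask the agent to stop every actor it manages; StopAll has no reply.
        let mut stop_port = agent.port();
        stop_port.return_undeliverable(false); // a dead proc is not an error here
        stop_port.send(cx, resource::StopAll {})?;

        // 2. Ask for per-rank status. The agent itself survives StopAll
        //    (except_current = true), so it can still answer.
        let (reply_port, mut rx) = cx.mailbox().open_port::<Vec<(usize, Status)>>();
        let mut reply_port = reply_port.bind();
        reply_port.return_undeliverable(false);
        let mut status_port = agent.port();
        status_port.return_undeliverable(false);
        status_port.send(cx, resource::GetAllRankStatus { reply: reply_port })?;

        // 3. Only after the statuses arrive (or the timeout fires) does the caller
        //    escalate to SIGTERM, closing the old StopAll-vs-SIGTERM race.
        let statuses = RealClock.timeout(timeout, rx.recv()).await??;
        anyhow::ensure!(
            !statuses.iter().any(|(_rank, status)| status.is_failure()),
            "StopAll had some actors that failed: {:?}",
            statuses
        );
        Ok(())
    }

Even when this reports success, the caller still sends SIGTERM afterwards to actually exit the process, as the added comment in bootstrap.rs notes.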

File tree

8 files changed, +263 -139 lines changed


hyperactor/src/proc.rs

Lines changed: 21 additions & 1 deletion
@@ -659,11 +659,30 @@ impl Proc {
     /// If `cx` is specified, it means this method was called from inside an actor
     /// in which case we shouldn't wait for it to stop and need to delay aborting
     /// its task.
-    #[hyperactor::instrument]
     pub async fn destroy_and_wait<A: Actor>(
         &mut self,
         timeout: Duration,
         cx: Option<&Context<'_, A>>,
+    ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
+        self.destroy_and_wait_except_current::<A>(timeout, cx, false)
+            .await
+    }
+
+    /// Stop the proc. Returns a pair of:
+    /// - the actors observed to stop;
+    /// - the actors not observed to stop when timeout.
+    ///
+    /// If `cx` is specified, it means this method was called from inside an actor
+    /// in which case we shouldn't wait for it to stop and need to delay aborting
+    /// its task.
+    /// If except_current is true, don't stop the actor represented by "cx" at
+    /// all.
+    #[hyperactor::instrument]
+    pub async fn destroy_and_wait_except_current<A: Actor>(
+        &mut self,
+        timeout: Duration,
+        cx: Option<&Context<'_, A>>,
+        except_current: bool,
     ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
         tracing::debug!("{}: proc stopping", self.proc_id());

@@ -736,6 +755,7 @@ impl Proc {

         if let Some(this_handle) = this_handle
             && let Some(this_actor_id) = this_actor_id
+            && !except_current
         {
             tracing::debug!("{}: aborting (delayed) {:?}", this_actor_id, this_handle);
             this_handle.abort()
hyperactor_mesh/src/bootstrap.rs

Lines changed: 45 additions & 8 deletions
@@ -72,6 +72,7 @@ use crate::logging::OutputTarget;
 use crate::logging::StreamFwder;
 use crate::proc_mesh::mesh_agent::ProcMeshAgent;
 use crate::resource;
+use crate::resource::Status;
 use crate::v1;
 use crate::v1::host_mesh::mesh_agent::HostAgentMode;
 use crate::v1::host_mesh::mesh_agent::HostMeshAgent;

@@ -1200,6 +1201,43 @@ impl BootstrapProcHandle {
             .take();
         (out, err)
     }
+
+    async fn send_stop_all(
+        &self,
+        cx: &impl context::Actor,
+        agent: ActorRef<ProcMeshAgent>,
+        timeout: Duration,
+    ) -> anyhow::Result<()> {
+        // For all of the messages and replies in this function:
+        // if the proc is already dead, then the message will be undeliverable,
+        // which should be ignored.
+        // If this message isn't deliverable to the agent, the process may have
+        // stopped already. No need to produce any errors, just continue with
+        // killing the process.
+        let mut agent_port = agent.port();
+        agent_port.return_undeliverable(false);
+        agent_port.send(cx, resource::StopAll {})?;
+        let (reply_port, mut rx) = cx.mailbox().open_port::<Vec<(usize, Status)>>();
+        let mut reply_port = reply_port.bind();
+        reply_port.return_undeliverable(false);
+        // Similar to above, if we cannot query for the stopped actors, just
+        // proceed with SIGTERM.
+        let mut agent_port = agent.port();
+        agent_port.return_undeliverable(false);
+        agent_port.send(cx, resource::GetAllRankStatus { reply: reply_port })?;
+        // If there's a timeout waiting for a reply, continue with SIGTERM.
+        let statuses = RealClock.timeout(timeout, rx.recv()).await??;
+        let has_failure = statuses.iter().any(|(_rank, status)| status.is_failure());
+
+        if has_failure {
+            Err(anyhow::anyhow!(
+                "StopAll had some actors that failed: {:?}",
+                statuses,
+            ))
+        } else {
+            Ok(())
+        }
+    }
 }

 #[async_trait]

@@ -1323,19 +1361,18 @@ impl hyperactor::host::ProcHandle for BootstrapProcHandle {
         // they are in the Ready state and have an Agent we can message.
         let agent = self.agent_ref();
         if let Some(agent) = agent {
-            // TODO: add a reply to StopAll and wait for it.
-            let mut port = agent.port();
-            // If the proc is already dead, then the StopAll message will be undeliverable,
-            // which should be ignored.
-            port.return_undeliverable(false);
-            if let Err(e) = port.send(cx, resource::StopAll {}) {
-                // Cannot send to agent, proceed with SIGTERM.
+            if let Err(e) = self.send_stop_all(cx, agent.clone(), timeout).await {
+                // Variety of possible errors, proceed with SIGTERM.
                 tracing::warn!(
-                    "ProcMeshAgent {} didn't respond in time to stop proc: {}",
+                    "ProcMeshAgent {} could not successfully stop all actors: {}",
                     agent.actor_id(),
                     e,
                 );
             }
+            // Even if the StopAll message and response is fully effective, we
+            // still want to send SIGTERM to actually exit the process and free
+            // any leftover resources. No actor should be running at this
+            // point.
         }
         // After the stop all actors message may be successful, we still need
         // to actually stop the process.
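
Throughout send_stop_all the same best-effort messaging pattern recurs: ports to a possibly dead proc disable undeliverable returns, and any failure just means falling through to SIGTERM. A small sketch of that pattern in isolation; the wrapper function name is illustrative, while port(), return_undeliverable(false), and send() are the calls used above.

    // Sketch: best-effort StopAll send to an agent whose proc may already be gone.
    // With return_undeliverable(false), an undeliverable message is dropped
    // silently instead of coming back as a delivery failure, and the caller
    // simply continues with SIGTERM.
    fn send_stop_all_best_effort(
        cx: &impl context::Actor,
        agent: &ActorRef<ProcMeshAgent>,
    ) -> anyhow::Result<()> {
        let mut port = agent.port();
        port.return_undeliverable(false);
        port.send(cx, resource::StopAll {})
    }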

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 100 additions & 54 deletions
@@ -222,6 +222,7 @@ pub(crate) fn update_event_actor_id(mut event: ActorSupervisionEvent) -> ActorSu
         resource::StopAll { cast = true },
         resource::GetState<ActorState> { cast = true },
         resource::GetRankStatus { cast = true },
+        resource::GetAllRankStatus { cast = true },
     ]
 )]
 pub struct ProcMeshAgent {

@@ -274,12 +275,14 @@ impl ProcMeshAgent {
         proc.spawn::<Self>("agent", agent).await
     }

-    async fn destroy_and_wait<'a>(
+    async fn destroy_and_wait_except_current<'a>(
         &mut self,
         cx: &Context<'a, Self>,
         timeout: tokio::time::Duration,
     ) -> Result<(Vec<ActorId>, Vec<ActorId>), anyhow::Error> {
-        self.proc.destroy_and_wait::<Self>(timeout, Some(cx)).await
+        self.proc
+            .destroy_and_wait_except_current::<Self>(timeout, Some(cx), true)
+            .await
     }
 }

@@ -451,9 +454,9 @@ impl Handler<ActorSupervisionEvent> for ProcMeshAgent {
         let event = update_event_actor_id(event);
         if self.record_supervision_events {
             tracing::info!(
-                "Received supervision event on proc {}: {:?}, recording",
-                self.proc.proc_id(),
-                event
+                proc_id = %self.proc.proc_id(),
+                %event,
+                "recording supervision event",
             );
             self.supervision_events
                 .entry(event.actor_id.clone())

@@ -467,9 +470,9 @@ impl Handler<ActorSupervisionEvent> for ProcMeshAgent {
             // the whole process.
             tracing::error!(
                 name = SupervisionEventState::SupervisionEventTransmitFailed.as_ref(),
-                "proc {}: could not propagate supervision event {:?}: crashing",
-                cx.self_id().proc_id(),
-                event
+                proc_id = %cx.self_id().proc_id(),
+                %event,
+                "could not propagate supervision event, crashing",
             );

             // We should have a custom "crash" function here, so that this works

@@ -559,67 +562,38 @@ impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcMeshAgent {
 #[async_trait]
 impl Handler<resource::Stop> for ProcMeshAgent {
     async fn handle(&mut self, cx: &Context<Self>, message: resource::Stop) -> anyhow::Result<()> {
-        use crate::v1::StatusOverlay;
-
         // We don't remove the actor from the state map, instead we just store
         // its state as Stopped.
         let actor = self.actor_states.get_mut(&message.name);
-        enum StatusOrActorId {
-            Status(resource::Status),
-            ActorId(ActorId),
-        }
         // Have to separate stop_actor from setting "stopped" because it borrows
         // as mutable and cannot have self borrowed mutably twice.
-        let (rank, actor_id) = match actor {
+        let actor_id = match actor {
             Some(actor_state) => {
-                let rank = actor_state.create_rank;
                 match &actor_state.spawn {
                     Ok(actor_id) => {
                         if actor_state.stopped {
-                            (rank, StatusOrActorId::Status(resource::Status::Stopped))
+                            None
                         } else {
                             actor_state.stopped = true;
-                            (rank, StatusOrActorId::ActorId(actor_id.clone()))
+                            Some(actor_id.clone())
                         }
                     }
                     // If the original spawn had failed, the actor is still considered
                     // successfully stopped.
-                    Err(_) => (rank, StatusOrActorId::Status(resource::Status::Stopped)),
+                    Err(_) => None,
                 }
             }
             // TODO: represent unknown rank
-            None => (
-                usize::MAX,
-                StatusOrActorId::Status(resource::Status::NotExist),
-            ),
+            None => None,
         };
         let timeout = hyperactor::config::global::get(hyperactor::config::STOP_ACTOR_TIMEOUT);
-        let status = match actor_id {
-            StatusOrActorId::Status(s) => s,
-            StatusOrActorId::ActorId(actor_id) => {
-                // If this fails, we will still leave the actor_state as stopped,
-                // because it shouldn't be attempted again.
-                let stop_result = self
-                    .stop_actor(cx, actor_id, timeout.as_millis() as u64)
-                    .await?;
-                match stop_result {
-                    // use Stopped as a successful result.
-                    StopActorResult::Success => resource::Status::Stopped,
-                    StopActorResult::Timeout => resource::Status::Timeout(timeout),
-                    StopActorResult::NotFound => resource::Status::NotExist,
-                }
-            }
-        };
-
-        // Send a sparse overlay update. If rank is unknown, emit an
-        // empty overlay.
-        let overlay = if rank == usize::MAX {
-            StatusOverlay::new()
-        } else {
-            StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
-                .expect("valid single-run overlay")
-        };
-        message.reply.send(cx, overlay)?;
+        if let Some(actor_id) = actor_id {
+            // While this function returns a Result, it never returns an Err
+            // value so we can simply expect without any failure handling.
+            self.stop_actor(cx, actor_id, timeout.as_millis() as u64)
+                .await
+                .expect("stop_actor cannot fail");
+        }

         Ok(())
     }

@@ -635,12 +609,21 @@ impl Handler<resource::StopAll> for ProcMeshAgent {
         let timeout = hyperactor::config::global::get(hyperactor::config::STOP_ACTOR_TIMEOUT);
         // By passing in the self context, destroy_and_wait will stop this agent
         // last, after all others are stopped.
-        let _stop_result = self.destroy_and_wait(cx, timeout).await?;
-        for (_, actor_state) in self.actor_states.iter_mut() {
-            // Mark all actors as stopped.
-            actor_state.stopped = true;
+        let stop_result = self.destroy_and_wait_except_current(cx, timeout).await;
+        match stop_result {
+            Ok(_) => {
+                for (_, actor_state) in self.actor_states.iter_mut() {
+                    // Mark all actors as stopped.
+                    actor_state.stopped = true;
+                }
+                Ok(())
+            }
+            Err(e) => Err(anyhow::anyhow!(
+                "failed to StopAll on {}: {:?}",
+                cx.self_id(),
+                e
+            )),
         }
-        Ok(())
     }
 }

@@ -713,6 +696,69 @@ impl Handler<resource::GetRankStatus> for ProcMeshAgent {
     }
 }

+#[async_trait]
+impl Handler<resource::GetAllRankStatus> for ProcMeshAgent {
+    async fn handle(
+        &mut self,
+        cx: &Context<Self>,
+        get_rank_status: resource::GetAllRankStatus,
+    ) -> anyhow::Result<()> {
+        use crate::resource::Status;
+
+        let mut ranks = Vec::new();
+        for (_name, state) in self.actor_states.iter() {
+            match state {
+                ActorInstanceState {
+                    spawn: Ok(actor_id),
+                    create_rank,
+                    stopped,
+                } => {
+                    if *stopped {
+                        ranks.push((*create_rank, resource::Status::Stopped));
+                    } else {
+                        let supervision_events = self
+                            .supervision_events
+                            .get(actor_id)
+                            .map_or_else(Vec::new, |a| a.clone());
+                        ranks.push((
+                            *create_rank,
+                            if supervision_events.is_empty() {
+                                resource::Status::Running
+                            } else {
+                                resource::Status::Failed(format!(
+                                    "because of supervision events: {:?}",
+                                    supervision_events
+                                ))
+                            },
+                        ));
+                    }
+                }
+                ActorInstanceState {
+                    spawn: Err(e),
+                    create_rank,
+                    ..
+                } => {
+                    ranks.push((*create_rank, Status::Failed(e.to_string())));
+                }
+            }
+        }
+
+        let result = get_rank_status.reply.send(cx, ranks);
+        // Ignore errors, because returning Err from here would cause the ProcMeshAgent
+        // to be stopped, which would prevent querying and spawning other actors.
+        // This only means some actor that requested the state of an actor failed to receive it.
+        if let Err(e) = result {
+            tracing::warn!(
+                actor = %cx.self_id(),
+                "failed to send GetRankStatus reply to {} due to error: {}",
+                get_rank_status.reply.port_id().actor_id(),
+                e
+            );
+        }
+        Ok(())
+    }
+}
+
 #[async_trait]
 impl Handler<resource::GetState<ActorState>> for ProcMeshAgent {
     async fn handle(
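
The new GetAllRankStatus handler reduces each actor's state to one of three outcomes. A sketch of that classification as a standalone function; the ActorInstanceState fields and Status variants are taken from the handler above, while the free function itself is illustrative.

    // Sketch: how GetAllRankStatus classifies a single actor's state.
    fn classify(
        state: &ActorInstanceState,
        supervision_events: &[ActorSupervisionEvent],
    ) -> resource::Status {
        match &state.spawn {
            // The original spawn failed: surface the spawn error.
            Err(e) => resource::Status::Failed(e.to_string()),
            // Stopped earlier via Stop or StopAll.
            Ok(_) if state.stopped => resource::Status::Stopped,
            // Running but with recorded supervision events: report as failed.
            Ok(_) if !supervision_events.is_empty() => resource::Status::Failed(format!(
                "because of supervision events: {:?}",
                supervision_events
            )),
            // Otherwise the actor is healthy.
            Ok(_) => resource::Status::Running,
        }
    }

Each (create_rank, Status) pair is then collected into the Vec<(usize, Status)> reply that send_stop_all inspects before the caller escalates to SIGTERM.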
