Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hyperactor/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ pub trait Tx<M: RemoteMessage>: std::fmt::Debug {
/// message is either delivered, or we eventually discover that
/// the channel has failed and it will be sent back on `return_channel`.
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SendError`.
#[hyperactor::instrument_infallible]
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
self.do_post(message, Some(return_channel));
}

/// Enqueue a message to be sent on the channel.
#[hyperactor::instrument_infallible]
fn post(&self, message: M) {
self.do_post(message, None);
}
Expand Down Expand Up @@ -803,6 +805,7 @@ enum ChannelRxKind<M: RemoteMessage> {

#[async_trait]
impl<M: RemoteMessage> Rx<M> for ChannelRx<M> {
#[hyperactor::instrument]
async fn recv(&mut self) -> Result<M, ChannelError> {
match &mut self.inner {
ChannelRxKind::Local(rx) => rx.recv().await,
Expand Down
2 changes: 2 additions & 0 deletions hyperactor/src/channel/net/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
/// `max_frame_length`. **This error is fatal:** once returned,
/// the `FrameReader` must be dropped; the underlying connection
/// is no longer valid.
#[tracing::instrument(skip_all)]
pub async fn next(&mut self) -> io::Result<Option<Bytes>> {
loop {
match &mut self.state {
Expand Down Expand Up @@ -229,6 +230,7 @@ impl<W: AsyncWrite + Unpin, B: Buf> FrameWrite<W, B> {
/// returned futures at any time. Upon completion, the frame is guaranteed to be
/// written, unless an error was encountered, in which case the underlying stream
/// is in an undefined state.
#[tracing::instrument(skip_all)]
pub async fn send(&mut self) -> io::Result<()> {
loop {
if self.len_buf.has_remaining() {
Expand Down
409 changes: 263 additions & 146 deletions hyperactor/src/channel/net/server.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,7 @@ impl MailboxClient {
}

impl MailboxSender for MailboxClient {
#[hyperactor::instrument_infallible]
fn post_unchecked(
&self,
envelope: MessageEnvelope,
Expand Down
1 change: 1 addition & 0 deletions hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ declare_attrs! {
/// Common implementation for `ActorMesh`s and `ActorMeshRef`s to cast
/// an `M`-typed message
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
#[hyperactor::instrument]
pub(crate) fn actor_mesh_cast<A, M>(
cx: &impl context::Actor,
actor_mesh_id: ActorMeshId,
Expand Down
2 changes: 2 additions & 0 deletions hyperactor_mesh/src/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ impl Handler<CommActorMode> for CommActor {
// TODO(T218630526): reliable casting for mutable topology
#[async_trait]
impl Handler<CastMessage> for CommActor {
#[hyperactor::instrument]
async fn handle(&mut self, cx: &Context<Self>, cast_message: CastMessage) -> Result<()> {
// Always forward the message to the root rank of the slice, casting starts from there.
let slice = cast_message.dest.slice.clone();
Expand Down Expand Up @@ -380,6 +381,7 @@ impl Handler<CastMessage> for CommActor {

#[async_trait]
impl Handler<ForwardMessage> for CommActor {
#[hyperactor::instrument]
async fn handle(&mut self, cx: &Context<Self>, fwd_message: ForwardMessage) -> Result<()> {
let ForwardMessage {
sender,
Expand Down
1 change: 1 addition & 0 deletions monarch_hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ impl PanicFlag {

#[async_trait]
impl Handler<PythonMessage> for PythonActor {
#[hyperactor::instrument]
async fn handle(
&mut self,
cx: &Context<PythonActor>,
Expand Down
1 change: 1 addition & 0 deletions monarch_hyperactor/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub(crate) fn to_hy_sel(selection: &str) -> PyResult<Selection> {

#[pymethods]
impl PythonActorMesh {
#[hyperactor::instrument]
fn cast(
&self,
message: &PythonMessage,
Expand Down