Skip to content

Commit 7b6ce23

Browse files
thomasywang authored and facebook-github-bot committed
Add spans to points along the message send/recv path (#1833)
Summary: Our observability only shows us when a message is sent out and when it is delivered to its final destination. We want to add spans to some of the important stops along the way.
Reviewed By: shayne-fletcher
Differential Revision: D86802704
1 parent 6d14685 commit 7b6ce23

File tree

8 files changed

+274
-146
lines changed

8 files changed

+274
-146
lines changed

hyperactor/src/channel.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,13 @@ pub trait Tx<M: RemoteMessage>: std::fmt::Debug {
119119
/// message is either delivered, or we eventually discover that
120120
/// the channel has failed and it will be sent back on `return_channel`.
121121
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SendError`.
122+
#[hyperactor::instrument_infallible]
122123
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
123124
self.do_post(message, Some(return_channel));
124125
}
125126

126127
/// Enqueue a message to be sent on the channel.
128+
#[hyperactor::instrument_infallible]
127129
fn post(&self, message: M) {
128130
self.do_post(message, None);
129131
}
@@ -803,6 +805,7 @@ enum ChannelRxKind<M: RemoteMessage> {
803805

804806
#[async_trait]
805807
impl<M: RemoteMessage> Rx<M> for ChannelRx<M> {
808+
#[hyperactor::instrument]
806809
async fn recv(&mut self) -> Result<M, ChannelError> {
807810
match &mut self.inner {
808811
ChannelRxKind::Local(rx) => rx.recv().await,

hyperactor/src/channel/net/framed.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
6868
/// `max_frame_length`. **This error is fatal:** once returned,
6969
/// the `FrameReader` must be dropped; the underlying connection
7070
/// is no longer valid.
71+
#[tracing::instrument(skip_all)]
7172
pub async fn next(&mut self) -> io::Result<Option<Bytes>> {
7273
loop {
7374
match &mut self.state {
@@ -229,6 +230,7 @@ impl<W: AsyncWrite + Unpin, B: Buf> FrameWrite<W, B> {
229230
/// returned futures at any time. Upon completion, the frame is guaranteed to be
230231
/// written, unless an error was encountered, in which case the underlying stream
231232
/// is in an undefined state.
233+
#[tracing::instrument(skip_all)]
232234
pub async fn send(&mut self) -> io::Result<()> {
233235
loop {
234236
if self.len_buf.has_remaining() {

hyperactor/src/channel/net/server.rs

Lines changed: 263 additions & 146 deletions
Large diffs are not rendered by default.

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,6 +1185,7 @@ impl MailboxClient {
11851185
}
11861186

11871187
impl MailboxSender for MailboxClient {
1188+
#[hyperactor::instrument_infallible]
11881189
fn post_unchecked(
11891190
&self,
11901191
envelope: MessageEnvelope,

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ declare_attrs! {
7777
/// Common implementation for `ActorMesh`s and `ActorMeshRef`s to cast
7878
/// an `M`-typed message
7979
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
80+
#[hyperactor::instrument]
8081
pub(crate) fn actor_mesh_cast<A, M>(
8182
cx: &impl context::Actor,
8283
actor_mesh_id: ActorMeshId,

hyperactor_mesh/src/comm.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ impl Handler<CommActorMode> for CommActor {
350350
// TODO(T218630526): reliable casting for mutable topology
351351
#[async_trait]
352352
impl Handler<CastMessage> for CommActor {
353+
#[hyperactor::instrument]
353354
async fn handle(&mut self, cx: &Context<Self>, cast_message: CastMessage) -> Result<()> {
354355
// Always forward the message to the root rank of the slice, casting starts from there.
355356
let slice = cast_message.dest.slice.clone();
@@ -380,6 +381,7 @@ impl Handler<CastMessage> for CommActor {
380381

381382
#[async_trait]
382383
impl Handler<ForwardMessage> for CommActor {
384+
#[hyperactor::instrument]
383385
async fn handle(&mut self, cx: &Context<Self>, fwd_message: ForwardMessage) -> Result<()> {
384386
let ForwardMessage {
385387
sender,

monarch_hyperactor/src/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,7 @@ impl PanicFlag {
771771

772772
#[async_trait]
773773
impl Handler<PythonMessage> for PythonActor {
774+
#[hyperactor::instrument]
774775
async fn handle(
775776
&mut self,
776777
cx: &Context<PythonActor>,

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ pub(crate) fn to_hy_sel(selection: &str) -> PyResult<Selection> {
137137

138138
#[pymethods]
139139
impl PythonActorMesh {
140+
#[hyperactor::instrument]
140141
fn cast(
141142
&self,
142143
message: &PythonMessage,

0 commit comments

Comments
 (0)