@@ -33,6 +33,7 @@ use super::framed::FrameReader;
3333use super :: framed:: FrameWrite ;
3434use super :: framed:: WriteState ;
3535use crate :: RemoteMessage ;
36+ use crate :: channel:: ChannelAddr ;
3637use crate :: channel:: ChannelError ;
3738use crate :: channel:: SendError ;
3839use crate :: channel:: TxStatus ;
@@ -203,14 +204,18 @@ struct Outbox<'a, M: RemoteMessage> {
203204 next_seq : u64 ,
204205 deque : MessageDeque < M > ,
205206 log_id : & ' a str ,
207+ dest_addr : & ' a ChannelAddr ,
208+ session_id : u64 ,
206209}
207210
208211impl < ' a , M : RemoteMessage > Outbox < ' a , M > {
209- fn new ( log_id : & ' a str ) -> Self {
212+ fn new ( log_id : & ' a str , dest_addr : & ' a ChannelAddr , session_id : u64 ) -> Self {
210213 Self {
211214 next_seq : 0 ,
212215 deque : MessageDeque ( VecDeque :: new ( ) ) ,
213216 log_id,
217+ dest_addr,
218+ session_id,
214219 }
215220 }
216221
@@ -255,7 +260,24 @@ impl<'a, M: RemoteMessage> Outbox<'a, M> {
255260
256261 let frame = Frame :: Message ( self . next_seq , message) ;
257262 let message = serialize_bincode ( & frame) . map_err ( |e| format ! ( "serialization error: {e}" ) ) ?;
258- metrics:: REMOTE_MESSAGE_SEND_SIZE . record ( message. frame_len ( ) as f64 , & [ ] ) ;
263+ let message_size = message. frame_len ( ) ;
264+ metrics:: REMOTE_MESSAGE_SEND_SIZE . record ( message_size as f64 , & [ ] ) ;
265+
266+ // Track throughput for this channel pair
267+ metrics:: CHANNEL_THROUGHPUT_BYTES . add (
268+ message_size as u64 ,
269+ hyperactor_telemetry:: kv_pairs!(
270+ "dest" => self . dest_addr. to_string( ) ,
271+ "session_id" => self . session_id. to_string( ) ,
272+ ) ,
273+ ) ;
274+ metrics:: CHANNEL_THROUGHPUT_MESSAGES . add (
275+ 1 ,
276+ hyperactor_telemetry:: kv_pairs!(
277+ "dest" => self . dest_addr. to_string( ) ,
278+ "session_id" => self . session_id. to_string( ) ,
279+ ) ,
280+ ) ;
259281
260282 self . deque . push_back ( QueuedMessage {
261283 seq : self . next_seq ,
@@ -371,7 +393,7 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
371393 }
372394
373395 /// Remove acked messages from the deque.
374- fn prune ( & mut self , acked : u64 , acked_at : Instant ) {
396+ fn prune ( & mut self , acked : u64 , acked_at : Instant , dest_addr : & ChannelAddr , session_id : u64 ) {
375397 assert ! (
376398 self . largest_acked. as_ref( ) . map_or( 0 , |i| i. 0 ) <= acked,
377399 "{}: received out-of-order ack; received: {}; stored largest: {}" ,
@@ -386,7 +408,16 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
386408 let deque = & mut self . deque ;
387409 while let Some ( msg) = deque. front ( ) {
388410 if msg. seq <= acked {
389- deque. pop_front ( ) ;
411+ let msg: QueuedMessage < M > = deque. pop_front ( ) . unwrap ( ) ;
412+ // Track latency: time from when message was first received to when it was acked
413+ let latency_micros = msg. received_at . elapsed ( ) . as_micros ( ) as i64 ;
414+ metrics:: CHANNEL_LATENCY_MICROS . record (
415+ latency_micros as f64 ,
416+ hyperactor_telemetry:: kv_pairs!(
417+ "dest" => dest_addr. to_string( ) ,
418+ "session_id" => session_id. to_string( ) ,
419+ ) ,
420+ ) ;
390421 } else {
391422 // Messages in the deque are orderd by seq in ascending
392423 // order. So we could return early once we encounter
@@ -461,9 +492,9 @@ enum State<'a, M: RemoteMessage> {
461492}
462493
463494impl < ' a , M : RemoteMessage > State < ' a , M > {
464- fn init ( log_id : & ' a str ) -> Self {
495+ fn init ( log_id : & ' a str , dest_addr : & ' a ChannelAddr , session_id : u64 ) -> Self {
465496 Self :: Running ( Deliveries {
466- outbox : Outbox :: new ( log_id) ,
497+ outbox : Outbox :: new ( log_id, dest_addr , session_id ) ,
467498 unacked : Unacked :: new ( None , log_id) ,
468499 } )
469500 }
@@ -543,7 +574,8 @@ async fn run<M: RemoteMessage>(
543574
544575 let session_id = rand:: random ( ) ;
545576 let log_id = format ! ( "session {}.{}" , link. dest( ) , session_id) ;
546- let mut state = State :: init ( & log_id) ;
577+ let dest = link. dest ( ) ;
578+ let mut state = State :: init ( & log_id, & dest, session_id) ;
547579 let mut conn = Conn :: reconnect_with_default ( ) ;
548580
549581 let ( state, conn) = loop {
@@ -859,7 +891,7 @@ where
859891 Ok ( response) => {
860892 match response {
861893 NetRxResponse :: Ack ( ack) => {
862- unacked. prune( ack, RealClock . now( ) ) ;
894+ unacked. prune( ack, RealClock . now( ) , & link . dest ( ) , session_id ) ;
863895 ( State :: Running ( Deliveries { outbox, unacked } ) , Conn :: Connected { reader, write_state } )
864896 }
865897 NetRxResponse :: Reject => {
@@ -934,6 +966,15 @@ where
934966 "{log_id}: outbox send error: {err}; message size: {}" ,
935967 outbox. front_size( ) . expect( "outbox should not be empty" ) ,
936968 ) ;
969+ // Track error for this channel pair
970+ metrics:: CHANNEL_ERRORS . add(
971+ 1 ,
972+ hyperactor_telemetry:: kv_pairs!(
973+ "dest" => link. dest( ) . to_string( ) ,
974+ "session_id" => session_id. to_string( ) ,
975+ "error_type" => metrics:: ChannelErrorType :: SendError . as_str( ) ,
976+ ) ,
977+ ) ;
937978 ( State :: Running ( Deliveries { outbox, unacked } ) , Conn :: reconnect_with_default( ) )
938979 }
939980 }
@@ -1030,6 +1071,18 @@ where
10301071
10311072 // Need to resend unacked after reconnecting.
10321073 let largest_acked = unacked. largest_acked ;
1074+ let num_retries = unacked. deque . len ( ) ;
1075+ if num_retries > 0 {
1076+ // Track reconnection for this channel pair
1077+ metrics:: CHANNEL_RECONNECTIONS . add (
1078+ 1 ,
1079+ hyperactor_telemetry:: kv_pairs!(
1080+ "dest" => link. dest( ) . to_string( ) ,
1081+ "transport" => link. dest( ) . transport( ) . to_string( ) ,
1082+ "reason" => "reconnect_with_unacked" ,
1083+ ) ,
1084+ ) ;
1085+ }
10331086 outbox. requeue_unacked ( unacked. deque ) ;
10341087 (
10351088 State :: Running ( Deliveries {
@@ -1061,6 +1114,15 @@ where
10611114 session_id,
10621115 err
10631116 ) ;
1117+ // Track connection error for this channel pair
1118+ metrics:: CHANNEL_ERRORS . add (
1119+ 1 ,
1120+ hyperactor_telemetry:: kv_pairs!(
1121+ "dest" => link. dest( ) . to_string( ) ,
1122+ "session_id" => session_id. to_string( ) ,
1123+ "error_type" => metrics:: ChannelErrorType :: ConnectionError . as_str( ) ,
1124+ ) ,
1125+ ) ;
10641126 (
10651127 State :: Running ( Deliveries { outbox, unacked } ) ,
10661128 Conn :: reconnect ( backoff) ,
0 commit comments