@@ -329,14 +329,16 @@ impl<M: RemoteMessage> NetTx<M> {
329329 next_seq : u64 ,
330330 deque : MessageDeque < M > ,
331331 log_id : & ' a str ,
332+ dest_addr : & ' a ChannelAddr ,
332333 }
333334
334335 impl < ' a , M : RemoteMessage > Outbox < ' a , M > {
335- fn new ( log_id : & ' a str ) -> Self {
336+ fn new ( log_id : & ' a str , dest_addr : & ' a ChannelAddr ) -> Self {
336337 Self {
337338 next_seq : 0 ,
338339 deque : MessageDeque ( VecDeque :: new ( ) ) ,
339340 log_id,
341+ dest_addr,
340342 }
341343 }
342344
@@ -383,7 +385,22 @@ impl<M: RemoteMessage> NetTx<M> {
383385 let frame = Frame :: Message ( self . next_seq , message) ;
384386 let message =
385387 serialize_bincode ( & frame) . map_err ( |e| format ! ( "serialization error: {e}" ) ) ?;
386- metrics:: REMOTE_MESSAGE_SEND_SIZE . record ( message. frame_len ( ) as f64 , & [ ] ) ;
388+ let message_size = message. frame_len ( ) ;
389+ metrics:: REMOTE_MESSAGE_SEND_SIZE . record ( message_size as f64 , & [ ] ) ;
390+
391+ // Track throughput for this channel pair
392+ metrics:: CHANNEL_THROUGHPUT_BYTES . add (
393+ message_size as u64 ,
394+ hyperactor_telemetry:: kv_pairs!(
395+ "dest" => self . dest_addr. to_string( ) ,
396+ ) ,
397+ ) ;
398+ metrics:: CHANNEL_THROUGHPUT_MESSAGES . add (
399+ 1 ,
400+ hyperactor_telemetry:: kv_pairs!(
401+ "dest" => self . dest_addr. to_string( ) ,
402+ ) ,
403+ ) ;
387404
388405 self . deque . push_back ( QueuedMessage {
389406 seq : self . next_seq ,
@@ -498,7 +515,7 @@ impl<M: RemoteMessage> NetTx<M> {
498515 }
499516
500517 /// Remove acked messages from the deque.
501- fn prune ( & mut self , acked : u64 , acked_at : Instant ) {
518+ fn prune ( & mut self , acked : u64 , acked_at : Instant , dest_addr : & ChannelAddr ) {
502519 assert ! (
503520 self . largest_acked. as_ref( ) . map_or( 0 , |i| i. 0 ) <= acked,
504521 "{}: received out-of-order ack; received: {}; stored largest: {}" ,
@@ -513,7 +530,15 @@ impl<M: RemoteMessage> NetTx<M> {
513530 let deque = & mut self . deque ;
514531 while let Some ( msg) = deque. front ( ) {
515532 if msg. seq <= acked {
516- deque. pop_front ( ) ;
533+ let msg: QueuedMessage < M > = deque. pop_front ( ) . unwrap ( ) ;
534+ // Track latency: time from when message was first received to when it was acked
535+ let latency_micros = msg. received_at . elapsed ( ) . as_micros ( ) as i64 ;
536+ metrics:: CHANNEL_LATENCY_MICROS . record (
537+ latency_micros as f64 ,
538+ hyperactor_telemetry:: kv_pairs!(
539+ "dest" => dest_addr. to_string( ) ,
540+ ) ,
541+ ) ;
517542 } else {
518543 // Messages in the deque are ordered by seq in ascending
519544 // order. So we could return early once we encounter
@@ -588,9 +613,9 @@ impl<M: RemoteMessage> NetTx<M> {
588613 }
589614
590615 impl < ' a , M : RemoteMessage > State < ' a , M > {
591- fn init ( log_id : & ' a str ) -> Self {
616+ fn init ( log_id : & ' a str , dest_addr : & ' a ChannelAddr ) -> Self {
592617 Self :: Running ( Deliveries {
593- outbox : Outbox :: new ( log_id) ,
618+ outbox : Outbox :: new ( log_id, dest_addr ) ,
594619 unacked : Unacked :: new ( None , log_id) ,
595620 } )
596621 }
@@ -638,8 +663,9 @@ impl<M: RemoteMessage> NetTx<M> {
638663 }
639664
640665 let session_id = rand:: random ( ) ;
641- let log_id = format ! ( "session {}.{}" , link. dest( ) , session_id) ;
642- let mut state = State :: init ( & log_id) ;
666+ let dest = link. dest ( ) ;
667+ let log_id = format ! ( "session {}.{}" , dest, session_id) ;
668+ let mut state = State :: init ( & log_id, & dest) ;
643669 let mut conn = Conn :: reconnect_with_default ( ) ;
644670
645671 let ( state, conn) = loop {
@@ -760,7 +786,7 @@ impl<M: RemoteMessage> NetTx<M> {
760786 Ok ( response) => {
761787 match response {
762788 NetRxResponse :: Ack ( ack) => {
763- unacked. prune( ack, RealClock . now( ) ) ;
789+ unacked. prune( ack, RealClock . now( ) , & dest ) ;
764790 ( State :: Running ( Deliveries { outbox, unacked } ) , Conn :: Connected { reader, write_state } )
765791 }
766792 NetRxResponse :: Reject => {
@@ -821,6 +847,14 @@ impl<M: RemoteMessage> NetTx<M> {
821847 "{log_id}: outbox send error: {err}; message size: {}" ,
822848 outbox. front_size( ) . expect( "outbox should not be empty" ) ,
823849 ) ;
850+ // Track error for this channel pair
851+ metrics:: CHANNEL_ERRORS . add(
852+ 1 ,
853+ hyperactor_telemetry:: kv_pairs!(
854+ "dest" => dest. to_string( ) ,
855+ "error_type" => "send_error" ,
856+ ) ,
857+ ) ;
824858 ( State :: Running ( Deliveries { outbox, unacked } ) , Conn :: reconnect_with_default( ) )
825859 }
826860 }
@@ -919,6 +953,18 @@ impl<M: RemoteMessage> NetTx<M> {
919953
920954 // Need to resend unacked after reconnecting.
921955 let largest_acked = unacked. largest_acked ;
956+ let num_retries = unacked. deque . len ( ) ;
957+ if num_retries > 0 {
958+ // Track reconnection for this channel pair
959+ metrics:: CHANNEL_RECONNECTIONS . add (
960+ 1 ,
961+ hyperactor_telemetry:: kv_pairs!(
962+ "dest" => dest. to_string( ) ,
963+ "transport" => dest. transport( ) . to_string( ) ,
964+ "reason" => "reconnect_with_unacked" ,
965+ ) ,
966+ ) ;
967+ }
922968 outbox. requeue_unacked ( unacked. deque ) ;
923969 (
924970 State :: Running ( Deliveries {
@@ -950,6 +996,13 @@ impl<M: RemoteMessage> NetTx<M> {
950996 session_id,
951997 err
952998 ) ;
999+ metrics:: CHANNEL_ERRORS . add (
1000+ 1 ,
1001+ hyperactor_telemetry:: kv_pairs!(
1002+ "dest" => dest. to_string( ) ,
1003+ "error_type" => "connection_error" ,
1004+ ) ,
1005+ ) ;
9531006 (
9541007 State :: Running ( Deliveries { outbox, unacked } ) ,
9551008 Conn :: reconnect ( backoff) ,
@@ -1357,18 +1410,29 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
13571410 } ;
13581411
13591412 // De-frame the multi-part message.
1413+ let bytes_len = bytes. len( ) ;
13601414 let message = match serde_multipart:: Message :: from_framed( bytes) {
13611415 Ok ( message) => message,
1362- Err ( err) => break (
1363- next,
1364- Err :: <( ) , anyhow:: Error >( err. into( ) ) . context(
1365- format!(
1366- "{log_id}: de-frame message with M = {}" ,
1367- type_name:: <M >( ) ,
1368- )
1369- ) ,
1370- false
1371- ) ,
1416+ Err ( err) => {
1417+ metrics:: CHANNEL_ERRORS . add(
1418+ 1 ,
1419+ hyperactor_telemetry:: kv_pairs!(
1420+ "source" => self . source. to_string( ) ,
1421+ "dest" => self . dest. to_string( ) ,
1422+ "error_type" => "deframe_error" ,
1423+ ) ,
1424+ ) ;
1425+ break (
1426+ next,
1427+ Err :: <( ) , anyhow:: Error >( err. into( ) ) . context(
1428+ format!(
1429+ "{log_id}: de-frame message with M = {}" ,
1430+ type_name:: <M >( ) ,
1431+ )
1432+ ) ,
1433+ false
1434+ )
1435+ } ,
13721436 } ;
13731437
13741438 // Finally decode the message. This assembles the M-typed message
@@ -1396,6 +1460,21 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
13961460 }
13971461 match self . send_with_buffer_metric( & log_id, & tx, message) . await {
13981462 Ok ( ( ) ) => {
1463+ // Track throughput for this channel pair
1464+ metrics:: CHANNEL_THROUGHPUT_BYTES . add(
1465+ bytes_len as u64 ,
1466+ hyperactor_telemetry:: kv_pairs!(
1467+ "source" => self . source. to_string( ) ,
1468+ "dest" => self . dest. to_string( ) ,
1469+ ) ,
1470+ ) ;
1471+ metrics:: CHANNEL_THROUGHPUT_MESSAGES . add(
1472+ 1 ,
1473+ hyperactor_telemetry:: kv_pairs!(
1474+ "source" => self . source. to_string( ) ,
1475+ "dest" => self . dest. to_string( ) ,
1476+ ) ,
1477+ ) ;
13991478 // In channel's contract, "delivered" means the message
14001479 // is sent to the NetRx object. Therefore, we could bump
14011480 // `next_seq` as far as the message is put on the mspc
@@ -1412,16 +1491,26 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
14121491 }
14131492 }
14141493 } ,
1415- Err ( err) => break (
1416- next,
1417- Err :: <( ) , anyhow:: Error >( err. into( ) ) . context(
1418- format!(
1419- "{log_id}: deserialize message with M = {}" ,
1420- type_name:: <M >( ) ,
1421- )
1422- ) ,
1423- false
1424- ) ,
1494+ Err ( err) => {
1495+ metrics:: CHANNEL_ERRORS . add(
1496+ 1 ,
1497+ hyperactor_telemetry:: kv_pairs!(
1498+ "source" => self . source. to_string( ) ,
1499+ "dest" => self . dest. to_string( ) ,
1500+ "error_type" => "deserialize_error" ,
1501+ ) ,
1502+ ) ;
1503+ break (
1504+ next,
1505+ Err :: <( ) , anyhow:: Error >( err. into( ) ) . context(
1506+ format!(
1507+ "{log_id}: deserialize message with M = {}" ,
1508+ type_name:: <M >( ) ,
1509+ )
1510+ ) ,
1511+ false
1512+ )
1513+ } ,
14251514 }
14261515 } ,
14271516 }
@@ -1743,11 +1832,13 @@ where
17431832 } ;
17441833
17451834 if let Err ( ref err) = res {
1746- metrics:: CHANNEL_CONNECTION_ERRORS . add(
1835+ metrics:: CHANNEL_ERRORS . add(
17471836 1 ,
17481837 hyperactor_telemetry:: kv_pairs!(
17491838 "transport" => dest. transport( ) . to_string( ) ,
17501839 "error" => err. to_string( ) ,
1840+ "error_type" => "connection_error" ,
1841+ "dest" => dest. to_string( ) ,
17511842 ) ,
17521843 ) ;
17531844
@@ -1766,12 +1857,13 @@ where
17661857 } ) ;
17671858 }
17681859 Err ( err) => {
1769- metrics:: CHANNEL_CONNECTION_ERRORS . add(
1860+ metrics:: CHANNEL_ERRORS . add(
17701861 1 ,
17711862 hyperactor_telemetry:: kv_pairs!(
17721863 "transport" => listener_channel_addr. transport( ) . to_string( ) ,
17731864 "operation" => "accept" ,
17741865 "error" => err. to_string( ) ,
1866+ "error_type" => "connection_error" ,
17751867 ) ,
17761868 ) ;
17771869
0 commit comments