11use crate :: frame:: { self , Frame } ;
22
33use bytes:: { Buf , BytesMut } ;
4- use std:: io:: { self , Cursor } ;
4+ use std:: io:: { self , Cursor , ErrorKind :: ConnectionReset } ;
5+ use std:: net:: SocketAddr ;
56use tokio:: io:: { AsyncReadExt , AsyncWriteExt , BufWriter } ;
67use tokio:: net:: TcpStream ;
8+ use tracing:: warn;
79
810/// Send and receive `Frame` values from a remote peer.
911///
@@ -28,6 +30,16 @@ pub struct Connection {
2830 buffer : BytesMut ,
2931}
3032
33+ /// The result of [`Connection::maybe_read_bytes`].
34+ enum ConnectionState {
35+ /// The connection was gracefully closed when reading was attempted.
36+ Closed ,
37+ /// The connection was open when reading was attempted.
38+ Open ,
39+ /// The connection was abruptly reset by the peer when reading was attempted.
40+ Reset ,
41+ }
42+
3143impl Connection {
3244 /// Create a new `Connection`, backed by `socket`. Read and write buffers
3345 /// are initialized.
@@ -42,6 +54,11 @@ impl Connection {
4254 }
4355 }
4456
57+ /// Returns the remote address that this connection is bound to.
58+ pub fn peer_addr ( & self ) -> io:: Result < SocketAddr > {
59+ self . stream . get_ref ( ) . peer_addr ( )
60+ }
61+
4562 /// Read a single `Frame` value from the underlying stream.
4663 ///
4764 /// The function waits until it has retrieved enough data to parse a frame.
@@ -63,23 +80,40 @@ impl Connection {
6380
6481 // There is not enough buffered data to read a frame. Attempt to
6582 // read more data from the socket.
66- //
67- // On success, the number of bytes is returned. `0` indicates "end
68- // of stream".
69- if 0 == self . stream . read_buf ( & mut self . buffer ) . await ? {
70- // The remote closed the connection. For this to be a clean
71- // shutdown, there should be no data in the read buffer. If
72- // there is, this means that the peer closed the socket while
73- // sending a frame.
74- if self . buffer . is_empty ( ) {
83+ match self . maybe_read_bytes ( ) . await ? {
84+ ConnectionState :: Open => continue ,
85+ ConnectionState :: Closed | ConnectionState :: Reset => {
86+ if ! self . buffer . is_empty ( ) {
87+ warn ! {
88+ incomplete =? self . buffer ,
89+ "connection closed with incomplete frame"
90+ } ;
91+ }
7592 return Ok ( None ) ;
76- } else {
77- return Err ( "connection reset by peer" . into ( ) ) ;
7893 }
7994 }
8095 }
8196 }
8297
98+ /// Attempt to read bytes from the connection.
99+ async fn maybe_read_bytes ( & mut self ) -> io:: Result < ConnectionState > {
100+ match self . stream . read_buf ( & mut self . buffer ) . await {
101+ // the connection was closed gracefully
102+ Ok ( 0 ) => Ok ( ConnectionState :: Closed ) ,
103+ // the connection is still open
104+ Ok ( _) => Ok ( ConnectionState :: Open ) ,
105+ // the connection was closed abruptly by the peer
106+ Err ( e) if e. kind ( ) == ConnectionReset => {
107+ warn ! {
108+ "connection closed abruptly by peer"
109+ } ;
110+ Ok ( ConnectionState :: Reset )
111+ }
112+ // reading failed for some other reason
113+ Err ( err) => Err ( err) ,
114+ }
115+ }
116+
83117 /// Tries to parse a frame from the buffer. If the buffer contains enough
84118 /// data, the frame is returned and the data removed from the buffer. If not
85119 /// enough data has been buffered yet, `Ok(None)` is returned. If the
0 commit comments