@@ -77,12 +77,20 @@ where
7777 pub ( crate ) async fn handle ( & mut self ) {
7878 if let Err ( e) = self . try_handle ( ) . await {
7979 trace ! ( "RRQ request failed (peer: {}, error: {})" , & self . peer, & e) ;
80- let mut buffer = BytesMut :: with_capacity ( DEFAULT_BLOCK_SIZE ) ;
81- Packet :: Error ( e. into ( ) ) . encode ( & mut buffer) ;
82- let buf = buffer. split ( ) . freeze ( ) ;
83- // Errors are never retransmitted.
84- // We do not care if `send_to` resulted to an IO error.
85- let _ = self . socket . send_to ( & buf[ ..] , self . peer ) . await ;
80+
81+ if let Error :: Packet (
82+ crate :: packet:: Error :: OptionNegotiationFailed ,
83+ ) = e
84+ {
85+ // client aborted the connection, nothing to do
86+ } else {
87+ let mut buffer = BytesMut :: with_capacity ( DEFAULT_BLOCK_SIZE ) ;
88+ Packet :: Error ( e. into ( ) ) . encode ( & mut buffer) ;
89+ let buf = buffer. split ( ) . freeze ( ) ;
90+ // Errors are never retransmitted.
91+ // We do not care if `send_to` resulted to an IO error.
92+ let _ = self . socket . send_to ( & buf[ ..] , self . peer ) . await ;
93+ }
8694 }
8795 }
8896
@@ -92,17 +100,20 @@ where
92100 let mut block_id: u16 ;
93101 let mut window_base: u16 = 1 ;
94102 let mut buf: Bytes ;
95- let mut is_last_block: bool ;
103+ let mut is_last_block: bool = false ;
96104
97- ( buf, is_last_block) = self . fill_data_block ( window_base) . await ?;
98- window. push_back ( buf) ;
99-
100- // Send OACK after we manage to read the first block from reader.
101- //
102- // We do this because we want to give the developers the option to
103- // produce an error after they construct a reader.
104- if let Some ( opts) = self . oack_opts . as_ref ( ) {
105+ if let Some ( opts) = self . oack_opts . take ( ) {
105106 trace ! ( "RRQ OACK (peer: {}, opts: {:?}" , & self . peer, & opts) ;
107+ // Send OACK after we manage to read the first block from the reader for
108+ // non-transfer size probe requests (oack.transfer_size value is set).
109+ // During transfer size probes a client aborts the connection after receiving
110+ // oack from the server. For normal requests we do this because we want to give
111+ // the developers the option to produce an error after they construct a reader.
112+ if opts. transfer_size . is_none ( ) {
113+ ( buf, is_last_block) =
114+ self . fill_data_block ( window_base) . await ?;
115+ window. push_back ( buf) ;
116+ }
106117 let mut buff = BytesMut :: with_capacity ( PACKET_DATA_HEADER_LEN + 64 ) ;
107118 Packet :: OAck ( opts. to_owned ( ) ) . encode ( & mut buff) ;
108119 // OACK is not really part of the window, so we send it separately
@@ -186,15 +197,17 @@ where
186197 ) ;
187198 return Ok ( blocks_acked) ;
188199 }
189- Err ( ref e) if e. kind ( ) == io:: ErrorKind :: TimedOut => {
200+ Err ( Error :: Io ( ref e) )
201+ if e. kind ( ) == io:: ErrorKind :: TimedOut =>
202+ {
190203 trace ! (
191204 "RRQ (peer: {}, block_id: {}) - Timeout" ,
192205 & self . peer,
193206 window_base
194207 ) ;
195208 continue ;
196209 }
197- Err ( e) => return Err ( e. into ( ) ) ,
210+ Err ( e) => return Err ( e) ,
198211 }
199212 }
200213
@@ -206,7 +219,7 @@ where
206219 & mut self ,
207220 window_base : u16 ,
208221 window_len : u16 ,
209- ) -> io :: Result < u16 > {
222+ ) -> Result < u16 > {
210223 // We can not use `self` within `async_std::io::timeout` because not all
211224 // struct members implement `Sync`. So we borrow only what we need.
212225 let socket = & mut self . socket ;
@@ -224,30 +237,37 @@ where
224237 }
225238
226239 // parse only valid Ack packets, the rest are ignored
227- if let Ok ( Packet :: Ack ( recved_block_id) ) =
228- Packet :: decode ( & buf[ ..len] )
240+ // if let Ok(Packet::Ack(recved_block_id)) =
241+ match Packet :: decode ( & buf[ ..len] )
229242 {
230- let window_end = window_base. wrapping_add ( window_len) ;
231-
232- if window_end > window_base {
233- // window_end did not wrap
234- if recved_block_id >= window_base && recved_block_id < window_end {
235- // number of blocks acked
236- return Ok ( recved_block_id-window_base+1u16 ) ;
237- }
238- else {
239- trace ! ( "Unexpected ack packet {recved_block_id}, window_base: {window_base}, window_len: {window_len}" ) ;
240- }
241- } else {
242- // window_end wrapped
243- if recved_block_id >= window_base {
244- return Ok ( 1u16 + ( recved_block_id - window_base) ) ;
245- } else if recved_block_id < window_end {
246- return Ok ( 1u16 + recved_block_id + ( window_len - window_end) ) ;
243+ Ok ( Packet :: Ack ( recved_block_id) ) => {
244+ let window_end = window_base. wrapping_add ( window_len) ;
245+
246+ if window_end > window_base {
247+ // window_end did not wrap
248+ if recved_block_id >= window_base && recved_block_id < window_end {
249+ // number of blocks acked
250+ return Ok ( recved_block_id - window_base + 1u16 ) ;
251+ } else {
252+ trace ! ( "Unexpected ack packet {recved_block_id}, window_base: {window_base}, window_len: {window_len}" ) ;
253+ }
247254 } else {
248- trace ! ( "Unexpected ack packet {recved_block_id}, window_base: {window_base}, window_len: {window_len}" ) ;
255+ // window_end wrapped
256+ if recved_block_id >= window_base {
257+ return Ok ( 1u16 + ( recved_block_id - window_base) ) ;
258+ } else if recved_block_id < window_end {
259+ return Ok ( 1u16 + recved_block_id + ( window_len - window_end) ) ;
260+ } else {
261+ trace ! ( "Unexpected ack packet {recved_block_id}, window_base: {window_base}, window_len: {window_len}" ) ;
262+ }
249263 }
264+ } ,
265+ Ok ( Packet :: Error ( error) ) if error. is_client_error ( ) => {
266+ // pass errors coming from the client
267+ return Err ( Error :: Packet ( error) )
250268 }
269+ // ignore all other errors
270+ _ => { }
251271 }
252272 }
253273 } )
0 commit comments