11use crate :: engine:: Engine ;
2- use std:: {
3- cmp, fmt, io,
4- io:: { ErrorKind , Result } ,
5- } ;
2+ use std:: io:: ErrorKind ;
3+ use std:: { cmp, fmt, io} ;
64
75pub ( crate ) const BUF_SIZE : usize = 1024 ;
86/// The most bytes whose encoding will fit in `BUF_SIZE`
@@ -53,13 +51,6 @@ const MIN_ENCODE_CHUNK_SIZE: usize = 3;
5351///
5452/// It has some minor performance loss compared to encoding slices (a couple percent).
5553/// It does not do any heap allocation.
56- ///
57- /// # Limitations
58- ///
59- /// Owing to the specification of the `write` and `flush` methods on the `Write` trait and their
60- /// implications for a buffering implementation, these methods may not behave as expected. In
61- /// particular, calling `write_all` on this interface may fail with `io::ErrorKind::WriteZero`.
62- /// See the documentation of the `Write` trait implementation for further details.
6354pub struct EncoderWriter < ' e , E : Engine , W : io:: Write > {
6455 engine : & ' e E ,
6556 /// Where encoded data is written to. It's an Option as it's None immediately before Drop is
@@ -74,21 +65,27 @@ pub struct EncoderWriter<'e, E: Engine, W: io::Write> {
7465 /// Buffer to encode into. May hold leftover encoded bytes from a previous write call that the underlying writer
7566 /// did not write last time.
7667 output : [ u8 ; BUF_SIZE ] ,
77- /// How much of `output` is occupied with encoded data that couldn't be written last time
78- output_occupied_len : usize ,
68+ /// Occupied portion of output.
69+ ///
70+ /// Invariant for the range is that it’s either 0..0 or 0 ≤ start < end ≤
71+ /// BUF_SIZE. This means that if the range is empty, it’s 0..0.
72+ output_range : std:: ops:: Range < usize > ,
7973 /// panic safety: don't write again in destructor if writer panicked while we were writing to it
8074 panicked : bool ,
8175}
8276
8377impl < ' e , E : Engine , W : io:: Write > fmt:: Debug for EncoderWriter < ' e , E , W > {
8478 fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
79+ let range = self . output_range . clone ( ) ;
80+ let truncated_len = range. len ( ) . min ( 5 ) ;
81+ let truncated_range = range. start ..range. start + truncated_len;
8582 write ! (
8683 f,
87- "extra_input: {:?} extra_input_occupied_len:{:?} output[..5 ]: {:?} output_occupied_len : {:?}" ,
88- self . extra_input,
89- self . extra_input_occupied_len ,
90- & self . output[ 0 .. 5 ] ,
91- self . output_occupied_len
84+ "extra_input: {:?} occupied output[..{} ]: {:?} output_range : {:?}" ,
85+ & self . extra_input[ .. self . extra_input_occupied_len ] ,
86+ truncated_len ,
87+ & self . output[ truncated_range ] ,
88+ range ,
9289 )
9390 }
9491}
@@ -102,7 +99,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
10299 extra_input : [ 0u8 ; MIN_ENCODE_CHUNK_SIZE ] ,
103100 extra_input_occupied_len : 0 ,
104101 output : [ 0u8 ; BUF_SIZE ] ,
105- output_occupied_len : 0 ,
102+ output_range : 0 .. 0 ,
106103 panicked : false ,
107104 }
108105 }
@@ -123,7 +120,7 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
123120 /// # Errors
124121 ///
125122 /// The first error that is not of `ErrorKind::Interrupted` will be returned.
126- pub fn finish ( & mut self ) -> Result < W > {
123+ pub fn finish ( & mut self ) -> io :: Result < W > {
127124 // If we could consume self in finish(), we wouldn't have to worry about this case, but
128125 // finish() is retryable in the face of I/O errors, so we can't consume here.
129126 if self . delegate . is_none ( ) {
@@ -138,91 +135,96 @@ impl<'e, E: Engine, W: io::Write> EncoderWriter<'e, E, W> {
138135 }
139136
140137 /// Write any remaining buffered data to the delegate writer.
141- fn write_final_leftovers ( & mut self ) -> Result < ( ) > {
138+ fn write_final_leftovers ( & mut self ) -> io :: Result < ( ) > {
142139 if self . delegate . is_none ( ) {
143140 // finish() has already successfully called this, and we are now in drop() with a None
144141 // writer, so just no-op
145142 return Ok ( ( ) ) ;
146143 }
147144
148- self . write_all_encoded_output ( ) ?;
149-
150145 if self . extra_input_occupied_len > 0 {
146+ // Make sure output isn’t full so we can append to it.
147+ if self . output_range . end == self . output . len ( ) {
148+ self . flush_all_output ( ) ?;
149+ }
150+
151151 let encoded_len = self
152152 . engine
153153 . encode_slice (
154154 & self . extra_input [ ..self . extra_input_occupied_len ] ,
155- & mut self . output [ ..] ,
155+ & mut self . output [ self . output_range . end ..] ,
156156 )
157157 . expect ( "buffer is large enough" ) ;
158158
159- self . output_occupied_len = encoded_len;
160-
161- self . write_all_encoded_output ( ) ?;
162-
163- // write succeeded, do not write the encoding of extra again if finish() is retried
159+ self . output_range . end += encoded_len;
164160 self . extra_input_occupied_len = 0 ;
165161 }
166162
167- Ok ( ( ) )
163+ self . flush_all_output ( )
168164 }
169165
170- /// Write as much of the encoded output to the delegate writer as it will accept, and store the
171- /// leftovers to be attempted at the next write() call. Updates `self.output_occupied_len`.
166+ /// Flushes output buffer to the delegate.
172167 ///
173- /// # Errors
168+ /// Loops writing data to the delegate until output buffer is empty or
169+ /// delegate returns an error. An `Ok(0)` return from the delegate is
170+ /// treated as an error.
174171 ///
175- /// Errors from the delegate writer are returned. In the case of an error,
176- /// `self.output_occupied_len` will not be updated, as errors from `write` are specified to mean
177- /// that no write took place.
178- fn write_to_delegate ( & mut self , current_output_len : usize ) -> Result < ( ) > {
179- self . panicked = true ;
180- let res = self
181- . delegate
182- . as_mut ( )
183- . expect ( "Writer must be present" )
184- . write ( & self . output [ ..current_output_len] ) ;
185- self . panicked = false ;
186-
187- res. map ( |consumed| {
188- debug_assert ! ( consumed <= current_output_len) ;
189-
190- if consumed < current_output_len {
191- self . output_occupied_len = current_output_len. checked_sub ( consumed) . unwrap ( ) ;
192- // If we're blocking on I/O, the minor inefficiency of copying bytes to the
193- // start of the buffer is the least of our concerns...
194- // TODO Rotate moves more than we need to; copy_within now stable.
195- self . output . rotate_left ( consumed) ;
196- } else {
197- self . output_occupied_len = 0 ;
172+ /// Updates `output_range` accordingly.
173+ fn flush_output ( & mut self ) -> Option < io:: Result < usize > > {
174+ if self . output_range . end == 0 {
175+ return None ;
176+ }
177+ loop {
178+ match self . write_to_delegate ( self . output_range . clone ( ) ) {
179+ Ok ( 0 ) => break Some ( Ok ( 0 ) ) ,
180+ Ok ( n) if n >= self . output_range . len ( ) => {
181+ self . output_range = 0 ..0 ;
182+ break None ;
183+ }
184+ Ok ( n) => self . output_range . start += n,
185+ Err ( err) => break Some ( Err ( err) ) ,
198186 }
199- } )
187+ }
200188 }
201189
202- /// Write all buffered encoded output. If this returns `Ok`, `self.output_occupied_len` is `0`.
203- ///
204- /// This is basically write_all for the remaining buffered data but without the undesirable
205- /// abort-on-`Ok(0)` behavior.
206- ///
207- /// # Errors
190+ /// Flushes output buffer to the delegate ignoring interruptions.
208191 ///
209- /// Any error emitted by the delegate writer abort the write loop and is returned, unless it's
210- /// `Interrupted`, in which case the error is ignored and writes will continue.
211- fn write_all_encoded_output ( & mut self ) -> Result < ( ) > {
212- while self . output_occupied_len > 0 {
213- let remaining_len = self . output_occupied_len ;
214- match self . write_to_delegate ( remaining_len) {
215- // try again on interrupts ala write_all
216- Err ( ref e) if e. kind ( ) == ErrorKind :: Interrupted => { }
217- // other errors return
218- Err ( e) => return Err ( e) ,
219- // success no-ops because remaining length is already updated
220- Ok ( _) => { }
221- } ;
192+ /// Like [`Self::flush_output`] but ignores [`ErrorKind::Interrupted`]
193+ /// errors and converts `Ok(0)` to [`ErrorKind::WriteZero`].
194+ fn flush_all_output ( & mut self ) -> io:: Result < ( ) > {
195+ if self . output_range . end == 0 {
196+ return Ok ( ( ) ) ;
222197 }
198+ loop {
199+ match self . write_to_delegate ( self . output_range . clone ( ) ) {
200+ Ok ( 0 ) => {
201+ break Err ( io:: Error :: new (
202+ io:: ErrorKind :: WriteZero ,
203+ "failed to write whole buffer" ,
204+ ) )
205+ }
206+ Ok ( n) if n >= self . output_range . len ( ) => {
207+ self . output_range = 0 ..0 ;
208+ break Ok ( ( ) ) ;
209+ }
210+ Ok ( n) => self . output_range . start += n,
211+ Err ( err) if err. kind ( ) == ErrorKind :: Interrupted => ( ) ,
212+ Err ( err) => break Err ( err) ,
213+ }
214+ }
215+ }
223216
224- debug_assert_eq ! ( 0 , self . output_occupied_len) ;
225- Ok ( ( ) )
217+ /// Writes given range of output buffer to the delegate. Performs exactly
218+ /// one write. Sets `panicked` to `true` if delegate panics.
219+ fn write_to_delegate ( & mut self , range : std:: ops:: Range < usize > ) -> io:: Result < usize > {
220+ self . panicked = true ;
221+ let res = self
222+ . delegate
223+ . as_mut ( )
224+ . expect ( "Encoder has already had finish() called" )
225+ . write ( & self . output [ range] ) ;
226+ self . panicked = false ;
227+ res
226228 }
227229
228230 /// Unwraps this `EncoderWriter`, returning the base writer it writes base64 encoded output
@@ -262,38 +264,24 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
262264 /// # Errors
263265 ///
264266 /// Any errors emitted by the delegate writer are returned.
265- fn write ( & mut self , input : & [ u8 ] ) -> Result < usize > {
267+ fn write ( & mut self , input : & [ u8 ] ) -> io :: Result < usize > {
266268 if self . delegate . is_none ( ) {
267269 panic ! ( "Cannot write more after calling finish()" ) ;
268270 }
269271
270- if input . is_empty ( ) {
271- return Ok ( 0 ) ;
272+ if let Some ( res ) = self . flush_output ( ) {
273+ return res ;
272274 }
275+ debug_assert_eq ! ( 0 , self . output_range. len( ) ) ;
273276
274- // The contract of `Write::write` places some constraints on this implementation:
275- // - a call to `write()` represents at most one call to a wrapped `Write`, so we can't
276- // iterate over the input and encode multiple chunks.
277- // - Errors mean that "no bytes were written to this writer", so we need to reset the
278- // internal state to what it was before the error occurred
279-
280- // before reading any input, write any leftover encoded output from last time
281- if self . output_occupied_len > 0 {
282- let current_len = self . output_occupied_len ;
283- return self
284- . write_to_delegate ( current_len)
285- // did not read any input
286- . map ( |_| 0 ) ;
277+ if input. is_empty ( ) {
278+ return Ok ( 0 ) ;
287279 }
288280
289- debug_assert_eq ! ( 0 , self . output_occupied_len) ;
290-
291281 // how many bytes, if any, were read into `extra` to create a triple to encode
292282 let mut extra_input_read_len = 0 ;
293283 let mut input = input;
294284
295- let orig_extra_len = self . extra_input_occupied_len ;
296-
297285 let mut encoded_size = 0 ;
298286 // always a multiple of MIN_ENCODE_CHUNK_SIZE
299287 let mut max_input_len = MAX_INPUT_LEN ;
@@ -322,8 +310,10 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
322310
323311 input = & input[ extra_input_read_len..] ;
324312
325- // consider extra to be used up, since we encoded it
326- self . extra_input_occupied_len = 0 ;
313+ // Note: Not updating self.extra_input_occupied_len yet. It’s
314+ // going to be zeroed at the end of the function if we
315+ // successfully write some data to delegate.
316+
327317 // don't clobber where we just encoded to
328318 encoded_size = 4 ;
329319 // and don't read more than can be encoded
@@ -367,29 +357,30 @@ impl<'e, E: Engine, W: io::Write> io::Write for EncoderWriter<'e, E, W> {
367357 & mut self . output [ encoded_size..] ,
368358 ) ;
369359
370- // not updating `self.output_occupied_len` here because if the below write fails, it should
371- // "never take place" -- the buffer contents we encoded are ignored and perhaps retried
372- // later, if the consumer chooses.
373-
374- self . write_to_delegate ( encoded_size)
375- // no matter whether we wrote the full encoded buffer or not, we consumed the same
376- // input
377- . map ( |_| extra_input_read_len + input_chunks_to_encode_len)
378- . map_err ( |e| {
379- // in case we filled and encoded `extra`, reset extra_len
380- self . extra_input_occupied_len = orig_extra_len;
360+ // Not updating `self.output_range` here because if the write fails, it
361+ // should "never take place" -- the buffer contents we encoded are
362+ // ignored and perhaps retried later, if the consumer chooses.
381363
382- e
383- } )
364+ self . write_to_delegate ( 0 ..encoded_size) . map ( |written| {
365+ if written < encoded_size {
366+ // Update output range to portion which is yet to be written.
367+ self . output_range = written..encoded_size;
368+ } else {
369+ // Everything was written, leave output range empty.
370+ debug_assert_eq ! ( 0 ..0 , self . output_range) ;
371+ }
372+ self . extra_input_occupied_len = 0 ;
373+ extra_input_read_len + input_chunks_to_encode_len
374+ } )
384375 }
385376
386377 /// Because this is usually treated as OK to call multiple times, it will *not* flush any
387378 /// incomplete chunks of input or write padding.
388379 /// # Errors
389380 ///
390381 /// The first error that is not of [`ErrorKind::Interrupted`] will be returned.
391- fn flush ( & mut self ) -> Result < ( ) > {
392- self . write_all_encoded_output ( ) ?;
382+ fn flush ( & mut self ) -> io :: Result < ( ) > {
383+ self . flush_all_output ( ) ?;
393384 self . delegate
394385 . as_mut ( )
395386 . expect ( "Writer must be present" )
0 commit comments