@@ -7,35 +7,138 @@ use crate::io::Write;
77
88const DEFAULT_CAPACITY : usize = 8 * 1024 ;
99
10-
11- pub struct BufWriter < W : AsyncWrite > {
12- inner : Option < W > ,
10+ /// Wraps a writer and buffers its output.
11+ ///
12+ /// It can be excessively inefficient to work directly with something that
13+ /// implements [`Write`]. For example, every call to
14+ /// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A
15+ /// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying
16+ /// writer in large, infrequent batches.
17+ ///
18+ /// `BufWriter` can improve the speed of programs that make *small* and
19+ /// *repeated* write calls to the same file or network socket. It does not
20+ /// help when writing very large amounts at once, or writing just one or a few
21+ /// times. It also provides no advantage when writing to a destination that is
22+ /// in memory, like a `Vec<u8>`.
23+ ///
24+ /// When the `BufWriter` is dropped, the contents of its buffer will be written
25+ /// out. However, any errors that happen in the process of flushing the buffer
26+ /// when the writer is dropped will be ignored. Code that wishes to handle such
27+ /// errors must manually call [`flush`] before the writer is dropped.
28+ ///
29+ /// This type is an async version of [`std::io::BufReader`].
30+ ///
31+ /// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
32+ ///
33+ /// # Examples
34+ ///
35+ /// Let's write the numbers one through ten to a [`TcpStream`]:
36+ ///
37+ /*/ ```no_run
38+ / use std::io::prelude::*;
39+ / use std::net::TcpStream;
40+ /
41+ / let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap();
42+ /
43+ / for i in 0..10 {
44+ / stream.write(&[i+1]).unwrap();
45+ / }
46+ / ```*/
47+ ///
48+ /// Because we're not buffering, we write each one in turn, incurring the
49+ /// overhead of a system call per byte written. We can fix this with a
50+ /// `BufWriter`:
51+ ///
52+ /*/ ```no_run
53+ / use std::io::prelude::*;
54+ / use std::io::BufWriter;
55+ / use std::net::TcpStream;
56+ /
57+ / let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
58+ /
59+ / for i in 0..10 {
60+ / stream.write(&[i+1]).unwrap();
61+ / }
62+ / ```*/
63+ ///
64+ /// By wrapping the stream with a `BufWriter`, these ten writes are all grouped
65+ /// together by the buffer, and will all be written out in one system call when
66+ /// the `stream` is dropped.
67+ ///
68+ /// [`Write`]: ../../std/io/trait.Write.html
69+ /// [`TcpStream::write`]: ../../std/net/struct.TcpStream.html#method.write
70+ /// [`TcpStream`]: ../../std/net/struct.TcpStream.html
71+ /// [`flush`]: #method.flush
72+ pub struct BufWriter < W > {
73+ inner : W ,
1374 buf : Vec < u8 > ,
14- panicked : bool ,
75+ written : usize ,
1576}
1677
1778impl < W : AsyncWrite + Unpin > BufWriter < W > {
18- pin_utils:: unsafe_pinned!( inner: Option < W > ) ;
19- pin_utils:: unsafe_unpinned!( panicked : bool ) ;
79+ pin_utils:: unsafe_pinned!( inner: W ) ;
80+ pin_utils:: unsafe_unpinned!( buf : Vec < u8 > ) ;
2081
82+ /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
83+ /// but may change in the future.
84+ ///
85+ /// # Examples
86+ ///
87+ /// ```no_run
88+ /// use async_std::io::BufWriter;
89+ /// use async_std::net::TcpStream;
90+ ///
91+ /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap());
92+ /// ```
2193 pub fn new ( inner : W ) -> BufWriter < W > {
2294 BufWriter :: with_capacity ( DEFAULT_CAPACITY , inner)
2395 }
2496
97+ /// Creates a new `BufWriter` with the specified buffer capacity.
98+ ///
99+ /// # Examples
100+ ///
101+ /// Creating a buffer with a buffer of a hundred bytes.
102+ ///
103+ /// ```no_run
104+ /// use async_std::io::BufWriter;
105+ /// use async_std::net::TcpStream;
106+ ///
107+ /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap();
108+ /// let mut buffer = BufWriter::with_capacity(100, stream);
109+ /// ```
25110 pub fn with_capacity ( capacity : usize , inner : W ) -> BufWriter < W > {
26111 BufWriter {
27- inner : Some ( inner ) ,
112+ inner,
28113 buf : Vec :: with_capacity ( capacity) ,
29- panicked : false ,
114+ written : 0 ,
30115 }
31116 }
32117
33118 pub fn get_ref ( & self ) -> & W {
34- self . inner . as_ref ( ) . unwrap ( )
119+ & self . inner
35120 }
36121
37122 pub fn get_mut ( & mut self ) -> & mut W {
38- self . inner . as_mut ( ) . unwrap ( )
123+ & mut self . inner
124+ }
125+
126+ pub fn get_pin_mut ( self : Pin < & mut Self > ) -> Pin < & mut W > {
127+ self . inner ( )
128+ }
129+
130+ /// Consumes BufWriter, returning the underlying writer
131+ ///
132+ /// This method will not write leftover data, it will be lost.
133+ /// For method that will attempt to write before returning the writer see [`poll_into_inner`]
134+ ///
135+ /// [`poll_into_inner`]: #method.poll_into_inner
136+ pub fn into_inner ( self ) -> W {
137+ self . inner
138+ }
139+
140+ pub fn poll_into_inner ( mut self : Pin < & mut Self > , cx : Context < ' _ > ) -> Poll < io:: Result < usize > > {
141+ unimplemented ! ( "poll into inner method" )
39142 }
40143
41144 pub fn buffer ( & self ) -> & [ u8 ] {
@@ -46,25 +149,21 @@ impl<W: AsyncWrite + Unpin> BufWriter<W> {
46149 let Self {
47150 inner,
48151 buf,
49- panicked
152+ written
50153 } = Pin :: get_mut ( self ) ;
51- let mut panicked = Pin :: new ( panicked) ;
52- let mut written = 0 ;
154+ let mut inner = Pin :: new ( inner) ;
53155 let len = buf. len ( ) ;
54156 let mut ret = Ok ( ( ) ) ;
55- while written < len {
56- * panicked = true ;
57- let r = Pin :: new ( inner. as_mut ( ) . unwrap ( ) ) ;
58- * panicked = false ;
59- match r. poll_write ( cx, & buf[ written..] ) {
157+ while * written < len {
158+ match inner. as_mut ( ) . poll_write ( cx, & buf[ * written..] ) {
60159 Poll :: Ready ( Ok ( 0 ) ) => {
61160 ret = Err ( io:: Error :: new (
62161 io:: ErrorKind :: WriteZero ,
63162 "Failed to write buffered data" ,
64163 ) ) ;
65164 break ;
66165 }
67- Poll :: Ready ( Ok ( n) ) => written += n,
166+ Poll :: Ready ( Ok ( n) ) => * written += n,
68167 Poll :: Ready ( Err ( ref e) ) if e. kind ( ) == io:: ErrorKind :: Interrupted => { }
69168 Poll :: Ready ( Err ( e) ) => {
70169 ret = Err ( e) ;
@@ -73,22 +172,12 @@ impl<W: AsyncWrite + Unpin> BufWriter<W> {
73172 Poll :: Pending => return Poll :: Pending ,
74173 }
75174 }
76- if written > 0 {
77- buf. drain ( ..written) ;
175+ if * written > 0 {
176+ buf. drain ( ..* written) ;
78177 }
178+ * written = 0 ;
79179 Poll :: Ready ( ret)
80180 }
81-
82- pub fn poll_into_inner (
83- mut self : Pin < & mut Self > ,
84- cx : & mut Context < ' _ > ,
85- //TODO: Fix 'expected function, found struct `IntoInnerError`' compiler error
86- ) -> Poll < io:: Result < W > > {
87- match ready ! ( self . as_mut( ) . poll_flush_buf( cx) ) {
88- Ok ( ( ) ) => Poll :: Ready ( Ok ( self . inner ( ) . take ( ) . unwrap ( ) ) ) ,
89- Err ( e) => Poll :: Ready ( Err ( io:: Error :: new ( io:: ErrorKind :: Other , "" ) ) )
90- }
91- }
92181}
93182
94183impl < W : AsyncWrite + Unpin > AsyncWrite for BufWriter < W > {
@@ -97,33 +186,28 @@ impl<W: AsyncWrite + Unpin> AsyncWrite for BufWriter<W> {
97186 cx : & mut Context ,
98187 buf : & [ u8 ] ,
99188 ) -> Poll < io:: Result < usize > > {
100- let panicked = self . as_mut ( ) . panicked ( ) ;
101- if self . as_ref ( ) . buf . len ( ) + buf. len ( ) > self . as_ref ( ) . buf . capacity ( ) {
102- match ready ! ( self . as_mut( ) . poll_flush_buf( cx) ) {
103- Ok ( ( ) ) => { } ,
104- Err ( e) => return Poll :: Ready ( Err ( e) )
105- }
189+ if self . buf . len ( ) + buf. len ( ) > self . buf . capacity ( ) {
190+ ready ! ( self . as_mut( ) . poll_flush_buf( cx) ) ?;
106191 }
107- if buf. len ( ) >= self . as_ref ( ) . buf . capacity ( ) {
108- * panicked = true ;
109- let r = ready ! ( self . as_mut( ) . poll_write( cx, buf) ) ;
110- * panicked = false ;
111- return Poll :: Ready ( r)
192+ if buf. len ( ) >= self . buf . capacity ( ) {
193+ self . inner ( ) . poll_write ( cx, buf)
112194 } else {
113- return Poll :: Ready ( ready ! ( self . as_ref ( ) . buf . write( buf) . poll( ) ) )
195+ self . buf ( ) . write ( buf) . poll ( )
114196 }
115197 }
116198
117199 fn poll_flush ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
118- unimplemented ! ( )
200+ ready ! ( self . as_mut( ) . poll_flush_buf( cx) ) ?;
201+ self . inner ( ) . poll_flush ( cx)
119202 }
120203
121204 fn poll_close ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
122- unimplemented ! ( )
205+ ready ! ( self . as_mut( ) . poll_flush_buf( cx) ) ?;
206+ self . inner ( ) . poll_close ( cx)
123207 }
124208}
125209
126- impl < W : AsyncWrite + fmt:: Debug > fmt:: Debug for BufWriter < W > {
210+ impl < W : AsyncWrite + fmt:: Debug + Unpin > fmt:: Debug for BufWriter < W > {
127211 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
128212 f. debug_struct ( "BufReader" )
129213 . field ( "writer" , & self . inner )
@@ -135,6 +219,138 @@ impl<W: AsyncWrite + fmt::Debug> fmt::Debug for BufWriter<W> {
135219 }
136220}
137221
222+ pub struct LineWriter < W : AsyncWrite + Unpin > {
223+ inner : BufWriter < W > ,
224+ need_flush : bool ,
225+ }
226+
227+ impl < W : AsyncWrite + Unpin > LineWriter < W > {
228+ pin_utils:: unsafe_pinned!( inner: BufWriter <W >) ;
229+ pin_utils:: unsafe_unpinned!( need_flush: bool ) ;
230+ /// Creates a new `LineWriter`.
231+ ///
232+ /// # Examples
233+ ///
234+ /// ```no_run
235+ /// use async_std::fs::File;
236+ /// use async_std::io::LineWriter;
237+ ///
238+ /// fn main() -> std::io::Result<()> {
239+ /// async_std::task::block_on(async {
240+ /// let file = File::create("poem.txt").await?;
241+ /// let file = LineWriter::new(file);
242+ /// Ok(())
243+ /// })
244+ /// }
245+ /// ```
246+ pub fn new ( inner : W ) -> LineWriter < W > {
247+ // Lines typically aren't that long, don't use a giant buffer
248+ LineWriter :: with_capacity ( 1024 , inner)
249+ }
250+
251+ /// Creates a new `LineWriter` with a specified capacity for the internal
252+ /// buffer.
253+ ///
254+ /// # Examples
255+ ///
256+ /// ```no_run
257+ /// use async_std::fs::File;
258+ /// use async_std::io::LineWriter;
259+ ///
260+ /// fn main() -> std::io::Result<()> {
261+ /// async_std::task::block_on(async {
262+ /// let file = File::create("poem.txt").await?;
263+ /// let file = LineWriter::with_capacity(100, file);
264+ /// Ok(())
265+ /// })
266+ /// }
267+ /// ```
268+ pub fn with_capacity ( capacity : usize , inner : W ) -> LineWriter < W > {
269+ LineWriter {
270+ inner : BufWriter :: with_capacity ( capacity, inner) ,
271+ need_flush : false ,
272+ }
273+ }
274+
275+ pub fn get_ref ( & self ) -> & W {
276+ self . inner . get_ref ( )
277+ }
278+
279+ pub fn get_mut ( & mut self ) -> & mut W {
280+ self . inner . get_mut ( )
281+ }
282+
283+ pub fn into_inner ( self ) -> W {
284+ self . inner . into_inner ( )
285+ }
286+ }
287+
288+ impl < W : AsyncWrite + Unpin > AsyncWrite for LineWriter < W > {
289+ fn poll_write ( mut self : Pin < & mut Self > , cx : & mut Context , buf : & [ u8 ] ) -> Poll < io:: Result < usize > > {
290+ if self . need_flush {
291+ self . as_mut ( ) . poll_flush ( cx) ?;
292+ }
293+
294+ let i = match memchr:: memrchr ( b'\n' , buf) {
295+ Some ( i) => i,
296+ None => return self . as_mut ( ) . inner ( ) . as_mut ( ) . poll_write ( cx, buf)
297+ } ;
298+
299+ let n = ready ! ( self . as_mut( ) . inner( ) . as_mut( ) . poll_write( cx, & buf[ ..=i] ) ?) ;
300+ * self . as_mut ( ) . need_flush ( ) = true ;
301+ if ready ! ( self . as_mut( ) . poll_flush( cx) ) . is_err ( ) || n != 1 + 1 {
302+ return Poll :: Ready ( Ok ( n) )
303+ }
304+ match ready ! ( self . inner( ) . poll_write( cx, & buf[ i + 1 ..] ) ) {
305+ Ok ( i) => Poll :: Ready ( Ok ( n + 1 ) ) ,
306+ Err ( _) => Poll :: Ready ( Ok ( n) )
307+ }
308+ }
309+
310+ fn poll_flush ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
311+ self . as_mut ( ) . inner ( ) . poll_flush ( cx) ?;
312+ * self . need_flush ( ) = false ;
313+ Poll :: Ready ( Ok ( ( ) ) )
314+ }
315+
316+ fn poll_close ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
317+ self . as_mut ( ) . inner ( ) . poll_flush ( cx) ?;
318+ self . inner ( ) . poll_close ( cx)
319+ }
320+ }
321+
322+ impl < W : AsyncWrite + Unpin > fmt:: Debug for LineWriter < W > where W : fmt:: Debug {
323+ fn fmt ( & self , fmt : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
324+ fmt. debug_struct ( "LineWriter" )
325+ . field ( "writer" , & self . inner . inner )
326+ . field ( "buffer" ,
327+ & format_args ! ( "{}/{}" , self . inner. buf. len( ) , self . inner. buf. capacity( ) ) )
328+ . finish ( )
329+ }
330+ }
331+
332+
138333mod tests {
334+ use crate :: prelude:: * ;
335+ use crate :: task;
336+ use super :: LineWriter ;
139337
338+ #[ test]
339+ fn test_line_buffer ( ) {
340+ task:: block_on ( async {
341+ let mut writer = LineWriter :: new ( Vec :: new ( ) ) ;
342+ writer. write ( & [ 0 ] ) . await . unwrap ( ) ;
343+ assert_eq ! ( * writer. get_ref( ) , [ ] ) ;
344+ writer. write ( & [ 1 ] ) . await . unwrap ( ) ;
345+ assert_eq ! ( * writer. get_ref( ) , [ ] ) ;
346+ writer. flush ( ) . await . unwrap ( ) ;
347+ assert_eq ! ( * writer. get_ref( ) , [ 0 , 1 ] ) ;
348+ writer. write ( & [ 0 , b'\n' , 1 , b'\n' , 2 ] ) . await . unwrap ( ) ;
349+ assert_eq ! ( * writer. get_ref( ) , [ 0 , 1 , 0 , b'\n' , 1 , b'\n' ] ) ;
350+ writer. flush ( ) . await . unwrap ( ) ;
351+ assert_eq ! ( * writer. get_ref( ) , [ 0 , 1 , 0 , b'\n' , 1 , b'\n' , 2 ] ) ;
352+ writer. write ( & [ 3 , b'\n' ] ) . await . unwrap ( ) ;
353+ assert_eq ! ( * writer. get_ref( ) , [ 0 , 1 , 0 , b'\n' , 1 , b'\n' , 2 , 3 , b'\n' ] ) ;
354+ } )
355+ }
140356}
0 commit comments