@@ -7,26 +7,32 @@ use std::pin::Pin;
77use std:: marker:: Unpin ;
88use pin_utils:: { unsafe_pinned, unsafe_unpinned} ;
99
10+ struct Block {
11+ offset : usize ,
12+ bytes : Box < dyn AsRef < [ u8 ] > > ,
13+ }
14+
1015/// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
1116#[ must_use = "sinks do nothing unless polled" ]
17+ #[ derive( Debug ) ]
1218pub struct IntoSink < W > {
1319 writer : W ,
14- /// An outstanding block for us to push into the underlying writer, along with an index of how
15- /// far into the block we have written already.
16- buffer : Option < ( usize , Box < dyn AsRef < [ u8 ] > > ) > ,
20+ /// An outstanding block for us to push into the underlying writer, along with an offset of how
21+ /// far into this block we have written already.
22+ buffer : Option < Block > ,
1723}
1824
1925impl < W : Unpin > Unpin for IntoSink < W > { }
2026
2127impl < W : AsyncWrite > IntoSink < W > {
2228 unsafe_pinned ! ( writer: W ) ;
23- unsafe_unpinned ! ( buffer: Option <( usize , Box <dyn AsRef < [ u8 ] >> ) >) ;
29+ unsafe_unpinned ! ( buffer: Option <Block >) ;
2430
2531 pub ( super ) fn new ( writer : W ) -> Self {
2632 IntoSink { writer, buffer : None }
2733 }
2834
29- fn project < ' a > ( self : Pin < & ' a mut Self > ) -> ( Pin < & ' a mut W > , & ' a mut Option < ( usize , Box < dyn AsRef < [ u8 ] > > ) > ) {
35+ fn project < ' a > ( self : Pin < & ' a mut Self > ) -> ( Pin < & ' a mut W > , & ' a mut Option < Block > ) {
3036 unsafe {
3137 let this = self . get_unchecked_mut ( ) ;
3238 ( Pin :: new_unchecked ( & mut this. writer ) , & mut this. buffer )
@@ -41,17 +47,17 @@ impl<W: AsyncWrite> IntoSink<W> {
4147 ) -> Poll < Result < ( ) , io:: Error > >
4248 {
4349 let ( mut writer, buffer) = self . project ( ) ;
44- if let Some ( ( index , block ) ) = buffer {
50+ if let Some ( buffer ) = buffer {
4551 loop {
46- let bytes = ( * * block ) . as_ref ( ) ;
47- let written = ready ! ( writer. as_mut( ) . poll_write( cx, & bytes[ * index ..] ) ) ?;
48- * index += written;
49- if * index == bytes. len ( ) {
52+ let bytes = ( * buffer . bytes ) . as_ref ( ) ;
53+ let written = ready ! ( writer. as_mut( ) . poll_write( cx, & bytes[ buffer . offset ..] ) ) ?;
54+ buffer . offset += written;
55+ if buffer . offset == bytes. len ( ) {
5056 break ;
5157 }
5258 }
53- * buffer = None ;
5459 }
60+ * buffer = None ;
5561 Poll :: Ready ( Ok ( ( ) ) )
5662 }
5763
@@ -75,7 +81,7 @@ impl<W: AsyncWrite, Item: AsRef<[u8]> + 'static> Sink<Item> for IntoSink<W> {
7581 ) -> Result < ( ) , Self :: SinkError >
7682 {
7783 debug_assert ! ( self . as_mut( ) . buffer( ) . is_none( ) ) ;
78- * self . as_mut ( ) . buffer ( ) = Some ( ( 0 , Box :: new ( item) ) ) ;
84+ * self . as_mut ( ) . buffer ( ) = Some ( Block { offset : 0 , bytes : Box :: new ( item) } ) ;
7985 Ok ( ( ) )
8086 }
8187
@@ -100,16 +106,9 @@ impl<W: AsyncWrite, Item: AsRef<[u8]> + 'static> Sink<Item> for IntoSink<W> {
100106 }
101107}
102108
103- impl < W : fmt:: Debug > fmt:: Debug for IntoSink < W > {
104-
109+ impl fmt:: Debug for Block {
105110 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
106- let buffer = self . buffer . as_ref ( ) . map ( |( size, block) | {
107- ( size, format ! ( "[... {} bytes ...]" , ( * * block) . as_ref( ) . len( ) ) )
108- } ) ;
109- f. debug_struct ( "IntoSink" )
110- . field ( "writer" , & self . writer )
111- . field ( "buffer" , & buffer)
112- . finish ( ) ?;
111+ write ! ( f, "[... {}/{} bytes ...]" , self . offset, ( * self . bytes) . as_ref( ) . len( ) ) ?;
113112 Ok ( ( ) )
114113 }
115114}
0 commit comments