44use std:: {
55 io,
66 pin:: Pin ,
7- task:: { ready , Context , Poll } ,
7+ task:: { Context , Poll } ,
88} ;
99
1010use async_compression:: tokio:: write:: ZstdEncoder ;
@@ -13,7 +13,8 @@ use tokio::io::{AsyncWrite, AsyncWriteExt as _};
1313/// <https://github.com/Nullus157/async-compression/issues/246>
1414#[ tokio:: test]
1515async fn issue_246 ( ) {
16- let mut zstd_encoder = Transparent :: new ( ZstdEncoder :: new ( DelayedShutdown :: default ( ) ) ) ;
16+ let mut zstd_encoder =
17+ Transparent :: new ( Trace :: new ( ZstdEncoder :: new ( DelayedShutdown :: default ( ) ) ) ) ;
1718 zstd_encoder. shutdown ( ) . await . unwrap ( ) ;
1819}
1920
@@ -36,11 +37,17 @@ impl<T: AsyncWrite> AsyncWrite for Transparent<T> {
3637 cx : & mut Context < ' _ > ,
3738 buf : & [ u8 ] ,
3839 ) -> Poll < Result < usize , io:: Error > > {
39- self . project ( ) . inner . poll_write ( cx, buf)
40+ eprintln ! ( "Transparent::poll_write = ..." ) ;
41+ let ret = self . project ( ) . inner . poll_write ( cx, buf) ;
42+ eprintln ! ( "Transparent::poll_write = {:?}" , ret) ;
43+ ret
4044 }
4145
4246 fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
43- self . project ( ) . inner . poll_flush ( cx)
47+ eprintln ! ( "Transparent::poll_flush = ..." ) ;
48+ let ret = self . project ( ) . inner . poll_flush ( cx) ;
49+ eprintln ! ( "Transparent::poll_flush = {:?}" , ret) ;
50+ ret
4451 }
4552
4653 /// To quote the [`AsyncWrite`] docs:
@@ -49,9 +56,15 @@ impl<T: AsyncWrite> AsyncWrite for Transparent<T> {
4956 /// > That is, callers don't need to call flush before calling shutdown.
5057 /// > They can rely that by calling shutdown any pending buffered data will be written out.
5158 fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
59+ eprintln ! ( "Transparent::poll_shutdown = ..." ) ;
5260 let mut this = self . project ( ) ;
53- ready ! ( this. inner. as_mut( ) . poll_flush( cx) ) ?;
54- this. inner . poll_shutdown ( cx)
61+ let ret = match this. inner . as_mut ( ) . poll_flush ( cx) {
62+ Poll :: Ready ( Ok ( ( ) ) ) => this. inner . poll_shutdown ( cx) ,
63+ Poll :: Ready ( Err ( e) ) => Poll :: Ready ( Err ( e) ) ,
64+ Poll :: Pending => Poll :: Pending ,
65+ } ;
66+ eprintln ! ( "Transparent::poll_shutdown = {:?}" , ret) ;
67+ ret
5568 }
5669}
5770
@@ -70,24 +83,73 @@ impl AsyncWrite for DelayedShutdown {
7083 cx : & mut Context < ' _ > ,
7184 buf : & [ u8 ] ,
7285 ) -> Poll < Result < usize , io:: Error > > {
86+ eprintln ! ( "DelayedShutdown::poll_write = ..." ) ;
7387 let _ = cx;
7488 self . project ( ) . contents . extend_from_slice ( buf) ;
75- Poll :: Ready ( Ok ( buf. len ( ) ) )
89+ let ret = Poll :: Ready ( Ok ( buf. len ( ) ) ) ;
90+ eprintln ! ( "DelayedShutdown::poll_write = {:?}" , ret) ;
91+ ret
7692 }
7793
7894 fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
95+ eprintln ! ( "DelayedShutdown::poll_flush = ..." ) ;
7996 let _ = cx;
80- Poll :: Ready ( Ok ( ( ) ) )
97+ let ret = Poll :: Ready ( Ok ( ( ) ) ) ;
98+ eprintln ! ( "DelayedShutdown::poll_flush = {:?}" , ret) ;
99+ ret
81100 }
82101
83102 fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
84- match self . project ( ) . num_times_shutdown_called {
103+ eprintln ! ( "DelayedShutdown::poll_shutdown = ..." ) ;
104+ let ret = match self . project ( ) . num_times_shutdown_called {
85105 it @ 0 => {
86106 * it += 1 ;
87107 cx. waker ( ) . wake_by_ref ( ) ;
88108 Poll :: Pending
89109 }
90110 _ => Poll :: Ready ( Ok ( ( ) ) ) ,
91- }
111+ } ;
112+ eprintln ! ( "DelayedShutdown::poll_shutdown = {:?}" , ret) ;
113+ ret
114+ }
115+ }
116+
117+ pin_project_lite:: pin_project! {
118+ /// A wrapper which traces all calls
119+ struct Trace <T > {
120+ #[ pin] inner: T
121+ }
122+ }
123+
124+ impl < T > Trace < T > {
125+ fn new ( inner : T ) -> Self {
126+ Self { inner }
127+ }
128+ }
129+
130+ impl < T : AsyncWrite > AsyncWrite for Trace < T > {
131+ fn poll_write (
132+ self : Pin < & mut Self > ,
133+ cx : & mut Context < ' _ > ,
134+ buf : & [ u8 ] ,
135+ ) -> Poll < Result < usize , io:: Error > > {
136+ eprintln ! ( "Trace::poll_write = ..." ) ;
137+ let ret = self . project ( ) . inner . poll_write ( cx, buf) ;
138+ eprintln ! ( "Trace::poll_write = {:?}" , ret) ;
139+ ret
140+ }
141+
142+ fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
143+ eprintln ! ( "Trace::poll_flush = ..." ) ;
144+ let ret = self . project ( ) . inner . poll_flush ( cx) ;
145+ eprintln ! ( "Trace::poll_flush = {:?}" , ret) ;
146+ ret
147+ }
148+
149+ fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
150+ eprintln ! ( "Trace::poll_shutdown = ..." ) ;
151+ let ret = self . project ( ) . inner . poll_shutdown ( cx) ;
152+ eprintln ! ( "Trace::poll_shutdown = {:?}" , ret) ;
153+ ret
92154 }
93155}
0 commit comments