11#![ cfg( all( feature = "tokio" , feature = "zstd" ) ) ]
2- #![ allow( clippy:: unusual_byte_groupings) ]
32
43use std:: {
54 io,
65 pin:: Pin ,
7- task:: { Context , Poll } ,
6+ task:: { ready , Context , Poll } ,
87} ;
98
109use async_compression:: tokio:: write:: ZstdEncoder ;
1110use tokio:: io:: { AsyncWrite , AsyncWriteExt as _} ;
11+ use tracing_subscriber:: fmt:: format:: FmtSpan ;
1212
1313/// <https://github.com/Nullus157/async-compression/issues/246>
1414#[ tokio:: test]
1515async fn issue_246 ( ) {
16+ tracing_subscriber:: fmt ( )
17+ . without_time ( )
18+ . with_ansi ( false )
19+ . with_level ( false )
20+ . with_test_writer ( )
21+ . with_target ( false )
22+ . with_span_events ( FmtSpan :: NEW )
23+ . init ( ) ;
1624 let mut zstd_encoder =
1725 Transparent :: new ( Trace :: new ( ZstdEncoder :: new ( DelayedShutdown :: default ( ) ) ) ) ;
1826 zstd_encoder. shutdown ( ) . await . unwrap ( ) ;
@@ -32,39 +40,30 @@ impl<T> Transparent<T> {
3240}
3341
3442impl < T : AsyncWrite > AsyncWrite for Transparent < T > {
43+ #[ tracing:: instrument( name = "Transparent::poll_write" , skip_all, ret) ]
3544 fn poll_write (
3645 self : Pin < & mut Self > ,
3746 cx : & mut Context < ' _ > ,
3847 buf : & [ u8 ] ,
3948 ) -> Poll < Result < usize , io:: Error > > {
40- eprintln ! ( "Transparent::poll_write = ..." ) ;
41- let ret = self . project ( ) . inner . poll_write ( cx, buf) ;
42- eprintln ! ( "Transparent::poll_write = {:?}" , ret) ;
43- ret
49+ self . project ( ) . inner . poll_write ( cx, buf)
4450 }
4551
52+ #[ tracing:: instrument( name = "Transparent::poll_flush" , skip_all, ret) ]
4653 fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
47- eprintln ! ( "Transparent::poll_flush = ..." ) ;
48- let ret = self . project ( ) . inner . poll_flush ( cx) ;
49- eprintln ! ( "Transparent::poll_flush = {:?}" , ret) ;
50- ret
54+ self . project ( ) . inner . poll_flush ( cx)
5155 }
5256
5357 /// To quote the [`AsyncWrite`] docs:
5458 /// > Invocation of a shutdown implies an invocation of flush.
5559 /// > Once this method returns Ready it implies that a flush successfully happened before the shutdown happened.
5660 /// > That is, callers don't need to call flush before calling shutdown.
5761 /// > They can rely that by calling shutdown any pending buffered data will be written out.
62+ #[ tracing:: instrument( name = "Transparent::poll_shutdown" , skip_all, ret) ]
5863 fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
59- eprintln ! ( "Transparent::poll_shutdown = ..." ) ;
6064 let mut this = self . project ( ) ;
61- let ret = match this. inner . as_mut ( ) . poll_flush ( cx) {
62- Poll :: Ready ( Ok ( ( ) ) ) => this. inner . poll_shutdown ( cx) ,
63- Poll :: Ready ( Err ( e) ) => Poll :: Ready ( Err ( e) ) ,
64- Poll :: Pending => Poll :: Pending ,
65- } ;
66- eprintln ! ( "Transparent::poll_shutdown = {:?}" , ret) ;
67- ret
65+ ready ! ( this. inner. as_mut( ) . poll_flush( cx) ) ?;
66+ this. inner . poll_shutdown ( cx)
6867 }
6968}
7069
@@ -78,39 +77,33 @@ pin_project_lite::pin_project! {
7877}
7978
8079impl AsyncWrite for DelayedShutdown {
80+ #[ tracing:: instrument( name = "DelayedShutdown::poll_write" , skip_all, ret) ]
8181 fn poll_write (
8282 self : Pin < & mut Self > ,
8383 cx : & mut Context < ' _ > ,
8484 buf : & [ u8 ] ,
8585 ) -> Poll < Result < usize , io:: Error > > {
86- eprintln ! ( "DelayedShutdown::poll_write = ..." ) ;
8786 let _ = cx;
8887 self . project ( ) . contents . extend_from_slice ( buf) ;
89- let ret = Poll :: Ready ( Ok ( buf. len ( ) ) ) ;
90- eprintln ! ( "DelayedShutdown::poll_write = {:?}" , ret) ;
91- ret
88+ Poll :: Ready ( Ok ( buf. len ( ) ) )
9289 }
9390
91+ #[ tracing:: instrument( name = "DelayedShutdown::poll_flush" , skip_all, ret) ]
9492 fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
95- eprintln ! ( "DelayedShutdown::poll_flush = ..." ) ;
9693 let _ = cx;
97- let ret = Poll :: Ready ( Ok ( ( ) ) ) ;
98- eprintln ! ( "DelayedShutdown::poll_flush = {:?}" , ret) ;
99- ret
94+ Poll :: Ready ( Ok ( ( ) ) )
10095 }
10196
97+ #[ tracing:: instrument( name = "DelayedShutdown::poll_shutdown" , skip_all, ret) ]
10298 fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
103- eprintln ! ( "DelayedShutdown::poll_shutdown = ..." ) ;
104- let ret = match self . project ( ) . num_times_shutdown_called {
99+ match self . project ( ) . num_times_shutdown_called {
105100 it @ 0 => {
106101 * it += 1 ;
107102 cx. waker ( ) . wake_by_ref ( ) ;
108103 Poll :: Pending
109104 }
110105 _ => Poll :: Ready ( Ok ( ( ) ) ) ,
111- } ;
112- eprintln ! ( "DelayedShutdown::poll_shutdown = {:?}" , ret) ;
113- ret
106+ }
114107 }
115108}
116109
@@ -128,28 +121,21 @@ impl<T> Trace<T> {
128121}
129122
130123impl < T : AsyncWrite > AsyncWrite for Trace < T > {
124+ #[ tracing:: instrument( name = "Trace::poll_write" , skip_all, ret) ]
131125 fn poll_write (
132126 self : Pin < & mut Self > ,
133127 cx : & mut Context < ' _ > ,
134128 buf : & [ u8 ] ,
135129 ) -> Poll < Result < usize , io:: Error > > {
136- eprintln ! ( "Trace::poll_write = ..." ) ;
137- let ret = self . project ( ) . inner . poll_write ( cx, buf) ;
138- eprintln ! ( "Trace::poll_write = {:?}" , ret) ;
139- ret
130+ self . project ( ) . inner . poll_write ( cx, buf)
140131 }
141-
132+ # [ tracing :: instrument ( name = "Trace::poll_flush" , skip_all , ret ) ]
142133 fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
143- eprintln ! ( "Trace::poll_flush = ..." ) ;
144- let ret = self . project ( ) . inner . poll_flush ( cx) ;
145- eprintln ! ( "Trace::poll_flush = {:?}" , ret) ;
146- ret
134+ self . project ( ) . inner . poll_flush ( cx)
147135 }
148136
137+ #[ tracing:: instrument( name = "Trace::poll_shutdown" , skip_all, ret) ]
149138 fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
150- eprintln ! ( "Trace::poll_shutdown = ..." ) ;
151- let ret = self . project ( ) . inner . poll_shutdown ( cx) ;
152- eprintln ! ( "Trace::poll_shutdown = {:?}" , ret) ;
153- ret
139+ self . project ( ) . inner . poll_shutdown ( cx)
154140 }
155141}
0 commit comments