@@ -4,7 +4,7 @@ use async_std::path::PathBuf;
44use async_std:: sync:: Arc ;
55use async_std:: task:: { Context , Poll } ;
66use std:: pin:: Pin ;
7- use std:: sync:: Mutex ;
7+ use std:: sync:: { atomic :: AtomicBool , Mutex } ;
88
99#[ derive( Debug , Copy , Clone ) ]
1010#[ allow( dead_code) ]
@@ -19,6 +19,12 @@ pub struct TestCase {
1919 source_fixture : Arc < File > ,
2020 expected_fixture : Arc < Mutex < File > > ,
2121 result : Arc < Mutex < File > > ,
22+ throttle : Arc < Throttle > ,
23+ }
24+
25+ enum Throttle {
26+ NoThrottle ,
27+ YieldPending ( AtomicBool , AtomicBool ) ,
2228}
2329
2430impl TestCase {
@@ -68,9 +74,15 @@ impl TestCase {
6874 source_fixture : Arc :: new ( source_fixture) ,
6975 expected_fixture : Arc :: new ( Mutex :: new ( expected_fixture) ) ,
7076 result,
77+ throttle : Arc :: new ( Throttle :: NoThrottle ) ,
7178 }
7279 }
7380
81+ #[ allow( dead_code) ]
82+ pub fn throttle ( & mut self ) {
83+ self . throttle = Arc :: new ( Throttle :: YieldPending ( AtomicBool :: new ( false ) , AtomicBool :: new ( false ) ) ) ;
84+ }
85+
7486 pub async fn read_result ( & self ) -> String {
7587 use async_std:: prelude:: * ;
7688 let mut result = String :: new ( ) ;
@@ -128,13 +140,48 @@ impl Read for TestCase {
128140 cx : & mut Context ,
129141 buf : & mut [ u8 ] ,
130142 ) -> Poll < io:: Result < usize > > {
131- Pin :: new ( & mut & * self . source_fixture ) . poll_read ( cx, buf)
143+ match & * self . throttle {
144+ Throttle :: NoThrottle => {
145+ Pin :: new ( & mut & * self . source_fixture ) . poll_read ( cx, buf)
146+ } ,
147+ Throttle :: YieldPending ( read_flag, _) => {
148+ if read_flag. fetch_xor ( true , std:: sync:: atomic:: Ordering :: SeqCst ) {
149+ println ! ( "read yield" ) ;
150+ cx. waker ( ) . wake_by_ref ( ) ;
151+ Poll :: Pending
152+ } else {
153+ // read partial
154+ let throttle_len = std:: cmp:: min ( buf. len ( ) , 10 ) ;
155+ let buf = & mut buf[ ..throttle_len] ;
156+ let ret = Pin :: new ( & mut & * self . source_fixture ) . poll_read ( cx, buf) ;
157+ println ! ( "read partial 10 {:?} {:?}" , ret, buf) ;
158+ ret
159+ }
160+ } ,
161+ }
132162 }
133163}
134164
135165impl Write for TestCase {
136166 fn poll_write ( self : Pin < & mut Self > , cx : & mut Context , buf : & [ u8 ] ) -> Poll < io:: Result < usize > > {
137- Pin :: new ( & mut & * self . result . lock ( ) . unwrap ( ) ) . poll_write ( cx, buf)
167+ match & * self . throttle {
168+ Throttle :: NoThrottle => {
169+ Pin :: new ( & mut & * self . result . lock ( ) . unwrap ( ) ) . poll_write ( cx, buf)
170+ } ,
171+ Throttle :: YieldPending ( _, write_flag) => {
172+ if write_flag. fetch_xor ( true , std:: sync:: atomic:: Ordering :: SeqCst ) {
173+ println ! ( "write yield" ) ;
174+ cx. waker ( ) . wake_by_ref ( ) ;
175+ Poll :: Pending
176+ } else {
177+ // write partial
178+ let throttle_len = std:: cmp:: min ( buf. len ( ) , 10 ) ;
179+ let buf = & buf[ ..throttle_len] ;
180+ println ! ( "write partial 10 {:?}" , buf) ;
181+ Pin :: new ( & mut & * self . result . lock ( ) . unwrap ( ) ) . poll_write ( cx, buf)
182+ }
183+ } ,
184+ }
138185 }
139186
140187 fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
0 commit comments