@@ -95,8 +95,7 @@ fn upload_lots() {
9595 }
9696
9797 let mut m = Multi :: new ( ) ;
98- let poll = t ! ( mio:: Poll :: new( ) ) ;
99- let ( tx, rx) = mio_extras:: channel:: channel ( ) ;
98+ let ( tx, mut rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
10099 let tx2 = tx. clone ( ) ;
101100 t ! ( m. socket_function( move |socket, events, token| {
102101 t!( tx2. send( Message :: Wait ( socket, events, token) ) ) ;
@@ -136,86 +135,59 @@ fn upload_lots() {
136135 t ! ( h. upload( true ) ) ;
137136 t ! ( h. http_headers( list) ) ;
138137
139- t ! ( poll. register( & rx, mio:: Token ( 0 ) , mio:: Ready :: all( ) , mio:: PollOpt :: level( ) ) ) ;
140-
141138 let e = t ! ( m. add( h) ) ;
142139
143140 let mut next_token = 1 ;
144141 let mut token_map = HashMap :: new ( ) ;
145142 let mut cur_timeout = None ;
146- let mut events = mio:: Events :: with_capacity ( 128 ) ;
147- let mut running = true ;
148-
149- while running {
150- let n = t ! ( poll. poll( & mut events, cur_timeout) ) ;
151143
152- if n == 0 && t ! ( m. timeout( ) ) == 0 {
153- running = false ;
154- }
155-
156- for event in events. iter ( ) {
157- while event. token ( ) == mio:: Token ( 0 ) {
158- match rx. try_recv ( ) {
159- Ok ( Message :: Timeout ( dur) ) => cur_timeout = dur,
160- Ok ( Message :: Wait ( socket, events, token) ) => {
161- let evented = mio:: unix:: EventedFd ( & socket) ;
162- if events. remove ( ) {
163- token_map. remove ( & token) . unwrap ( ) ;
164- } else {
165- let mut e = mio:: Ready :: empty ( ) ;
166- if events. input ( ) {
167- e |= mio:: Ready :: readable ( ) ;
168- }
169- if events. output ( ) {
170- e |= mio:: Ready :: writable ( ) ;
171- }
172- if token == 0 {
173- let token = next_token;
174- next_token += 1 ;
175- t ! ( m. assign( socket, token) ) ;
176- token_map. insert ( token, socket) ;
177- t ! ( poll. register(
178- & evented,
179- mio:: Token ( token) ,
180- e,
181- mio:: PollOpt :: level( )
182- ) ) ;
183- } else {
184- t ! ( poll. reregister(
185- & evented,
186- mio:: Token ( token) ,
187- e,
188- mio:: PollOpt :: level( )
189- ) ) ;
190- }
144+ let rt = tokio:: runtime:: Builder :: new_current_thread ( )
145+ . enable_time ( )
146+ . build ( )
147+ . unwrap ( ) ;
148+
149+ rt. block_on ( async {
150+ loop {
151+ let timeout = cur_timeout. unwrap_or ( Duration :: new ( 0 , 1000000 ) ) ;
152+ let message = tokio:: time:: timeout ( timeout, rx. recv ( ) ) . await ;
153+
154+ match message {
155+ Ok ( Some ( Message :: Timeout ( dur) ) ) => cur_timeout = dur,
156+ Ok ( Some ( Message :: Wait ( socket, events, token) ) ) => {
157+ if events. remove ( ) {
158+ token_map. remove ( & token) ;
159+ } else {
160+ if token == 0 {
161+ let token = next_token;
162+ next_token += 1 ;
163+ t ! ( m. assign( socket, token) ) ;
164+ token_map. insert ( token, socket) ;
191165 }
192166 }
193- Err ( _) => break ,
194- }
195- }
196167
197- if event. token ( ) == mio:: Token ( 0 ) {
198- continue ;
199- }
168+ let mut e = Events :: new ( ) ;
169+ if events. input ( ) {
170+ e. input ( true ) ;
171+ }
172+ if events. output ( ) {
173+ e. output ( true ) ;
174+ }
175+ let remaining = t ! ( m. action( socket, & e) ) ;
200176
201- let token = event. token ( ) ;
202- let socket = token_map[ & token. into ( ) ] ;
203- let mut e = Events :: new ( ) ;
204- if event. readiness ( ) . is_readable ( ) {
205- e. input ( true ) ;
206- }
207- if event. readiness ( ) . is_writable ( ) {
208- e. output ( true ) ;
209- }
210- if mio:: unix:: UnixReady :: from ( event. readiness ( ) ) . is_error ( ) {
211- e. error ( true ) ;
212- }
213- let remaining = t ! ( m. action( socket, & e) ) ;
214- if remaining == 0 {
215- running = false ;
177+ if remaining == 0 {
178+ break ;
179+ }
180+ }
181+ Err ( _) => {
182+ t ! ( m. timeout( ) ) ;
183+ for socket in token_map. values ( ) . copied ( ) {
184+ t ! ( m. action( socket, & Events :: new( ) ) ) ;
185+ }
186+ }
187+ Ok ( None ) => break ,
216188 }
217189 }
218- }
190+ } ) ;
219191
220192 let mut done = 0 ;
221193 m. messages ( |m| {
0 commit comments