1111use futures:: ready;
1212use futures:: stream:: Stream ;
1313use futures:: task:: { Context , Poll } ;
14- use mio:: event:: Evented ;
15- use mio:: unix:: EventedFd ;
16- use mio:: { PollOpt , Ready , Token } ;
17- use tokio:: io:: PollEvented ;
14+ use tokio:: io:: unix:: { AsyncFd , TryIoError } ;
1815
19- use std:: io;
2016use std:: os:: unix:: io:: AsRawFd ;
2117use std:: pin:: Pin ;
2218
2319use super :: event_err;
2420use super :: { LineEvent , LineEventHandle , Result } ;
2521
26- struct PollWrapper {
27- handle : LineEventHandle ,
28- }
29-
30- impl Evented for PollWrapper {
31- fn register (
32- & self ,
33- poll : & mio:: Poll ,
34- token : Token ,
35- interest : Ready ,
36- opts : PollOpt ,
37- ) -> io:: Result < ( ) > {
38- EventedFd ( & self . handle . file . as_raw_fd ( ) ) . register ( poll, token, interest, opts)
39- }
40-
41- fn reregister (
42- & self ,
43- poll : & mio:: Poll ,
44- token : Token ,
45- interest : Ready ,
46- opts : PollOpt ,
47- ) -> io:: Result < ( ) > {
48- EventedFd ( & self . handle . file . as_raw_fd ( ) ) . reregister ( poll, token, interest, opts)
49- }
50-
51- fn deregister ( & self , poll : & mio:: Poll ) -> io:: Result < ( ) > {
52- EventedFd ( & self . handle . file . as_raw_fd ( ) ) . deregister ( poll)
53- }
54- }
55-
5622/// Wrapper around a `LineEventHandle` which implements a `futures::stream::Stream` for interrupts.
5723///
5824/// # Example
@@ -88,7 +54,7 @@ impl Evented for PollWrapper {
8854/// # }
8955/// ```
9056pub struct AsyncLineEventHandle {
91- evented : PollEvented < PollWrapper > ,
57+ asyncfd : AsyncFd < LineEventHandle > ,
9258}
9359
9460impl AsyncLineEventHandle {
@@ -106,36 +72,35 @@ impl AsyncLineEventHandle {
10672 }
10773
10874 Ok ( AsyncLineEventHandle {
109- evented : PollEvented :: new ( PollWrapper { handle } ) ?,
75+ asyncfd : AsyncFd :: new ( handle) ?,
11076 } )
11177 }
11278}
11379
11480impl Stream for AsyncLineEventHandle {
11581 type Item = Result < LineEvent > ;
11682
117- fn poll_next ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Option < Self :: Item > > {
118- let ready = Ready :: readable ( ) ;
119- if let Err ( e ) = ready ! ( self . evented . poll_read_ready ( cx, ready ) ) {
120- return Poll :: Ready ( Some ( Err ( e . into ( ) ) ) ) ;
121- }
122-
123- match self . evented . get_ref ( ) . handle . read_event ( ) {
124- Ok ( Some ( event) ) => Poll :: Ready ( Some ( Ok ( event) ) ) ,
125- Ok ( None ) => Poll :: Ready ( Some ( Err ( event_err ( nix :: Error :: Sys (
126- nix :: errno :: Errno :: EIO ,
127- ) ) ) ) ) ,
128- Err ( nix :: Error :: Sys ( nix :: errno :: Errno :: EAGAIN ) ) => {
129- self . evented . clear_read_ready ( cx , ready ) ? ;
130- Poll :: Pending
83+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Option < Self :: Item > > {
84+ loop {
85+ let mut guard = ready ! ( self . asyncfd . poll_read_ready_mut ( cx) ) ? ;
86+ match guard . try_io ( |inner| inner . get_mut ( ) . read_event ( ) ) {
87+ Err ( TryIoError { .. } ) => {
88+ // Continue
89+ }
90+ Ok ( Ok ( Some ( event) ) ) => return Poll :: Ready ( Some ( Ok ( event) ) ) ,
91+ Ok ( Ok ( None ) ) => {
92+ return Poll :: Ready ( Some ( Err ( event_err ( nix :: Error :: Sys (
93+ nix :: errno :: Errno :: EIO ,
94+ ) ) ) ) )
95+ }
96+ Ok ( Err ( err ) ) => return Poll :: Ready ( Some ( Err ( err . into ( ) ) ) ) ,
13197 }
132- Err ( e) => Poll :: Ready ( Some ( Err ( event_err ( e) ) ) ) ,
13398 }
13499 }
135100}
136101
137102impl AsRef < LineEventHandle > for AsyncLineEventHandle {
138103 fn as_ref ( & self ) -> & LineEventHandle {
139- & self . evented . get_ref ( ) . handle
104+ & self . asyncfd . get_ref ( )
140105 }
141106}
0 commit comments