@@ -3,21 +3,24 @@ use std::pin::Pin;
33use std:: task:: { Context , Poll } ;
44use std:: time:: Duration ;
55
6+ use async_io:: Timer ;
7+ use async_net:: TcpStream ;
8+ use asyncs:: select;
69use bytes:: buf:: BufMut ;
10+ use futures:: io:: BufReader ;
11+ use futures:: prelude:: * ;
12+ use futures_lite:: AsyncReadExt ;
713use ignore_result:: Ignore ;
8- use tokio:: io:: { AsyncBufReadExt , AsyncRead , AsyncReadExt , AsyncWrite , AsyncWriteExt , BufStream , ReadBuf } ;
9- use tokio:: net:: TcpStream ;
10- use tokio:: { select, time} ;
1114use tracing:: { debug, trace} ;
1215
1316#[ cfg( feature = "tls" ) ]
1417mod tls {
1518 pub use std:: sync:: Arc ;
1619
20+ pub use futures_rustls:: client:: TlsStream ;
21+ pub use futures_rustls:: TlsConnector ;
1722 pub use rustls:: pki_types:: ServerName ;
1823 pub use rustls:: ClientConfig ;
19- pub use tokio_rustls:: client:: TlsStream ;
20- pub use tokio_rustls:: TlsConnector ;
2124}
2225#[ cfg( feature = "tls" ) ]
2326use tls:: * ;
@@ -51,7 +54,7 @@ pub trait AsyncReadToBuf: AsyncReadExt {
5154impl < T > AsyncReadToBuf for T where T : AsyncReadExt { }
5255
5356impl AsyncRead for Connection {
54- fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut ReadBuf < ' _ > ) -> Poll < Result < ( ) > > {
57+ fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut [ u8 ] ) -> Poll < Result < usize > > {
5558 match self . get_mut ( ) {
5659 Self :: Raw ( stream) => Pin :: new ( stream) . poll_read ( cx, buf) ,
5760 #[ cfg( feature = "tls" ) ]
@@ -85,11 +88,11 @@ impl AsyncWrite for Connection {
8588 }
8689 }
8790
88- fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
91+ fn poll_close ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
8992 match self . get_mut ( ) {
90- Self :: Raw ( stream) => Pin :: new ( stream) . poll_shutdown ( cx) ,
93+ Self :: Raw ( stream) => Pin :: new ( stream) . poll_close ( cx) ,
9194 #[ cfg( feature = "tls" ) ]
92- Self :: Tls ( stream) => Pin :: new ( stream) . poll_shutdown ( cx) ,
95+ Self :: Tls ( stream) => Pin :: new ( stream) . poll_close ( cx) ,
9396 }
9497 }
9598}
@@ -99,7 +102,7 @@ pub struct ConnReader<'a> {
99102}
100103
101104impl AsyncRead for ConnReader < ' _ > {
102- fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut ReadBuf < ' _ > ) -> Poll < Result < ( ) > > {
105+ fn poll_read ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & mut [ u8 ] ) -> Poll < Result < usize > > {
103106 Pin :: new ( & mut self . get_mut ( ) . conn ) . poll_read ( cx, buf)
104107 }
105108}
@@ -121,8 +124,8 @@ impl AsyncWrite for ConnWriter<'_> {
121124 Pin :: new ( & mut self . get_mut ( ) . conn ) . poll_flush ( cx)
122125 }
123126
124- fn poll_shutdown ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
125- Pin :: new ( & mut self . get_mut ( ) . conn ) . poll_shutdown ( cx)
127+ fn poll_close ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
128+ Pin :: new ( & mut self . get_mut ( ) . conn ) . poll_close ( cx)
126129 }
127130}
128131
@@ -142,13 +145,14 @@ impl Connection {
142145 Self :: Tls ( stream)
143146 }
144147
145- pub async fn command ( self , cmd : & str ) -> Result < String > {
146- let mut stream = BufStream :: new ( self ) ;
147- stream . write_all ( cmd. as_bytes ( ) ) . await ?;
148- stream . flush ( ) . await ?;
148+ pub async fn command ( mut self , cmd : & str ) -> Result < String > {
149+ // let mut stream = BufStream::new(self);
150+ self . write_all ( cmd. as_bytes ( ) ) . await ?;
151+ self . flush ( ) . await ?;
149152 let mut line = String :: new ( ) ;
150- stream. read_line ( & mut line) . await ?;
151- stream. shutdown ( ) . await . ignore ( ) ;
153+ let mut reader = BufReader :: new ( self ) ;
154+ reader. read_line ( & mut line) . await ?;
155+ reader. close ( ) . await . ignore ( ) ;
152156 Ok ( line)
153157 }
154158
@@ -212,7 +216,7 @@ impl Connector {
212216 }
213217 select ! {
214218 _ = unsafe { Pin :: new_unchecked( deadline) } => Err ( Error :: new( ErrorKind :: TimedOut , "deadline exceed" ) ) ,
215- _ = time :: sleep ( self . timeout) => Err ( Error :: new( ErrorKind :: TimedOut , format!( "connection timeout{:?} exceed" , self . timeout) ) ) ,
219+ _ = Timer :: after ( self . timeout) => Err ( Error :: new( ErrorKind :: TimedOut , format!( "connection timeout{:?} exceed" , self . timeout) ) ) ,
216220 r = TcpStream :: connect( ( endpoint. host, endpoint. port) ) => {
217221 match r {
218222 Err ( err) => Err ( err) ,
@@ -255,10 +259,10 @@ impl Connector {
255259 "fails to contact writable server from endpoints {:?}" ,
256260 endpoints. endpoints( )
257261 ) ;
258- time :: sleep ( timeout) . await ;
262+ Timer :: after ( timeout) . await ;
259263 timeout = max_timeout. min ( timeout * 2 ) ;
260264 } else {
261- time :: sleep ( Duration :: from_millis ( 5 ) ) . await ;
265+ Timer :: after ( Duration :: from_millis ( 5 ) ) . await ;
262266 }
263267 }
264268 None
@@ -273,7 +277,7 @@ mod tests {
273277 use crate :: deadline:: Deadline ;
274278 use crate :: endpoint:: EndpointRef ;
275279
276- #[ tokio :: test]
280+ #[ asyncs :: test]
277281 async fn raw ( ) {
278282 let connector = Connector :: new ( ) ;
279283 let endpoint = EndpointRef :: new ( "host1" , 2181 , true ) ;
0 commit comments