Skip to content

Commit b99efe6

Browse files
committed
Added async server impl.
1 parent 80d6e4a commit b99efe6

File tree

10 files changed

+286
-224
lines changed

10 files changed

+286
-224
lines changed

src/client/async.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,3 @@ pub use tokio_tls::TlsStream;
1414
pub type Client<S> = Framed<S, MessageCodec<OwnedMessage>>;
1515

1616
pub type ClientNew<S> = Box<Future<Item = (Client<S>, Headers), Error = WebSocketError>>;
17-

src/client/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ mod async_imports {
4747
#[cfg(feature="async")]
4848
use self::async_imports::*;
4949

50+
// TODO: add extra funcs for future stuff, like auto ping and auto close
51+
5052

5153
/// Build clients with a builder-style API
5254
/// This makes it easy to create and configure a websocket

src/client/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,3 @@ pub mod async;
66

77
#[cfg(feature="sync")]
88
pub mod sync;
9-

src/lib.rs

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -97,50 +97,50 @@ pub mod stream;
9797

9898
#[cfg(feature="sync")]
9999
pub mod sync {
100-
pub use sender;
101-
pub use sender::Writer;
102-
103-
pub use receiver;
104-
pub use receiver::Reader;
105-
106-
pub use stream::sync::Stream;
107-
pub use stream::sync as stream;
108-
109-
pub mod server {
110-
pub use server::sync::*;
111-
pub use server::upgrade::sync::Upgrade;
112-
pub use server::upgrade::sync::IntoWs;
113-
pub use server::upgrade::sync as upgrade;
114-
}
115-
116-
pub mod client {
117-
pub use client::sync::*;
118-
pub use client::builder::ClientBuilder;
119-
}
100+
pub use sender;
101+
pub use sender::Writer;
102+
103+
pub use receiver;
104+
pub use receiver::Reader;
105+
106+
pub use stream::sync::Stream;
107+
pub use stream::sync as stream;
108+
109+
pub mod server {
110+
pub use server::sync::*;
111+
pub use server::upgrade::sync::Upgrade;
112+
pub use server::upgrade::sync::IntoWs;
113+
pub use server::upgrade::sync as upgrade;
114+
}
115+
116+
pub mod client {
117+
pub use client::sync::*;
118+
pub use client::builder::ClientBuilder;
119+
}
120120
}
121121

122122
#[cfg(feature="async")]
123123
pub mod async {
124-
pub use codec;
125-
pub use codec::ws::MessageCodec;
126-
pub use codec::ws::Context as MessageContext;
127-
pub use codec::http::HttpClientCodec;
128-
pub use codec::http::HttpServerCodec;
129-
130-
pub use stream::async::Stream;
131-
pub use stream::async as stream;
132-
133-
pub mod server {
134-
pub use server::async::*;
135-
pub use server::upgrade::async::Upgrade;
136-
pub use server::upgrade::async::IntoWs;
137-
pub use server::upgrade::async as upgrade;
138-
}
139-
140-
pub mod client {
141-
pub use client::async::*;
142-
pub use client::builder::ClientBuilder;
143-
}
124+
pub use codec;
125+
pub use codec::ws::MessageCodec;
126+
pub use codec::ws::Context as MessageContext;
127+
pub use codec::http::HttpClientCodec;
128+
pub use codec::http::HttpServerCodec;
129+
130+
pub use stream::async::Stream;
131+
pub use stream::async as stream;
132+
133+
pub mod server {
134+
pub use server::async::*;
135+
pub use server::upgrade::async::Upgrade;
136+
pub use server::upgrade::async::IntoWs;
137+
pub use server::upgrade::async as upgrade;
138+
}
139+
140+
pub mod client {
141+
pub use client::async::*;
142+
pub use client::builder::ClientBuilder;
143+
}
144144
}
145145

146146
pub use self::message::Message;
@@ -149,4 +149,3 @@ pub use self::message::OwnedMessage;
149149

150150
pub use self::result::WebSocketError;
151151
pub use self::result::WebSocketResult;
152-

src/server/async.rs

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,63 @@
11
use std::io;
22
use std::net::SocketAddr;
33
use server::{WsServer, NoTlsAcceptor};
4-
use tokio_core::net::TcpListener;
4+
use tokio_core::net::{TcpListener, TcpStream};
5+
use futures::{Stream, Future};
6+
use server::upgrade::async::{IntoWs, Upgrade};
7+
use server::InvalidConnection;
8+
use bytes::BytesMut;
59
pub use tokio_core::reactor::Handle;
610

711
#[cfg(any(feature="async-ssl"))]
8-
use native_tls::{TlsStream, TlsAcceptor};
12+
use native_tls::TlsAcceptor;
13+
#[cfg(any(feature="async-ssl"))]
14+
use tokio_tls::{TlsAcceptorExt, TlsStream};
915

1016
pub type Server<S> = WsServer<S, TcpListener>;
1117

18+
pub type Incoming<S> = Box<Stream<Item = Upgrade<S>, Error = InvalidConnection<S, BytesMut>>>;
19+
20+
pub enum AcceptError<E> {
21+
Io(io::Error),
22+
Upgrade(E),
23+
}
24+
1225
impl WsServer<NoTlsAcceptor, TcpListener> {
13-
/// Bind this Server to this socket
1426
pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<Self> {
1527
Ok(Server {
1628
listener: TcpListener::bind(addr, handle)?,
1729
ssl_acceptor: NoTlsAcceptor,
1830
})
1931
}
2032

21-
/// Wait for and accept an incoming WebSocket connection, returning a WebSocketRequest
22-
pub fn incoming(&mut self) {
23-
unimplemented!();
33+
pub fn incoming(self) -> Incoming<TcpStream> {
34+
let future = self.listener
35+
.incoming()
36+
.map_err(|e| {
37+
InvalidConnection {
38+
stream: None,
39+
parsed: None,
40+
buffer: None,
41+
error: e.into(),
42+
}
43+
})
44+
.and_then(|(stream, _)| {
45+
stream.into_ws()
46+
.map_err(|(stream, req, buf, err)| {
47+
InvalidConnection {
48+
stream: Some(stream),
49+
parsed: req,
50+
buffer: Some(buf),
51+
error: err,
52+
}
53+
})
54+
});
55+
Box::new(future)
2456
}
2557
}
2658

2759
#[cfg(any(feature="async-ssl"))]
2860
impl WsServer<TlsAcceptor, TcpListener> {
29-
/// Bind this Server to this socket
3061
pub fn bind_secure(
3162
addr: &SocketAddr,
3263
acceptor: TlsAcceptor,
@@ -38,8 +69,41 @@ impl WsServer<TlsAcceptor, TcpListener> {
3869
})
3970
}
4071

41-
/// Wait for and accept an incoming WebSocket connection, returning a WebSocketRequest
42-
pub fn incoming(&mut self) {
43-
unimplemented!();
72+
pub fn incoming(self) -> Incoming<TlsStream<TcpStream>> {
73+
let acceptor = self.ssl_acceptor;
74+
let future = self.listener
75+
.incoming()
76+
.map_err(|e| {
77+
InvalidConnection {
78+
stream: None,
79+
parsed: None,
80+
buffer: None,
81+
error: e.into(),
82+
}
83+
})
84+
.and_then(move |(stream, _)| {
85+
acceptor.accept_async(stream)
86+
.map_err(|e| {
87+
InvalidConnection {
88+
stream: None,
89+
parsed: None,
90+
buffer: None,
91+
// TODO: better error types
92+
error: io::Error::new(io::ErrorKind::Other, e).into(),
93+
}
94+
})
95+
})
96+
.and_then(|stream| {
97+
stream.into_ws()
98+
.map_err(|(stream, req, buf, err)| {
99+
InvalidConnection {
100+
stream: Some(stream),
101+
parsed: req,
102+
buffer: Some(buf),
103+
error: err,
104+
}
105+
})
106+
});
107+
Box::new(future)
44108
}
45109
}

src/server/mod.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
#[cfg(any(feature="sync-ssl", feature="async-ssl"))]
33
use native_tls::TlsAcceptor;
44

5+
use stream::Stream;
6+
use self::upgrade::{Request, HyperIntoWsError};
7+
58
pub mod upgrade;
69

710
#[cfg(feature="async")]
@@ -22,10 +25,31 @@ impl OptionalTlsAcceptor for NoTlsAcceptor {}
2225
#[cfg(any(feature="sync-ssl", feature="async-ssl"))]
2326
impl OptionalTlsAcceptor for TlsAcceptor {}
2427

28+
/// When a sever tries to accept a connection many things can go wrong.
29+
///
30+
/// This struct is all the information that is recovered from a failed
31+
/// websocket handshake, in case one wants to use the connection for something
32+
/// else (such as HTTP).
33+
pub struct InvalidConnection<S, B>
34+
where S: Stream
35+
{
36+
/// if the stream was successfully setup it will be included here
37+
/// on a failed connection.
38+
pub stream: Option<S>,
39+
/// the parsed request. **This is a normal HTTP request** meaning you can
40+
/// simply run this server and handle both HTTP and Websocket connections.
41+
/// If you already have a server you want to use, checkout the
42+
/// `server::upgrade` module to integrate this crate with your server.
43+
pub parsed: Option<Request>,
44+
/// the buffered data that was already taken from the stream
45+
pub buffer: Option<B>,
46+
/// the cause of the failed websocket connection setup
47+
pub error: HyperIntoWsError,
48+
}
49+
2550
pub struct WsServer<S, L>
2651
where S: OptionalTlsAcceptor
2752
{
2853
listener: L,
2954
ssl_acceptor: S,
3055
}
31-

src/server/sync.rs

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::convert::Into;
55
#[cfg(feature="sync-ssl")]
66
use native_tls::{TlsStream, TlsAcceptor};
77
use stream::Stream;
8-
use server::{WsServer, OptionalTlsAcceptor, NoTlsAcceptor};
8+
use server::{WsServer, OptionalTlsAcceptor, NoTlsAcceptor, InvalidConnection};
99
use server::upgrade::sync::{Upgrade, IntoWs, Buffer};
1010
pub use server::upgrade::{Request, HyperIntoWsError};
1111

@@ -16,32 +16,10 @@ use tokio_core::net::TcpListener as AsyncTcpListener;
1616
#[cfg(feature="async")]
1717
use server::async;
1818

19-
/// When a sever tries to accept a connection many things can go wrong.
20-
///
21-
/// This struct is all the information that is recovered from a failed
22-
/// websocket handshake, in case one wants to use the connection for something
23-
/// else (such as HTTP).
24-
pub struct InvalidConnection<S>
25-
where S: Stream
26-
{
27-
/// if the stream was successfully setup it will be included here
28-
/// on a failed connection.
29-
pub stream: Option<S>,
30-
/// the parsed request. **This is a normal HTTP request** meaning you can
31-
/// simply run this server and handle both HTTP and Websocket connections.
32-
/// If you already have a server you want to use, checkout the
33-
/// `server::upgrade` module to integrate this crate with your server.
34-
pub parsed: Option<Request>,
35-
/// the buffered data that was already taken from the stream
36-
pub buffer: Option<Buffer>,
37-
/// the cause of the failed websocket connection setup
38-
pub error: HyperIntoWsError,
39-
}
40-
4119
/// Either the stream was established and it sent a websocket handshake
4220
/// which represents the `Ok` variant, or there was an error (this is the
4321
/// `Err` variant).
44-
pub type AcceptResult<S> = Result<Upgrade<S>, InvalidConnection<S>>;
22+
pub type AcceptResult<S> = Result<Upgrade<S>, InvalidConnection<S, Buffer>>;
4523

4624
/// Represents a WebSocket server which can work with either normal
4725
/// (non-secure) connections, or secure WebSocket connections.

src/server/upgrade/async.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,7 @@ impl<S> WsUpgrade<S, BytesMut>
6060
self.internal_reject(Some(headers))
6161
}
6262

63-
fn internal_reject(
64-
mut self,
65-
headers: Option<&Headers>,
66-
) -> Send<Framed<S, HttpServerCodec>> {
63+
fn internal_reject(mut self, headers: Option<&Headers>) -> Send<Framed<S, HttpServerCodec>> {
6764
if let Some(custom) = headers {
6865
self.headers.extend(custom.iter());
6966
}

src/server/upgrade/sync.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,11 @@ impl<S> WsUpgrade<S, Option<Buffer>>
9595
impl<S, B> WsUpgrade<S, B>
9696
where S: Stream + AsTcpStream
9797
{
98-
/// Get a handle to the underlying TCP stream, useful to be able to set
99-
/// TCP options, etc.
100-
pub fn tcp_stream(&self) -> &TcpStream {
101-
self.stream.as_tcp()
102-
}
98+
/// Get a handle to the underlying TCP stream, useful to be able to set
99+
/// TCP options, etc.
100+
pub fn tcp_stream(&self) -> &TcpStream {
101+
self.stream.as_tcp()
102+
}
103103
}
104104

105105
/// Trait to take a stream or similar and attempt to recover the start of a

0 commit comments

Comments
 (0)