Skip to content

Commit 117c3f3

Browse files
committed
Split server into async and sync, implemented some server async.
1 parent 3998118 commit 117c3f3

File tree

4 files changed

+381
-315
lines changed

4 files changed

+381
-315
lines changed

src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ extern crate bitflags;
6363
extern crate test;
6464

6565
pub use self::client::{Client, ClientBuilder};
66-
pub use self::server::Server;
6766
pub use self::dataframe::DataFrame;
6867
pub use self::message::{Message, OwnedMessage};
6968
pub use self::stream::Stream;

src/server/async.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use std::io;
2+
use std::net::SocketAddr;
3+
use server::{WsServer, NoTlsAcceptor};
4+
use tokio_core::net::TcpListener;
5+
use native_tls::{TlsStream, TlsAcceptor};
6+
pub use tokio_core::reactor::Handle;
7+
8+
pub type Server<S> = WsServer<S, TcpListener>;
9+
10+
impl Server<NoTlsAcceptor> {
11+
/// Bind this Server to this socket
12+
pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<Self> {
13+
Ok(Server {
14+
listener: TcpListener::bind(addr, handle)?,
15+
ssl_acceptor: NoTlsAcceptor,
16+
})
17+
}
18+
19+
/// Wait for and accept an incoming WebSocket connection, returning a WebSocketRequest
20+
pub fn incoming(&mut self) {
21+
unimplemented!();
22+
}
23+
}
24+
25+
impl Server<TlsAcceptor> {
26+
/// Bind this Server to this socket
27+
pub fn bind_secure(addr: &SocketAddr, acceptor: TlsAcceptor, handle: &Handle) -> io::Result<Self> {
28+
Ok(Server {
29+
listener: TcpListener::bind(addr, handle)?,
30+
ssl_acceptor: acceptor,
31+
})
32+
}
33+
34+
/// Wait for and accept an incoming WebSocket connection, returning a WebSocketRequest
35+
pub fn incoming(&mut self) {
36+
unimplemented!();
37+
}
38+
}

src/server/mod.rs

Lines changed: 6 additions & 314 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,14 @@
11
//! Provides an implementation of a WebSocket server
2-
use std::net::{SocketAddr, ToSocketAddrs, TcpListener, TcpStream};
3-
use std::io;
4-
use std::convert::Into;
52
#[cfg(feature="ssl")]
6-
use native_tls::{TlsStream, TlsAcceptor};
7-
use stream::Stream;
8-
use self::upgrade::{SyncWsUpgrade, IntoWs, Buffer};
3+
use native_tls::TlsAcceptor;
94
pub use self::upgrade::{Request, HyperIntoWsError};
105

116
pub mod upgrade;
127

13-
/// When a sever tries to accept a connection many things can go wrong.
14-
///
15-
/// This struct is all the information that is recovered from a failed
16-
/// websocket handshake, in case one wants to use the connection for something
17-
/// else (such as HTTP).
18-
pub struct InvalidConnection<S>
19-
where S: Stream
20-
{
21-
/// if the stream was successfully setup it will be included here
22-
/// on a failed connection.
23-
pub stream: Option<S>,
24-
/// the parsed request. **This is a normal HTTP request** meaning you can
25-
/// simply run this server and handle both HTTP and Websocket connections.
26-
/// If you already have a server you want to use, checkout the
27-
/// `server::upgrade` module to integrate this crate with your server.
28-
pub parsed: Option<Request>,
29-
/// the buffered data that was already taken from the stream
30-
pub buffer: Option<Buffer>,
31-
/// the cause of the failed websocket connection setup
32-
pub error: HyperIntoWsError,
33-
}
8+
#[cfg(feature="async")]
9+
pub mod async;
3410

35-
/// Either the stream was established and it sent a websocket handshake
36-
/// which represents the `Ok` variant, or there was an error (this is the
37-
/// `Err` variant).
38-
pub type AcceptResult<S> = Result<SyncWsUpgrade<S>, InvalidConnection<S>>;
11+
pub mod sync;
3912

4013
/// Marker struct for a struct not being secure
4114
#[derive(Clone)]
@@ -49,291 +22,10 @@ impl OptionalTlsAcceptor for NoTlsAcceptor {}
4922
#[cfg(feature="ssl")]
5023
impl OptionalTlsAcceptor for TlsAcceptor {}
5124

52-
/// Represents a WebSocket server which can work with either normal
53-
/// (non-secure) connections, or secure WebSocket connections.
54-
///
55-
/// This is a convenient way to implement WebSocket servers, however
56-
/// it is possible to use any sendable Reader and Writer to obtain
57-
/// a WebSocketClient, so if needed, an alternative server implementation can be used.
58-
///# Non-secure Servers
59-
///
60-
/// ```no_run
61-
///extern crate websocket;
62-
///# fn main() {
63-
///use std::thread;
64-
///use websocket::{Server, Message};
65-
///
66-
///let server = Server::bind("127.0.0.1:1234").unwrap();
67-
///
68-
///for connection in server.filter_map(Result::ok) {
69-
/// // Spawn a new thread for each connection.
70-
/// thread::spawn(move || {
71-
/// let mut client = connection.accept().unwrap();
72-
///
73-
/// let message = Message::text("Hello, client!");
74-
/// let _ = client.send_message(&message);
75-
///
76-
/// // ...
77-
/// });
78-
///}
79-
/// # }
80-
/// ```
81-
///
82-
///# Secure Servers
83-
/// ```no_run
84-
///extern crate websocket;
85-
///extern crate openssl;
86-
///# fn main() {
87-
///use std::thread;
88-
///use std::io::Read;
89-
///use std::fs::File;
90-
///use websocket::{Server, Message};
91-
///use openssl::pkcs12::Pkcs12;
92-
///use openssl::ssl::{SslMethod, SslAcceptorBuilder, SslStream};
93-
///
94-
///// In this example we retrieve our keypair and certificate chain from a PKCS #12 archive,
95-
///// but but they can also be retrieved from, for example, individual PEM- or DER-formatted
96-
///// files. See the documentation for the `PKey` and `X509` types for more details.
97-
///let mut file = File::open("identity.pfx").unwrap();
98-
///let mut pkcs12 = vec![];
99-
///file.read_to_end(&mut pkcs12).unwrap();
100-
///let pkcs12 = Pkcs12::from_der(&pkcs12).unwrap();
101-
///let identity = pkcs12.parse("password123").unwrap();
102-
///
103-
///let acceptor = SslAcceptorBuilder::mozilla_intermediate(SslMethod::tls(),
104-
/// &identity.pkey,
105-
/// &identity.cert,
106-
/// &identity.chain)
107-
/// .unwrap()
108-
/// .build();
109-
///
110-
///let server = Server::bind_secure("127.0.0.1:1234", acceptor).unwrap();
111-
///
112-
///for connection in server.filter_map(Result::ok) {
113-
/// // Spawn a new thread for each connection.
114-
/// thread::spawn(move || {
115-
/// let mut client = connection.accept().unwrap();
116-
///
117-
/// let message = Message::text("Hello, client!");
118-
/// let _ = client.send_message(&message);
119-
///
120-
/// // ...
121-
/// });
122-
///}
123-
/// # }
124-
/// ```
125-
///
126-
/// # A Hyper Server
127-
/// This crates comes with hyper integration out of the box, you can create a hyper
128-
/// server and serve websocket and HTTP **on the same port!**
129-
/// check out the docs over at `websocket::server::upgrade::from_hyper` for an example.
130-
///
131-
/// # A Custom Server
132-
/// So you don't want to use any of our server implementations? That's O.K.
133-
/// All it takes is implementing the `IntoWs` trait for your server's streams,
134-
/// then calling `.into_ws()` on them.
135-
/// check out the docs over at `websocket::server::upgrade` for more.
136-
pub struct Server<S>
25+
pub struct WsServer<S, L>
13726
where S: OptionalTlsAcceptor
13827
{
139-
listener: TcpListener,
28+
listener: L,
14029
ssl_acceptor: S,
14130
}
14231

143-
impl<S> Server<S>
144-
where S: OptionalTlsAcceptor
145-
{
146-
/// Get the socket address of this server
147-
pub fn local_addr(&self) -> io::Result<SocketAddr> {
148-
self.listener.local_addr()
149-
}
150-
151-
/// Changes whether the Server is in nonblocking mode.
152-
///
153-
/// If it is in nonblocking mode, accept() will return an error instead of blocking when there
154-
/// are no incoming connections.
155-
///
156-
///# Examples
157-
///```no_run
158-
/// # extern crate websocket;
159-
/// # use websocket::Server;
160-
/// # fn main() {
161-
/// // Suppose we have to work in a single thread, but want to
162-
/// // accomplish two unrelated things:
163-
/// // (1) Once in a while we want to check if anybody tried to connect to
164-
/// // our websocket server, and if so, handle the TcpStream.
165-
/// // (2) In between we need to something else, possibly unrelated to networking.
166-
///
167-
/// let mut server = Server::bind("127.0.0.1:0").unwrap();
168-
///
169-
/// // Set the server to non-blocking.
170-
/// server.set_nonblocking(true);
171-
///
172-
/// for i in 1..3 {
173-
/// let result = match server.accept() {
174-
/// Ok(wsupgrade) => {
175-
/// // Do something with the established TcpStream.
176-
/// }
177-
/// _ => {
178-
/// // Nobody tried to connect, move on.
179-
/// }
180-
/// };
181-
/// // Perform another task. Because we have a non-blocking server,
182-
/// // this will execute independent of whether someone tried to
183-
/// // establish a connection.
184-
/// let two = 1+1;
185-
/// }
186-
/// # }
187-
///```
188-
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
189-
self.listener.set_nonblocking(nonblocking)
190-
}
191-
}
192-
193-
#[cfg(feature="ssl")]
194-
impl Server<TlsAcceptor> {
195-
/// Bind this Server to this socket, utilising the given SslContext
196-
pub fn bind_secure<A>(addr: A, acceptor: TlsAcceptor) -> io::Result<Self>
197-
where A: ToSocketAddrs
198-
{
199-
Ok(Server {
200-
listener: try!(TcpListener::bind(&addr)),
201-
ssl_acceptor: acceptor,
202-
})
203-
}
204-
205-
/// Wait for and accept an incoming WebSocket connection, returning a WebSocketRequest
206-
pub fn accept(&mut self) -> AcceptResult<TlsStream<TcpStream>> {
207-
let stream = match self.listener.accept() {
208-
Ok(s) => s.0,
209-
Err(e) => {
210-
return Err(InvalidConnection {
211-
stream: None,
212-
parsed: None,
213-
buffer: None,
214-
error: e.into(),
215-
})
216-
}
217-
};
218-
219-
let stream = match self.ssl_acceptor.accept(stream) {
220-
Ok(s) => s,
221-
Err(err) => {
222-
return Err(InvalidConnection {
223-
stream: None,
224-
parsed: None,
225-
buffer: None,
226-
error: io::Error::new(io::ErrorKind::Other, err).into(),
227-
})
228-
}
229-
};
230-
231-
match stream.into_ws() {
232-
Ok(u) => Ok(u),
233-
Err((s, r, b, e)) => {
234-
Err(InvalidConnection {
235-
stream: Some(s),
236-
parsed: r,
237-
buffer: b,
238-
error: e.into(),
239-
})
240-
}
241-
}
242-
}
243-
}
244-
245-
#[cfg(feature="ssl")]
246-
impl Iterator for Server<TlsAcceptor> {
247-
type Item = AcceptResult<TlsStream<TcpStream>>;
248-
249-
fn next(&mut self) -> Option<<Self as Iterator>::Item> {
250-
Some(self.accept())
251-
}
252-
}
253-
254-
impl Server<NoTlsAcceptor> {
255-
/// Bind this Server to this socket
256-
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
257-
Ok(Server {
258-
listener: try!(TcpListener::bind(&addr)),
259-
ssl_acceptor: NoTlsAcceptor,
260-
})
261-
}
262-
263-
/// Wait for and accept an incoming WebSocket connection, returning a WebSocketRequest
264-
pub fn accept(&mut self) -> AcceptResult<TcpStream> {
265-
let stream = match self.listener.accept() {
266-
Ok(s) => s.0,
267-
Err(e) => {
268-
return Err(InvalidConnection {
269-
stream: None,
270-
parsed: None,
271-
buffer: None,
272-
error: e.into(),
273-
})
274-
}
275-
};
276-
277-
match stream.into_ws() {
278-
Ok(u) => Ok(u),
279-
Err((s, r, b, e)) => {
280-
Err(InvalidConnection {
281-
stream: Some(s),
282-
parsed: r,
283-
buffer: b,
284-
error: e.into(),
285-
})
286-
}
287-
}
288-
}
289-
290-
/// Create a new independently owned handle to the underlying socket.
291-
pub fn try_clone(&self) -> io::Result<Self> {
292-
let inner = try!(self.listener.try_clone());
293-
Ok(Server {
294-
listener: inner,
295-
ssl_acceptor: self.ssl_acceptor.clone(),
296-
})
297-
}
298-
}
299-
300-
impl Iterator for Server<NoTlsAcceptor> {
301-
type Item = AcceptResult<TcpStream>;
302-
303-
fn next(&mut self) -> Option<<Self as Iterator>::Item> {
304-
Some(self.accept())
305-
}
306-
}
307-
308-
mod tests {
309-
#[test]
310-
// test the set_nonblocking() method for Server<NoSslAcceptor>.
311-
// Some of this is copied from
312-
// https://doc.rust-lang.org/src/std/net/tcp.rs.html#1413
313-
fn set_nonblocking() {
314-
315-
use super::*;
316-
317-
// Test unsecure server
318-
319-
let mut server = Server::bind("127.0.0.1:0").unwrap();
320-
321-
// Note that if set_nonblocking() doesn't work, but the following
322-
// fails to panic for some reason, then the .accept() method below
323-
// will block indefinitely.
324-
server.set_nonblocking(true).unwrap();
325-
326-
let result = server.accept();
327-
match result {
328-
// nobody tried to establish a connection, so we expect an error
329-
Ok(_) => panic!("expected error"),
330-
Err(e) => {
331-
match e.error {
332-
HyperIntoWsError::Io(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
333-
_ => panic!("unexpected error {}"),
334-
}
335-
}
336-
}
337-
338-
}
339-
}

0 commit comments

Comments
 (0)