Skip to content

Commit aacd6ab

Browse files
committed
Fixed multi client bug in async server example.
1 parent 7ffdf7e commit aacd6ab

File tree

1 file changed

+40
-35
lines changed

1 file changed

+40
-35
lines changed

examples/async-server.rs

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ extern crate websocket;
22
extern crate futures;
33
extern crate tokio_core;
44

5+
use std::fmt::Debug;
6+
57
use websocket::message::{Message, OwnedMessage};
68
use websocket::server::InvalidConnection;
79
use websocket::async::Server;
8-
use websocket::async::client::Client;
9-
use websocket::async::WebSocketFuture;
1010

11-
use tokio_core::reactor::Core;
12-
use tokio_core::net::TcpStream;
11+
use tokio_core::reactor::{Handle, Core};
1312
use futures::{Future, Sink, Stream};
1413

1514
fn main() {
@@ -24,39 +23,45 @@ fn main() {
2423
// a stream of incoming connections
2524
let f = server.incoming()
2625
// we don't wanna save the stream if it drops
27-
.map_err(|InvalidConnection { error, .. }| error.into())
28-
// negotiate with the client
29-
.and_then(|upgrade| {
26+
.map_err(|InvalidConnection { error, .. }| error)
27+
.for_each(|upgrade| {
3028
// check if it has the protocol we want
31-
let uses_proto = upgrade.protocols().iter().any(|s| s == "rust-websocket");
32-
33-
let f: WebSocketFuture<Option<Client<TcpStream>>> = if uses_proto {
34-
// accept the request to be a ws connection if it does
35-
Box::new(upgrade.use_protocol("rust-websocket").accept().map(|(s, _)| Some(s)))
36-
} else {
29+
if !upgrade.protocols().iter().any(|s| s == "rust-websocket") {
3730
// reject it if it doesn't
38-
Box::new(upgrade.reject().map(|_| None).map_err(|e| e.into()))
39-
};
40-
f
41-
})
42-
// get rid of the bad connections
43-
.filter_map(|i| i)
44-
// send a greeting!
45-
.and_then(|s| s.send(Message::text("Hello World!").into()))
46-
// simple echo server impl
47-
.and_then(|s| {
48-
let (sink, stream) = s.split();
49-
stream.filter_map(|m| {
50-
println!("Message from Client: {:?}", m);
51-
match m {
52-
OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
53-
OwnedMessage::Pong(_) => None,
54-
_ => Some(m),
55-
}
56-
}).forward(sink)
57-
})
58-
// TODO: ??
59-
.collect();
31+
spawn_future(upgrade.reject(), "Upgrade Rejection", &handle);
32+
return Ok(());
33+
}
34+
35+
// accept the request to be a ws connection if it does
36+
let f = upgrade
37+
.use_protocol("rust-websocket")
38+
.accept()
39+
// send a greeting!
40+
.and_then(|(s, _)| s.send(Message::text("Hello World!").into()))
41+
// simple echo server impl
42+
.and_then(|s| {
43+
let (sink, stream) = s.split();
44+
stream.filter_map(|m| {
45+
println!("Message from Client: {:?}", m);
46+
match m {
47+
OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
48+
OwnedMessage::Pong(_) => None,
49+
_ => Some(m),
50+
}
51+
}).forward(sink)
52+
});
53+
54+
spawn_future(f, "Client Status", &handle);
55+
Ok(())
56+
});
6057

6158
core.run(f).unwrap();
6259
}
60+
61+
fn spawn_future<F, I, E>(f: F, desc: &'static str, handle: &Handle)
62+
where F: Future<Item = I, Error = E> + 'static,
63+
E: Debug
64+
{
65+
handle.spawn(f.map_err(move |e| println!("{}: '{:?}'", desc, e))
66+
.map(move |_| println!("{}: Finished.", desc)));
67+
}

0 commit comments

Comments
 (0)