Skip to content

Commit 3782f0d

Browse files
committed
Fixed bug in async SSL connections.
1 parent 86aeb2a commit 3782f0d

File tree

2 files changed

+89
-22
lines changed

2 files changed

+89
-22
lines changed

examples/ssl-client.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
extern crate websocket;
2+
extern crate futures;
3+
extern crate tokio_core;
4+
5+
use std::thread;
6+
use std::io::stdin;
7+
use tokio_core::reactor::Core;
8+
use futures::future::Future;
9+
use futures::sink::Sink;
10+
use futures::stream::Stream;
11+
use futures::sync::mpsc;
12+
use websocket::result::WebSocketError;
13+
use websocket::{ClientBuilder, OwnedMessage};
14+
15+
const CONNECTION: &'static str = "wss://echo.websocket.org";
16+
17+
fn main() {
18+
println!("Connecting to {}", CONNECTION);
19+
let mut core = Core::new().unwrap();
20+
21+
// standard in isn't supported in mio yet, so we use a thread
22+
// see https://github.com/carllerche/mio/issues/321
23+
let (usr_msg, stdin_ch) = mpsc::channel(0);
24+
thread::spawn(move || {
25+
let mut input = String::new();
26+
let mut stdin_sink = usr_msg.wait();
27+
loop {
28+
input.clear();
29+
stdin().read_line(&mut input).unwrap();
30+
let trimmed = input.trim();
31+
32+
let (close, msg) = match trimmed {
33+
"/close" => (true, OwnedMessage::Close(None)),
34+
"/ping" => (false, OwnedMessage::Ping(b"PING".to_vec())),
35+
_ => (false, OwnedMessage::Text(trimmed.to_string())),
36+
};
37+
38+
stdin_sink.send(msg)
39+
.expect("Sending message across stdin channel.");
40+
41+
if close {
42+
break;
43+
}
44+
}
45+
});
46+
47+
let runner = ClientBuilder::new(CONNECTION)
48+
.unwrap()
49+
.async_connect_secure(None, &core.handle())
50+
.and_then(|(duplex, _)| {
51+
let (sink, stream) = duplex.split();
52+
stream.filter_map(|message| {
53+
println!("Received Message: {:?}", message);
54+
match message {
55+
OwnedMessage::Close(e) => Some(OwnedMessage::Close(e)),
56+
OwnedMessage::Ping(d) => Some(OwnedMessage::Pong(d)),
57+
_ => None,
58+
}
59+
})
60+
.select(stdin_ch.map_err(|_| WebSocketError::NoDataAvailable))
61+
.forward(sink)
62+
});
63+
core.run(runner).unwrap();
64+
}

src/client/builder.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ impl<'u> ClientBuilder<'u> {
490490
handle: &Handle,
491491
) -> async::ClientNew<Box<stream::async::Stream + Send>> {
492492
// connect to the tcp stream
493-
let tcp_stream = match self.async_tcpstream(handle) {
493+
let tcp_stream = match self.async_tcpstream(None, handle) {
494494
Ok(t) => t,
495495
Err(e) => return future::err(e).boxed(),
496496
};
@@ -543,7 +543,7 @@ impl<'u> ClientBuilder<'u> {
543543
handle: &Handle,
544544
) -> async::ClientNew<async::TlsStream<async::TcpStream>> {
545545
// connect to the tcp stream
546-
let tcp_stream = match self.async_tcpstream(handle) {
546+
let tcp_stream = match self.async_tcpstream(Some(true), handle) {
547547
Ok(t) => t,
548548
Err(e) => return future::err(e).boxed(),
549549
};
@@ -565,22 +565,22 @@ impl<'u> ClientBuilder<'u> {
565565
};
566566

567567
// put it all together
568-
let future =
569-
tcp_stream.map_err(|e| e.into())
570-
.and_then(move |s| {
571-
connector.connect_async(&host, s).map_err(|e| e.into())
572-
})
573-
.and_then(move |stream| {
574-
builder.async_connect_on(stream)
575-
});
568+
let future = tcp_stream.map_err(|e| e.into())
569+
.and_then(move |s| {
570+
connector.connect_async(&host, s)
571+
.map_err(|e| e.into())
572+
})
573+
.and_then(move |stream| {
574+
builder.async_connect_on(stream)
575+
});
576576
Box::new(future)
577577
}
578578

579579
// TODO: add timeout option for connecting
580580
// TODO: add conveniences like .response_to_pings, .send_close, etc.
581581
#[cfg(feature="async")]
582582
pub fn async_connect_insecure(self, handle: &Handle) -> async::ClientNew<async::TcpStream> {
583-
let tcp_stream = match self.async_tcpstream(handle) {
583+
let tcp_stream = match self.async_tcpstream(Some(false), handle) {
584584
Ok(t) => t,
585585
Err(e) => return future::err(e).boxed(),
586586
};
@@ -645,20 +645,23 @@ impl<'u> ClientBuilder<'u> {
645645
}
646646

647647
#[cfg(feature="async")]
648-
fn async_tcpstream(&self, handle: &Handle) -> WebSocketResult<TcpStreamNew> {
648+
fn async_tcpstream(
649+
&self,
650+
secure: Option<bool>,
651+
handle: &Handle,
652+
) -> WebSocketResult<TcpStreamNew> {
649653
// get the address to connect to, return an error future if ther's a problem
650-
let address =
651-
match self.extract_host_port(Some(false)).and_then(|p| Ok(p.to_socket_addrs()?)) {
652-
Ok(mut s) => {
653-
match s.next() {
654-
Some(a) => a,
655-
None => {
656-
return Err(WebSocketError::WebSocketUrlError(WSUrlErrorKind::NoHostName));
657-
}
654+
let address = match self.extract_host_port(secure).and_then(|p| Ok(p.to_socket_addrs()?)) {
655+
Ok(mut s) => {
656+
match s.next() {
657+
Some(a) => a,
658+
None => {
659+
return Err(WebSocketError::WebSocketUrlError(WSUrlErrorKind::NoHostName));
658660
}
659661
}
660-
Err(e) => return Err(e.into()),
661-
};
662+
}
663+
Err(e) => return Err(e.into()),
664+
};
662665

663666
// connect a tcp stream
664667
Ok(async::TcpStream::connect(&address, handle))

0 commit comments

Comments
 (0)