Skip to content

Commit 134e3cb

Browse files
committed
keep track of all the used buffers in BufReaders so no data goes missing
this is possible with the help of hyperium/hyper#1107
1 parent df581cf commit 134e3cb

File tree

7 files changed

+90
-47
lines changed

7 files changed

+90
-47
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ keywords = ["websocket", "websockets", "rfc6455"]
1717
license = "MIT"
1818

1919
[dependencies]
20-
hyper = "^0.10"
20+
hyper = { git = "https://github.com/hyperium/hyper.git", branch = "0.10.x" }
2121
unicase = "^1.0"
2222
url = "^1.0"
2323
rustc-serialize = "^0.3"

src/client/builder.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ use hyper::header::{Headers, Host, Connection, ConnectionOption, Upgrade, Protoc
1010
use unicase::UniCase;
1111
#[cfg(feature="ssl")]
1212
use openssl::ssl::{SslMethod, SslStream, SslConnector, SslConnectorBuilder};
13-
#[cfg(feature="ssl")]
1413
use header::extensions::Extension;
1514
use header::{WebSocketAccept, WebSocketKey, WebSocketVersion, WebSocketProtocol,
1615
WebSocketExtensions, Origin};
1716
use result::{WSUrlErrorKind, WebSocketResult, WebSocketError};
18-
use stream::{Stream, NetworkStream};
17+
#[cfg(feature="ssl")]
18+
use stream::NetworkStream;
19+
use stream::Stream;
1920
use super::Client;
2021

2122
/// Build clients with a builder-style API
@@ -251,8 +252,8 @@ impl<'u> ClientBuilder<'u> {
251252
try!(write!(stream, "{}\r\n", self.headers));
252253

253254
// wait for a response
254-
// TODO: some extra data might get lost with this reader, try to avoid #72
255-
let response = try!(parse_response(&mut BufReader::new(&mut stream)));
255+
let mut reader = BufReader::new(stream);
256+
let response = try!(parse_response(&mut reader));
256257
let status = StatusCode::from_u16(response.subject.0);
257258

258259
// validate
@@ -285,6 +286,6 @@ impl<'u> ClientBuilder<'u> {
285286
return Err(WebSocketError::ResponseError("Connection field must be 'Upgrade'"));
286287
}
287288

288-
Ok(Client::unchecked(stream, response.headers))
289+
Ok(Client::unchecked(reader, response.headers))
289290
}
290291
}

src/client/mod.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ extern crate url;
44
use std::net::TcpStream;
55
use std::net::SocketAddr;
66
use std::io::Result as IoResult;
7+
use std::io::{Read, Write};
78
use hyper::header::Headers;
9+
use hyper::buffer::BufReader;
810

911
use ws;
1012
use ws::sender::Sender as SenderTrait;
@@ -56,7 +58,7 @@ pub use self::builder::{ClientBuilder, Url, ParseError};
5658
pub struct Client<S>
5759
where S: Stream
5860
{
59-
pub stream: S,
61+
stream: BufReader<S>,
6062
headers: Headers,
6163
sender: Sender,
6264
receiver: Receiver,
@@ -66,13 +68,13 @@ impl Client<TcpStream> {
6668
/// Shuts down the sending half of the client connection, will cause all pending
6769
/// and future IO to return immediately with an appropriate value.
6870
pub fn shutdown_sender(&self) -> IoResult<()> {
69-
self.stream.as_tcp().shutdown(Shutdown::Write)
71+
self.stream.get_ref().as_tcp().shutdown(Shutdown::Write)
7072
}
7173

7274
/// Shuts down the receiving half of the client connection, will cause all pending
7375
/// and future IO to return immediately with an appropriate value.
7476
pub fn shutdown_receiver(&self) -> IoResult<()> {
75-
self.stream.as_tcp().shutdown(Shutdown::Read)
77+
self.stream.get_ref().as_tcp().shutdown(Shutdown::Read)
7678
}
7779
}
7880

@@ -82,27 +84,27 @@ impl<S> Client<S>
8284
/// Shuts down the client connection, will cause all pending and future IO to
8385
/// return immediately with an appropriate value.
8486
pub fn shutdown(&self) -> IoResult<()> {
85-
self.stream.as_tcp().shutdown(Shutdown::Both)
87+
self.stream.get_ref().as_tcp().shutdown(Shutdown::Both)
8688
}
8789

8890
/// See `TcpStream.peer_addr()`.
8991
pub fn peer_addr(&self) -> IoResult<SocketAddr> {
90-
self.stream.as_tcp().peer_addr()
92+
self.stream.get_ref().as_tcp().peer_addr()
9193
}
9294

9395
/// See `TcpStream.local_addr()`.
9496
pub fn local_addr(&self) -> IoResult<SocketAddr> {
95-
self.stream.as_tcp().local_addr()
97+
self.stream.get_ref().as_tcp().local_addr()
9698
}
9799

98100
/// See `TcpStream.set_nodelay()`.
99101
pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
100-
self.stream.as_tcp().set_nodelay(nodelay)
102+
self.stream.get_ref().as_tcp().set_nodelay(nodelay)
101103
}
102104

103105
/// Changes whether the stream is in nonblocking mode.
104106
pub fn set_nonblocking(&self, nonblocking: bool) -> IoResult<()> {
105-
self.stream.as_tcp().set_nonblocking(nonblocking)
107+
self.stream.get_ref().as_tcp().set_nonblocking(nonblocking)
106108
}
107109
}
108110

@@ -113,7 +115,7 @@ impl<S> Client<S>
113115
/// **without sending any handshake** this is meant to only be used with
114116
/// a stream that has a websocket connection already set up.
115117
/// If in doubt, don't use this!
116-
pub fn unchecked(stream: S, headers: Headers) -> Self {
118+
pub fn unchecked(stream: BufReader<S>, headers: Headers) -> Self {
117119
Client {
118120
headers: headers,
119121
stream: stream,
@@ -128,15 +130,15 @@ impl<S> Client<S>
128130
pub fn send_dataframe<D>(&mut self, dataframe: &D) -> WebSocketResult<()>
129131
where D: DataFrameable
130132
{
131-
self.sender.send_dataframe(&mut self.stream, dataframe)
133+
self.sender.send_dataframe(self.stream.get_mut(), dataframe)
132134
}
133135

134136
/// Sends a single message to the remote endpoint.
135137
pub fn send_message<'m, M, D>(&mut self, message: &'m M) -> WebSocketResult<()>
136138
where M: ws::Message<'m, D>,
137139
D: DataFrameable
138140
{
139-
self.sender.send_message(&mut self.stream, message)
141+
self.sender.send_message(self.stream.get_mut(), message)
140142
}
141143

142144
/// Reads a single data frame from the remote endpoint.
@@ -145,7 +147,7 @@ impl<S> Client<S>
145147
}
146148

147149
/// Returns an iterator over incoming data frames.
148-
pub fn incoming_dataframes<'a>(&'a mut self) -> DataFrameIterator<'a, Receiver, S> {
150+
pub fn incoming_dataframes<'a>(&'a mut self) -> DataFrameIterator<'a, Receiver, BufReader<S>> {
149151
self.receiver.incoming_dataframes(&mut self.stream)
150152
}
151153

@@ -180,15 +182,20 @@ impl<S> Client<S>
180182
}
181183

182184
pub fn stream_ref(&self) -> &S {
183-
&self.stream
185+
self.stream.get_ref()
184186
}
185187

186-
pub fn stream_ref_mut(&mut self) -> &mut S {
188+
pub fn writer_mut(&mut self) -> &mut Write {
189+
self.stream.get_mut()
190+
}
191+
192+
pub fn reader_mut(&mut self) -> &mut Read {
187193
&mut self.stream
188194
}
189195

190-
pub fn into_stream(self) -> S {
191-
self.stream
196+
pub fn into_stream(self) -> (S, Option<(Vec<u8>, usize, usize)>) {
197+
let (stream, buf, pos, cap) = self.stream.into_parts();
198+
(stream, Some((buf, pos, cap)))
192199
}
193200

194201
/// Returns an iterator over incoming messages.
@@ -229,7 +236,8 @@ impl<S> Client<S>
229236
///}
230237
///# }
231238
///```
232-
pub fn incoming_messages<'a, M, D>(&'a mut self) -> MessageIterator<'a, Receiver, D, M, S>
239+
pub fn incoming_messages<'a, M, D>(&'a mut self,)
240+
-> MessageIterator<'a, Receiver, D, M, BufReader<S>>
233241
where M: ws::Message<'a, D>,
234242
D: DataFrameable
235243
{
@@ -269,9 +277,10 @@ impl<S> Client<S>
269277
pub fn split
270278
(self,)
271279
-> IoResult<(Reader<<S as Splittable>::Reader>, Writer<<S as Splittable>::Writer>)> {
272-
let (read, write) = try!(self.stream.split());
280+
let (stream, buf, pos, cap) = self.stream.into_parts();
281+
let (read, write) = try!(stream.split());
273282
Ok((Reader {
274-
stream: read,
283+
stream: BufReader::from_parts(read, buf, pos, cap),
275284
receiver: self.receiver,
276285
},
277286
Writer {

src/receiver.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
use std::io::Read;
44
use std::io::Result as IoResult;
55

6+
use hyper::buffer::BufReader;
7+
68
use dataframe::{DataFrame, Opcode};
79
use result::{WebSocketResult, WebSocketError};
810
use ws;
@@ -15,7 +17,7 @@ pub use stream::Shutdown;
1517
pub struct Reader<R>
1618
where R: Read
1719
{
18-
pub stream: R,
20+
pub stream: BufReader<R>,
1921
pub receiver: Receiver,
2022
}
2123

@@ -28,7 +30,7 @@ impl<R> Reader<R>
2830
}
2931

3032
/// Returns an iterator over incoming data frames.
31-
pub fn incoming_dataframes<'a>(&'a mut self) -> DataFrameIterator<'a, Receiver, R> {
33+
pub fn incoming_dataframes<'a>(&'a mut self) -> DataFrameIterator<'a, Receiver, BufReader<R>> {
3234
self.receiver.incoming_dataframes(&mut self.stream)
3335
}
3436

@@ -40,7 +42,8 @@ impl<R> Reader<R>
4042
self.receiver.recv_message(&mut self.stream)
4143
}
4244

43-
pub fn incoming_messages<'a, M, D>(&'a mut self) -> MessageIterator<'a, Receiver, D, M, R>
45+
pub fn incoming_messages<'a, M, D>(&'a mut self,)
46+
-> MessageIterator<'a, Receiver, D, M, BufReader<R>>
4447
where M: ws::Message<'a, D>,
4548
D: DataFrameable
4649
{
@@ -54,13 +57,13 @@ impl<S> Reader<S>
5457
/// Closes the receiver side of the connection, will cause all pending and future IO to
5558
/// return immediately with an appropriate value.
5659
pub fn shutdown(&self) -> IoResult<()> {
57-
self.stream.as_tcp().shutdown(Shutdown::Read)
60+
self.stream.get_ref().as_tcp().shutdown(Shutdown::Read)
5861
}
5962

6063
/// Shuts down both Sender and Receiver, will cause all pending and future IO to
6164
/// return immediately with an appropriate value.
6265
pub fn shutdown_all(&self) -> IoResult<()> {
63-
self.stream.as_tcp().shutdown(Shutdown::Both)
66+
self.stream.get_ref().as_tcp().shutdown(Shutdown::Both)
6467
}
6568
}
6669

src/server/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::convert::Into;
55
#[cfg(feature="ssl")]
66
use openssl::ssl::{SslStream, SslAcceptor};
77
use stream::Stream;
8-
use self::upgrade::{WsUpgrade, IntoWs};
8+
use self::upgrade::{WsUpgrade, IntoWs, Buffer};
99
pub use self::upgrade::{Request, HyperIntoWsError};
1010

1111
pub mod upgrade;
@@ -15,6 +15,7 @@ pub struct InvalidConnection<S>
1515
{
1616
pub stream: Option<S>,
1717
pub parsed: Option<Request>,
18+
pub buffer: Option<Buffer>,
1819
pub error: HyperIntoWsError,
1920
}
2021

@@ -150,6 +151,7 @@ impl Server<SslAcceptor> {
150151
return Err(InvalidConnection {
151152
stream: None,
152153
parsed: None,
154+
buffer: None,
153155
error: e.into(),
154156
})
155157
}
@@ -161,17 +163,19 @@ impl Server<SslAcceptor> {
161163
return Err(InvalidConnection {
162164
stream: None,
163165
parsed: None,
166+
buffer: None,
164167
error: io::Error::new(io::ErrorKind::Other, err).into(),
165168
})
166169
}
167170
};
168171

169172
match stream.into_ws() {
170173
Ok(u) => Ok(u),
171-
Err((s, r, e)) => {
174+
Err((s, r, b, e)) => {
172175
Err(InvalidConnection {
173176
stream: Some(s),
174177
parsed: r,
178+
buffer: b,
175179
error: e.into(),
176180
})
177181
}
@@ -213,17 +217,19 @@ impl Server<NoSslAcceptor> {
213217
return Err(InvalidConnection {
214218
stream: None,
215219
parsed: None,
220+
buffer: None,
216221
error: e.into(),
217222
})
218223
}
219224
};
220225

221226
match stream.into_ws() {
222227
Ok(u) => Ok(u),
223-
Err((s, r, e)) => {
228+
Err((s, r, b, e)) => {
224229
Err(InvalidConnection {
225230
stream: Some(s),
226231
parsed: r,
232+
buffer: b,
227233
error: e.into(),
228234
})
229235
}

src/server/upgrade/hyper.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
extern crate hyper;
22

33
use hyper::net::NetworkStream;
4-
use super::{IntoWs, WsUpgrade};
4+
use super::{IntoWs, WsUpgrade, Buffer};
55

66
pub use hyper::http::h1::Incoming;
77
pub use hyper::method::Method;
@@ -28,12 +28,18 @@ impl<'a, 'b> IntoWs for HyperRequest<'a, 'b> {
2828
let (_, method, headers, uri, version, reader) =
2929
self.0.deconstruct();
3030

31-
// TODO: some extra data might get lost with this reader, try to avoid #72
32-
let stream = reader.into_inner().get_mut();
31+
let reader = reader.into_inner();
32+
let (buf, pos, cap) = reader.take_buf();
33+
let stream = reader.get_mut();
3334

3435
Ok(WsUpgrade {
3536
headers: Headers::new(),
3637
stream: stream,
38+
buffer: Some(Buffer {
39+
buf: buf,
40+
pos: pos,
41+
cap: cap,
42+
}),
3743
request: Incoming {
3844
version: version,
3945
headers: headers,

0 commit comments

Comments
 (0)