Skip to content

Commit 5e79dea

Browse files
committed
Created client/server http codecs, added async intows trait.
1 parent 1b58dde commit 5e79dea

File tree

6 files changed

+193
-16
lines changed

6 files changed

+193
-16
lines changed

examples/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ fn main() {
7474
let _ = tx_1.send(OwnedMessage::Close(None));
7575
return;
7676
}
77-
OwnedMessage::Ping(data) => {
77+
OwnedMessage::Ping(data) => {
7878
match tx_1.send(OwnedMessage::Pong(data)) {
7979
// Send a pong in response
8080
Ok(()) => (),

src/client/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ impl<'u> ClientBuilder<'u> {
551551
key_set: self.key_set,
552552
};
553553
let resource = builder.build_request();
554-
let framed = stream.framed(::codec::http::HttpCodec);
554+
let framed = stream.framed(::codec::http::HttpClientCodec);
555555
let request = Incoming {
556556
version: builder.version,
557557
headers: builder.headers.clone(),
@@ -563,7 +563,7 @@ impl<'u> ClientBuilder<'u> {
563563
.send(request).map_err(::std::convert::Into::into)
564564

565565
// wait for a response
566-
.and_then(|stream| stream.into_future().map_err(|e| e.0))
566+
.and_then(|stream| stream.into_future().map_err(|e| e.0.into()))
567567

568568
// validate
569569
.and_then(move |(message, stream)| {

src/codec/http.rs

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
1+
use std::io::{self, Write};
2+
use std::error::Error;
3+
use std::fmt::{self, Formatter, Display};
14
use hyper;
25
use hyper::http::h1::Incoming;
36
use hyper::http::h1::parse_response;
7+
use hyper::http::h1::parse_request;
48
use hyper::http::RawStatus;
9+
use hyper::status::StatusCode;
510
use hyper::method::Method;
611
use hyper::uri::RequestUri;
712
use hyper::buffer::BufReader;
8-
use std::io::{self, Write};
913
use tokio_io::codec::{Decoder, Encoder};
1014
use bytes::BytesMut;
1115
use bytes::BufMut;
1216
use result::WebSocketError;
1317

1418
#[derive(Copy, Clone, Debug)]
15-
pub struct HttpCodec;
19+
pub struct HttpClientCodec;
1620

17-
impl Encoder for HttpCodec {
21+
impl Encoder for HttpClientCodec {
1822
type Item = Incoming<(Method, RequestUri)>;
1923
type Error = io::Error;
2024

@@ -33,9 +37,9 @@ impl Encoder for HttpCodec {
3337
}
3438
}
3539

36-
impl Decoder for HttpCodec {
40+
impl Decoder for HttpClientCodec {
3741
type Item = Incoming<RawStatus>;
38-
type Error = WebSocketError;
42+
type Error = HttpCodecError;
3943

4044
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
4145
// check if we get a request from hyper
@@ -58,3 +62,88 @@ impl Decoder for HttpCodec {
5862
Ok(Some(response))
5963
}
6064
}
65+
66+
#[derive(Copy, Clone, Debug)]
67+
pub struct HttpServerCodec;
68+
69+
impl Encoder for HttpServerCodec {
70+
type Item = Incoming<RawStatus>;
71+
type Error = io::Error;
72+
73+
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
74+
// TODO: optomize this!
75+
let status = StatusCode::from_u16(item.subject.0);
76+
let response = format!("{} {}\r\n{}\r\n", item.version, status, item.headers);
77+
let byte_len = response.as_bytes().len();
78+
if byte_len > dst.remaining_mut() {
79+
dst.reserve(byte_len);
80+
}
81+
dst.writer().write(response.as_bytes()).map(|_| ())
82+
}
83+
}
84+
85+
impl Decoder for HttpServerCodec {
86+
type Item = Incoming<(Method, RequestUri)>;
87+
type Error = HttpCodecError;
88+
89+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
90+
// check if we get a request from hyper
91+
// TODO: this is ineffecient, but hyper does not give us a better way to parse
92+
let (response, bytes_read) = {
93+
let mut reader = BufReader::new(&*src as &[u8]);
94+
let res = match parse_request(&mut reader) {
95+
Err(hyper::Error::Io(ref e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
96+
return Ok(None)
97+
}
98+
Err(hyper::Error::TooLarge) => return Ok(None),
99+
Err(e) => return Err(e.into()),
100+
Ok(r) => r,
101+
};
102+
let (_, _, pos, _) = reader.into_parts();
103+
(res, pos)
104+
};
105+
106+
src.split_to(bytes_read);
107+
Ok(Some(response))
108+
}
109+
}
110+
111+
#[derive(Debug)]
112+
pub enum HttpCodecError {
113+
Io(io::Error),
114+
Http(hyper::Error),
115+
}
116+
117+
impl Display for HttpCodecError {
118+
fn fmt(&self, fmt: &mut Formatter) -> Result<(), fmt::Error> {
119+
fmt.write_str(self.description())
120+
}
121+
}
122+
123+
impl Error for HttpCodecError {
124+
fn description(&self) -> &str {
125+
match *self {
126+
HttpCodecError::Io(ref e) => e.description(),
127+
HttpCodecError::Http(ref e) => e.description(),
128+
}
129+
}
130+
131+
fn cause(&self) -> Option<&Error> {
132+
match *self {
133+
HttpCodecError::Io(ref error) => Some(error),
134+
HttpCodecError::Http(ref error) => Some(error),
135+
}
136+
}
137+
}
138+
139+
impl From<io::Error> for HttpCodecError {
140+
fn from(err: io::Error) -> HttpCodecError {
141+
HttpCodecError::Io(err)
142+
}
143+
}
144+
145+
impl From<hyper::Error> for HttpCodecError {
146+
fn from(err: hyper::Error) -> HttpCodecError {
147+
HttpCodecError::Http(err)
148+
}
149+
}

src/result.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,16 @@ impl From<Utf8Error> for WebSocketError {
138138
}
139139
}
140140

141+
#[cfg(feature="async")]
142+
impl From<::codec::http::HttpCodecError> for WebSocketError {
143+
fn from(src: ::codec::http::HttpCodecError) -> Self {
144+
match src {
145+
::codec::http::HttpCodecError::Io(e) => WebSocketError::IoError(e),
146+
::codec::http::HttpCodecError::Http(e) => WebSocketError::HttpError(e),
147+
}
148+
}
149+
}
150+
141151
impl From<WSUrlErrorKind> for WebSocketError {
142152
fn from(err: WSUrlErrorKind) -> WebSocketError {
143153
WebSocketError::WebSocketUrlError(err)

src/server/upgrade/async.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use super::{Buffer, HyperIntoWsError, WsUpgrade, Request, validate};
2+
use std::io::{self, ErrorKind};
3+
use hyper::header::Headers;
4+
use stream::AsyncStream;
5+
use futures::{Stream, Future};
6+
use codec::http::HttpServerCodec;
7+
use bytes::BytesMut;
8+
9+
pub trait AsyncIntoWs {
10+
/// The type of stream this upgrade process is working with (TcpStream, etc.)
11+
type Stream: AsyncStream;
12+
/// An error value in case the stream is not asking for a websocket connection
13+
/// or something went wrong. It is common to also include the stream here.
14+
type Error;
15+
/// Attempt to read and parse the start of a Websocket handshake, later
16+
/// with the returned `WsUpgrade` struct, call `accept to start a
17+
/// websocket client, and `reject` to send a handshake rejection response.
18+
///
19+
/// Note: this is the asynchronous version, meaning it will not block when
20+
/// trying to read a request.
21+
fn into_ws(self) -> Box<Future<Item = WsUpgrade<Self::Stream>, Error = Self::Error>>;
22+
}
23+
24+
impl<S> AsyncIntoWs for S
25+
where S: AsyncStream + 'static
26+
{
27+
type Stream = S;
28+
type Error = (S, Option<Request>, Option<BytesMut>, HyperIntoWsError);
29+
30+
fn into_ws(self) -> Box<Future<Item = WsUpgrade<Self::Stream>, Error = Self::Error>> {
31+
let future = self.framed(HttpServerCodec)
32+
.into_future()
33+
.map_err(|(e, s)| {
34+
let (stream, buffer) = s.into_parts();
35+
(stream, None, Some(buffer), e.into())
36+
})
37+
.and_then(|(m, s)| {
38+
let (stream, buffer) = s.into_parts();
39+
if let Some(msg) = m {
40+
match validate(&msg.subject.0, &msg.version, &msg.headers) {
41+
Ok(()) => Ok((msg, stream, buffer)),
42+
Err(e) => Err((stream, None, Some(buffer), e)),
43+
}
44+
} else {
45+
let err = HyperIntoWsError::Io(io::Error::new(
46+
ErrorKind::ConnectionReset,
47+
"Connection dropped before handshake could be read"));
48+
Err((stream, None, Some(buffer), err))
49+
}
50+
})
51+
.map(|(m, stream, buffer)| {
52+
WsUpgrade {
53+
headers: Headers::new(),
54+
stream: stream,
55+
request: m,
56+
buffer: Some(Buffer {
57+
buf: unimplemented!(),
58+
pos: 0,
59+
cap: buffer.capacity(),
60+
}),
61+
}
62+
});
63+
Box::new(future)
64+
}
65+
}

src/server/upgrade/mod.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ use hyper::header::{Headers, Upgrade, Protocol, ProtocolName, Connection, Connec
2424

2525
pub mod from_hyper;
2626

27+
#[cfg(feature="async")]
28+
pub mod async;
29+
2730
/// This crate uses buffered readers to read in the handshake quickly, in order to
2831
/// interface with other use cases that don't use buffered readers the buffered readers
2932
/// is deconstructed when it is returned to the user and given as the underlying
@@ -43,6 +46,14 @@ pub struct Buffer {
4346
pub cap: usize,
4447
}
4548

49+
/// A typical request from hyper
50+
pub type Request = Incoming<(Method, RequestUri)>;
51+
52+
/// If you have your requests separate from your stream you can use this struct
53+
/// to upgrade the connection based on the request given
54+
/// (the request should be a handshake).
55+
pub struct RequestStreamPair<S: Stream>(pub S, pub Request);
56+
4657
/// Intermediate representation of a half created websocket session.
4758
/// Should be used to examine the client's handshake
4859
/// accept the protocols requested, route the path, etc.
@@ -246,14 +257,6 @@ pub trait IntoWs {
246257
fn into_ws(self) -> Result<WsUpgrade<Self::Stream>, Self::Error>;
247258
}
248259

249-
250-
/// A typical request from hyper
251-
pub type Request = Incoming<(Method, RequestUri)>;
252-
/// If you have your requests separate from your stream you can use this struct
253-
/// to upgrade the connection based on the request given
254-
/// (the request should be a handshake).
255-
pub struct RequestStreamPair<S: Stream>(pub S, pub Request);
256-
257260
impl<S> IntoWs for S
258261
where S: Stream
259262
{
@@ -381,6 +384,16 @@ impl From<::hyper::error::Error> for HyperIntoWsError {
381384
}
382385
}
383386

387+
#[cfg(feature="async")]
388+
impl From<::codec::http::HttpCodecError> for HyperIntoWsError {
389+
fn from(src: ::codec::http::HttpCodecError) -> Self {
390+
match src {
391+
::codec::http::HttpCodecError::Io(e) => HyperIntoWsError::Io(e),
392+
::codec::http::HttpCodecError::Http(e) => HyperIntoWsError::Parsing(e),
393+
}
394+
}
395+
}
396+
384397
fn validate(
385398
method: &Method,
386399
version: &HttpVersion,

0 commit comments

Comments
 (0)