Skip to content

Commit 3998118

Browse files
committed
Implemented async IntoWs and WsUpgrade, only server is left.
1 parent db9ce91 commit 3998118

File tree

6 files changed

+151
-79
lines changed

6 files changed

+151
-79
lines changed

src/codec/http.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,12 @@ impl Decoder for HttpClientCodec {
6666
pub struct HttpServerCodec;
6767

6868
impl Encoder for HttpServerCodec {
69-
type Item = Incoming<RawStatus>;
69+
type Item = Incoming<StatusCode>;
7070
type Error = io::Error;
7171

7272
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
7373
// TODO: optomize this!
74-
let status = StatusCode::from_u16(item.subject.0);
75-
let response = format!("{} {}\r\n{}\r\n", item.version, status, item.headers);
74+
let response = format!("{} {}\r\n{}\r\n", item.version, item.subject, item.headers);
7675
let byte_len = response.as_bytes().len();
7776
if byte_len > dst.remaining_mut() {
7877
dst.reserve(byte_len);

src/codec/ws.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use ws::message::Message as MessageTrait;
1515
use ws::util::header::read_header;
1616
use result::WebSocketError;
1717

18+
// TODO: IMPORTANT: check if frame_size is correct,
19+
// do not call .reserve with the entire size
20+
1821
/**************
1922
* Dataframes *
2023
**************/

src/server/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::convert::Into;
55
#[cfg(feature="ssl")]
66
use native_tls::{TlsStream, TlsAcceptor};
77
use stream::Stream;
8-
use self::upgrade::{WsUpgrade, IntoWs, Buffer};
8+
use self::upgrade::{SyncWsUpgrade, IntoWs, Buffer};
99
pub use self::upgrade::{Request, HyperIntoWsError};
1010

1111
pub mod upgrade;
@@ -35,7 +35,7 @@ pub struct InvalidConnection<S>
3535
/// Either the stream was established and it sent a websocket handshake
3636
/// which represents the `Ok` variant, or there was an error (this is the
3737
/// `Err` variant).
38-
pub type AcceptResult<S> = Result<WsUpgrade<S>, InvalidConnection<S>>;
38+
pub type AcceptResult<S> = Result<SyncWsUpgrade<S>, InvalidConnection<S>>;
3939

4040
/// Marker struct for a struct not being secure
4141
#[derive(Clone)]

src/server/upgrade/async.rs

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,81 @@
1-
use super::{Buffer, HyperIntoWsError, WsUpgrade, Request, validate};
1+
use super::{HyperIntoWsError, WsUpgrade, Request, validate};
22
use std::io::{self, ErrorKind};
3-
use tokio_io::codec::FramedParts;
3+
use tokio_io::codec::{Framed, FramedParts};
44
use hyper::header::Headers;
5+
use hyper::http::h1::Incoming;
6+
use hyper::status::StatusCode;
57
use stream::AsyncStream;
6-
use futures::{Stream, Future};
8+
use futures::{Stream, Sink, Future};
9+
use futures::sink::Send;
710
use codec::http::HttpServerCodec;
11+
use codec::ws::{MessageCodec, Context};
812
use bytes::BytesMut;
913
use client::async::ClientNew;
1014

11-
impl<S> WsUpgrade<S>
12-
where S: AsyncStream
15+
pub type AsyncWsUpgrade<S> = WsUpgrade<S, BytesMut>;
16+
17+
impl<S> AsyncWsUpgrade<S>
18+
where S: AsyncStream + 'static
1319
{
14-
pub fn async_accept(self) -> Result<ClientNew<S>, (S, io::Error)> {
15-
unimplemented!();
20+
pub fn async_accept(self) -> ClientNew<S> {
21+
self.internal_async_accept(None)
22+
}
23+
24+
pub fn async_accept_with(self, custom_headers: &Headers) -> ClientNew<S> {
25+
self.internal_async_accept(Some(custom_headers))
26+
}
27+
28+
fn internal_async_accept(mut self, custom_headers: Option<&Headers>) -> ClientNew<S> {
29+
let status = self.prepare_headers(custom_headers);
30+
let WsUpgrade { headers, stream, request, buffer } = self;
31+
32+
let duplex = Framed::from_parts(FramedParts {
33+
inner: stream,
34+
readbuf: buffer,
35+
writebuf: BytesMut::with_capacity(0),
36+
},
37+
HttpServerCodec);
38+
39+
let future = duplex.send(Incoming {
40+
version: request.version,
41+
subject: status,
42+
headers: headers.clone(),
43+
})
44+
.map(move |s| {
45+
let codec = MessageCodec::default(Context::Client);
46+
let client = Framed::from_parts(s.into_parts(), codec);
47+
(client, headers)
48+
})
49+
.map_err(|e| e.into());
50+
Box::new(future)
1651
}
1752

18-
pub fn async_accept_with(
19-
self,
20-
custom_headers: &Headers,
21-
) -> Result<ClientNew<S>, (S, io::Error)> {
22-
unimplemented!();
53+
pub fn async_reject(self) -> Send<Framed<S, HttpServerCodec>> {
54+
self.internal_async_reject(None)
2355
}
2456

25-
pub fn async_reject(self) -> Result<S, (S, io::Error)> {
26-
unimplemented!();
57+
pub fn async_reject_with(self, headers: &Headers) -> Send<Framed<S, HttpServerCodec>> {
58+
self.internal_async_reject(Some(headers))
2759
}
2860

29-
pub fn async_reject_with(self, headers: &Headers) -> Result<S, (S, io::Error)> {
30-
unimplemented!();
61+
fn internal_async_reject(
62+
mut self,
63+
headers: Option<&Headers>,
64+
) -> Send<Framed<S, HttpServerCodec>> {
65+
if let Some(custom) = headers {
66+
self.headers.extend(custom.iter());
67+
}
68+
let duplex = Framed::from_parts(FramedParts {
69+
inner: self.stream,
70+
readbuf: self.buffer,
71+
writebuf: BytesMut::with_capacity(0),
72+
},
73+
HttpServerCodec);
74+
duplex.send(Incoming {
75+
version: self.request.version,
76+
subject: StatusCode::BadRequest,
77+
headers: self.headers,
78+
})
3179
}
3280
}
3381

@@ -44,46 +92,42 @@ pub trait AsyncIntoWs {
4492
///
4593
/// Note: this is the asynchronous version, meaning it will not block when
4694
/// trying to read a request.
47-
fn into_ws(self) -> Box<Future<Item = WsUpgrade<Self::Stream>, Error = Self::Error>>;
95+
fn into_ws(self) -> Box<Future<Item = AsyncWsUpgrade<Self::Stream>, Error = Self::Error>>;
4896
}
4997

5098
impl<S> AsyncIntoWs for S
5199
where S: AsyncStream + 'static
52100
{
53101
type Stream = S;
54-
type Error = (S, Option<Request>, Option<BytesMut>, HyperIntoWsError);
102+
type Error = (S, Option<Request>, BytesMut, HyperIntoWsError);
55103

56-
fn into_ws(self) -> Box<Future<Item = WsUpgrade<Self::Stream>, Error = Self::Error>> {
104+
fn into_ws(self) -> Box<Future<Item = AsyncWsUpgrade<Self::Stream>, Error = Self::Error>> {
57105
let future = self.framed(HttpServerCodec)
58106
.into_future()
59107
.map_err(|(e, s)| {
60108
let FramedParts { inner, readbuf, .. } = s.into_parts();
61-
(inner, None, Some(readbuf), e.into())
109+
(inner, None, readbuf, e.into())
62110
})
63111
.and_then(|(m, s)| {
64112
let FramedParts { inner, readbuf, .. } = s.into_parts();
65113
if let Some(msg) = m {
66114
match validate(&msg.subject.0, &msg.version, &msg.headers) {
67115
Ok(()) => Ok((msg, inner, readbuf)),
68-
Err(e) => Err((inner, None, Some(readbuf), e)),
116+
Err(e) => Err((inner, None, readbuf, e)),
69117
}
70118
} else {
71119
let err = HyperIntoWsError::Io(io::Error::new(
72120
ErrorKind::ConnectionReset,
73121
"Connection dropped before handshake could be read"));
74-
Err((inner, None, Some(readbuf), err))
122+
Err((inner, None, readbuf, err))
75123
}
76124
})
77125
.map(|(m, stream, buffer)| {
78126
WsUpgrade {
79127
headers: Headers::new(),
80128
stream: stream,
81129
request: m,
82-
buffer: Some(Buffer {
83-
buf: unimplemented!(),
84-
pos: 0,
85-
cap: buffer.capacity(),
86-
}),
130+
buffer: buffer,
87131
}
88132
});
89133
Box::new(future)

src/server/upgrade/from_hyper.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
//! ```
3737
3838
use hyper::net::NetworkStream;
39-
use super::{IntoWs, WsUpgrade, Buffer};
39+
use super::{IntoWs, SyncWsUpgrade, Buffer};
4040

4141
pub use hyper::http::h1::Incoming;
4242
pub use hyper::method::Method;
@@ -58,7 +58,7 @@ impl<'a, 'b> IntoWs for HyperRequest<'a, 'b> {
5858
type Stream = &'a mut &'b mut NetworkStream;
5959
type Error = (Request<'a, 'b>, HyperIntoWsError);
6060

61-
fn into_ws(self) -> Result<WsUpgrade<Self::Stream>, Self::Error> {
61+
fn into_ws(self) -> Result<SyncWsUpgrade<Self::Stream>, Self::Error> {
6262
if let Err(e) = validate(&self.0.method, &self.0.version, &self.0.headers) {
6363
return Err((self.0, e));
6464
}
@@ -70,7 +70,7 @@ impl<'a, 'b> IntoWs for HyperRequest<'a, 'b> {
7070
let (buf, pos, cap) = reader.take_buf();
7171
let stream = reader.get_mut();
7272

73-
Ok(WsUpgrade {
73+
Ok(SyncWsUpgrade {
7474
headers: Headers::new(),
7575
stream: stream,
7676
buffer: Some(Buffer {

0 commit comments

Comments
 (0)