Skip to content

Commit 1c22e21

Browse files
committed
feat(client): add Layer types for client::conn
1 parent b9dc3d2 commit 1c22e21

File tree

2 files changed

+290
-0
lines changed

2 files changed

+290
-0
lines changed

src/client/conn.rs

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
//! todo
2+
3+
use std::future::Future;
4+
use std::marker::PhantomData;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
use http::{Request, Response};
9+
use tower_service::Service;
10+
11+
use crate::common::future::poll_fn;
12+
13+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
14+
15+
/// todo
16+
#[cfg(feature = "http1")]
17+
pub struct Http1Layer<B> {
18+
builder: hyper::client::conn::http1::Builder,
19+
_body: PhantomData<fn(B)>,
20+
}
21+
22+
/// todo
23+
#[cfg(feature = "http1")]
24+
pub fn http1<B>() -> Http1Layer<B> {
25+
Http1Layer {
26+
builder: hyper::client::conn::http1::Builder::new(),
27+
_body: PhantomData,
28+
}
29+
}
30+
31+
#[cfg(feature = "http1")]
32+
impl<M, B> tower_layer::Layer<M> for Http1Layer<B> {
33+
type Service = Http1Connect<M, B>;
34+
fn layer(&self, inner: M) -> Self::Service {
35+
Http1Connect {
36+
inner,
37+
builder: self.builder.clone(),
38+
_body: self._body,
39+
}
40+
}
41+
}
42+
43+
#[cfg(feature = "http1")]
44+
impl<B> Clone for Http1Layer<B> {
45+
fn clone(&self) -> Self {
46+
Self {
47+
builder: self.builder.clone(),
48+
_body: self._body.clone(),
49+
}
50+
}
51+
}
52+
53+
#[cfg(feature = "http1")]
54+
impl<B> From<hyper::client::conn::http1::Builder> for Http1Layer<B> {
55+
fn from(builder: hyper::client::conn::http1::Builder) -> Self {
56+
Self {
57+
builder,
58+
_body: PhantomData,
59+
}
60+
}
61+
}
62+
63+
/// todo
64+
#[cfg(feature = "http1")]
65+
pub struct Http1Connect<M, B> {
66+
inner: M,
67+
builder: hyper::client::conn::http1::Builder,
68+
_body: PhantomData<fn(B)>,
69+
}
70+
71+
#[cfg(feature = "http1")]
72+
impl<M, Dst, B> Service<Dst> for Http1Connect<M, B>
73+
where
74+
M: Service<Dst>,
75+
M::Future: Send + 'static,
76+
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
77+
M::Error: Into<BoxError>,
78+
B: hyper::body::Body + Send + 'static,
79+
B::Data: Send + 'static,
80+
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
81+
{
82+
type Response = Http1ClientService<B>;
83+
type Error = BoxError;
84+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
85+
86+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87+
self.inner.poll_ready(cx).map_err(Into::into)
88+
}
89+
90+
fn call(&mut self, dst: Dst) -> Self::Future {
91+
let fut = self.inner.call(dst);
92+
let builder = self.builder.clone();
93+
Box::pin(async move {
94+
let io = fut.await.map_err(Into::into)?;
95+
let (mut tx, conn) = builder.handshake(io).await?;
96+
//todo: pass in Executor
97+
tokio::spawn(async move {
98+
if let Err(e) = conn.await {
99+
eprintln!("connection error: {:?}", e);
100+
}
101+
});
102+
// todo: wait for ready? or factor out to other middleware?
103+
poll_fn(|cx| tx.poll_ready(cx)).await?;
104+
105+
Ok(Http1ClientService::new(tx))
106+
})
107+
}
108+
}
109+
110+
#[cfg(feature = "http1")]
111+
impl<M: Clone, B> Clone for Http1Connect<M, B> {
112+
fn clone(&self) -> Self {
113+
Self {
114+
inner: self.inner.clone(),
115+
builder: self.builder.clone(),
116+
_body: self._body.clone(),
117+
}
118+
}
119+
}
120+
121+
/// todo
122+
#[cfg(feature = "http2")]
123+
pub struct Http2Layer<B> {
124+
_body: PhantomData<fn(B)>,
125+
}
126+
127+
/// todo
128+
#[cfg(feature = "http2")]
129+
pub fn http2<B>() -> Http2Layer<B> {
130+
Http2Layer { _body: PhantomData }
131+
}
132+
133+
#[cfg(feature = "http2")]
134+
impl<M, B> tower_layer::Layer<M> for Http2Layer<B> {
135+
type Service = Http2Connect<M, B>;
136+
fn layer(&self, inner: M) -> Self::Service {
137+
Http2Connect {
138+
inner,
139+
builder: hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor::new()),
140+
_body: self._body,
141+
}
142+
}
143+
}
144+
145+
#[cfg(feature = "http2")]
146+
impl<B> Clone for Http2Layer<B> {
147+
fn clone(&self) -> Self {
148+
Self {
149+
_body: self._body.clone(),
150+
}
151+
}
152+
}
153+
154+
/// todo
155+
#[cfg(feature = "http2")]
156+
#[derive(Debug)]
157+
pub struct Http2Connect<M, B> {
158+
inner: M,
159+
builder: hyper::client::conn::http2::Builder<crate::rt::TokioExecutor>,
160+
_body: PhantomData<fn(B)>,
161+
}
162+
163+
#[cfg(feature = "http2")]
164+
impl<M, Dst, B> Service<Dst> for Http2Connect<M, B>
165+
where
166+
M: Service<Dst>,
167+
M::Future: Send + 'static,
168+
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
169+
M::Error: Into<BoxError>,
170+
B: hyper::body::Body + Unpin + Send + 'static,
171+
B::Data: Send + 'static,
172+
B::Error: Into<BoxError>,
173+
{
174+
type Response = Http2ClientService<B>;
175+
type Error = BoxError;
176+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
177+
178+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
179+
self.inner.poll_ready(cx).map_err(Into::into)
180+
}
181+
182+
fn call(&mut self, dst: Dst) -> Self::Future {
183+
let fut = self.inner.call(dst);
184+
let builder = self.builder.clone();
185+
Box::pin(async move {
186+
let io = fut.await.map_err(Into::into)?;
187+
let (mut tx, conn) = builder.handshake(io).await?;
188+
tokio::spawn(async move {
189+
if let Err(e) = conn.await {
190+
eprintln!("connection error: {:?}", e);
191+
}
192+
});
193+
194+
// todo: wait for ready? or factor out to other middleware?
195+
poll_fn(|cx| tx.poll_ready(cx)).await?;
196+
Ok(Http2ClientService::new(tx))
197+
})
198+
}
199+
}
200+
201+
#[cfg(feature = "http2")]
202+
impl<M: Clone, B> Clone for Http2Connect<M, B> {
203+
fn clone(&self) -> Self {
204+
Self {
205+
inner: self.inner.clone(),
206+
builder: self.builder.clone(),
207+
_body: self._body.clone(),
208+
}
209+
}
210+
}
211+
212+
/// A thin adapter over hyper HTTP/1 client SendRequest.
213+
#[cfg(feature = "http1")]
214+
#[derive(Debug)]
215+
pub struct Http1ClientService<B> {
216+
tx: hyper::client::conn::http1::SendRequest<B>,
217+
}
218+
219+
#[cfg(feature = "http1")]
220+
impl<B> Http1ClientService<B> {
221+
/// todo
222+
pub fn new(tx: hyper::client::conn::http1::SendRequest<B>) -> Self {
223+
Self { tx }
224+
}
225+
}
226+
227+
#[cfg(feature = "http1")]
228+
impl<B> Service<Request<B>> for Http1ClientService<B>
229+
where
230+
B: hyper::body::Body + Send + 'static,
231+
{
232+
type Response = Response<hyper::body::Incoming>;
233+
type Error = hyper::Error;
234+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
235+
236+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
237+
self.tx.poll_ready(cx)
238+
}
239+
240+
fn call(&mut self, req: Request<B>) -> Self::Future {
241+
let fut = self.tx.send_request(req);
242+
Box::pin(fut)
243+
}
244+
}
245+
246+
/// todo
247+
#[cfg(feature = "http2")]
248+
#[derive(Debug)]
249+
pub struct Http2ClientService<B> {
250+
tx: hyper::client::conn::http2::SendRequest<B>,
251+
}
252+
253+
#[cfg(feature = "http2")]
254+
impl<B> Http2ClientService<B> {
255+
/// todo
256+
pub fn new(tx: hyper::client::conn::http2::SendRequest<B>) -> Self {
257+
Self { tx }
258+
}
259+
}
260+
261+
#[cfg(feature = "http2")]
262+
impl<B> Service<Request<B>> for Http2ClientService<B>
263+
where
264+
B: hyper::body::Body + Send + 'static,
265+
{
266+
type Response = Response<hyper::body::Incoming>;
267+
type Error = hyper::Error;
268+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
269+
270+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
271+
self.tx.poll_ready(cx)
272+
}
273+
274+
fn call(&mut self, req: Request<B>) -> Self::Future {
275+
let fut = self.tx.send_request(req);
276+
Box::pin(fut)
277+
}
278+
}
279+
280+
#[cfg(feature = "http2")]
281+
impl<B> Clone for Http2ClientService<B> {
282+
fn clone(&self) -> Self {
283+
Self {
284+
tx: self.tx.clone(),
285+
}
286+
}
287+
}

src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
//! HTTP client utilities
22
3+
#[cfg(any(feature = "http1", feature = "http2"))]
4+
pub mod conn;
5+
36
/// Legacy implementations of `connect` module and `Client`
47
#[cfg(feature = "client-legacy")]
58
pub mod legacy;

0 commit comments

Comments
 (0)