Skip to content

Commit c5b2c47

Browse files
committed
feat(client): add Layer types for client::conn
1 parent b9dc3d2 commit c5b2c47

File tree

2 files changed

+266
-0
lines changed

2 files changed

+266
-0
lines changed

src/client/conn.rs

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
//! todo
2+
3+
use std::future::Future;
4+
use std::marker::PhantomData;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
use http::{Request, Response};
9+
use tower_service::Service;
10+
11+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
12+
13+
/// todo
14+
pub struct Http1Layer<B> {
15+
builder: hyper::client::conn::http1::Builder,
16+
_body: PhantomData<fn(B)>,
17+
}
18+
19+
/// todo
20+
pub fn http1<B>() -> Http1Layer<B> {
21+
Http1Layer {
22+
builder: hyper::client::conn::http1::Builder::new(),
23+
_body: PhantomData,
24+
}
25+
}
26+
27+
impl<M, B> tower_layer::Layer<M> for Http1Layer<B> {
28+
type Service = Http1Connect<M, B>;
29+
fn layer(&self, inner: M) -> Self::Service {
30+
Http1Connect {
31+
inner,
32+
builder: self.builder.clone(),
33+
_body: self._body,
34+
}
35+
}
36+
}
37+
38+
impl<B> Clone for Http1Layer<B> {
39+
fn clone(&self) -> Self {
40+
Self {
41+
builder: self.builder.clone(),
42+
_body: self._body.clone(),
43+
}
44+
}
45+
}
46+
47+
impl<B> From<hyper::client::conn::http1::Builder> for Http1Layer<B> {
48+
fn from(builder: hyper::client::conn::http1::Builder) -> Self {
49+
Self {
50+
builder,
51+
_body: PhantomData,
52+
}
53+
}
54+
}
55+
56+
/// todo
57+
pub struct Http1Connect<M, B> {
58+
inner: M,
59+
builder: hyper::client::conn::http1::Builder,
60+
_body: PhantomData<fn(B)>,
61+
}
62+
63+
impl<M, Dst, B> Service<Dst> for Http1Connect<M, B>
64+
where
65+
M: Service<Dst>,
66+
M::Future: Send + 'static,
67+
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
68+
M::Error: Into<BoxError>,
69+
B: hyper::body::Body + Send + 'static,
70+
B::Data: Send + 'static,
71+
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
72+
{
73+
type Response = Http1ClientService<B>;
74+
type Error = BoxError;
75+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
76+
77+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78+
// Minimal, explicit contract: delegate readiness to the response future.
79+
// If you want strict backpressure, use the "permit" pattern (see notes).
80+
Poll::Ready(Ok(()))
81+
}
82+
83+
fn call(&mut self, dst: Dst) -> Self::Future {
84+
let fut = self.inner.call(dst);
85+
let builder = self.builder.clone();
86+
Box::pin(async move {
87+
let io = fut.await.map_err(Into::into)?;
88+
let (tx, conn) = builder.handshake(io).await?;
89+
tokio::spawn(async move {
90+
if let Err(e) = conn.await {
91+
eprintln!("connection error: {:?}", e);
92+
}
93+
});
94+
Ok(Http1ClientService::new(tx))
95+
})
96+
}
97+
}
98+
99+
impl<M: Clone, B> Clone for Http1Connect<M, B> {
100+
fn clone(&self) -> Self {
101+
Self {
102+
inner: self.inner.clone(),
103+
builder: self.builder.clone(),
104+
_body: self._body.clone(),
105+
}
106+
}
107+
}
108+
109+
/// todo
110+
pub struct Http2Layer<B> {
111+
_body: PhantomData<fn(B)>,
112+
}
113+
114+
/// todo
115+
pub fn http2<B>() -> Http2Layer<B> {
116+
Http2Layer {
117+
_body: PhantomData,
118+
}
119+
}
120+
121+
impl<M, B> tower_layer::Layer<M> for Http2Layer<B> {
122+
type Service = Http2Connect<M, B>;
123+
fn layer(&self, inner: M) -> Self::Service {
124+
Http2Connect {
125+
inner,
126+
builder: hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor::new()),
127+
_body: self._body,
128+
}
129+
}
130+
}
131+
132+
impl<B> Clone for Http2Layer<B> {
133+
fn clone(&self) -> Self {
134+
Self {
135+
_body: self._body.clone(),
136+
}
137+
}
138+
}
139+
140+
/// todo
141+
#[derive(Debug)]
142+
pub struct Http2Connect<M, B> {
143+
inner: M,
144+
builder: hyper::client::conn::http2::Builder<crate::rt::TokioExecutor>,
145+
_body: PhantomData<fn(B)>,
146+
}
147+
148+
impl<M, Dst, B> Service<Dst> for Http2Connect<M, B>
149+
where
150+
M: Service<Dst>,
151+
M::Future: Send + 'static,
152+
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
153+
M::Error: Into<BoxError>,
154+
B: hyper::body::Body + Unpin + Send + 'static,
155+
B::Data: Send + 'static,
156+
B::Error: Into<BoxError>,
157+
{
158+
type Response = Http2ClientService<B>;
159+
type Error = BoxError;
160+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
161+
162+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
163+
// Minimal, explicit contract: delegate readiness to the response future.
164+
// If you want strict backpressure, use the "permit" pattern (see notes).
165+
Poll::Ready(Ok(()))
166+
}
167+
168+
fn call(&mut self, dst: Dst) -> Self::Future {
169+
let fut = self.inner.call(dst);
170+
let builder = self.builder.clone();
171+
Box::pin(async move {
172+
let io = fut.await.map_err(Into::into)?;
173+
let (tx, conn) = builder.handshake(io).await?;
174+
tokio::spawn(async move {
175+
if let Err(e) = conn.await {
176+
eprintln!("connection error: {:?}", e);
177+
}
178+
});
179+
Ok(Http2ClientService::new(tx))
180+
})
181+
}
182+
}
183+
184+
impl<M: Clone, B> Clone for Http2Connect<M, B> {
185+
fn clone(&self) -> Self {
186+
Self {
187+
inner: self.inner.clone(),
188+
builder: self.builder.clone(),
189+
_body: self._body.clone(),
190+
}
191+
}
192+
}
193+
194+
/// A thin adapter over hyper HTTP/1 client SendRequest.
195+
#[derive(Debug)]
196+
pub struct Http1ClientService<B> {
197+
tx: hyper::client::conn::http1::SendRequest<B>,
198+
}
199+
200+
impl<B> Http1ClientService<B> {
201+
/// todo
202+
pub fn new(tx: hyper::client::conn::http1::SendRequest<B>) -> Self {
203+
Self { tx }
204+
}
205+
}
206+
207+
impl<B> Service<Request<B>> for Http1ClientService<B>
208+
where
209+
B: hyper::body::Body + Send + 'static,
210+
{
211+
type Response = Response<hyper::body::Incoming>;
212+
type Error = hyper::Error;
213+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
214+
215+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
216+
// Minimal, explicit contract: delegate readiness to the response future.
217+
// If you want strict backpressure, use the "permit" pattern (see notes).
218+
Poll::Ready(Ok(()))
219+
}
220+
221+
fn call(&mut self, req: Request<B>) -> Self::Future {
222+
let fut = self.tx.send_request(req);
223+
Box::pin(async move { Ok(fut.await?) })
224+
}
225+
}
226+
227+
/// todo
228+
#[derive(Debug)]
229+
pub struct Http2ClientService<B> {
230+
tx: hyper::client::conn::http2::SendRequest<B>,
231+
}
232+
233+
impl<B> Http2ClientService<B> {
234+
/// todo
235+
pub fn new(tx: hyper::client::conn::http2::SendRequest<B>) -> Self {
236+
Self { tx }
237+
}
238+
}
239+
240+
impl<B> Service<Request<B>> for Http2ClientService<B>
241+
where
242+
B: hyper::body::Body + Send + 'static,
243+
{
244+
type Response = Response<hyper::body::Incoming>;
245+
type Error = hyper::Error;
246+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
247+
248+
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
249+
Poll::Ready(Ok(()))
250+
}
251+
252+
fn call(&mut self, req: Request<B>) -> Self::Future {
253+
let fut = self.tx.send_request(req);
254+
Box::pin(async move { Ok(fut.await?) })
255+
}
256+
}
257+
258+
impl<B> Clone for Http2ClientService<B> {
259+
fn clone(&self) -> Self {
260+
Self {
261+
tx: self.tx.clone(),
262+
}
263+
}
264+
}

src/client/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! HTTP client utilities
22
3+
pub mod conn;
4+
35
/// Legacy implementations of `connect` module and `Client`
46
#[cfg(feature = "client-legacy")]
57
pub mod legacy;

0 commit comments

Comments
 (0)