Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 287 additions & 0 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
//! todo

use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

use http::{Request, Response};
use tower_service::Service;

use crate::common::future::poll_fn;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// todo
#[cfg(feature = "http1")]
pub struct Http1Layer<B> {
builder: hyper::client::conn::http1::Builder,
_body: PhantomData<fn(B)>,
}

/// todo
#[cfg(feature = "http1")]
pub fn http1<B>() -> Http1Layer<B> {
Http1Layer {
builder: hyper::client::conn::http1::Builder::new(),
_body: PhantomData,
}
}

#[cfg(feature = "http1")]
impl<M, B> tower_layer::Layer<M> for Http1Layer<B> {
type Service = Http1Connect<M, B>;
fn layer(&self, inner: M) -> Self::Service {
Http1Connect {
inner,
builder: self.builder.clone(),
_body: self._body,
}
}
}

#[cfg(feature = "http1")]
impl<B> Clone for Http1Layer<B> {
fn clone(&self) -> Self {
Self {
builder: self.builder.clone(),
_body: self._body.clone(),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need to clone phantom data

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couple of other cases of this elsewhere

}
}
}

#[cfg(feature = "http1")]
impl<B> From<hyper::client::conn::http1::Builder> for Http1Layer<B> {
fn from(builder: hyper::client::conn::http1::Builder) -> Self {
Self {
builder,
_body: PhantomData,
}
}
}

/// todo
#[cfg(feature = "http1")]
pub struct Http1Connect<M, B> {
inner: M,
builder: hyper::client::conn::http1::Builder,
_body: PhantomData<fn(B)>,
}

#[cfg(feature = "http1")]
impl<M, Dst, B> Service<Dst> for Http1Connect<M, B>
where
M: Service<Dst>,
M::Future: Send + 'static,
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
M::Error: Into<BoxError>,
B: hyper::body::Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Response = Http1ClientService<B>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, dst: Dst) -> Self::Future {
let fut = self.inner.call(dst);
let builder = self.builder.clone();
Box::pin(async move {
let io = fut.await.map_err(Into::into)?;
let (mut tx, conn) = builder.handshake(io).await?;
//todo: pass in Executor
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("connection error: {:?}", e);
}
});
// todo: wait for ready? or factor out to other middleware?
poll_fn(|cx| tx.poll_ready(cx)).await?;

Ok(Http1ClientService::new(tx))
})
}
}

#[cfg(feature = "http1")]
impl<M: Clone, B> Clone for Http1Connect<M, B> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
builder: self.builder.clone(),
_body: self._body.clone(),
}
}
}

/// todo
#[cfg(feature = "http2")]
pub struct Http2Layer<B> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should be generic over the executor here:

diff --git a/src/client/conn.rs b/src/client/conn.rs
index d510ec3..0ec767b 100644
--- a/src/client/conn.rs
+++ b/src/client/conn.rs
@@ -39,7 +39,7 @@ impl<B> Clone for Http1Layer<B> {
     fn clone(&self) -> Self {
         Self {
             builder: self.builder.clone(),
-            _body: self._body.clone(),
+            _body: self._body,
         }
     }
 }
@@ -101,49 +101,69 @@ impl<M: Clone, B> Clone for Http1Connect<M, B> {
         Self {
             inner: self.inner.clone(),
             builder: self.builder.clone(),
-            _body: self._body.clone(),
+            _body: self._body,
         }
     }
 }

 /// todo
-pub struct Http2Layer<B> {
+pub struct Http2Layer<B, E> {
+    builder: hyper::client::conn::http2::Builder<E>,
     _body: PhantomData<fn(B)>,
 }

 /// todo
-pub fn http2<B>() -> Http2Layer<B> {
-    Http2Layer { _body: PhantomData }
+pub fn http2<B, E>(executor: E) -> Http2Layer<B, E>
+where
+    E: Clone,
+{
+    Http2Layer {
+        builder: hyper::client::conn::http2::Builder::new(executor),
+        _body: PhantomData,
+    }
 }

-impl<M, B> tower_layer::Layer<M> for Http2Layer<B> {
-    type Service = Http2Connect<M, B>;
+impl<M, B, E> tower_layer::Layer<M> for Http2Layer<B, E>
+where
+    E: Clone,
+{
+    type Service = Http2Connect<M, B, E>;
     fn layer(&self, inner: M) -> Self::Service {
         Http2Connect {
             inner,
-            builder: hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor::new()),
+            builder: self.builder.clone(),
             _body: self._body,
         }
     }
 }

-impl<B> Clone for Http2Layer<B> {
+impl<B, E: Clone> Clone for Http2Layer<B, E> {
     fn clone(&self) -> Self {
         Self {
-            _body: self._body.clone(),
+            builder: self.builder.clone(),
+            _body: self._body,
+        }
+    }
+}
+
+impl<B, E> From<hyper::client::conn::http2::Builder<E>> for Http2Layer<B, E> {
+    fn from(builder: hyper::client::conn::http2::Builder<E>) -> Self {
+        Self {
+            builder,
+            _body: PhantomData,
         }
     }
 }

 /// todo
 #[derive(Debug)]
-pub struct Http2Connect<M, B> {
+pub struct Http2Connect<M, B, E> {
     inner: M,
-    builder: hyper::client::conn::http2::Builder<crate::rt::TokioExecutor>,
+    builder: hyper::client::conn::http2::Builder<E>,
     _body: PhantomData<fn(B)>,
 }

-impl<M, Dst, B> Service<Dst> for Http2Connect<M, B>
+impl<M, Dst, B, E> Service<Dst> for Http2Connect<M, B, E>
 where
     M: Service<Dst>,
     M::Future: Send + 'static,
@@ -152,6 +172,7 @@ where
     B: hyper::body::Body + Unpin + Send + 'static,
     B::Data: Send + 'static,
     B::Error: Into<BoxError>,
+    E: hyper::rt::bounds::Http2ClientConnExec<B, M::Response> + Unpin + Clone + Send + 'static,
 {
     type Response = Http2ClientService<B>;
     type Error = BoxError;
@@ -179,12 +200,12 @@ where
     }
 }

-impl<M: Clone, B> Clone for Http2Connect<M, B> {
+impl<M: Clone, B, E: Clone> Clone for Http2Connect<M, B, E> {
     fn clone(&self) -> Self {
         Self {
             inner: self.inner.clone(),
             builder: self.builder.clone(),
-            _body: self._body.clone(),
+            _body: self._body,
         }
     }
 }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yea definitely, it was something I skipped initially but knew it'd be needed before merging.

_body: PhantomData<fn(B)>,
}

/// todo
#[cfg(feature = "http2")]
pub fn http2<B>() -> Http2Layer<B> {
Http2Layer { _body: PhantomData }
}

#[cfg(feature = "http2")]
impl<M, B> tower_layer::Layer<M> for Http2Layer<B> {
type Service = Http2Connect<M, B>;
fn layer(&self, inner: M) -> Self::Service {
Http2Connect {
inner,
builder: hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor::new()),
_body: self._body,
}
}
}

#[cfg(feature = "http2")]
impl<B> Clone for Http2Layer<B> {
fn clone(&self) -> Self {
Self {
_body: self._body.clone(),
}
}
}

/// todo
#[cfg(feature = "http2")]
#[derive(Debug)]
pub struct Http2Connect<M, B> {
inner: M,
builder: hyper::client::conn::http2::Builder<crate::rt::TokioExecutor>,
_body: PhantomData<fn(B)>,
}

#[cfg(feature = "http2")]
impl<M, Dst, B> Service<Dst> for Http2Connect<M, B>
where
M: Service<Dst>,
M::Future: Send + 'static,
M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
M::Error: Into<BoxError>,
B: hyper::body::Body + Unpin + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<BoxError>,
{
type Response = Http2ClientService<B>;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, dst: Dst) -> Self::Future {
let fut = self.inner.call(dst);
let builder = self.builder.clone();
Box::pin(async move {
let io = fut.await.map_err(Into::into)?;
let (mut tx, conn) = builder.handshake(io).await?;
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("connection error: {:?}", e);
}
});

// todo: wait for ready? or factor out to other middleware?
poll_fn(|cx| tx.poll_ready(cx)).await?;
Ok(Http2ClientService::new(tx))
})
}
}

#[cfg(feature = "http2")]
impl<M: Clone, B> Clone for Http2Connect<M, B> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
builder: self.builder.clone(),
_body: self._body.clone(),
}
}
}

/// A thin adapter over hyper HTTP/1 client SendRequest.
#[cfg(feature = "http1")]
#[derive(Debug)]
pub struct Http1ClientService<B> {
tx: hyper::client::conn::http1::SendRequest<B>,
}

#[cfg(feature = "http1")]
impl<B> Http1ClientService<B> {
/// todo
pub fn new(tx: hyper::client::conn::http1::SendRequest<B>) -> Self {
Self { tx }
}
}

#[cfg(feature = "http1")]
impl<B> Service<Request<B>> for Http1ClientService<B>
where
B: hyper::body::Body + Send + 'static,
{
type Response = Response<hyper::body::Incoming>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tx.poll_ready(cx)
}

fn call(&mut self, req: Request<B>) -> Self::Future {
let fut = self.tx.send_request(req);
Box::pin(fut)
}
}

/// todo
#[cfg(feature = "http2")]
#[derive(Debug)]
pub struct Http2ClientService<B> {
tx: hyper::client::conn::http2::SendRequest<B>,
}

#[cfg(feature = "http2")]
impl<B> Http2ClientService<B> {
/// todo
pub fn new(tx: hyper::client::conn::http2::SendRequest<B>) -> Self {
Self { tx }
}
}

#[cfg(feature = "http2")]
impl<B> Service<Request<B>> for Http2ClientService<B>
where
B: hyper::body::Body + Send + 'static,
{
type Response = Response<hyper::body::Incoming>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tx.poll_ready(cx)
}

fn call(&mut self, req: Request<B>) -> Self::Future {
let fut = self.tx.send_request(req);
Box::pin(fut)
}
}

#[cfg(feature = "http2")]
impl<B> Clone for Http2ClientService<B> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}
3 changes: 3 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! HTTP client utilities

#[cfg(any(feature = "http1", feature = "http2"))]
pub mod conn;

/// Legacy implementations of `connect` module and `Client`
#[cfg(feature = "client-legacy")]
pub mod legacy;
Expand Down
Loading