diff --git a/src/client/conn.rs b/src/client/conn.rs new file mode 100644 index 0000000..3f3728e --- /dev/null +++ b/src/client/conn.rs @@ -0,0 +1,287 @@ +//! todo + +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use http::{Request, Response}; +use tower_service::Service; + +use crate::common::future::poll_fn; + +type BoxError = Box; + +/// todo +#[cfg(feature = "http1")] +pub struct Http1Layer { + builder: hyper::client::conn::http1::Builder, + _body: PhantomData, +} + +/// todo +#[cfg(feature = "http1")] +pub fn http1() -> Http1Layer { + Http1Layer { + builder: hyper::client::conn::http1::Builder::new(), + _body: PhantomData, + } +} + +#[cfg(feature = "http1")] +impl tower_layer::Layer for Http1Layer { + type Service = Http1Connect; + fn layer(&self, inner: M) -> Self::Service { + Http1Connect { + inner, + builder: self.builder.clone(), + _body: self._body, + } + } +} + +#[cfg(feature = "http1")] +impl Clone for Http1Layer { + fn clone(&self) -> Self { + Self { + builder: self.builder.clone(), + _body: self._body.clone(), + } + } +} + +#[cfg(feature = "http1")] +impl From for Http1Layer { + fn from(builder: hyper::client::conn::http1::Builder) -> Self { + Self { + builder, + _body: PhantomData, + } + } +} + +/// todo +#[cfg(feature = "http1")] +pub struct Http1Connect { + inner: M, + builder: hyper::client::conn::http1::Builder, + _body: PhantomData, +} + +#[cfg(feature = "http1")] +impl Service for Http1Connect +where + M: Service, + M::Future: Send + 'static, + M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, + M::Error: Into, + B: hyper::body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into>, +{ + type Response = Http1ClientService; + type Error = BoxError; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, dst: Dst) -> Self::Future { + let fut = self.inner.call(dst); + let builder = self.builder.clone(); + Box::pin(async move { + let io = fut.await.map_err(Into::into)?; + let (mut tx, conn) = builder.handshake(io).await?; + //todo: pass in Executor + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("connection error: {:?}", e); + } + }); + // todo: wait for ready? or factor out to other middleware? + poll_fn(|cx| tx.poll_ready(cx)).await?; + + Ok(Http1ClientService::new(tx)) + }) + } +} + +#[cfg(feature = "http1")] +impl Clone for Http1Connect { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + builder: self.builder.clone(), + _body: self._body.clone(), + } + } +} + +/// todo +#[cfg(feature = "http2")] +pub struct Http2Layer { + _body: PhantomData, +} + +/// todo +#[cfg(feature = "http2")] +pub fn http2() -> Http2Layer { + Http2Layer { _body: PhantomData } +} + +#[cfg(feature = "http2")] +impl tower_layer::Layer for Http2Layer { + type Service = Http2Connect; + fn layer(&self, inner: M) -> Self::Service { + Http2Connect { + inner, + builder: hyper::client::conn::http2::Builder::new(crate::rt::TokioExecutor::new()), + _body: self._body, + } + } +} + +#[cfg(feature = "http2")] +impl Clone for Http2Layer { + fn clone(&self) -> Self { + Self { + _body: self._body.clone(), + } + } +} + +/// todo +#[cfg(feature = "http2")] +#[derive(Debug)] +pub struct Http2Connect { + inner: M, + builder: hyper::client::conn::http2::Builder, + _body: PhantomData, +} + +#[cfg(feature = "http2")] +impl Service for Http2Connect +where + M: Service, + M::Future: Send + 'static, + M::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, + M::Error: Into, + B: hyper::body::Body + Unpin + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, +{ + type Response = Http2ClientService; + type Error = BoxError; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, dst: Dst) -> Self::Future { + let fut = self.inner.call(dst); + let builder = self.builder.clone(); + Box::pin(async move { + let io = fut.await.map_err(Into::into)?; + let (mut tx, conn) = builder.handshake(io).await?; + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("connection error: {:?}", e); + } + }); + + // todo: wait for ready? or factor out to other middleware? + poll_fn(|cx| tx.poll_ready(cx)).await?; + Ok(Http2ClientService::new(tx)) + }) + } +} + +#[cfg(feature = "http2")] +impl Clone for Http2Connect { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + builder: self.builder.clone(), + _body: self._body.clone(), + } + } +} + +/// A thin adapter over hyper HTTP/1 client SendRequest. +#[cfg(feature = "http1")] +#[derive(Debug)] +pub struct Http1ClientService { + tx: hyper::client::conn::http1::SendRequest, +} + +#[cfg(feature = "http1")] +impl Http1ClientService { + /// todo + pub fn new(tx: hyper::client::conn::http1::SendRequest) -> Self { + Self { tx } + } +} + +#[cfg(feature = "http1")] +impl Service> for Http1ClientService +where + B: hyper::body::Body + Send + 'static, +{ + type Response = Response; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.tx.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let fut = self.tx.send_request(req); + Box::pin(fut) + } +} + +/// todo +#[cfg(feature = "http2")] +#[derive(Debug)] +pub struct Http2ClientService { + tx: hyper::client::conn::http2::SendRequest, +} + +#[cfg(feature = "http2")] +impl Http2ClientService { + /// todo + pub fn new(tx: hyper::client::conn::http2::SendRequest) -> Self { + Self { tx } + } +} + +#[cfg(feature = "http2")] +impl Service> for Http2ClientService +where + B: hyper::body::Body + Send + 'static, +{ + type Response = Response; + type Error = hyper::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.tx.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let fut = self.tx.send_request(req); + Box::pin(fut) + } +} + +#[cfg(feature = "http2")] +impl Clone for Http2ClientService { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + } + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 0d89603..294db01 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,5 +1,8 @@ //! HTTP client utilities +#[cfg(any(feature = "http1", feature = "http2"))] +pub mod conn; + /// Legacy implementations of `connect` module and `Client` #[cfg(feature = "client-legacy")] pub mod legacy;