Skip to content

Commit 4d6bdd7

Browse files
committed
pools wip
1 parent 7acab98 commit 4d6bdd7

File tree

8 files changed

+382
-28
lines changed

8 files changed

+382
-28
lines changed

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo
4343
http-body-util = "0.1.0"
4444
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
4545
tokio-test = "0.4"
46+
tower = { version = "0.5", features = ["util"] }
4647
tower-test = "0.4"
4748
pretty_env_logger = "0.5"
4849

@@ -56,7 +57,7 @@ system-configuration = { version = "0.6.1", optional = true }
5657
windows-registry = { version = "0.5", optional = true }
5758

5859
[features]
59-
default = []
60+
default = ["client", "client-pool"]
6061

6162
# Shorthand to enable everything
6263
full = [
@@ -80,6 +81,7 @@ client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures
8081
client-pool = ["dep:futures-util", "dep:tower-layer"]
8182
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
8283
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]
84+
client-pool = ["tokio/sync", "dep:futures-util", "dep:tower-layer"]
8385

8486
server = ["hyper/server"]
8587
server-auto = ["server", "http1", "http2"]
@@ -99,6 +101,10 @@ __internal_happy_eyeballs_tests = []
99101

100102
[[example]]
101103
name = "client"
104+
required-features = ["client-legacy", "client-pool", "http1", "tokio"]
105+
106+
[[example]]
107+
name = "client_legacy"
102108
required-features = ["client-legacy", "http1", "tokio"]
103109

104110
[[example]]

examples/client.rs

Lines changed: 132 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,143 @@
1-
use std::env;
1+
use tower_service::Service;
2+
use tower::ServiceExt;
23

3-
use http_body_util::Empty;
4-
use hyper::Request;
5-
use hyper_util::client::legacy::{connect::HttpConnector, Client};
4+
use hyper_util::client::pool;
65

76
#[tokio::main(flavor = "current_thread")]
8-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
9-
let url = match env::args().nth(1) {
10-
Some(url) => url,
11-
None => {
12-
eprintln!("Usage: client <url>");
13-
return Ok(());
14-
}
15-
};
16-
17-
// HTTPS requires picking a TLS implementation, so give a better
18-
// warning if the user tries to request an 'https' URL.
19-
let url = url.parse::<hyper::Uri>()?;
20-
if url.scheme_str() != Some("http") {
21-
eprintln!("This example only works with 'http' URLs.");
22-
return Ok(());
7+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8+
send_nego().await
9+
}
10+
11+
async fn send_h1() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
12+
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();
13+
14+
let http1 = tcp.and_then(|conn| Box::pin(async move {
15+
let (mut tx, c) = hyper::client::conn::http1::handshake::<_, http_body_util::Empty<hyper::body::Bytes>>(conn).await?;
16+
tokio::spawn(async move {
17+
if let Err(e) = c.await {
18+
eprintln!("connection error: {:?}", e);
19+
}
20+
});
21+
let svc = tower::service_fn(move |req| { tx.send_request(req) });
22+
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
23+
}));
24+
25+
let mut p = pool::Cache::new(http1).build();
26+
27+
let mut c = p.call(http::Uri::from_static("http://hyper.rs")).await?;
28+
eprintln!("{:?}", c);
29+
30+
let req = http::Request::builder()
31+
.header("host", "hyper.rs")
32+
.body(http_body_util::Empty::new())
33+
.unwrap();
34+
35+
c.ready().await?;
36+
let resp = c.call(req).await?;
37+
eprintln!("{:?}", resp);
38+
39+
Ok(())
40+
}
41+
42+
async fn send_h2() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
43+
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();
44+
45+
let http2 = tcp.and_then(|conn| Box::pin(async move {
46+
let (mut tx, c) = hyper::client::conn::http2::handshake::<_, _, http_body_util::Empty<hyper::body::Bytes>>(
47+
hyper_util::rt::TokioExecutor::new(),
48+
conn,
49+
).await?;
50+
println!("connected");
51+
tokio::spawn(async move {
52+
if let Err(e) = c.await {
53+
eprintln!("connection error: {:?}", e);
54+
}
55+
});
56+
let svc = tower::service_fn(move |req| { tx.send_request(req) });
57+
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
58+
}));
59+
60+
let mut p = pool::Singleton::new(http2);
61+
62+
for _ in 0..5 {
63+
let mut c = p.call(http::Uri::from_static("http://localhost:3000")).await?;
64+
eprintln!("{:?}", c);
65+
66+
let req = http::Request::builder()
67+
.header("host", "hyper.rs")
68+
.body(http_body_util::Empty::new())
69+
.unwrap();
70+
71+
c.ready().await?;
72+
let resp = c.call(req).await?;
73+
eprintln!("{:?}", resp);
2374
}
2475

25-
let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new());
76+
Ok(())
77+
}
78+
79+
async fn send_nego() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
80+
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();
81+
82+
let http1 = tower::layer::layer_fn(|tcp| {
83+
tower::service_fn(move |dst| {
84+
let inner = tcp.call(dst);
85+
async move {
86+
let conn = inner.await?;
87+
let (mut tx, c) = hyper::client::conn::http1::handshake::<_, http_body_util::Empty<hyper::body::Bytes>>(conn).await?;
88+
tokio::spawn(async move {
89+
if let Err(e) = c.await {
90+
eprintln!("connection error: {:?}", e);
91+
}
92+
});
93+
let svc = tower::service_fn(move |req| { tx.send_request(req) });
94+
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
95+
}
96+
})
97+
});
98+
2699

27-
let req = Request::builder()
28-
.uri(url)
29-
.body(Empty::<bytes::Bytes>::new())?;
100+
let http2 = tower::layer::layer_fn(|tcp| {
101+
tower::service_fn(move |dst| {
102+
let inner = tcp.call(dst);
103+
async move {
104+
let conn = inner.await?;
105+
let (mut tx, c) = hyper::client::conn::http2::handshake::<_, _, http_body_util::Empty<hyper::body::Bytes>>(
106+
hyper_util::rt::TokioExecutor::new(),
107+
conn,
108+
).await?;
109+
println!("connected");
110+
tokio::spawn(async move {
111+
if let Err(e) = c.await {
112+
eprintln!("connection error: {:?}", e);
113+
}
114+
});
115+
let svc = tower::service_fn(move |req| { tx.send_request(req) });
116+
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
117+
}
118+
})
119+
});
30120

31-
let resp = client.request(req).await?;
121+
let mut svc = pool::negotiate(
122+
tcp,
123+
|_| false,
124+
http1,
125+
http2,
126+
);
32127

33-
eprintln!("{:?} {:?}", resp.version(), resp.status());
34-
eprintln!("{:#?}", resp.headers());
128+
for _ in 0..5 {
129+
let mut c = svc.call(http::Uri::from_static("http://localhost:3000")).await?;
130+
eprintln!("{:?}", c);
131+
132+
let req = http::Request::builder()
133+
.header("host", "hyper.rs")
134+
.body(http_body_util::Empty::new())
135+
.unwrap();
136+
137+
c.ready().await?;
138+
let resp = c.call(req).await?;
139+
eprintln!("{:?}", resp);
140+
}
35141

36142
Ok(())
37143
}

examples/client_legacy.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use std::env;
2+
3+
use http_body_util::Empty;
4+
use hyper::Request;
5+
use hyper_util::client::legacy::{connect::HttpConnector, Client};
6+
7+
#[tokio::main(flavor = "current_thread")]
8+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
9+
let url = match env::args().nth(1) {
10+
Some(url) => url,
11+
None => {
12+
eprintln!("Usage: client <url>");
13+
return Ok(());
14+
}
15+
};
16+
17+
// HTTPS requires picking a TLS implementation, so give a better
18+
// warning if the user tries to request an 'https' URL.
19+
let url = url.parse::<hyper::Uri>()?;
20+
if url.scheme_str() != Some("http") {
21+
eprintln!("This example only works with 'http' URLs.");
22+
return Ok(());
23+
}
24+
25+
let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new());
26+
27+
let req = Request::builder()
28+
.uri(url)
29+
.body(Empty::<bytes::Bytes>::new())?;
30+
31+
let resp = client.request(req).await?;
32+
33+
eprintln!("{:?} {:?}", resp.version(), resp.status());
34+
eprintln!("{:#?}", resp.headers());
35+
36+
Ok(())
37+
}

src/client/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ pub mod pool;
99

1010
#[cfg(feature = "client-proxy")]
1111
pub mod proxy;
12+
13+
pub mod pool;

src/client/pool/expire.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub struct Expire {
2+
3+
}

0 commit comments

Comments
 (0)