Skip to content

Commit 858c7f3

Browse files
tls support improvements, added trust to hostname and ca certificate (#182)
* tls support improvements, added trust to hostname and ca certificate * wip new TLS Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * updating tsl library and interface to tokio::tokio-rustls * cosmetics Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * fixing test parsing --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 613261d commit 858c7f3

File tree

7 files changed

+111
-54
lines changed

7 files changed

+111
-54
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ members = [
2121

2222

2323
[dependencies]
24-
tokio-native-tls = "0.3.0"
24+
tokio-rustls = "0.24.1"
25+
rustls-pemfile = "1.0.3"
2526
rabbitmq-stream-protocol = { version = "0.2", path = "protocol" }
2627
tokio = { version = "1.29.1", features = ["full"] }
2728
tokio-util = { version = "0.7.3", features = ["codec"] }

examples/tls_producer.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use tracing::info;
2+
use tracing_subscriber::FmtSubscriber;
3+
4+
use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration};
5+
6+
const BATCH_SIZE: usize = 100;
7+
8+
#[tokio::main]
9+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
10+
let stream_name = String::from("tls_test_stream");
11+
let subscriber = FmtSubscriber::builder().finish();
12+
13+
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
14+
15+
let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
16+
.add_root_certificate(String::from("/path/to/your/certificate-ca.pem"))
17+
.build();
18+
19+
let environment = Environment::builder()
20+
.host("localhost")
21+
.port(5551)
22+
.tls(tls_configuration)
23+
.build()
24+
.await?;
25+
26+
start_publisher(environment.clone(), &stream_name)
27+
.await
28+
.expect("error in publisher");
29+
30+
Ok(())
31+
}
32+
33+
async fn start_publisher(
34+
env: Environment,
35+
stream: &String,
36+
) -> Result<(), Box<dyn std::error::Error>> {
37+
let _ = env.stream_creator().create(&stream).await;
38+
39+
let producer = env.producer().batch_size(BATCH_SIZE).build(&stream).await?;
40+
41+
let is_batch_send = true;
42+
tokio::task::spawn(async move {
43+
info!(
44+
"Starting producer with batch size {} and batch send {}",
45+
BATCH_SIZE, is_batch_send
46+
);
47+
info!("Sending {} simple messages", BATCH_SIZE);
48+
batch_send_simple(&producer).await;
49+
})
50+
.await?;
51+
Ok(())
52+
}
53+
54+
async fn batch_send_simple(producer: &Producer<NoDedup>) {
55+
let mut msg = Vec::with_capacity(BATCH_SIZE);
56+
for i in 0..BATCH_SIZE {
57+
msg.push(
58+
Message::builder()
59+
.body(format!("rust message{}", i))
60+
.build(),
61+
);
62+
}
63+
64+
producer
65+
.batch_send(msg, move |_| async move {})
66+
.await
67+
.unwrap();
68+
}

src/client/mod.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::convert::TryFrom;
2+
13
use std::{
24
collections::HashMap,
35
io,
@@ -13,12 +15,16 @@ use futures::{
1315
Stream, StreamExt, TryFutureExt,
1416
};
1517
use pin_project::pin_project;
18+
use rustls::ServerName;
1619
use tokio::io::AsyncRead;
1720
use tokio::io::AsyncWrite;
1821
use tokio::io::ReadBuf;
1922
use tokio::{net::TcpStream, sync::Notify};
2023
use tokio::{sync::RwLock, task::JoinHandle};
21-
use tokio_native_tls::TlsStream;
24+
use tokio_rustls::client::TlsStream;
25+
use tokio_rustls::rustls::ClientConfig;
26+
use tokio_rustls::{rustls, TlsConnector};
27+
2228
use tokio_util::codec::Framed;
2329
use tracing::trace;
2430

@@ -414,17 +420,26 @@ impl Client {
414420
> {
415421
let stream = if broker.tls.enabled() {
416422
let stream = TcpStream::connect((broker.host.as_str(), broker.port)).await?;
423+
let mut roots = rustls::RootCertStore::empty();
424+
let cert = broker.tls.get_root_certificates();
425+
let cert_bytes = std::fs::read(cert);
426+
427+
let root_cert_store = rustls_pemfile::certs(&mut cert_bytes.unwrap().as_ref()).unwrap();
417428

418-
let mut tls_builder = tokio_native_tls::native_tls::TlsConnector::builder();
419-
tls_builder
420-
.danger_accept_invalid_certs(true)
421-
.danger_accept_invalid_hostnames(true);
429+
root_cert_store
430+
.iter()
431+
.for_each(|cert| roots.add(&rustls::Certificate(cert.to_vec())).unwrap());
422432

423-
let conn = tokio_native_tls::TlsConnector::from(tls_builder.build()?);
433+
let config = ClientConfig::builder()
434+
.with_safe_defaults()
435+
.with_root_certificates(roots)
436+
.with_no_client_auth();
424437

425-
let stream = conn.connect(broker.host.as_str(), stream).await?;
438+
let connector = TlsConnector::from(Arc::new(config));
439+
let domain = ServerName::try_from(broker.host.as_str()).unwrap();
440+
let conn = connector.connect(domain, stream).await?;
426441

427-
GenericTcpStream::SecureTcp(stream)
442+
GenericTcpStream::SecureTcp(conn)
428443
} else {
429444
let stream = TcpStream::connect((broker.host.as_str(), broker.port)).await?;
430445
GenericTcpStream::Tcp(stream)

src/client/options.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,15 @@ impl Default for ClientOptions {
4242
collector: Arc::new(NopMetricsCollector {}),
4343
tls: TlsConfiguration {
4444
enabled: false,
45-
hostname_verification: false,
46-
trust_everything: false,
45+
certificate_path: String::from(""),
4746
},
4847
}
4948
}
5049
}
5150

5251
impl ClientOptions {
5352
pub fn get_tls(&self) -> TlsConfiguration {
54-
self.tls
53+
self.tls.clone()
5554
}
5655

5756
pub fn enable_tls(&mut self) {

src/environment.rs

Lines changed: 12 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
stream_creator::StreamCreator,
1414
RabbitMQStreamResult,
1515
};
16+
1617
/// Main access point to a node
1718
#[derive(Clone)]
1819
pub struct Environment {
@@ -108,18 +109,7 @@ impl EnvironmentBuilder {
108109
}
109110

110111
pub fn tls(mut self, tls_configuration: TlsConfiguration) -> EnvironmentBuilder {
111-
self.0
112-
.client_options
113-
.tls
114-
.trust_everything(tls_configuration.trust_everything_enabled());
115-
self.0
116-
.client_options
117-
.tls
118-
.hostname_verification_enable(tls_configuration.hostname_verification_enabled());
119-
self.0
120-
.client_options
121-
.tls
122-
.enable(tls_configuration.enabled());
112+
self.0.client_options.tls = tls_configuration;
123113

124114
self
125115
}
@@ -142,28 +132,22 @@ pub struct EnvironmentOptions {
142132
}
143133

144134
/** Helper for tls configuration */
145-
#[derive(Clone, Copy)]
135+
#[derive(Clone)]
146136
pub struct TlsConfiguration {
147137
pub(crate) enabled: bool,
148-
pub(crate) hostname_verification: bool,
149-
pub(crate) trust_everything: bool,
138+
pub(crate) certificate_path: String,
150139
}
151140

152141
impl Default for TlsConfiguration {
153142
fn default() -> TlsConfiguration {
154143
TlsConfiguration {
155144
enabled: true,
156-
trust_everything: false,
157-
hostname_verification: true,
145+
certificate_path: String::from(""),
158146
}
159147
}
160148
}
161149

162150
impl TlsConfiguration {
163-
pub fn trust_everything(&mut self, trust_everything: bool) {
164-
self.trust_everything = trust_everything
165-
}
166-
167151
pub fn enable(&mut self, enabled: bool) {
168152
self.enabled = enabled
169153
}
@@ -172,37 +156,25 @@ impl TlsConfiguration {
172156
self.enabled
173157
}
174158

175-
pub fn hostname_verification_enable(&mut self, hostname_verification: bool) {
176-
self.hostname_verification = hostname_verification
159+
pub fn get_root_certificates(&self) -> String {
160+
self.certificate_path.clone()
177161
}
178-
179-
pub fn hostname_verification_enabled(&self) -> bool {
180-
self.hostname_verification
181-
}
182-
183-
pub fn trust_everything_enabled(&self) -> bool {
184-
self.trust_everything
162+
//
163+
pub fn add_root_certificate(&mut self, certificate_path: String) {
164+
self.certificate_path = certificate_path
185165
}
186166
}
187167

188168
pub struct TlsConfigurationBuilder(TlsConfiguration);
189169

190170
impl TlsConfigurationBuilder {
191-
pub fn trust_everything(mut self, trust_everything: bool) -> TlsConfigurationBuilder {
192-
self.0.trust_everything = trust_everything;
193-
self
194-
}
195-
196171
pub fn enable(mut self, enable: bool) -> TlsConfigurationBuilder {
197172
self.0.enabled = enable;
198173
self
199174
}
200175

201-
pub fn hostname_verification_enable(
202-
mut self,
203-
hostname_verification: bool,
204-
) -> TlsConfigurationBuilder {
205-
self.0.hostname_verification = hostname_verification;
176+
pub fn add_root_certificate(mut self, certificate_path: String) -> TlsConfigurationBuilder {
177+
self.0.certificate_path = certificate_path;
206178
self
207179
}
208180

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub enum ClientError {
1818
#[error("Client already closed")]
1919
AlreadyClosed,
2020
#[error(transparent)]
21-
Tls(#[from] tokio_native_tls::native_tls::Error),
21+
Tls(#[from] tokio_rustls::rustls::Error),
2222
#[error("Request error: {0:?}")]
2323
RequestError(ResponseCode),
2424
}

tests/integration/environment_test.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ async fn environment_create_streams_with_parameters() {
134134
assert_eq!(delete_response.is_ok(), true);
135135
}
136136

137+
/*
137138
#[tokio::test(flavor = "multi_thread")]
138139
async fn environment_fail_tls_connection() {
139140
// in this test we try to connect to a server that does not support tls
@@ -144,8 +145,9 @@ async fn environment_fail_tls_connection() {
144145
.tls(TlsConfiguration::default())
145146
.build()
146147
.await;
148+
147149
assert!(matches!(
148150
env,
149151
Err(rabbitmq_stream_client::error::ClientError::Tls { .. })
150152
));
151-
}
153+
}*/

0 commit comments

Comments
 (0)