Skip to content

Commit 56e0012

Browse files
committed
fix(xds): add mutiple async connections to the server
Signed-off-by: hanshal101 <hanshalmehta10@gmail.com>
1 parent 50689a2 commit 56e0012

File tree

5 files changed

+49
-39
lines changed

5 files changed

+49
-39
lines changed

orion-xds/examples/server.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1111
.with(tracing_subscriber::fmt::layer())
1212
.init();
1313

14-
let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100);
15-
let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100);
14+
let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
15+
let (stream_resource_tx, _stream_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
1616
let addr = "127.0.0.1:50051".parse()?;
17+
let delta_tx_clone = delta_resource_tx.clone();
18+
let stream_tx_clone = stream_resource_tx.clone();
1719

1820
let grpc_server = tokio::spawn(async move {
1921
info!("Server started");
20-
let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await;
22+
let res = start_aggregate_server(addr, delta_tx_clone, stream_tx_clone).await;
2123
info!("Server stopped {res:?}");
2224
});
2325
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
@@ -37,7 +39,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3739
info!("Adding cluster {cluster_id}");
3840
let cluster_resource = resources::create_cluster_resource(&cluster);
3941

40-
if delta_resource_tx.send(ServerAction::Add(cluster_resource.clone())).await.is_err() {
42+
if delta_resource_tx.send(ServerAction::Add(cluster_resource.clone())).is_err() {
4143
break;
4244
};
4345
tokio::time::sleep(Duration::from_secs(5)).await;
@@ -50,13 +52,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5052
);
5153
let listener_resource = resources::create_listener_resource(&listener);
5254
info!("Adding listener {listener_resource:?}");
53-
if delta_resource_tx.send(ServerAction::Add(listener_resource)).await.is_err() {
55+
if delta_resource_tx.send(ServerAction::Add(listener_resource)).is_err() {
5456
break;
5557
};
5658
tokio::time::sleep(Duration::from_secs(15)).await;
5759

5860
info!("Removing cluster {cluster_id}");
59-
if delta_resource_tx.send(ServerAction::Remove(cluster_resource)).await.is_err() {
61+
if delta_resource_tx.send(ServerAction::Remove(cluster_resource)).is_err() {
6062
break;
6163
};
6264
tokio::time::sleep(Duration::from_secs(5)).await;
@@ -69,7 +71,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6971
);
7072
let listener_resource = resources::create_listener_resource(&listener);
7173
info!("Removing listener {listener_resource:?}");
72-
if delta_resource_tx.send(ServerAction::Remove(listener_resource)).await.is_err() {
74+
if delta_resource_tx.send(ServerAction::Remove(listener_resource)).is_err() {
7375
break;
7476
};
7577
}

orion-xds/examples/server_routes_and_loads.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1414
.with(tracing_subscriber::fmt::layer())
1515
.init();
1616

17-
let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100);
18-
let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100);
17+
let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
18+
let (stream_resource_tx, _stream_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
1919
let addr = "127.0.0.1:50051".parse()?;
20+
let delta_tx_clone = delta_resource_tx.clone();
21+
let stream_tx_clone = stream_resource_tx.clone();
2022

2123
let grpc_server = tokio::spawn(async move {
2224
info!("Server started");
23-
let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await;
25+
let res = start_aggregate_server(addr, delta_tx_clone, stream_tx_clone).await;
2426
info!("Server stopped {res:?}");
2527
});
2628
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
@@ -38,7 +40,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3840
info!("Adding Cluster Load Assignment for cluster {cluster_id}");
3941
let load_assigment_resource = resources::create_load_assignment_resource(&cluster_id, &cla);
4042

41-
if delta_resource_tx.send(ServerAction::Add(load_assigment_resource.clone())).await.is_err() {
43+
if delta_resource_tx.send(ServerAction::Add(load_assigment_resource.clone())).is_err() {
4244
return;
4345
};
4446
tokio::time::sleep(Duration::from_secs(5)).await;
@@ -49,20 +51,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4951
let route_configuration_resource =
5052
resources::create_route_configuration_resource(&route_id, &route_configuration);
5153

52-
if delta_resource_tx.send(ServerAction::Add(route_configuration_resource.clone())).await.is_err() {
54+
if delta_resource_tx.send(ServerAction::Add(route_configuration_resource.clone())).is_err() {
5355
return;
5456
};
5557

5658
tokio::time::sleep(Duration::from_secs(15)).await;
5759

5860
info!("Removing cluster load assignment {cluster_id}");
59-
if delta_resource_tx.send(ServerAction::Remove(load_assigment_resource)).await.is_err() {
61+
if delta_resource_tx.send(ServerAction::Remove(load_assigment_resource)).is_err() {
6062
return;
6163
};
6264
tokio::time::sleep(Duration::from_secs(5)).await;
6365

6466
info!("Removing route configuration {route_id}");
65-
if delta_resource_tx.send(ServerAction::Remove(route_configuration_resource)).await.is_err() {
67+
if delta_resource_tx.send(ServerAction::Remove(route_configuration_resource)).is_err() {
6668
return;
6769
};
6870
tokio::time::sleep(Duration::from_secs(5)).await;

orion-xds/examples/server_secret_rotation.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1818
.with(tracing_subscriber::fmt::layer())
1919
.init();
2020

21-
let (delta_resource_tx, delta_resources_rx) = tokio::sync::mpsc::channel(100);
22-
let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100);
21+
let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
22+
let (stream_resource_tx, _stream_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
2323
let addr = "127.0.0.1:50051".parse()?;
24+
let delta_tx_clone = delta_resource_tx.clone();
25+
let stream_tx_clone = stream_resource_tx.clone();
2426

2527
let grpc_server = tokio::spawn(async move {
2628
info!("Server started");
27-
let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await;
29+
let res = start_aggregate_server(addr, delta_tx_clone, stream_tx_clone).await;
2830
info!("Server stopped {res:?}");
2931
});
3032
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
@@ -55,7 +57,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5557
info!("Adding downstream secret {secret_id}");
5658
let secret_resource = resources::create_secret_resource(secret_id, &secret);
5759

58-
if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).await.is_err() {
60+
if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).is_err() {
5961
return;
6062
};
6163

@@ -77,7 +79,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7779
info!("Adding upstream secret {secret_id}");
7880
let secret_resource = resources::create_secret_resource(secret_id, &secret);
7981

80-
if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).await.is_err() {
82+
if delta_resource_tx.send(ServerAction::Add(secret_resource.clone())).is_err() {
8183
return;
8284
};
8385

orion-xds/examples/server_secret_rotation_simple.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use orion_data_plane_api::envoy_data_plane_api::envoy::{
44
config::core::v3::{data_source::Specifier, DataSource},
55
extensions::transport_sockets::tls::v3::{secret, CertificateValidationContext},
66
};
7-
use orion_xds::xds::{resources, server::start_aggregate_server};
7+
use orion_xds::xds::{
8+
resources,
9+
server::{start_aggregate_server, ServerAction},
10+
};
811
use tracing::info;
912
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
1013

@@ -15,13 +18,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1518
.with(tracing_subscriber::fmt::layer())
1619
.init();
1720

18-
let (_, delta_resources_rx) = tokio::sync::mpsc::channel(100);
19-
let (_stream_resource_tx, stream_resources_rx) = tokio::sync::mpsc::channel(100);
21+
let (delta_resource_tx, _delta_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
22+
let (stream_resource_tx, _stream_resources_rx) = tokio::sync::broadcast::channel::<ServerAction>(100);
2023
let addr = "127.0.0.1:50051".parse()?;
24+
let delta_tx_clone = delta_resource_tx.clone();
25+
let stream_tx_clone = stream_resource_tx.clone();
2126

2227
let grpc_server = tokio::spawn(async move {
2328
info!("Server started");
24-
let res = start_aggregate_server(addr, delta_resources_rx, stream_resources_rx).await;
29+
let res = start_aggregate_server(addr, delta_tx_clone, stream_tx_clone).await;
2530
info!("Server stopped {res:?}");
2631
});
2732
tokio::time::sleep(std::time::Duration::from_secs(10)).await;

orion-xds/src/xds/server.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,23 @@
2020

2121
use std::{net::SocketAddr, pin::Pin};
2222

23-
use atomic_take::AtomicTake;
2423
use orion_data_plane_api::envoy_data_plane_api::envoy::service::discovery::v3::{
2524
aggregated_discovery_service_server::{AggregatedDiscoveryService, AggregatedDiscoveryServiceServer},
2625
DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse, Resource, ResourceName,
2726
};
2827
use orion_data_plane_api::envoy_data_plane_api::tonic::{
2928
self, transport::Server, IntoStreamingRequest, Response, Status,
3029
};
31-
use tokio::sync::mpsc::{self, Receiver};
30+
use tokio::sync::{
31+
broadcast::Sender,
32+
mpsc::{self},
33+
};
3234
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
3335
use tracing::info;
3436

3537
use crate::xds::{self, model::XdsError};
3638

39+
#[derive(Debug, Clone)]
3740
pub enum ServerAction {
3841
Add(Resource),
3942
Remove(Resource),
@@ -42,16 +45,13 @@ pub enum ServerAction {
4245
pub type ResourceAction = ServerAction;
4346
#[derive(Debug)]
4447
pub struct AggregateServer {
45-
delta_resources_rx: AtomicTake<Receiver<ServerAction>>,
46-
stream_resources_rx: AtomicTake<Receiver<ServerAction>>,
48+
delta_resources_rx: Sender<ServerAction>,
49+
stream_resources_rx: Sender<ServerAction>,
4750
}
4851

4952
impl AggregateServer {
50-
pub fn new(delta_resources_rx: Receiver<ServerAction>, stream_resources_rx: Receiver<ServerAction>) -> Self {
51-
Self {
52-
delta_resources_rx: AtomicTake::new(delta_resources_rx),
53-
stream_resources_rx: AtomicTake::new(stream_resources_rx),
54-
}
53+
pub fn new(delta_resources_rx: Sender<ServerAction>, stream_resources_rx: Sender<ServerAction>) -> Self {
54+
Self { delta_resources_rx, stream_resources_rx }
5555
}
5656
}
5757

@@ -70,10 +70,9 @@ impl AggregatedDiscoveryService for AggregateServer {
7070
info!("\tclient connected from: {:?}", req.remote_addr());
7171

7272
let (tx, rx) = mpsc::channel(128);
73-
let mut resources_rx =
74-
self.stream_resources_rx.take().ok_or(Status::internal("Resource stream is unavailable"))?;
73+
let mut resources_rx = self.stream_resources_rx.subscribe();
7574
tokio::spawn(async move {
76-
while let Some(action) = resources_rx.recv().await {
75+
while let Ok(action) = resources_rx.recv().await {
7776
let item = match action {
7877
xds::server::ServerAction::Add(resource) => {
7978
let Some(resource) = resource.resource else {
@@ -136,9 +135,9 @@ impl AggregatedDiscoveryService for AggregateServer {
136135
// spawn and channel are required if you want handle "disconnect" functionality
137136
// the `out_stream` will not be polled after client disconnect
138137
let (tx, rx) = mpsc::channel(128);
139-
let mut resources_rx = self.delta_resources_rx.take().ok_or(Status::internal("Delta stream is unavailable"))?;
138+
let mut resources_rx = self.delta_resources_rx.subscribe();
140139
tokio::spawn(async move {
141-
while let Some(action) = resources_rx.recv().await {
140+
while let Ok(action) = resources_rx.recv().await {
142141
let item = match action {
143142
xds::server::ServerAction::Add(r) => {
144143
let Some(ref resource) = r.resource else {
@@ -198,8 +197,8 @@ impl AggregatedDiscoveryService for AggregateServer {
198197

199198
pub async fn start_aggregate_server(
200199
addr: SocketAddr,
201-
delta_resources_rx: Receiver<ResourceAction>,
202-
stream_resources_rx: Receiver<ResourceAction>,
200+
delta_resources_rx: Sender<ResourceAction>,
201+
stream_resources_rx: Sender<ResourceAction>,
203202
) -> Result<(), XdsError> {
204203
info!("Server started {addr:?}");
205204
let server = AggregateServer::new(delta_resources_rx, stream_resources_rx);

0 commit comments

Comments
 (0)