Skip to content

Commit 59adc36

Browse files
authored
Merge pull request #68 from hzxuzhonghu/cleanup-launch
runtime renaming
2 parents 1618cbe + 597914a commit 59adc36

File tree

12 files changed

+296
-122
lines changed

12 files changed

+296
-122
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
*~
55
*.html
66
*.log.*
7+
.vscode

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

orion-lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ typed-builder = "0.18.2"
7171
url.workspace = true
7272
uuid = { version = "1.17.0", features = ["v4"] }
7373
x509-parser = { version = "0.17", features = ["default"] }
74+
tokio-util = "0.7.16"
7475

7576

7677
[dev-dependencies]

orion-lib/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,16 @@ pub fn new_configuration_channel(capacity: usize) -> (ConfigurationSenders, Conf
131131

132132
/// Start the listeners manager directly without spawning a background task.
133133
/// Caller must be inside a Tokio runtime and await this async function.
134-
pub async fn start_listener_manager(configuration_receivers: ConfigurationReceivers) -> Result<()> {
134+
pub async fn start_listener_manager(
135+
configuration_receivers: ConfigurationReceivers,
136+
ct: tokio_util::sync::CancellationToken,
137+
) -> Result<()> {
135138
let ConfigurationReceivers { listener_configuration_receiver, route_configuration_receiver } =
136139
configuration_receivers;
137140

138141
tracing::debug!("listeners manager starting");
139142
let mgr = ListenersManager::new(listener_configuration_receiver, route_configuration_receiver);
140-
mgr.start().await.map_err(|err| {
143+
mgr.start(ct).await.map_err(|err| {
141144
tracing::warn!(error = %err, "listeners manager exited with error");
142145
err
143146
})?;

orion-lib/src/listeners/listeners_manager.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,26 +57,30 @@ impl ListenerInfo {
5757
}
5858

5959
pub struct ListenersManager {
60-
configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
60+
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
6161
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>,
6262
listener_handles: BTreeMap<&'static str, ListenerInfo>,
6363
}
6464

6565
impl ListenersManager {
6666
pub fn new(
67-
configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
67+
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
6868
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>,
6969
) -> Self {
70-
ListenersManager { configuration_channel, route_configuration_channel, listener_handles: BTreeMap::new() }
70+
ListenersManager {
71+
listener_configuration_channel,
72+
route_configuration_channel,
73+
listener_handles: BTreeMap::new(),
74+
}
7175
}
7276

73-
pub async fn start(mut self) -> Result<()> {
77+
pub async fn start(mut self, ct: tokio_util::sync::CancellationToken) -> Result<()> {
7478
let (tx_secret_updates, _) = broadcast::channel(16);
7579
let (tx_route_updates, _) = broadcast::channel(16);
76-
80+
// TODO: create child token for each listener?
7781
loop {
7882
tokio::select! {
79-
Some(listener_configuration_change) = self.configuration_channel.recv() => {
83+
Some(listener_configuration_change) = self.listener_configuration_channel.recv() => {
8084
match listener_configuration_change {
8185
ListenerConfigurationChange::Added(boxed) => {
8286
let (factory, listener_conf) = *boxed;
@@ -112,9 +116,9 @@ impl ListenersManager {
112116
warn!("Internal problem when updating a route: {e}");
113117
}
114118
},
115-
else => {
116-
warn!("All listener manager channels are closed...exiting");
117-
return Err("All listener manager channels are closed...exiting".into());
119+
_ = ct.cancelled() => {
120+
warn!("Listener manager exiting");
121+
return Ok(());
118122
}
119123
}
120124
}

orion-proxy/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ parking_lot = "0.12.3"
3434
tokio.workspace = true
3535
tower.workspace = true
3636
tracing.workspace = true
37+
pingora-timeout = "0.3.0"
38+
3739

3840
axum = "0.8.1"
3941
compact_str.workspace = true
@@ -52,6 +54,7 @@ tracing-subscriber = { workspace = true, features = [
5254
"registry",
5355
"std",
5456
] }
57+
tokio-util = "0.7.16"
5558

5659
[target.'cfg(not(target_env = "msvc"))'.dependencies]
5760
tikv-jemallocator = { version = "0.6", optional = true }
@@ -65,5 +68,8 @@ axum-test = "17.2.0"
6568
orion-data-plane-api.workspace = true
6669
tracing-test.workspace = true
6770

71+
[target.'cfg(unix)'.dev-dependencies]
72+
libc = "0.2"
73+
6874
[lints]
6975
workspace = true

orion-proxy/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod admin;
2323
mod core_affinity;
2424
mod proxy;
2525
mod runtime;
26+
mod signal;
2627
mod xds_configurator;
2728

2829
pub fn run() -> Result<()> {

orion-proxy/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ static GLOBAL: Jemalloc = Jemalloc;
2626
#[global_allocator]
2727
static ALLOC: dhat::Alloc = dhat::Alloc;
2828

29-
fn main() -> orion_error::Result<()> {
29+
#[tokio::main]
30+
async fn main() -> orion_error::Result<()> {
3031
#[cfg(all(feature = "dhat-heap", not(feature = "jemalloc")))]
3132
let _profiler = dhat::Profiler::new_heap();
3233
orion_proxy::run()

0 commit comments

Comments
 (0)