diff --git a/Cargo.lock b/Cargo.lock index 9064c1d1..427e3740 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1897,6 +1897,9 @@ name = "multimap" version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +dependencies = [ + "serde", +] [[package]] name = "nohash" @@ -2234,6 +2237,7 @@ dependencies = [ "hyper-util", "ipnet", "lru_time_cache", + "multimap", "once_cell", "opentelemetry", "orion-configuration", diff --git a/orion-lib/Cargo.toml b/orion-lib/Cargo.toml index 9cc5dac8..3319e985 100644 --- a/orion-lib/Cargo.toml +++ b/orion-lib/Cargo.toml @@ -27,6 +27,7 @@ hyper = { version = "1", features = ["full"] } hyper-rustls = { version = "0.27.1", features = ["default", "http2"] } ipnet = "2.9" lru_time_cache = "0.11.11" +multimap = "0.10.0" once_cell = { version = "1.19" } opentelemetry = "0.29.0" hyper-util.workspace = true diff --git a/orion-lib/src/listeners/listeners_manager.rs b/orion-lib/src/listeners/listeners_manager.rs index 645a2ef1..65e53c20 100644 --- a/orion-lib/src/listeners/listeners_manager.rs +++ b/orion-lib/src/listeners/listeners_manager.rs @@ -15,11 +15,8 @@ // // -use std::collections::BTreeMap; - +use multimap::MultiMap; use tokio::sync::{broadcast, mpsc}; -#[cfg(debug_assertions)] -use tracing::debug; use tracing::{info, warn}; use orion_configuration::config::{ @@ -49,17 +46,19 @@ pub enum TlsContextChange { struct ListenerInfo { handle: abort_on_drop::ChildTask<()>, listener_conf: ListenerConfig, + version: u64, } impl ListenerInfo { - fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig) -> Self { - Self { handle: handle.into(), listener_conf } + fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig, version: u64) -> Self { + Self { handle: handle.into(), listener_conf, version } } } pub struct ListenersManager { listener_configuration_channel: mpsc::Receiver, route_configuration_channel: mpsc::Receiver, - listener_handles: BTreeMap<&'static str, ListenerInfo>, + listener_handles: MultiMap, + version_counter: u64, } impl ListenersManager { @@ -70,7 +69,8 @@ impl ListenersManager { ListenersManager { listener_configuration_channel, route_configuration_channel, - listener_handles: BTreeMap::new(), + listener_handles: MultiMap::new(), + version_counter: 0, } } @@ -102,8 +102,8 @@ impl ListenersManager { }, ListenerConfigurationChange::GetConfiguration(config_dump_tx) => { let listeners: Vec = self.listener_handles - .values() - .map(|info| info.listener_conf.clone()) + .iter() + .map(|(_, info)| info.listener_conf.clone()) .collect(); config_dump_tx.send(ConfigDump { listeners: Some(listeners), ..Default::default() }).await?; }, @@ -125,31 +125,39 @@ impl ListenersManager { } pub fn start_listener(&mut self, listener: Listener, listener_conf: ListenerConfig) -> Result<()> { - let listener_name = listener.get_name(); + let listener_name = listener.get_name().to_string(); let (addr, dev) = listener.get_socket(); info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some()); - // spawn the task for this listener address, this will spawn additional task per connection + + self.version_counter += 1; + let version = self.version_counter; + + let listener_name_for_async = listener_name.clone(); + let join_handle = tokio::spawn(async move { let error = listener.start().await; - warn!("Listener {listener_name} exited: {error}"); + info!("Listener {} version {} exited: {}", listener_name_for_async, version, error); }); - #[cfg(debug_assertions)] - if self.listener_handles.contains_key(&listener_name) { - debug!("Listener {listener_name} already exists, replacing it"); - } - // note: join handle gets overwritten here if it already exists. - // handles are abort on drop so will be aborted, closing the socket - // but the any tasks spawned within this task, which happens on a per-connection basis, - // will survive past this point and only get dropped when their session ends - self.listener_handles.insert(listener_name, ListenerInfo::new(join_handle, listener_conf)); + + let listener_info = ListenerInfo::new(join_handle, listener_conf, version); + self.listener_handles.insert(listener_name.clone(), listener_info); + + let version_count = self.listener_handles.get_vec(&listener_name).map(|v| v.len()).unwrap_or(0); + info!("Started version {} of listener {} ({} total active version(s))", version, listener_name, version_count); Ok(()) } pub fn stop_listener(&mut self, listener_name: &str) -> Result<()> { - if let Some(abort_handler) = self.listener_handles.remove(listener_name) { - info!("{listener_name} : Stopped"); - abort_handler.handle.abort(); + if let Some(listeners) = self.listener_handles.get_vec_mut(listener_name) { + info!("Stopping all {} version(s) of listener {}", listeners.len(), listener_name); + for listener_info in listeners.drain(..) { + info!("Stopping listener {} version {}", listener_name, listener_info.version); + listener_info.handle.abort(); + } + self.listener_handles.remove(listener_name); + } else { + info!("No listeners found with name {}", listener_name); } Ok(()) @@ -202,9 +210,11 @@ mod tests { assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); tokio::task::yield_now().await; - // This should fail because the old listener exited already dropping the rx - assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_err()); - // Yield once more just in case more logs can be seen + // Both listeners should still be active (multiple versions allowed) + assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); + assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); + + assert_eq!(man.listener_handles.get_vec(name).unwrap().len(), 2); tokio::task::yield_now().await; } @@ -239,7 +249,7 @@ mod tests { // See .start_listener() - in the case all channels are dropped the task there // should exit with this warning msg - let expected = format!("Listener {name} exited: channel closed"); + let expected = format!("Listener {name} version 1 exited: channel closed"); logs_assert(|lines: &[&str]| { let logs: Vec<_> = lines.iter().filter(|ln| ln.contains(&expected)).collect(); if logs.len() == 1 { @@ -249,4 +259,78 @@ mod tests { } }); } + + #[traced_test] + #[tokio::test] + async fn start_multiple_listener_versions() { + let chan = 10; + let name = "multi-version-listener"; + + let (_conf_tx, conf_rx) = mpsc::channel(chan); + let (_route_tx, route_rx) = mpsc::channel(chan); + let mut man = ListenersManager::new(conf_rx, route_rx); + + let (routeb_tx1, routeb_rx) = broadcast::channel(chan); + let (_secb_tx1, secb_rx) = broadcast::channel(chan); + let l1 = Listener::test_listener(name, routeb_rx, secb_rx); + let l1_info = ListenerConfig { + name: name.into(), + address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234), + filter_chains: HashMap::default(), + bind_device: None, + with_tls_inspector: false, + proxy_protocol_config: None, + with_tlv_listener_filter: false, + tlv_listener_filter_config: None, + }; + man.start_listener(l1, l1_info).unwrap(); + assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); + tokio::task::yield_now().await; + + let (routeb_tx2, routeb_rx) = broadcast::channel(chan); + let (_secb_tx2, secb_rx) = broadcast::channel(chan); + let l2 = Listener::test_listener(name, routeb_rx, secb_rx); + let l2_info = ListenerConfig { + name: name.into(), + address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235), // Different port + filter_chains: HashMap::default(), + bind_device: None, + with_tls_inspector: false, + proxy_protocol_config: None, + with_tlv_listener_filter: false, + tlv_listener_filter_config: None, + }; + man.start_listener(l2, l2_info).unwrap(); + assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); + tokio::task::yield_now().await; + + let (routeb_tx3, routeb_rx) = broadcast::channel(chan); + let (_secb_tx3, secb_rx) = broadcast::channel(chan); + let l3 = Listener::test_listener(name, routeb_rx, secb_rx); + let l3_info = ListenerConfig { + name: name.into(), + address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236), // Different port + filter_chains: HashMap::default(), + bind_device: None, + with_tls_inspector: false, + proxy_protocol_config: None, + with_tlv_listener_filter: false, + tlv_listener_filter_config: None, + }; + man.start_listener(l3, l3_info).unwrap(); + assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); + tokio::task::yield_now().await; + + assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); + assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); + assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); + + assert_eq!(man.listener_handles.get_vec(name).unwrap().len(), 3); + + man.stop_listener(name).unwrap(); + + assert!(man.listener_handles.get_vec(name).is_none()); + + tokio::task::yield_now().await; + } }