Skip to content

Commit 0f16e12

Browse files
committed
feat: allow multiple versions of the same listener
- Change storage from BTreeMap to HashMap<String, Vec<ListenerInfo>> - Add version tracking to prevent breaking existing connections - Update start_listener to create new versions instead of stopping existing ones - Modify stop_listener to handle multiple versions - Update tests to verify multiple version functionality Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
1 parent 59adc36 commit 0f16e12

File tree

1 file changed

+114
-26
lines changed

1 file changed

+114
-26
lines changed

orion-lib/src/listeners/listeners_manager.rs

Lines changed: 114 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
//
1616
//
1717

18-
use std::collections::BTreeMap;
18+
use std::collections::HashMap;
1919

2020
use tokio::sync::{broadcast, mpsc};
21-
#[cfg(debug_assertions)]
22-
use tracing::debug;
2321
use tracing::{info, warn};
2422

2523
use orion_configuration::config::{
@@ -49,17 +47,19 @@ pub enum TlsContextChange {
4947
struct ListenerInfo {
5048
handle: abort_on_drop::ChildTask<()>,
5149
listener_conf: ListenerConfig,
50+
version: u64,
5251
}
5352
impl ListenerInfo {
54-
fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig) -> Self {
55-
Self { handle: handle.into(), listener_conf }
53+
fn new(handle: tokio::task::JoinHandle<()>, listener_conf: ListenerConfig, version: u64) -> Self {
54+
Self { handle: handle.into(), listener_conf, version }
5655
}
5756
}
5857

5958
pub struct ListenersManager {
6059
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
6160
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>,
62-
listener_handles: BTreeMap<&'static str, ListenerInfo>,
61+
listener_handles: HashMap<String, Vec<ListenerInfo>>,
62+
version_counter: u64,
6363
}
6464

6565
impl ListenersManager {
@@ -70,7 +70,8 @@ impl ListenersManager {
7070
ListenersManager {
7171
listener_configuration_channel,
7272
route_configuration_channel,
73-
listener_handles: BTreeMap::new(),
73+
listener_handles: HashMap::new(),
74+
version_counter: 0,
7475
}
7576
}
7677

@@ -103,6 +104,7 @@ impl ListenersManager {
103104
ListenerConfigurationChange::GetConfiguration(config_dump_tx) => {
104105
let listeners: Vec<ListenerConfig> = self.listener_handles
105106
.values()
107+
.flatten()
106108
.map(|info| info.listener_conf.clone())
107109
.collect();
108110
config_dump_tx.send(ConfigDump { listeners: Some(listeners), ..Default::default() }).await?;
@@ -125,31 +127,41 @@ impl ListenersManager {
125127
}
126128

127129
pub fn start_listener(&mut self, listener: Listener, listener_conf: ListenerConfig) -> Result<()> {
128-
let listener_name = listener.get_name();
130+
let listener_name = listener.get_name().to_string();
129131
let (addr, dev) = listener.get_socket();
130132
info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some());
131-
// spawn the task for this listener address, this will spawn additional task per connection
133+
134+
self.version_counter += 1;
135+
let version = self.version_counter;
136+
137+
info!("Starting new version {} of listener {}", version, listener_name);
138+
139+
let listener_name_clone = listener_name.clone();
140+
132141
let join_handle = tokio::spawn(async move {
133142
let error = listener.start().await;
134-
warn!("Listener {listener_name} exited: {error}");
143+
warn!("Listener {} version {} exited: {}", listener_name_clone, version, error);
135144
});
136-
#[cfg(debug_assertions)]
137-
if self.listener_handles.contains_key(&listener_name) {
138-
debug!("Listener {listener_name} already exists, replacing it");
139-
}
140-
// note: join handle gets overwritten here if it already exists.
141-
// handles are abort on drop so will be aborted, closing the socket
142-
// but the any tasks spawned within this task, which happens on a per-connection basis,
143-
// will survive past this point and only get dropped when their session ends
144-
self.listener_handles.insert(listener_name, ListenerInfo::new(join_handle, listener_conf));
145+
146+
let listener_info = ListenerInfo::new(join_handle, listener_conf, version);
147+
148+
self.listener_handles.entry(listener_name.clone()).or_insert_with(Vec::new).push(listener_info);
149+
150+
info!("Listener {} now has {} active version(s)", listener_name, self.listener_handles.get(&listener_name).unwrap().len());
145151

146152
Ok(())
147153
}
148154

149155
pub fn stop_listener(&mut self, listener_name: &str) -> Result<()> {
150-
if let Some(abort_handler) = self.listener_handles.remove(listener_name) {
151-
info!("{listener_name} : Stopped");
152-
abort_handler.handle.abort();
156+
if let Some(listeners) = self.listener_handles.get_mut(listener_name) {
157+
info!("Stopping all {} version(s) of listener {}", listeners.len(), listener_name);
158+
for listener_info in listeners.drain(..) {
159+
info!("Stopping listener {} version {}", listener_name, listener_info.version);
160+
listener_info.handle.abort();
161+
}
162+
self.listener_handles.remove(listener_name);
163+
} else {
164+
info!("No listeners found with name {}", listener_name);
153165
}
154166

155167
Ok(())
@@ -202,9 +214,11 @@ mod tests {
202214
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
203215
tokio::task::yield_now().await;
204216

205-
// This should fail because the old listener exited already dropping the rx
206-
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_err());
207-
// Yield once more just in case more logs can be seen
217+
// Both listeners should still be active (multiple versions allowed)
218+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
219+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
220+
221+
assert_eq!(man.listener_handles.get(name).unwrap().len(), 2);
208222
tokio::task::yield_now().await;
209223
}
210224

@@ -239,7 +253,7 @@ mod tests {
239253

240254
// See .start_listener() - in the case all channels are dropped the task there
241255
// should exit with this warning msg
242-
let expected = format!("Listener {name} exited: channel closed");
256+
let expected = format!("Listener {name} version 1 exited: channel closed");
243257
logs_assert(|lines: &[&str]| {
244258
let logs: Vec<_> = lines.iter().filter(|ln| ln.contains(&expected)).collect();
245259
if logs.len() == 1 {
@@ -249,4 +263,78 @@ mod tests {
249263
}
250264
});
251265
}
266+
267+
#[traced_test]
268+
#[tokio::test]
269+
async fn start_multiple_listener_versions() {
270+
let chan = 10;
271+
let name = "multi-version-listener";
272+
273+
let (_conf_tx, conf_rx) = mpsc::channel(chan);
274+
let (_route_tx, route_rx) = mpsc::channel(chan);
275+
let mut man = ListenersManager::new(conf_rx, route_rx);
276+
277+
let (routeb_tx1, routeb_rx) = broadcast::channel(chan);
278+
let (_secb_tx1, secb_rx) = broadcast::channel(chan);
279+
let l1 = Listener::test_listener(name, routeb_rx, secb_rx);
280+
let l1_info = ListenerConfig {
281+
name: name.into(),
282+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234),
283+
filter_chains: HashMap::default(),
284+
bind_device: None,
285+
with_tls_inspector: false,
286+
proxy_protocol_config: None,
287+
with_tlv_listener_filter: false,
288+
tlv_listener_filter_config: None,
289+
};
290+
man.start_listener(l1, l1_info).unwrap();
291+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
292+
tokio::task::yield_now().await;
293+
294+
let (routeb_tx2, routeb_rx) = broadcast::channel(chan);
295+
let (_secb_tx2, secb_rx) = broadcast::channel(chan);
296+
let l2 = Listener::test_listener(name, routeb_rx, secb_rx);
297+
let l2_info = ListenerConfig {
298+
name: name.into(),
299+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235), // Different port
300+
filter_chains: HashMap::default(),
301+
bind_device: None,
302+
with_tls_inspector: false,
303+
proxy_protocol_config: None,
304+
with_tlv_listener_filter: false,
305+
tlv_listener_filter_config: None,
306+
};
307+
man.start_listener(l2, l2_info).unwrap();
308+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
309+
tokio::task::yield_now().await;
310+
311+
let (routeb_tx3, routeb_rx) = broadcast::channel(chan);
312+
let (_secb_tx3, secb_rx) = broadcast::channel(chan);
313+
let l3 = Listener::test_listener(name, routeb_rx, secb_rx);
314+
let l3_info = ListenerConfig {
315+
name: name.into(),
316+
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236), // Different port
317+
filter_chains: HashMap::default(),
318+
bind_device: None,
319+
with_tls_inspector: false,
320+
proxy_protocol_config: None,
321+
with_tlv_listener_filter: false,
322+
tlv_listener_filter_config: None,
323+
};
324+
man.start_listener(l3, l3_info).unwrap();
325+
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
326+
tokio::task::yield_now().await;
327+
328+
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
329+
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
330+
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok());
331+
332+
assert_eq!(man.listener_handles.get(name).unwrap().len(), 3);
333+
334+
man.stop_listener(name).unwrap();
335+
336+
assert!(man.listener_handles.get(name).is_none());
337+
338+
tokio::task::yield_now().await;
339+
}
252340
}

0 commit comments

Comments
 (0)