1515//
1616//
1717
18- use std :: collections :: HashMap ;
18+ use multimap :: MultiMap ;
1919use std:: time:: Duration ;
2020
2121use tokio:: sync:: { broadcast, mpsc} ;
@@ -55,8 +55,6 @@ pub struct ListenerManagerConfig {
5555#[ derive( Debug , Clone ) ]
5656pub enum CleanupPolicy {
5757 CountBasedOnly ( usize ) ,
58- TimeBasedOnly ( Duration ) ,
59- Hybrid { timeout : Duration , max_count : usize } ,
6058}
6159
6260impl Default for ListenerManagerConfig {
@@ -83,7 +81,7 @@ impl ListenerInfo {
8381pub struct ListenersManager {
8482 listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
8583 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
86- listener_handles : HashMap < String , Vec < ListenerInfo > > ,
84+ listener_handles : MultiMap < String , ListenerInfo > ,
8785 version_counter : u64 ,
8886 config : ListenerManagerConfig ,
8987}
@@ -92,12 +90,19 @@ impl ListenersManager {
9290 pub fn new (
9391 listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
9492 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
93+ ) -> Self {
94+ Self :: with_config ( listener_configuration_channel, route_configuration_channel, ListenerManagerConfig :: default ( ) )
95+ }
96+
97+ pub fn with_config (
98+ listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
99+ route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
95100 config : ListenerManagerConfig ,
96101 ) -> Self {
97102 ListenersManager {
98103 listener_configuration_channel,
99104 route_configuration_channel,
100- listener_handles : HashMap :: new ( ) ,
105+ listener_handles : MultiMap :: new ( ) ,
101106 version_counter : 0 ,
102107 config,
103108 }
@@ -131,9 +136,8 @@ impl ListenersManager {
131136 } ,
132137 ListenerConfigurationChange :: GetConfiguration ( config_dump_tx) => {
133138 let listeners: Vec <ListenerConfig > = self . listener_handles
134- . values( )
135- . flatten( )
136- . map( |info| info. listener_conf. clone( ) )
139+ . iter( )
140+ . map( |( _, info) | info. listener_conf. clone( ) )
137141 . collect( ) ;
138142 config_dump_tx. send( ConfigDump { listeners: Some ( listeners) , ..Default :: default ( ) } ) . await ?;
139143 } ,
@@ -162,20 +166,17 @@ impl ListenersManager {
162166 self . version_counter += 1 ;
163167 let version = self . version_counter ;
164168
165- info ! ( "Starting new version {} of listener {}" , version, listener_name) ;
166-
167- let listener_name_clone = listener_name. clone ( ) ;
168-
169+ let listener_name_for_async = listener_name. clone ( ) ;
169170 let join_handle = tokio:: spawn ( async move {
170171 let error = listener. start ( ) . await ;
171- warn ! ( "Listener {} version {} exited: {}" , listener_name_clone , version, error) ;
172+ info ! ( "Listener {} version {} exited: {}" , listener_name_for_async , version, error) ;
172173 } ) ;
173174
174175 let listener_info = ListenerInfo :: new ( join_handle, listener_conf, version) ;
175176
176- let versions = self . listener_handles . entry ( listener_name. clone ( ) ) . or_default ( ) ;
177- versions . push ( listener_info ) ;
178- info ! ( "Listener {} now has {} active version(s)" , listener_name, versions . len ( ) ) ;
177+ self . listener_handles . insert ( listener_name. clone ( ) , listener_info ) ;
178+ let version_count = self . listener_handles . get_vec ( & listener_name ) . map ( |v| v . len ( ) ) . unwrap_or ( 0 ) ;
179+ info ! ( "Started version {} of listener {} ({} total active version(s)) " , version , listener_name, version_count ) ;
179180
180181 self . cleanup_old_versions ( & listener_name) ;
181182
@@ -197,48 +198,33 @@ impl ListenersManager {
197198 }
198199
199200 fn cleanup_old_versions ( & mut self , listener_name : & str ) {
200- if let Some ( versions) = self . listener_handles . get_mut ( listener_name) {
201+ if let Some ( mut versions) = self . listener_handles . remove ( listener_name) {
201202 let original_count = versions. len ( ) ;
202203
203204 match & self . config . cleanup_policy {
204205 CleanupPolicy :: CountBasedOnly ( max_count) => {
205206 if versions. len ( ) > * max_count {
206207 let to_remove = versions. len ( ) - max_count;
207- for _ in 0 ..to_remove {
208- let old = versions . remove ( 0 ) ;
208+ let removed = versions . drain ( 0 ..to_remove) . collect :: < Vec < _ > > ( ) ;
209+ for old in removed {
209210 info ! ( "Cleaning up old listener {} version {} (count limit)" , listener_name, old. version) ;
210211 }
211212 }
212213 } ,
213- CleanupPolicy :: TimeBasedOnly ( _timeout) => {
214- // TODO: Implement time-based cleanup when we have connection tracking
215- // For now, behave like count-based with default limit
216- if versions. len ( ) > self . config . max_versions_per_listener {
217- let to_remove = versions. len ( ) - self . config . max_versions_per_listener ;
218- for _ in 0 ..to_remove {
219- let old = versions. remove ( 0 ) ;
220- info ! ( "Cleaning up old listener {} version {} (time limit)" , listener_name, old. version) ;
221- }
222- }
223- } ,
224- CleanupPolicy :: Hybrid { max_count, .. } => {
225- if versions. len ( ) > * max_count {
226- let to_remove = versions. len ( ) - max_count;
227- for _ in 0 ..to_remove {
228- let old = versions. remove ( 0 ) ;
229- info ! ( "Cleaning up old listener {} version {} (hybrid limit)" , listener_name, old. version) ;
230- }
231- }
232- } ,
233214 }
234215
235- let cleaned_count = original_count - versions. len ( ) ;
236- if cleaned_count > 0 {
216+ // Re-insert the remaining versions
217+ for version in versions {
218+ self . listener_handles . insert ( listener_name. to_string ( ) , version) ;
219+ }
220+
221+ let remaining_count = self . listener_handles . get_vec ( listener_name) . map ( |v| v. len ( ) ) . unwrap_or ( 0 ) ;
222+ if original_count != remaining_count {
237223 info ! (
238- "Cleaned up {} old versions of listener {}, {} versions remaining" ,
239- cleaned_count ,
224+ "Cleaned up {} old version(s) of listener {}, {} remaining" ,
225+ original_count - remaining_count ,
240226 listener_name,
241- versions . len ( )
227+ remaining_count
242228 ) ;
243229 }
244230 }
@@ -277,8 +263,7 @@ mod tests {
277263
278264 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
279265 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
280- let config = ListenerManagerConfig :: default ( ) ;
281- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
266+ let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
282267
283268 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
284269 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -309,7 +294,7 @@ mod tests {
309294 assert ! ( routeb_tx1. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
310295 assert ! ( routeb_tx2. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
311296
312- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 2 ) ;
297+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 2 ) ;
313298 tokio:: task:: yield_now ( ) . await ;
314299 }
315300
@@ -321,8 +306,7 @@ mod tests {
321306
322307 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
323308 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
324- let config = ListenerManagerConfig :: default ( ) ;
325- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
309+ let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
326310
327311 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
328312 let ( secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -369,7 +353,7 @@ mod tests {
369353 cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
370354 cleanup_interval : Duration :: from_secs ( 60 ) ,
371355 } ;
372- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
356+ let mut man = ListenersManager :: with_config ( conf_rx, route_rx, config) ;
373357
374358 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
375359 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -402,11 +386,11 @@ mod tests {
402386 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
403387
404388 // Should only have 2 versions due to cleanup policy (max_count: 2)
405- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 2 ) ;
389+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 2 ) ;
406390
407391 man. stop_listener ( name) . unwrap ( ) ;
408392
409- assert ! ( man. listener_handles. get ( name) . is_none( ) ) ;
393+ assert ! ( man. listener_handles. get_vec ( name) . is_none( ) ) ;
410394
411395 tokio:: task:: yield_now ( ) . await ;
412396 }
@@ -424,7 +408,7 @@ mod tests {
424408 cleanup_policy : CleanupPolicy :: CountBasedOnly ( 3 ) ,
425409 cleanup_interval : Duration :: from_secs ( 60 ) ,
426410 } ;
427- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
411+ let mut man = ListenersManager :: with_config ( conf_rx, route_rx, config) ;
428412
429413 // Add 5 listeners, should only keep 3 due to cleanup policy
430414 for i in 1 ..=5 {
@@ -437,10 +421,10 @@ mod tests {
437421 }
438422
439423 // Should only have 3 versions due to cleanup policy
440- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 3 ) ;
424+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 3 ) ;
441425
442426 man. stop_listener ( name) . unwrap ( ) ;
443- assert ! ( man. listener_handles. get ( name) . is_none( ) ) ;
427+ assert ! ( man. listener_handles. get_vec ( name) . is_none( ) ) ;
444428
445429 tokio:: task:: yield_now ( ) . await ;
446430 }
0 commit comments