1515//
1616//
1717
18- use std :: collections :: HashMap ;
18+ use multimap :: MultiMap ;
1919use std:: time:: Duration ;
2020
2121use tokio:: sync:: { broadcast, mpsc} ;
@@ -55,8 +55,6 @@ pub struct ListenerManagerConfig {
5555#[ derive( Debug , Clone ) ]
5656pub enum CleanupPolicy {
5757 CountBasedOnly ( usize ) ,
58- TimeBasedOnly ( Duration ) ,
59- Hybrid { timeout : Duration , max_count : usize } ,
6058}
6159
6260impl Default for ListenerManagerConfig {
@@ -92,6 +90,13 @@ impl ListenersManager {
9290 pub fn new (
9391 listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
9492 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
93+ ) -> Self {
94+ Self :: with_config ( listener_configuration_channel, route_configuration_channel, ListenerManagerConfig :: default ( ) )
95+ }
96+
97+ pub fn with_config (
98+ listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
99+ route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
95100 config : ListenerManagerConfig ,
96101 ) -> Self {
97102 ListenersManager {
@@ -170,7 +175,6 @@ impl ListenersManager {
170175
171176 let listener_info = ListenerInfo :: new ( join_handle, listener_conf, version) ;
172177 self . listener_handles . insert ( listener_name. clone ( ) , listener_info) ;
173-
174178 let version_count = self . listener_handles . get_vec ( & listener_name) . map ( |v| v. len ( ) ) . unwrap_or ( 0 ) ;
175179 info ! ( "Started version {} of listener {} ({} total active version(s))" , version, listener_name, version_count) ;
176180
@@ -194,48 +198,33 @@ impl ListenersManager {
194198 }
195199
196200 fn cleanup_old_versions ( & mut self , listener_name : & str ) {
197- if let Some ( versions) = self . listener_handles . get_mut ( listener_name) {
201+ if let Some ( mut versions) = self . listener_handles . remove ( listener_name) {
198202 let original_count = versions. len ( ) ;
199203
200204 match & self . config . cleanup_policy {
201205 CleanupPolicy :: CountBasedOnly ( max_count) => {
202206 if versions. len ( ) > * max_count {
203207 let to_remove = versions. len ( ) - max_count;
204- for _ in 0 ..to_remove {
205- let old = versions . remove ( 0 ) ;
208+ let removed = versions . drain ( 0 ..to_remove) . collect :: < Vec < _ > > ( ) ;
209+ for old in removed {
206210 info ! ( "Cleaning up old listener {} version {} (count limit)" , listener_name, old. version) ;
207211 }
208212 }
209213 } ,
210- CleanupPolicy :: TimeBasedOnly ( _timeout) => {
211- // TODO: Implement time-based cleanup when we have connection tracking
212- // For now, behave like count-based with default limit
213- if versions. len ( ) > self . config . max_versions_per_listener {
214- let to_remove = versions. len ( ) - self . config . max_versions_per_listener ;
215- for _ in 0 ..to_remove {
216- let old = versions. remove ( 0 ) ;
217- info ! ( "Cleaning up old listener {} version {} (time limit)" , listener_name, old. version) ;
218- }
219- }
220- } ,
221- CleanupPolicy :: Hybrid { max_count, .. } => {
222- if versions. len ( ) > * max_count {
223- let to_remove = versions. len ( ) - max_count;
224- for _ in 0 ..to_remove {
225- let old = versions. remove ( 0 ) ;
226- info ! ( "Cleaning up old listener {} version {} (hybrid limit)" , listener_name, old. version) ;
227- }
228- }
229- } ,
230214 }
231215
232- let cleaned_count = original_count - versions. len ( ) ;
233- if cleaned_count > 0 {
216+ // Re-insert the remaining versions
217+ for version in versions {
218+ self . listener_handles . insert ( listener_name. to_string ( ) , version) ;
219+ }
220+
221+ let remaining_count = self . listener_handles . get_vec ( listener_name) . map ( |v| v. len ( ) ) . unwrap_or ( 0 ) ;
222+ if original_count != remaining_count {
234223 info ! (
235- "Cleaned up {} old versions of listener {}, {} versions remaining" ,
236- cleaned_count ,
224+ "Cleaned up {} old version(s) of listener {}, {} remaining" ,
225+ original_count - remaining_count ,
237226 listener_name,
238- versions . len ( )
227+ remaining_count
239228 ) ;
240229 }
241230 }
@@ -274,8 +263,7 @@ mod tests {
274263
275264 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
276265 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
277- let config = ListenerManagerConfig :: default ( ) ;
278- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
266+ let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
279267
280268 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
281269 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -318,8 +306,7 @@ mod tests {
318306
319307 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
320308 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
321- let config = ListenerManagerConfig :: default ( ) ;
322- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
309+ let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
323310
324311 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
325312 let ( secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -366,7 +353,7 @@ mod tests {
366353 cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
367354 cleanup_interval : Duration :: from_secs ( 60 ) ,
368355 } ;
369- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
356+ let mut man = ListenersManager :: with_config ( conf_rx, route_rx, config) ;
370357
371358 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
372359 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -399,7 +386,7 @@ mod tests {
399386 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
400387
401388 // Should only have 2 versions due to cleanup policy (max_count: 2)
402- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 2 ) ;
389+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 2 ) ;
403390
404391 man. stop_listener ( name) . unwrap ( ) ;
405392
@@ -421,7 +408,7 @@ mod tests {
421408 cleanup_policy : CleanupPolicy :: CountBasedOnly ( 3 ) ,
422409 cleanup_interval : Duration :: from_secs ( 60 ) ,
423410 } ;
424- let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
411+ let mut man = ListenersManager :: with_config ( conf_rx, route_rx, config) ;
425412
426413 // Add 5 listeners, should only keep 3 due to cleanup policy
427414 for i in 1 ..=5 {
@@ -434,10 +421,10 @@ mod tests {
434421 }
435422
436423 // Should only have 3 versions due to cleanup policy
437- assert_eq ! ( man. listener_handles. get ( name) . unwrap( ) . len( ) , 3 ) ;
424+ assert_eq ! ( man. listener_handles. get_vec ( name) . unwrap( ) . len( ) , 3 ) ;
438425
439426 man. stop_listener ( name) . unwrap ( ) ;
440- assert ! ( man. listener_handles. get ( name) . is_none( ) ) ;
427+ assert ! ( man. listener_handles. get_vec ( name) . is_none( ) ) ;
441428
442429 tokio:: task:: yield_now ( ) . await ;
443430 }
0 commit comments