1616//
1717
1818use std:: collections:: HashMap ;
19+ use std:: time:: Duration ;
1920
2021use tokio:: sync:: { broadcast, mpsc} ;
2122use tracing:: { info, warn} ;
@@ -44,6 +45,30 @@ pub enum TlsContextChange {
4445 Updated ( ( String , TransportSecret ) ) ,
4546}
4647
48+ #[ derive( Debug , Clone ) ]
49+ pub struct ListenerManagerConfig {
50+ pub max_versions_per_listener : usize ,
51+ pub cleanup_policy : CleanupPolicy ,
52+ pub cleanup_interval : Duration ,
53+ }
54+
55+ #[ derive( Debug , Clone ) ]
56+ pub enum CleanupPolicy {
57+ CountBasedOnly ( usize ) ,
58+ TimeBasedOnly ( Duration ) ,
59+ Hybrid { timeout : Duration , max_count : usize } ,
60+ }
61+
62+ impl Default for ListenerManagerConfig {
63+ fn default ( ) -> Self {
64+ Self {
65+ max_versions_per_listener : 2 ,
66+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
67+ cleanup_interval : Duration :: from_secs ( 60 ) ,
68+ }
69+ }
70+ }
71+
4772struct ListenerInfo {
4873 handle : abort_on_drop:: ChildTask < ( ) > ,
4974 listener_conf : ListenerConfig ,
@@ -60,18 +85,21 @@ pub struct ListenersManager {
6085 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
6186 listener_handles : HashMap < String , Vec < ListenerInfo > > ,
6287 version_counter : u64 ,
88+ config : ListenerManagerConfig ,
6389}
6490
6591impl ListenersManager {
6692 pub fn new (
6793 listener_configuration_channel : mpsc:: Receiver < ListenerConfigurationChange > ,
6894 route_configuration_channel : mpsc:: Receiver < RouteConfigurationChange > ,
95+ config : ListenerManagerConfig ,
6996 ) -> Self {
7097 ListenersManager {
7198 listener_configuration_channel,
7299 route_configuration_channel,
73100 listener_handles : HashMap :: new ( ) ,
74101 version_counter : 0 ,
102+ config,
75103 }
76104 }
77105
@@ -149,6 +177,8 @@ impl ListenersManager {
149177 versions. push ( listener_info) ;
150178 info ! ( "Listener {} now has {} active version(s)" , listener_name, versions. len( ) ) ;
151179
180+ self . cleanup_old_versions ( & listener_name) ;
181+
152182 Ok ( ( ) )
153183 }
154184
@@ -165,6 +195,54 @@ impl ListenersManager {
165195
166196 Ok ( ( ) )
167197 }
198+
199+ fn cleanup_old_versions ( & mut self , listener_name : & str ) {
200+ if let Some ( versions) = self . listener_handles . get_mut ( listener_name) {
201+ let original_count = versions. len ( ) ;
202+
203+ match & self . config . cleanup_policy {
204+ CleanupPolicy :: CountBasedOnly ( max_count) => {
205+ if versions. len ( ) > * max_count {
206+ let to_remove = versions. len ( ) - max_count;
207+ for _ in 0 ..to_remove {
208+ let old = versions. remove ( 0 ) ;
209+ info ! ( "Cleaning up old listener {} version {} (count limit)" , listener_name, old. version) ;
210+ }
211+ }
212+ } ,
213+ CleanupPolicy :: TimeBasedOnly ( _timeout) => {
214+ // TODO: Implement time-based cleanup when we have connection tracking
215+ // For now, behave like count-based with default limit
216+ if versions. len ( ) > self . config . max_versions_per_listener {
217+ let to_remove = versions. len ( ) - self . config . max_versions_per_listener ;
218+ for _ in 0 ..to_remove {
219+ let old = versions. remove ( 0 ) ;
220+ info ! ( "Cleaning up old listener {} version {} (time limit)" , listener_name, old. version) ;
221+ }
222+ }
223+ } ,
224+ CleanupPolicy :: Hybrid { max_count, .. } => {
225+ if versions. len ( ) > * max_count {
226+ let to_remove = versions. len ( ) - max_count;
227+ for _ in 0 ..to_remove {
228+ let old = versions. remove ( 0 ) ;
229+ info ! ( "Cleaning up old listener {} version {} (hybrid limit)" , listener_name, old. version) ;
230+ }
231+ }
232+ } ,
233+ }
234+
235+ let cleaned_count = original_count - versions. len ( ) ;
236+ if cleaned_count > 0 {
237+ info ! (
238+ "Cleaned up {} old versions of listener {}, {} versions remaining" ,
239+ cleaned_count,
240+ listener_name,
241+ versions. len( )
242+ ) ;
243+ }
244+ }
245+ }
168246}
169247
170248#[ cfg( test) ]
@@ -199,7 +277,8 @@ mod tests {
199277
200278 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
201279 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
202- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
280+ let config = ListenerManagerConfig :: default ( ) ;
281+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
203282
204283 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
205284 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -242,7 +321,8 @@ mod tests {
242321
243322 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
244323 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
245- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
324+ let config = ListenerManagerConfig :: default ( ) ;
325+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
246326
247327 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
248328 let ( secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -284,7 +364,12 @@ mod tests {
284364
285365 let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
286366 let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
287- let mut man = ListenersManager :: new ( conf_rx, route_rx) ;
367+ let config = ListenerManagerConfig {
368+ max_versions_per_listener : 2 ,
369+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 2 ) ,
370+ cleanup_interval : Duration :: from_secs ( 60 ) ,
371+ } ;
372+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
288373
289374 let ( routeb_tx1, routeb_rx) = broadcast:: channel ( chan) ;
290375 let ( _secb_tx1, secb_rx) = broadcast:: channel ( chan) ;
@@ -310,16 +395,53 @@ mod tests {
310395 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
311396 tokio:: task:: yield_now ( ) . await ;
312397
313- assert ! ( routeb_tx1. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
398+ // After adding 3rd listener, first should be cleaned up due to max_versions_per_listener = 2
399+ // So routeb_tx1 should be closed (is_err), but routeb_tx2 and routeb_tx3 should work
400+ assert ! ( routeb_tx1. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_err( ) ) ;
314401 assert ! ( routeb_tx2. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
315402 assert ! ( routeb_tx3. send( RouteConfigurationChange :: Removed ( "n/a" . into( ) ) ) . is_ok( ) ) ;
316403
317- assert_eq ! ( man. listener_handles. get( name) . unwrap( ) . len( ) , 3 ) ;
404+ // Should only have 2 versions due to cleanup policy (max_count: 2)
405+ assert_eq ! ( man. listener_handles. get( name) . unwrap( ) . len( ) , 2 ) ;
318406
319407 man. stop_listener ( name) . unwrap ( ) ;
320408
321409 assert ! ( man. listener_handles. get( name) . is_none( ) ) ;
322410
323411 tokio:: task:: yield_now ( ) . await ;
324412 }
413+
414+ #[ traced_test]
415+ #[ tokio:: test]
416+ async fn test_cleanup_policy_enforcement ( ) {
417+ let chan = 10 ;
418+ let name = "cleanup-test-listener" ;
419+
420+ let ( _conf_tx, conf_rx) = mpsc:: channel ( chan) ;
421+ let ( _route_tx, route_rx) = mpsc:: channel ( chan) ;
422+ let config = ListenerManagerConfig {
423+ max_versions_per_listener : 3 ,
424+ cleanup_policy : CleanupPolicy :: CountBasedOnly ( 3 ) ,
425+ cleanup_interval : Duration :: from_secs ( 60 ) ,
426+ } ;
427+ let mut man = ListenersManager :: new ( conf_rx, route_rx, config) ;
428+
429+ // Add 5 listeners, should only keep 3 due to cleanup policy
430+ for i in 1 ..=5 {
431+ let ( _routeb_tx, routeb_rx) = broadcast:: channel ( chan) ;
432+ let ( _secb_tx, secb_rx) = broadcast:: channel ( chan) ;
433+ let listener = Listener :: test_listener ( name, routeb_rx, secb_rx) ;
434+ let listener_info = create_test_listener_config ( name, 1230 + i) ;
435+ man. start_listener ( listener, listener_info) . unwrap ( ) ;
436+ tokio:: task:: yield_now ( ) . await ;
437+ }
438+
439+ // Should only have 3 versions due to cleanup policy
440+ assert_eq ! ( man. listener_handles. get( name) . unwrap( ) . len( ) , 3 ) ;
441+
442+ man. stop_listener ( name) . unwrap ( ) ;
443+ assert ! ( man. listener_handles. get( name) . is_none( ) ) ;
444+
445+ tokio:: task:: yield_now ( ) . await ;
446+ }
325447}
0 commit comments