@@ -57,46 +57,9 @@ use tokio::{
5757} ;
5858use tracing:: { debug, info, warn} ;
5959
60- async fn handle_internal_connection_static (
61- listener_name : String ,
62- connection_pair : crate :: transport:: InternalConnectionPair ,
63- filter_chains : Arc < HashMap < FilterChainMatch , FilterchainType > > ,
64- ) -> Result < ( ) > {
65- use crate :: listeners:: filter_state:: DownstreamConnectionMetadata ;
66-
67- debug ! ( "Handling new internal connection for listener '{}'" , listener_name) ;
68-
69- let downstream_metadata = DownstreamConnectionMetadata :: FromInternal {
70- listener_name : listener_name. clone ( ) ,
71- endpoint_id : connection_pair. downstream . metadata ( ) . endpoint_id . clone ( ) ,
72- } ;
73-
74- let filter_chain = match Listener :: select_filterchain ( & filter_chains, & downstream_metadata, None ) ? {
75- Some ( fc) => fc,
76- None => {
77- warn ! ( "No matching filter chain found for internal connection" ) ;
78- return Err ( crate :: Error :: new ( "No matching filter chain" ) ) ;
79- } ,
80- } ;
81-
82- let _downstream_stream = connection_pair. downstream ;
83-
84- match & filter_chain. handler {
85- crate :: listeners:: filterchain:: ConnectionHandler :: Http ( _http_manager) => {
86- info ! ( "Processing internal connection through HTTP filter chain" ) ;
87- tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
88- Ok ( ( ) )
89- } ,
90- crate :: listeners:: filterchain:: ConnectionHandler :: Tcp ( _tcp_proxy) => {
91- info ! ( "Processing internal connection through TCP filter chain" ) ;
92- tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
93- Ok ( ( ) )
94- } ,
95- }
96- }
97-
9860#[ derive( Debug ) ]
9961struct InternalConnectionWorkerPool {
62+ workers : Vec < tokio:: task:: JoinHandle < ( ) > > ,
10063 senders : Vec < mpsc:: UnboundedSender < InternalConnectionTask > > ,
10164 next_worker : std:: sync:: atomic:: AtomicUsize ,
10265}
@@ -111,6 +74,7 @@ struct InternalConnectionTask {
11174impl InternalConnectionWorkerPool {
11275 fn new ( num_workers : usize ) -> Self {
11376 let mut senders: Vec < mpsc:: UnboundedSender < InternalConnectionTask > > = Vec :: with_capacity ( num_workers) ;
77+ let mut workers = Vec :: with_capacity ( num_workers) ;
11478
11579 for _ in 0 ..num_workers {
11680 let ( sender, mut receiver) = mpsc:: unbounded_channel ( ) ;
@@ -126,16 +90,24 @@ impl InternalConnectionWorkerPool {
12690 }
12791 }
12892 } ) ;
129- drop ( worker) ;
93+ workers . push ( worker) ;
13094 }
13195
132- Self { senders, next_worker : std:: sync:: atomic:: AtomicUsize :: new ( 0 ) }
96+ Self { workers , senders, next_worker : std:: sync:: atomic:: AtomicUsize :: new ( 0 ) }
13397 }
13498
13599 fn submit_task ( & self , task : InternalConnectionTask ) -> Result < ( ) > {
136100 let worker_index = self . next_worker . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: Relaxed ) % self . senders . len ( ) ;
137101 self . senders [ worker_index] . send ( task) . map_err ( |_| Error :: new ( "Worker pool is shut down" ) )
138102 }
103+
104+ async fn shutdown ( self ) {
105+ drop ( self . senders ) ;
106+
107+ for worker in self . workers {
108+ let _ = worker. await ;
109+ }
110+ }
139111}
140112
141113static INTERNAL_WORKER_POOL : std:: sync:: OnceLock < InternalConnectionWorkerPool > = std:: sync:: OnceLock :: new ( ) ;
@@ -402,7 +374,7 @@ impl Listener {
402374 maybe_route_update = route_updates_receiver. recv( ) => {
403375 //todo: add context to the error here once orion-error lands
404376 match maybe_route_update {
405- Ok ( route_update) => { Self :: process_route_update( & name, & filter_chains, route_update) } ,
377+ Ok ( route_update) => { Self :: process_route_update( name, & filter_chains, route_update) } ,
406378 Err ( e) => { return e. into( ) ; }
407379 }
408380 } ,
@@ -411,7 +383,7 @@ impl Listener {
411383 Ok ( secret_update) => {
412384 // todo: possibly expensive clone - may need to rethink this structure
413385 let mut filter_chains_clone = filter_chains. as_ref( ) . clone( ) ;
414- Self :: process_secret_update( & name, & mut filter_chains_clone, secret_update) ;
386+ Self :: process_secret_update( name, & mut filter_chains_clone, secret_update) ;
415387 filter_chains = Arc :: new( filter_chains_clone) ;
416388 }
417389 Err ( e) => { return e. into( ) ; }
@@ -436,8 +408,7 @@ impl Listener {
436408 let filter_chains = Arc :: new ( filter_chains) ;
437409 let factory = global_internal_connection_factory ( ) ;
438410
439- let ( _handle, mut connection_receiver, _listener_ref) = match factory. register_listener ( name. to_string ( ) ) . await
440- {
411+ let ( _handle, mut connection_receiver, _listener_ref) = match factory. register_listener ( name. to_owned ( ) ) . await {
441412 Ok ( result) => result,
442413 Err ( e) => {
443414 error ! ( "Failed to register internal listener '{}': {}" , name, e) ;
@@ -450,35 +421,32 @@ impl Listener {
450421 loop {
451422 tokio:: select! {
452423 maybe_connection = connection_receiver. recv( ) => {
453- match maybe_connection {
454- Some ( connection_pair) => {
455- debug!( "Internal listener '{}' received new connection" , name) ;
456-
457- let filter_chains_clone = filter_chains. clone( ) ;
458- let listener_name = name. to_string( ) ;
459-
460- // Use worker pool instead of tokio::spawn for better performance
461- // with large numbers of short connections
462- let task = InternalConnectionTask {
463- listener_name,
464- connection_pair,
465- filter_chains: filter_chains_clone,
466- } ;
467-
468- if let Err ( e) = get_internal_worker_pool( ) . submit_task( task) {
469- warn!( "Failed to submit internal connection task: {}" , e) ;
470- }
471- }
472- None => {
473- warn!( "Internal listener '{}' connection channel closed" , name) ;
474- break ;
424+ if let Some ( connection_pair) = maybe_connection {
425+ debug!( "Internal listener '{}' received new connection" , name) ;
426+
427+ let filter_chains_clone = filter_chains. clone( ) ;
428+ let listener_name = name. to_owned( ) ;
429+
430+ // Use worker pool instead of tokio::spawn for better performance
431+ // with large numbers of short connections
432+ let task = InternalConnectionTask {
433+ listener_name,
434+ connection_pair,
435+ filter_chains: filter_chains_clone,
436+ } ;
437+
438+ if let Err ( e) = get_internal_worker_pool( ) . submit_task( task) {
439+ warn!( "Failed to submit internal connection task: {}" , e) ;
475440 }
441+ } else {
442+ warn!( "Internal listener '{}' connection channel closed" , name) ;
443+ break ;
476444 }
477445 } ,
478446 maybe_route_update = route_updates_receiver. recv( ) => {
479447 match maybe_route_update {
480448 Ok ( route_update) => {
481- Self :: process_route_update( & name, & filter_chains, route_update) ;
449+ Self :: process_route_update( name, & filter_chains, route_update) ;
482450 }
483451 Err ( e) => {
484452 error!( "Route update error for internal listener '{}': {}" , name, e) ;
@@ -490,7 +458,7 @@ impl Listener {
490458 match maybe_secret_update {
491459 Ok ( secret_update) => {
492460 let mut filter_chains_clone = filter_chains. as_ref( ) . clone( ) ;
493- Self :: process_secret_update( & name, & mut filter_chains_clone, secret_update) ;
461+ Self :: process_secret_update( name, & mut filter_chains_clone, secret_update) ;
494462 // TODO: Update the shared filter chains state for active connections
495463 }
496464 Err ( e) => {
@@ -616,8 +584,8 @@ impl Listener {
616584
617585 let ssl = AtomicBool :: new ( false ) ;
618586 defer ! {
619- with_metric!( listeners:: DOWNSTREAM_CX_DESTROY , add, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name. to_string ( ) ) ] ) ;
620- with_metric!( listeners:: DOWNSTREAM_CX_ACTIVE , sub, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name. to_string ( ) ) ] ) ;
587+ with_metric!( listeners:: DOWNSTREAM_CX_DESTROY , add, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name. to_owned ( ) ) ] ) ;
588+ with_metric!( listeners:: DOWNSTREAM_CX_ACTIVE , sub, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name. to_owned ( ) ) ] ) ;
621589 if ssl. load( Ordering :: Relaxed ) {
622590 with_metric!( http:: DOWNSTREAM_CX_SSL_ACTIVE , add, 1 , shard_id, & [ KeyValue :: new( "listener" , listener_name) ] ) ;
623591 }
@@ -656,7 +624,7 @@ impl Listener {
656624 add,
657625 1 ,
658626 shard_id,
659- & [ KeyValue :: new( "listener" , listener_name. to_string ( ) ) ]
627+ & [ KeyValue :: new( "listener" , listener_name. to_owned ( ) ) ]
660628 ) ;
661629 with_metric ! (
662630 http:: DOWNSTREAM_CX_SSL_ACTIVE ,
@@ -1014,3 +982,40 @@ filter_chains:
1014982 assert_eq ! ( Listener :: select_filterchain( & m, & metadata, Some ( "hello.world" ) ) . unwrap( ) . copied( ) , Some ( 3 ) ) ;
1015983 }
1016984}
985+
986+ async fn handle_internal_connection_static (
987+ listener_name : String ,
988+ connection_pair : crate :: transport:: InternalConnectionPair ,
989+ filter_chains : Arc < HashMap < FilterChainMatch , FilterchainType > > ,
990+ ) -> Result < ( ) > {
991+ use crate :: listeners:: filter_state:: DownstreamConnectionMetadata ;
992+
993+ debug ! ( "Handling new internal connection for listener '{}'" , listener_name) ;
994+
995+ let downstream_metadata = DownstreamConnectionMetadata :: FromInternal {
996+ listener_name : listener_name. clone ( ) ,
997+ endpoint_id : connection_pair. downstream . metadata ( ) . endpoint_id . clone ( ) ,
998+ } ;
999+
1000+ let filter_chain = if let Some ( fc) = Listener :: select_filterchain ( & filter_chains, & downstream_metadata, None ) ? {
1001+ fc
1002+ } else {
1003+ warn ! ( "No matching filter chain found for internal connection" ) ;
1004+ return Err ( crate :: Error :: new ( "No matching filter chain" ) ) ;
1005+ } ;
1006+
1007+ let _downstream_stream = connection_pair. downstream ;
1008+
1009+ match & filter_chain. handler {
1010+ crate :: listeners:: filterchain:: ConnectionHandler :: Http ( _http_manager) => {
1011+ info ! ( "Processing internal connection through HTTP filter chain" ) ;
1012+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
1013+ Ok ( ( ) )
1014+ } ,
1015+ crate :: listeners:: filterchain:: ConnectionHandler :: Tcp ( _tcp_proxy) => {
1016+ info ! ( "Processing internal connection through TCP filter chain" ) ;
1017+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
1018+ Ok ( ( ) )
1019+ } ,
1020+ }
1021+ }
0 commit comments