From 7574a84bdb9c60afeef071adbb74e41540ed9e99 Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Fri, 26 Sep 2025 20:05:40 +0530 Subject: [PATCH 1/5] feat: implement internal connection factory for in-process communication - InternalConnectionFactory with thread-safe listener registry - InternalListenerHandle for connection management and lifecycle - InternalClusterConnector for cluster-side connections - InternalChannelConnector with cluster context - Global singleton factory accessible across the codebase - Comprehensive connection metadata and statistics Signed-off-by: Eeshu-Yadav --- orion-lib/src/lib.rs | 5 + orion-lib/src/listeners/filter_state.rs | 6 + orion-lib/src/listeners/listener.rs | 105 ++++- orion-lib/src/listeners/listeners_manager.rs | 5 +- .../transport/internal_cluster_connector.rs | 207 ++++++++++ .../src/transport/internal_connection.rs | 378 ++++++++++++++++++ orion-lib/src/transport/mod.rs | 7 + .../tests/internal_connection_integration.rs | 176 ++++++++ 8 files changed, 881 insertions(+), 8 deletions(-) create mode 100644 orion-lib/src/transport/internal_cluster_connector.rs create mode 100644 orion-lib/src/transport/internal_connection.rs create mode 100644 orion-lib/tests/internal_connection_integration.rs diff --git a/orion-lib/src/lib.rs b/orion-lib/src/lib.rs index 21707b4a..37b9f5a2 100644 --- a/orion-lib/src/lib.rs +++ b/orion-lib/src/lib.rs @@ -54,7 +54,12 @@ use orion_configuration::config::{ Bootstrap, Cluster, Listener as ListenerConfig, }; pub use secrets::SecretManager; +pub use transport::internal_cluster_connector::cluster_helpers; pub(crate) use transport::AsyncStream; +pub use transport::{ + global_internal_connection_factory, InternalChannelConnector, InternalConnectionFactory, InternalConnectionPair, + InternalConnectionStats, InternalListenerHandle, +}; pub type Error = orion_error::Error; pub type Result = ::core::result::Result; diff --git a/orion-lib/src/listeners/filter_state.rs b/orion-lib/src/listeners/filter_state.rs index 7fe2d442..28652dd4 100644 --- a/orion-lib/src/listeners/filter_state.rs +++ b/orion-lib/src/listeners/filter_state.rs @@ -39,6 +39,10 @@ pub enum DownstreamConnectionMetadata { proxy_peer_address: SocketAddr, proxy_local_address: SocketAddr, }, + FromInternal { + listener_name: String, + endpoint_id: Option, + }, } impl DownstreamConnectionMetadata { @@ -47,6 +51,7 @@ impl DownstreamConnectionMetadata { Self::FromSocket { peer_address, .. } => *peer_address, Self::FromProxyProtocol { original_peer_address, .. } => *original_peer_address, Self::FromTlv { proxy_peer_address, .. } => *proxy_peer_address, + Self::FromInternal { .. } => SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), } } pub fn local_address(&self) -> SocketAddr { @@ -54,6 +59,7 @@ impl DownstreamConnectionMetadata { Self::FromSocket { local_address, .. } => *local_address, Self::FromProxyProtocol { original_destination_address, .. } => *original_destination_address, Self::FromTlv { original_destination_address, .. } => *original_destination_address, + Self::FromInternal { .. } => SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), } } } diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index de1e37fa..e16b3244 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -73,6 +73,7 @@ enum ListenerAddress { #[derive(Debug, Clone)] struct InternalListenerConfig { + #[allow(dead_code)] buffer_size_kb: Option, } #[derive(Debug, Clone)] @@ -339,16 +340,58 @@ impl Listener { mut route_updates_receiver: broadcast::Receiver, mut secret_updates_receiver: broadcast::Receiver, ) -> Error { + use crate::transport::global_internal_connection_factory; + use tracing::{debug, error, info, warn}; + let filter_chains = Arc::new(filter_chains); + let factory = global_internal_connection_factory(); + + let (_handle, mut connection_receiver, _listener_ref) = match factory.register_listener(name.to_string()).await + { + Ok(result) => result, + Err(e) => { + error!("Failed to register internal listener '{}': {}", name, e); + return e; + }, + }; + + info!("Internal listener '{}' registered with connection factory", name); - // For now, internal listeners just wait for updates - // The actual connection handling will be implemented when we add the internal connection factory loop { tokio::select! { + maybe_connection = connection_receiver.recv() => { + match maybe_connection { + Some(connection_pair) => { + debug!("Internal listener '{}' received new connection", name); + + let filter_chains_clone = filter_chains.clone(); + let listener_name = name.to_string(); + + tokio::spawn(async move { + if let Err(e) = Self::handle_internal_connection( + listener_name, + connection_pair, + filter_chains_clone, + ).await { + warn!("Error handling internal connection: {}", e); + } + }); + } + None => { + warn!("Internal listener '{}' connection channel closed", name); + break; + } + } + }, maybe_route_update = route_updates_receiver.recv() => { match maybe_route_update { - Ok(route_update) => {Self::process_route_update(&name, &filter_chains, route_update);} - Err(e) => {return e.into();} + Ok(route_update) => { + Self::process_route_update(&name, &filter_chains, route_update); + } + Err(e) => { + error!("Route update error for internal listener '{}': {}", name, e); + return e.into(); + } } }, maybe_secret_update = secret_updates_receiver.recv() => { @@ -356,14 +399,62 @@ impl Listener { Ok(secret_update) => { let mut filter_chains_clone = filter_chains.as_ref().clone(); Self::process_secret_update(&name, &mut filter_chains_clone, secret_update); - // Note: For internal listeners, we'd need to update the shared state - // This will be implemented when we add the internal connection factory + // TODO: Update the shared filter chains state for active connections + } + Err(e) => { + error!("Secret update error for internal listener '{}': {}", name, e); + return e.into(); } - Err(e) => {return e.into();} } } } } + + if let Err(e) = factory.unregister_listener(name).await { + warn!("Failed to unregister internal listener '{}': {}", name, e); + } + + info!("Internal listener '{}' shutting down", name); + Error::new("Internal listener shutdown") + } + + async fn handle_internal_connection( + listener_name: String, + connection_pair: crate::transport::InternalConnectionPair, + filter_chains: Arc>, + ) -> Result<()> { + use crate::listeners::filter_state::DownstreamConnectionMetadata; + use tracing::{debug, info, warn}; + + debug!("Handling new internal connection for listener '{}'", listener_name); + + let downstream_metadata = DownstreamConnectionMetadata::FromInternal { + listener_name: listener_name.clone(), + endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(), + }; + + let filter_chain = match Self::select_filterchain(&filter_chains, &downstream_metadata, None)? { + Some(fc) => fc, + None => { + warn!("No matching filter chain found for internal connection"); + return Err(crate::Error::new("No matching filter chain")); + }, + }; + + let _downstream_stream = connection_pair.downstream; + + match &filter_chain.handler { + crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => { + info!("Processing internal connection through HTTP filter chain"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(()) + }, + crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => { + info!("Processing internal connection through TCP filter chain"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(()) + }, + } } fn select_filterchain<'a, T>( diff --git a/orion-lib/src/listeners/listeners_manager.rs b/orion-lib/src/listeners/listeners_manager.rs index 7817f57b..b6683148 100644 --- a/orion-lib/src/listeners/listeners_manager.rs +++ b/orion-lib/src/listeners/listeners_manager.rs @@ -20,9 +20,12 @@ use tokio::sync::{broadcast, mpsc}; use tracing::{info, warn}; use orion_configuration::config::{ - listener::ListenerAddress, network_filters::http_connection_manager::RouteConfiguration, Listener as ListenerConfig, + network_filters::http_connection_manager::RouteConfiguration, Listener as ListenerConfig, }; +#[cfg(test)] +use orion_configuration::config::listener::ListenerAddress; + use super::listener::{Listener, ListenerFactory}; use crate::{secrets::TransportSecret, ConfigDump, Result}; #[derive(Debug, Clone)] diff --git a/orion-lib/src/transport/internal_cluster_connector.rs b/orion-lib/src/transport/internal_cluster_connector.rs new file mode 100644 index 00000000..befb4d2e --- /dev/null +++ b/orion-lib/src/transport/internal_cluster_connector.rs @@ -0,0 +1,207 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use super::{global_internal_connection_factory, AsyncStream}; +use crate::{Error, Result}; + +#[derive(Debug, Clone)] +pub struct InternalClusterConnector { + listener_name: String, + endpoint_id: Option, +} + +impl InternalClusterConnector { + pub fn new(listener_name: String, endpoint_id: Option) -> Self { + Self { listener_name, endpoint_id } + } + + pub fn listener_name(&self) -> &str { + &self.listener_name + } + + pub fn endpoint_id(&self) -> Option<&str> { + self.endpoint_id.as_deref() + } + + pub async fn connect(&self) -> Result { + let factory = global_internal_connection_factory(); + + if !factory.is_listener_active(&self.listener_name).await { + return Err(Error::new(format!( + "Internal listener '{}' is not active or not registered", + self.listener_name + ))); + } + + factory.connect_to_listener(&self.listener_name, self.endpoint_id.clone()).await + } + + pub async fn is_available(&self) -> bool { + let factory = global_internal_connection_factory(); + factory.is_listener_active(&self.listener_name).await + } +} + +#[derive(Debug, Clone)] +pub struct InternalChannelConnector { + connector: InternalClusterConnector, + cluster_name: &'static str, +} + +impl InternalChannelConnector { + pub fn new(listener_name: String, cluster_name: &'static str, endpoint_id: Option) -> Self { + let connector = InternalClusterConnector::new(listener_name, endpoint_id); + + Self { connector, cluster_name } + } + + pub fn cluster_name(&self) -> &'static str { + self.cluster_name + } + + pub fn listener_name(&self) -> &str { + self.connector.listener_name() + } + + pub async fn connect(&self) -> Result { + let stream = self.connector.connect().await?; + + Ok(InternalChannel { + stream, + cluster_name: self.cluster_name, + listener_name: self.connector.listener_name().to_string(), + endpoint_id: self.connector.endpoint_id().map(|s| s.to_string()), + }) + } + + pub async fn is_available(&self) -> bool { + self.connector.is_available().await + } +} + +pub struct InternalChannel { + pub stream: AsyncStream, + pub cluster_name: &'static str, + pub listener_name: String, + pub endpoint_id: Option, +} + +impl InternalChannel { + pub fn cluster_name(&self) -> &'static str { + self.cluster_name + } + + pub fn listener_name(&self) -> &str { + &self.listener_name + } + + pub fn endpoint_id(&self) -> Option<&str> { + self.endpoint_id.as_deref() + } +} + +pub mod cluster_helpers { + use super::*; + use orion_configuration::config::cluster::InternalEndpointAddress; + + pub fn create_internal_connector( + internal_addr: &InternalEndpointAddress, + cluster_name: &'static str, + ) -> InternalChannelConnector { + InternalChannelConnector::new( + internal_addr.server_listener_name.to_string(), + cluster_name, + internal_addr.endpoint_id.as_ref().map(|s| s.to_string()), + ) + } + + pub async fn is_internal_listener_available(listener_name: &str) -> bool { + let factory = global_internal_connection_factory(); + factory.is_listener_active(listener_name).await + } + + pub async fn get_internal_connection_stats() -> crate::transport::InternalConnectionStats { + let factory = global_internal_connection_factory(); + factory.get_stats().await + } + + pub async fn list_internal_listeners() -> Vec { + let factory = global_internal_connection_factory(); + factory.list_listeners().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_internal_connector_creation() { + let connector = InternalClusterConnector::new(String::from("test_listener"), Some(String::from("endpoint1"))); + assert_eq!(connector.listener_name(), "test_listener"); + assert_eq!(connector.endpoint_id(), Some("endpoint1")); + } + + #[tokio::test] + async fn test_connection_to_non_existent_listener() { + let connector = InternalClusterConnector::new(String::from("non_existent_listener"), None); + let result = connector.connect().await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_availability_check() { + let connector = InternalClusterConnector::new(String::from("non_existent_listener"), None); + assert!(!connector.is_available().await); + } + + #[tokio::test] + async fn test_internal_channel_connector() { + let channel_connector = InternalChannelConnector::new( + String::from("test_listener"), + "test_cluster", + Some(String::from("endpoint1")), + ); + + assert_eq!(channel_connector.cluster_name(), "test_cluster"); + assert_eq!(channel_connector.listener_name(), "test_listener"); + assert!(!channel_connector.is_available().await); + } + + #[tokio::test] + async fn test_cluster_helpers() { + use cluster_helpers::*; + use orion_configuration::config::cluster::InternalEndpointAddress; + + let internal_addr = InternalEndpointAddress { + server_listener_name: String::from("test_listener").into(), + endpoint_id: Some(String::from("endpoint1").into()), + }; + + let connector = create_internal_connector(&internal_addr, "test_cluster"); + assert_eq!(connector.cluster_name(), "test_cluster"); + assert_eq!(connector.listener_name(), "test_listener"); + + assert!(!is_internal_listener_available("non_existent").await); + + let stats = get_internal_connection_stats().await; + assert_eq!(stats.active_listeners, 0); + + let listeners = list_internal_listeners().await; + assert!(listeners.is_empty()); + } +} diff --git a/orion-lib/src/transport/internal_connection.rs b/orion-lib/src/transport/internal_connection.rs new file mode 100644 index 00000000..4c5a5378 --- /dev/null +++ b/orion-lib/src/transport/internal_connection.rs @@ -0,0 +1,378 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use std::{ + collections::HashMap, + ops::Deref, + sync::{Arc, Weak}, +}; + +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{mpsc, RwLock}, + time::Instant, +}; + +use super::AsyncStream; +use crate::{Error, Result}; + +#[derive(Debug, Clone)] +pub struct InternalConnectionMetadata { + pub listener_name: String, + pub buffer_size_kb: Option, + pub created_at: Instant, + pub endpoint_id: Option, +} + +#[derive(Debug, Clone)] +pub struct InternalListenerHandle { + pub name: String, + pub connection_sender: mpsc::UnboundedSender, + listener_ref: Weak<()>, +} + +impl InternalListenerHandle { + pub fn new( + name: String, + connection_sender: mpsc::UnboundedSender, + listener_ref: Weak<()>, + ) -> Self { + Self { name, connection_sender, listener_ref } + } + + pub fn is_alive(&self) -> bool { + self.listener_ref.strong_count() > 0 + } + + pub fn create_connection(&self, endpoint_id: Option) -> Result { + if !self.is_alive() { + return Err(Error::new(format!("Internal listener '{}' is no longer active", self.name))); + } + + let metadata = InternalConnectionMetadata { + listener_name: self.name.clone(), + buffer_size_kb: None, + created_at: Instant::now(), + endpoint_id, + }; + + let (upstream, downstream) = create_internal_connection_pair(metadata); + + let connection_pair = InternalConnectionPair { upstream: upstream.clone(), downstream: downstream.clone() }; + + if self.connection_sender.send(connection_pair.clone()).is_err() { + return Err(Error::new(format!("Failed to send connection to internal listener '{}'", self.name))); + } + + Ok(connection_pair) + } +} + +#[derive(Debug, Clone)] +pub struct InternalConnectionPair { + pub upstream: Arc, + pub downstream: Arc, +} + +#[derive(Debug)] +pub struct InternalStream { + metadata: InternalConnectionMetadata, + stream: tokio::io::DuplexStream, + is_closed: Arc>, +} + +impl InternalStream { + fn new(metadata: InternalConnectionMetadata, stream: tokio::io::DuplexStream) -> Self { + Self { metadata, stream, is_closed: Arc::new(RwLock::new(false)) } + } +} + +impl InternalStream { + pub fn metadata(&self) -> &InternalConnectionMetadata { + &self.metadata + } + + pub fn is_active(&self) -> bool { + if let Ok(is_closed) = self.is_closed.try_read() { + !*is_closed + } else { + false + } + } +} + +impl AsyncRead for InternalStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + tokio::io::AsyncRead::poll_read(std::pin::Pin::new(&mut self.get_mut().stream), cx, buf) + } +} + +impl AsyncWrite for InternalStream { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + tokio::io::AsyncWrite::poll_write(std::pin::Pin::new(&mut self.get_mut().stream), cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + tokio::io::AsyncWrite::poll_flush(std::pin::Pin::new(&mut self.get_mut().stream), cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + tokio::io::AsyncWrite::poll_shutdown(std::pin::Pin::new(&mut self.get_mut().stream), cx) + } +} + +#[derive(Debug)] +pub struct InternalConnectionFactory { + listeners: Arc>>, +} + +impl InternalConnectionFactory { + pub fn new() -> Self { + Self { listeners: Arc::new(RwLock::new(HashMap::new())) } + } + + pub async fn register_listener( + &self, + name: String, + ) -> Result<(InternalListenerHandle, mpsc::UnboundedReceiver, Arc<()>)> { + let (connection_tx, connection_rx) = mpsc::unbounded_channel(); + let listener_ref = Arc::new(()); + let weak_ref = Arc::downgrade(&listener_ref); + + let handle = InternalListenerHandle::new(name.clone(), connection_tx, weak_ref); + + let mut listeners = self.listeners.write().await; + + if listeners.contains_key(&name) { + return Err(Error::new(format!("Internal listener '{}' is already registered", name))); + } + + listeners.insert(name, handle.clone()); + Ok((handle, connection_rx, listener_ref)) + } + + pub async fn unregister_listener(&self, name: &str) -> Result<()> { + let mut listeners = self.listeners.write().await; + + if listeners.remove(name).is_none() { + return Err(Error::new(format!("Internal listener '{}' was not registered", name))); + } + + Ok(()) + } + + pub async fn connect_to_listener(&self, name: &str, endpoint_id: Option) -> Result { + let listeners = self.listeners.read().await; + let handle = + listeners.get(name).ok_or_else(|| Error::new(format!("Internal listener '{}' not found", name)))?; + + let connection_pair = handle.create_connection(endpoint_id)?; + Ok(Box::new(InternalStreamWrapper(connection_pair.upstream))) + } + + pub async fn list_listeners(&self) -> Vec { + let listeners = self.listeners.read().await; + listeners.keys().map(String::clone).collect() + } + + pub async fn is_listener_active(&self, name: &str) -> bool { + let listeners = self.listeners.read().await; + listeners.get(name).map_or(false, |handle| handle.is_alive()) + } + + pub async fn get_stats(&self) -> InternalConnectionStats { + let listeners = self.listeners.read().await; + let active_listeners = listeners.len(); + + InternalConnectionStats { active_listeners, total_pooled_connections: 0, max_pooled_connections: 0 } + } +} + +impl Default for InternalConnectionFactory { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Clone)] +pub struct InternalConnectionStats { + pub active_listeners: usize, + pub total_pooled_connections: usize, + pub max_pooled_connections: usize, +} + +pub struct InternalStreamWrapper(Arc); + +impl Deref for InternalStreamWrapper { + type Target = InternalStream; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl AsyncRead for InternalStreamWrapper { + fn poll_read( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + _buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + // InternalStreamWrapper is read-only - actual I/O happens in InternalStream + std::task::Poll::Pending + } +} + +impl AsyncWrite for InternalStreamWrapper { + fn poll_write( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + _buf: &[u8], + ) -> std::task::Poll> { + std::task::Poll::Pending + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +fn create_internal_connection_pair(metadata: InternalConnectionMetadata) -> (Arc, Arc) { + let (upstream_io, downstream_io) = tokio::io::duplex(1024); + + let upstream = Arc::new(InternalStream::new(metadata.clone(), upstream_io)); + let downstream = Arc::new(InternalStream::new(metadata, downstream_io)); + + (upstream, downstream) +} + +static GLOBAL_FACTORY: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub fn global_internal_connection_factory() -> &'static InternalConnectionFactory { + GLOBAL_FACTORY.get_or_init(InternalConnectionFactory::new) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_factory_creation() { + let factory = InternalConnectionFactory::new(); + let stats = factory.get_stats().await; + assert_eq!(stats.active_listeners, 0); + assert_eq!(stats.total_pooled_connections, 0); + } + + #[tokio::test] + async fn test_listener_registration() { + let factory = InternalConnectionFactory::new(); + + let result = factory.register_listener("test_listener".to_string()).await; + assert!(result.is_ok()); + let (_handle, _rx, _listener_ref) = result.unwrap(); + + let stats = factory.get_stats().await; + assert_eq!(stats.active_listeners, 1); + + let result = factory.register_listener("test_listener".to_string()).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_listener_unregistration() { + let factory = InternalConnectionFactory::new(); + + let (_handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string()).await.unwrap(); + let result = factory.unregister_listener("test_listener").await; + assert!(result.is_ok()); + + let stats = factory.get_stats().await; + assert_eq!(stats.active_listeners, 0); + + let result = factory.unregister_listener("non_existent").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_connection_to_non_existent_listener() { + let factory = InternalConnectionFactory::new(); + + let result = factory.connect_to_listener("non_existent", None).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_listener_lifecycle() { + let factory = InternalConnectionFactory::new(); + + let (handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string()).await.unwrap(); + + assert!(factory.is_listener_active("test_listener").await); + assert!(handle.is_alive()); + + factory.unregister_listener("test_listener").await.unwrap(); + + assert!(!factory.is_listener_active("test_listener").await); + } + + #[tokio::test] + async fn test_list_listeners() { + let factory = InternalConnectionFactory::new(); + + let listeners = factory.list_listeners().await; + assert!(listeners.is_empty()); + + let (_handle1, _rx1, _listener_ref1) = factory.register_listener("listener1".to_string()).await.unwrap(); + let (_handle2, _rx2, _listener_ref2) = factory.register_listener("listener2".to_string()).await.unwrap(); + + let listeners = factory.list_listeners().await; + assert_eq!(listeners.len(), 2); + assert!(listeners.contains(&String::from("listener1"))); + assert!(listeners.contains(&String::from("listener2"))); + } + + #[tokio::test] + async fn test_global_factory() { + let factory = global_internal_connection_factory(); + let stats = factory.get_stats().await; + assert_eq!(stats.max_pooled_connections, 0); + } +} diff --git a/orion-lib/src/transport/mod.rs b/orion-lib/src/transport/mod.rs index d8bfcd3a..c008fb54 100644 --- a/orion-lib/src/transport/mod.rs +++ b/orion-lib/src/transport/mod.rs @@ -21,6 +21,8 @@ pub mod bind_device; pub mod connector; mod grpc_channel; mod http_channel; +pub mod internal_cluster_connector; +pub mod internal_connection; mod resolver; pub mod tcp_channel; pub use resolver::resolve; @@ -33,6 +35,11 @@ pub mod transport_socket; pub use self::{ grpc_channel::{GrpcService, SimpleRoundRobinGrpcServiceLB}, http_channel::{HttpChannel, HttpChannelBuilder}, + internal_cluster_connector::InternalChannelConnector, + internal_connection::{ + global_internal_connection_factory, InternalConnectionFactory, InternalConnectionPair, InternalConnectionStats, + InternalListenerHandle, + }, proxy_protocol::ProxyProtocolReader, tcp_channel::TcpChannelConnector, tlv_listener_filter::TlvListenerFilter, diff --git a/orion-lib/tests/internal_connection_integration.rs b/orion-lib/tests/internal_connection_integration.rs new file mode 100644 index 00000000..5efc61d8 --- /dev/null +++ b/orion-lib/tests/internal_connection_integration.rs @@ -0,0 +1,176 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use orion_lib::{global_internal_connection_factory, InternalChannelConnector}; + +#[tokio::test] +async fn test_complete_internal_connection_flow() { + let factory = global_internal_connection_factory(); + + let listener_name = "test_integration_listener_global"; + let (_handle, mut connection_receiver, _listener_ref) = + factory.register_listener(listener_name.to_string()).await.expect("Failed to register listener"); + + assert!(factory.is_listener_active(listener_name).await); + let listeners = factory.list_listeners().await; + assert!(listeners.contains(&listener_name.to_string())); + + let cluster_connector = + InternalChannelConnector::new(listener_name.to_string(), "test_cluster", Some("endpoint1".to_string())); + + assert!(cluster_connector.is_available().await); + + let connection_future = cluster_connector.connect(); + let listener_future = async { connection_receiver.recv().await }; + + let (cluster_connection, listener_connection) = tokio::join!(connection_future, listener_future); + + assert!(cluster_connection.is_ok(), "Cluster connection failed: {:?}", cluster_connection.err()); + assert!(listener_connection.is_some(), "Listener didn't receive connection"); + + let cluster_channel = cluster_connection.unwrap(); + let listener_pair = listener_connection.unwrap(); + + assert_eq!(cluster_channel.cluster_name(), "test_cluster"); + assert_eq!(cluster_channel.listener_name(), listener_name); + assert_eq!(cluster_channel.endpoint_id(), Some("endpoint1")); + + let metadata = listener_pair.downstream.metadata(); + assert_eq!(metadata.listener_name, listener_name); + assert_eq!(metadata.buffer_size_kb, None); + assert_eq!(metadata.endpoint_id, Some("endpoint1".to_string())); + + assert!(listener_pair.upstream.is_active()); + assert!(listener_pair.downstream.is_active()); + + factory.unregister_listener(listener_name).await.expect("Failed to unregister listener"); + assert!(!factory.is_listener_active(listener_name).await); +} + +#[tokio::test] +async fn test_connection_pooling() { + let factory = global_internal_connection_factory(); + + let listener_name = "test_pooling_listener_global"; + let (_handle, mut connection_receiver, _listener_ref) = + factory.register_listener(listener_name.to_string()).await.expect("Failed to register listener"); + + let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None); + + let connection_future = connector.connect(); + let listener_future = connection_receiver.recv(); + let (_cluster_conn, listener_pair) = tokio::join!(connection_future, listener_future); + + assert!(listener_pair.is_some()); + + let stats = factory.get_stats().await; + assert!(stats.active_listeners >= 1); + + factory.unregister_listener(listener_name).await.expect("Failed to unregister listener"); +} + +#[tokio::test] +async fn test_error_scenarios() { + let factory = global_internal_connection_factory(); + + let connector = InternalChannelConnector::new("non_existent_listener".to_string(), "test_cluster", None); + + assert!(!connector.is_available().await); + let result = connector.connect().await; + assert!(result.is_err()); + + let listener_name = "test_error_listener_global"; + let result1 = factory.register_listener(listener_name.to_string()).await; + assert!(result1.is_ok()); + let (_handle1, _rx1, _listener_ref1) = result1.unwrap(); + + let result2 = factory.register_listener(listener_name.to_string()).await; + assert!(result2.is_err()); + + let result = factory.unregister_listener("non_existent").await; + assert!(result.is_err()); + + factory.unregister_listener(listener_name).await.expect("Failed to unregister listener"); +} + +#[tokio::test] +async fn test_connection_to_unregistered_listener() { + let _factory = global_internal_connection_factory(); + + let connector = InternalChannelConnector::new("non_existent_timeout_listener".to_string(), "test_cluster", None); + + let result = connector.connect().await; + assert!(result.is_err()); + + assert!(!connector.is_available().await); +} + +#[tokio::test] +async fn test_global_factory() { + let factory = global_internal_connection_factory(); + + let listener_name = "test_global_listener"; + let (_handle, _rx, _listener_ref) = factory.register_listener(listener_name.to_string()).await.unwrap(); + + assert!(factory.is_listener_active(listener_name).await); + + let stats = factory.get_stats().await; + assert!(stats.active_listeners > 0); + + factory.unregister_listener(listener_name).await.expect("Failed to unregister listener"); +} + +#[tokio::test] +async fn test_statistics_and_monitoring() { + let factory = global_internal_connection_factory(); + + let listener1 = "test_stats_listener1_global"; + + let (_handle1, _rx1, _listener_ref1) = + factory.register_listener(listener1.to_string()).await.expect("Failed to register listener1"); + + let stats = factory.get_stats().await; + assert!(stats.active_listeners >= 1); + + let listeners = factory.list_listeners().await; + assert!(listeners.contains(&listener1.to_string())); + + factory.unregister_listener(listener1).await.expect("Failed to unregister listener1"); +} + +#[tokio::test] +async fn test_cluster_helpers() { + use orion_configuration::config::cluster::InternalEndpointAddress; + use orion_lib::cluster_helpers::*; + + let internal_addr = InternalEndpointAddress { + server_listener_name: "test_listener".to_string().into(), + endpoint_id: Some("endpoint1".to_string().into()), + }; + + let connector = create_internal_connector(&internal_addr, "test_cluster"); + assert_eq!(connector.cluster_name(), "test_cluster"); + assert_eq!(connector.listener_name(), "test_listener"); + + assert!(!is_internal_listener_available("non_existent").await); + + let stats = get_internal_connection_stats().await; + assert_eq!(stats.max_pooled_connections, 0); + + let listeners = list_internal_listeners().await; + assert!(listeners.is_empty()); +} From e94254566e44266aced32613e91f82e01f8330df Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Fri, 3 Oct 2025 09:15:31 +0530 Subject: [PATCH 2/5] feat: implement internal connection factory for in-process communication - InternalConnectionWorkerPool with efficient task distribution - Special address range for internal connections (127.255.255.x) - Enhanced connection metadata with listener_name and endpoint_id - Performance optimization replacing tokio::spawn overhead Signed-off-by: Eeshu-Yadav --- ...nternal-listener-and-upstream-transport.md | 370 ++++++++++++++++++ orion-lib/src/listeners/filter_state.rs | 21 +- orion-lib/src/listeners/listener.rs | 153 +++++--- 3 files changed, 492 insertions(+), 52 deletions(-) create mode 100644 docs/proposal/internal-listener-and-upstream-transport.md diff --git a/docs/proposal/internal-listener-and-upstream-transport.md b/docs/proposal/internal-listener-and-upstream-transport.md new file mode 100644 index 00000000..677cda4e --- /dev/null +++ b/docs/proposal/internal-listener-and-upstream-transport.md @@ -0,0 +1,370 @@ +--- +title: Internal Listener and Upstream Transport Support +authors: +- "@Eeshu-Yadav" +reviewers: +- "@YaoZengzeng" +- "@dawid-nowak" +approvers: +- "@YaoZengzeng" +- "@dawid-nowak" + +creation-date: 2025-10-03 + +--- + +## Internal Listener and Upstream Transport Support + +### Summary + +This proposal implements internal listener and upstream transport functionality in Orion to enable waypoint proxy capabilities. Internal listeners allow in-process communication without network APIs, while internal upstream transport enables metadata passthrough between proxy hops. This enables ambient mesh deployments with TCP proxy chaining and multi-hop routing. + +### Motivation + +To support ambient service mesh deployments, Orion needs: + +1. **Internal connections**: Accept connections from within the same process via in-memory channels +2. **Name-based routing**: Route to internal listeners by name instead of network addresses +3. **Metadata propagation**: Preserve request context across proxy hops for routing and observability +4. **Performance optimization**: Eliminate network stack overhead for co-located proxy communication + +#### Goals + +- Implement Envoy-compatible internal listener and upstream transport support +- Enable clusters to connect via `server_listener_name` with metadata passthrough +- Provide thread-safe connection handling with proper lifecycle management +- Maintain full compatibility with Envoy xDS configurations +- Ensure zero performance regression for network listeners + + +### Proposal + +The proposal introduces three main components to enable internal listener and upstream transport functionality: + +1. **Internal Connection Factory**: A global, thread-safe registry that manages internal listener registration and connection establishment between clusters and listeners within the same proxy instance. + +2. **Enhanced Internal Listener Runtime**: Extension of the existing listener infrastructure to handle internal connections, process them through filter chains, and manage lifecycle events. + +3. **Internal Upstream Transport**: Implementation of cluster-side functionality to establish connections to internal endpoints and pass metadata through the transport socket layer. + +The implementation follows Envoy's internal listener design while leveraging Rust's type system and async runtime for safety and performance. + + +#### Notes + +**Design Decisions:** +- In-process only communication using Tokio duplex streams +- Global factory pattern with `std::sync::OnceLock` for thread-safe initialization +- Weak references for automatic lifecycle management +- Initial support for host, cluster, and route metadata (request metadata in future) +- Full Envoy configuration compatibility with listener name validation + + +### Design Details + +#### Architecture Overview + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Orion Proxy Process │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ External │ │ Internal │ │ +│ │ Listener │ │ Listener │ │ +│ │ (Network) │ │ (In-Memory) │ │ +│ └──────┬───────┘ └──────┬───────┘ │ +│ │ │ │ +│ │ TCP Connection │ Register │ +│ ▼ ▼ │ +│ ┌────────────────────────────────────────────────────┐ │ +│ │ Internal Connection Factory │ │ +│ │ ┌──────────────────────────────────────────┐ │ │ +│ │ │ Listener Registry │ │ │ +│ │ │ HashMap │ │ │ +│ │ └──────────────────────────────────────────┘ │ │ +│ └────────────────────────────┬───────────────────────┘ │ +│ │ │ +│ │ Connect │ +│ ▼ │ +│ ┌──────────────┐ ┌─────────────────┐ │ +│ │ Cluster │─────▶│ TCP Proxy │ │ +│ │ (Internal │ │ Filter │ │ +│ │ Endpoint) │ └─────────────────┘ │ +│ └──────────────┘ │ +│ │ │ +│ │ Internal Connection (Duplex Stream) │ +│ ▼ │ +│ ┌──────────────────────────────────────┐ │ +│ │ Internal Upstream Transport │ │ +│ │ - Metadata Passthrough │ │ +│ │ - Host/Cluster/Route Metadata │ │ +│ └──────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +#### Component Details + +##### 1. Internal Connection Factory + +**Location**: `orion-lib/src/transport/internal_connection.rs` + +```rust +pub struct InternalConnectionFactory { + listeners: Arc>>, +} + +pub struct InternalListenerHandle { + pub name: String, + pub connection_sender: mpsc::UnboundedSender, + listener_ref: Weak<()>, +} + +pub struct InternalConnectionPair { + pub upstream: Arc, + pub downstream: Arc, +} + +pub struct InternalStream { + metadata: InternalConnectionMetadata, + stream: tokio::io::DuplexStream, + is_closed: Arc>, +} +``` + +**Key Operations**: `register_listener`, `unregister_listener`, `connect_to_listener`, `is_listener_active`, `list_listeners`, `get_stats` + +##### 2. Enhanced Internal Listener Runtime + +**Location**: `orion-lib/src/listeners/listener.rs` + +```rust +async fn run_internal_listener( + name: &'static str, + filter_chains: HashMap, + mut route_updates_receiver: broadcast::Receiver, + mut secret_updates_receiver: broadcast::Receiver, +) -> Error { + let factory = global_internal_connection_factory(); + let (_handle, mut connection_receiver, _listener_ref) = + factory.register_listener(name.to_string()).await?; + + loop { + tokio::select! { + Some(connection_pair) = connection_receiver.recv() => { + tokio::spawn(handle_internal_connection(connection_pair, filter_chains_clone)); + } + Ok(route_update) = route_updates_receiver.recv() => { + process_route_update(&name, &filter_chains, route_update); + } + Ok(secret_update) = secret_updates_receiver.recv() => { + process_secret_update(&name, &mut filter_chains_clone, secret_update); + } + } + } +} +``` + +##### 3. Internal Cluster Connector + +**Location**: `orion-lib/src/transport/internal_cluster_connector.rs` + +```rust +pub struct InternalClusterConnector { + listener_name: String, + endpoint_id: Option, +} + +impl InternalClusterConnector { + pub async fn connect(&self) -> Result { + let factory = global_internal_connection_factory(); + factory.connect_to_listener(&self.listener_name, self.endpoint_id.clone()).await + } +} + +pub struct InternalChannelConnector { + connector: InternalClusterConnector, + cluster_name: &'static str, +} +``` + +##### 4. Configuration Data Structures + +**Listener Configuration** (`orion-configuration/src/config/listener.rs`): + +```rust +pub enum ListenerAddress { + Socket(SocketAddr), + Internal(InternalListener), +} + +pub struct InternalListener { + pub buffer_size_kb: Option, +} +``` + +**Cluster Configuration** (`orion-configuration/src/config/cluster.rs`): + +```rust +pub enum EndpointAddress { + Socket(SocketAddr), + Internal(InternalEndpointAddress), +} + +pub struct InternalEndpointAddress { + pub server_listener_name: CompactString, + pub endpoint_id: Option, +} + +pub enum TransportSocket { + InternalUpstream(InternalUpstreamTransport), + RawBuffer, +} + +pub struct InternalUpstreamTransport { + pub passthrough_metadata: Vec, + pub transport_socket: Box, +} + +pub struct MetadataValueSource { + pub kind: MetadataKind, + pub name: CompactString, +} + +pub enum MetadataKind { + Host, + Route, + Cluster, +} +``` + +##### 5. Example Configuration + +**Bootstrap Configuration**: + +```yaml +static_resources: + listeners: + # External listener accepting network connections + - name: "listener_0" + address: + socket_address: + address: "0.0.0.0" + port_value: 15001 + filter_chains: + - filters: + - name: "tcp_proxy" + typed_config: + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy" + stat_prefix: "ingress_tcp" + cluster: "internal_cluster" + + # Internal listener accepting in-process connections + - name: "waypoint_internal" + address: + envoy_internal_address: + server_listener_name: "waypoint_internal" + filter_chains: + - filters: + - name: "http_connection_manager" + typed_config: + "@type": "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager" + stat_prefix: "waypoint_http" + route_config: + name: "local_route" + virtual_hosts: + - name: "backend" + domains: ["*"] + routes: + - match: { prefix: "/" } + route: { cluster: "backend_service" } + + clusters: + # Cluster routing to internal listener + - name: "internal_cluster" + type: "STATIC" + load_assignment: + cluster_name: "internal_cluster" + endpoints: + - lb_endpoints: + - endpoint: + address: + envoy_internal_address: + server_listener_name: "waypoint_internal" + transport_socket: + name: "internal_upstream" + typed_config: + "@type": "type.googleapis.com/envoy.extensions.transport_sockets.internal_upstream.v3.InternalUpstreamTransport" + passthrough_metadata: + - kind: { host: {} } + name: "envoy.filters.listener.original_dst" + - kind: { cluster: {} } + name: "istio.workload" + transport_socket: + name: "raw_buffer" + typed_config: + "@type": "type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer" + + # Backend service cluster + - name: "backend_service" + type: "STATIC" + load_assignment: + cluster_name: "backend_service" + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: "10.0.0.5" + port_value: 8080 +``` + +#### Implementation Phases + +The implementation is divided into four GitHub issues for manageable development and review: + +**Phase 1: Internal Connection Factory** - Connection factory with thread-safe registry and lifecycle management + +**Phase 2: Enhanced Internal Listener Runtime** - Connection acceptance and filter chain integration + +**Phase 3: Cluster Internal Connection Support** - Cluster connectors with load balancing for internal endpoints + +**Phase 4: Internal Upstream Transport & Metadata Passthrough** - Metadata extraction and passthrough implementation + +#### Test Plan + +**Unit Tests**: +- Listener registration/unregistration in connection factory +- Connection establishment between listeners and clusters +- Thread safety and concurrent access +- Error handling for non-existent/inactive listeners + +**Integration Tests**: +- End-to-end flow: External listener → Internal listener → Backend +- Configuration parsing and validation +- Metadata propagation across proxy hops + +--- + +## References + +1. [Envoy Internal Listener Documentation](https://www.envoyproxy.io/docs/envoy/latest/configuration/other_features/internal_listener) +2. [Envoy Internal Upstream Transport Proto](https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/transport_sockets/internal_upstream/v3/internal_upstream.proto) +3. [Envoy Metadata Types](https://www.envoyproxy.io/docs/envoy/latest/api-v3/type/metadata/v3/metadata.proto) + + +--- + +## Appendix + +### Glossary + +- **Internal Listener**: A listener that accepts connections from within the proxy process rather than from the network +- **Waypoint Proxy**: A shared proxy in ambient service mesh that handles L7 processing for multiple workloads +- **Internal Upstream Transport**: Transport socket that enables metadata passthrough for internal connections +- **Server Listener Name**: Unique identifier for an internal listener used by clusters to establish connections +- **Metadata Passthrough**: Mechanism to propagate context (host/cluster/route metadata) across proxy hops +- **Duplex Stream**: Bidirectional async I/O stream provided by Tokio for in-memory communication + +### Acknowledgments + +This feature was proposed by @YaoZengzeng in this [issue](https://github.com/kmesh-net/orion/issues/59) and has been reviewed by @dawid-nowak @YaoZengzeng. The design follows Envoy's internal listener specification while adapting to Orion's Rust-based architecture and async runtime. diff --git a/orion-lib/src/listeners/filter_state.rs b/orion-lib/src/listeners/filter_state.rs index 28652dd4..3f5e0df6 100644 --- a/orion-lib/src/listeners/filter_state.rs +++ b/orion-lib/src/listeners/filter_state.rs @@ -19,6 +19,11 @@ use compact_str::CompactString; use orion_configuration::config::common::TlvType; use std::{collections::HashMap, net::SocketAddr}; +const INTERNAL_PEER_ADDR: std::net::SocketAddr = + std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534)); +const INTERNAL_LOCAL_ADDR: std::net::SocketAddr = + std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 255), 65535)); + #[derive(Debug, Clone)] pub enum DownstreamConnectionMetadata { FromSocket { @@ -39,6 +44,9 @@ pub enum DownstreamConnectionMetadata { proxy_peer_address: SocketAddr, proxy_local_address: SocketAddr, }, + /// Internal connections from in-process communication (e.g., waypoint proxy) + /// - listener_name: identifies which internal listener accepted this connection + /// - endpoint_id: optional identifier for the specific endpoint (used for load balancing) FromInternal { listener_name: String, endpoint_id: Option, @@ -51,17 +59,26 @@ impl DownstreamConnectionMetadata { Self::FromSocket { peer_address, .. } => *peer_address, Self::FromProxyProtocol { original_peer_address, .. } => *original_peer_address, Self::FromTlv { proxy_peer_address, .. } => *proxy_peer_address, - Self::FromInternal { .. } => SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), + Self::FromInternal { .. } => INTERNAL_PEER_ADDR, } } + pub fn local_address(&self) -> SocketAddr { match self { Self::FromSocket { local_address, .. } => *local_address, Self::FromProxyProtocol { original_destination_address, .. } => *original_destination_address, Self::FromTlv { original_destination_address, .. } => *original_destination_address, - Self::FromInternal { .. } => SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0), + Self::FromInternal { .. } => INTERNAL_LOCAL_ADDR, } } + + pub fn is_internal(&self) -> bool { + matches!(self, Self::FromInternal { .. }) + } + + pub fn is_internal_address(addr: SocketAddr) -> bool { + addr == INTERNAL_PEER_ADDR || addr == INTERNAL_LOCAL_ADDR + } } #[derive(Debug, Clone)] diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index e16b3244..3dc85e10 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -50,10 +50,100 @@ use std::{ }; use tokio::{ net::{TcpListener, TcpSocket}, - sync::broadcast::{self}, + sync::{ + broadcast::{self}, + mpsc, + }, }; use tracing::{debug, info, warn}; +async fn handle_internal_connection_static( + listener_name: String, + connection_pair: crate::transport::InternalConnectionPair, + filter_chains: Arc>, +) -> Result<()> { + use crate::listeners::filter_state::DownstreamConnectionMetadata; + + debug!("Handling new internal connection for listener '{}'", listener_name); + + let downstream_metadata = DownstreamConnectionMetadata::FromInternal { + listener_name: listener_name.clone(), + endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(), + }; + + let filter_chain = match Listener::select_filterchain(&filter_chains, &downstream_metadata, None)? { + Some(fc) => fc, + None => { + warn!("No matching filter chain found for internal connection"); + return Err(crate::Error::new("No matching filter chain")); + }, + }; + + let _downstream_stream = connection_pair.downstream; + + match &filter_chain.handler { + crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => { + info!("Processing internal connection through HTTP filter chain"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(()) + }, + crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => { + info!("Processing internal connection through TCP filter chain"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(()) + }, + } +} + +#[derive(Debug)] +struct InternalConnectionWorkerPool { + senders: Vec>, + next_worker: std::sync::atomic::AtomicUsize, +} + +#[derive(Debug)] +struct InternalConnectionTask { + listener_name: String, + connection_pair: crate::transport::InternalConnectionPair, + filter_chains: Arc>, +} + +impl InternalConnectionWorkerPool { + fn new(num_workers: usize) -> Self { + let mut senders: Vec> = Vec::with_capacity(num_workers); + + for _ in 0..num_workers { + let (sender, mut receiver) = mpsc::unbounded_channel(); + senders.push(sender); + + let worker = tokio::spawn(async move { + while let Some(task) = receiver.recv().await { + if let Err(e) = + handle_internal_connection_static(task.listener_name, task.connection_pair, task.filter_chains) + .await + { + warn!("Error handling internal connection task: {}", e); + } + } + }); + drop(worker); + } + + Self { senders, next_worker: std::sync::atomic::AtomicUsize::new(0) } + } + + fn submit_task(&self, task: InternalConnectionTask) -> Result<()> { + let worker_index = self.next_worker.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.senders.len(); + self.senders[worker_index].send(task).map_err(|_| Error::new("Worker pool is shut down")) + } +} + +static INTERNAL_WORKER_POOL: std::sync::OnceLock = std::sync::OnceLock::new(); + +fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool { + INTERNAL_WORKER_POOL.get_or_init(|| InternalConnectionWorkerPool::new(4)) // 4 workers by default +} + #[derive(Debug, Clone)] struct PartialListener { name: &'static str, @@ -355,7 +445,7 @@ impl Listener { }, }; - info!("Internal listener '{}' registered with connection factory", name); + info!("Internal listener '{}' registered to connection factory", name); loop { tokio::select! { @@ -367,15 +457,17 @@ impl Listener { let filter_chains_clone = filter_chains.clone(); let listener_name = name.to_string(); - tokio::spawn(async move { - if let Err(e) = Self::handle_internal_connection( - listener_name, - connection_pair, - filter_chains_clone, - ).await { - warn!("Error handling internal connection: {}", e); - } - }); + // Use worker pool instead of tokio::spawn for better performance + // with large numbers of short connections + let task = InternalConnectionTask { + listener_name, + connection_pair, + filter_chains: filter_chains_clone, + }; + + if let Err(e) = get_internal_worker_pool().submit_task(task) { + warn!("Failed to submit internal connection task: {}", e); + } } None => { warn!("Internal listener '{}' connection channel closed", name); @@ -418,45 +510,6 @@ impl Listener { Error::new("Internal listener shutdown") } - async fn handle_internal_connection( - listener_name: String, - connection_pair: crate::transport::InternalConnectionPair, - filter_chains: Arc>, - ) -> Result<()> { - use crate::listeners::filter_state::DownstreamConnectionMetadata; - use tracing::{debug, info, warn}; - - debug!("Handling new internal connection for listener '{}'", listener_name); - - let downstream_metadata = DownstreamConnectionMetadata::FromInternal { - listener_name: listener_name.clone(), - endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(), - }; - - let filter_chain = match Self::select_filterchain(&filter_chains, &downstream_metadata, None)? { - Some(fc) => fc, - None => { - warn!("No matching filter chain found for internal connection"); - return Err(crate::Error::new("No matching filter chain")); - }, - }; - - let _downstream_stream = connection_pair.downstream; - - match &filter_chain.handler { - crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => { - info!("Processing internal connection through HTTP filter chain"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - Ok(()) - }, - crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => { - info!("Processing internal connection through TCP filter chain"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - Ok(()) - }, - } - } - fn select_filterchain<'a, T>( filter_chains: &'a HashMap, downstream_metadata: &DownstreamConnectionMetadata, From d32ae909961fca9cf789cddb204fd5a688a94a63 Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Fri, 3 Oct 2025 09:35:22 +0530 Subject: [PATCH 3/5] fix: resolve clippy warnings and improve code quality and done some fixes Signed-off-by: Eeshu-Yadav --- orion-configuration/src/config/cluster.rs | 6 +- orion-configuration/src/config/core.rs | 2 +- orion-configuration/src/config/runtime.rs | 10 +- orion-lib/src/clusters/load_assignment.rs | 2 +- orion-lib/src/listeners/filter_state.rs | 12 +- .../src/listeners/http_connection_manager.rs | 4 +- .../http_connection_manager/route.rs | 4 +- orion-lib/src/listeners/listener.rs | 147 +++++++++--------- orion-lib/src/listeners/listeners_manager.rs | 6 +- .../transport/internal_cluster_connector.rs | 8 +- .../src/transport/internal_connection.rs | 112 +++++++++---- orion-proxy/src/proxy.rs | 4 +- 12 files changed, 194 insertions(+), 123 deletions(-) diff --git a/orion-configuration/src/config/cluster.rs b/orion-configuration/src/config/cluster.rs index 56f883d4..6341a3c7 100644 --- a/orion-configuration/src/config/cluster.rs +++ b/orion-configuration/src/config/cluster.rs @@ -155,7 +155,7 @@ impl From for Vec { Address::Internal(internal_addr) => Some(LbEndpoint { address: EndpointAddress::Internal(InternalEndpointAddress { server_listener_name: internal_addr.server_listener_name.into(), - endpoint_id: internal_addr.endpoint_id.map(|id| id.into()), + endpoint_id: internal_addr.endpoint_id.map(std::convert::Into::into), }), health_status: HealthStatus::default(), load_balancing_weight: NonZeroU32::MIN, @@ -193,7 +193,7 @@ impl EndpointAddress { pub fn into_addr(self) -> Result { match self { EndpointAddress::Socket(addr) => Ok(addr), - EndpointAddress::Internal(_) => Err("Cannot convert internal address to socket address".to_string()), + EndpointAddress::Internal(_) => Err("Cannot convert internal address to socket address".to_owned()), } } } @@ -785,7 +785,7 @@ mod envoy_conversions { Address::Socket(socket_addr) => Ok(EndpointAddress::Socket(socket_addr)), Address::Internal(internal_addr) => Ok(EndpointAddress::Internal(InternalEndpointAddress { server_listener_name: internal_addr.server_listener_name.into(), - endpoint_id: internal_addr.endpoint_id.map(|id| id.into()), + endpoint_id: internal_addr.endpoint_id.map(std::convert::Into::into), })), Address::Pipe(_, _) => { Err(GenericError::unsupported_variant("Pipe addresses are not supported for endpoints")) diff --git a/orion-configuration/src/config/core.rs b/orion-configuration/src/config/core.rs index 4d1c89a7..559a5981 100644 --- a/orion-configuration/src/config/core.rs +++ b/orion-configuration/src/config/core.rs @@ -305,7 +305,7 @@ pub mod envoy_conversions { impl std::fmt::Display for Address { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Socket(addr) => write!(f, "{}", addr), + Self::Socket(addr) => write!(f, "{addr}"), Self::Internal(internal) => write!(f, "internal:{}", internal.server_listener_name), Self::Pipe(path, _) => f.write_str(path), } diff --git a/orion-configuration/src/config/runtime.rs b/orion-configuration/src/config/runtime.rs index 0442d07a..b9cbda7d 100644 --- a/orion-configuration/src/config/runtime.rs +++ b/orion-configuration/src/config/runtime.rs @@ -144,7 +144,7 @@ fn get_cgroup_v2_cpu_limit() -> crate::Result { } fn parse_cgroup_v2_cpu_max(content: &str) -> crate::Result { - let parts: Vec<&str> = content.trim().split_whitespace().collect(); + let parts: Vec<&str> = content.split_whitespace().collect(); if parts.len() == 2 && parts[0] != "max" { let quota: i64 = parts[0].parse()?; let period: i64 = parts[1].parse()?; @@ -160,8 +160,8 @@ fn parse_cgroup_v2_cpu_max(content: &str) -> crate::Result { fn get_cgroup_v1_cpu_limit() -> crate::Result { let cgroup_path = get_cgroup_v1_cpu_path()?; - let quota_path = format!("{}/cpu.cfs_quota_us", cgroup_path); - let period_path = format!("{}/cpu.cfs_period_us", cgroup_path); + let quota_path = format!("{cgroup_path}/cpu.cfs_quota_us"); + let period_path = format!("{cgroup_path}/cpu.cfs_period_us"); let quota_content = std::fs::read_to_string("a_path)?; let period_content = std::fs::read_to_string(&period_path)?; @@ -193,7 +193,7 @@ fn parse_cgroup_v1_cpu_path(cgroup_content: &str) -> crate::Result { let mut parts = line.split(':'); if let (Some(_), Some(controllers), Some(path)) = (parts.next(), parts.next(), parts.next()) { if controllers.split(',').any(|c| c == "cpu") { - return Some(format!("/sys/fs/cgroup/cpu{}", path)); + return Some(format!("/sys/fs/cgroup/cpu{path}")); } } None @@ -202,7 +202,7 @@ fn parse_cgroup_v1_cpu_path(cgroup_content: &str) -> crate::Result { } if std::path::Path::new("/sys/fs/cgroup/cpu").exists() { - Ok("/sys/fs/cgroup/cpu".to_string()) + Ok("/sys/fs/cgroup/cpu".to_owned()) } else { Err("CPU cgroup path not found".into()) } diff --git a/orion-lib/src/clusters/load_assignment.rs b/orion-lib/src/clusters/load_assignment.rs index 6d1faf37..2c3c2c82 100644 --- a/orion-lib/src/clusters/load_assignment.rs +++ b/orion-lib/src/clusters/load_assignment.rs @@ -80,7 +80,7 @@ impl EndpointAddressType { if let Ok(socket_addr) = addr_str.parse::() { EndpointAddress::Socket(socket_addr) } else { - panic!("Cannot convert authority back to socket address: {}", addr_str); + panic!("Cannot convert authority back to socket address: {addr_str}"); } }, EndpointAddressType::Internal(internal_addr, _) => EndpointAddress::Internal(internal_addr.clone()), diff --git a/orion-lib/src/listeners/filter_state.rs b/orion-lib/src/listeners/filter_state.rs index 3f5e0df6..15bf6d1f 100644 --- a/orion-lib/src/listeners/filter_state.rs +++ b/orion-lib/src/listeners/filter_state.rs @@ -19,11 +19,17 @@ use compact_str::CompactString; use orion_configuration::config::common::TlvType; use std::{collections::HashMap, net::SocketAddr}; +/// Reserved internal address used as the peer address for in-process (internal) connections. +/// Chosen from the loopback range (127.255.255.254:65534) to avoid conflicts with real network traffic. +/// This address clearly identifies internal connections in logs and debugging. const INTERNAL_PEER_ADDR: std::net::SocketAddr = std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534)); + +/// Reserved internal address used as the local address for in-process (internal) connections. +/// Chosen from the loopback range (127.255.255.255:65535) to avoid conflicts with real network traffic. +/// This address clearly identifies internal connections in logs and debugging. const INTERNAL_LOCAL_ADDR: std::net::SocketAddr = std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 255), 65535)); - #[derive(Debug, Clone)] pub enum DownstreamConnectionMetadata { FromSocket { @@ -45,8 +51,8 @@ pub enum DownstreamConnectionMetadata { proxy_local_address: SocketAddr, }, /// Internal connections from in-process communication (e.g., waypoint proxy) - /// - listener_name: identifies which internal listener accepted this connection - /// - endpoint_id: optional identifier for the specific endpoint (used for load balancing) + /// - `listener_name`: identifies which internal listener accepted this connection + /// - `endpoint_id`: optional identifier for the specific endpoint (used for load balancing) FromInternal { listener_name: String, endpoint_id: Option, diff --git a/orion-lib/src/listeners/http_connection_manager.rs b/orion-lib/src/listeners/http_connection_manager.rs index 21f3a2c7..09457f8b 100644 --- a/orion-lib/src/listeners/http_connection_manager.rs +++ b/orion-lib/src/listeners/http_connection_manager.rs @@ -1024,7 +1024,7 @@ impl Service> for HttpRequestHandler { add, 1, trans_handler.thread_id(), - &[KeyValue::new("listener", listener_name_for_route.to_string())] + &[KeyValue::new("listener", listener_name_for_route.to_owned())] ); if let Some(state) = trans_handler.span_state.as_ref() { @@ -1049,7 +1049,7 @@ impl Service> for HttpRequestHandler { add, nbytes + resp_head_size as u64, trans_handler.thread_id(), - &[KeyValue::new("listener", listener_name_for_response.to_string())] + &[KeyValue::new("listener", listener_name_for_response.to_owned())] ); let is_transaction_complete = if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { diff --git a/orion-lib/src/listeners/http_connection_manager/route.rs b/orion-lib/src/listeners/http_connection_manager/route.rs index 05e7b413..c5103f30 100644 --- a/orion-lib/src/listeners/http_connection_manager/route.rs +++ b/orion-lib/src/listeners/http_connection_manager/route.rs @@ -194,7 +194,7 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route let err = err.into_inner(); let event_error = EventError::try_infer_from(&err); let flags = event_error.clone().map(ResponseFlags::from).unwrap_or_default(); - let event_kind = event_error.map_or(EventKind::ViaUpstream, |e| EventKind::Error(e)); + let event_kind = event_error.map_or(EventKind::ViaUpstream, EventKind::Error); debug!( "HttpConnectionManager Error processing response {:?}: {}({})", err, @@ -211,7 +211,7 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route let err = err.into_inner(); let event_error = EventError::try_infer_from(&err); let flags = event_error.clone().map(ResponseFlags::from).unwrap_or_default(); - let event_kind = event_error.map_or(EventKind::ViaUpstream, |e| EventKind::Error(e)); + let event_kind = event_error.map_or(EventKind::ViaUpstream, EventKind::Error); debug!( "Failed to get an HTTP connection: {:?}: {}({})", err, diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index 3dc85e10..9abef41c 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -57,46 +57,9 @@ use tokio::{ }; use tracing::{debug, info, warn}; -async fn handle_internal_connection_static( - listener_name: String, - connection_pair: crate::transport::InternalConnectionPair, - filter_chains: Arc>, -) -> Result<()> { - use crate::listeners::filter_state::DownstreamConnectionMetadata; - - debug!("Handling new internal connection for listener '{}'", listener_name); - - let downstream_metadata = DownstreamConnectionMetadata::FromInternal { - listener_name: listener_name.clone(), - endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(), - }; - - let filter_chain = match Listener::select_filterchain(&filter_chains, &downstream_metadata, None)? { - Some(fc) => fc, - None => { - warn!("No matching filter chain found for internal connection"); - return Err(crate::Error::new("No matching filter chain")); - }, - }; - - let _downstream_stream = connection_pair.downstream; - - match &filter_chain.handler { - crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => { - info!("Processing internal connection through HTTP filter chain"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - Ok(()) - }, - crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => { - info!("Processing internal connection through TCP filter chain"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - Ok(()) - }, - } -} - #[derive(Debug)] struct InternalConnectionWorkerPool { + workers: Vec>, senders: Vec>, next_worker: std::sync::atomic::AtomicUsize, } @@ -111,6 +74,7 @@ struct InternalConnectionTask { impl InternalConnectionWorkerPool { fn new(num_workers: usize) -> Self { let mut senders: Vec> = Vec::with_capacity(num_workers); + let mut workers = Vec::with_capacity(num_workers); for _ in 0..num_workers { let (sender, mut receiver) = mpsc::unbounded_channel(); @@ -126,16 +90,24 @@ impl InternalConnectionWorkerPool { } } }); - drop(worker); + workers.push(worker); } - Self { senders, next_worker: std::sync::atomic::AtomicUsize::new(0) } + Self { workers, senders, next_worker: std::sync::atomic::AtomicUsize::new(0) } } fn submit_task(&self, task: InternalConnectionTask) -> Result<()> { let worker_index = self.next_worker.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.senders.len(); self.senders[worker_index].send(task).map_err(|_| Error::new("Worker pool is shut down")) } + + async fn shutdown(self) { + drop(self.senders); + + for worker in self.workers { + let _ = worker.await; + } + } } static INTERNAL_WORKER_POOL: std::sync::OnceLock = std::sync::OnceLock::new(); @@ -402,7 +374,7 @@ impl Listener { maybe_route_update = route_updates_receiver.recv() => { //todo: add context to the error here once orion-error lands match maybe_route_update { - Ok(route_update) => {Self::process_route_update(&name, &filter_chains, route_update)}, + Ok(route_update) => {Self::process_route_update(name, &filter_chains, route_update)}, Err(e) => {return e.into();} } }, @@ -411,7 +383,7 @@ impl Listener { Ok(secret_update) => { // todo: possibly expensive clone - may need to rethink this structure let mut filter_chains_clone = filter_chains.as_ref().clone(); - Self::process_secret_update(&name, &mut filter_chains_clone, secret_update); + Self::process_secret_update(name, &mut filter_chains_clone, secret_update); filter_chains = Arc::new(filter_chains_clone); } Err(e) => {return e.into();} @@ -436,8 +408,7 @@ impl Listener { let filter_chains = Arc::new(filter_chains); let factory = global_internal_connection_factory(); - let (_handle, mut connection_receiver, _listener_ref) = match factory.register_listener(name.to_string()).await - { + let (_handle, mut connection_receiver, _listener_ref) = match factory.register_listener(name.to_owned()).await { Ok(result) => result, Err(e) => { error!("Failed to register internal listener '{}': {}", name, e); @@ -450,35 +421,32 @@ impl Listener { loop { tokio::select! { maybe_connection = connection_receiver.recv() => { - match maybe_connection { - Some(connection_pair) => { - debug!("Internal listener '{}' received new connection", name); - - let filter_chains_clone = filter_chains.clone(); - let listener_name = name.to_string(); - - // Use worker pool instead of tokio::spawn for better performance - // with large numbers of short connections - let task = InternalConnectionTask { - listener_name, - connection_pair, - filter_chains: filter_chains_clone, - }; - - if let Err(e) = get_internal_worker_pool().submit_task(task) { - warn!("Failed to submit internal connection task: {}", e); - } - } - None => { - warn!("Internal listener '{}' connection channel closed", name); - break; + if let Some(connection_pair) = maybe_connection { + debug!("Internal listener '{}' received new connection", name); + + let filter_chains_clone = filter_chains.clone(); + let listener_name = name.to_owned(); + + // Use worker pool instead of tokio::spawn for better performance + // with large numbers of short connections + let task = InternalConnectionTask { + listener_name, + connection_pair, + filter_chains: filter_chains_clone, + }; + + if let Err(e) = get_internal_worker_pool().submit_task(task) { + warn!("Failed to submit internal connection task: {}", e); } + } else { + warn!("Internal listener '{}' connection channel closed", name); + break; } }, maybe_route_update = route_updates_receiver.recv() => { match maybe_route_update { Ok(route_update) => { - Self::process_route_update(&name, &filter_chains, route_update); + Self::process_route_update(name, &filter_chains, route_update); } Err(e) => { error!("Route update error for internal listener '{}': {}", name, e); @@ -490,7 +458,7 @@ impl Listener { match maybe_secret_update { Ok(secret_update) => { let mut filter_chains_clone = filter_chains.as_ref().clone(); - Self::process_secret_update(&name, &mut filter_chains_clone, secret_update); + Self::process_secret_update(name, &mut filter_chains_clone, secret_update); // TODO: Update the shared filter chains state for active connections } Err(e) => { @@ -616,8 +584,8 @@ impl Listener { let ssl = AtomicBool::new(false); defer! { - with_metric!(listeners::DOWNSTREAM_CX_DESTROY, add, 1, shard_id, &[KeyValue::new("listener", listener_name.to_string())]); - with_metric!(listeners::DOWNSTREAM_CX_ACTIVE, sub, 1, shard_id, &[KeyValue::new("listener", listener_name.to_string())]); + with_metric!(listeners::DOWNSTREAM_CX_DESTROY, add, 1, shard_id, &[KeyValue::new("listener", listener_name.to_owned())]); + with_metric!(listeners::DOWNSTREAM_CX_ACTIVE, sub, 1, shard_id, &[KeyValue::new("listener", listener_name.to_owned())]); if ssl.load(Ordering::Relaxed) { with_metric!(http::DOWNSTREAM_CX_SSL_ACTIVE, add, 1, shard_id, &[KeyValue::new("listener", listener_name)]); } @@ -656,7 +624,7 @@ impl Listener { add, 1, shard_id, - &[KeyValue::new("listener", listener_name.to_string())] + &[KeyValue::new("listener", listener_name.to_owned())] ); with_metric!( http::DOWNSTREAM_CX_SSL_ACTIVE, @@ -1014,3 +982,40 @@ filter_chains: assert_eq!(Listener::select_filterchain(&m, &metadata, Some("hello.world")).unwrap().copied(), Some(3)); } } + +async fn handle_internal_connection_static( + listener_name: String, + connection_pair: crate::transport::InternalConnectionPair, + filter_chains: Arc>, +) -> Result<()> { + use crate::listeners::filter_state::DownstreamConnectionMetadata; + + debug!("Handling new internal connection for listener '{}'", listener_name); + + let downstream_metadata = DownstreamConnectionMetadata::FromInternal { + listener_name: listener_name.clone(), + endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(), + }; + + let filter_chain = if let Some(fc) = Listener::select_filterchain(&filter_chains, &downstream_metadata, None)? { + fc + } else { + warn!("No matching filter chain found for internal connection"); + return Err(crate::Error::new("No matching filter chain")); + }; + + let _downstream_stream = connection_pair.downstream; + + match &filter_chain.handler { + crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => { + info!("Processing internal connection through HTTP filter chain"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(()) + }, + crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => { + info!("Processing internal connection through TCP filter chain"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(()) + }, + } +} diff --git a/orion-lib/src/listeners/listeners_manager.rs b/orion-lib/src/listeners/listeners_manager.rs index b6683148..4e692421 100644 --- a/orion-lib/src/listeners/listeners_manager.rs +++ b/orion-lib/src/listeners/listeners_manager.rs @@ -119,7 +119,7 @@ impl ListenersManager { warn!("Internal problem when updating a route: {e}"); } }, - _ = ct.cancelled() => { + () = ct.cancelled() => { warn!("Listener manager exiting"); return Ok(()); } @@ -128,7 +128,7 @@ impl ListenersManager { } pub fn start_listener(&mut self, listener: Listener, listener_conf: ListenerConfig) -> Result<()> { - let listener_name = listener.get_name().to_string(); + let listener_name = listener.get_name().to_owned(); if let Some((addr, dev)) = listener.get_socket() { info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some()); } else { @@ -148,7 +148,7 @@ impl ListenersManager { let listener_info = ListenerInfo::new(join_handle, listener_conf, version); self.listener_handles.insert(listener_name.clone(), listener_info); - let version_count = self.listener_handles.get_vec(&listener_name).map(|v| v.len()).unwrap_or(0); + let version_count = self.listener_handles.get_vec(&listener_name).map(std::vec::Vec::len).unwrap_or(0); info!("Started version {} of listener {} ({} total active version(s))", version, listener_name, version_count); Ok(()) diff --git a/orion-lib/src/transport/internal_cluster_connector.rs b/orion-lib/src/transport/internal_cluster_connector.rs index befb4d2e..79663f48 100644 --- a/orion-lib/src/transport/internal_cluster_connector.rs +++ b/orion-lib/src/transport/internal_cluster_connector.rs @@ -83,8 +83,8 @@ impl InternalChannelConnector { Ok(InternalChannel { stream, cluster_name: self.cluster_name, - listener_name: self.connector.listener_name().to_string(), - endpoint_id: self.connector.endpoint_id().map(|s| s.to_string()), + listener_name: self.connector.listener_name().to_owned(), + endpoint_id: self.connector.endpoint_id().map(std::borrow::ToOwned::to_owned), }) } @@ -115,7 +115,7 @@ impl InternalChannel { } pub mod cluster_helpers { - use super::*; + use super::{global_internal_connection_factory, InternalChannelConnector}; use orion_configuration::config::cluster::InternalEndpointAddress; pub fn create_internal_connector( @@ -125,7 +125,7 @@ pub mod cluster_helpers { InternalChannelConnector::new( internal_addr.server_listener_name.to_string(), cluster_name, - internal_addr.endpoint_id.as_ref().map(|s| s.to_string()), + internal_addr.endpoint_id.as_ref().map(std::string::ToString::to_string), ) } diff --git a/orion-lib/src/transport/internal_connection.rs b/orion-lib/src/transport/internal_connection.rs index 4c5a5378..65b89c26 100644 --- a/orion-lib/src/transport/internal_connection.rs +++ b/orion-lib/src/transport/internal_connection.rs @@ -23,7 +23,7 @@ use std::{ use tokio::{ io::{AsyncRead, AsyncWrite}, - sync::{mpsc, RwLock}, + sync::{mpsc, Mutex, RwLock}, time::Instant, }; @@ -91,13 +91,13 @@ pub struct InternalConnectionPair { #[derive(Debug)] pub struct InternalStream { metadata: InternalConnectionMetadata, - stream: tokio::io::DuplexStream, + stream: Mutex, is_closed: Arc>, } impl InternalStream { fn new(metadata: InternalConnectionMetadata, stream: tokio::io::DuplexStream) -> Self { - Self { metadata, stream, is_closed: Arc::new(RwLock::new(false)) } + Self { metadata, stream: Mutex::new(stream), is_closed: Arc::new(RwLock::new(false)) } } } @@ -121,7 +121,13 @@ impl AsyncRead for InternalStream { cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - tokio::io::AsyncRead::poll_read(std::pin::Pin::new(&mut self.get_mut().stream), cx, buf) + match self.stream.try_lock() { + Ok(mut stream) => tokio::io::AsyncRead::poll_read(std::pin::Pin::new(&mut *stream), cx, buf), + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } } } @@ -131,21 +137,39 @@ impl AsyncWrite for InternalStream { cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { - tokio::io::AsyncWrite::poll_write(std::pin::Pin::new(&mut self.get_mut().stream), cx, buf) + match self.stream.try_lock() { + Ok(mut stream) => tokio::io::AsyncWrite::poll_write(std::pin::Pin::new(&mut *stream), cx, buf), + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } } fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - tokio::io::AsyncWrite::poll_flush(std::pin::Pin::new(&mut self.get_mut().stream), cx) + match self.stream.try_lock() { + Ok(mut stream) => tokio::io::AsyncWrite::poll_flush(std::pin::Pin::new(&mut *stream), cx), + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } } fn poll_shutdown( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - tokio::io::AsyncWrite::poll_shutdown(std::pin::Pin::new(&mut self.get_mut().stream), cx) + match self.stream.try_lock() { + Ok(mut stream) => tokio::io::AsyncWrite::poll_shutdown(std::pin::Pin::new(&mut *stream), cx), + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } } } @@ -172,7 +196,7 @@ impl InternalConnectionFactory { let mut listeners = self.listeners.write().await; if listeners.contains_key(&name) { - return Err(Error::new(format!("Internal listener '{}' is already registered", name))); + return Err(Error::new(format!("Internal listener '{name}' is already registered"))); } listeners.insert(name, handle.clone()); @@ -183,7 +207,7 @@ impl InternalConnectionFactory { let mut listeners = self.listeners.write().await; if listeners.remove(name).is_none() { - return Err(Error::new(format!("Internal listener '{}' was not registered", name))); + return Err(Error::new(format!("Internal listener '{name}' was not registered"))); } Ok(()) @@ -191,11 +215,10 @@ impl InternalConnectionFactory { pub async fn connect_to_listener(&self, name: &str, endpoint_id: Option) -> Result { let listeners = self.listeners.read().await; - let handle = - listeners.get(name).ok_or_else(|| Error::new(format!("Internal listener '{}' not found", name)))?; + let handle = listeners.get(name).ok_or_else(|| Error::new(format!("Internal listener '{name}' not found")))?; let connection_pair = handle.create_connection(endpoint_id)?; - Ok(Box::new(InternalStreamWrapper(connection_pair.upstream))) + Ok(Box::new(InternalStreamWrapper { inner: connection_pair.upstream })) } pub async fn list_listeners(&self) -> Vec { @@ -205,7 +228,7 @@ impl InternalConnectionFactory { pub async fn is_listener_active(&self, name: &str) -> bool { let listeners = self.listeners.read().await; - listeners.get(name).map_or(false, |handle| handle.is_alive()) + listeners.get(name).is_some_and(InternalListenerHandle::is_alive) } pub async fn get_stats(&self) -> InternalConnectionStats { @@ -229,48 +252,85 @@ pub struct InternalConnectionStats { pub max_pooled_connections: usize, } -pub struct InternalStreamWrapper(Arc); +pub struct InternalStreamWrapper { + inner: Arc, +} impl Deref for InternalStreamWrapper { type Target = InternalStream; fn deref(&self) -> &Self::Target { - &self.0 + &self.inner } } impl AsyncRead for InternalStreamWrapper { fn poll_read( self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - _buf: &mut tokio::io::ReadBuf<'_>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { - // InternalStreamWrapper is read-only - actual I/O happens in InternalStream - std::task::Poll::Pending + match self.inner.stream.try_lock() { + Ok(mut stream) => { + let pinned_stream = std::pin::Pin::new(&mut *stream); + tokio::io::AsyncRead::poll_read(pinned_stream, cx, buf) + }, + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } } } impl AsyncWrite for InternalStreamWrapper { fn poll_write( self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - _buf: &[u8], + cx: &mut std::task::Context<'_>, + buf: &[u8], ) -> std::task::Poll> { - std::task::Poll::Pending + match self.inner.stream.try_lock() { + Ok(mut stream) => { + let pinned_stream = std::pin::Pin::new(&mut *stream); + tokio::io::AsyncWrite::poll_write(pinned_stream, cx, buf) + }, + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } } fn poll_flush( self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) + match self.inner.stream.try_lock() { + Ok(mut stream) => { + let pinned_stream = std::pin::Pin::new(&mut *stream); + tokio::io::AsyncWrite::poll_flush(pinned_stream, cx) + }, + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } } fn poll_shutdown( self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) + match self.inner.stream.try_lock() { + Ok(mut stream) => { + let pinned_stream = std::pin::Pin::new(&mut *stream); + tokio::io::AsyncWrite::poll_shutdown(pinned_stream, cx) + }, + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } } } diff --git a/orion-proxy/src/proxy.rs b/orion-proxy/src/proxy.rs index 39674b06..af517fce 100644 --- a/orion-proxy/src/proxy.rs +++ b/orion-proxy/src/proxy.rs @@ -237,7 +237,7 @@ fn spawn_proxy_runtime_from_thread( _ = start_proxy(configuration_receivers, ct.clone()) => { info!("Proxy Runtime terminated!"); } - _ = ct.cancelled() => { + () = ct.cancelled() => { info!("Shutdown channel closed, shutting down Proxy runtime!"); } } @@ -264,7 +264,7 @@ fn spawn_services_runtime_from_thread( } info!("Services Runtime terminated!"); } - _ = ct.cancelled() => { + () = ct.cancelled() => { info!("Shutdown channel closed, shutting down Services runtime!"); } } From b6abbbf8e3e58006812eeeafd1b6c2d930159d9f Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Sun, 5 Oct 2025 17:11:17 +0530 Subject: [PATCH 4/5] feat: Extract internal connection infrastructure to orion-internal crate Moved all internal connection related code (factories, connectors, streams, tests) from orion-lib to dedicated orion-internal crate. Signed-off-by: Eeshu-Yadav --- Cargo.lock | 26 ++++++++++ Cargo.toml | 2 + orion-internal/Cargo.toml | 16 ++++++ .../src/connection.rs | 9 ++-- .../src/connector.rs | 10 ++-- orion-internal/src/filter_state.rs | 49 +++++++++++++++++++ orion-internal/src/lib.rs | 39 +++++++++++++++ .../tests/integration_tests.rs | 19 ++++--- orion-lib/Cargo.toml | 1 + orion-lib/src/lib.rs | 8 +-- orion-lib/src/listeners/filter_state.rs | 14 +----- orion-lib/src/listeners/listener.rs | 6 +-- orion-lib/src/transport/mod.rs | 7 --- 13 files changed, 166 insertions(+), 40 deletions(-) create mode 100644 orion-internal/Cargo.toml rename orion-lib/src/transport/internal_connection.rs => orion-internal/src/connection.rs (98%) rename orion-lib/src/transport/internal_cluster_connector.rs => orion-internal/src/connector.rs (95%) create mode 100644 orion-internal/src/filter_state.rs create mode 100644 orion-internal/src/lib.rs rename orion-lib/tests/internal_connection_integration.rs => orion-internal/tests/integration_tests.rs (88%) diff --git a/Cargo.lock b/Cargo.lock index 9e9ea2c3..68eef159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2182,6 +2182,18 @@ dependencies = [ "http", ] +[[package]] +name = "orion-internal" +version = "0.1.0" +dependencies = [ + "compact_str", + "orion-configuration", + "orion-error", + "tokio", + "tokio-test", + "tracing", +] + [[package]] name = "orion-interner" version = "0.1.0" @@ -2227,6 +2239,7 @@ dependencies = [ "orion-error", "orion-format", "orion-http-header", + "orion-internal", "orion-interner", "orion-metrics", "orion-tracing", @@ -3627,6 +3640,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.7.16" diff --git a/Cargo.toml b/Cargo.toml index b2011252..bee8d760 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "orion-error", "orion-http-header", "orion-interner", + "orion-internal", "orion-format", "orion-lib", "orion-metrics", @@ -35,6 +36,7 @@ orion-error = { path = "orion-error" } orion-format = { path = "orion-format" } orion-http-header = { path = "orion-http-header" } orion-interner = { path = "orion-interner" } +orion-internal = { path = "orion-internal" } orion-lib = { path = "orion-lib" } orion-metrics = { path = "orion-metrics" } orion-tracing = { path = "orion-tracing" } diff --git a/orion-internal/Cargo.toml b/orion-internal/Cargo.toml new file mode 100644 index 00000000..19bca0ec --- /dev/null +++ b/orion-internal/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "orion-internal" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +description = "Internal connection and internal listener infrastructure for Orion proxy" + +[dependencies] +tokio = { version = "1.42.0", features = ["full"] } +tracing = "0.1.41" +orion-error = { path = "../orion-error" } +orion-configuration = { path = "../orion-configuration" } +compact_str = "0.8.0" + +[dev-dependencies] +tokio-test = "0.4" diff --git a/orion-lib/src/transport/internal_connection.rs b/orion-internal/src/connection.rs similarity index 98% rename from orion-lib/src/transport/internal_connection.rs rename to orion-internal/src/connection.rs index 65b89c26..cc8d4b3f 100644 --- a/orion-lib/src/transport/internal_connection.rs +++ b/orion-internal/src/connection.rs @@ -27,8 +27,7 @@ use tokio::{ time::Instant, }; -use super::AsyncStream; -use crate::{Error, Result}; +pub use crate::{Error, Result}; #[derive(Debug, Clone)] pub struct InternalConnectionMetadata { @@ -213,7 +212,11 @@ impl InternalConnectionFactory { Ok(()) } - pub async fn connect_to_listener(&self, name: &str, endpoint_id: Option) -> Result { + pub async fn connect_to_listener( + &self, + name: &str, + endpoint_id: Option, + ) -> Result> { let listeners = self.listeners.read().await; let handle = listeners.get(name).ok_or_else(|| Error::new(format!("Internal listener '{name}' not found")))?; diff --git a/orion-lib/src/transport/internal_cluster_connector.rs b/orion-internal/src/connector.rs similarity index 95% rename from orion-lib/src/transport/internal_cluster_connector.rs rename to orion-internal/src/connector.rs index 79663f48..fb8116de 100644 --- a/orion-lib/src/transport/internal_cluster_connector.rs +++ b/orion-internal/src/connector.rs @@ -15,8 +15,7 @@ // // -use super::{global_internal_connection_factory, AsyncStream}; -use crate::{Error, Result}; +use crate::{connection::global_internal_connection_factory, Error, InternalStreamWrapper, Result}; #[derive(Debug, Clone)] pub struct InternalClusterConnector { @@ -37,7 +36,7 @@ impl InternalClusterConnector { self.endpoint_id.as_deref() } - pub async fn connect(&self) -> Result { + pub async fn connect(&self) -> Result> { let factory = global_internal_connection_factory(); if !factory.is_listener_active(&self.listener_name).await { @@ -94,7 +93,7 @@ impl InternalChannelConnector { } pub struct InternalChannel { - pub stream: AsyncStream, + pub stream: Box, pub cluster_name: &'static str, pub listener_name: String, pub endpoint_id: Option, @@ -116,6 +115,7 @@ impl InternalChannel { pub mod cluster_helpers { use super::{global_internal_connection_factory, InternalChannelConnector}; + use crate::InternalConnectionStats; use orion_configuration::config::cluster::InternalEndpointAddress; pub fn create_internal_connector( @@ -134,7 +134,7 @@ pub mod cluster_helpers { factory.is_listener_active(listener_name).await } - pub async fn get_internal_connection_stats() -> crate::transport::InternalConnectionStats { + pub async fn get_internal_connection_stats() -> InternalConnectionStats { let factory = global_internal_connection_factory(); factory.get_stats().await } diff --git a/orion-internal/src/filter_state.rs b/orion-internal/src/filter_state.rs new file mode 100644 index 00000000..45df3419 --- /dev/null +++ b/orion-internal/src/filter_state.rs @@ -0,0 +1,49 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use std::net::SocketAddr; + +/// Reserved internal address used as the peer address for in-process (internal) connections. +/// Chosen from the loopback range (127.255.255.254:65534) to avoid conflicts with real network traffic. +/// This address clearly identifies internal connections in logs and debugging. +pub const INTERNAL_PEER_ADDR: SocketAddr = + SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534)); + +/// Reserved internal address used as the local address for in-process (internal) connections. +/// Chosen from the loopback range (127.255.255.255:65535) to avoid conflicts with real network traffic. +/// This address clearly identifies internal connections in logs and debugging. +pub const INTERNAL_LOCAL_ADDR: SocketAddr = + SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 255), 65535)); + +/// Check if a socket address is an internal connection address +pub fn is_internal_address(addr: &SocketAddr) -> bool { + *addr == INTERNAL_PEER_ADDR || *addr == INTERNAL_LOCAL_ADDR +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_internal_addresses() { + assert!(is_internal_address(&INTERNAL_PEER_ADDR)); + assert!(is_internal_address(&INTERNAL_LOCAL_ADDR)); + + let normal_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + assert!(!is_internal_address(&normal_addr)); + } +} diff --git a/orion-internal/src/lib.rs b/orion-internal/src/lib.rs new file mode 100644 index 00000000..0950e4ab --- /dev/null +++ b/orion-internal/src/lib.rs @@ -0,0 +1,39 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +//! # orion-internal +//! +//! Internal connection and internal listener infrastructure for Orion proxy. +//! This crate provides the core functionality for in-process communication between +//! different components of the proxy, particularly for waypoint proxy capabilities +//! in ambient service mesh scenarios. + +pub mod connection; +pub mod connector; +pub mod filter_state; + +// Re-export commonly used types +pub use connection::{ + global_internal_connection_factory, InternalConnectionFactory, InternalConnectionMetadata, InternalConnectionPair, + InternalConnectionStats, InternalListenerHandle, InternalStream, InternalStreamWrapper, +}; +pub use connector::{cluster_helpers, InternalChannel, InternalChannelConnector, InternalClusterConnector}; +pub use filter_state::{is_internal_address, INTERNAL_LOCAL_ADDR, INTERNAL_PEER_ADDR}; + +// Re-export error types +pub type Error = orion_error::Error; +pub type Result = ::core::result::Result; diff --git a/orion-lib/tests/internal_connection_integration.rs b/orion-internal/tests/integration_tests.rs similarity index 88% rename from orion-lib/tests/internal_connection_integration.rs rename to orion-internal/tests/integration_tests.rs index 5efc61d8..a787642c 100644 --- a/orion-lib/tests/internal_connection_integration.rs +++ b/orion-internal/tests/integration_tests.rs @@ -15,7 +15,7 @@ // // -use orion_lib::{global_internal_connection_factory, InternalChannelConnector}; +use orion_internal::{cluster_helpers::*, global_internal_connection_factory, InternalChannelConnector}; #[tokio::test] async fn test_complete_internal_connection_flow() { @@ -155,22 +155,29 @@ async fn test_statistics_and_monitoring() { #[tokio::test] async fn test_cluster_helpers() { use orion_configuration::config::cluster::InternalEndpointAddress; - use orion_lib::cluster_helpers::*; let internal_addr = InternalEndpointAddress { - server_listener_name: "test_listener".to_string().into(), + server_listener_name: "test_cluster_helpers_listener".to_string().into(), endpoint_id: Some("endpoint1".to_string().into()), }; let connector = create_internal_connector(&internal_addr, "test_cluster"); assert_eq!(connector.cluster_name(), "test_cluster"); - assert_eq!(connector.listener_name(), "test_listener"); + assert_eq!(connector.listener_name(), "test_cluster_helpers_listener"); - assert!(!is_internal_listener_available("non_existent").await); + assert!(!is_internal_listener_available("non_existent_listener_xyz").await); let stats = get_internal_connection_stats().await; assert_eq!(stats.max_pooled_connections, 0); + let factory = global_internal_connection_factory(); + let (_handle, _rx, _listener_ref) = + factory.register_listener("test_cluster_helpers_listener".to_string()).await.unwrap(); + + assert!(is_internal_listener_available("test_cluster_helpers_listener").await); + let listeners = list_internal_listeners().await; - assert!(listeners.is_empty()); + assert!(listeners.contains(&"test_cluster_helpers_listener".to_string())); + + factory.unregister_listener("test_cluster_helpers_listener").await.unwrap(); } diff --git a/orion-lib/Cargo.toml b/orion-lib/Cargo.toml index 3319e985..e973481d 100644 --- a/orion-lib/Cargo.toml +++ b/orion-lib/Cargo.toml @@ -37,6 +37,7 @@ orion-error.workspace = true orion-format.workspace = true orion-http-header.workspace = true orion-interner.workspace = true +orion-internal.workspace = true orion-metrics.workspace = true orion-tracing.workspace = true orion-xds.workspace = true diff --git a/orion-lib/src/lib.rs b/orion-lib/src/lib.rs index 37b9f5a2..3a118746 100644 --- a/orion-lib/src/lib.rs +++ b/orion-lib/src/lib.rs @@ -54,11 +54,11 @@ use orion_configuration::config::{ Bootstrap, Cluster, Listener as ListenerConfig, }; pub use secrets::SecretManager; -pub use transport::internal_cluster_connector::cluster_helpers; pub(crate) use transport::AsyncStream; -pub use transport::{ - global_internal_connection_factory, InternalChannelConnector, InternalConnectionFactory, InternalConnectionPair, - InternalConnectionStats, InternalListenerHandle, + +pub use orion_internal::{ + cluster_helpers, global_internal_connection_factory, InternalChannelConnector, InternalConnectionFactory, + InternalConnectionPair, InternalConnectionStats, InternalListenerHandle, }; pub type Error = orion_error::Error; diff --git a/orion-lib/src/listeners/filter_state.rs b/orion-lib/src/listeners/filter_state.rs index 15bf6d1f..80f114a4 100644 --- a/orion-lib/src/listeners/filter_state.rs +++ b/orion-lib/src/listeners/filter_state.rs @@ -17,19 +17,9 @@ use compact_str::CompactString; use orion_configuration::config::common::TlvType; +use orion_internal::{is_internal_address, INTERNAL_LOCAL_ADDR, INTERNAL_PEER_ADDR}; use std::{collections::HashMap, net::SocketAddr}; -/// Reserved internal address used as the peer address for in-process (internal) connections. -/// Chosen from the loopback range (127.255.255.254:65534) to avoid conflicts with real network traffic. -/// This address clearly identifies internal connections in logs and debugging. -const INTERNAL_PEER_ADDR: std::net::SocketAddr = - std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534)); - -/// Reserved internal address used as the local address for in-process (internal) connections. -/// Chosen from the loopback range (127.255.255.255:65535) to avoid conflicts with real network traffic. -/// This address clearly identifies internal connections in logs and debugging. -const INTERNAL_LOCAL_ADDR: std::net::SocketAddr = - std::net::SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 255), 65535)); #[derive(Debug, Clone)] pub enum DownstreamConnectionMetadata { FromSocket { @@ -83,7 +73,7 @@ impl DownstreamConnectionMetadata { } pub fn is_internal_address(addr: SocketAddr) -> bool { - addr == INTERNAL_PEER_ADDR || addr == INTERNAL_LOCAL_ADDR + is_internal_address(&addr) } } diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index 9abef41c..fea6daed 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -67,7 +67,7 @@ struct InternalConnectionWorkerPool { #[derive(Debug)] struct InternalConnectionTask { listener_name: String, - connection_pair: crate::transport::InternalConnectionPair, + connection_pair: orion_internal::InternalConnectionPair, filter_chains: Arc>, } @@ -402,7 +402,7 @@ impl Listener { mut route_updates_receiver: broadcast::Receiver, mut secret_updates_receiver: broadcast::Receiver, ) -> Error { - use crate::transport::global_internal_connection_factory; + use orion_internal::global_internal_connection_factory; use tracing::{debug, error, info, warn}; let filter_chains = Arc::new(filter_chains); @@ -985,7 +985,7 @@ filter_chains: async fn handle_internal_connection_static( listener_name: String, - connection_pair: crate::transport::InternalConnectionPair, + connection_pair: orion_internal::InternalConnectionPair, filter_chains: Arc>, ) -> Result<()> { use crate::listeners::filter_state::DownstreamConnectionMetadata; diff --git a/orion-lib/src/transport/mod.rs b/orion-lib/src/transport/mod.rs index c008fb54..d8bfcd3a 100644 --- a/orion-lib/src/transport/mod.rs +++ b/orion-lib/src/transport/mod.rs @@ -21,8 +21,6 @@ pub mod bind_device; pub mod connector; mod grpc_channel; mod http_channel; -pub mod internal_cluster_connector; -pub mod internal_connection; mod resolver; pub mod tcp_channel; pub use resolver::resolve; @@ -35,11 +33,6 @@ pub mod transport_socket; pub use self::{ grpc_channel::{GrpcService, SimpleRoundRobinGrpcServiceLB}, http_channel::{HttpChannel, HttpChannelBuilder}, - internal_cluster_connector::InternalChannelConnector, - internal_connection::{ - global_internal_connection_factory, InternalConnectionFactory, InternalConnectionPair, InternalConnectionStats, - InternalListenerHandle, - }, proxy_protocol::ProxyProtocolReader, tcp_channel::TcpChannelConnector, tlv_listener_filter::TlvListenerFilter, From 33e8c586eb8773f5af21143246e73980cad75729 Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Sun, 5 Oct 2025 17:48:10 +0530 Subject: [PATCH 5/5] Address Copilot feedback: remove dead code, make buffers configurable, add TODOs Signed-off-by: Eeshu-Yadav --- orion-internal/src/connection.rs | 50 ++++++++++++++++++----- orion-internal/tests/integration_tests.rs | 40 ++++++++++++++---- orion-lib/src/lib.rs | 2 +- orion-lib/src/listeners/listener.rs | 42 ++++++++++++------- 4 files changed, 101 insertions(+), 33 deletions(-) diff --git a/orion-internal/src/connection.rs b/orion-internal/src/connection.rs index cc8d4b3f..81e72c37 100644 --- a/orion-internal/src/connection.rs +++ b/orion-internal/src/connection.rs @@ -42,6 +42,7 @@ pub struct InternalListenerHandle { pub name: String, pub connection_sender: mpsc::UnboundedSender, listener_ref: Weak<()>, + buffer_size_kb: Option, } impl InternalListenerHandle { @@ -49,8 +50,9 @@ impl InternalListenerHandle { name: String, connection_sender: mpsc::UnboundedSender, listener_ref: Weak<()>, + buffer_size_kb: Option, ) -> Self { - Self { name, connection_sender, listener_ref } + Self { name, connection_sender, listener_ref, buffer_size_kb } } pub fn is_alive(&self) -> bool { @@ -64,7 +66,7 @@ impl InternalListenerHandle { let metadata = InternalConnectionMetadata { listener_name: self.name.clone(), - buffer_size_kb: None, + buffer_size_kb: self.buffer_size_kb, created_at: Instant::now(), endpoint_id, }; @@ -185,12 +187,13 @@ impl InternalConnectionFactory { pub async fn register_listener( &self, name: String, + buffer_size_kb: Option, ) -> Result<(InternalListenerHandle, mpsc::UnboundedReceiver, Arc<()>)> { let (connection_tx, connection_rx) = mpsc::unbounded_channel(); let listener_ref = Arc::new(()); let weak_ref = Arc::downgrade(&listener_ref); - let handle = InternalListenerHandle::new(name.clone(), connection_tx, weak_ref); + let handle = InternalListenerHandle::new(name.clone(), connection_tx, weak_ref, buffer_size_kb); let mut listeners = self.listeners.write().await; @@ -337,8 +340,12 @@ impl AsyncWrite for InternalStreamWrapper { } } +const DEFAULT_BUFFER_SIZE: usize = 8192; + fn create_internal_connection_pair(metadata: InternalConnectionMetadata) -> (Arc, Arc) { - let (upstream_io, downstream_io) = tokio::io::duplex(1024); + let buffer_size = metadata.buffer_size_kb.map(|kb| (kb as usize) * 1024).unwrap_or(DEFAULT_BUFFER_SIZE); + + let (upstream_io, downstream_io) = tokio::io::duplex(buffer_size); let upstream = Arc::new(InternalStream::new(metadata.clone(), upstream_io)); let downstream = Arc::new(InternalStream::new(metadata, downstream_io)); @@ -368,14 +375,14 @@ mod tests { async fn test_listener_registration() { let factory = InternalConnectionFactory::new(); - let result = factory.register_listener("test_listener".to_string()).await; + let result = factory.register_listener("test_listener".to_string(), None).await; assert!(result.is_ok()); let (_handle, _rx, _listener_ref) = result.unwrap(); let stats = factory.get_stats().await; assert_eq!(stats.active_listeners, 1); - let result = factory.register_listener("test_listener".to_string()).await; + let result = factory.register_listener("test_listener".to_string(), None).await; assert!(result.is_err()); } @@ -383,7 +390,7 @@ mod tests { async fn test_listener_unregistration() { let factory = InternalConnectionFactory::new(); - let (_handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string()).await.unwrap(); + let (_handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string(), None).await.unwrap(); let result = factory.unregister_listener("test_listener").await; assert!(result.is_ok()); @@ -406,7 +413,7 @@ mod tests { async fn test_listener_lifecycle() { let factory = InternalConnectionFactory::new(); - let (handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string()).await.unwrap(); + let (handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string(), None).await.unwrap(); assert!(factory.is_listener_active("test_listener").await); assert!(handle.is_alive()); @@ -423,8 +430,8 @@ mod tests { let listeners = factory.list_listeners().await; assert!(listeners.is_empty()); - let (_handle1, _rx1, _listener_ref1) = factory.register_listener("listener1".to_string()).await.unwrap(); - let (_handle2, _rx2, _listener_ref2) = factory.register_listener("listener2".to_string()).await.unwrap(); + let (_handle1, _rx1, _listener_ref1) = factory.register_listener("listener1".to_string(), None).await.unwrap(); + let (_handle2, _rx2, _listener_ref2) = factory.register_listener("listener2".to_string(), None).await.unwrap(); let listeners = factory.list_listeners().await; assert_eq!(listeners.len(), 2); @@ -438,4 +445,27 @@ mod tests { let stats = factory.get_stats().await; assert_eq!(stats.max_pooled_connections, 0); } + + #[tokio::test] + async fn test_buffer_size_configuration() { + let metadata_default = InternalConnectionMetadata { + listener_name: "test".to_string(), + buffer_size_kb: None, + created_at: Instant::now(), + endpoint_id: None, + }; + let (upstream, downstream) = create_internal_connection_pair(metadata_default); + assert!(upstream.is_active()); + assert!(downstream.is_active()); + + let metadata_custom = InternalConnectionMetadata { + listener_name: "test".to_string(), + buffer_size_kb: Some(4), + created_at: Instant::now(), + endpoint_id: None, + }; + let (upstream_custom, downstream_custom) = create_internal_connection_pair(metadata_custom); + assert!(upstream_custom.is_active()); + assert!(downstream_custom.is_active()); + } } diff --git a/orion-internal/tests/integration_tests.rs b/orion-internal/tests/integration_tests.rs index a787642c..51401098 100644 --- a/orion-internal/tests/integration_tests.rs +++ b/orion-internal/tests/integration_tests.rs @@ -23,7 +23,7 @@ async fn test_complete_internal_connection_flow() { let listener_name = "test_integration_listener_global"; let (_handle, mut connection_receiver, _listener_ref) = - factory.register_listener(listener_name.to_string()).await.expect("Failed to register listener"); + factory.register_listener(listener_name.to_string(), None).await.expect("Failed to register listener"); assert!(factory.is_listener_active(listener_name).await); let listeners = factory.list_listeners().await; @@ -67,7 +67,7 @@ async fn test_connection_pooling() { let listener_name = "test_pooling_listener_global"; let (_handle, mut connection_receiver, _listener_ref) = - factory.register_listener(listener_name.to_string()).await.expect("Failed to register listener"); + factory.register_listener(listener_name.to_string(), None).await.expect("Failed to register listener"); let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None); @@ -94,11 +94,11 @@ async fn test_error_scenarios() { assert!(result.is_err()); let listener_name = "test_error_listener_global"; - let result1 = factory.register_listener(listener_name.to_string()).await; + let result1 = factory.register_listener(listener_name.to_string(), None).await; assert!(result1.is_ok()); let (_handle1, _rx1, _listener_ref1) = result1.unwrap(); - let result2 = factory.register_listener(listener_name.to_string()).await; + let result2 = factory.register_listener(listener_name.to_string(), None).await; assert!(result2.is_err()); let result = factory.unregister_listener("non_existent").await; @@ -124,7 +124,7 @@ async fn test_global_factory() { let factory = global_internal_connection_factory(); let listener_name = "test_global_listener"; - let (_handle, _rx, _listener_ref) = factory.register_listener(listener_name.to_string()).await.unwrap(); + let (_handle, _rx, _listener_ref) = factory.register_listener(listener_name.to_string(), None).await.unwrap(); assert!(factory.is_listener_active(listener_name).await); @@ -141,7 +141,7 @@ async fn test_statistics_and_monitoring() { let listener1 = "test_stats_listener1_global"; let (_handle1, _rx1, _listener_ref1) = - factory.register_listener(listener1.to_string()).await.expect("Failed to register listener1"); + factory.register_listener(listener1.to_string(), None).await.expect("Failed to register listener1"); let stats = factory.get_stats().await; assert!(stats.active_listeners >= 1); @@ -172,7 +172,7 @@ async fn test_cluster_helpers() { let factory = global_internal_connection_factory(); let (_handle, _rx, _listener_ref) = - factory.register_listener("test_cluster_helpers_listener".to_string()).await.unwrap(); + factory.register_listener("test_cluster_helpers_listener".to_string(), None).await.unwrap(); assert!(is_internal_listener_available("test_cluster_helpers_listener").await); @@ -181,3 +181,29 @@ async fn test_cluster_helpers() { factory.unregister_listener("test_cluster_helpers_listener").await.unwrap(); } + +#[tokio::test] +async fn test_buffer_size_configuration() { + let factory = global_internal_connection_factory(); + + let listener_name = "test_buffer_size_listener"; + let buffer_size_kb = Some(8); + + let (_handle, mut connection_receiver, _listener_ref) = + factory.register_listener(listener_name.to_string(), buffer_size_kb).await.unwrap(); + + let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None); + + let connection_future = connector.connect(); + let listener_future = connection_receiver.recv(); + let (_cluster_conn, listener_pair) = tokio::join!(connection_future, listener_future); + + assert!(listener_pair.is_some()); + let connection_pair = listener_pair.unwrap(); + + let metadata = connection_pair.downstream.metadata(); + assert_eq!(metadata.buffer_size_kb, Some(8)); + assert_eq!(metadata.listener_name, listener_name); + + factory.unregister_listener(listener_name).await.unwrap(); +} diff --git a/orion-lib/src/lib.rs b/orion-lib/src/lib.rs index 3a118746..4e04a633 100644 --- a/orion-lib/src/lib.rs +++ b/orion-lib/src/lib.rs @@ -44,7 +44,7 @@ pub use clusters::{ load_assignment::PartialClusterLoadAssignment, ClusterLoadAssignmentBuilder, }; -pub use listeners::listener::ListenerFactory; +pub use listeners::listener::{init_internal_worker_pool, ListenerFactory}; pub use listeners_manager::{ListenerConfigurationChange, ListenersManager, RouteConfigurationChange}; pub use orion_configuration::config::network_filters::http_connection_manager::RouteConfiguration; use orion_configuration::config::{ diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index fea6daed..75048110 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -84,7 +84,6 @@ impl InternalConnectionWorkerPool { while let Some(task) = receiver.recv().await { if let Err(e) = handle_internal_connection_static(task.listener_name, task.connection_pair, task.filter_chains) - .await { warn!("Error handling internal connection task: {}", e); } @@ -112,8 +111,19 @@ impl InternalConnectionWorkerPool { static INTERNAL_WORKER_POOL: std::sync::OnceLock = std::sync::OnceLock::new(); +pub fn init_internal_worker_pool(num_workers: Option) { + let workers = num_workers + .or_else(|| std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE").ok().and_then(|s| s.parse::().ok())) + .unwrap_or(4); + let _ = INTERNAL_WORKER_POOL.set(InternalConnectionWorkerPool::new(workers)); +} + fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool { - INTERNAL_WORKER_POOL.get_or_init(|| InternalConnectionWorkerPool::new(4)) // 4 workers by default + INTERNAL_WORKER_POOL.get_or_init(|| { + let workers = + std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE").ok().and_then(|s| s.parse::().ok()).unwrap_or(4); + InternalConnectionWorkerPool::new(workers) + }) } #[derive(Debug, Clone)] @@ -135,7 +145,6 @@ enum ListenerAddress { #[derive(Debug, Clone)] struct InternalListenerConfig { - #[allow(dead_code)] buffer_size_kb: Option, } #[derive(Debug, Clone)] @@ -395,7 +404,7 @@ impl Listener { async fn run_internal_listener( name: &'static str, - _internal_config: InternalListenerConfig, + internal_config: InternalListenerConfig, filter_chains: HashMap, _with_tls_inspector: bool, _with_tlv_listener_filter: bool, @@ -408,13 +417,14 @@ impl Listener { let filter_chains = Arc::new(filter_chains); let factory = global_internal_connection_factory(); - let (_handle, mut connection_receiver, _listener_ref) = match factory.register_listener(name.to_owned()).await { - Ok(result) => result, - Err(e) => { - error!("Failed to register internal listener '{}': {}", name, e); - return e; - }, - }; + let (_handle, mut connection_receiver, _listener_ref) = + match factory.register_listener(name.to_owned(), internal_config.buffer_size_kb).await { + Ok(result) => result, + Err(e) => { + error!("Failed to register internal listener '{}': {}", name, e); + return e; + }, + }; info!("Internal listener '{}' registered to connection factory", name); @@ -983,7 +993,7 @@ filter_chains: } } -async fn handle_internal_connection_static( +fn handle_internal_connection_static( listener_name: String, connection_pair: orion_internal::InternalConnectionPair, filter_chains: Arc>, @@ -1004,17 +1014,19 @@ async fn handle_internal_connection_static( return Err(crate::Error::new("No matching filter chain")); }; - let _downstream_stream = connection_pair.downstream; + let downstream_stream = connection_pair.downstream; match &filter_chain.handler { crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => { info!("Processing internal connection through HTTP filter chain"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // TODO: Implement HTTP connection processing + let _ = downstream_stream; Ok(()) }, crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => { info!("Processing internal connection through TCP filter chain"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // TODO: Implement TCP connection processing + let _ = downstream_stream; Ok(()) }, }