Skip to content

Commit 99c2e88

Browse files
committed
fix: implement Envoy-compliant connection draining with mixed protocol support
- Add ListenerProtocolConfig enum to handle HTTP, TCP, and mixed protocol listeners
- Respect HttpConnectionManager.drain_timeout field from configuration
- Support listeners with both http_connection_manager and tcp_proxy filters
- Remove ambiguous 'Gradual' strategy, align with Envoy's draining behavior
- Add initiate_listener_drain_from_filter_analysis() for proper integration

Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
1 parent a8e8919 commit 99c2e88

File tree

6 files changed

+187
-51
lines changed

6 files changed

+187
-51
lines changed

orion-lib/src/listeners/drain_signaling.rs

Lines changed: 150 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
//
1616
//
1717

18+
1819
use crate::{Error, Result};
19-
use orion_configuration::config::listener::DrainType as ConfigDrainType;
20+
use orion_configuration::config::listener::{DrainType as ConfigDrainType, FilterChain, MainFilter};
2021
use pingora_timeout::fast_timeout::fast_timeout;
2122
use std::collections::HashMap;
2223
use std::sync::Arc;
@@ -25,6 +26,17 @@ use tokio::sync::RwLock;
2526
use tokio::time::sleep;
2627
use tracing::{debug, info, warn};
2728

29+
#[derive(Debug, Clone)]
30+
pub enum ListenerProtocolConfig {
31+
Http { drain_timeout: Option<Duration> },
32+
Tcp,
33+
Mixed {
34+
http_drain_timeout: Option<Duration>,
35+
has_tcp: bool,
36+
has_http: bool,
37+
},
38+
}
39+
2840
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2941
pub enum DrainScenario {
3042
HealthCheckFail,
@@ -46,8 +58,13 @@ impl DrainScenario {
4658
pub enum DrainStrategy {
4759
Tcp { global_timeout: Duration },
4860
Http { global_timeout: Duration, drain_timeout: Duration },
61+
Mixed {
62+
global_timeout: Duration,
63+
http_drain_timeout: Duration,
64+
tcp_connections: bool,
65+
http_connections: bool,
66+
},
4967
Immediate,
50-
Gradual,
5168
}
5269

5370
#[derive(Debug, Clone)]
@@ -102,9 +119,10 @@ impl ListenerDrainContext {
102119

103120
pub fn is_timeout_exceeded(&self) -> bool {
104121
let global_timeout = match &self.strategy {
105-
DrainStrategy::Tcp { global_timeout } | DrainStrategy::Http { global_timeout, .. } => *global_timeout,
122+
DrainStrategy::Tcp { global_timeout }
123+
| DrainStrategy::Http { global_timeout, .. }
124+
| DrainStrategy::Mixed { global_timeout, .. } => *global_timeout,
106125
DrainStrategy::Immediate => Duration::from_secs(0),
107-
DrainStrategy::Gradual => Duration::from_secs(600),
108126
};
109127

110128
self.drain_start.elapsed() >= global_timeout
@@ -113,6 +131,7 @@ impl ListenerDrainContext {
113131
pub fn get_http_drain_timeout(&self) -> Option<Duration> {
114132
match &self.strategy {
115133
DrainStrategy::Http { drain_timeout, .. } => Some(*drain_timeout),
134+
DrainStrategy::Mixed { http_drain_timeout, .. } => Some(*http_drain_timeout),
116135
_ => None,
117136
}
118137
}
@@ -126,12 +145,36 @@ pub struct DrainSignalingManager {
126145
listener_drain_state: Arc<RwLock<Option<ListenerDrainState>>>,
127146
}
128147

148+
impl ListenerProtocolConfig {
149+
pub fn from_listener_analysis(
150+
has_http_connection_manager: bool,
151+
has_tcp_proxy: bool,
152+
http_drain_timeout: Option<Duration>,
153+
) -> Self {
154+
match (has_http_connection_manager, has_tcp_proxy) {
155+
(true, true) => Self::Mixed {
156+
http_drain_timeout,
157+
has_tcp: true,
158+
has_http: true,
159+
},
160+
(true, false) => Self::Http {
161+
drain_timeout: http_drain_timeout,
162+
},
163+
(false, true) => Self::Tcp,
164+
(false, false) => {
165+
warn!("No HTTP connection manager or TCP proxy found in listener, defaulting to TCP draining");
166+
Self::Tcp
167+
}
168+
}
169+
}
170+
}
171+
129172
impl DrainSignalingManager {
130173
pub fn new() -> Self {
131174
Self {
132175
drain_contexts: Arc::new(RwLock::new(HashMap::new())),
133176
global_drain_timeout: Duration::from_secs(600),
134-
default_http_drain_timeout: Duration::from_millis(5000),
177+
default_http_drain_timeout: Duration::from_secs(5),
135178
listener_drain_state: Arc::new(RwLock::new(None)),
136179
}
137180
}
@@ -207,17 +250,23 @@ impl DrainSignalingManager {
207250
pub async fn initiate_listener_drain(
208251
&self,
209252
listener_id: String,
210-
is_http: bool,
211-
http_drain_timeout: Option<Duration>,
253+
protocol_config: ListenerProtocolConfig,
212254
active_connections: usize,
213255
) -> Result<Arc<ListenerDrainContext>> {
214-
let strategy = if is_http {
215-
DrainStrategy::Http {
256+
let strategy = match protocol_config {
257+
ListenerProtocolConfig::Http { drain_timeout } => DrainStrategy::Http {
216258
global_timeout: self.global_drain_timeout,
217-
drain_timeout: http_drain_timeout.unwrap_or(self.default_http_drain_timeout),
218-
}
219-
} else {
220-
DrainStrategy::Tcp { global_timeout: self.global_drain_timeout }
259+
drain_timeout: drain_timeout.unwrap_or(self.default_http_drain_timeout),
260+
},
261+
ListenerProtocolConfig::Tcp => DrainStrategy::Tcp {
262+
global_timeout: self.global_drain_timeout
263+
},
264+
ListenerProtocolConfig::Mixed { http_drain_timeout, has_tcp, has_http } => DrainStrategy::Mixed {
265+
global_timeout: self.global_drain_timeout,
266+
http_drain_timeout: http_drain_timeout.unwrap_or(self.default_http_drain_timeout),
267+
tcp_connections: has_tcp,
268+
http_connections: has_http,
269+
},
221270
};
222271

223272
let context = Arc::new(ListenerDrainContext::new(listener_id.clone(), strategy.clone(), active_connections));
@@ -326,6 +375,37 @@ impl DrainSignalingManager {
326375
Err(Error::new("Timeout waiting for listener drain completion"))
327376
}
328377
}
378+
379+
pub async fn initiate_listener_drain_from_filter_analysis(
380+
&self,
381+
listener_id: String,
382+
filter_chains: &[FilterChain],
383+
active_connections: usize,
384+
) -> Result<Arc<ListenerDrainContext>> {
385+
let mut has_http = false;
386+
let mut has_tcp = false;
387+
let mut http_drain_timeout: Option<Duration> = None;
388+
389+
for filter_chain in filter_chains {
390+
match &filter_chain.terminal_filter {
391+
MainFilter::Http(http_config) => {
392+
has_http = true;
393+
http_drain_timeout = http_config.drain_timeout;
394+
}
395+
MainFilter::Tcp(_) => {
396+
has_tcp = true;
397+
}
398+
}
399+
}
400+
401+
let protocol_config = ListenerProtocolConfig::from_listener_analysis(
402+
has_http,
403+
has_tcp,
404+
http_drain_timeout,
405+
);
406+
407+
self.initiate_listener_drain(listener_id, protocol_config, active_connections).await
408+
}
329409
}
330410

331411
impl Clone for DrainSignalingManager {
@@ -403,7 +483,11 @@ mod tests {
403483
let manager = DrainSignalingManager::new();
404484
assert!(!manager.has_draining_listeners().await);
405485

406-
let context = manager.initiate_listener_drain("test".to_string(), false, None, 1).await.unwrap();
486+
let context = manager.initiate_listener_drain(
487+
"test".to_string(),
488+
ListenerProtocolConfig::Tcp,
489+
1
490+
).await.unwrap();
407491

408492
assert!(manager.has_draining_listeners().await);
409493
assert_eq!(manager.get_draining_listeners().await, vec!["test"]);
@@ -417,7 +501,11 @@ mod tests {
417501
async fn test_timeout_behavior() {
418502
let manager = DrainSignalingManager::with_timeouts(Duration::from_millis(50), Duration::from_millis(25));
419503

420-
let context = manager.initiate_listener_drain("timeout-test".to_string(), true, None, 5).await.unwrap();
504+
let context = manager.initiate_listener_drain(
505+
"timeout-test".to_string(),
506+
ListenerProtocolConfig::Http { drain_timeout: None },
507+
5
508+
).await.unwrap();
421509

422510
sleep(Duration::from_millis(10)).await;
423511
sleep(Duration::from_millis(60)).await;
@@ -435,4 +523,51 @@ mod tests {
435523
"Expected manager to no longer track the listener after timeout"
436524
);
437525
}
526+
527+
#[tokio::test]
528+
async fn test_mixed_protocol_drain_context() {
529+
let strategy = DrainStrategy::Mixed {
530+
global_timeout: Duration::from_secs(600),
531+
http_drain_timeout: Duration::from_secs(5),
532+
tcp_connections: true,
533+
http_connections: true,
534+
};
535+
let context = ListenerDrainContext::new("test-mixed".to_string(), strategy, 10);
536+
537+
assert_eq!(context.get_active_connections().await, 10);
538+
assert!(!context.is_completed().await);
539+
assert_eq!(context.get_http_drain_timeout(), Some(Duration::from_secs(5)));
540+
assert!(!context.is_timeout_exceeded());
541+
}
542+
543+
#[tokio::test]
544+
async fn test_listener_protocol_config_analysis() {
545+
let http_config = ListenerProtocolConfig::from_listener_analysis(
546+
true, false, Some(Duration::from_secs(10))
547+
);
548+
match http_config {
549+
ListenerProtocolConfig::Http { drain_timeout } => {
550+
assert_eq!(drain_timeout, Some(Duration::from_secs(10)));
551+
}
552+
_ => panic!("Expected HTTP config"),
553+
}
554+
555+
let tcp_config = ListenerProtocolConfig::from_listener_analysis(false, true, None);
556+
match tcp_config {
557+
ListenerProtocolConfig::Tcp => {},
558+
_ => panic!("Expected TCP config"),
559+
}
560+
561+
let mixed_config = ListenerProtocolConfig::from_listener_analysis(
562+
true, true, Some(Duration::from_secs(3))
563+
);
564+
match mixed_config {
565+
ListenerProtocolConfig::Mixed { http_drain_timeout, has_tcp, has_http } => {
566+
assert_eq!(http_drain_timeout, Some(Duration::from_secs(3)));
567+
assert!(has_tcp);
568+
assert!(has_http);
569+
}
570+
_ => panic!("Expected Mixed config"),
571+
}
572+
}
438573
}

orion-lib/src/listeners/filterchain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ impl FilterchainType {
236236
stream,
237237
hyper::service::service_fn(|req: Request<hyper::body::Incoming>| {
238238
let handler_req =
239-
ExtendedRequest { request: req, downstream_metadata: downstream_metadata.clone() };
239+
ExtendedRequest { request: req, downstream_metadata: Arc::new(downstream_metadata.connection.clone()) };
240240
req_handler.call(handler_req).map_err(orion_error::Error::into_inner)
241241
}),
242242
)

orion-lib/src/listeners/http_connection_manager.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ use parking_lot::Mutex;
7171
use route::MatchedRequest;
7272
use scopeguard::defer;
7373
use std::collections::HashSet;
74-
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
74+
use std::sync::atomic::AtomicUsize;
7575
use std::thread::ThreadId;
7676
use std::time::Instant;
77-
use std::{fmt, future::Future, result::Result as StdResult, sync::Arc};
77+
use std::{fmt, future::Future, result::Result as StdResult, sync::{Arc, LazyLock}};
7878
use tokio::sync::mpsc::Permit;
7979
use tokio::sync::watch;
8080
use upgrades as upgrade_utils;
@@ -92,7 +92,7 @@ use crate::{
9292
body::body_with_timeout::BodyWithTimeout,
9393
listeners::{
9494
access_log::AccessLogContext, drain_signaling::DrainSignalingManager,
95-
filter_state::DownstreamConnectionMetadata, listeners_manager::ConnectionManager, rate_limiter::LocalRateLimit,
95+
filter_state::{DownstreamConnectionMetadata, DownstreamMetadata}, listeners_manager::ConnectionManager, rate_limiter::LocalRateLimit,
9696
synthetic_http_response::SyntheticHttpResponse,
9797
},
9898
utils::http::{request_head_size, response_head_size},
@@ -102,6 +102,9 @@ use orion_tracing::http_tracer::{HttpTracer, SpanKind, SpanName};
102102
use orion_tracing::request_id::{RequestId, RequestIdManager};
103103
use orion_tracing::trace_context::TraceContext;
104104

105+
static EMPTY_HASHMAP: LazyLock<Arc<HashMap<RouteMatch, Vec<Arc<HttpFilter>>>>> =
106+
LazyLock::new(|| Arc::new(HashMap::new()));
107+
105108
#[derive(Debug, Clone)]
106109
pub struct HttpConnectionManagerBuilder {
107110
listener_name: Option<&'static str>,
@@ -405,7 +408,7 @@ impl HttpConnectionManager {
405408
}
406409

407410
pub fn remove_route(&self) {
408-
self.http_filters_per_route.swap(Arc::new(HashMap::new()));
411+
self.http_filters_per_route.swap(EMPTY_HASHMAP.clone());
409412
let _ = self.router_sender.send_replace(None);
410413
}
411414

@@ -418,10 +421,11 @@ impl HttpConnectionManager {
418421
}
419422

420423
pub async fn start_draining(&self, drain_state: crate::listeners::drain_signaling::ListenerDrainState) {
421-
if let Some(drain_timeout) = self.drain_timeout {
422-
let listener_id = format!("{}-{}", self.listener_name, self.filter_chain_match_hash);
423-
let _ = self.drain_signaling.initiate_listener_drain(listener_id, true, Some(drain_timeout), 0).await;
424-
}
424+
let listener_id = format!("{}-{}", self.listener_name, self.filter_chain_match_hash);
425+
let protocol_config = crate::listeners::drain_signaling::ListenerProtocolConfig::Http {
426+
drain_timeout: self.drain_timeout
427+
};
428+
let _ = self.drain_signaling.initiate_listener_drain(listener_id, protocol_config, 0).await;
425429

426430
self.drain_signaling.start_listener_draining(drain_state).await;
427431
}
@@ -628,7 +632,7 @@ pub(crate) struct HttpRequestHandler {
628632

629633
pub struct ExtendedRequest<B> {
630634
pub request: Request<B>,
631-
pub downstream_metadata: Arc<DownstreamMetadata>,
635+
pub downstream_metadata: Arc<DownstreamConnectionMetadata>,
632636
}
633637

634638
#[derive(Debug)]
@@ -1174,7 +1178,7 @@ impl Service<ExtendedRequest<Incoming>> for HttpRequestHandler {
11741178

11751179
//
11761180
// 1. evaluate InitHttpContext, if logging is enabled
1177-
eval_http_init_context(&request, &trans_handler, downstream_metadata.server_name.as_deref());
1181+
eval_http_init_context(&request, &trans_handler, None);
11781182

11791183
//
11801184
// 2. create the MetricsBody, which will track the size of the request body
@@ -1329,9 +1333,14 @@ impl Service<ExtendedRequest<Incoming>> for HttpRequestHandler {
13291333
return Ok(response);
13301334
};
13311335

1336+
let downstream_metadata_with_server_name = Arc::new(DownstreamMetadata::new(
1337+
downstream_metadata.as_ref().clone(),
1338+
None::<String>,
1339+
));
1340+
13321341
let response = trans_handler
13331342
.clone()
1334-
.handle_transaction(route_conf, manager, permit, request, downstream_metadata)
1343+
.handle_transaction(route_conf, manager, permit, request, downstream_metadata_with_server_name)
13351344
.await;
13361345

13371346
trans_handler.trace_status_code(response, listener_name)

orion-lib/src/listeners/lds_update.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -127,22 +127,13 @@ impl LdsManager {
127127

128128
let mut listeners_guard = listeners.write().await;
129129
if let Some(versions) = listeners_guard.get_vec_mut(&name) {
130-
let mut to_remove = Vec::new();
131-
for (i, listener_info) in versions.iter().enumerate() {
132-
if listener_info.is_draining() {
133-
to_remove.push(i);
134-
}
135-
}
136-
137-
for &index in to_remove.iter().rev() {
138-
if let Some(listener_info) = versions.get_mut(index) {
130+
versions.iter_mut()
131+
.filter(|listener_info| listener_info.is_draining())
132+
.for_each(|listener_info| {
139133
listener_info.handle.abort();
140134
info!("LDS: Draining version of listener '{}' forcibly closed after timeout", name);
141-
}
142-
}
143-
for &index in to_remove.iter().rev() {
144-
versions.remove(index);
145-
}
135+
});
136+
versions.retain(|listener_info| !listener_info.is_draining());
146137
if versions.is_empty() {
147138
listeners_guard.remove(&name);
148139
}

0 commit comments

Comments (0)