Commit 9a51db6

Implement a locked cache for assigned exit nodes and add an exit node assignment check to the local exit node selection logic. (Properly fixes #143)
1 parent 996b36d commit 9a51db6
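
In short, the operator now keeps a small shared cache behind a tokio mutex: the host of the most recently assigned exit node together with the instant it was handed out. Selection takes the lock with try_lock (bailing out and requeuing if it is contended) and skips that host for a short backoff window, so parallel reconciles cannot hand the same node to two Services. Below is a minimal standalone sketch of that pattern, not the operator's actual code: ExitNode here is a stand-in struct with only a host field, pick_node approximates what select_exit_node_local does in src/daemon.rs, and only BACKOFF_TIME_SECS mirrors a real name from the commit.

use std::sync::Arc;
use std::time::Instant;

const BACKOFF_TIME_SECS: u64 = 5;

// Stand-in for the real ExitNode CRD; only the host matters for this sketch.
#[derive(Clone, Debug)]
struct ExitNode {
    host: String,
}

impl ExitNode {
    fn get_host(&self) -> String {
        self.host.clone()
    }
}

// Pick the first candidate that was not handed out within the backoff window,
// then record it in the shared cache so parallel selections skip it.
async fn pick_node(
    cache: &Arc<tokio::sync::Mutex<Option<(Instant, String)>>>,
    candidates: &[ExitNode],
) -> Option<ExitNode> {
    // try_lock: if another reconcile holds the lock, give up and let the caller requeue
    let mut slot = cache.try_lock().ok()?;
    let picked = candidates
        .iter()
        .find(|node| match slot.as_ref() {
            // a host assigned less than BACKOFF_TIME_SECS ago is skipped
            Some((assigned_at, host)) if assigned_at.elapsed().as_secs() < BACKOFF_TIME_SECS => {
                node.get_host() != *host
            }
            _ => true,
        })
        .cloned();
    if let Some(node) = &picked {
        // remember who got this node and when, starting the backoff window
        *slot = Some((Instant::now(), node.get_host()));
    }
    picked
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(tokio::sync::Mutex::new(None));
    let nodes = vec![
        ExitNode { host: "192.0.2.10".into() },
        ExitNode { host: "192.0.2.11".into() },
    ];
    // Two back-to-back selections inside the backoff window pick different nodes.
    let first = pick_node(&cache, &nodes).await;
    let second = pick_node(&cache, &nodes).await;
    println!("{:?} then {:?}", first, second);
}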

File tree: 4 files changed, +170 -56 lines (src/daemon.rs, src/deployment.rs, src/error.rs, src/main.rs)

src/daemon.rs

Lines changed: 153 additions & 50 deletions

@@ -76,7 +76,7 @@ pub struct Context {
     pub client: Client,
     // Let's implement a lock here to prevent multiple reconciles assigning the same exit node
     // to multiple services implicitly (#143)
-    pub exit_node_lock: tokio::sync::Mutex<()>,
+    pub exit_node_lock: Arc<tokio::sync::Mutex<Option<(std::time::Instant, String)>>>,
 }

 /// Parses the `query` string to extract the namespace and name.
@@ -168,58 +168,129 @@ async fn check_service_managed(service: &Service) -> bool {
 const OPERATOR_CLASS: &str = "chisel-operator.io/chisel-operator-class";
 const OPERATOR_MANAGER: &str = "chisel-operator";

+const BACKOFF_TIME_SECS: u64 = 5;
+
+async fn find_free_exit_nodes(ctx: Arc<Context>) -> Result<Vec<ExitNode>, ReconcileError> {
+    let svc_api: Api<Service> = Api::all(ctx.client.clone());
+    let exit_node_api: Api<ExitNode> = Api::all(ctx.client.clone());
+
+    let svc_list = svc_api.list(&ListParams::default().timeout(30)).await?;
+    let exit_node_list = exit_node_api
+        .list(&ListParams::default().timeout(30))
+        .await?;
+
+    let svc_list_filtered = svc_list
+        .items
+        .into_iter()
+        .flat_map(|svc| {
+            svc.status
+                .and_then(|status| status.load_balancer)
+                .and_then(|lb| lb.ingress)
+                .and_then(|ingress| ingress.first().cloned())
+                .and_then(|ingress| ingress.ip)
+            // .map(|ip| ip)
+        })
+        .collect::<Vec<_>>();
+
+    let exit_node_list_filtered = exit_node_list.items.into_iter().filter(|node| {
+        let host = node.get_host();
+        !svc_list_filtered.contains(&host)
+    });
+
+    Ok(exit_node_list_filtered.collect())
+}
+
 #[instrument(skip(ctx))]
 async fn select_exit_node_local(
     ctx: &Arc<Context>,
     service: &Service,
 ) -> Result<ExitNode, ReconcileError> {
     // Lock to prevent race conditions when assigning exit nodes to services
-    let _lock = ctx.exit_node_lock.lock().await;
+    let mut lock = match ctx.exit_node_lock.try_lock() {
+        Ok(lock) => lock,
+        Err(_) => {
+            warn!("Exit node lock is already held, requeuing");
+            return Err(ReconcileError::NoAvailableExitNodes);
+        }
+    };
     // if service has label with exit node name, use that and error if not found
-    if let Some(exit_node_name) = service
-        .metadata
-        .labels
-        .as_ref()
-        .and_then(|labels| labels.get(EXIT_NODE_NAME_LABEL))
-    {
-        info!(
-            ?exit_node_name,
-            "Service explicitly set to use a named exit node, using that"
-        );
-        find_exit_node_from_label(
-            ctx.clone(),
-            exit_node_name,
-            &service.namespace().expect("Service namespace not found"),
-        )
-        .await
-        .ok_or(ReconcileError::NoAvailableExitNodes)
-    } else {
-        // otherwise, use the first available exit node
-        // (one to one mapping)
-        let nodes: Api<ExitNode> = Api::all(ctx.client.clone());
-        let node_list: kube::core::ObjectList<ExitNode> =
-            nodes.list(&ListParams::default().timeout(30)).await?;
-        node_list
-            .items
-            .into_iter()
-            .filter(|node| {
-                let is_cloud_provisioned = node
-                    .metadata
-                    .annotations
-                    .as_ref()
-                    .map(|annotations: &BTreeMap<String, String>| {
-                        annotations.contains_key(EXIT_NODE_PROVISIONER_LABEL)
-                    })
-                    .unwrap_or(false);
-
-                // Is the ExitNode not cloud provisioned or is the status set?
-                !is_cloud_provisioned || node.status.is_some()
-            })
-            .collect::<Vec<ExitNode>>()
-            .first()
+    let exit_node_selection = {
+        if let Some(exit_node_name) = service
+            .metadata
+            .labels
+            .as_ref()
+            .and_then(|labels| labels.get(EXIT_NODE_NAME_LABEL))
+        {
+            info!(
+                ?exit_node_name,
+                "Service explicitly set to use a named exit node, using that"
+            );
+            find_exit_node_from_label(
+                ctx.clone(),
+                exit_node_name,
+                &service.namespace().expect("Service namespace not found"),
+            )
+            .await
             .ok_or(ReconcileError::NoAvailableExitNodes)
-            .cloned()
-    }
+        } else {
+            // otherwise, use the first available exit node
+            // (one to one mapping)
+            // let nodes: Api<ExitNode> = Api::all(ctx.client.clone());
+            // let node_list: kube::core::ObjectList<ExitNode> =
+            //     nodes.list(&ListParams::default().timeout(30)).await?;
+            let node_list = find_free_exit_nodes(ctx.clone()).await?;
+            debug!(?node_list, "Exit node list");
+            node_list
+                .into_iter()
+                .filter(|node| {
+                    let is_cloud_provisioned = node
+                        .metadata
+                        .annotations
+                        .as_ref()
+                        .map(|annotations: &BTreeMap<String, String>| {
+                            annotations.contains_key(EXIT_NODE_PROVISIONER_LABEL)
+                        })
+                        .unwrap_or(false);

+                    // Is the ExitNode not cloud provisioned or is the status set?
+                    !is_cloud_provisioned || node.status.is_some()
+                })
+                .filter(|node| {
+                    // debug!(?node, "Checking exit node");
+                    let host = node.get_host();
+                    if let Some((instant, ip_filter)) = lock.as_ref() {
+                        // Skip this exit node if it was recently assigned and the backoff period hasn't elapsed
+                        if instant.elapsed().as_secs() < BACKOFF_TIME_SECS {
+                            host != *ip_filter
+                        } else {
+                            true
+                        }
+                    } else {
+                        // No lock present, this exit node is available
+                        true
+                    }
+                })
+                .collect::<Vec<ExitNode>>()
+                .first()
+                .ok_or(ReconcileError::NoAvailableExitNodes)
+                .cloned()
+        }
+    };
+    // .inspect(|node| {
+    //     let exit_node_ip = node.get_host();
+    //     debug!(?exit_node_ip, "Selected exit node");
+    //     drop(lock);
+    // })
+
+    // Add the selected exit node to the lock, with the current time and hostname
+    // This will prevent other services within the backoff period from selecting the same exit node
+    // Fixes #143 by filtering out exit nodes that were recently assigned
+    // when applying multiple objects in parallel
+    exit_node_selection.inspect(|node| {
+        let exit_node_ip = node.get_host();
+        debug!(?exit_node_ip, "Selected exit node");
+        *lock = Some((std::time::Instant::now(), node.get_host()));
+    })
 }

 #[instrument(skip(ctx))]
@@ -309,7 +380,7 @@ async fn exit_node_for_service(
 }
 // #[instrument(skip(ctx), fields(trace_id))]
 /// Reconcile cluster state
-#[instrument(skip(ctx))]
+#[instrument(skip(ctx, obj))]
 async fn reconcile_svcs(obj: Arc<Service>, ctx: Arc<Context>) -> Result<Action, ReconcileError> {
     // Return if service is not LoadBalancer or if the loadBalancerClass is not blank or set to $OPERATOR_CLASS

@@ -337,8 +408,36 @@ async fn reconcile_svcs(obj: Arc<Service>, ctx: Arc<Context>) -> Result<Action,
     let services: Api<Service> = Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
     let nodes: Api<ExitNode> = Api::all(ctx.client.clone());

+    // --- Let's skip reconciling services whose exit node IP addresses still exit in the cluster
+    // only list IP addresses of exit nodes
+    let nodes_by_ip: BTreeMap<String, ExitNode> = nodes
+        .list(&ListParams::default().timeout(30))
+        .await?
+        .items
+        .into_iter()
+        .filter_map(|node| {
+            let host = node.get_host();
+            if let Some(_status) = &node.status {
+                Some((host, node))
+            } else {
+                None
+            }
+        })
+        .collect();
+
     let mut svc = services.get_status(&obj.name_any()).await?;

+    let svc_lb_ip = svc
+        .status
+        .as_ref()
+        .and_then(|status| status.load_balancer.as_ref())
+        .and_then(|lb| lb.ingress.as_ref())
+        .and_then(|ingress| ingress.first())
+        .and_then(|ingress| ingress.ip.clone())
+        .unwrap_or_default();
+
+    let existing_bound_node = nodes_by_ip.get(&svc_lb_ip);
+
     let obj = svc.clone();

     let node_list = nodes.list(&ListParams::default().timeout(30)).await?;
@@ -373,7 +472,9 @@ async fn reconcile_svcs(obj: Arc<Service>, ctx: Arc<Context>) -> Result<Action,

         // Else, use the first available exit node
         // Fails if there's no empty exit node available
-        else {
+        else if let Some(node) = existing_bound_node {
+            node.clone()
+        } else {
             select_exit_node_local(&ctx, &obj).await?
         }
     };
@@ -492,7 +593,7 @@ fn error_policy_exit_node(
 }
 const UNMANAGED_PROVISIONER: &str = "unmanaged";

-#[instrument(skip(ctx))]
+#[instrument(skip(ctx, obj))]
 async fn reconcile_nodes(obj: Arc<ExitNode>, ctx: Arc<Context>) -> Result<Action, ReconcileError> {
     info!("exit node reconcile request: {}", obj.name_any());
     let is_managed = check_exit_node_managed(&obj).await;
@@ -716,6 +817,8 @@ pub async fn run() -> color_eyre::Result<()> {

     let mut reconcilers = vec![];

+    let lock = Arc::new(tokio::sync::Mutex::new(None));
+
     info!("Starting reconcilers...");

     // TODO: figure out how to do this in a single controller because there is a potential race where the exit node reconciler runs at the same time as the service one
@@ -743,7 +846,7 @@ pub async fn run() -> color_eyre::Result<()> {
             error_policy,
             Arc::new(Context {
                 client: client.clone(),
-                exit_node_lock: tokio::sync::Mutex::new(()),
+                exit_node_lock: lock.clone(),
             }),
         )
         .for_each(|_| futures::future::ready(()))
@@ -771,7 +874,7 @@ pub async fn run() -> color_eyre::Result<()> {
             error_policy_exit_node,
             Arc::new(Context {
                 client,
-                exit_node_lock: tokio::sync::Mutex::new(()),
+                exit_node_lock: lock,
             }),
         )
         .for_each(|_| futures::future::ready(()))
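
The other half of the src/daemon.rs change is the free-node scan: find_free_exit_nodes lists all Services, collects the LoadBalancer ingress IPs they already advertise, and keeps only the ExitNodes whose host is not in that set, while reconcile_svcs reuses the node a Service is already bound to (existing_bound_node) instead of reselecting. A hedged sketch of that filter follows, reduced to plain strings rather than the real Service/ExitNode types; the function name free_exit_nodes and the sample addresses are illustrative only.

// node_hosts stands in for ExitNode::get_host() results, svc_lb_ips for the
// LoadBalancer ingress IPs collected from Services.
fn free_exit_nodes(node_hosts: Vec<String>, svc_lb_ips: &[String]) -> Vec<String> {
    node_hosts
        .into_iter()
        // a node is free when no Service already points at its host
        .filter(|host| !svc_lb_ips.contains(host))
        .collect()
}

fn main() {
    let nodes = vec!["192.0.2.10".to_string(), "192.0.2.11".to_string()];
    let in_use = vec!["192.0.2.10".to_string()];
    // only 192.0.2.11 is still assignable
    assert_eq!(free_exit_nodes(nodes, &in_use), vec!["192.0.2.11".to_string()]);
}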

src/deployment.rs

Lines changed: 4 additions & 4 deletions

@@ -13,7 +13,7 @@ use k8s_openapi::{
     apimachinery::pkg::apis::meta::v1::LabelSelector,
 };
 use kube::{api::ResourceExt, core::ObjectMeta, error::ErrorResponse, Resource};
-use tracing::{debug, info, instrument};
+use tracing::{info, instrument, trace};

 const CHISEL_IMAGE: &str = "jpillora/chisel";

@@ -69,7 +69,7 @@ pub fn generate_remote_arg(node: &ExitNode) -> String {

     let host = node.get_host();

-    debug!(host = ?host, "Host");
+    trace!(host = ?host, "Host");

     // Determine if the host is an IPv6 address and format accordingly
     let formatted_host = match host.parse::<IpAddr>() {
@@ -78,7 +78,7 @@ pub fn generate_remote_arg(node: &ExitNode) -> String {
     };

     let output = format!("{}:{}", formatted_host, node.spec.port);
-    debug!(output = ?output, "Output");
+    trace!(output = ?output, "Output");
     output
 }

@@ -134,7 +134,7 @@ pub fn generate_tunnel_args(svc: &Service) -> Result<Vec<String>, ReconcileError
         .collect();

     info!("Generated arguments: {:?}", ports);
-    debug!(svc = ?svc, "Source service");
+    trace!(svc = ?svc, "Source service");
     Ok(ports)
 }
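
src/deployment.rs only demotes the per-argument dumps from debug! to trace!, so they disappear at the usual debug verbosity. A hedged illustration of that effect, driving the filter directly with an EnvFilter; this wiring is illustrative and not how chisel-operator configures its logger.

use tracing_subscriber::EnvFilter;

fn main() {
    // Requires tracing-subscriber's "env-filter" feature. With a "debug"
    // directive the trace! line below is filtered out, which is the practical
    // effect of this commit's debug! -> trace! demotion.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("debug"))
        .init();

    tracing::debug!(host = ?"10.0.0.1", "Host");           // printed
    tracing::trace!(output = ?"10.0.0.1:9090", "Output");  // suppressed at debug
}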

src/error.rs

Lines changed: 3 additions & 0 deletions

@@ -8,6 +8,9 @@ pub enum ReconcileError {
     #[error("There are no exit nodes available to assign")]
     NoAvailableExitNodes,

+    #[error("Exit Node being already assigned and its backoff time has not expired")]
+    ExitNodeBackoff,
+
     #[error("There are no ports set on this LoadBalancer")]
     NoPortsSet,
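
The new ExitNodeBackoff variant names the "recently assigned, backoff not yet expired" condition. One plausible way to consume it is to map it to a short requeue in a kube-rs error policy; the sketch below is only a suggestion under that assumption (Ctx, error_policy, and the 5s/30s durations are illustrative, not from this commit), and it assumes the kube "runtime" feature plus thiserror, both of which the operator already depends on.

use std::{sync::Arc, time::Duration};

use kube::runtime::controller::Action;

// The two variants mirror src/error.rs; everything else here is illustrative.
#[derive(thiserror::Error, Debug)]
pub enum ReconcileError {
    #[error("Exit Node being already assigned and its backoff time has not expired")]
    ExitNodeBackoff,
    #[error("There are no exit nodes available to assign")]
    NoAvailableExitNodes,
}

pub struct Ctx;

fn error_policy(_obj: Arc<()>, err: &ReconcileError, _ctx: Arc<Ctx>) -> Action {
    match err {
        // retry shortly after the backoff window should have expired
        ReconcileError::ExitNodeBackoff => Action::requeue(Duration::from_secs(5)),
        // everything else waits longer before another attempt
        _ => Action::requeue(Duration::from_secs(30)),
    }
}

fn main() {
    // exercise the mapping once so the sketch compiles and runs standalone
    let _ = error_policy(Arc::new(()), &ReconcileError::ExitNodeBackoff, Arc::new(Ctx));
}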

src/main.rs

Lines changed: 10 additions & 2 deletions

@@ -54,11 +54,19 @@ async fn main() -> Result<()> {

     let logfmt_logger = tracing_logfmt::layer().boxed();

-    let pretty_logger = tracing_subscriber::fmt::layer().pretty().boxed();
+    let pretty_logger = tracing_subscriber::fmt::layer()
+        .pretty()
+        .with_thread_ids(true)
+        .with_thread_names(true)
+        .boxed();

     let json_logger = tracing_subscriber::fmt::layer().json().boxed();

-    let compact_logger = tracing_subscriber::fmt::layer().compact().boxed();
+    let compact_logger = tracing_subscriber::fmt::layer()
+        .compact()
+        .with_thread_ids(true)
+        .with_thread_names(true)
+        .boxed();

     let logger = match logger_env.as_str() {
         "logfmt" => logfmt_logger,
