Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ pub struct Authenticator {
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv6.
/// The maximum value for IPv6 is 128
pub private_network_prefix_v6: u8,

/// Timeout (in milliseconds) to wait for responses from the peer controller before failing.
/// Helps the authenticator recover from suspend/resume scenarios where peer RPCs hang.
pub peer_interaction_timeout_ms: u64,
}

impl Default for Authenticator {
Expand All @@ -68,6 +72,7 @@ impl Default for Authenticator {
tunnel_announced_port: WG_TUNNEL_PORT,
private_network_prefix_v4: WG_TUN_DEVICE_NETMASK_V4,
private_network_prefix_v6: WG_TUN_DEVICE_NETMASK_V6,
peer_interaction_timeout_ms: default_peer_interaction_timeout_ms(),
}
}
}
Expand All @@ -85,3 +90,7 @@ impl From<Authenticator> for nym_wireguard_types::Config {
}
}
}

pub const fn default_peer_interaction_timeout_ms() -> u64 {
5_000
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ pub enum AuthenticatorError {
#[error("peers can't be interacted with anymore")]
PeerInteractionStopped,

#[error("peers interaction timed out while attempting to {operation}")]
PeerInteractionTimeout { operation: &'static str },

#[error("unknown version number")]
UnknownVersion,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use tokio_stream::wrappers::IntervalStream;

type AuthenticatorHandleResult = Result<(Vec<u8>, Option<Recipient>), AuthenticatorError>;
Expand Down Expand Up @@ -74,7 +73,7 @@ pub(crate) struct MixnetListener {
pub(crate) mixnet_client: nym_sdk::mixnet::MixnetClient,

// Registrations awaiting confirmation
pub(crate) registered_and_free: RwLock<RegisteredAndFree>,
pub(crate) registered_and_free: RegisteredAndFree,

pub(crate) peer_manager: PeerManager,

Expand All @@ -95,14 +94,15 @@ impl MixnetListener {
mixnet_client: nym_sdk::mixnet::MixnetClient,
upgrade_mode: UpgradeModeDetails,
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
peer_interaction_timeout: Duration,
) -> Self {
let timeout_check_interval =
IntervalStream::new(tokio::time::interval(DEFAULT_REGISTRATION_TIMEOUT_CHECK));
MixnetListener {
config,
mixnet_client,
registered_and_free: RwLock::new(RegisteredAndFree::new(free_private_network_ips)),
peer_manager: PeerManager::new(wireguard_gateway_data),
registered_and_free: RegisteredAndFree::new(free_private_network_ips),
peer_manager: PeerManager::new(wireguard_gateway_data, peer_interaction_timeout),
upgrade_mode,
ecash_verifier,
timeout_check_interval,
Expand Down Expand Up @@ -131,8 +131,8 @@ impl MixnetListener {
))
}

async fn remove_stale_registrations(&self) -> Result<(), AuthenticatorError> {
let mut registered_and_free = self.registered_and_free.write().await;
async fn remove_stale_registrations(&mut self) -> Result<(), AuthenticatorError> {
let registered_and_free = &mut self.registered_and_free;
let registered_values: Vec<_> = registered_and_free
.registration_in_progres
.values()
Expand Down Expand Up @@ -185,8 +185,9 @@ impl MixnetListener {
) -> AuthenticatorHandleResult {
let remote_public = init_message.pub_key();
let nonce: u64 = fastrand::u64(..);
let mut registered_and_free = self.registered_and_free.write().await;
if let Some(registration_data) = registered_and_free

if let Some(registration_data) = self
.registered_and_free
.registration_in_progres
.get(&remote_public)
{
Expand Down Expand Up @@ -292,7 +293,17 @@ impl MixnetListener {
return Ok((bytes, reply_to));
}

let peer = self.peer_manager.query_peer(remote_public).await?;
let peer = match self.peer_manager.query_peer(remote_public).await {
Ok(peer) => peer,
Err(err) => {
tracing::warn!(
"Failed to query peer {}: {err}. Continuing with fresh registration",
remote_public
);
None
}
};

if let Some(peer) = peer {
let allowed_ipv4 = peer
.allowed_ips
Expand Down Expand Up @@ -383,27 +394,33 @@ impl MixnetListener {
return Ok((bytes, reply_to));
}

let private_ip_ref = registered_and_free
.free_private_network_ips
.iter_mut()
.filter(|r| r.1.is_none())
.choose(&mut thread_rng())
.ok_or(AuthenticatorError::NoFreeIp)?;
let private_ips = *private_ip_ref.0;
// mark it as used, even though it's not final
*private_ip_ref.1 = Some(SystemTime::now());
let gateway_data = GatewayClient::new(
self.keypair().private_key(),
remote_public.inner(),
*private_ip_ref.0,
nonce,
);
let registration_data = latest::registration::RegistrationData {
nonce,
gateway_data: gateway_data.clone(),
wg_port: self.config.authenticator.tunnel_announced_port,
let (registration_data, private_ips) = {
let private_ip = self
.registered_and_free
.free_private_network_ips
.iter_mut()
.filter(|r| r.1.is_none())
.choose(&mut thread_rng())
.ok_or(AuthenticatorError::NoFreeIp)?;
let private_ips = *private_ip.0;
// mark it as used, even though it's not final
*private_ip.1 = Some(SystemTime::now());

let gateway_data = GatewayClient::new(
self.keypair().private_key(),
remote_public.inner(),
private_ips,
nonce,
);
let registration_data = latest::registration::RegistrationData {
nonce,
gateway_data: gateway_data.clone(),
wg_port: self.config.authenticator.tunnel_announced_port,
};
(registration_data, private_ips)
};
registered_and_free

self.registered_and_free
.registration_in_progres
.insert(remote_public, registration_data.clone());
let bytes = match AuthenticatorVersion::from(protocol) {
Expand Down Expand Up @@ -539,12 +556,12 @@ impl MixnetListener {
request_id: u64,
reply_to: Option<Recipient>,
) -> AuthenticatorHandleResult {
let mut registered_and_free = self.registered_and_free.write().await;
let registration_data = registered_and_free
let registration_data = self
.registered_and_free
.registration_in_progres
.get(&final_message.gateway_client_pub_key())
.ok_or(AuthenticatorError::RegistrationNotInProgress)?
.clone();
.cloned()
.ok_or(AuthenticatorError::RegistrationNotInProgress)?;

if final_message
.verify(self.keypair().private_key(), registration_data.nonce)
Expand Down Expand Up @@ -595,7 +612,7 @@ impl MixnetListener {
return Err(e);
}

registered_and_free
self.registered_and_free
.registration_in_progres
.remove(&final_message.gateway_client_pub_key());

Expand Down Expand Up @@ -818,7 +835,7 @@ impl MixnetListener {
.to_bytes()
.map_err(AuthenticatorError::response_serialisation)?,
AuthenticatorVersion::V1 | AuthenticatorVersion::V2 | AuthenticatorVersion::UNKNOWN => {
return Err(AuthenticatorError::UnknownVersion)
return Err(AuthenticatorError::UnknownVersion);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,15 @@ impl Authenticator {
}
})
.collect();
let peer_timeout = std::cmp::max(1, self.config.authenticator.peer_interaction_timeout_ms);
let mixnet_listener = crate::node::internal_service_providers::authenticator::mixnet_listener::MixnetListener::new(
self.config,
free_private_network_ips,
self.wireguard_gateway_data,
mixnet_client,
self.upgrade_mode_state,
self.ecash_verifier,
std::time::Duration::from_millis(peer_timeout),
);

tracing::info!("The address of this client is: {self_address}");
Expand Down
Loading
Loading