diff --git a/Cargo.lock b/Cargo.lock index 600d725..5741e84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -265,33 +265,12 @@ dependencies = [ "winapi", ] -[[package]] -name = "dtoa" -version = "1.0.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" - [[package]] name = "equivalent" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" -[[package]] -name = "erased-serde" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" -dependencies = [ - "serde", -] - -[[package]] -name = "erased_set" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a02a5d186d7bf1cb21f1f95e1a9cfa5c1f2dcd803a47aad454423ceec13525c5" - [[package]] name = "fastrand" version = "2.3.0" @@ -952,18 +931,29 @@ checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" [[package]] name = "iroh-metrics" -version = "0.32.0" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0f7cd1ffe3b152a5f4f4c1880e01e07d96001f20e02cc143cb7842987c616b3" +checksum = "f70466f14caff7420a14373676947e25e2917af6a5b1bec45825beb2bf1eb6a7" dependencies = [ - "erased_set", - "prometheus-client", + "iroh-metrics-derive", + "itoa", "serde", - "struct_iterable", - "thiserror 2.0.11", + "snafu", "tracing", ] +[[package]] +name = "iroh-metrics-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d12f5c45c4ed2436302a4e03cad9a0ad34b2962ad0c5791e1019c0ee30eeb09" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "iroh-quinn-udp" version = "0.5.7" @@ -1012,16 +1002,6 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" -[[package]] -name = "lock_api" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" -dependencies = [ - "autocfg", - "scopeguard", -] - [[package]] name = "log" version = "0.4.25" @@ -1378,29 +1358,6 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" -[[package]] -name = "parking_lot" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets 0.52.6", -] - [[package]] name = "paste" version = "1.0.15" @@ -1507,29 +1464,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "prometheus-client" -version = "0.22.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504ee9ff529add891127c4827eb481bd69dc0ebc72e9a682e187db4caa60c3ca" -dependencies = [ - "dtoa", - "itoa", - "parking_lot", - "prometheus-client-derive-encode", -] - -[[package]] -name = "prometheus-client-derive-encode" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.96", -] - [[package]] name = "quote" version = "1.0.38" @@ -1569,15 +1503,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "redox_syscall" -version = "0.5.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" -dependencies = [ - "bitflags 2.8.0", -] - [[package]] name = "regex" version = "1.11.1" @@ -1691,12 +1616,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - [[package]] name = "send_wrapper" version = "0.6.0" @@ -1840,35 +1759,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "struct_iterable" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "849a064c6470a650b72e41fa6c057879b68f804d113af92900f27574828e7712" -dependencies = [ - "struct_iterable_derive", - "struct_iterable_internal", -] - -[[package]] -name = "struct_iterable_derive" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bb939ce88a43ea4e9d012f2f6b4cc789deb2db9d47bad697952a85d6978662c" -dependencies = [ - "erased-serde", - "proc-macro2", - "quote", - "struct_iterable_internal", - "syn 2.0.96", -] - -[[package]] -name = "struct_iterable_internal" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9426b2a0c03e6cc2ea8dbc0168dbbf943f88755e409fb91bcb8f6a268305f4a" - [[package]] name = "syn" version = "1.0.109" diff --git a/portmapper/Cargo.toml b/portmapper/Cargo.toml index d06f462..5f02482 100644 --- a/portmapper/Cargo.toml +++ b/portmapper/Cargo.toml @@ -22,7 +22,7 @@ derive_more = { version = "1.0.0", features = ["debug", "display", "from", "try_ futures-lite = "2.5" futures-util = "0.3.25" igd-next = { version = "0.15.1", features = ["aio_tokio"] } -iroh-metrics = { version = "0.32", default-features = false } +iroh-metrics = { version = "0.34", default-features = false } libc = "0.2.139" nested_enum_utils = "0.2.0" netwatch = { version = "0.4.0", path = "../netwatch" } diff --git a/portmapper/src/current_mapping.rs b/portmapper/src/current_mapping.rs index 5b67297..762433a 100644 --- a/portmapper/src/current_mapping.rs +++ b/portmapper/src/current_mapping.rs @@ -5,14 +5,16 @@ use std::{ net::{Ipv4Addr, SocketAddrV4}, num::NonZeroU16, pin::Pin, + sync::Arc, task::Poll, time::Duration, }; -use iroh_metrics::inc; use tokio::{sync::watch, time}; use tracing::{debug, trace}; +use crate::Metrics; + /// This is an implementation detail to facilitate testing. pub(super) trait Mapping: std::fmt::Debug + Unpin { fn external(&self) -> (Ipv4Addr, NonZeroU16); @@ -73,16 +75,18 @@ pub(super) struct CurrentMapping { /// Waker to ensure this is polled when needed. #[debug(skip)] waker: Option, + metrics: Arc, } impl CurrentMapping { /// Creates a new [`CurrentMapping`] and returns the watcher over its external address. - pub(super) fn new() -> (Self, watch::Receiver>) { + pub(super) fn new(metrics: Arc) -> (Self, watch::Receiver>) { let (address_tx, address_rx) = watch::channel(None); let wrapper = CurrentMapping { mapping: None, address_tx, waker: None, + metrics, }; (wrapper, address_rx) } @@ -108,7 +112,7 @@ impl CurrentMapping { // inform only if this produces a different external address let update = old_addr != maybe_external_addr; if update { - inc!(super::Metrics, external_address_updated); + self.metrics.external_address_updated.inc(); }; update }); @@ -201,7 +205,7 @@ mod tests { const TEST_PORT: NonZeroU16 = // SAFETY: it's clearly non zero unsafe { NonZeroU16::new_unchecked(9586) }; const TEST_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::LOCALHOST; - let (mut c, mut watcher) = CurrentMapping::::new(); + let (mut c, mut watcher) = CurrentMapping::::new(Default::default()); let now = std::time::Instant::now(); c.update(Some((TEST_IP, TEST_PORT))); diff --git a/portmapper/src/lib.rs b/portmapper/src/lib.rs index 85c7a72..f33490b 100644 --- a/portmapper/src/lib.rs +++ b/portmapper/src/lib.rs @@ -3,12 +3,12 @@ use std::{ net::{Ipv4Addr, SocketAddrV4}, num::NonZeroU16, + sync::Arc, time::{Duration, Instant}, }; use current_mapping::CurrentMapping; use futures_lite::StreamExt; -use iroh_metrics::inc; use netwatch::interfaces::HomeRouter; use snafu::Snafu; use tokio::sync::{mpsc, oneshot, watch}; @@ -139,6 +139,8 @@ pub struct Client { port_mapping: watch::Receiver>, /// Channel used to communicate with the port mapping service. service_tx: mpsc::Sender, + /// Metrics collected by the service. + metrics: Arc, /// A handle to the service that will cancel the spawned task once the client is dropped. _service_handle: std::sync::Arc>, } @@ -152,9 +154,14 @@ impl Default for Client { impl Client { /// Create a new port mapping client. pub fn new(config: Config) -> Self { + Self::with_metrics(config, Default::default()) + } + + /// Creates a new port mapping client with a previously created metrics collector. + pub fn with_metrics(config: Config, metrics: Arc) -> Self { let (service_tx, service_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY); - let (service, watcher) = Service::new(config, service_rx); + let (service, watcher) = Service::new(config, service_rx, metrics.clone()); let handle = AbortOnDropHandle::new(tokio::spawn( async move { service.run().await }.instrument(info_span!("portmapper.service")), @@ -163,6 +170,7 @@ impl Client { Client { port_mapping: watcher, service_tx, + metrics, _service_handle: std::sync::Arc::new(handle), } } @@ -230,6 +238,11 @@ impl Client { pub fn watch_external_address(&self) -> watch::Receiver> { self.port_mapping.clone() } + + /// Returns the metrics collected by the service. + pub fn metrics(&self) -> &Arc { + &self.metrics + } } /// Port mapping protocol information obtained during a probe. @@ -261,6 +274,7 @@ impl Probe { output: ProbeOutput, local_ip: Ipv4Addr, gateway: Ipv4Addr, + metrics: Arc, ) -> Probe { let ProbeOutput { upnp, pcp, nat_pmp } = output; let Config { @@ -270,8 +284,9 @@ impl Probe { } = config; let mut upnp_probing_task = util::MaybeFuture { inner: (enable_upnp && !upnp).then(|| { - Box::pin(async { - upnp::probe_available() + let metrics = metrics.clone(); + Box::pin(async move { + upnp::probe_available(&metrics) .await .map(|addr| (addr, Instant::now())) }) @@ -280,8 +295,9 @@ impl Probe { let mut pcp_probing_task = util::MaybeFuture { inner: (enable_pcp && !pcp).then(|| { - Box::pin(async { - inc!(Metrics, pcp_probes); + let metrics = metrics.clone(); + Box::pin(async move { + metrics.pcp_probes.inc(); pcp::probe_available(local_ip, gateway) .await .then(Instant::now) @@ -300,7 +316,7 @@ impl Probe { }; if upnp_probing_task.inner.is_some() { - inc!(Metrics, upnp_probes); + metrics.upnp_probes.inc(); } let mut upnp_done = upnp_probing_task.inner.is_none(); @@ -359,7 +375,7 @@ impl Probe { } /// Updates a probe with the `Some` values of another probe that is _assumed_ newer. - fn update(&mut self, probe: Probe) { + fn update(&mut self, probe: Probe, metrics: &Arc) { let Probe { last_probe, last_upnp_gateway_addr, @@ -367,7 +383,7 @@ impl Probe { last_nat_pmp, } = probe; if last_upnp_gateway_addr.is_some() { - inc!(Metrics, upnp_available); + metrics.upnp_available.inc(); let new_gateway = last_upnp_gateway_addr .as_ref() .map(|(addr, _last_seen)| addr); @@ -376,7 +392,7 @@ impl Probe { .as_ref() .map(|(addr, _last_seen)| addr); if new_gateway != old_gateway { - inc!(Metrics, upnp_gateway_updated); + metrics.upnp_gateway_updated.inc(); debug!( "upnp gateway changed {:?} -> {:?}", old_gateway @@ -390,7 +406,7 @@ impl Probe { self.last_upnp_gateway_addr = last_upnp_gateway_addr; } if last_pcp.is_some() { - inc!(Metrics, pcp_available); + metrics.pcp_available.inc(); self.last_pcp = last_pcp; } if last_nat_pmp.is_some() { @@ -428,14 +444,16 @@ pub struct Service { /// Requests for a probe that arrive while this task is still in progress will receive the same /// result. probing_task: Option<(AbortOnDropHandle, Vec>)>, + metrics: Arc, } impl Service { fn new( config: Config, rx: mpsc::Receiver, + metrics: Arc, ) -> (Self, watch::Receiver>) { - let (current_mapping, watcher) = CurrentMapping::new(); + let (current_mapping, watcher) = CurrentMapping::new(metrics.clone()); let mut full_probe = Probe::empty(); if let Some(in_the_past) = full_probe .last_probe @@ -452,6 +470,7 @@ impl Service { full_probe, mapping_task: None, probing_task: None, + metrics, }; (service, watcher) @@ -516,7 +535,7 @@ impl Service { receivers: Vec>, ) { let result = result.map(|probe| { - self.full_probe.update(probe); + self.full_probe.update(probe, &self.metrics); // TODO(@divma): the gateway of the current mapping could have changed. Tailscale // still assumes the current mapping is valid/active and will return it even after // this @@ -540,11 +559,11 @@ impl Service { } Ok(Err(e)) => { debug!("failed to get a port mapping {e}"); - inc!(Metrics, mapping_failures); + self.metrics.mapping_failures.inc(); } Err(e) => { debug!("failed to get a port mapping {e}"); - inc!(Metrics, mapping_failures); + self.metrics.mapping_failures.inc(); } } } @@ -564,7 +583,7 @@ impl Service { async fn update_local_port(&mut self, local_port: Option) { // ignore requests to update the local port in a way that does not produce a change if local_port != self.local_port { - inc!(Metrics, local_port_updates); + self.metrics.local_port_updates.inc(); let old_port = std::mem::replace(&mut self.local_port, local_port); // clear the current mapping task if any @@ -602,7 +621,7 @@ impl Service { fn get_mapping(&mut self, external_addr: Option<(Ipv4Addr, NonZeroU16)>) { if let Some(local_port) = self.local_port { - inc!(Metrics, mapping_attempts); + self.metrics.mapping_attempts.inc(); let (local_ip, gateway) = match ip_and_gateway() { Ok(ip_and_gw) => ip_and_gw, @@ -681,7 +700,7 @@ impl Service { // we don't care if the requester is no longer there let _ = result_tx.send(Ok(probe_output)); } else { - inc!(Metrics, probes_started); + self.metrics.probes_started.inc(); let (local_ip, gateway) = match ip_and_gateway() { Ok(ip_and_gw) => ip_and_gw, @@ -694,13 +713,14 @@ impl Service { }; let config = self.config.clone(); - let handle = - tokio::spawn( - async move { - Probe::from_output(config, probe_output, local_ip, gateway).await - } - .instrument(info_span!("portmapper.probe")), - ); + let metrics = self.metrics.clone(); + let handle = tokio::spawn( + async move { + Probe::from_output(config, probe_output, local_ip, gateway, metrics) + .await + } + .instrument(info_span!("portmapper.probe")), + ); let receivers = vec![result_tx]; self.probing_task = Some((AbortOnDropHandle::new(handle), receivers)); } diff --git a/portmapper/src/metrics.rs b/portmapper/src/metrics.rs index f30dbd5..f7982f8 100644 --- a/portmapper/src/metrics.rs +++ b/portmapper/src/metrics.rs @@ -1,68 +1,41 @@ -use iroh_metrics::{ - core::{Counter, Metric}, - struct_iterable::Iterable, -}; +use iroh_metrics::{Counter, MetricsGroup}; +use serde::{Deserialize, Serialize}; /// Enum of metrics for the module -#[allow(missing_docs)] -#[derive(Debug, Clone, Iterable)] +#[derive(Debug, Default, MetricsGroup, Serialize, Deserialize)] +#[metrics(name = "portmap")] pub struct Metrics { /* * General port mapping metrics */ + /// Number of probing tasks started. pub probes_started: Counter, + /// Number of updates to the local port. pub local_port_updates: Counter, + /// Number of mapping tasks started. pub mapping_attempts: Counter, + /// Number of failed mapping tasks. pub mapping_failures: Counter, + /// Number of times the external address obtained via port mapping was updated. pub external_address_updated: Counter, /* * UPnP metrics */ + /// Number of UPnP probes executed. pub upnp_probes: Counter, + /// Number of failed Upnp probes. pub upnp_probes_failed: Counter, + /// Number of UPnP probes that found it available. pub upnp_available: Counter, + /// Number of UPnP probes that resulted in a gateway different to the previous one, pub upnp_gateway_updated: Counter, /* * PCP metrics */ + /// Number of PCP probes executed. pub pcp_probes: Counter, + /// Number of PCP probes that found it available. pub pcp_available: Counter, } - -impl Default for Metrics { - fn default() -> Self { - Self { - probes_started: Counter::new("Number of probing tasks started."), - local_port_updates: Counter::new("Number of updates to the local port."), - mapping_attempts: Counter::new("Number of mapping tasks started."), - mapping_failures: Counter::new("Number of failed mapping tasks"), - external_address_updated: Counter::new( - "Number of times the external address obtained via port mapping was updated.", - ), - - /* - * UPnP metrics - */ - upnp_probes: Counter::new("Number of UPnP probes executed."), - upnp_probes_failed: Counter::new("Number of failed Upnp probes"), - upnp_available: Counter::new("Number of UPnP probes that found it available."), - upnp_gateway_updated: Counter::new( - "Number of UPnP probes that resulted in a gateway different to the previous one.", - ), - - /* - * PCP metrics - */ - pcp_probes: Counter::new("Number of PCP probes executed."), - pcp_available: Counter::new("Number of PCP probes that found it available."), - } - } -} - -impl Metric for Metrics { - fn name() -> &'static str { - "portmap" - } -} diff --git a/portmapper/src/upnp.rs b/portmapper/src/upnp.rs index dd8bc3d..b4bf5e7 100644 --- a/portmapper/src/upnp.rs +++ b/portmapper/src/upnp.rs @@ -1,11 +1,11 @@ use std::{ net::{Ipv4Addr, SocketAddrV4}, num::NonZeroU16, + sync::Arc, time::Duration, }; use igd_next::{aio as aigd, AddAnyPortError, GetExternalIpError, RemovePortError, SearchError}; -use iroh_metrics::inc; use nested_enum_utils::common_fields; use snafu::{Backtrace, ResultExt, Snafu}; use tracing::debug; @@ -163,8 +163,8 @@ impl Mapping { } /// Searches for UPnP gateways. -pub async fn probe_available() -> Option { - inc!(Metrics, upnp_probes); +pub async fn probe_available(metrics: &Arc) -> Option { + metrics.upnp_probes.inc(); // Wrap in manual timeout, because igd_next doesn't respect the set timeout let res = tokio::time::timeout( @@ -179,12 +179,12 @@ pub async fn probe_available() -> Option { match res { Ok(Ok(gateway)) => Some(gateway), Err(e) => { - inc!(Metrics, upnp_probes_failed); + metrics.upnp_probes_failed.inc(); debug!("upnp probe timed out: {e}"); None } Ok(Err(e)) => { - inc!(Metrics, upnp_probes_failed); + metrics.upnp_probes_failed.inc(); debug!("upnp probe failed: {e}"); None }