diff --git a/Cargo.lock b/Cargo.lock index 2a90569..afb1b5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,7 +144,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -389,7 +389,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -589,7 +589,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -644,7 +644,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", "unicode-xid", ] @@ -656,7 +656,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", "unicode-xid", ] @@ -685,7 +685,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -762,7 +762,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -892,7 +892,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -1540,7 +1540,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -1657,6 +1657,7 @@ dependencies = [ "iroh-quinn", "irpc-derive", "n0-future", + "opentelemetry", "postcard", "rcgen", "rustls", @@ -1668,6 +1669,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "trybuild", ] @@ -1678,7 +1680,7 @@ version = "0.8.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -1705,9 +1707,9 @@ dependencies = [ [[package]] name = "is_terminal_polyfill" -version = "1.70.1" +version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" [[package]] name = "itoa" @@ -1920,14 +1922,14 @@ dependencies = [ [[package]] name = "nested_enum_utils" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43fa9161ed44d30e9702fe42bd78693bceac0fed02f647da749f36109023d3a3" +checksum = "b1d5475271bdd36a4a2769eac1ef88df0f99428ea43e52dfd8b0ee5cb674695f" dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 1.0.109", + "syn", ] [[package]] @@ -2088,7 +2090,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -2112,9 +2114,9 @@ dependencies = [ [[package]] name = "once_cell_polyfill" -version = "1.70.1" +version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openssl-probe" @@ -2122,6 +2124,35 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.2", + "thiserror 2.0.17", +] + [[package]] name = "parking" version = "2.2.1" @@ -2209,7 +2240,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -2334,7 +2365,7 @@ checksum = "e0232bd009a197ceec9cc881ba46f727fcd8060a2d8d6a9dde7a69030a6fe2bb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -2631,9 +2662,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.33" +version = "0.23.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "751e04a496ca00bb97a5e043158d23d66b5aabf2e1d5aa2a0aaebb1aafe6f82c" +checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" dependencies = [ "log", "once_cell", @@ -2781,9 +2812,9 @@ dependencies = [ [[package]] name = "self_cell" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749" +checksum = "16c2f82143577edb4921b71ede051dac62ca3c16084e918bf7b40c96ae10eb33" [[package]] name = "semver" @@ -2834,7 +2865,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -2985,7 +3016,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -3063,7 +3094,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -3072,17 +3103,6 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.107" @@ -3111,7 +3131,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -3188,7 +3208,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -3199,7 +3219,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -3287,7 +3307,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -3464,7 +3484,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -3498,6 +3518,25 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" +dependencies = [ + "js-sys", + "opentelemetry", + "opentelemetry_sdk", + "rustversion", + "smallvec", + "thiserror 2.0.17", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.20" @@ -3669,7 +3708,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.107", + "syn", "wasm-bindgen-shared", ] @@ -3704,7 +3743,7 @@ checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3914,7 +3953,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -3925,7 +3964,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -4420,7 +4459,7 @@ checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", "synstructure", ] @@ -4447,7 +4486,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -4467,7 +4506,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", "synstructure", ] @@ -4488,7 +4527,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] [[package]] @@ -4521,5 +4560,5 @@ checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.107", + "syn", ] diff --git a/Cargo.toml b/Cargo.toml index b26b7d8..c4d363c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,9 @@ n0-future = { workspace = true } futures-util = { workspace = true, optional = true } # for the derive reexport/feature irpc-derive = { version = "0.8.0", path = "./irpc-derive", optional = true } +# for remote span propagation when both spans and rpc are enabled +opentelemetry = { version = "0.31", optional = true } +tracing-opentelemetry = { version = "0.32", optional = true } [target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies] quinn = { workspace = true, optional = true, features = ["runtime-tokio"] } @@ -64,11 +67,11 @@ testresult = "0.4.1" [features] # enable the remote transport -rpc = ["dep:quinn", "dep:postcard", "dep:anyhow", "dep:smallvec", "dep:tracing", "tokio/io-util"] +rpc = ["dep:quinn", "dep:postcard", "dep:anyhow", "dep:smallvec", "dep:tracing", "tokio/io-util", "dep:opentelemetry", "dep:tracing-opentelemetry", "irpc-derive?/rpc"] # add test utilities quinn_endpoint_setup = ["rpc", "dep:rustls", "dep:rcgen", "dep:anyhow", "dep:futures-buffered", "quinn/rustls-ring"] # pick up parent span when creating channel messages -spans = ["dep:tracing"] +spans = ["dep:tracing", "irpc-derive?/spans"] stream = ["dep:futures-util"] derive = ["dep:irpc-derive"] varint-util = ["dep:postcard", "dep:smallvec", "tokio/io-util"] diff --git a/irpc-derive/Cargo.toml b/irpc-derive/Cargo.toml index 6aebd71..b35bad8 100644 --- a/irpc-derive/Cargo.toml +++ b/irpc-derive/Cargo.toml @@ -16,3 +16,9 @@ proc-macro = true syn = { version = "2", features = ["full"] } quote = "1" proc-macro2 = "1" + +[features] +# These features should match irpc's features for proper code generation +spans = [] +rpc = [] +default = ["spans", "rpc"] diff --git a/irpc-derive/src/lib.rs b/irpc-derive/src/lib.rs index 4f80b26..806ef76 100644 --- a/irpc-derive/src/lib.rs +++ b/irpc-derive/src/lib.rs @@ -285,6 +285,28 @@ fn generate_remote_service_impl( proto_enum_name: &Ident, variants_with_attr: &[(Ident, Type)], ) -> TokenStream2 { + // Generate match arms that set the span parent for each variant + #[cfg(all(feature = "spans", feature = "rpc"))] + let variants = variants_with_attr + .iter() + .map(|(variant_name, _inner_type)| { + let span_name = variant_name.to_string(); + quote! { + #proto_enum_name::#variant_name(msg) => { + // Create a span for this specific RPC operation + let span = ::tracing::info_span!(#span_name); + // Set its parent to the propagated remote context if available + if let Some(ctx) = ::irpc::span_propagation::take_remote_span_context() { + use ::irpc::span_propagation::OpenTelemetrySpanExt; + let _ = span.set_parent(ctx); + } + let _guard = span.enter(); + #message_enum_name::from(::irpc::WithChannels::from((msg, tx, rx))) + } + } + }); + + #[cfg(not(all(feature = "spans", feature = "rpc")))] let variants = variants_with_attr .iter() .map(|(variant_name, _inner_type)| { diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index e8fd3ab..7d9547b 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -263,8 +263,18 @@ pub async fn read_request_raw( recv.read_exact(&mut buf) .await .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?; - let msg: R = + + // Deserialize the payload which includes optional span context + // irpc-iroh uses irpc with default features, which include spans and rpc, + // so span_propagation module always exists + let (span_ctx, msg): (Option, R) = postcard::from_bytes(&buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + // Store span context in thread-local for use by with_remote_channels + if let Some(ctx) = span_ctx { + ctx.store_in_thread_local(); + } + let rx = recv; let tx = send; Ok(Some((msg, rx, tx))) diff --git a/src/lib.rs b/src/lib.rs index 06ba70f..620dfb7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -315,6 +315,81 @@ mod sealed { pub trait Sealed {} } +/// Span context propagation for remote RPC calls +/// +/// When both `spans` and `rpc` features are enabled, this module provides +/// automatic propagation of OpenTelemetry trace context across remote boundaries. +#[cfg(all(feature = "spans", feature = "rpc"))] +#[cfg_attr(quicrpc_docsrs, doc(cfg(all(feature = "spans", feature = "rpc"))))] +pub mod span_propagation { + use serde::{Deserialize, Serialize}; + use std::cell::RefCell; + use std::collections::HashMap; + + // Re-export for use in macro-generated code + pub use tracing_opentelemetry::OpenTelemetrySpanExt; + + thread_local! { + static SPAN_CONTEXT: RefCell> = RefCell::new(None); + } + + /// Carrier for propagating span context across RPC boundaries using W3C Trace Context format + #[derive(Debug, Clone, Serialize, Deserialize, Default)] + pub struct SpanContextCarrier { + headers: HashMap, + } + + impl opentelemetry::propagation::Injector for SpanContextCarrier { + fn set(&mut self, key: &str, value: String) { + self.headers.insert(key.to_string(), value); + } + } + + impl opentelemetry::propagation::Extractor for SpanContextCarrier { + fn get(&self, key: &str) -> Option<&str> { + self.headers.get(key).map(|v| v.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.headers.keys().map(|k| k.as_str()).collect() + } + } + + impl SpanContextCarrier { + /// Create a carrier from the current OpenTelemetry context + pub fn from_current() -> Self { + use opentelemetry::global; + let mut carrier = Self::default(); + global::get_text_map_propagator(|prop| { + prop.inject_context(&opentelemetry::Context::current(), &mut carrier); + }); + carrier + } + + /// Extract an OpenTelemetry context from this carrier + pub fn to_context(&self) -> opentelemetry::Context { + use opentelemetry::global; + global::get_text_map_propagator(|prop| { + prop.extract_with_context(&opentelemetry::Context::current(), self) + }) + } + + /// Store this carrier's context in thread-local storage for use by with_remote_channels + pub fn store_in_thread_local(&self) { + let context = self.to_context(); + SPAN_CONTEXT.with(|cell| { + *cell.borrow_mut() = Some(context); + }); + } + } + + /// Take the span context from thread-local storage (if any) and create a span with it as parent. + /// This is called by the generated `with_remote_channels` implementation. + pub fn take_remote_span_context() -> Option { + SPAN_CONTEXT.with(|cell| cell.borrow_mut().take()) + } +} + /// Requirements for a RPC message /// /// Even when just using the mem transport, we require messages to be Serializable and Deserializable. @@ -1972,11 +2047,22 @@ pub mod rpc { msg: impl Into, ) -> std::result::Result, WriteError> { let msg = msg.into(); - if postcard::experimental::serialized_size(&msg)? as u64 > MAX_MESSAGE_SIZE { + + // When both spans and rpc features are enabled, include span context + #[cfg(all(feature = "spans", feature = "rpc"))] + let payload = { + let span_ctx = crate::span_propagation::SpanContextCarrier::from_current(); + (Some(span_ctx), msg) + }; + + #[cfg(not(all(feature = "spans", feature = "rpc")))] + let payload = (None::<()>, msg); + + if postcard::experimental::serialized_size(&payload)? as u64 > MAX_MESSAGE_SIZE { return Err(WriteError::MaxMessageSizeExceeded); } let mut buf = SmallVec::<[u8; 128]>::new(); - buf.write_length_prefixed(&msg)?; + buf.write_length_prefixed(&payload)?; Ok(buf) } @@ -2419,8 +2505,23 @@ pub mod rpc { recv.read_exact(&mut buf) .await .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))?; - let msg: R = postcard::from_bytes(&buf) + + // Deserialize the payload which includes optional span context + #[cfg(all(feature = "spans", feature = "rpc"))] + let (span_ctx, msg): (Option, R) = + postcard::from_bytes(&buf) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + #[cfg(not(all(feature = "spans", feature = "rpc")))] + let (_span_ctx, msg): (Option<()>, R) = postcard::from_bytes(&buf) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + // Store span context in thread-local for use by with_remote_channels + #[cfg(all(feature = "spans", feature = "rpc"))] + if let Some(ctx) = span_ctx { + ctx.store_in_thread_local(); + } + let rx = recv; let tx = send; Ok(Some((msg, rx, tx)))