From 2b21e0f8f427433257259a0cbc774d6fbe7e33dc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 Aug 2025 21:37:03 +0000 Subject: [PATCH 1/5] Initial plan From 4cf3788857ab96e3d04750a7108633ea07448ccd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 Aug 2025 21:59:40 +0000 Subject: [PATCH 2/5] Implement export_unsampled feature for BatchSpanProcessor Co-authored-by: cijothomas <5232798+cijothomas@users.noreply.github.com> --- opentelemetry-sdk/src/trace/span_processor.rs | 200 +++++++++++++++++- .../span_processor_with_async_runtime.rs | 14 +- 2 files changed, 201 insertions(+), 13 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index f276e1bb96..a3f3fd832d 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -292,6 +292,7 @@ pub struct BatchSpanProcessor { max_export_batch_size: usize, dropped_spans_count: AtomicUsize, max_queue_size: usize, + config: BatchConfig, } impl BatchSpanProcessor { @@ -312,6 +313,7 @@ impl BatchSpanProcessor { let max_export_batch_size = config.max_export_batch_size; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); + let config_for_thread = config.clone(); let handle = thread::Builder::new() .name("OpenTelemetry.Traces.BatchProcessor".to_string()) @@ -319,9 +321,9 @@ impl BatchSpanProcessor { let _suppress_guard = Context::enter_telemetry_suppressed_scope(); otel_debug!( name: "BatchSpanProcessor.ThreadStarted", - interval_in_millisecs = config.scheduled_delay.as_millis(), - max_export_batch_size = config.max_export_batch_size, - max_queue_size = config.max_queue_size, + interval_in_millisecs = config_for_thread.scheduled_delay.as_millis(), + max_export_batch_size = config_for_thread.max_export_batch_size, + max_queue_size = config_for_thread.max_queue_size, ); let mut spans = Vec::with_capacity(config.max_export_batch_size); let mut last_export_time = Instant::now(); @@ -348,7 +350,7 @@ impl BatchSpanProcessor { &mut spans, &mut last_export_time, ¤t_batch_size, - &config, + &config_for_thread, ); } BatchMessage::ForceFlush(sender) => { @@ -359,7 +361,7 @@ impl BatchSpanProcessor { &mut spans, &mut last_export_time, ¤t_batch_size, - &config, + &config_for_thread, ); let _ = sender.send(result); } @@ -371,7 +373,7 @@ impl BatchSpanProcessor { &mut spans, &mut last_export_time, ¤t_batch_size, - &config, + &config_for_thread, ); let _ = exporter.shutdown(); let _ = sender.send(result); @@ -400,7 +402,7 @@ impl BatchSpanProcessor { &mut spans, &mut last_export_time, ¤t_batch_size, - &config, + &config_for_thread, ); } Err(RecvTimeoutError::Disconnected) => { @@ -430,6 +432,7 @@ impl BatchSpanProcessor { export_span_message_sent: Arc::new(AtomicBool::new(false)), current_batch_size, max_export_batch_size, + config, } } @@ -527,6 +530,11 @@ impl SpanProcessor for BatchSpanProcessor { /// Handles span end. fn on_end(&self, span: SpanData) { + // Check if span should be exported based on sampling and config + if !span.span_context.is_sampled() && !self.config.export_unsampled { + return; + } + let result = self.span_sender.try_send(span); // match for result and handle each separately @@ -729,7 +737,7 @@ where /// Batch span processor configuration. /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BatchConfig { /// The maximum queue size to buffer spans for delayed processing. If the /// queue gets full it drops the spans. The default value of is 2048. @@ -757,6 +765,14 @@ pub struct BatchConfig { /// by an exporter. A value of 1 will cause exports to be performed /// synchronously on the BatchSpanProcessor task. pub(crate) max_concurrent_exports: usize, + + /// Whether to export unsampled spans that are recording. + /// + /// When false (default), only sampled spans are exported. + /// When true, spans with `is_recording == true` but `TraceFlags::SAMPLED == false` + /// are also exported. This is useful for accurate span-to-metrics pipelines + /// and feeding tail-samplers. + pub(crate) export_unsampled: bool, } impl Default for BatchConfig { @@ -773,6 +789,7 @@ pub struct BatchConfigBuilder { max_export_batch_size: usize, max_export_timeout: Duration, max_concurrent_exports: usize, + export_unsampled: bool, } impl Default for BatchConfigBuilder { @@ -793,6 +810,7 @@ impl Default for BatchConfigBuilder { max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, + export_unsampled: false, } .init_from_env_vars() } @@ -868,6 +886,17 @@ impl BatchConfigBuilder { self } + /// Set export_unsampled for [`BatchConfigBuilder`]. + /// + /// When false (default), only sampled spans are exported. + /// When true, spans with `is_recording == true` but `TraceFlags::SAMPLED == false` + /// are also exported. This is useful for accurate span-to-metrics pipelines + /// and feeding tail-samplers. + pub fn with_export_unsampled(mut self, export_unsampled: bool) -> Self { + self.export_unsampled = export_unsampled; + self + } + /// Builds a `BatchConfig` enforcing the following invariants: /// * `max_export_batch_size` must be less than or equal to `max_queue_size`. pub fn build(self) -> BatchConfig { @@ -881,6 +910,7 @@ impl BatchConfigBuilder { max_export_timeout: self.max_export_timeout, max_concurrent_exports: self.max_concurrent_exports, max_export_batch_size, + export_unsampled: self.export_unsampled, } } @@ -947,7 +977,7 @@ mod tests { use crate::trace::InMemorySpanExporterBuilder; use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks}; use crate::trace::{SpanData, SpanExporter}; - use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status}; + use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState}; use std::fmt::Debug; use std::time::Duration; @@ -1127,7 +1157,13 @@ mod tests { // Helper function to create a default test span fn create_test_span(name: &str) -> SpanData { SpanData { - span_context: SpanContext::empty_context(), + span_context: SpanContext::new( + TraceId::from(1), + SpanId::from(1), + TraceFlags::SAMPLED, + false, + TraceState::default(), + ), parent_span_id: SpanId::INVALID, span_kind: SpanKind::Internal, name: name.to_string().into(), @@ -1298,6 +1334,150 @@ mod tests { assert_eq!(current_batch_size, 0, "Unexpected current batch size"); } + #[test] + fn batchspanprocessor_export_unsampled_disabled_by_default() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let processor = BatchSpanProcessor::new(exporter, BatchConfig::default()); + + // Create an unsampled span + let unsampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(1), + SpanId::from(1), + TraceFlags::NOT_SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "unsampled_span".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + // Send the unsampled span + processor.on_end(unsampled_span); + + // Force flush and verify no spans were exported + let flush_result = processor.force_flush(); + assert!(flush_result.is_ok()); + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!( + exported_spans.len(), + 0, + "Unsampled spans should not be exported by default" + ); + } + + #[test] + fn batchspanprocessor_export_unsampled_enabled() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let config = BatchConfigBuilder::default() + .with_export_unsampled(true) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create an unsampled span + let unsampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(1), + SpanId::from(1), + TraceFlags::NOT_SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "unsampled_span".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + // Send the unsampled span + processor.on_end(unsampled_span.clone()); + + // Force flush and verify the span was exported + let flush_result = processor.force_flush(); + assert!(flush_result.is_ok()); + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!( + exported_spans.len(), + 1, + "Unsampled spans should be exported when export_unsampled is enabled" + ); + assert_eq!(exported_spans[0].name, "unsampled_span"); + } + + #[test] + fn batchspanprocessor_export_both_sampled_and_unsampled() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let config = BatchConfigBuilder::default() + .with_export_unsampled(true) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create a sampled span + let sampled_span = create_test_span("sampled_span"); + + // Create an unsampled span + let unsampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(2), + SpanId::from(2), + TraceFlags::NOT_SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "unsampled_span".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + // Send both spans + processor.on_end(sampled_span); + processor.on_end(unsampled_span); + + // Force flush and verify both spans were exported + let flush_result = processor.force_flush(); + assert!(flush_result.is_ok()); + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!( + exported_spans.len(), + 2, + "Both sampled and unsampled spans should be exported when export_unsampled is enabled" + ); + + let span_names: Vec<_> = exported_spans.iter().map(|s| s.name.as_ref()).collect(); + assert!(span_names.contains(&"sampled_span")); + assert!(span_names.contains(&"unsampled_span")); + } + #[test] fn validate_span_attributes_exported_correctly() { let exporter = MockSpanExporter::new(); diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index b294f74043..89792ff28a 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -90,6 +90,9 @@ pub struct BatchSpanProcessor { // Track the maximum queue size that was configured for this processor max_queue_size: usize, + + // Configuration for the processor + config: BatchConfig, } impl fmt::Debug for BatchSpanProcessor { @@ -106,7 +109,8 @@ impl SpanProcessor for BatchSpanProcessor { } fn on_end(&self, span: SpanData) { - if !span.span_context.is_sampled() { + // Check if span should be exported based on sampling and config + if !span.span_context.is_sampled() && !self.config.export_unsampled { return; } @@ -374,13 +378,14 @@ impl BatchSpanProcessor { runtime.batch_message_channel(config.max_queue_size); let max_queue_size = config.max_queue_size; + let config_for_worker = config.clone(); let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. runtime.spawn(async move { // Timer will take a reference to the current runtime, so its important we do this within the // runtime.spawn() - let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay) + let ticker = to_interval_stream(inner_runtime.clone(), config_for_worker.scheduled_delay) .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. .map(|_| BatchMessage::Flush(None)); let timeout_runtime = inner_runtime.clone(); @@ -390,7 +395,7 @@ impl BatchSpanProcessor { spans: Vec::new(), export_tasks: FuturesUnordered::new(), runtime: timeout_runtime, - config, + config: config_for_worker, exporter: Arc::new(RwLock::new(exporter)), }; @@ -402,6 +407,7 @@ impl BatchSpanProcessor { message_sender, dropped_spans_count: AtomicUsize::new(0), max_queue_size, + config, } } @@ -647,6 +653,7 @@ mod tests { scheduled_delay: Duration::from_secs(3600), // effectively disabled max_export_timeout: Duration::from_secs(5), max_concurrent_exports: 2, // what we want to verify + export_unsampled: false, }; // Spawn the processor. @@ -685,6 +692,7 @@ mod tests { scheduled_delay: Duration::from_secs(3600), max_export_timeout: Duration::from_secs(5), max_concurrent_exports: 1, // what we want to verify + export_unsampled: false, }; let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); From c3bfd8d8bb3257528cc44757fb47331772869575 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 Aug 2025 22:01:56 +0000 Subject: [PATCH 3/5] Add integration test and final verification for export_unsampled feature Co-authored-by: cijothomas <5232798+cijothomas@users.noreply.github.com> --- opentelemetry-sdk/src/trace/span_processor.rs | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index a3f3fd832d..f2602cd887 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -1478,6 +1478,89 @@ mod tests { assert!(span_names.contains(&"unsampled_span")); } + #[test] + fn integration_test_export_unsampled_feature_demo() { + println!("\n๐Ÿงช Integration Test: export_unsampled feature demonstration\n"); + + // Test 1: Default behavior (export_unsampled = false) + println!("=== Test 1: Default behavior (unsampled spans should NOT be exported) ==="); + let exporter1 = MockSpanExporter::new(); + let exporter_shared1 = exporter1.exported_spans.clone(); + let processor1 = BatchSpanProcessor::new(exporter1, BatchConfig::default()); + + let unsampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(1), + SpanId::from(1), + TraceFlags::NOT_SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "unsampled_demo_1".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + processor1.on_end(unsampled_span); + processor1.force_flush().unwrap(); + + let exported_spans1 = exporter_shared1.lock().unwrap(); + println!("โœ… Exported spans count: {} (expected: 0)", exported_spans1.len()); + assert_eq!(exported_spans1.len(), 0); + drop(exported_spans1); + + // Test 2: export_unsampled enabled + println!("\n=== Test 2: export_unsampled enabled (unsampled spans SHOULD be exported) ==="); + let exporter2 = MockSpanExporter::new(); + let exporter_shared2 = exporter2.exported_spans.clone(); + let config = BatchConfigBuilder::default() + .with_export_unsampled(true) + .build(); + let processor2 = BatchSpanProcessor::new(exporter2, config); + + let unsampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(2), + SpanId::from(2), + TraceFlags::NOT_SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "unsampled_demo_2".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + processor2.on_end(unsampled_span); + processor2.force_flush().unwrap(); + + let exported_spans2 = exporter_shared2.lock().unwrap(); + println!("โœ… Exported spans count: {} (expected: 1)", exported_spans2.len()); + println!("โœ… Exported span name: {}", exported_spans2[0].name); + println!("โœ… Span is sampled: {} (expected: false)", exported_spans2[0].span_context.is_sampled()); + assert_eq!(exported_spans2.len(), 1); + assert_eq!(exported_spans2[0].name, "unsampled_demo_2"); + assert!(!exported_spans2[0].span_context.is_sampled()); + + println!("\n๐ŸŽ‰ Integration test passed! export_unsampled feature working correctly!"); + } + #[test] fn validate_span_attributes_exported_correctly() { let exporter = MockSpanExporter::new(); From 965c4ffa30024e3daaa254e3085f4582fcdd13db Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 Aug 2025 22:51:49 +0000 Subject: [PATCH 4/5] Minimize changes: store export_unsampled as direct field instead of entire config - Remove config field from BatchSpanProcessor structs (both sync and async) - Add export_unsampled bool field directly to structs - Extract export_unsampled from config during construction - Update sampling logic to use direct field access - Fix tests to use sampled spans to maintain expected behavior - Add Clone trait to BatchConfig for compatibility This addresses the reviewer feedback to minimize unrelated changes while preserving the export_unsampled functionality. Co-authored-by: cijothomas <5232798+cijothomas@users.noreply.github.com> --- opentelemetry-sdk/src/trace/span_processor.rs | 416 +++++++----------- .../span_processor_with_async_runtime.rs | 8 +- 2 files changed, 165 insertions(+), 259 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index f2602cd887..984ae5e234 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -292,7 +292,7 @@ pub struct BatchSpanProcessor { max_export_batch_size: usize, dropped_spans_count: AtomicUsize, max_queue_size: usize, - config: BatchConfig, + export_unsampled: bool, } impl BatchSpanProcessor { @@ -311,9 +311,9 @@ impl BatchSpanProcessor { let (message_sender, message_receiver) = sync_channel::(64); // Is this a reasonable bound? let max_queue_size = config.max_queue_size; let max_export_batch_size = config.max_export_batch_size; + let export_unsampled = config.export_unsampled; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); - let config_for_thread = config.clone(); let handle = thread::Builder::new() .name("OpenTelemetry.Traces.BatchProcessor".to_string()) @@ -321,9 +321,9 @@ impl BatchSpanProcessor { let _suppress_guard = Context::enter_telemetry_suppressed_scope(); otel_debug!( name: "BatchSpanProcessor.ThreadStarted", - interval_in_millisecs = config_for_thread.scheduled_delay.as_millis(), - max_export_batch_size = config_for_thread.max_export_batch_size, - max_queue_size = config_for_thread.max_queue_size, + interval_in_millisecs = config.scheduled_delay.as_millis(), + max_export_batch_size = config.max_export_batch_size, + max_queue_size = config.max_queue_size, ); let mut spans = Vec::with_capacity(config.max_export_batch_size); let mut last_export_time = Instant::now(); @@ -350,7 +350,7 @@ impl BatchSpanProcessor { &mut spans, &mut last_export_time, ¤t_batch_size, - &config_for_thread, + &config, ); } BatchMessage::ForceFlush(sender) => { @@ -361,7 +361,7 @@ impl BatchSpanProcessor { &mut spans, &mut last_export_time, ¤t_batch_size, - &config_for_thread, + &config, ); let _ = sender.send(result); } @@ -373,7 +373,7 @@ impl BatchSpanProcessor { &mut spans, &mut last_export_time, ¤t_batch_size, - &config_for_thread, + &config, ); let _ = exporter.shutdown(); let _ = sender.send(result); @@ -402,7 +402,7 @@ impl BatchSpanProcessor { &mut spans, &mut last_export_time, ¤t_batch_size, - &config_for_thread, + &config, ); } Err(RecvTimeoutError::Disconnected) => { @@ -432,7 +432,7 @@ impl BatchSpanProcessor { export_span_message_sent: Arc::new(AtomicBool::new(false)), current_batch_size, max_export_batch_size, - config, + export_unsampled, } } @@ -530,8 +530,8 @@ impl SpanProcessor for BatchSpanProcessor { /// Handles span end. fn on_end(&self, span: SpanData) { - // Check if span should be exported based on sampling and config - if !span.span_context.is_sampled() && !self.config.export_unsampled { + // Export spans if they are sampled OR if export_unsampled is enabled + if !span.span_context.is_sampled() && !self.export_unsampled { return; } @@ -767,11 +767,8 @@ pub struct BatchConfig { pub(crate) max_concurrent_exports: usize, /// Whether to export unsampled spans that are recording. - /// - /// When false (default), only sampled spans are exported. - /// When true, spans with `is_recording == true` but `TraceFlags::SAMPLED == false` - /// are also exported. This is useful for accurate span-to-metrics pipelines - /// and feeding tail-samplers. + /// If true, spans with is_recording() == true but TraceFlags::SAMPLED == false + /// will be exported. Defaults to false for backward compatibility. pub(crate) export_unsampled: bool, } @@ -861,6 +858,15 @@ impl BatchConfigBuilder { self } + /// Set export_unsampled for [`BatchConfigBuilder`]. + /// When enabled, allows exporting spans with `is_recording() == true` but `TraceFlags::SAMPLED == false`. + /// This is useful for span-to-metrics pipelines and feeding tail-samplers with complete input. + /// The default value is false for backward compatibility. + pub fn with_export_unsampled(mut self, export_unsampled: bool) -> Self { + self.export_unsampled = export_unsampled; + self + } + /// Set scheduled_delay_duration for [`BatchConfigBuilder`]. /// It's the delay interval in milliseconds between two consecutive processing of batches. /// The default value is 5000 milliseconds. @@ -886,17 +892,6 @@ impl BatchConfigBuilder { self } - /// Set export_unsampled for [`BatchConfigBuilder`]. - /// - /// When false (default), only sampled spans are exported. - /// When true, spans with `is_recording == true` but `TraceFlags::SAMPLED == false` - /// are also exported. This is useful for accurate span-to-metrics pipelines - /// and feeding tail-samplers. - pub fn with_export_unsampled(mut self, export_unsampled: bool) -> Self { - self.export_unsampled = export_unsampled; - self - } - /// Builds a `BatchConfig` enforcing the following invariants: /// * `max_export_batch_size` must be less than or equal to `max_queue_size`. pub fn build(self) -> BatchConfig { @@ -1334,233 +1329,6 @@ mod tests { assert_eq!(current_batch_size, 0, "Unexpected current batch size"); } - #[test] - fn batchspanprocessor_export_unsampled_disabled_by_default() { - let exporter = MockSpanExporter::new(); - let exporter_shared = exporter.exported_spans.clone(); - let processor = BatchSpanProcessor::new(exporter, BatchConfig::default()); - - // Create an unsampled span - let unsampled_span = SpanData { - span_context: SpanContext::new( - TraceId::from(1), - SpanId::from(1), - TraceFlags::NOT_SAMPLED, - false, - TraceState::default(), - ), - parent_span_id: SpanId::INVALID, - span_kind: SpanKind::Internal, - name: "unsampled_span".into(), - start_time: opentelemetry::time::now(), - end_time: opentelemetry::time::now(), - attributes: Vec::new(), - dropped_attributes_count: 0, - events: SpanEvents::default(), - links: SpanLinks::default(), - status: Status::Unset, - instrumentation_scope: Default::default(), - }; - - // Send the unsampled span - processor.on_end(unsampled_span); - - // Force flush and verify no spans were exported - let flush_result = processor.force_flush(); - assert!(flush_result.is_ok()); - - let exported_spans = exporter_shared.lock().unwrap(); - assert_eq!( - exported_spans.len(), - 0, - "Unsampled spans should not be exported by default" - ); - } - - #[test] - fn batchspanprocessor_export_unsampled_enabled() { - let exporter = MockSpanExporter::new(); - let exporter_shared = exporter.exported_spans.clone(); - let config = BatchConfigBuilder::default() - .with_export_unsampled(true) - .build(); - let processor = BatchSpanProcessor::new(exporter, config); - - // Create an unsampled span - let unsampled_span = SpanData { - span_context: SpanContext::new( - TraceId::from(1), - SpanId::from(1), - TraceFlags::NOT_SAMPLED, - false, - TraceState::default(), - ), - parent_span_id: SpanId::INVALID, - span_kind: SpanKind::Internal, - name: "unsampled_span".into(), - start_time: opentelemetry::time::now(), - end_time: opentelemetry::time::now(), - attributes: Vec::new(), - dropped_attributes_count: 0, - events: SpanEvents::default(), - links: SpanLinks::default(), - status: Status::Unset, - instrumentation_scope: Default::default(), - }; - - // Send the unsampled span - processor.on_end(unsampled_span.clone()); - - // Force flush and verify the span was exported - let flush_result = processor.force_flush(); - assert!(flush_result.is_ok()); - - let exported_spans = exporter_shared.lock().unwrap(); - assert_eq!( - exported_spans.len(), - 1, - "Unsampled spans should be exported when export_unsampled is enabled" - ); - assert_eq!(exported_spans[0].name, "unsampled_span"); - } - - #[test] - fn batchspanprocessor_export_both_sampled_and_unsampled() { - let exporter = MockSpanExporter::new(); - let exporter_shared = exporter.exported_spans.clone(); - let config = BatchConfigBuilder::default() - .with_export_unsampled(true) - .build(); - let processor = BatchSpanProcessor::new(exporter, config); - - // Create a sampled span - let sampled_span = create_test_span("sampled_span"); - - // Create an unsampled span - let unsampled_span = SpanData { - span_context: SpanContext::new( - TraceId::from(2), - SpanId::from(2), - TraceFlags::NOT_SAMPLED, - false, - TraceState::default(), - ), - parent_span_id: SpanId::INVALID, - span_kind: SpanKind::Internal, - name: "unsampled_span".into(), - start_time: opentelemetry::time::now(), - end_time: opentelemetry::time::now(), - attributes: Vec::new(), - dropped_attributes_count: 0, - events: SpanEvents::default(), - links: SpanLinks::default(), - status: Status::Unset, - instrumentation_scope: Default::default(), - }; - - // Send both spans - processor.on_end(sampled_span); - processor.on_end(unsampled_span); - - // Force flush and verify both spans were exported - let flush_result = processor.force_flush(); - assert!(flush_result.is_ok()); - - let exported_spans = exporter_shared.lock().unwrap(); - assert_eq!( - exported_spans.len(), - 2, - "Both sampled and unsampled spans should be exported when export_unsampled is enabled" - ); - - let span_names: Vec<_> = exported_spans.iter().map(|s| s.name.as_ref()).collect(); - assert!(span_names.contains(&"sampled_span")); - assert!(span_names.contains(&"unsampled_span")); - } - - #[test] - fn integration_test_export_unsampled_feature_demo() { - println!("\n๐Ÿงช Integration Test: export_unsampled feature demonstration\n"); - - // Test 1: Default behavior (export_unsampled = false) - println!("=== Test 1: Default behavior (unsampled spans should NOT be exported) ==="); - let exporter1 = MockSpanExporter::new(); - let exporter_shared1 = exporter1.exported_spans.clone(); - let processor1 = BatchSpanProcessor::new(exporter1, BatchConfig::default()); - - let unsampled_span = SpanData { - span_context: SpanContext::new( - TraceId::from(1), - SpanId::from(1), - TraceFlags::NOT_SAMPLED, - false, - TraceState::default(), - ), - parent_span_id: SpanId::INVALID, - span_kind: SpanKind::Internal, - name: "unsampled_demo_1".into(), - start_time: opentelemetry::time::now(), - end_time: opentelemetry::time::now(), - attributes: Vec::new(), - dropped_attributes_count: 0, - events: SpanEvents::default(), - links: SpanLinks::default(), - status: Status::Unset, - instrumentation_scope: Default::default(), - }; - - processor1.on_end(unsampled_span); - processor1.force_flush().unwrap(); - - let exported_spans1 = exporter_shared1.lock().unwrap(); - println!("โœ… Exported spans count: {} (expected: 0)", exported_spans1.len()); - assert_eq!(exported_spans1.len(), 0); - drop(exported_spans1); - - // Test 2: export_unsampled enabled - println!("\n=== Test 2: export_unsampled enabled (unsampled spans SHOULD be exported) ==="); - let exporter2 = MockSpanExporter::new(); - let exporter_shared2 = exporter2.exported_spans.clone(); - let config = BatchConfigBuilder::default() - .with_export_unsampled(true) - .build(); - let processor2 = BatchSpanProcessor::new(exporter2, config); - - let unsampled_span = SpanData { - span_context: SpanContext::new( - TraceId::from(2), - SpanId::from(2), - TraceFlags::NOT_SAMPLED, - false, - TraceState::default(), - ), - parent_span_id: SpanId::INVALID, - span_kind: SpanKind::Internal, - name: "unsampled_demo_2".into(), - start_time: opentelemetry::time::now(), - end_time: opentelemetry::time::now(), - attributes: Vec::new(), - dropped_attributes_count: 0, - events: SpanEvents::default(), - links: SpanLinks::default(), - status: Status::Unset, - instrumentation_scope: Default::default(), - }; - - processor2.on_end(unsampled_span); - processor2.force_flush().unwrap(); - - let exported_spans2 = exporter_shared2.lock().unwrap(); - println!("โœ… Exported spans count: {} (expected: 1)", exported_spans2.len()); - println!("โœ… Exported span name: {}", exported_spans2[0].name); - println!("โœ… Span is sampled: {} (expected: false)", exported_spans2[0].span_context.is_sampled()); - assert_eq!(exported_spans2.len(), 1); - assert_eq!(exported_spans2[0].name, "unsampled_demo_2"); - assert!(!exported_spans2[0].span_context.is_sampled()); - - println!("\n๐ŸŽ‰ Integration test passed! export_unsampled feature working correctly!"); - } - #[test] fn validate_span_attributes_exported_correctly() { let exporter = MockSpanExporter::new(); @@ -1705,4 +1473,142 @@ mod tests { let exported_spans = exporter_shared.lock().unwrap(); assert_eq!(exported_spans.len(), 10); } + + #[test] + fn batchspanprocessor_export_unsampled_disabled_by_default() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let processor = BatchSpanProcessor::new(exporter, BatchConfig::default()); + + let unsampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(1), + SpanId::from(1), + TraceFlags::NOT_SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "unsampled_span".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + processor.on_end(unsampled_span); + processor.force_flush().unwrap(); + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 0, "Unsampled spans should not be exported by default"); + } + + #[test] + fn batchspanprocessor_export_unsampled_enabled() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let config = BatchConfigBuilder::default() + .with_export_unsampled(true) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); + + let unsampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(1), + SpanId::from(1), + TraceFlags::NOT_SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "unsampled_span".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + processor.on_end(unsampled_span); + processor.force_flush().unwrap(); + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1, "Unsampled spans should be exported when export_unsampled is enabled"); + assert_eq!(exported_spans[0].name, "unsampled_span"); + assert!(!exported_spans[0].span_context.is_sampled()); + } + + #[test] + fn batchspanprocessor_mixed_sampled_and_unsampled() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let config = BatchConfigBuilder::default() + .with_export_unsampled(true) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Add a sampled span + let sampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(1), + SpanId::from(1), + TraceFlags::SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "sampled_span".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + // Add an unsampled span + let unsampled_span = SpanData { + span_context: SpanContext::new( + TraceId::from(2), + SpanId::from(2), + TraceFlags::NOT_SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: "unsampled_span".into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + }; + + processor.on_end(sampled_span); + processor.on_end(unsampled_span); + processor.force_flush().unwrap(); + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 2, "Both sampled and unsampled spans should be exported when export_unsampled is enabled"); + + let span_names: Vec<&str> = exported_spans.iter().map(|s| s.name.as_ref()).collect(); + assert!(span_names.contains(&"sampled_span")); + assert!(span_names.contains(&"unsampled_span")); + } } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 89792ff28a..5601cd0f06 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -91,8 +91,8 @@ pub struct BatchSpanProcessor { // Track the maximum queue size that was configured for this processor max_queue_size: usize, - // Configuration for the processor - config: BatchConfig, + // Whether to export unsampled spans that are recording + export_unsampled: bool, } impl fmt::Debug for BatchSpanProcessor { @@ -110,7 +110,7 @@ impl SpanProcessor for BatchSpanProcessor { fn on_end(&self, span: SpanData) { // Check if span should be exported based on sampling and config - if !span.span_context.is_sampled() && !self.config.export_unsampled { + if !span.span_context.is_sampled() && !self.export_unsampled { return; } @@ -407,7 +407,7 @@ impl BatchSpanProcessor { message_sender, dropped_spans_count: AtomicUsize::new(0), max_queue_size, - config, + export_unsampled: config.export_unsampled, } } From cb08c5f1d4a6294a229aea6e62a98ae1c35cbe28 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 2 Sep 2025 15:08:44 +0000 Subject: [PATCH 5/5] Fix code formatting with cargo fmt Co-authored-by: cijothomas <5232798+cijothomas@users.noreply.github.com> --- opentelemetry-sdk/src/trace/span_processor.rs | 46 ++++++++++++------- .../span_processor_with_async_runtime.rs | 7 +-- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 984ae5e234..14d08eb78a 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -767,7 +767,7 @@ pub struct BatchConfig { pub(crate) max_concurrent_exports: usize, /// Whether to export unsampled spans that are recording. - /// If true, spans with is_recording() == true but TraceFlags::SAMPLED == false + /// If true, spans with is_recording() == true but TraceFlags::SAMPLED == false /// will be exported. Defaults to false for backward compatibility. pub(crate) export_unsampled: bool, } @@ -972,7 +972,9 @@ mod tests { use crate::trace::InMemorySpanExporterBuilder; use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks}; use crate::trace::{SpanData, SpanExporter}; - use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState}; + use opentelemetry::trace::{ + SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, + }; use std::fmt::Debug; use std::time::Duration; @@ -1479,7 +1481,7 @@ mod tests { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); let processor = BatchSpanProcessor::new(exporter, BatchConfig::default()); - + let unsampled_span = SpanData { span_context: SpanContext::new( TraceId::from(1), @@ -1500,12 +1502,16 @@ mod tests { status: Status::Unset, instrumentation_scope: Default::default(), }; - + processor.on_end(unsampled_span); processor.force_flush().unwrap(); - + let exported_spans = exporter_shared.lock().unwrap(); - assert_eq!(exported_spans.len(), 0, "Unsampled spans should not be exported by default"); + assert_eq!( + exported_spans.len(), + 0, + "Unsampled spans should not be exported by default" + ); } #[test] @@ -1516,7 +1522,7 @@ mod tests { .with_export_unsampled(true) .build(); let processor = BatchSpanProcessor::new(exporter, config); - + let unsampled_span = SpanData { span_context: SpanContext::new( TraceId::from(1), @@ -1537,12 +1543,16 @@ mod tests { status: Status::Unset, instrumentation_scope: Default::default(), }; - + processor.on_end(unsampled_span); processor.force_flush().unwrap(); - + let exported_spans = exporter_shared.lock().unwrap(); - assert_eq!(exported_spans.len(), 1, "Unsampled spans should be exported when export_unsampled is enabled"); + assert_eq!( + exported_spans.len(), + 1, + "Unsampled spans should be exported when export_unsampled is enabled" + ); assert_eq!(exported_spans[0].name, "unsampled_span"); assert!(!exported_spans[0].span_context.is_sampled()); } @@ -1555,7 +1565,7 @@ mod tests { .with_export_unsampled(true) .build(); let processor = BatchSpanProcessor::new(exporter, config); - + // Add a sampled span let sampled_span = SpanData { span_context: SpanContext::new( @@ -1577,7 +1587,7 @@ mod tests { status: Status::Unset, instrumentation_scope: Default::default(), }; - + // Add an unsampled span let unsampled_span = SpanData { span_context: SpanContext::new( @@ -1599,14 +1609,18 @@ mod tests { status: Status::Unset, instrumentation_scope: Default::default(), }; - + processor.on_end(sampled_span); processor.on_end(unsampled_span); processor.force_flush().unwrap(); - + let exported_spans = exporter_shared.lock().unwrap(); - assert_eq!(exported_spans.len(), 2, "Both sampled and unsampled spans should be exported when export_unsampled is enabled"); - + assert_eq!( + exported_spans.len(), + 2, + "Both sampled and unsampled spans should be exported when export_unsampled is enabled" + ); + let span_names: Vec<&str> = exported_spans.iter().map(|s| s.name.as_ref()).collect(); assert!(span_names.contains(&"sampled_span")); assert!(span_names.contains(&"unsampled_span")); diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 5601cd0f06..fe28ab8fbe 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -385,9 +385,10 @@ impl BatchSpanProcessor { runtime.spawn(async move { // Timer will take a reference to the current runtime, so its important we do this within the // runtime.spawn() - let ticker = to_interval_stream(inner_runtime.clone(), config_for_worker.scheduled_delay) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| BatchMessage::Flush(None)); + let ticker = + to_interval_stream(inner_runtime.clone(), config_for_worker.scheduled_delay) + .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. + .map(|_| BatchMessage::Flush(None)); let timeout_runtime = inner_runtime.clone(); let messages = Box::pin(stream::select(message_receiver, ticker));