From 4842c8b32a9f47f93bd0683527b4dec769c26a2d Mon Sep 17 00:00:00 2001 From: kovaszab Date: Wed, 5 Nov 2025 13:54:19 +0100 Subject: [PATCH 1/5] add forceflush_timeout configurability to BatchSpanProcessor and BatchLogProcessor --- .../src/logs/batch_log_processor.rs | 31 ++++++++++++++++++- opentelemetry-sdk/src/trace/span_processor.rs | 31 ++++++++++++++++++- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index 9aae314f29..5dfe6d6991 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -54,6 +54,10 @@ pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size. pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; +/// Force flush timeout +pub(crate) const OTEL_BLRP_FORCE_FLUSH_TIMEOUT: &str = "OTEL_BLRP_FORCE_FLUSH_TIMEOUT"; +/// Default force flush timeout +pub(crate) const OTEL_BLRP_FORCE_FLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000); /// Messages sent between application thread and batch log processor's work thread. #[allow(clippy::large_enum_variant)] @@ -339,6 +343,7 @@ impl BatchLogProcessor { let max_export_batch_size = config.max_export_batch_size; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); + let forceflush_timeout = config.forceflush_timeout; let handle = thread::Builder::new() .name("OpenTelemetry.Logs.BatchProcessor".to_string()) @@ -489,7 +494,7 @@ impl BatchLogProcessor { logs_sender, message_sender, handle: Mutex::new(Some(handle)), - forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable + forceflush_timeout, dropped_logs_count: AtomicUsize::new(0), max_queue_size, export_log_message_sent: Arc::new(AtomicBool::new(false)), @@ -586,6 +591,9 @@ pub struct BatchConfig { /// is 512. pub(crate) max_export_batch_size: usize, + /// The maximum duration to wait when force flushing. + pub(crate) forceflush_timeout: Duration, + /// The maximum duration to export a batch of data. #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] pub(crate) max_export_timeout: Duration, @@ -603,6 +611,7 @@ pub struct BatchConfigBuilder { max_queue_size: usize, scheduled_delay: Duration, max_export_batch_size: usize, + forceflush_timeout: Duration, #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: Duration, } @@ -622,6 +631,7 @@ impl Default for BatchConfigBuilder { max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, + forceflush_timeout: OTEL_BLRP_FORCE_FLUSH_TIMEOUT_DEFAULT, #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, } @@ -682,6 +692,17 @@ impl BatchConfigBuilder { self } + /// Set forceflush_timeout for [`BatchConfigBuilder`]. + /// The default value is 5000 milliseconds. + /// + /// Corresponding environment variable: `OTEL_BLRP_FORCE_FLUSH_TIMEOUT`. + /// + /// Note: Programmatically setting this will override any value set via the environment variable. + pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self { + self.forceflush_timeout = forceflush_timeout; + self + } + /// Builds a `BatchConfig` enforcing the following invariants: /// * `max_export_batch_size` must be less than or equal to `max_queue_size`. pub fn build(self) -> BatchConfig { @@ -692,6 +713,7 @@ impl BatchConfigBuilder { BatchConfig { max_queue_size: self.max_queue_size, scheduled_delay: self.scheduled_delay, + forceflush_timeout: self.forceflush_timeout, #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: self.max_export_timeout, max_export_batch_size, @@ -720,6 +742,13 @@ impl BatchConfigBuilder { self.scheduled_delay = Duration::from_millis(scheduled_delay); } + if let Some(forceflush_timeout) = env::var(OTEL_BLRP_FORCE_FLUSH_TIMEOUT) + .ok() + .and_then(|s| u64::from_str(&s).ok()) + { + self.forceflush_timeout = Duration::from_millis(forceflush_timeout); + } + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT) .ok() diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 0ebe80f885..ec4edd4caf 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -70,6 +70,10 @@ pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_mill pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS"; /// Default max concurrent exports for BSP pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1; +/// Force flush timeout +pub(crate) const OTEL_BSP_FORCE_FLUSH_TIMEOUT: &str = "OTEL_BSP_FORCE_FLUSH_TIMEOUT"; +/// Default force flush timeout +pub(crate) const OTEL_BSP_FORCE_FLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000); /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. The span processors are invoked only when is_recording @@ -312,6 +316,7 @@ impl BatchSpanProcessor { let max_export_batch_size = config.max_export_batch_size; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); + let forceflush_timeout = config.forceflush_timeout; let handle = thread::Builder::new() .name("OpenTelemetry.Traces.BatchProcessor".to_string()) @@ -424,7 +429,7 @@ impl BatchSpanProcessor { span_sender, message_sender, handle: Mutex::new(Some(handle)), - forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable + forceflush_timeout, dropped_spans_count: AtomicUsize::new(0), max_queue_size, export_span_message_sent: Arc::new(AtomicBool::new(false)), @@ -757,6 +762,9 @@ pub struct BatchConfig { /// by an exporter. A value of 1 will cause exports to be performed /// synchronously on the BatchSpanProcessor task. pub(crate) max_concurrent_exports: usize, + + /// The maximum duration to wait when force flushing. + pub(crate) forceflush_timeout: Duration, } impl Default for BatchConfig { @@ -773,6 +781,7 @@ pub struct BatchConfigBuilder { max_export_batch_size: usize, max_export_timeout: Duration, max_concurrent_exports: usize, + forceflush_timeout: Duration, } impl Default for BatchConfigBuilder { @@ -793,6 +802,7 @@ impl Default for BatchConfigBuilder { max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, + forceflush_timeout: OTEL_BSP_FORCE_FLUSH_TIMEOUT_DEFAULT, } .init_from_env_vars() } @@ -868,6 +878,17 @@ impl BatchConfigBuilder { self } + /// Set forceflush_timeout for [`BatchConfigBuilder`]. + /// The default value is 5000 milliseconds. + /// + /// Corresponding environment variable: `OTEL_BSP_FORCE_FLUSH_TIMEOUT`. + /// + /// Note: Programmatically setting this will override any value set via the environment variable. + pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self { + self.forceflush_timeout = forceflush_timeout; + self + } + /// Builds a `BatchConfig` enforcing the following invariants: /// * `max_export_batch_size` must be less than or equal to `max_queue_size`. pub fn build(self) -> BatchConfig { @@ -880,6 +901,7 @@ impl BatchConfigBuilder { scheduled_delay: self.scheduled_delay, max_export_timeout: self.max_export_timeout, max_concurrent_exports: self.max_concurrent_exports, + forceflush_timeout: self.forceflush_timeout, max_export_batch_size, } } @@ -926,6 +948,13 @@ impl BatchConfigBuilder { self.max_export_timeout = Duration::from_millis(max_export_timeout); } + if let Some(forceflush_timeout) = env::var(OTEL_BSP_FORCE_FLUSH_TIMEOUT) + .ok() + .and_then(|s| u64::from_str(&s).ok()) + { + self.forceflush_timeout = Duration::from_millis(forceflush_timeout); + } + self } } From d5550d6afbadd6438d24ce5a30dadaf34a6192e4 Mon Sep 17 00:00:00 2001 From: kovaszab Date: Wed, 5 Nov 2025 15:45:25 +0100 Subject: [PATCH 2/5] fix compile errors --- .../src/trace/span_processor_with_async_runtime.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index b294f74043..e27fdfbe26 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -647,6 +647,7 @@ mod tests { scheduled_delay: Duration::from_secs(3600), // effectively disabled max_export_timeout: Duration::from_secs(5), max_concurrent_exports: 2, // what we want to verify + forceflush_timeout: Duration::from_secs(5), }; // Spawn the processor. @@ -685,6 +686,7 @@ mod tests { scheduled_delay: Duration::from_secs(3600), max_export_timeout: Duration::from_secs(5), max_concurrent_exports: 1, // what we want to verify + forceflush_timeout: Duration::from_secs(5), }; let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); From b64f632c72b181dab7a3b313909890aa5300dc44 Mon Sep 17 00:00:00 2001 From: kovaszab Date: Wed, 5 Nov 2025 17:25:47 +0100 Subject: [PATCH 3/5] Revert changes --- .../src/logs/batch_log_processor.rs | 31 +------------------ opentelemetry-sdk/src/trace/span_processor.rs | 31 +------------------ .../span_processor_with_async_runtime.rs | 2 -- 3 files changed, 2 insertions(+), 62 deletions(-) diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index 5dfe6d6991..9aae314f29 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -54,10 +54,6 @@ pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size. pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; -/// Force flush timeout -pub(crate) const OTEL_BLRP_FORCE_FLUSH_TIMEOUT: &str = "OTEL_BLRP_FORCE_FLUSH_TIMEOUT"; -/// Default force flush timeout -pub(crate) const OTEL_BLRP_FORCE_FLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000); /// Messages sent between application thread and batch log processor's work thread. #[allow(clippy::large_enum_variant)] @@ -343,7 +339,6 @@ impl BatchLogProcessor { let max_export_batch_size = config.max_export_batch_size; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); - let forceflush_timeout = config.forceflush_timeout; let handle = thread::Builder::new() .name("OpenTelemetry.Logs.BatchProcessor".to_string()) @@ -494,7 +489,7 @@ impl BatchLogProcessor { logs_sender, message_sender, handle: Mutex::new(Some(handle)), - forceflush_timeout, + forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable dropped_logs_count: AtomicUsize::new(0), max_queue_size, export_log_message_sent: Arc::new(AtomicBool::new(false)), @@ -591,9 +586,6 @@ pub struct BatchConfig { /// is 512. pub(crate) max_export_batch_size: usize, - /// The maximum duration to wait when force flushing. - pub(crate) forceflush_timeout: Duration, - /// The maximum duration to export a batch of data. #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] pub(crate) max_export_timeout: Duration, @@ -611,7 +603,6 @@ pub struct BatchConfigBuilder { max_queue_size: usize, scheduled_delay: Duration, max_export_batch_size: usize, - forceflush_timeout: Duration, #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: Duration, } @@ -631,7 +622,6 @@ impl Default for BatchConfigBuilder { max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, - forceflush_timeout: OTEL_BLRP_FORCE_FLUSH_TIMEOUT_DEFAULT, #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, } @@ -692,17 +682,6 @@ impl BatchConfigBuilder { self } - /// Set forceflush_timeout for [`BatchConfigBuilder`]. - /// The default value is 5000 milliseconds. - /// - /// Corresponding environment variable: `OTEL_BLRP_FORCE_FLUSH_TIMEOUT`. - /// - /// Note: Programmatically setting this will override any value set via the environment variable. - pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self { - self.forceflush_timeout = forceflush_timeout; - self - } - /// Builds a `BatchConfig` enforcing the following invariants: /// * `max_export_batch_size` must be less than or equal to `max_queue_size`. pub fn build(self) -> BatchConfig { @@ -713,7 +692,6 @@ impl BatchConfigBuilder { BatchConfig { max_queue_size: self.max_queue_size, scheduled_delay: self.scheduled_delay, - forceflush_timeout: self.forceflush_timeout, #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: self.max_export_timeout, max_export_batch_size, @@ -742,13 +720,6 @@ impl BatchConfigBuilder { self.scheduled_delay = Duration::from_millis(scheduled_delay); } - if let Some(forceflush_timeout) = env::var(OTEL_BLRP_FORCE_FLUSH_TIMEOUT) - .ok() - .and_then(|s| u64::from_str(&s).ok()) - { - self.forceflush_timeout = Duration::from_millis(forceflush_timeout); - } - #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT) .ok() diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index ec4edd4caf..0ebe80f885 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -70,10 +70,6 @@ pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_mill pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS"; /// Default max concurrent exports for BSP pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1; -/// Force flush timeout -pub(crate) const OTEL_BSP_FORCE_FLUSH_TIMEOUT: &str = "OTEL_BSP_FORCE_FLUSH_TIMEOUT"; -/// Default force flush timeout -pub(crate) const OTEL_BSP_FORCE_FLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000); /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. The span processors are invoked only when is_recording @@ -316,7 +312,6 @@ impl BatchSpanProcessor { let max_export_batch_size = config.max_export_batch_size; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); - let forceflush_timeout = config.forceflush_timeout; let handle = thread::Builder::new() .name("OpenTelemetry.Traces.BatchProcessor".to_string()) @@ -429,7 +424,7 @@ impl BatchSpanProcessor { span_sender, message_sender, handle: Mutex::new(Some(handle)), - forceflush_timeout, + forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable dropped_spans_count: AtomicUsize::new(0), max_queue_size, export_span_message_sent: Arc::new(AtomicBool::new(false)), @@ -762,9 +757,6 @@ pub struct BatchConfig { /// by an exporter. A value of 1 will cause exports to be performed /// synchronously on the BatchSpanProcessor task. pub(crate) max_concurrent_exports: usize, - - /// The maximum duration to wait when force flushing. - pub(crate) forceflush_timeout: Duration, } impl Default for BatchConfig { @@ -781,7 +773,6 @@ pub struct BatchConfigBuilder { max_export_batch_size: usize, max_export_timeout: Duration, max_concurrent_exports: usize, - forceflush_timeout: Duration, } impl Default for BatchConfigBuilder { @@ -802,7 +793,6 @@ impl Default for BatchConfigBuilder { max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, - forceflush_timeout: OTEL_BSP_FORCE_FLUSH_TIMEOUT_DEFAULT, } .init_from_env_vars() } @@ -878,17 +868,6 @@ impl BatchConfigBuilder { self } - /// Set forceflush_timeout for [`BatchConfigBuilder`]. - /// The default value is 5000 milliseconds. - /// - /// Corresponding environment variable: `OTEL_BSP_FORCE_FLUSH_TIMEOUT`. - /// - /// Note: Programmatically setting this will override any value set via the environment variable. - pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self { - self.forceflush_timeout = forceflush_timeout; - self - } - /// Builds a `BatchConfig` enforcing the following invariants: /// * `max_export_batch_size` must be less than or equal to `max_queue_size`. pub fn build(self) -> BatchConfig { @@ -901,7 +880,6 @@ impl BatchConfigBuilder { scheduled_delay: self.scheduled_delay, max_export_timeout: self.max_export_timeout, max_concurrent_exports: self.max_concurrent_exports, - forceflush_timeout: self.forceflush_timeout, max_export_batch_size, } } @@ -948,13 +926,6 @@ impl BatchConfigBuilder { self.max_export_timeout = Duration::from_millis(max_export_timeout); } - if let Some(forceflush_timeout) = env::var(OTEL_BSP_FORCE_FLUSH_TIMEOUT) - .ok() - .and_then(|s| u64::from_str(&s).ok()) - { - self.forceflush_timeout = Duration::from_millis(forceflush_timeout); - } - self } } diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index e27fdfbe26..b294f74043 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -647,7 +647,6 @@ mod tests { scheduled_delay: Duration::from_secs(3600), // effectively disabled max_export_timeout: Duration::from_secs(5), max_concurrent_exports: 2, // what we want to verify - forceflush_timeout: Duration::from_secs(5), }; // Spawn the processor. @@ -686,7 +685,6 @@ mod tests { scheduled_delay: Duration::from_secs(3600), max_export_timeout: Duration::from_secs(5), max_concurrent_exports: 1, // what we want to verify - forceflush_timeout: Duration::from_secs(5), }; let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio); From c18c006f8772fe5459d71824763d025da311211b Mon Sep 17 00:00:00 2001 From: kovaszab Date: Thu, 6 Nov 2025 11:03:38 +0100 Subject: [PATCH 4/5] add force_flush_with_timeout to processors and exporters --- .../tracing-http-propagator/src/server.rs | 2 +- .../src/logs/batch_log_processor.rs | 20 ++++++------- opentelemetry-sdk/src/logs/log_processor.rs | 8 +++++- opentelemetry-sdk/src/logs/logger_provider.rs | 9 ++++-- opentelemetry-sdk/src/trace/export.rs | 7 ++++- opentelemetry-sdk/src/trace/mod.rs | 2 +- opentelemetry-sdk/src/trace/provider.rs | 13 ++++++--- opentelemetry-sdk/src/trace/span_processor.rs | 28 +++++++++---------- .../span_processor_with_async_runtime.rs | 2 +- stress/src/traces.rs | 2 +- 10 files changed, 55 insertions(+), 38 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 8060c99841..d7ccb11e03 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -132,7 +132,7 @@ impl LogProcessor for EnrichWithBaggageLogProcessor { #[derive(Debug)] struct EnrichWithBaggageSpanProcessor; impl SpanProcessor for EnrichWithBaggageSpanProcessor { - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index 9aae314f29..0627cbf097 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -129,7 +129,6 @@ pub struct BatchLogProcessor { logs_sender: SyncSender, // Data channel to store log records and instrumentation scopes message_sender: SyncSender, // Control channel to store control messages for the worker thread handle: Mutex>>, - forceflush_timeout: Duration, export_log_message_sent: Arc, current_batch_size: Arc, max_export_batch_size: usize, @@ -221,21 +220,19 @@ impl LogProcessor for BatchLogProcessor { } } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult { let (sender, receiver) = mpsc::sync_channel(1); match self .message_sender .try_send(BatchMessage::ForceFlush(sender)) { - Ok(_) => receiver - .recv_timeout(self.forceflush_timeout) - .map_err(|err| { - if err == RecvTimeoutError::Timeout { - OTelSdkError::Timeout(self.forceflush_timeout) - } else { - OTelSdkError::InternalFailure(format!("{err}")) - } - })?, + Ok(_) => receiver.recv_timeout(timeout).map_err(|err| { + if err == RecvTimeoutError::Timeout { + OTelSdkError::Timeout(timeout) + } else { + OTelSdkError::InternalFailure(format!("{err}")) + } + })?, Err(mpsc::TrySendError::Full(_)) => { // If the control message could not be sent, emit a warning. otel_debug!( @@ -489,7 +486,6 @@ impl BatchLogProcessor { logs_sender, message_sender, handle: Mutex::new(Some(handle)), - forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable dropped_logs_count: AtomicUsize::new(0), max_queue_size, export_log_message_sent: Arc::new(AtomicBool::new(false)), diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index de882e5f6f..5b0c4fca3c 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -53,7 +53,13 @@ pub trait LogProcessor: Send + Sync + Debug { /// - `instrumentation`: The instrumentation scope associated with the log record. fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope); /// Force the logs lying in the cache to be exported. - fn force_flush(&self) -> OTelSdkResult; + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { + Ok(()) + } + /// Force the logs lying in the cache to be exported with default timeout. + fn force_flush(&self) -> OTelSdkResult { + self.force_flush_with_timeout(Duration::from_secs(5)) + } /// Shuts down the processor. /// After shutdown returns the log processor should stop processing any logs. /// It's up to the implementation on when to drop the LogProcessor. diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs index 0097c3b6ca..29637ad03b 100644 --- a/opentelemetry-sdk/src/logs/logger_provider.rs +++ b/opentelemetry-sdk/src/logs/logger_provider.rs @@ -83,11 +83,11 @@ impl SdkLoggerProvider { } /// Force flush all remaining logs in log processors and return results. - pub fn force_flush(&self) -> OTelSdkResult { + pub fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult { let result: Vec<_> = self .log_processors() .iter() - .map(|processor| processor.force_flush()) + .map(|processor| processor.force_flush_with_timeout(timeout)) .collect(); if result.iter().all(|r| r.is_ok()) { Ok(()) @@ -96,6 +96,11 @@ impl SdkLoggerProvider { } } + /// Force flush all remaining logs with default timeout. + pub fn force_flush(&self) -> OTelSdkResult { + self.force_flush_with_timeout(Duration::from_secs(5)) + } + /// Shuts down this `LoggerProvider` pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { otel_debug!( diff --git a/opentelemetry-sdk/src/trace/export.rs b/opentelemetry-sdk/src/trace/export.rs index f09e5d5707..7b3a48996a 100644 --- a/opentelemetry-sdk/src/trace/export.rs +++ b/opentelemetry-sdk/src/trace/export.rs @@ -66,10 +66,15 @@ pub trait SpanExporter: Send + Sync + Debug { /// implemented as a blocking API or an asynchronous API which notifies the caller via /// a callback or an event. OpenTelemetry client authors can decide if they want to /// make the flush timeout configurable. - fn force_flush(&mut self) -> OTelSdkResult { + fn force_flush_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult { Ok(()) } + /// Force flush the exporter with default timeout. + fn force_flush(&mut self) -> OTelSdkResult { + self.force_flush_with_timeout(Duration::from_secs(5)) + } + /// Set the resource for the exporter. fn set_resource(&mut self, _resource: &Resource) {} } diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 585f65f27d..7f793b5c45 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -143,7 +143,7 @@ mod tests { // let _c = Context::current(); } - fn force_flush(&self) -> crate::error::OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 2b05f89aea..70f3654a2d 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -194,7 +194,7 @@ impl SdkTracerProvider { self.inner.is_shutdown.load(Ordering::Relaxed) } - /// Force flush all remaining spans in span processors and return results. + /// Force flush all remaining spans in span processors with a default timeout and return results. /// /// # Examples /// @@ -228,10 +228,15 @@ impl SdkTracerProvider { /// } /// ``` pub fn force_flush(&self) -> OTelSdkResult { + self.force_flush_with_timeout(Duration::from_secs(5)) + } + + /// force flush processors with a specified timeout + pub fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult { let result: Vec<_> = self .span_processors() .iter() - .map(|processor| processor.force_flush()) + .map(|processor| processor.force_flush_with_timeout(timeout)) .collect(); if result.iter().all(|r| r.is_ok()) { Ok(()) @@ -530,7 +535,7 @@ mod tests { // ignore } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { if self.success { Ok(()) } else { @@ -793,7 +798,7 @@ mod tests { // No operation needed for this processor } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 0ebe80f885..c6a9ca7a4a 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -85,7 +85,11 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// TODO - This method should take reference to `SpanData` fn on_end(&self, span: SpanData); /// Force the spans lying in the cache to be exported. - fn force_flush(&self) -> OTelSdkResult; + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult; + /// Force flush the spans with a default timeout. + fn force_flush(&self) -> OTelSdkResult { + self.force_flush_with_timeout(Duration::from_secs(5)) + } /// Shuts down the processor. Called when SDK is shut down. This is an /// opportunity for processors to do any cleanup required. /// @@ -153,7 +157,7 @@ impl SpanProcessor for SimpleSpanProcessor { } } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { // Nothing to flush for simple span processor. Ok(()) } @@ -286,7 +290,6 @@ pub struct BatchSpanProcessor { span_sender: SyncSender, // Data channel to store spans message_sender: SyncSender, // Control channel to store control messages. handle: Mutex>>, - forceflush_timeout: Duration, export_span_message_sent: Arc, current_batch_size: Arc, max_export_batch_size: usize, @@ -424,7 +427,6 @@ impl BatchSpanProcessor { span_sender, message_sender, handle: Mutex::new(Some(handle)), - forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable dropped_spans_count: AtomicUsize::new(0), max_queue_size, export_span_message_sent: Arc::new(AtomicBool::new(false)), @@ -593,21 +595,19 @@ impl SpanProcessor for BatchSpanProcessor { } /// Flushes all pending spans. - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult { let (sender, receiver) = std::sync::mpsc::sync_channel(1); match self .message_sender .try_send(BatchMessage::ForceFlush(sender)) { - Ok(_) => receiver - .recv_timeout(self.forceflush_timeout) - .map_err(|err| { - if err == std::sync::mpsc::RecvTimeoutError::Timeout { - OTelSdkError::Timeout(self.forceflush_timeout) - } else { - OTelSdkError::InternalFailure(format!("{err}")) - } - })?, + Ok(_) => receiver.recv_timeout(timeout).map_err(|err| { + if err == std::sync::mpsc::RecvTimeoutError::Timeout { + OTelSdkError::Timeout(timeout) + } else { + OTelSdkError::InternalFailure(format!("{err}")) + } + })?, Err(std::sync::mpsc::TrySendError::Full(_)) => { // If the control message could not be sent, emit a warning. otel_debug!( diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index b294f74043..f349377e04 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -123,7 +123,7 @@ impl SpanProcessor for BatchSpanProcessor { } } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessage::Flush(Some(res_sender))) diff --git a/stress/src/traces.rs b/stress/src/traces.rs index e0f15099e5..bccb59bd51 100644 --- a/stress/src/traces.rs +++ b/stress/src/traces.rs @@ -41,7 +41,7 @@ impl SpanProcessor for NoOpSpanProcessor { // No-op } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } From 2d11e07f7634ef3521cb99faa82ae91c026cb78d Mon Sep 17 00:00:00 2001 From: kovaszab Date: Thu, 6 Nov 2025 17:04:53 +0100 Subject: [PATCH 5/5] fix missing implementations --- examples/tracing-http-propagator/src/server.rs | 2 +- opentelemetry-appender-tracing/src/layer.rs | 3 ++- opentelemetry-proto/src/transform/logs.rs | 3 ++- opentelemetry-sdk/src/logs/concurrent_log_processor.rs | 2 +- opentelemetry-sdk/src/logs/log_processor.rs | 9 ++++----- .../src/logs/log_processor_with_async_runtime.rs | 6 +++--- opentelemetry-sdk/src/logs/logger_provider.rs | 8 ++++---- opentelemetry-sdk/src/logs/mod.rs | 5 +++-- opentelemetry-sdk/src/logs/simple_log_processor.rs | 2 +- 9 files changed, 21 insertions(+), 19 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index d7ccb11e03..13eb3eb9e2 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -118,7 +118,7 @@ impl LogProcessor for EnrichWithBaggageLogProcessor { }); } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index d5f2435ab2..42a96f26d7 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -284,6 +284,7 @@ const fn severity_of_level(level: &Level) -> Severity { #[cfg(test)] mod tests { + use std::time::Duration; use crate::layer; use opentelemetry::logs::Severity; use opentelemetry::trace::TracerProvider; @@ -931,7 +932,7 @@ mod tests { true } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index c99f36fa0b..086f6c5b3f 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -221,6 +221,7 @@ pub mod tonic { #[cfg(test)] mod tests { + use std::time::Duration; use crate::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry::logs::LogRecord as _; use opentelemetry::logs::Logger; @@ -238,7 +239,7 @@ mod tests { impl LogProcessor for MockProcessor { fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {} - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs index 052e3d9796..414f7a7bf1 100644 --- a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs +++ b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs @@ -37,7 +37,7 @@ impl LogProcessor for SimpleConcurrentLogProcessor { } } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { // TODO: invoke flush on exporter // once https://github.com/open-telemetry/opentelemetry-rust/issues/2261 // is resolved diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 5b0c4fca3c..35de04a87a 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -53,9 +53,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// - `instrumentation`: The instrumentation scope associated with the log record. fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope); /// Force the logs lying in the cache to be exported. - fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { - Ok(()) - } + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult; /// Force the logs lying in the cache to be exported with default timeout. fn force_flush(&self) -> OTelSdkResult { self.force_flush_with_timeout(Duration::from_secs(5)) @@ -109,6 +107,7 @@ pub(crate) mod tests { use opentelemetry::logs::{Logger, LoggerProvider}; use opentelemetry::{InstrumentationScope, Key}; use std::sync::{Arc, Mutex}; + use std::time::Duration; #[derive(Debug, Clone)] pub(crate) struct MockLogExporter { @@ -158,7 +157,7 @@ pub(crate) mod tests { .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data. } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } @@ -188,7 +187,7 @@ pub(crate) mod tests { .push((record.clone(), instrumentation.clone())); } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index bd725bec9a..46d3f84628 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -76,7 +76,7 @@ impl LogProcessor for BatchLogProcessor { } } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessage::Flush(Some(res_sender))) @@ -625,7 +625,7 @@ mod tests { .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data. } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } @@ -655,7 +655,7 @@ mod tests { .push((record.clone(), instrumentation.clone())); } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/logger_provider.rs b/opentelemetry-sdk/src/logs/logger_provider.rs index 29637ad03b..00f6956ecc 100644 --- a/opentelemetry-sdk/src/logs/logger_provider.rs +++ b/opentelemetry-sdk/src/logs/logger_provider.rs @@ -345,7 +345,7 @@ mod tests { .expect("lock poisoned"); } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } @@ -398,7 +398,7 @@ mod tests { // nothing to do. } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } @@ -918,7 +918,7 @@ mod tests { // nothing to do. } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { *self.flush_called.lock().unwrap() = true; Ok(()) } @@ -949,7 +949,7 @@ mod tests { // nothing to do } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { *self.flush_called.lock().unwrap() = true; Ok(()) } diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index c54f001f97..dbc913dda1 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -46,6 +46,7 @@ mod tests { use std::borrow::Borrow; use std::collections::HashMap; use std::sync::{Arc, Mutex}; + use std::time::Duration; #[test] fn logging_sdk_test() { @@ -167,7 +168,7 @@ mod tests { }); } - fn force_flush(&self) -> crate::error::OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } @@ -273,7 +274,7 @@ mod tests { } } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index 5c8642221a..d388d0728c 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -113,7 +113,7 @@ impl LogProcessor for SimpleLogProcessor { } } - fn force_flush(&self) -> OTelSdkResult { + fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult { Ok(()) }