@@ -6,19 +6,21 @@ use crate::trace::Span;
66use crate :: trace:: SpanProcessor ;
77use crate :: trace:: { SpanData , SpanExporter } ;
88use futures_channel:: oneshot;
9- use futures_util:: pin_mut;
109use futures_util:: {
1110 future:: { self , BoxFuture , Either } ,
12- select,
11+ pin_mut , select,
1312 stream:: { self , FusedStream , FuturesUnordered } ,
1413 StreamExt as _,
1514} ;
1615use opentelemetry:: Context ;
1716use opentelemetry:: { otel_debug, otel_error, otel_warn} ;
1817use std:: fmt;
19- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
20- use std:: sync:: Arc ;
18+ use std:: sync:: {
19+ atomic:: { AtomicUsize , Ordering } ,
20+ Arc ,
21+ } ;
2122use std:: time:: Duration ;
23+ use tokio:: sync:: RwLock ;
2224
2325/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2426/// them at a preconfigured interval.
@@ -188,13 +190,22 @@ struct BatchSpanProcessorInternal<E, R> {
188190 spans : Vec < SpanData > ,
189191 export_tasks : FuturesUnordered < BoxFuture < ' static , OTelSdkResult > > ,
190192 runtime : R ,
191- exporter : E ,
192193 config : BatchConfig ,
194+ // TODO: Redesign the `SpanExporter` trait to use immutable references (`&self`)
195+ // for all methods. This would allow us to remove the `RwLock` and just use `Arc<E>`,
196+ // similar to how `crate::logs::LogExporter` is implemented.
197+ exporter : Arc < RwLock < E > > ,
193198}
194199
195- impl < E : SpanExporter , R : RuntimeChannel > BatchSpanProcessorInternal < E , R > {
200+ impl < E : SpanExporter + ' static , R : RuntimeChannel > BatchSpanProcessorInternal < E , R > {
196201 async fn flush ( & mut self , res_channel : Option < oneshot:: Sender < OTelSdkResult > > ) {
197- let export_result = self . export ( ) . await ;
202+ let export_result = Self :: export (
203+ self . spans . split_off ( 0 ) ,
204+ self . exporter . clone ( ) ,
205+ self . runtime . clone ( ) ,
206+ self . config . max_export_timeout ,
207+ )
208+ . await ;
198209 let task = Box :: pin ( async move {
199210 if let Some ( channel) = res_channel {
200211 // If a response channel is provided, attempt to send the export result through it.
@@ -243,9 +254,15 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
243254 self . export_tasks . next ( ) . await ;
244255 }
245256
246- let export_result = self . export ( ) . await ;
257+ let batch = self . spans . split_off ( 0 ) ;
258+ let exporter = self . exporter . clone ( ) ;
259+ let runtime = self . runtime . clone ( ) ;
260+ let max_export_timeout = self . config . max_export_timeout ;
261+
247262 let task = async move {
248- if let Err ( err) = export_result {
263+ if let Err ( err) =
264+ Self :: export ( batch, exporter, runtime, max_export_timeout) . await
265+ {
249266 otel_error ! (
250267 name: "BatchSpanProcessor.Export.Error" ,
251268 reason = format!( "{}" , err)
@@ -254,6 +271,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
254271
255272 Ok ( ( ) )
256273 } ;
274+
257275 // Special case when not using concurrent exports
258276 if self . config . max_concurrent_exports == 1 {
259277 let _ = task. await ;
@@ -288,34 +306,39 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
288306 // Stream has terminated or processor is shutdown, return to finish execution.
289307 BatchMessage :: Shutdown ( ch) => {
290308 self . flush ( Some ( ch) ) . await ;
291- let _ = self . exporter . shutdown ( ) ;
309+ let _ = self . exporter . write ( ) . await . shutdown ( ) ;
292310 return false ;
293311 }
294312 // propagate the resource
295313 BatchMessage :: SetResource ( resource) => {
296- self . exporter . set_resource ( & resource) ;
314+ self . exporter . write ( ) . await . set_resource ( & resource) ;
297315 }
298316 }
299317 true
300318 }
301319
302- async fn export ( & mut self ) -> OTelSdkResult {
320+ async fn export (
321+ batch : Vec < SpanData > ,
322+ exporter : Arc < RwLock < E > > ,
323+ runtime : R ,
324+ max_export_timeout : Duration ,
325+ ) -> OTelSdkResult {
303326 // Batch size check for flush / shutdown. Those methods may be called
304327 // when there's no work to do.
305- if self . spans . is_empty ( ) {
328+ if batch . is_empty ( ) {
306329 return Ok ( ( ) ) ;
307330 }
308331
309- let export = self . exporter . export ( self . spans . split_off ( 0 ) ) ;
310- let timeout = self . runtime . delay ( self . config . max_export_timeout ) ;
311- let time_out = self . config . max_export_timeout ;
332+ let exporter_guard = exporter. read ( ) . await ;
333+ let export = exporter_guard . export ( batch ) ;
334+ let timeout = runtime . delay ( max_export_timeout) ;
312335
313336 pin_mut ! ( export) ;
314337 pin_mut ! ( timeout) ;
315338
316339 match future:: select ( export, timeout) . await {
317340 Either :: Left ( ( export_res, _) ) => export_res,
318- Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( time_out ) ) ,
341+ Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( max_export_timeout ) ) ,
319342 }
320343 }
321344
@@ -368,7 +391,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
368391 export_tasks : FuturesUnordered :: new ( ) ,
369392 runtime : timeout_runtime,
370393 config,
371- exporter,
394+ exporter : Arc :: new ( RwLock :: new ( exporter ) ) ,
372395 } ;
373396
374397 processor. run ( messages) . await
@@ -435,6 +458,8 @@ mod tests {
435458 use crate :: trace:: { SpanData , SpanExporter } ;
436459 use futures_util:: Future ;
437460 use std:: fmt:: Debug ;
461+ use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
462+ use std:: sync:: Arc ;
438463 use std:: time:: Duration ;
439464
440465 struct BlockingExporter < D > {
@@ -463,6 +488,39 @@ mod tests {
463488 }
464489 }
465490
491+ /// Exporter that records whether two exports overlap in time.
492+ struct TrackingExporter {
493+ /// Artificial delay to keep each export alive for a while.
494+ delay : Duration ,
495+ /// Current number of in-flight exports.
496+ active : Arc < AtomicUsize > ,
497+ /// Set to true the first time we see overlap.
498+ concurrent_seen : Arc < AtomicBool > ,
499+ }
500+
501+ impl Debug for TrackingExporter {
502+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
503+ f. write_str ( "tracking exporter" )
504+ }
505+ }
506+
507+ impl SpanExporter for TrackingExporter {
508+ async fn export ( & self , _batch : Vec < SpanData > ) -> crate :: error:: OTelSdkResult {
509+ // Increment in-flight counter and note any overlap.
510+ let inflight = self . active . fetch_add ( 1 , Ordering :: SeqCst ) + 1 ;
511+ if inflight > 1 {
512+ self . concurrent_seen . store ( true , Ordering :: SeqCst ) ;
513+ }
514+
515+ // Keep the export "busy" for a bit.
516+ tokio:: time:: sleep ( self . delay ) . await ;
517+
518+ // Decrement counter.
519+ self . active . fetch_sub ( 1 , Ordering :: SeqCst ) ;
520+ Ok ( ( ) )
521+ }
522+ }
523+
466524 #[ test]
467525 fn test_build_batch_span_processor_builder ( ) {
468526 let mut env_vars = vec ! [
@@ -532,8 +590,8 @@ mod tests {
532590 ) ;
533591 }
534592
535- // If the time_out is true, then the result suppose to ended with timeout.
536- // otherwise the exporter should be able to export within time out duration.
593+ // If ` time_out` is ` true` , then the export should fail with a timeout.
594+ // Else, the exporter should be able to export within the timeout duration.
537595 async fn timeout_test_tokio ( time_out : bool ) {
538596 let config = BatchConfig {
539597 max_export_timeout : Duration :: from_millis ( if time_out { 5 } else { 60 } ) ,
@@ -557,24 +615,92 @@ mod tests {
557615 assert ! ( shutdown_res. is_ok( ) ) ;
558616 }
559617
560- #[ test]
561- fn test_timeout_tokio_timeout ( ) {
618+ #[ tokio :: test( flavor = "multi_thread" ) ]
619+ async fn test_timeout_tokio_timeout ( ) {
562620 // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s.
563621 // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s.
564622 // Either way, the test should be finished within 5s.
565- let runtime = tokio:: runtime:: Builder :: new_multi_thread ( )
566- . enable_all ( )
567- . build ( )
568- . unwrap ( ) ;
569- runtime. block_on ( timeout_test_tokio ( true ) ) ;
623+ timeout_test_tokio ( true ) . await ;
570624 }
571625
572- #[ test]
573- fn test_timeout_tokio_not_timeout ( ) {
574- let runtime = tokio:: runtime:: Builder :: new_multi_thread ( )
575- . enable_all ( )
576- . build ( )
577- . unwrap ( ) ;
578- runtime. block_on ( timeout_test_tokio ( false ) ) ;
626+ #[ tokio:: test( flavor = "multi_thread" ) ]
627+ async fn test_timeout_tokio_not_timeout ( ) {
628+ timeout_test_tokio ( false ) . await ;
629+ }
630+
631+ #[ tokio:: test( flavor = "multi_thread" ) ]
632+ async fn test_concurrent_exports_expected ( ) {
633+ // Shared state for the exporter.
634+ let active = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
635+ let concurrent_seen = Arc :: new ( AtomicBool :: new ( false ) ) ;
636+
637+ let exporter = TrackingExporter {
638+ delay : Duration :: from_millis ( 50 ) ,
639+ active : active. clone ( ) ,
640+ concurrent_seen : concurrent_seen. clone ( ) ,
641+ } ;
642+
643+ // Intentionally tiny batch-size so every span forces an export.
644+ let config = BatchConfig {
645+ max_export_batch_size : 1 ,
646+ max_queue_size : 16 ,
647+ scheduled_delay : Duration :: from_secs ( 3600 ) , // effectively disabled
648+ max_export_timeout : Duration :: from_secs ( 5 ) ,
649+ max_concurrent_exports : 2 , // what we want to verify
650+ } ;
651+
652+ // Spawn the processor.
653+ let processor = BatchSpanProcessor :: new ( exporter, config, runtime:: Tokio ) ;
654+
655+ // Finish three spans in rapid succession.
656+ processor. on_end ( new_test_export_span_data ( ) ) ;
657+ processor. on_end ( new_test_export_span_data ( ) ) ;
658+ processor. on_end ( new_test_export_span_data ( ) ) ;
659+
660+ // Wait until everything has been exported.
661+ processor. force_flush ( ) . expect ( "force flush failed" ) ;
662+ processor. shutdown ( ) . expect ( "shutdown failed" ) ;
663+
664+ // Expect at least one period with >1 export in flight.
665+ assert ! (
666+ concurrent_seen. load( Ordering :: SeqCst ) ,
667+ "exports never overlapped, processor is still serialising them"
668+ ) ;
669+ }
670+
671+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
672+ async fn test_exports_serial_when_max_concurrent_exports_1 ( ) {
673+ let active = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
674+ let concurrent_seen = Arc :: new ( AtomicBool :: new ( false ) ) ;
675+
676+ let exporter = TrackingExporter {
677+ delay : Duration :: from_millis ( 50 ) ,
678+ active : active. clone ( ) ,
679+ concurrent_seen : concurrent_seen. clone ( ) ,
680+ } ;
681+
682+ let config = BatchConfig {
683+ max_export_batch_size : 1 ,
684+ max_queue_size : 16 ,
685+ scheduled_delay : Duration :: from_secs ( 3600 ) ,
686+ max_export_timeout : Duration :: from_secs ( 5 ) ,
687+ max_concurrent_exports : 1 , // what we want to verify
688+ } ;
689+
690+ let processor = BatchSpanProcessor :: new ( exporter, config, runtime:: Tokio ) ;
691+
692+ // Finish several spans quickly.
693+ processor. on_end ( new_test_export_span_data ( ) ) ;
694+ processor. on_end ( new_test_export_span_data ( ) ) ;
695+ processor. on_end ( new_test_export_span_data ( ) ) ;
696+
697+ processor. force_flush ( ) . expect ( "force flush failed" ) ;
698+ processor. shutdown ( ) . expect ( "shutdown failed" ) ;
699+
700+ // There must never have been more than one export in flight.
701+ assert ! (
702+ !concurrent_seen. load( Ordering :: SeqCst ) ,
703+ "exports overlapped even though max_concurrent_exports was 1"
704+ ) ;
579705 }
580706}
0 commit comments