@@ -6,19 +6,21 @@ use crate::trace::Span;
66use crate :: trace:: SpanProcessor ;
77use crate :: trace:: { SpanData , SpanExporter } ;
88use futures_channel:: oneshot;
9- use futures_util:: pin_mut;
109use futures_util:: {
1110 future:: { self , BoxFuture , Either } ,
12- select,
11+ pin_mut , select,
1312 stream:: { self , FusedStream , FuturesUnordered } ,
1413 StreamExt as _,
1514} ;
1615use opentelemetry:: Context ;
1716use opentelemetry:: { otel_debug, otel_error, otel_warn} ;
1817use std:: fmt;
19- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
20- use std:: sync:: Arc ;
18+ use std:: sync:: {
19+ atomic:: { AtomicUsize , Ordering } ,
20+ Arc ,
21+ } ;
2122use std:: time:: Duration ;
23+ use tokio:: sync:: RwLock ;
2224
2325/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2426/// them at a preconfigured interval.
@@ -188,13 +190,19 @@ struct BatchSpanProcessorInternal<E, R> {
188190 spans : Vec < SpanData > ,
189191 export_tasks : FuturesUnordered < BoxFuture < ' static , OTelSdkResult > > ,
190192 runtime : R ,
191- exporter : E ,
193+ exporter : Arc < RwLock < E > > ,
192194 config : BatchConfig ,
193195}
194196
195- impl < E : SpanExporter , R : RuntimeChannel > BatchSpanProcessorInternal < E , R > {
197+ impl < E : SpanExporter + Send + Sync + ' static , R : RuntimeChannel > BatchSpanProcessorInternal < E , R > {
196198 async fn flush ( & mut self , res_channel : Option < oneshot:: Sender < OTelSdkResult > > ) {
197- let export_result = self . export ( ) . await ;
199+ let export_result = Self :: export (
200+ self . spans . split_off ( 0 ) ,
201+ self . exporter . clone ( ) ,
202+ self . runtime . clone ( ) ,
203+ self . config . max_export_timeout ,
204+ )
205+ . await ;
198206 let task = Box :: pin ( async move {
199207 if let Some ( channel) = res_channel {
200208 // If a response channel is provided, attempt to send the export result through it.
@@ -243,9 +251,15 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
243251 self . export_tasks . next ( ) . await ;
244252 }
245253
246- let export_result = self . export ( ) . await ;
254+ let batch = self . spans . split_off ( 0 ) ;
255+ let exporter = self . exporter . clone ( ) ;
256+ let runtime = self . runtime . clone ( ) ;
257+ let max_export_timeout = self . config . max_export_timeout ;
258+
247259 let task = async move {
248- if let Err ( err) = export_result {
260+ if let Err ( err) =
261+ Self :: export ( batch, exporter, runtime, max_export_timeout) . await
262+ {
249263 otel_error ! (
250264 name: "BatchSpanProcessor.Export.Error" ,
251265 reason = format!( "{}" , err)
@@ -254,6 +268,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
254268
255269 Ok ( ( ) )
256270 } ;
271+
257272 // Special case when not using concurrent exports
258273 if self . config . max_concurrent_exports == 1 {
259274 let _ = task. await ;
@@ -288,34 +303,39 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
288303 // Stream has terminated or processor is shutdown, return to finish execution.
289304 BatchMessage :: Shutdown ( ch) => {
290305 self . flush ( Some ( ch) ) . await ;
291- let _ = self . exporter . shutdown ( ) ;
306+ let _ = self . exporter . write ( ) . await . shutdown ( ) ;
292307 return false ;
293308 }
294309 // propagate the resource
295310 BatchMessage :: SetResource ( resource) => {
296- self . exporter . set_resource ( & resource) ;
311+ self . exporter . write ( ) . await . set_resource ( & resource) ;
297312 }
298313 }
299314 true
300315 }
301316
302- async fn export ( & mut self ) -> OTelSdkResult {
317+ async fn export (
318+ batch : Vec < SpanData > ,
319+ exporter : Arc < RwLock < E > > ,
320+ runtime : R ,
321+ max_export_timeout : Duration ,
322+ ) -> OTelSdkResult {
303323 // Batch size check for flush / shutdown. Those methods may be called
304324 // when there's no work to do.
305- if self . spans . is_empty ( ) {
325+ if batch . is_empty ( ) {
306326 return Ok ( ( ) ) ;
307327 }
308328
309- let export = self . exporter . export ( self . spans . split_off ( 0 ) ) ;
310- let timeout = self . runtime . delay ( self . config . max_export_timeout ) ;
311- let time_out = self . config . max_export_timeout ;
329+ let exporter_guard = exporter. read ( ) . await ;
330+ let export = exporter_guard . export ( batch ) ;
331+ let timeout = runtime . delay ( max_export_timeout) ;
312332
313333 pin_mut ! ( export) ;
314334 pin_mut ! ( timeout) ;
315335
316336 match future:: select ( export, timeout) . await {
317337 Either :: Left ( ( export_res, _) ) => export_res,
318- Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( time_out ) ) ,
338+ Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( max_export_timeout ) ) ,
319339 }
320340 }
321341
@@ -368,7 +388,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
368388 export_tasks : FuturesUnordered :: new ( ) ,
369389 runtime : timeout_runtime,
370390 config,
371- exporter,
391+ exporter : Arc :: new ( RwLock :: new ( exporter ) ) ,
372392 } ;
373393
374394 processor. run ( messages) . await
@@ -435,6 +455,8 @@ mod tests {
435455 use crate :: trace:: { SpanData , SpanExporter } ;
436456 use futures_util:: Future ;
437457 use std:: fmt:: Debug ;
458+ use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
459+ use std:: sync:: Arc ;
438460 use std:: time:: Duration ;
439461
440462 struct BlockingExporter < D > {
@@ -463,6 +485,39 @@ mod tests {
463485 }
464486 }
465487
488+ /// Exporter that records whether two exports overlap in time.
489+ struct TrackingExporter {
490+ /// Artificial delay to keep each export alive for a while.
491+ delay : Duration ,
492+ /// Current number of in-flight exports.
493+ active : Arc < AtomicUsize > ,
494+ /// Set to true the first time we see overlap.
495+ concurrent_seen : Arc < AtomicBool > ,
496+ }
497+
498+ impl Debug for TrackingExporter {
499+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
500+ f. write_str ( "tracking exporter" )
501+ }
502+ }
503+
504+ impl SpanExporter for TrackingExporter {
505+ async fn export ( & self , _batch : Vec < SpanData > ) -> crate :: error:: OTelSdkResult {
506+ // Increment in-flight counter and note any overlap.
507+ let inflight = self . active . fetch_add ( 1 , Ordering :: SeqCst ) + 1 ;
508+ if inflight > 1 {
509+ self . concurrent_seen . store ( true , Ordering :: SeqCst ) ;
510+ }
511+
512+ // Keep the export "busy" for a bit.
513+ tokio:: time:: sleep ( self . delay ) . await ;
514+
515+ // Decrement counter.
516+ self . active . fetch_sub ( 1 , Ordering :: SeqCst ) ;
517+ Ok ( ( ) )
518+ }
519+ }
520+
466521 #[ test]
467522 fn test_build_batch_span_processor_builder ( ) {
468523 let mut env_vars = vec ! [
@@ -532,8 +587,8 @@ mod tests {
532587 ) ;
533588 }
534589
535- // If the time_out is true, then the result suppose to ended with timeout.
536- // otherwise the exporter should be able to export within time out duration.
590+ // If ` time_out` is ` true` , then the export should fail with a timeout.
591+ // Else, the exporter should be able to export within the timeout duration.
537592 async fn timeout_test_tokio ( time_out : bool ) {
538593 let config = BatchConfig {
539594 max_export_timeout : Duration :: from_millis ( if time_out { 5 } else { 60 } ) ,
@@ -554,27 +609,96 @@ mod tests {
554609 assert ! ( flush_res. is_ok( ) ) ;
555610 }
556611 let shutdown_res = processor. shutdown ( ) ;
612+ println ! ( "Shutdown result: {:?}" , shutdown_res) ;
557613 assert ! ( shutdown_res. is_ok( ) ) ;
558614 }
559615
560- #[ test]
561- fn test_timeout_tokio_timeout ( ) {
616+ #[ tokio :: test( flavor = "multi_thread" ) ]
617+ async fn test_timeout_tokio_timeout ( ) {
562618 // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s.
563619 // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s.
564620 // Either way, the test should be finished within 5s.
565- let runtime = tokio:: runtime:: Builder :: new_multi_thread ( )
566- . enable_all ( )
567- . build ( )
568- . unwrap ( ) ;
569- runtime. block_on ( timeout_test_tokio ( true ) ) ;
621+ timeout_test_tokio ( true ) . await ;
570622 }
571623
572- #[ test]
573- fn test_timeout_tokio_not_timeout ( ) {
574- let runtime = tokio:: runtime:: Builder :: new_multi_thread ( )
575- . enable_all ( )
576- . build ( )
577- . unwrap ( ) ;
578- runtime. block_on ( timeout_test_tokio ( false ) ) ;
624+ #[ tokio:: test( flavor = "multi_thread" ) ]
625+ async fn test_timeout_tokio_not_timeout ( ) {
626+ timeout_test_tokio ( false ) . await ;
627+ }
628+
629+ #[ tokio:: test( flavor = "multi_thread" ) ]
630+ async fn test_concurrent_exports_expected ( ) {
631+ // Shared state for the exporter.
632+ let active = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
633+ let concurrent_seen = Arc :: new ( AtomicBool :: new ( false ) ) ;
634+
635+ let exporter = TrackingExporter {
636+ delay : Duration :: from_millis ( 50 ) ,
637+ active : active. clone ( ) ,
638+ concurrent_seen : concurrent_seen. clone ( ) ,
639+ } ;
640+
641+ // Intentionally tiny batch-size so every span forces an export.
642+ let config = BatchConfig {
643+ max_export_batch_size : 1 ,
644+ max_queue_size : 16 ,
645+ scheduled_delay : Duration :: from_secs ( 3600 ) , // effectively disabled
646+ max_export_timeout : Duration :: from_secs ( 5 ) ,
647+ max_concurrent_exports : 2 , // what we want to verify
648+ } ;
649+
650+ // Spawn the processor.
651+ let processor = BatchSpanProcessor :: new ( exporter, config, runtime:: Tokio ) ;
652+
653+ // Finish three spans in rapid succession.
654+ processor. on_end ( new_test_export_span_data ( ) ) ;
655+ processor. on_end ( new_test_export_span_data ( ) ) ;
656+ processor. on_end ( new_test_export_span_data ( ) ) ;
657+
658+ // Wait until everything has been exported.
659+ processor. force_flush ( ) . expect ( "force flush failed" ) ;
660+ processor. shutdown ( ) . expect ( "shutdown failed" ) ;
661+
662+ // Expect at least one period with >1 export in flight.
663+ assert ! (
664+ concurrent_seen. load( Ordering :: SeqCst ) ,
665+ "exports never overlapped, processor is still serialising them"
666+ ) ;
667+ }
668+
669+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
670+ async fn test_exports_serial_when_max_concurrent_exports_1 ( ) {
671+ let active = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
672+ let concurrent_seen = Arc :: new ( AtomicBool :: new ( false ) ) ;
673+
674+ let exporter = TrackingExporter {
675+ delay : Duration :: from_millis ( 50 ) ,
676+ active : active. clone ( ) ,
677+ concurrent_seen : concurrent_seen. clone ( ) ,
678+ } ;
679+
680+ let config = BatchConfig {
681+ max_export_batch_size : 1 ,
682+ max_queue_size : 16 ,
683+ scheduled_delay : Duration :: from_secs ( 3600 ) ,
684+ max_export_timeout : Duration :: from_secs ( 5 ) ,
685+ max_concurrent_exports : 1 , // what we want to verify
686+ } ;
687+
688+ let processor = BatchSpanProcessor :: new ( exporter, config, runtime:: Tokio ) ;
689+
690+ // Finish several spans quickly.
691+ processor. on_end ( new_test_export_span_data ( ) ) ;
692+ processor. on_end ( new_test_export_span_data ( ) ) ;
693+ processor. on_end ( new_test_export_span_data ( ) ) ;
694+
695+ processor. force_flush ( ) . expect ( "force flush failed" ) ;
696+ processor. shutdown ( ) . expect ( "shutdown failed" ) ;
697+
698+ // There must never have been more than one export in flight.
699+ assert ! (
700+ !concurrent_seen. load( Ordering :: SeqCst ) ,
701+ "exports overlapped even though max_concurrent_exports was 1"
702+ ) ;
579703 }
580704}
0 commit comments