1- use crate :: {
2- abstractions:: dbg_panic,
3- telemetry:: {
4- default_resource, metric_temporality_to_selector, prometheus_server:: PromServer ,
5- TelemetryInstance , TELEM_SERVICE_NAME ,
6- } ,
7- } ;
8- use opentelemetry:: {
9- self ,
10- metrics:: { Meter , MeterProvider as MeterProviderT , Unit } ,
11- KeyValue ,
12- } ;
13- use opentelemetry_otlp:: WithExportConfig ;
14- use opentelemetry_sdk:: {
15- metrics:: {
16- new_view,
17- reader:: { AggregationSelector , DefaultAggregationSelector } ,
18- Aggregation , Instrument , InstrumentKind , MeterProvider , MeterProviderBuilder ,
19- PeriodicReader , View ,
20- } ,
21- runtime, AttributeSet ,
22- } ;
23- use parking_lot:: RwLock ;
24- use std:: { collections:: HashMap , fmt:: Debug , net:: SocketAddr , sync:: Arc , time:: Duration } ;
25- use temporal_sdk_core_api:: telemetry:: {
26- metrics:: {
27- BufferAttributes , BufferInstrumentRef , CoreMeter , Counter , Gauge , Histogram ,
28- LazyBufferInstrument , MetricAttributes , MetricCallBufferer , MetricEvent , MetricKeyValue ,
29- MetricKind , MetricParameters , MetricUpdateVal , NewAttributes , NoOpCoreMeter ,
30- } ,
31- OtelCollectorOptions , PrometheusExporterOptions ,
1+ use crate :: { abstractions:: dbg_panic, telemetry:: TelemetryInstance } ;
2+
3+ use std:: { fmt:: Debug , sync:: Arc , time:: Duration } ;
4+ use temporal_sdk_core_api:: telemetry:: metrics:: {
5+ BufferAttributes , BufferInstrumentRef , CoreMeter , Counter , Gauge , Histogram ,
6+ LazyBufferInstrument , MetricAttributes , MetricCallBufferer , MetricEvent , MetricKeyValue ,
7+ MetricKind , MetricParameters , MetricUpdateVal , NewAttributes , NoOpCoreMeter ,
328} ;
33- use tokio:: task:: AbortHandle ;
34- use tonic:: metadata:: MetadataMap ;
359
3610/// Used to track context associated with metrics, and record/update them
3711///
@@ -415,20 +389,21 @@ pub(crate) fn eager(is_eager: bool) -> MetricKeyValue {
415389 MetricKeyValue :: new ( KEY_EAGER , is_eager)
416390}
417391
418- const WF_E2E_LATENCY_NAME : & str = "workflow_endtoend_latency" ;
419- const WF_TASK_SCHED_TO_START_LATENCY_NAME : & str = "workflow_task_schedule_to_start_latency" ;
420- const WF_TASK_REPLAY_LATENCY_NAME : & str = "workflow_task_replay_latency" ;
421- const WF_TASK_EXECUTION_LATENCY_NAME : & str = "workflow_task_execution_latency" ;
422- const ACT_SCHED_TO_START_LATENCY_NAME : & str = "activity_schedule_to_start_latency" ;
423- const ACT_EXEC_LATENCY_NAME : & str = "activity_execution_latency" ;
424- const NUM_POLLERS_NAME : & str = "num_pollers" ;
425- const TASK_SLOTS_AVAILABLE_NAME : & str = "worker_task_slots_available" ;
426- const STICKY_CACHE_SIZE_NAME : & str = "sticky_cache_size" ;
392+ pub ( super ) const WF_E2E_LATENCY_NAME : & str = "workflow_endtoend_latency" ;
393+ pub ( super ) const WF_TASK_SCHED_TO_START_LATENCY_NAME : & str =
394+ "workflow_task_schedule_to_start_latency" ;
395+ pub ( super ) const WF_TASK_REPLAY_LATENCY_NAME : & str = "workflow_task_replay_latency" ;
396+ pub ( super ) const WF_TASK_EXECUTION_LATENCY_NAME : & str = "workflow_task_execution_latency" ;
397+ pub ( super ) const ACT_SCHED_TO_START_LATENCY_NAME : & str = "activity_schedule_to_start_latency" ;
398+ pub ( super ) const ACT_EXEC_LATENCY_NAME : & str = "activity_execution_latency" ;
399+ pub ( super ) const NUM_POLLERS_NAME : & str = "num_pollers" ;
400+ pub ( super ) const TASK_SLOTS_AVAILABLE_NAME : & str = "worker_task_slots_available" ;
401+ pub ( super ) const STICKY_CACHE_SIZE_NAME : & str = "sticky_cache_size" ;
427402
428403/// Artisanal, handcrafted latency buckets for workflow e2e latency which should expose a useful
429404/// set of buckets for < 1 day runtime workflows. Beyond that, this metric probably isn't very
430405/// helpful.
431- static WF_LATENCY_MS_BUCKETS : & [ f64 ] = & [
406+ pub ( super ) static WF_LATENCY_MS_BUCKETS : & [ f64 ] = & [
432407 100. ,
433408 500. ,
434409 1000. ,
@@ -449,14 +424,14 @@ static WF_LATENCY_MS_BUCKETS: &[f64] = &[
449424
450425/// Task latencies are expected to be fast, no longer than a second, which was generally the deadlock
451426/// timeout in old SDKs. Here it's a bit different since a WFT may represent multiple activations.
452- static WF_TASK_MS_BUCKETS : & [ f64 ] = & [ 1. , 10. , 20. , 50. , 100. , 200. , 500. , 1000. ] ;
427+ pub ( super ) static WF_TASK_MS_BUCKETS : & [ f64 ] = & [ 1. , 10. , 20. , 50. , 100. , 200. , 500. , 1000. ] ;
453428
454429/// Activities are generally expected to take at least a little time, and sometimes quite a while,
455430/// since they're doing side-effecty things, etc.
456- static ACT_EXE_MS_BUCKETS : & [ f64 ] = & [ 50. , 100. , 500. , 1000. , 5000. , 10_000. , 60_000. ] ;
431+ pub ( super ) static ACT_EXE_MS_BUCKETS : & [ f64 ] = & [ 50. , 100. , 500. , 1000. , 5000. , 10_000. , 60_000. ] ;
457432
458433/// Schedule-to-start latency buckets for both WFT and AT
459- static TASK_SCHED_TO_START_MS_BUCKETS : & [ f64 ] =
434+ pub ( super ) static TASK_SCHED_TO_START_MS_BUCKETS : & [ f64 ] =
460435 & [ 100. , 500. , 1000. , 5000. , 10_000. , 100_000. , 1_000_000. ] ;
461436
462437/// Default buckets. Should never really be used as they will be meaningless for many things, but
@@ -479,153 +454,6 @@ pub fn default_buckets_for(histo_name: &str) -> &'static [f64] {
479454 }
480455}
481456
482- /// Chooses appropriate aggregators for our metrics
483- #[ derive( Debug , Clone , Default ) ]
484- pub struct SDKAggSelector {
485- default : DefaultAggregationSelector ,
486- }
487- impl AggregationSelector for SDKAggSelector {
488- fn aggregation ( & self , kind : InstrumentKind ) -> Aggregation {
489- match kind {
490- InstrumentKind :: Histogram => Aggregation :: ExplicitBucketHistogram {
491- boundaries : DEFAULT_MS_BUCKETS . to_vec ( ) ,
492- record_min_max : true ,
493- } ,
494- _ => self . default . aggregation ( kind) ,
495- }
496- }
497- }
498-
499- fn histo_view (
500- metric_name : & ' static str ,
501- buckets : & [ f64 ] ,
502- ) -> opentelemetry:: metrics:: Result < Box < dyn View > > {
503- new_view (
504- Instrument :: new ( ) . name ( format ! ( "*{metric_name}" ) ) ,
505- opentelemetry_sdk:: metrics:: Stream :: new ( ) . aggregation (
506- Aggregation :: ExplicitBucketHistogram {
507- boundaries : buckets. to_vec ( ) ,
508- record_min_max : true ,
509- } ,
510- ) ,
511- )
512- }
513-
514- pub ( super ) fn augment_meter_provider_with_defaults (
515- mpb : MeterProviderBuilder ,
516- global_tags : & HashMap < String , String > ,
517- ) -> opentelemetry:: metrics:: Result < MeterProviderBuilder > {
518- // Some histograms are actually gauges, but we have to use histograms otherwise they forget
519- // their value between collections since we don't use callbacks.
520- Ok ( mpb
521- . with_view ( histo_view ( WF_E2E_LATENCY_NAME , WF_LATENCY_MS_BUCKETS ) ?)
522- . with_view ( histo_view (
523- WF_TASK_EXECUTION_LATENCY_NAME ,
524- WF_TASK_MS_BUCKETS ,
525- ) ?)
526- . with_view ( histo_view ( WF_TASK_REPLAY_LATENCY_NAME , WF_TASK_MS_BUCKETS ) ?)
527- . with_view ( histo_view (
528- WF_TASK_SCHED_TO_START_LATENCY_NAME ,
529- TASK_SCHED_TO_START_MS_BUCKETS ,
530- ) ?)
531- . with_view ( histo_view (
532- ACT_SCHED_TO_START_LATENCY_NAME ,
533- TASK_SCHED_TO_START_MS_BUCKETS ,
534- ) ?)
535- . with_view ( histo_view ( ACT_EXEC_LATENCY_NAME , ACT_EXE_MS_BUCKETS ) ?)
536- . with_resource ( default_resource ( global_tags) ) )
537- }
538-
539- /// OTel has no built-in synchronous Gauge. Histograms used to be able to serve that purpose, but
540- /// they broke that. Lovely. So, we need to implement one by hand.
541- pub ( crate ) struct MemoryGaugeU64 {
542- labels_to_values : Arc < RwLock < HashMap < AttributeSet , u64 > > > ,
543- }
544-
545- impl MemoryGaugeU64 {
546- fn new ( params : MetricParameters , meter : & Meter ) -> Self {
547- let gauge = meter
548- . u64_observable_gauge ( params. name )
549- . with_unit ( Unit :: new ( params. unit ) )
550- . with_description ( params. description )
551- . init ( ) ;
552- let map = Arc :: new ( RwLock :: new ( HashMap :: < AttributeSet , u64 > :: new ( ) ) ) ;
553- let map_c = map. clone ( ) ;
554- meter
555- . register_callback ( & [ gauge. as_any ( ) ] , move |o| {
556- // This whole thing is... extra stupid.
557- // See https://github.com/open-telemetry/opentelemetry-rust/issues/1181
558- // The performance is likely bad here, but, given this is only called when metrics
559- // are exported it should be livable for now.
560- let map_rlock = map_c. read ( ) ;
561- for ( kvs, val) in map_rlock. iter ( ) {
562- let kvs: Vec < _ > = kvs
563- . iter ( )
564- . map ( |( k, v) | KeyValue :: new ( k. clone ( ) , v. clone ( ) ) )
565- . collect ( ) ;
566- o. observe_u64 ( & gauge, * val, kvs. as_slice ( ) )
567- }
568- } )
569- . expect ( "instrument must exist we just created it" ) ;
570- MemoryGaugeU64 {
571- labels_to_values : map,
572- }
573- }
574- fn record ( & self , val : u64 , kvs : & [ KeyValue ] ) {
575- self . labels_to_values
576- . write ( )
577- . insert ( AttributeSet :: from ( kvs) , val) ;
578- }
579- }
580-
581- /// Create an OTel meter that can be used as a [CoreMeter] to export metrics over OTLP.
582- pub fn build_otlp_metric_exporter (
583- opts : OtelCollectorOptions ,
584- ) -> Result < CoreOtelMeter , anyhow:: Error > {
585- let exporter = opentelemetry_otlp:: TonicExporterBuilder :: default ( )
586- . with_endpoint ( opts. url . to_string ( ) )
587- . with_metadata ( MetadataMap :: from_headers ( ( & opts. headers ) . try_into ( ) ?) )
588- . build_metrics_exporter (
589- Box :: < SDKAggSelector > :: default ( ) ,
590- Box :: new ( metric_temporality_to_selector ( opts. metric_temporality ) ) ,
591- ) ?;
592- let reader = PeriodicReader :: builder ( exporter, runtime:: Tokio )
593- . with_interval ( opts. metric_periodicity )
594- . build ( ) ;
595- let mp = augment_meter_provider_with_defaults (
596- MeterProvider :: builder ( ) . with_reader ( reader) ,
597- & opts. global_tags ,
598- ) ?
599- . build ( ) ;
600- Ok :: < _ , anyhow:: Error > ( CoreOtelMeter ( mp. meter ( TELEM_SERVICE_NAME ) ) )
601- }
602-
603- pub struct StartedPromServer {
604- pub meter : Arc < CoreOtelMeter > ,
605- pub bound_addr : SocketAddr ,
606- pub abort_handle : AbortHandle ,
607- }
608-
609- /// Builds and runs a prometheus endpoint which can be scraped by prom instances for metrics export.
610- /// Returns the meter that can be used as a [CoreMeter].
611- pub fn start_prometheus_metric_exporter (
612- opts : PrometheusExporterOptions ,
613- ) -> Result < StartedPromServer , anyhow:: Error > {
614- let ( srv, exporter) = PromServer :: new ( & opts, SDKAggSelector :: default ( ) ) ?;
615- let meter_provider = augment_meter_provider_with_defaults (
616- MeterProvider :: builder ( ) . with_reader ( exporter) ,
617- & opts. global_tags ,
618- ) ?
619- . build ( ) ;
620- let bound_addr = srv. bound_addr ( ) ;
621- let handle = tokio:: spawn ( async move { srv. run ( ) . await } ) ;
622- Ok ( StartedPromServer {
623- meter : Arc :: new ( CoreOtelMeter ( meter_provider. meter ( TELEM_SERVICE_NAME ) ) ) ,
624- bound_addr,
625- abort_handle : handle. abort_handle ( ) ,
626- } )
627- }
628-
629457/// Buffers [MetricEvent]s for periodic consumption by lang
630458#[ derive( Debug ) ]
631459pub struct MetricsCallBuffer < I >
@@ -778,64 +606,6 @@ where
778606 }
779607}
780608
781- #[ derive( Debug ) ]
782- pub struct CoreOtelMeter ( Meter ) ;
783- impl CoreMeter for CoreOtelMeter {
784- fn new_attributes ( & self , attribs : NewAttributes ) -> MetricAttributes {
785- MetricAttributes :: OTel {
786- kvs : Arc :: new ( attribs. attributes . into_iter ( ) . map ( KeyValue :: from) . collect ( ) ) ,
787- }
788- }
789-
790- fn extend_attributes (
791- & self ,
792- existing : MetricAttributes ,
793- attribs : NewAttributes ,
794- ) -> MetricAttributes {
795- if let MetricAttributes :: OTel { mut kvs } = existing {
796- Arc :: make_mut ( & mut kvs) . extend ( attribs. attributes . into_iter ( ) . map ( Into :: into) ) ;
797- MetricAttributes :: OTel { kvs }
798- } else {
799- dbg_panic ! ( "Must use OTel attributes with an OTel metric implementation" ) ;
800- existing
801- }
802- }
803-
804- fn counter ( & self , params : MetricParameters ) -> Arc < dyn Counter > {
805- Arc :: new (
806- self . 0
807- . u64_counter ( params. name )
808- . with_unit ( Unit :: new ( params. unit ) )
809- . with_description ( params. description )
810- . init ( ) ,
811- )
812- }
813-
814- fn histogram ( & self , params : MetricParameters ) -> Arc < dyn Histogram > {
815- Arc :: new (
816- self . 0
817- . u64_histogram ( params. name )
818- . with_unit ( Unit :: new ( params. unit ) )
819- . with_description ( params. description )
820- . init ( ) ,
821- )
822- }
823-
824- fn gauge ( & self , params : MetricParameters ) -> Arc < dyn Gauge > {
825- Arc :: new ( MemoryGaugeU64 :: new ( params, & self . 0 ) )
826- }
827- }
828-
829- impl Gauge for MemoryGaugeU64 {
830- fn record ( & self , value : u64 , attributes : & MetricAttributes ) {
831- if let MetricAttributes :: OTel { kvs } = attributes {
832- self . record ( value, kvs) ;
833- } else {
834- dbg_panic ! ( "Must use OTel attributes with an OTel metric implementation" ) ;
835- }
836- }
837- }
838-
839609#[ derive( Debug , derive_more:: Constructor ) ]
840610pub ( crate ) struct PrefixedMetricsMeter < CM > {
841611 prefix : String ,
0 commit comments