2020import io .dapr .spring .messaging .observation .DaprMessagingSenderContext ;
2121import io .micrometer .observation .Observation ;
2222import io .micrometer .observation .ObservationRegistry ;
23- import io .opentelemetry .api .OpenTelemetry ;
24- import io .opentelemetry .context .propagation .TextMapSetter ;
2523import org .slf4j .Logger ;
2624import org .slf4j .LoggerFactory ;
2725import org .springframework .beans .factory .BeanNameAware ;
3331
3432import javax .annotation .Nullable ;
3533
36- import java .util .HashMap ;
3734import java .util .Map ;
3835
3936/**
@@ -59,9 +56,6 @@ public class DaprMessagingTemplate<T> implements DaprMessagingOperations<T>, App
5956 @ Nullable
6057 private String beanName ;
6158
62- @ Nullable
63- private OpenTelemetry openTelemetry ;
64-
6559 @ Nullable
6660 private ObservationRegistry observationRegistry ;
6761
@@ -109,8 +103,6 @@ public void afterSingletonsInstantiated() {
109103
110104 observationRegistry = applicationContext .getBeanProvider (ObservationRegistry .class )
111105 .getIfUnique (() -> observationRegistry );
112- this .openTelemetry = this .applicationContext .getBeanProvider (OpenTelemetry .class )
113- .getIfUnique (() -> this .openTelemetry );
114106 observationConvention = applicationContext .getBeanProvider (DaprMessagingObservationConvention .class )
115107 .getIfUnique (() -> observationConvention );
116108 }
@@ -140,10 +132,7 @@ private Mono<Void> doSendAsync(String topic, T message) {
140132 }
141133
142134 private boolean canUseObservation () {
143- return observationEnabled
144- && observationRegistry != null
145- && openTelemetry != null
146- && beanName != null ;
135+ return observationEnabled && observationRegistry != null && beanName != null ;
147136 }
148137
149138 private Mono <Void > publishEvent (String pubsubName , String topic , T message ) {
@@ -154,31 +143,25 @@ private Mono<Void> publishEventWithObservation(String pubsubName, String topic,
154143 DaprMessagingSenderContext senderContext = DaprMessagingSenderContext .newContext (topic , this .beanName );
155144 Observation observation = createObservation (senderContext );
156145
157- return observation .observe (() ->
158- publishEvent (pubsubName , topic , message )
159- .contextWrite (getReactorContext ())
160- .doOnError (err -> {
161- LOGGER .error ("Failed to send msg to '{}' topic" , topic , err );
162-
163- observation .error (err );
164- observation .stop ();
165- })
166- .doOnSuccess (ignore -> {
167- LOGGER .trace ("Sent msg to '{}' topic" , topic );
146+ observation .start ();
168147
169- observation . stop ();
170- } )
171- );
172- }
148+ return publishEvent ( pubsubName , topic , message )
149+ . contextWrite ( getReactorContext ( senderContext ) )
150+ . doOnError ( err -> {
151+ LOGGER . error ( "Failed to send msg to '{}' topic" , topic , err );
173152
174- private Context getReactorContext () {
175- Map <String , String > map = new HashMap <>();
176- TextMapSetter <Map <String , String >> setter = (carrier , key , value ) -> map .put (key , value );
177- io .opentelemetry .context .Context otelContext = io .opentelemetry .context .Context .current ();
153+ observation .error (err );
154+ observation .stop ();
155+ })
156+ .doOnSuccess (ignore -> {
157+ LOGGER .trace ("Sent msg to '{}' topic" , topic );
178158
179- openTelemetry .getPropagators ().getTextMapPropagator ().inject (otelContext , map , setter );
159+ observation .stop ();
160+ });
161+ }
180162
181- return Context .of (map );
163+ private Context getReactorContext (DaprMessagingSenderContext senderContext ) {
164+ return Context .of (senderContext .properties ());
182165 }
183166
184167 private Observation createObservation (DaprMessagingSenderContext senderContext ) {
0 commit comments