1515
1616import io .dapr .client .DaprClient ;
1717import io .dapr .client .domain .Metadata ;
18+ import io .dapr .spring .messaging .observation .DaprMessagingObservationConvention ;
19+ import io .dapr .spring .messaging .observation .DaprMessagingObservationDocumentation ;
20+ import io .dapr .spring .messaging .observation .DaprMessagingSenderContext ;
21+ import io .micrometer .observation .Observation ;
22+ import io .micrometer .observation .ObservationRegistry ;
23+ import io .opentelemetry .api .OpenTelemetry ;
24+ import io .opentelemetry .context .propagation .TextMapSetter ;
25+ import org .slf4j .Logger ;
26+ import org .slf4j .LoggerFactory ;
27+ import org .springframework .beans .factory .BeanNameAware ;
28+ import org .springframework .beans .factory .SmartInitializingSingleton ;
29+ import org .springframework .context .ApplicationContext ;
30+ import org .springframework .context .ApplicationContextAware ;
1831import reactor .core .publisher .Mono ;
32+ import reactor .util .context .Context ;
1933
34+ import javax .annotation .Nullable ;
35+
36+ import java .util .HashMap ;
2037import java .util .Map ;
2138
22- public class DaprMessagingTemplate <T > implements DaprMessagingOperations <T > {
39+ /**
40+ * Create a new DaprMessagingTemplate.
41+ * @param <T> templated message type
42+ */
43+ public class DaprMessagingTemplate <T > implements DaprMessagingOperations <T >, ApplicationContextAware , BeanNameAware ,
44+ SmartInitializingSingleton {
2345
46+ private static final Logger LOGGER = LoggerFactory .getLogger (DaprMessagingTemplate .class );
2447 private static final String MESSAGE_TTL_IN_SECONDS = "10" ;
48+ private static final DaprMessagingObservationConvention DEFAULT_OBSERVATION_CONVENTION =
49+ DaprMessagingObservationConvention .getDefault ();
2550
2651 private final DaprClient daprClient ;
2752 private final String pubsubName ;
53+ private final Map <String , String > metadata ;
54+ private final boolean observationEnabled ;
55+
56+ @ Nullable
57+ private ApplicationContext applicationContext ;
58+
59+ @ Nullable
60+ private String beanName ;
61+
62+ @ Nullable
63+ private OpenTelemetry openTelemetry ;
64+
65+ @ Nullable
66+ private ObservationRegistry observationRegistry ;
2867
29- public DaprMessagingTemplate (DaprClient daprClient , String pubsubName ) {
68+ @ Nullable
69+ private DaprMessagingObservationConvention observationConvention ;
70+
71+ /**
72+ * Constructs a new DaprMessagingTemplate.
73+ * @param daprClient Dapr client
74+ * @param pubsubName pubsub name
75+ * @param observationEnabled whether to enable observations
76+ */
77+ public DaprMessagingTemplate (DaprClient daprClient , String pubsubName , boolean observationEnabled ) {
3078 this .daprClient = daprClient ;
3179 this .pubsubName = pubsubName ;
80+ this .metadata = Map .of (Metadata .TTL_IN_SECONDS , MESSAGE_TTL_IN_SECONDS );
81+ this .observationEnabled = observationEnabled ;
82+ }
83+
84+ @ Override
85+ public void setApplicationContext (ApplicationContext applicationContext ) {
86+ this .applicationContext = applicationContext ;
87+ }
88+
89+ @ Override
90+ public void setBeanName (String beanName ) {
91+ this .beanName = beanName ;
92+ }
93+
94+ /**
95+ * If observations are enabled, attempt to obtain the Observation registry and
96+ * convention.
97+ */
98+ @ Override
99+ public void afterSingletonsInstantiated () {
100+ if (!observationEnabled ) {
101+ LOGGER .debug ("Observations are not enabled - not recording" );
102+ return ;
103+ }
104+
105+ if (applicationContext == null ) {
106+ LOGGER .warn ("Observations enabled but application context null - not recording" );
107+ return ;
108+ }
109+
110+ observationRegistry = applicationContext .getBeanProvider (ObservationRegistry .class )
111+ .getIfUnique (() -> observationRegistry );
112+ this .openTelemetry = this .applicationContext .getBeanProvider (OpenTelemetry .class )
113+ .getIfUnique (() -> this .openTelemetry );
114+ observationConvention = applicationContext .getBeanProvider (DaprMessagingObservationConvention .class )
115+ .getIfUnique (() -> observationConvention );
32116 }
33117
34118 @ Override
@@ -38,29 +122,83 @@ public void send(String topic, T message) {
38122
39123 @ Override
40124 public SendMessageBuilder <T > newMessage (T message ) {
41- return new SendMessageBuilderImpl <>(this , message );
125+ return new DefaultSendMessageBuilder <>(this , message );
42126 }
43127
44128 private void doSend (String topic , T message ) {
45129 doSendAsync (topic , message ).block ();
46130 }
47131
48132 private Mono <Void > doSendAsync (String topic , T message ) {
49- return daprClient .publishEvent (pubsubName ,
50- topic ,
51- message ,
52- Map .of (Metadata .TTL_IN_SECONDS , MESSAGE_TTL_IN_SECONDS ));
133+ LOGGER .trace ("Sending message to '{}' topic" , topic );
134+
135+ if (canUseObservation ()) {
136+ return publishEventWithObservation (pubsubName , topic , message );
137+ }
138+
139+ return publishEvent (pubsubName , topic , message );
140+ }
141+
142+ private boolean canUseObservation () {
143+ return observationEnabled
144+ && observationRegistry != null
145+ && openTelemetry != null
146+ && beanName != null ;
147+ }
148+
149+ private Mono <Void > publishEvent (String pubsubName , String topic , T message ) {
150+ return daprClient .publishEvent (pubsubName , topic , message , metadata );
151+ }
152+
153+ private Mono <Void > publishEventWithObservation (String pubsubName , String topic , T message ) {
154+ DaprMessagingSenderContext senderContext = DaprMessagingSenderContext .newContext (topic , this .beanName );
155+ Observation observation = createObservation (senderContext );
156+
157+ return observation .observe (() ->
158+ publishEvent (pubsubName , topic , message )
159+ .contextWrite (getReactorContext ())
160+ .doOnError (err -> {
161+ LOGGER .error ("Failed to send msg to '{}' topic" , topic , err );
162+
163+ observation .error (err );
164+ observation .stop ();
165+ })
166+ .doOnSuccess (ignore -> {
167+ LOGGER .trace ("Sent msg to '{}' topic" , topic );
168+
169+ observation .stop ();
170+ })
171+ );
172+ }
173+
174+ private Context getReactorContext () {
175+ Map <String , String > map = new HashMap <>();
176+ TextMapSetter <Map <String , String >> setter = (carrier , key , value ) -> map .put (key , value );
177+ io .opentelemetry .context .Context otelContext = io .opentelemetry .context .Context .current ();
178+
179+ openTelemetry .getPropagators ().getTextMapPropagator ().inject (otelContext , map , setter );
180+
181+ return Context .of (map );
182+ }
183+
184+ private Observation createObservation (DaprMessagingSenderContext senderContext ) {
185+ return DaprMessagingObservationDocumentation .TEMPLATE_OBSERVATION .observation (
186+ observationConvention ,
187+ DEFAULT_OBSERVATION_CONVENTION ,
188+ () -> senderContext ,
189+ observationRegistry
190+ );
53191 }
54192
55- private static class SendMessageBuilderImpl <T > implements SendMessageBuilder <T > {
193+ private static class DefaultSendMessageBuilder <T > implements SendMessageBuilder <T > {
56194
57195 private final DaprMessagingTemplate <T > template ;
58196
59197 private final T message ;
60198
61199 private String topic ;
62200
63- SendMessageBuilderImpl (DaprMessagingTemplate <T > template , T message ) {
201+ DefaultSendMessageBuilder (DaprMessagingTemplate <T > template , T message ) {
64202 this .template = template ;
65203 this .message = message ;
66204 }
@@ -74,12 +212,12 @@ public SendMessageBuilder<T> withTopic(String topic) {
74212
75213 @ Override
76214 public void send () {
77- this . template .doSend (this . topic , this . message );
215+ template .doSend (topic , message );
78216 }
79217
80218 @ Override
81219 public Mono <Void > sendAsync () {
82- return this . template .doSendAsync (this . topic , this . message );
220+ return template .doSendAsync (topic , message );
83221 }
84222
85223 }
0 commit comments