@@ -38,10 +38,15 @@ public static class RabbitMQActivitySource
3838 private static readonly ActivitySource s_subscriberSource =
3939 new ActivitySource ( SubscriberSourceName , AssemblyVersion ) ;
4040
41+ private static readonly ActivitySource s_connectionSource =
42+ new ActivitySource ( ConnectionSourceName , AssemblyVersion ) ;
43+
4144 public const string PublisherSourceName = "RabbitMQ.Client.Publisher" ;
4245 public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber" ;
46+ public const string ConnectionSourceName = "RabbitMQ.Client.Connection" ;
4347
44- public static Action < Activity , IDictionary < string , object ? > > ContextInjector { get ; set ; } = DefaultContextInjector ;
48+ public static Action < Activity , IDictionary < string , object ? > > ContextInjector { get ; set ; } =
49+ DefaultContextInjector ;
4550
4651 public static Func < IReadOnlyBasicProperties , ActivityContext > ContextExtractor { get ; set ; } =
4752 DefaultContextExtractor ;
@@ -56,6 +61,19 @@ public static class RabbitMQActivitySource
5661 new KeyValuePair < string , object ? > ( ProtocolVersion , "0.9.1" )
5762 } ;
5863
64+ internal static Activity ? OpenConnection ( IFrameHandler frameHandler )
65+ {
66+ if ( ! s_connectionSource . HasListeners ( ) )
67+ {
68+ return null ;
69+ }
70+ Activity ? connectionActivity =
71+ s_connectionSource . StartRabbitMQActivity ( "rabbitmq connect" , ActivityKind . Client ) ;
72+ connectionActivity ?
73+ . SetNetworkTags ( frameHandler ) ;
74+ return connectionActivity ;
75+ }
76+
5977 internal static Activity ? Send ( string routingKey , string exchange , int bodySize ,
6078 ActivityContext linkedContext = default )
6179 {
@@ -66,18 +84,21 @@ public static class RabbitMQActivitySource
6684
6785 Activity ? activity = linkedContext == default
6886 ? s_publisherSource . StartRabbitMQActivity (
69- UseRoutingKeyAsOperationName ? $ "{ routingKey } { MessagingOperationTypeSend } " : MessagingOperationTypeSend ,
87+ UseRoutingKeyAsOperationName
88+ ? $ "{ routingKey } { MessagingOperationTypeSend } "
89+ : MessagingOperationTypeSend ,
7090 ActivityKind . Producer )
7191 : s_publisherSource . StartLinkedRabbitMQActivity (
72- UseRoutingKeyAsOperationName ? $ "{ routingKey } { MessagingOperationTypeSend } " : MessagingOperationTypeSend ,
92+ UseRoutingKeyAsOperationName
93+ ? $ "{ routingKey } { MessagingOperationTypeSend } "
94+ : MessagingOperationTypeSend ,
7395 ActivityKind . Producer , linkedContext ) ;
7496 if ( activity != null && activity . IsAllDataRequested )
7597 {
7698 PopulateMessagingTags ( MessagingOperationTypeSend , routingKey , exchange , 0 , bodySize , activity ) ;
7799 }
78100
79101 return activity ;
80-
81102 }
82103
83104 internal static Activity ? ReceiveEmpty ( string queue )
@@ -88,7 +109,9 @@ public static class RabbitMQActivitySource
88109 }
89110
90111 Activity ? activity = s_subscriberSource . StartRabbitMQActivity (
91- UseRoutingKeyAsOperationName ? $ "{ queue } { MessagingOperationTypeReceive } " : MessagingOperationTypeReceive ,
112+ UseRoutingKeyAsOperationName
113+ ? $ "{ queue } { MessagingOperationTypeReceive } "
114+ : MessagingOperationTypeReceive ,
92115 ActivityKind . Consumer ) ;
93116 if ( activity != null && activity . IsAllDataRequested )
94117 {
@@ -110,11 +133,14 @@ public static class RabbitMQActivitySource
110133
111134 // Extract the PropagationContext of the upstream parent from the message headers.
112135 Activity ? activity = s_subscriberSource . StartLinkedRabbitMQActivity (
113- UseRoutingKeyAsOperationName ? $ "{ routingKey } { MessagingOperationTypeReceive } " : MessagingOperationTypeReceive , ActivityKind . Consumer ,
136+ UseRoutingKeyAsOperationName
137+ ? $ "{ routingKey } { MessagingOperationTypeReceive } "
138+ : MessagingOperationTypeReceive , ActivityKind . Consumer ,
114139 ContextExtractor ( readOnlyBasicProperties ) ) ;
115140 if ( activity != null && activity . IsAllDataRequested )
116141 {
117- PopulateMessagingTags ( MessagingOperationTypeReceive , routingKey , exchange , deliveryTag , readOnlyBasicProperties ,
142+ PopulateMessagingTags ( MessagingOperationTypeReceive , routingKey , exchange , deliveryTag ,
143+ readOnlyBasicProperties ,
118144 bodySize , activity ) ;
119145 }
120146
@@ -131,7 +157,9 @@ public static class RabbitMQActivitySource
131157
132158 // Extract the PropagationContext of the upstream parent from the message headers.
133159 Activity ? activity = s_subscriberSource . StartLinkedRabbitMQActivity (
134- UseRoutingKeyAsOperationName ? $ "{ routingKey } { MessagingOperationTypeProcess } " : MessagingOperationTypeProcess ,
160+ UseRoutingKeyAsOperationName
161+ ? $ "{ routingKey } { MessagingOperationTypeProcess } "
162+ : MessagingOperationTypeProcess ,
135163 ActivityKind . Consumer , ContextExtractor ( basicProperties ) ) ;
136164 if ( activity != null && activity . IsAllDataRequested )
137165 {
@@ -142,10 +170,19 @@ public static class RabbitMQActivitySource
142170 return activity ;
143171 }
144172
173+ internal static void ReportException ( this Activity ? activity , Exception exception )
174+ {
175+ activity ? . AddTag ( "exception.message" , exception . Message ) ;
176+ activity ? . AddTag ( "exception.stacktrace" , exception . ToString ( ) ) ;
177+ activity ? . AddTag ( "exception.type" , exception . GetType ( ) . FullName ) ;
178+ activity ? . SetStatus ( ActivityStatusCode . Error ) ;
179+ }
180+
145181 private static Activity ? StartRabbitMQActivity ( this ActivitySource source , string name , ActivityKind kind ,
146182 ActivityContext parentContext = default )
147183 {
148- return source . CreateActivity ( name , kind , parentContext , idFormat : ActivityIdFormat . W3C , tags : CreationTags ) ? . Start ( ) ;
184+ return source . CreateActivity ( name , kind , parentContext , idFormat : ActivityIdFormat . W3C , tags : CreationTags )
185+ ? . Start ( ) ;
149186 }
150187
151188 private static Activity ? StartLinkedRabbitMQActivity ( this ActivitySource source , string name , ActivityKind kind ,
@@ -273,7 +310,8 @@ private static ActivityContext DefaultContextExtractor(IReadOnlyBasicProperties
273310 return default ;
274311 }
275312
276- DistributedContextPropagator . Current . ExtractTraceIdAndState ( props . Headers , DefaultContextGetter , out string ? traceParent , out string ? traceState ) ;
313+ DistributedContextPropagator . Current . ExtractTraceIdAndState ( props . Headers , DefaultContextGetter ,
314+ out string ? traceParent , out string ? traceState ) ;
277315 return ActivityContext . TryParse ( traceParent , traceState , out ActivityContext context ) ? context : default ;
278316 }
279317
@@ -288,7 +326,8 @@ private static void DefaultContextSetter(object? carrier, string name, string va
288326 carrierDictionary [ name ] = value ;
289327 }
290328
291- private static void DefaultContextGetter ( object ? carrier , string name , out string ? value , out IEnumerable < string > ? values )
329+ private static void DefaultContextGetter ( object ? carrier , string name , out string ? value ,
330+ out IEnumerable < string > ? values )
292331 {
293332 if ( carrier is IDictionary < string , object > carrierDict &&
294333 carrierDict . TryGetValue ( name , out object ? propsVal ) && propsVal is byte [ ] bytes )
0 commit comments