Skip to content

Commit 71f0e2d

Browse files
authored
Fix context propagation into webflux7 client callbacks (#15336)
1 parent 6d6915e commit 71f0e2d

File tree

4 files changed

+40
-12
lines changed

4 files changed

+40
-12
lines changed

instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientTelemetry.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.opentelemetry.api.OpenTelemetry;
99
import io.opentelemetry.context.propagation.ContextPropagators;
1010
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
11+
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
1112
import io.opentelemetry.instrumentation.spring.webflux.v5_3.internal.WebClientTracingFilter;
1213
import java.util.List;
1314
import org.springframework.web.reactive.function.client.ClientRequest;
@@ -43,6 +44,12 @@ public static SpringWebfluxClientTelemetryBuilder builder(OpenTelemetry openTele
4344
this.propagators = propagators;
4445
}
4546

47+
/**
48+
* Adds the OpenTelemetry telemetry producing {@link ExchangeFilterFunction} to the provided list
49+
* of filter functions.
50+
*
51+
* @param exchangeFilterFunctions existing filter functions
52+
*/
4653
public void addFilter(List<ExchangeFilterFunction> exchangeFilterFunctions) {
4754
for (ExchangeFilterFunction filterFunction : exchangeFilterFunctions) {
4855
if (filterFunction instanceof WebClientTracingFilter) {
@@ -51,4 +58,21 @@ public void addFilter(List<ExchangeFilterFunction> exchangeFilterFunctions) {
5158
}
5259
exchangeFilterFunctions.add(new WebClientTracingFilter(clientInstrumenter, propagators));
5360
}
61+
62+
/**
63+
* Adds the OpenTelemetry telemetry producing {@link ExchangeFilterFunction} to the provided list
64+
* of filter functions. Also registers the Reactor context propagation hook {@link
65+
* ContextPropagationOperator} for propagating OpenTelemetry context into reactive pipelines.
66+
*
67+
* @param exchangeFilterFunctions existing filter functions
68+
*/
69+
public void addFilterAndRegisterReactorHook(
70+
List<ExchangeFilterFunction> exchangeFilterFunctions) {
71+
registerReactorHook();
72+
addFilter(exchangeFilterFunctions);
73+
}
74+
75+
private static void registerReactorHook() {
76+
ContextPropagationOperator.builder().build().registerOnEachOperator();
77+
}
5478
}

instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxServerTelemetry.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,24 @@ public static SpringWebfluxServerTelemetryBuilder builder(OpenTelemetry openTele
3939
this.serverInstrumenter = serverInstrumenter;
4040
}
4141

42+
/**
43+
* Returns an OpenTelemetry telemetry producing {@link WebFilter} that can be registered with
44+
* Spring Webflux to instrument HTTP server requests.
45+
*
46+
* @return OpenTelemetry telemetry producing {@link WebFilter}
47+
*/
4248
public WebFilter createWebFilter() {
4349
return new TelemetryProducingWebFilter(serverInstrumenter);
4450
}
4551

52+
/**
53+
* Returns an OpenTelemetry telemetry producing {@link WebFilter} that can be registered with
54+
* Spring Webflux to instrument HTTP server requests. Also registers the Reactor context
55+
* propagation hook {@link ContextPropagationOperator} for propagating OpenTelemetry context into
56+
* reactive pipelines.
57+
*
58+
* @return OpenTelemetry telemetry producing {@link WebFilter}
59+
*/
4660
public WebFilter createWebFilterAndRegisterReactorHook() {
4761
registerReactorHook();
4862
return this.createWebFilter();

instrumentation/spring/spring-webflux/spring-webflux-5.3/library/src/test/java/io/opentelemetry/instrumentation/spring/webflux/v5_3/SpringWebfluxClientInstrumentationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ protected WebClient.Builder instrument(WebClient.Builder builder) {
2828
.setCapturedResponseHeaders(
2929
Collections.singletonList(AbstractHttpClientTest.TEST_RESPONSE_HEADER))
3030
.build();
31-
return builder.filters(instrumentation::addFilter);
31+
return builder.filters(instrumentation::addFilterAndRegisterReactorHook);
3232
}
3333
}

instrumentation/spring/spring-webflux/spring-webflux-5.3/testing/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/AbstractSpringWebfluxClientInstrumentationTest.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
import io.opentelemetry.api.common.AttributeKey;
2121
import io.opentelemetry.api.trace.SpanKind;
22-
import io.opentelemetry.context.Context;
23-
import io.opentelemetry.context.Scope;
2422
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
2523
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
2624
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
@@ -77,16 +75,8 @@ public void sendRequestWithCallback(
7775
Map<String, String> headers,
7876
HttpClientResult httpClientResult) {
7977
if (Webflux7Util.isWebflux7) {
80-
// FIXME: context is not propagated to the callback, this needs to be fixed
81-
Context context = Context.current();
8278
Webflux7Util.sendRequestWithCallback(
83-
request,
84-
status -> {
85-
try (Scope ignore = context.makeCurrent()) {
86-
httpClientResult.complete(status);
87-
}
88-
},
89-
httpClientResult::complete);
79+
request, httpClientResult::complete, httpClientResult::complete);
9080
} else {
9181
request
9282
.exchange()

0 commit comments

Comments
 (0)