Skip to content

Commit ef2a403

Browse files
committed
Add PropagationContextElement Kotlin operator
Prior to this commit, the Micrometer context-propagation project would help propagating information from `ThreadLocal`, Reactor `Context` and other context objects. This is already well supported for Micrometer Observations. In the case of Kotlin suspending functions, the processing of tasks would not necessarily update the `ThreadLocal` when the function is scheduled on a different thread. This commit introduces the `PropagationContextElement` operator that connects the `ThreadLocal`, Reactor `Context` and Coroutine `Context` for all libraries using the "context-propagation" project. Applications must manually use this operator in suspending functions like so: ``` suspend fun suspendingFunction() { return withContext(PropagationContextElement(currentCoroutineContext())) { logger.info("Suspending function with traceId") } } ``` Closes gh-35185
1 parent 21e52a4 commit ef2a403

File tree

6 files changed

+246
-0
lines changed

6 files changed

+246
-0
lines changed

framework-docs/framework-docs.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ dependencies {
6565
implementation("com.github.ben-manes.caffeine:caffeine")
6666
implementation("com.mchange:c3p0:0.9.5.5")
6767
implementation("com.oracle.database.jdbc:ojdbc11")
68+
implementation("io.micrometer:context-propagation")
6869
implementation("io.projectreactor.netty:reactor-netty-http")
6970
implementation("jakarta.jms:jakarta.jms-api")
7071
implementation("jakarta.servlet:jakarta.servlet-api")
@@ -78,6 +79,8 @@ dependencies {
7879
implementation("org.assertj:assertj-core")
7980
implementation("org.eclipse.jetty.websocket:jetty-websocket-jetty-api")
8081
implementation("org.jetbrains.kotlin:kotlin-stdlib")
82+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
83+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
8184
implementation("org.junit.jupiter:junit-jupiter-api")
8285
implementation("tools.jackson.core:jackson-databind")
8386
implementation("tools.jackson.dataformat:jackson-dataformat-xml")

framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,3 +250,36 @@ For Kotlin `Flow`, a `Flow<T>.transactional` extension is provided.
250250
}
251251
}
252252
----
253+
254+
[[coroutines.propagation]]
255+
== Context Propagation
256+
257+
Spring applications are xref:integration/observability.adoc[instrumented with Micrometer for Observability support].
258+
For tracing support, the current observation is propagated through a `ThreadLocal` for blocking code,
259+
or the Reactor `Context` for reactive pipelines. But the current observation also needs to be made available
260+
in the execution context of a suspended function. Without that, the current "traceId" will not be automatically prepended
261+
to logged statements from coroutines.
262+
263+
The `org.springframework.core.PropagationContextElement` operator generally ensures that the
264+
{micrometer-context-propagation-docs}/[Micrometer Context Propagation library] works with Kotlin Coroutines.
265+
266+
The `PropagationContextElement` requires the following dependencies:
267+
268+
`build.gradle.kts`
269+
[source,kotlin,indent=0]
270+
----
271+
dependencies {
272+
implementation("io.micrometer:context-propagation:${contextPropagationVersion}")
273+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
274+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
275+
}
276+
----
277+
278+
Applications can then use the `PropagationContextElement` operator to connect the `currentCoroutineContext()`
279+
with the context propagation mechanism:
280+
281+
include-code::./ContextPropagationSample[tag=context,indent=0]
282+
283+
Here, assuming that Micrometer Tracing is configured, the resulting logging statement
284+
will show the current "traceId" and unlock better observability for your application.
285+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2002-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.docs.languages.kotlin.coroutines.propagation
18+
19+
import kotlinx.coroutines.currentCoroutineContext
20+
import kotlinx.coroutines.withContext
21+
import org.apache.commons.logging.Log
22+
import org.apache.commons.logging.LogFactory
23+
import org.springframework.core.PropagationContextElement
24+
25+
class ContextPropagationSample {
26+
27+
companion object {
28+
private val logger: Log = LogFactory.getLog(
29+
ContextPropagationSample::class.java
30+
)
31+
}
32+
33+
// tag::context[]
34+
suspend fun suspendingFunction() {
35+
return withContext(PropagationContextElement(currentCoroutineContext())) {
36+
logger.info("Suspending function with traceId")
37+
}
38+
}
39+
// end::context[]
40+
}

spring-core/spring-core.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ dependencies {
104104
testImplementation("jakarta.xml.bind:jakarta.xml.bind-api")
105105
testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json")
106106
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
107+
testImplementation("io.micrometer:context-propagation")
108+
testImplementation("io.micrometer:micrometer-observation-test")
107109
testImplementation("org.mockito:mockito-core")
108110
testImplementation("com.networknt:json-schema-validator");
109111
testImplementation("org.skyscreamer:jsonassert")
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2002-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core
18+
19+
import io.micrometer.context.ContextRegistry
20+
import io.micrometer.context.ContextSnapshot
21+
import io.micrometer.context.ContextSnapshotFactory
22+
import kotlinx.coroutines.ThreadContextElement
23+
import kotlinx.coroutines.reactor.ReactorContext
24+
import reactor.util.context.ContextView
25+
import kotlin.coroutines.AbstractCoroutineContextElement
26+
import kotlin.coroutines.CoroutineContext
27+
28+
29+
/**
30+
* [ThreadContextElement] that restores `ThreadLocals` from the Reactor [ContextSnapshot]
31+
* every time the coroutine with this element in the context is resumed on a thread.
32+
*
33+
* This effectively ensures that Kotlin Coroutines, Reactor and Micrometer Context Propagation
34+
* work together in an application, typically for observability purposes.
35+
*
36+
* Applications need to have both `"io.micrometer:context-propagation"` and
37+
* `"org.jetbrains.kotlinx:kotlinx-coroutines-reactor"` on the classpath to use this context element.
38+
*
39+
* The `PropagationContextElement` can be used like this:
40+
*
41+
* ```kotlin
42+
* suspend fun suspendable() {
43+
* withContext(PropagationContextElement(coroutineContext)) {
44+
* logger.info("Log statement with traceId")
45+
* }
46+
* }
47+
* ```
48+
*
49+
* @author Brian Clozel
50+
* @since 7.0
51+
*/
52+
class PropagationContextElement(private val context: CoroutineContext) : ThreadContextElement<ContextSnapshot.Scope>,
53+
AbstractCoroutineContextElement(Key) {
54+
55+
companion object Key : CoroutineContext.Key<PropagationContextElement>
56+
57+
val contextSnapshot: ContextSnapshot
58+
get() {
59+
val contextView: ContextView? = context[ReactorContext]?.context
60+
val contextSnapshotFactory =
61+
ContextSnapshotFactory.builder().contextRegistry(ContextRegistry.getInstance()).build()
62+
if (contextView != null) {
63+
return contextSnapshotFactory.captureFrom(contextView)
64+
}
65+
return contextSnapshotFactory.captureAll()
66+
}
67+
68+
override fun restoreThreadContext(context: CoroutineContext, oldState: ContextSnapshot.Scope) {
69+
oldState.close()
70+
}
71+
72+
override fun updateThreadContext(context: CoroutineContext): ContextSnapshot.Scope {
73+
return contextSnapshot.setThreadLocals()
74+
}
75+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2002-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core
18+
19+
import io.micrometer.observation.Observation
20+
import io.micrometer.observation.tck.TestObservationRegistry
21+
import kotlinx.coroutines.Dispatchers
22+
import kotlinx.coroutines.currentCoroutineContext
23+
import kotlinx.coroutines.runBlocking
24+
import kotlinx.coroutines.withContext
25+
import org.assertj.core.api.Assertions
26+
import org.assertj.core.api.Assertions.assertThat
27+
import org.junit.jupiter.api.AfterAll
28+
import org.junit.jupiter.api.BeforeAll
29+
import org.junit.jupiter.api.Test
30+
import org.reactivestreams.Publisher
31+
import reactor.core.publisher.Hooks
32+
import reactor.core.publisher.Mono
33+
import reactor.core.scheduler.Schedulers
34+
import kotlin.coroutines.Continuation
35+
36+
37+
/**
38+
* Kotlin tests for [PropagationContextElement].
39+
*
40+
* @author Brian Clozel
41+
*/
42+
class PropagationContextElementTests {
43+
44+
private val observationRegistry = TestObservationRegistry.create()
45+
46+
companion object {
47+
48+
@BeforeAll
49+
@JvmStatic
50+
fun init() {
51+
Hooks.enableAutomaticContextPropagation()
52+
}
53+
54+
@AfterAll
55+
@JvmStatic
56+
fun cleanup() {
57+
Hooks.disableAutomaticContextPropagation()
58+
}
59+
60+
}
61+
62+
@Test
63+
fun restoresFromThreadLocal() {
64+
val observation = Observation.createNotStarted("coroutine", observationRegistry)
65+
observation.observe {
66+
val result = runBlocking(Dispatchers.Unconfined) {
67+
suspendingFunction("test")
68+
}
69+
Assertions.assertThat(result).isEqualTo("coroutine")
70+
}
71+
}
72+
73+
@Test
74+
@Suppress("UNCHECKED_CAST")
75+
fun restoresFromReactorContext() {
76+
val method = PropagationContextElementTests::class.java.getDeclaredMethod("suspendingFunction", String::class.java, Continuation::class.java)
77+
val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this, "test", null) as Publisher<String>
78+
val observation = Observation.createNotStarted("coroutine", observationRegistry)
79+
observation.observe {
80+
val result = Mono.from<String>(publisher).publishOn(Schedulers.boundedElastic()).block()
81+
assertThat(result).isEqualTo("coroutine")
82+
}
83+
}
84+
85+
suspend fun suspendingFunction(value: String): String? {
86+
return withContext(PropagationContextElement(currentCoroutineContext())) {
87+
val currentObservation = observationRegistry.currentObservation
88+
assertThat(currentObservation).isNotNull
89+
currentObservation?.context?.name
90+
}
91+
}
92+
93+
}

0 commit comments

Comments
 (0)