Skip to content

Commit 3cfb033

Browse files
authored
RUST-2138 Add optional support for OpenTelemetry (#1495)
1 parent a552213 commit 3cfb033

File tree

102 files changed

+7031
-67
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+7031
-67
lines changed

.evergreen/run-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ set -o pipefail
66
source .evergreen/env.sh
77
source .evergreen/cargo-test.sh
88

9-
FEATURE_FLAGS+=("tracing-unstable" "cert-key-password")
9+
FEATURE_FLAGS+=("tracing-unstable" "cert-key-password" "opentelemetry" "error-backtrace")
1010

1111
if [ "$OPENSSL" = true ]; then
1212
FEATURE_FLAGS+=("openssl-tls")

Cargo.lock

Lines changed: 40 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ tracing-unstable = ["dep:tracing", "dep:log", "bson3?/serde_json-1"]
7878
# compatible with the preview version.
7979
text-indexes-unstable = []
8080

81+
# Enable support for opentelemetry instrumentation
82+
opentelemetry = ["dep:opentelemetry"]
83+
84+
# Capture backtraces in errors. This can be slow, memory intensive, and very verbose.
85+
error-backtrace = []
86+
8187
[dependencies]
8288
base64 = "0.22"
8389
bitflags = "2"
@@ -101,6 +107,7 @@ mongodb-internal-macros = { path = "macros", version = "3.3.0" }
101107
num_cpus = { version = "1.13.1", optional = true }
102108
openssl = { version = "0.10.38", optional = true }
103109
openssl-probe = { version = "0.1.5", optional = true }
110+
opentelemetry = { version = "0.31.0", optional = true }
104111
pem = { version = "3.0.4", optional = true }
105112
percent-encoding = "2.0.0"
106113
pkcs8 = { version = "0.10.2", features = ["encryption", "pkcs5"], optional = true }
@@ -118,7 +125,7 @@ take_mut = "0.2.2"
118125
thiserror = "1.0.24"
119126
tokio-openssl = { version = "0.6.3", optional = true }
120127
tracing = { version = "0.1.36", optional = true }
121-
typed-builder = "0.20.0"
128+
typed-builder = "0.22.0"
122129
webpki-roots = "1"
123130
zstd = { version = "0.11.2", optional = true }
124131
macro_magic = "0.5.1"
@@ -216,6 +223,7 @@ futures = "0.3"
216223
hex = "0.4"
217224
home = "0.5"
218225
lambda_runtime = "0.6.0"
226+
opentelemetry_sdk = { version = "0.31.0", features = ["testing"] }
219227
pkcs8 = { version = "0.10.2", features = ["3des", "des-insecure", "sha1-insecure"] }
220228
pretty_assertions = "1.3.0"
221229
serde = { version = ">= 0.0.0", features = ["rc"] }

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ features = ["sync"]
5858
| `azure-oidc` | Enable support for Azure OIDC environment authentication. |
5959
| `gcp-oidc` | Enable support for GCP OIDC environment authentication. |
6060
| `text-indexes-unstable` | Enables support for text indexes in explicit encryption. This feature is in preview and should be used for experimental workloads only. This feature is unstable and its security is not guaranteed until released as Generally Available (GA). The GA version of this feature may not be backwards compatible with the preview version. |
61+
| `error-backtrace` | Capture backtraces in `Error` values. This can be slow, memory intensive, and very verbose. |
6162

6263
## Web Framework Examples
6364

src/bson_util.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,38 @@ pub(crate) mod option_u64_as_i64 {
341341
}
342342
}
343343

344+
/// Truncates the given string at the closest UTF-8 character boundary >= the provided length.
345+
/// If the new length is >= the current length, does nothing.
346+
#[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))]
347+
pub(crate) fn truncate_on_char_boundary(s: &mut String, new_len: usize) {
348+
let original_len = s.len();
349+
if original_len > new_len {
350+
// to avoid generating invalid UTF-8, find the first index >= max_length_bytes that is
351+
// the end of a character.
352+
// TODO: RUST-1496 we should use ceil_char_boundary here but it's currently nightly-only.
353+
// see: https://doc.rust-lang.org/std/string/struct.String.html#method.ceil_char_boundary
354+
let mut truncate_index = new_len;
355+
// is_char_boundary returns true when the provided value == the length of the string, so
356+
// if we reach the end of the string this loop will terminate.
357+
while !s.is_char_boundary(truncate_index) {
358+
truncate_index += 1;
359+
}
360+
s.truncate(truncate_index);
361+
// due to the "rounding up" behavior we might not actually end up truncating anything.
362+
// if we did, spec requires we add a trailing "...".
363+
if truncate_index < original_len {
364+
s.push_str("...")
365+
}
366+
}
367+
}
368+
369+
#[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))]
370+
pub(crate) fn doc_to_json_str(doc: crate::bson::Document, max_length_bytes: usize) -> String {
371+
let mut ext_json = Bson::Document(doc).into_relaxed_extjson().to_string();
372+
truncate_on_char_boundary(&mut ext_json, max_length_bytes);
373+
ext_json
374+
}
375+
344376
#[cfg(test)]
345377
mod test {
346378
use crate::bson_util::num_decimal_digits;

src/client.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ struct ClientInner {
143143
end_sessions_token: std::sync::Mutex<AsyncDropToken>,
144144
#[cfg(feature = "in-use-encryption")]
145145
csfle: tokio::sync::RwLock<Option<csfle::ClientState>>,
146+
#[cfg(feature = "opentelemetry")]
147+
tracer: opentelemetry::global::BoxedTracer,
146148
#[cfg(test)]
147149
disable_command_events: AtomicBool,
148150
}
@@ -181,6 +183,9 @@ impl Client {
181183
tx: Some(cleanup_tx),
182184
});
183185

186+
#[cfg(feature = "opentelemetry")]
187+
let tracer = options.tracer();
188+
184189
let inner = TrackingArc::new(ClientInner {
185190
topology: Topology::new(options.clone())?,
186191
session_pool: ServerSessionPool::new(),
@@ -193,6 +198,8 @@ impl Client {
193198
end_sessions_token,
194199
#[cfg(feature = "in-use-encryption")]
195200
csfle: Default::default(),
201+
#[cfg(feature = "opentelemetry")]
202+
tracer,
196203
#[cfg(test)]
197204
disable_command_events: AtomicBool::new(false),
198205
});
@@ -668,6 +675,11 @@ impl Client {
668675
.await;
669676
}
670677
}
678+
679+
#[cfg(feature = "opentelemetry")]
680+
pub(crate) fn tracer(&self) -> &opentelemetry::global::BoxedTracer {
681+
&self.inner.tracer
682+
}
671683
}
672684

673685
#[derive(Clone, Debug)]

src/client/executor.rs

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use crate::bson::RawDocumentBuf;
33
use crate::bson::{doc, RawBsonRef, RawDocument, Timestamp};
44
#[cfg(feature = "in-use-encryption")]
55
use futures_core::future::BoxFuture;
6+
#[cfg(feature = "opentelemetry")]
7+
use opentelemetry::context::FutureExt;
68
use serde::de::DeserializeOwned;
79
use std::sync::LazyLock;
810

@@ -14,6 +16,8 @@ use std::{
1416
};
1517

1618
use super::{options::ServerAddress, session::TransactionState, Client, ClientSession};
19+
#[cfg(not(feature = "opentelemetry"))]
20+
use crate::otel::OtelFutureStub as _;
1721
use crate::{
1822
bson::Document,
1923
change_stream::{
@@ -106,7 +110,26 @@ impl Client {
106110
op: &mut T,
107111
session: impl Into<Option<&mut ClientSession>>,
108112
) -> Result<ExecutionDetails<T>> {
109-
// Validate inputs that can be checked before server selection and connection checkout.
113+
let session = session.into();
114+
#[cfg(feature = "opentelemetry")]
115+
let span = self.start_operation_span(op, session.as_deref());
116+
let inner = self.execute_operation_with_details_inner(op, session);
117+
#[cfg(feature = "opentelemetry")]
118+
let inner = inner.with_context(span.context.clone());
119+
let result = inner.await;
120+
#[cfg(feature = "opentelemetry")]
121+
span.record_error(&result);
122+
123+
result
124+
}
125+
126+
async fn execute_operation_with_details_inner<T: Operation>(
127+
&self,
128+
op: &mut T,
129+
mut session: Option<&mut ClientSession>,
130+
) -> Result<ExecutionDetails<T>> {
131+
// Validate inputs that can be checked before server selection and connection
132+
// checkout.
110133
if self.inner.shutdown.executed.load(Ordering::SeqCst) {
111134
return Err(ErrorKind::Shutdown.into());
112135
}
@@ -122,7 +145,6 @@ impl Client {
122145
}
123146

124147
// Validate the session and update its transaction status if needed.
125-
let mut session = session.into();
126148
if let Some(ref mut session) = session {
127149
if !TrackingArc::ptr_eq(&self.inner, &session.client().inner) {
128150
return Err(Error::invalid_argument(
@@ -154,7 +176,13 @@ impl Client {
154176
}
155177
}
156178

157-
Box::pin(async { self.execute_operation_with_retry(op, session).await }).await
179+
Box::pin(async {
180+
self.execute_operation_with_retry(op, session)
181+
.with_current_context()
182+
.await
183+
})
184+
.with_current_context()
185+
.await
158186
}
159187

160188
/// Execute the given operation, returning the cursor created by the operation.
@@ -408,6 +436,7 @@ impl Client {
408436
retryability,
409437
effective_criteria,
410438
)
439+
.with_current_context()
411440
.await
412441
{
413442
Ok(output) => ExecutionDetails {
@@ -496,9 +525,21 @@ impl Client {
496525
let should_redact = cmd.should_redact();
497526
let cmd_name = cmd.name.clone();
498527
let target_db = cmd.target_db.clone();
528+
#[cfg(feature = "opentelemetry")]
529+
let cmd_attrs = crate::otel::CommandAttributes::new(&cmd);
499530

500531
let mut message = Message::try_from(cmd)?;
501532
message.request_id = Some(request_id);
533+
534+
#[cfg(feature = "opentelemetry")]
535+
let span = self.start_command_span(
536+
op,
537+
&connection_info,
538+
connection.stream_description()?,
539+
&message,
540+
cmd_attrs,
541+
);
542+
502543
#[cfg(feature = "in-use-encryption")]
503544
{
504545
let guard = self.inner.csfle.read().await;
@@ -629,6 +670,8 @@ impl Client {
629670
}
630671
}
631672
};
673+
#[cfg(feature = "opentelemetry")]
674+
span.record_command_result::<T>(&result);
632675

633676
if result
634677
.as_ref()

src/client/options.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,10 @@ pub struct ClientOptions {
612612
/// Limit on the number of mongos connections that may be created for sharded topologies.
613613
pub srv_max_hosts: Option<u32>,
614614

615+
/// Configuration for opentelemetry.
616+
#[cfg(feature = "opentelemetry")]
617+
pub tracing: Option<crate::otel::OpentelemetryOptions>,
618+
615619
/// Information from the SRV URI that generated these client options, if applicable.
616620
#[builder(setter(skip))]
617621
#[serde(skip)]

src/client/options/parse.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ impl ClientOptions {
162162
tracing_max_document_length_bytes: None,
163163
srv_max_hosts: conn_str.srv_max_hosts,
164164
srv_service_name: conn_str.srv_service_name,
165+
#[cfg(feature = "opentelemetry")]
166+
tracing: None,
165167
}
166168
}
167169
}

0 commit comments

Comments
 (0)