Skip to content

Commit 5513171

Browse files
authored
chore(tracing-utils): add tracing utils for custom instrument (#3293)
* chore(guard): add metadata to rate limit error * chore(tracing-utils): add tracing utils for custom instrument
1 parent 820b380 commit 5513171

File tree

8 files changed

+126
-92
lines changed

8 files changed

+126
-92
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,9 @@ path = "engine/packages/test-deps-docker"
374374
[workspace.dependencies.rivet-tracing-reconfigure]
375375
path = "engine/packages/tracing-reconfigure"
376376

377+
[workspace.dependencies.rivet-tracing-utils]
378+
path = "engine/packages/tracing-utils"
379+
377380
[workspace.dependencies.rivet-types]
378381
path = "engine/packages/types"
379382

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "rivet-tracing-utils"
3+
version.workspace = true
4+
authors.workspace = true
5+
license.workspace = true
6+
edition.workspace = true
7+
8+
[dependencies]
9+
futures-util.workspace = true
10+
lazy_static.workspace = true
11+
rivet-metrics.workspace = true
12+
tracing.workspace = true
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use std::{
2+
future::Future,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
time::Instant,
6+
};
7+
8+
use futures_util::future;
9+
use tracing::{Instrument, instrument::Instrumented};
10+
11+
use rivet_metrics::KeyValue;
12+
13+
/// Attempts to create a new future to select over a list of futures.
14+
/// Non-panicking version of [futures_util::future::select_all](https://docs.rs/futures/0.3.15/futures/future/fn.select_all.html).
15+
///
16+
/// If `iter` is empty, a `Pending` future is returned.
17+
pub async fn select_all_or_wait<I>(iter: I) -> <I::Item as Future>::Output
18+
where
19+
I: IntoIterator,
20+
I::Item: Future + Unpin,
21+
{
22+
let futs = iter.into_iter().collect::<Vec<I::Item>>();
23+
24+
if !futs.is_empty() {
25+
future::select_all(futs).await.0
26+
} else {
27+
std::future::pending().await
28+
}
29+
}
30+
31+
pub trait CustomInstrumentExt: Sized {
32+
fn custom_instrument(self, span: tracing::Span) -> CustomInstrumented<Self> {
33+
CustomInstrumented {
34+
inner: self.instrument(span),
35+
start: Instant::now(),
36+
}
37+
}
38+
}
39+
40+
impl<F: Sized> CustomInstrumentExt for F {}
41+
42+
pub struct CustomInstrumented<T> {
43+
inner: Instrumented<T>,
44+
start: Instant,
45+
}
46+
47+
impl<T: Future> Future for CustomInstrumented<T> {
48+
type Output = T::Output;
49+
50+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51+
let this = unsafe { self.get_unchecked_mut() };
52+
let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
53+
54+
let metadata = inner.span().metadata().clone();
55+
56+
match inner.poll(cx) {
57+
Poll::Ready(val) => {
58+
if let Some(metadata) = metadata {
59+
if let (Some(file), Some(line)) = (metadata.file(), metadata.line()) {
60+
metrics::INSTRUMENTED_FUTURE_DURATION.record(
61+
this.start.elapsed().as_secs_f64(),
62+
&[
63+
KeyValue::new("location", format!("{file}:{line}")),
64+
KeyValue::new("name", metadata.name()),
65+
],
66+
);
67+
}
68+
}
69+
Poll::Ready(val)
70+
}
71+
Poll::Pending => Poll::Pending,
72+
}
73+
}
74+
}
75+
76+
mod metrics {
77+
use rivet_metrics::{
78+
MICRO_BUCKETS,
79+
otel::{global::*, metrics::*},
80+
};
81+
82+
lazy_static::lazy_static! {
83+
static ref METER: Meter = meter("rivet-util-core");
84+
85+
/// Expected attributes: "location", "name"
86+
pub static ref INSTRUMENTED_FUTURE_DURATION: Histogram<f64> = METER.f64_histogram("rivet_instrumented_future_duration")
87+
.with_description("Duration of a future.")
88+
.with_boundaries(MICRO_BUCKETS.to_vec())
89+
.build();
90+
}
91+
}

engine/packages/universaldb/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ futures-util.workspace = true
1414
lazy_static.workspace = true
1515
rand.workspace = true
1616
rivet-metrics.workspace = true
17+
rivet-tracing-utils.workspace = true
1718
rocksdb.workspace = true
1819
serde.workspace = true
1920
thiserror.workspace = true

engine/packages/universaldb/src/database.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::future::Future;
22

33
use anyhow::{Context, Result, anyhow};
44
use futures_util::FutureExt;
5+
use rivet_tracing_utils::CustomInstrumentExt;
56

67
use crate::{
78
driver::{DatabaseDriverHandle, Erased},
@@ -20,6 +21,7 @@ impl Database {
2021
}
2122

2223
/// Run a closure with automatic retry logic
24+
#[tracing::instrument(skip_all)]
2325
pub async fn run<'a, F, Fut, T>(&'a self, closure: F) -> Result<T>
2426
where
2527
F: Fn(RetryableTransaction) -> Fut + Send + Sync,
@@ -29,7 +31,9 @@ impl Database {
2931
let closure = &closure;
3032
self.driver
3133
.run(Box::new(|tx| {
32-
async move { closure(tx).await.map(|value| Box::new(value) as Erased) }.boxed()
34+
async move { closure(tx).await.map(|value| Box::new(value) as Erased) }
35+
.custom_instrument(tracing::info_span!("run_attempt"))
36+
.boxed()
3337
}))
3438
.await
3539
.and_then(|res| {

engine/packages/util/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ regex.workspace = true
2424
reqwest.workspace = true
2525
rivet-config.workspace = true
2626
rivet-metrics.workspace = true
27+
rivet-tracing-utils.workspace = true
2728
rivet-util-id.workspace = true
2829
serde.workspace = true
2930
serde_json.workspace = true

engine/packages/util/src/future.rs

Lines changed: 1 addition & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,91 +1 @@
1-
use std::{
2-
future::Future,
3-
pin::Pin,
4-
task::{Context, Poll},
5-
time::Instant,
6-
};
7-
8-
use futures_util::future;
9-
use tracing::{Instrument, instrument::Instrumented};
10-
11-
use rivet_metrics::KeyValue;
12-
13-
/// Attempts to create a new future to select over a list of futures.
14-
/// Non-panicking version of [futures_util::future::select_all](https://docs.rs/futures/0.3.15/futures/future/fn.select_all.html).
15-
///
16-
/// If `iter` is empty, a `Pending` future is returned.
17-
pub async fn select_all_or_wait<I>(iter: I) -> <I::Item as Future>::Output
18-
where
19-
I: IntoIterator,
20-
I::Item: Future + Unpin,
21-
{
22-
let futs = iter.into_iter().collect::<Vec<I::Item>>();
23-
24-
if !futs.is_empty() {
25-
future::select_all(futs).await.0
26-
} else {
27-
std::future::pending().await
28-
}
29-
}
30-
31-
pub trait CustomInstrumentExt: Sized {
32-
fn custom_instrument(self, span: tracing::Span) -> CustomInstrumented<Self> {
33-
CustomInstrumented {
34-
inner: self.instrument(span),
35-
start: Instant::now(),
36-
}
37-
}
38-
}
39-
40-
impl<F: Sized> CustomInstrumentExt for F {}
41-
42-
pub struct CustomInstrumented<T> {
43-
inner: Instrumented<T>,
44-
start: Instant,
45-
}
46-
47-
impl<T: Future> Future for CustomInstrumented<T> {
48-
type Output = T::Output;
49-
50-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51-
let this = unsafe { self.get_unchecked_mut() };
52-
let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
53-
54-
let metadata = inner.span().metadata().clone();
55-
56-
match inner.poll(cx) {
57-
Poll::Ready(val) => {
58-
if let Some(metadata) = metadata {
59-
if let (Some(file), Some(line)) = (metadata.file(), metadata.line()) {
60-
metrics::INSTRUMENTED_FUTURE_DURATION.record(
61-
this.start.elapsed().as_secs_f64(),
62-
&[
63-
KeyValue::new("location", format!("{file}:{line}")),
64-
KeyValue::new("name", metadata.name()),
65-
],
66-
);
67-
}
68-
}
69-
Poll::Ready(val)
70-
}
71-
Poll::Pending => Poll::Pending,
72-
}
73-
}
74-
}
75-
76-
mod metrics {
77-
use rivet_metrics::{
78-
MICRO_BUCKETS,
79-
otel::{global::*, metrics::*},
80-
};
81-
82-
lazy_static::lazy_static! {
83-
static ref METER: Meter = meter("rivet-util-core");
84-
85-
/// Expected attributes: "location", "name"
86-
pub static ref INSTRUMENTED_FUTURE_DURATION: Histogram<f64> = METER.f64_histogram("rivet_instrumented_future_duration")
87-
.with_description("Duration of a future.")
88-
.with_boundaries(MICRO_BUCKETS.to_vec())
89-
.build();
90-
}
91-
}
1+
pub use rivet_tracing_utils::*;

0 commit comments

Comments
 (0)