Skip to content

Commit 81571a5

Browse files
authored
sampling (#42)
1 parent 9884f04 commit 81571a5

File tree

6 files changed

+255
-22
lines changed

6 files changed

+255
-22
lines changed

rust/Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ resolver = "2"
44
members = [
55
"instrumented-channel",
66
"moving-average",
7+
"sample",
78
"sdk",
89
"sdk-examples",
910
"sdk-server-framework",
@@ -25,6 +26,7 @@ aptos-indexer-processor-sdk-server-framework = { path = 'sdk-server-framework' }
2526
aptos-indexer-transaction-stream = { path = "transaction-stream" }
2627
instrumented-channel = { path = "instrumented-channel" }
2728
aptos-moving-average = { path = "moving-average" }
29+
sample = { path = "sample" }
2830

2931
ahash = { version = "0.8.7", features = ["serde"] }
3032
anyhow = "1.0.86"

rust/sample/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "sample"
3+
version = "0.1.0"
4+
5+
# Workspace inherited keys
6+
authors = { workspace = true }
7+
edition = { workspace = true }
8+
homepage = { workspace = true }
9+
license = { workspace = true }
10+
publish = { workspace = true }
11+
repository = { workspace = true }
12+
rust-version = { workspace = true }
13+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

rust/sample/src/lib.rs

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
use std::{
2+
sync::atomic::{AtomicU64, Ordering},
3+
time::{Duration, SystemTime},
4+
};
5+
6+
/// The rate at which a `sample!` macro will run its given code.
///
/// ## Sampling logs
///
/// Sometimes logging a large amount of data is expensive. In order to log information only part
/// of the time, the `sample!` macro can be configured with how often the wrapped code should be
/// executed.
///
/// `SampleRate` determines how often the sampled statement will occur.
///
/// ```
/// use sample::{sample, SampleRate};
/// use std::time::Duration;
///
/// // Sampled based on frequency of events, log only every 2 logs
/// sample!(SampleRate::Frequency(2), println!("Long log"));
///
/// // Sampled based on time passed, log at most once a minute
/// sample!(
///     SampleRate::Duration(Duration::from_secs(60)),
///     println!("Long log")
/// );
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SampleRate {
    /// Only sample a single time during a window of time. This rate only has a resolution in
    /// seconds: the sub-second part of the duration is ignored, so a window shorter than one
    /// second behaves like a zero-length window and samples on every call.
    Duration(Duration),
    /// Sample based on the frequency of the event. The provided u64 is the inverse of the
    /// frequency (1/x), for example Frequency(2) means that 1 out of every 2 events will be
    /// sampled (1/2). The first event is always sampled; `Frequency(0)` and `Frequency(1)` both
    /// sample every event.
    Frequency(u64),
    /// Always sample.
    Always,
}
37+
38+
/// An internal struct that can be checked if a sample is ready for the `sample!` macro
39+
pub struct Sampling {
40+
rate: SampleRate,
41+
state: AtomicU64,
42+
}
43+
44+
impl Sampling {
45+
pub const fn new(rate: SampleRate) -> Self {
46+
Self {
47+
rate,
48+
state: AtomicU64::new(0),
49+
}
50+
}
51+
52+
pub fn sample(&self) -> bool {
53+
match &self.rate {
54+
SampleRate::Duration(rate) => Self::sample_duration(rate, &self.state),
55+
SampleRate::Frequency(rate) => Self::sample_frequency(*rate, &self.state),
56+
SampleRate::Always => true,
57+
}
58+
}
59+
60+
fn sample_frequency(rate: u64, count: &AtomicU64) -> bool {
61+
let previous_count = count
62+
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
63+
let new_count = if count == 0 {
64+
rate.saturating_sub(1)
65+
} else {
66+
count.saturating_sub(1)
67+
};
68+
Some(new_count)
69+
})
70+
.expect("Closure should always returns 'Some'. This is a Bug.");
71+
72+
previous_count == 0
73+
}
74+
75+
fn sample_duration(rate: &Duration, last_sample: &AtomicU64) -> bool {
76+
let rate = rate.as_secs();
77+
// Seconds since Unix Epoch
78+
let now = SystemTime::now()
79+
.duration_since(SystemTime::UNIX_EPOCH)
80+
.expect("SystemTime before UNIX EPOCH!")
81+
.as_secs();
82+
83+
last_sample
84+
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |last_sample| {
85+
if now.saturating_sub(last_sample) >= rate {
86+
Some(now)
87+
} else {
88+
None
89+
}
90+
})
91+
.is_ok()
92+
}
93+
}
94+
95+
/// Samples a given function at a `SampleRate`, useful for periodically emitting logs or metrics on
/// high throughput pieces of code.
///
/// The first argument is the `SampleRate`; everything after the first comma is the code to run
/// when the sample fires. Each expansion of the macro owns its own `static` sampler, so every
/// call site is rate-limited independently, and all threads passing through the same call site
/// share that one sampler.
///
/// ```
/// use sample::{sample, SampleRate};
///
/// for i in 0..10 {
///     // Prints for i = 0, 2, 4, 6 and 8.
///     sample!(SampleRate::Frequency(2), println!("hello {}", i));
/// }
/// ```
#[macro_export]
macro_rules! sample {
    // Expression body followed by a trailing comma: drop the comma and re-dispatch.
    ($sample_rate:expr, $($args:expr)+ ,) => {
        $crate::sample!($sample_rate, $($args)+);
    };

    ($sample_rate:expr, $($args:tt)+) => {{
        // Items declared by a macro are not hygienic, so the static is confined to its own inner
        // block. Otherwise a caller whose code mentions an item named `SAMPLING` would silently
        // get this static instead of their own.
        let should_sample = {
            static SAMPLING: $crate::Sampling = $crate::Sampling::new($sample_rate);
            SAMPLING.sample()
        };
        if should_sample {
            $($args)+
        }
    }};
}
110+
111+
#[cfg(test)]
mod tests {
    use super::*;

    /// `Frequency(n)` samples the first event and every `n`-th event after it.
    #[test]
    fn frequency() {
        let sampling = Sampling::new(SampleRate::Frequency(10));
        let mut v = Vec::new();
        for i in 0..=25 {
            if sampling.sample() {
                v.push(i);
            }
        }

        assert_eq!(v, vec![0, 10, 20]);
    }

    /// `Always` never suppresses an event.
    #[test]
    fn always() {
        let sampling = Sampling::new(SampleRate::Always);
        let mut v = Vec::new();
        for i in 0..5 {
            if sampling.sample() {
                v.push(i);
            }
        }

        assert_eq!(v, vec![0, 1, 2, 3, 4]);
    }

    /// Within a single window only the first event is sampled. The window is far longer than
    /// the test can run, so this needs no sleeping and is deterministic.
    #[test]
    fn duration_within_window() {
        let sampling = Sampling::new(SampleRate::Duration(Duration::from_secs(3600)));
        let sampled: Vec<bool> = (0..5).map(|_| sampling.sample()).collect();

        assert_eq!(sampled, vec![true, false, false, false, false]);
    }

    /// The sampler re-arms once the window has elapsed.
    ///
    /// Five calls spaced at least 500ms apart cover at least two full seconds, and any two calls
    /// that are two iterations apart are at least one second apart. They therefore always land
    /// in at least three distinct wall-clock seconds, each of which produces a sample.
    #[ignore = "sleeps for ~2.5s of wall-clock time"]
    #[test]
    fn duration() {
        let sampling = Sampling::new(SampleRate::Duration(Duration::from_secs(1)));
        let mut v = Vec::new();
        for i in 0..5 {
            if sampling.sample() {
                v.push(i);
            }

            std::thread::sleep(Duration::from_millis(500));
        }

        assert_eq!(v.first(), Some(&0));
        assert!(v.len() >= 3, "expected at least 3 samples, got {:?}", v);
    }

    /// Every supported invocation shape of `sample!` must compile and run.
    #[test]
    fn macro_expansion() {
        for i in 0..10 {
            // Expression body with a trailing comma.
            sample!(
                SampleRate::Frequency(2),
                println!("loooooooooooooooooooooooooong hello {}", i),
            );

            // Block body.
            sample!(SampleRate::Frequency(2), {
                println!("hello {}", i);
            });

            // Expression body without a trailing comma.
            sample!(SampleRate::Frequency(2), println!("hello {}", i));

            // Brace-delimited invocation with a statement body.
            sample! {
                SampleRate::Frequency(2),

                for j in 10..20 {
                    println!("hello {}", j);
                }
            }
        }
    }

    /// All threads running `work` share the one sampler behind its single `sample!` call site,
    /// so across 10 * 1000 events exactly one in five must be sampled overall.
    #[test]
    fn threaded() {
        fn work() -> usize {
            let mut count = 0;

            for _ in 0..1000 {
                sample!(SampleRate::Frequency(5), count += 1);
            }

            count
        }

        let mut handles = Vec::new();
        for _ in 0..10 {
            handles.push(std::thread::spawn(work));
        }

        let mut count = 0;
        for handle in handles {
            count += handle.join().unwrap();
        }

        assert_eq!(count, 2000);
    }
}

rust/transaction-stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ futures-util = { workspace = true }
2121
once_cell = { workspace = true }
2222
prometheus = { workspace = true }
2323
prost = { workspace = true }
24+
sample = { workspace = true }
2425
serde = { workspace = true }
2526
tokio = { workspace = true }
2627
tonic = { workspace = true }

rust/transaction-stream/src/transaction_stream.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use aptos_protos::{
88
};
99
use futures_util::StreamExt;
1010
use prost::Message;
11+
use sample::{sample, SampleRate};
1112
use std::time::Duration;
1213
use tokio::time::timeout;
1314
use tonic::{Response, Streaming};
@@ -397,28 +398,31 @@ impl TransactionStream {
397398
let duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64();
398399
self.fetch_ma.tick_now(num_txns as u64);
399400

400-
info!(
401-
stream_address = self
402-
.transaction_stream_config
403-
.indexer_grpc_data_service_address
404-
.to_string(),
405-
connection_id = self.connection_id,
406-
start_version = start_version,
407-
end_version = end_version,
408-
start_txn_timestamp_iso = start_txn_timestamp
409-
.as_ref()
410-
.map(timestamp_to_iso)
411-
.unwrap_or_default(),
412-
end_txn_timestamp_iso = end_txn_timestamp
413-
.as_ref()
414-
.map(timestamp_to_iso)
415-
.unwrap_or_default(),
416-
num_of_transactions = end_version - start_version + 1,
417-
size_in_bytes = size_in_bytes,
418-
duration_in_secs = duration_in_secs,
419-
tps = self.fetch_ma.avg().ceil() as u64,
420-
bytes_per_sec = size_in_bytes as f64 / duration_in_secs,
421-
"[Transaction Stream] Received transactions from GRPC.",
401+
sample!(
402+
SampleRate::Duration(Duration::from_secs(1)),
403+
info!(
404+
stream_address = self
405+
.transaction_stream_config
406+
.indexer_grpc_data_service_address
407+
.to_string(),
408+
connection_id = self.connection_id,
409+
start_version = start_version,
410+
end_version = end_version,
411+
start_txn_timestamp_iso = start_txn_timestamp
412+
.as_ref()
413+
.map(timestamp_to_iso)
414+
.unwrap_or_default(),
415+
end_txn_timestamp_iso = end_txn_timestamp
416+
.as_ref()
417+
.map(timestamp_to_iso)
418+
.unwrap_or_default(),
419+
num_of_transactions = end_version - start_version + 1,
420+
size_in_bytes = size_in_bytes,
421+
duration_in_secs = duration_in_secs,
422+
tps = self.fetch_ma.avg().ceil() as u64,
423+
bytes_per_sec = size_in_bytes as f64 / duration_in_secs,
424+
"[Transaction Stream] Received transactions from GRPC.",
425+
)
422426
);
423427

424428
if let Some(last_fetched_version) = self.last_fetched_version {

0 commit comments

Comments
 (0)