Skip to content

Commit b0b0cb9

Browse files
feat: Add configurable compression level to DogStatsD metrics submission (#35)
* https://datadoghq.atlassian.net/browse/SVLS-7461 feat: Add configurable compression level to DogStatsD metrics submission. Add a compression_level field to FlusherConfig and DdApi to enable configurable zstd compression for metrics payloads.
1 parent abfec75 commit b0b0cb9

File tree

6 files changed

+20
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/datadog-serverless-compat/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ tokio-util = { version = "0.7", default-features = false }
1515
tracing = { version = "0.1", default-features = false }
1616
tracing-core = { version = "0.1", default-features = false }
1717
tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log"] }
18+
zstd = { version = "0.13.3", default-features = false }
1819

1920
[[bin]]
2021
name = "datadog-serverless-compat"

crates/datadog-serverless-compat/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use tokio::{
1414
};
1515
use tracing::{debug, error, info};
1616
use tracing_subscriber::EnvFilter;
17+
use zstd::zstd_safe::CompressionLevel;
1718

1819
use datadog_trace_agent::{
1920
aggregator::TraceAggregator,
@@ -215,6 +216,7 @@ async fn start_dogstatsd(
215216
https_proxy,
216217
timeout: DOGSTATSD_TIMEOUT_DURATION,
217218
retry_strategy: RetryStrategy::LinearBackoff(3, 1),
219+
compression_level: CompressionLevel::try_from(6).unwrap_or_default(),
218220
});
219221
Some(metrics_flusher)
220222
}

crates/dogstatsd/src/datadog.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::OnceLock;
1919
use std::time::Duration;
2020
use tracing::{debug, error};
2121
use zstd::stream::write::Encoder;
22+
use zstd::zstd_safe::CompressionLevel;
2223

2324
// TODO: Move to the more ergonomic LazyLock when MSRV is 1.80
2425
static SITE_RE: OnceLock<Regex> = OnceLock::new();
@@ -138,6 +139,7 @@ pub struct DdApi {
138139
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
139140
client: Option<Client>,
140141
retry_strategy: RetryStrategy,
142+
compression_level: CompressionLevel,
141143
}
142144

143145
impl DdApi {
@@ -148,6 +150,7 @@ impl DdApi {
148150
https_proxy: Option<String>,
149151
timeout: Duration,
150152
retry_strategy: RetryStrategy,
153+
compression_level: CompressionLevel,
151154
) -> Self {
152155
let client = build_client(https_proxy, timeout)
153156
.inspect_err(|e| {
@@ -159,6 +162,7 @@ impl DdApi {
159162
metrics_intake_url_prefix,
160163
client,
161164
retry_strategy,
165+
compression_level,
162166
}
163167
}
164168

@@ -206,7 +210,7 @@ impl DdApi {
206210
let start = std::time::Instant::now();
207211

208212
let result = (|| -> std::io::Result<Vec<u8>> {
209-
let mut encoder = Encoder::new(Vec::new(), 6)?;
213+
let mut encoder = Encoder::new(Vec::new(), self.compression_level)?;
210214
encoder.write_all(&body)?;
211215
encoder.finish()
212216
})();

crates/dogstatsd/src/flusher.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::sync::Arc;
99
use std::time::Duration;
1010
use tokio::sync::OnceCell;
1111
use tracing::{debug, error};
12+
use zstd::zstd_safe::CompressionLevel;
1213

1314
#[derive(Clone)]
1415
pub struct Flusher {
@@ -20,6 +21,7 @@ pub struct Flusher {
2021
retry_strategy: RetryStrategy,
2122
aggregator_handle: AggregatorHandle,
2223
dd_api: OnceCell<Option<DdApi>>,
24+
compression_level: CompressionLevel,
2325
}
2426

2527
pub struct FlusherConfig {
@@ -29,6 +31,7 @@ pub struct FlusherConfig {
2931
pub https_proxy: Option<String>,
3032
pub timeout: Duration,
3133
pub retry_strategy: RetryStrategy,
34+
pub compression_level: CompressionLevel,
3235
}
3336

3437
impl Flusher {
@@ -40,6 +43,7 @@ impl Flusher {
4043
timeout: config.timeout,
4144
retry_strategy: config.retry_strategy,
4245
aggregator_handle: config.aggregator_handle,
46+
compression_level: config.compression_level,
4347
dd_api: OnceCell::new(),
4448
}
4549
}
@@ -55,6 +59,7 @@ impl Flusher {
5559
self.https_proxy.clone(),
5660
self.timeout,
5761
self.retry_strategy.clone(),
62+
self.compression_level.clone(),
5863
)),
5964
None => {
6065
error!("Failed to create dd_api: failed to get API key");

crates/dogstatsd/tests/integration_test.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio::{
1818
time::{sleep, timeout, Duration},
1919
};
2020
use tokio_util::sync::CancellationToken;
21+
use zstd::zstd_safe::CompressionLevel;
2122

2223
#[cfg(test)]
2324
#[tokio::test]
@@ -60,6 +61,8 @@ async fn dogstatsd_server_ships_series() {
6061
https_proxy: None,
6162
timeout: std::time::Duration::from_secs(5),
6263
retry_strategy: RetryStrategy::Immediate(3),
64+
compression_level: CompressionLevel::try_from(6)
65+
.expect("failed to create compression level"),
6366
});
6467

6568
let server_address = "127.0.0.1:18125";
@@ -137,6 +140,7 @@ async fn test_send_with_retry_immediate_failure() {
137140
None,
138141
Duration::from_secs(1),
139142
retry_strategy.clone(),
143+
6,
140144
);
141145

142146
// Create a series using the Aggregator
@@ -192,6 +196,7 @@ async fn test_send_with_retry_linear_backoff_success() {
192196
None,
193197
Duration::from_secs(1),
194198
retry_strategy.clone(),
199+
6,
195200
);
196201

197202
// Create a series using the Aggregator
@@ -246,6 +251,7 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() {
246251
None,
247252
Duration::from_secs(1),
248253
retry_strategy.clone(),
254+
6,
249255
);
250256

251257
// Create a series using the Aggregator

0 commit comments

Comments (0)