
Commit d9ea13b
committed

Send proxy payload through an mpsc channel to be aggregated and flushed by the proxy flusher

1 parent 3be23ab commit d9ea13b
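The change follows a familiar producer/consumer pattern: the profiling endpoint handler sends each proxied payload into a bounded tokio mpsc channel, and a dedicated flusher task drains the channel into a buffer that it flushes on a timer. The sketch below is a simplified, self-contained illustration of that hand-off only, not code from this commit; the String payload type, buffer size, and interval are placeholders.

use std::time::Duration;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel: senders back-pressure once 10 payloads are in flight.
    let (tx, mut rx) = mpsc::channel::<String>(10);

    // Consumer: buffer received payloads and flush them on a fixed interval.
    let flusher = tokio::spawn(async move {
        let mut buffer: Vec<String> = Vec::new();
        let mut interval = tokio::time::interval(Duration::from_secs(3));
        loop {
            tokio::select! {
                maybe_payload = rx.recv() => match maybe_payload {
                    Some(payload) => buffer.push(payload),
                    None => {
                        // All senders dropped: flush what is left, then stop.
                        if !buffer.is_empty() {
                            println!("flushing {} payload(s) on shutdown", buffer.len());
                        }
                        break;
                    }
                },
                _ = interval.tick() => {
                    if !buffer.is_empty() {
                        println!("flushing {} payload(s)", buffer.len());
                        buffer.clear(); // a real flusher would POST the batch here
                    }
                }
            }
        }
    });

    // Producer: a request handler would send each payload through the channel.
    tx.send("example payload".to_string()).await.unwrap();
    drop(tx); // closing the channel lets the flusher task exit

    flusher.await.unwrap();
}

The diffs below wire this same pattern up for ProxyRequest values, with the flush itself delegated to the new proxy_flusher module.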

File tree

12 files changed: +634, -163 lines changed


Cargo.lock

Lines changed: 299 additions & 128 deletions
Some generated files are not rendered by default.

crates/datadog-serverless-compat/src/main.rs

Lines changed: 10 additions & 1 deletion
@@ -17,10 +17,12 @@ use tracing_subscriber::EnvFilter;
 use zstd::zstd_safe::CompressionLevel;
 
 use datadog_trace_agent::{
-    aggregator::TraceAggregator,
+    trace_aggregator::TraceAggregator,
     config, env_verifier, mini_agent, stats_flusher, stats_processor,
     trace_flusher::{self, TraceFlusher},
     trace_processor,
+    proxy_aggregator,
+    proxy_flusher,
 };
 
 use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType};
@@ -120,13 +122,20 @@ pub async fn main() {
         Arc::clone(&config),
     ));
 
+    let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::ProxyAggregator::default()));
+    let proxy_flusher = Arc::new(proxy_flusher::ProxyFlusher::new(
+        proxy_aggregator,
+        Arc::clone(&config),
+    ));
+
     let mini_agent = Box::new(mini_agent::MiniAgent {
         config: Arc::clone(&config),
         env_verifier,
         trace_processor,
         trace_flusher,
         stats_processor,
         stats_flusher,
+        proxy_flusher,
     });
 
     tokio::spawn(async move {

crates/datadog-trace-agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog/", rev =
 datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0", features = ["mini_agent"] }
 datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0" }
 datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0" }
+datadog-fips = { path = "../datadog-fips", default-features = false }
 reqwest = { version = "0.12.23", features = ["json"] }
 bytes = "1.10.1"

crates/datadog-trace-agent/src/config.rs

Lines changed: 11 additions & 1 deletion
@@ -11,7 +11,7 @@ use std::sync::OnceLock;
 use datadog_trace_obfuscation::obfuscation_config;
 use datadog_trace_utils::config_utils::{
     read_cloud_env, trace_intake_url, trace_intake_url_prefixed, trace_stats_url,
-    trace_stats_url_prefixed,
+    trace_stats_url_prefixed
 };
 use datadog_trace_utils::trace_utils;
 
@@ -86,6 +86,9 @@ pub struct Config {
     pub trace_flush_interval: u64,
     pub trace_intake: Endpoint,
     pub trace_stats_intake: Endpoint,
+    /// how often to flush proxy requests, in seconds
+    pub proxy_flush_interval: u64,
+    pub proxy_intake: Endpoint,
     /// timeout for environment verification, in milliseconds
     pub verify_env_timeout: u64,
     pub proxy_url: Option<String>,
@@ -111,6 +114,7 @@ impl Config {
         // trace stats to)
         let mut trace_intake_url = trace_intake_url(&dd_site);
         let mut trace_stats_intake_url = trace_stats_url(&dd_site);
+        let proxy_intake_url = format!("https://intake.profile.{}/api/v2/profile", dd_site);
 
         // DD_APM_DD_URL env var will primarily be used for integration tests
         // overrides the entire trace/trace stats intake url prefix
@@ -139,6 +143,7 @@
             max_request_content_length: 10 * 1024 * 1024, // 10MB in Bytes
             trace_flush_interval: 3,
             stats_flush_interval: 3,
+            proxy_flush_interval: 3,
             verify_env_timeout: 100,
             dd_dogstatsd_port,
             dd_site,
@@ -149,6 +154,11 @@
             },
             trace_stats_intake: Endpoint {
                 url: hyper::Uri::from_str(&trace_stats_intake_url).unwrap(),
+                api_key: Some(api_key.clone()),
+                ..Default::default()
+            },
+            proxy_intake: Endpoint {
+                url: hyper::Uri::from_str(&proxy_intake_url).unwrap(),
                 api_key: Some(api_key),
                 ..Default::default()
             },

crates/datadog-trace-agent/src/http_utils.rs

Lines changed: 16 additions & 0 deletions
@@ -9,6 +9,9 @@ use hyper::{
 };
 use serde_json::json;
 use tracing::{debug, error};
+use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
+use core::time::Duration;
+use std::error::Error;
 
 /// Does two things:
 /// 1. Logs the given message. A success status code (within 200-299) will cause an info log to be
@@ -111,6 +114,19 @@ pub fn verify_request_content_length(
     None
 }
 
+/// Builds a reqwest client with optional proxy configuration and timeout.
+/// Uses FIPS-compliant TLS when the fips feature is enabled.
+pub fn build_client(
+    proxy_url: Option<&str>,
+    timeout: Duration,
+) -> Result<reqwest::Client, Box<dyn Error>> {
+    let mut builder = create_reqwest_client_builder()?.timeout(timeout);
+    if let Some(proxy) = proxy_url {
+        builder = builder.proxy(reqwest::Proxy::all(proxy)?);
+    }
+    Ok(builder.build()?)
+}
+
 #[cfg(test)]
 mod tests {
     use ddcommon::hyper_migration;
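A likely call site for this helper is the new proxy flusher, which needs a client that honors the configured proxy_url. A minimal sketch of such a call, assuming the Config fields shown in the config.rs diff above; the wrapper function and the 5-second timeout are illustrative, not part of this commit.

use core::time::Duration;

use crate::config::Config;
use crate::http_utils::build_client;

// Hypothetical helper: build the shared HTTP client once, at flusher construction time.
fn proxy_client(config: &Config) -> reqwest::Client {
    // config.proxy_url is Option<String>; as_deref() yields the Option<&str> expected by build_client.
    build_client(config.proxy_url.as_deref(), Duration::from_secs(5))
        .expect("failed to build proxy client")
}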

crates/datadog-trace-agent/src/lib.rs

Lines changed: 2 additions & 1 deletion
@@ -7,12 +7,13 @@
 #![cfg_attr(not(test), deny(clippy::todo))]
 #![cfg_attr(not(test), deny(clippy::unimplemented))]
 
-pub mod aggregator;
+pub mod trace_aggregator;
 pub mod config;
 pub mod env_verifier;
 pub mod http_utils;
 pub mod mini_agent;
 pub mod proxy_aggregator;
+pub mod proxy_flusher;
 pub mod stats_flusher;
 pub mod stats_processor;
 pub mod trace_flusher;

crates/datadog-trace-agent/src/mini_agent.rs

Lines changed: 41 additions & 25 deletions
@@ -10,14 +10,11 @@ use std::io;
 use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::Instant;
-use tokio::sync::{
-    Mutex,
-    mpsc::{self, Receiver, Sender},
-};
+use tokio::sync::mpsc::{self, Receiver, Sender};
 use tracing::{debug, error};
 
 use crate::http_utils::{self, log_and_create_http_response};
-use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor, proxy_aggregator::{self, ProxyRequest}};
+use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor, proxy_aggregator::ProxyRequest, proxy_flusher};
 use datadog_trace_protobuf::pb;
 use datadog_trace_utils::trace_utils;
 use datadog_trace_utils::trace_utils::SendData;
@@ -29,6 +26,7 @@ const INFO_ENDPOINT_PATH: &str = "/info";
 const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input";
 const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
 const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
+const PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
 
 pub struct MiniAgent {
     pub config: Arc<config::Config>,
@@ -37,7 +35,7 @@ pub struct MiniAgent {
     pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
     pub stats_flusher: Arc<dyn stats_flusher::StatsFlusher + Send + Sync>,
     pub env_verifier: Arc<dyn env_verifier::EnvVerifier + Send + Sync>,
-    pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
+    pub proxy_flusher: Arc<proxy_flusher::ProxyFlusher>,
 }
 
 impl MiniAgent {
@@ -89,18 +87,23 @@ impl MiniAgent {
                 .start_stats_flusher(stats_config, stats_rx)
                 .await;
         });
+        // channels to send processed profiling requests to our proxy flusher.
+        let (proxy_tx, proxy_rx): (
+            Sender<ProxyRequest>,
+            Receiver<ProxyRequest>,
+        ) = mpsc::channel(PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE);
 
         // start our proxy flusher for profiling requests
-        let proxy_aggregator_for_flusher = self.proxy_aggregator.clone();
+        let proxy_flusher = self.proxy_flusher.clone();
        tokio::spawn(async move {
-            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
+            let proxy_flusher = proxy_flusher.clone();
+            proxy_flusher.start_proxy_flusher(proxy_rx).await;
         });
 
         // setup our hyper http server, where the endpoint_handler handles incoming requests
         let trace_processor = self.trace_processor.clone();
         let stats_processor = self.stats_processor.clone();
         let endpoint_config = self.config.clone();
-        let proxy_aggregator = self.proxy_aggregator.clone();
 
         let service = service_fn(move |req| {
             let trace_processor = trace_processor.clone();
@@ -111,7 +114,8 @@
 
             let endpoint_config = endpoint_config.clone();
             let mini_agent_metadata = Arc::clone(&mini_agent_metadata);
-            let proxy_aggregator = proxy_aggregator.clone();
+
+            let proxy_tx = proxy_tx.clone();
 
             MiniAgent::trace_endpoint_handler(
                 endpoint_config.clone(),
@@ -121,7 +125,7 @@
                 stats_processor.clone(),
                 stats_tx.clone(),
                 Arc::clone(&mini_agent_metadata),
-                proxy_aggregator.clone(),
+                proxy_tx.clone(),
             )
         });
 
@@ -185,7 +189,7 @@
         stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
         stats_tx: Sender<pb::ClientStatsPayload>,
         mini_agent_metadata: Arc<trace_utils::MiniAgentMetadata>,
-        proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
+        proxy_tx: Sender<ProxyRequest>,
     ) -> http::Result<hyper_migration::HttpResponse> {
         match (req.method(), req.uri().path()) {
             (&Method::PUT | &Method::POST, TRACE_ENDPOINT_PATH) => {
@@ -210,7 +214,7 @@
                 }
             }
             (&Method::POST, PROFILING_ENDPOINT_PATH) => {
-                match Self::profiling_proxy_handler(config, req, proxy_aggregator).await {
+                match Self::profiling_proxy_handler(config, req, proxy_tx).await {
                     Ok(res) => Ok(res),
                     Err(err) => log_and_create_http_response(
                         &format!("Error processing profiling request: {err}"),
@@ -236,8 +240,8 @@
     async fn profiling_proxy_handler(
         config: Arc<config::Config>,
         request: hyper_migration::HttpRequest,
-        proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
-    ) -> Result<hyper_migration::HttpResponse, Box<dyn std::error::Error + Send + Sync>> {
+        proxy_tx: Sender<ProxyRequest>,
+    ) -> http::Result<hyper_migration::HttpResponse> {
         debug!("Trace Agent | Proxied request for profiling");
 
         // Extract headers and body
@@ -247,25 +251,37 @@
             config.max_request_content_length,
             "Error processing profiling request",
         ) {
-            return response.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
+            return response;
         }
 
-        let body_bytes = body.collect().await?.to_bytes();
+        let body_bytes = match body.collect().await {
+            Ok(collected) => collected.to_bytes(),
+            Err(e) => {
+                return log_and_create_http_response(
+                    &format!("Error reading profiling request body: {e}"),
+                    StatusCode::BAD_REQUEST,
+                );
+            }
+        };
 
         // Create proxy request
         let proxy_request = ProxyRequest {
             headers: parts.headers,
             body: body_bytes,
-            target_url: format!("https://intake.profile.{}/api/v2/profile", config.dd_site),
+            target_url: config.proxy_intake.url.to_string(),
         };
 
-        let mut proxy_aggregator = proxy_aggregator.lock().await;
-        proxy_aggregator.add(proxy_request);
-
-        Response::builder()
-            .status(200)
-            .body(hyper_migration::Body::from("Acknowledged profiling request"))
-            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
+        // Send to channel - flusher will aggregate and send
+        match proxy_tx.send(proxy_request).await {
+            Ok(_) => log_and_create_http_response(
+                "Successfully buffered profiling request to be flushed",
+                StatusCode::OK,
+            ),
+            Err(err) => log_and_create_http_response(
+                &format!("Error sending profiling request to the proxy flusher: {err}"),
+                StatusCode::INTERNAL_SERVER_ERROR,
+            ),
+        }
     }
 
     fn info_handler(dd_dogstatsd_port: u16) -> http::Result<hyper_migration::HttpResponse> {

crates/datadog-trace-agent/src/proxy_aggregator.rs

Lines changed: 7 additions & 6 deletions
@@ -1,26 +1,27 @@
-use bytes::Bytes; // TODO: Do we use bytes?
-use reqwest::header::HeaderMap; // TODO: Do we use reqwest?
+use bytes::Bytes;
+use reqwest::header::HeaderMap;
 
+#[derive(Clone)]
 pub struct ProxyRequest {
     pub headers: HeaderMap,
     pub body: Bytes,
     pub target_url: String,
 }
 
 /// Takes in individual proxy requests and aggregates them into batches to be flushed to Datadog.
-pub struct Aggregator {
+pub struct ProxyAggregator {
     queue: Vec<ProxyRequest>,
 }
 
-impl Default for Aggregator {
+impl Default for ProxyAggregator {
     fn default() -> Self {
-        Aggregator {
+        ProxyAggregator {
             queue: Vec::with_capacity(128), // arbitrary capacity for request queue
         }
     }
 }
 
-impl Aggregator {
+impl ProxyAggregator {
     /// Takes in an individual proxy request.
     pub fn add(&mut self, request: ProxyRequest) {
         self.queue.push(request);
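The new proxy_flusher module itself is not rendered in this excerpt, so the flush path is only implied by the pieces above. The sketch below is a rough illustration of what forwarding a single buffered ProxyRequest could look like, reusing build_client from http_utils; the function, the timeout, and the error handling are assumptions, and the real flusher presumably batches the aggregator's queue and attaches the API key from config.proxy_intake.

use core::time::Duration;

use crate::http_utils::build_client;
use crate::proxy_aggregator::ProxyRequest;

// Hypothetical helper: forward one buffered request to its intake URL.
// A real flusher would reuse one client and drain the whole aggregator queue.
async fn forward(request: ProxyRequest) -> Result<(), Box<dyn std::error::Error>> {
    let client = build_client(None, Duration::from_secs(5))?;
    let response = client
        .post(request.target_url.as_str())
        .headers(request.headers) // replay the original headers captured by the handler
        .body(request.body)
        .send()
        .await?;
    tracing::debug!("proxy flush returned status {}", response.status());
    Ok(())
}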
