Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,132 changes: 517 additions & 615 deletions Cargo.lock

Large diffs are not rendered by default.

80 changes: 36 additions & 44 deletions LICENSE-3rdparty.csv

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use zstd::zstd_safe::CompressionLevel;

use datadog_trace_agent::{
aggregator::TraceAggregator,
config, env_verifier, mini_agent, stats_flusher, stats_processor,
config, env_verifier, mini_agent, proxy_flusher, stats_flusher, stats_processor,
trace_flusher::{self, TraceFlusher},
trace_processor,
};
Expand Down Expand Up @@ -120,13 +120,16 @@ pub async fn main() {
Arc::clone(&config),
));

let proxy_flusher = Arc::new(proxy_flusher::ProxyFlusher::new(Arc::clone(&config)));

let mini_agent = Box::new(mini_agent::MiniAgent {
config: Arc::clone(&config),
env_verifier,
trace_processor,
trace_flusher,
stats_processor,
stats_flusher,
proxy_flusher,
});

tokio::spawn(async move {
Expand Down
6 changes: 6 additions & 0 deletions crates/datadog-trace-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ async-trait = "0.1.64"
tracing = { version = "0.1", default-features = false }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0"
datadog-fips = { path = "../datadog-fips", default-features = false }
reqwest = { version = "0.12.23", features = ["json", "http2"], default-features = false }
bytes = "1.10.1"
ddcommon = { git = "https://github.com/DataDog/libdatadog/", rev = "902b6a5296963b96c4faf355f6c53eec263b7568" }
datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog/", rev = "902b6a5296963b96c4faf355f6c53eec263b7568" }
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog/", rev = "902b6a5296963b96c4faf355f6c53eec263b7568", features = ["mini_agent"] }
Expand All @@ -30,3 +33,6 @@ serial_test = "2.0.0"
duplicate = "0.4.1"
tempfile = "3.3.0"
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog/", rev = "902b6a5296963b96c4faf355f6c53eec263b7568", features=["test-utils"] }

[features]
default = [ "reqwest/rustls-tls" ]
21 changes: 21 additions & 0 deletions crates/datadog-trace-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ pub struct Config {
pub trace_flush_interval: u64,
pub trace_intake: Endpoint,
pub trace_stats_intake: Endpoint,
/// Profiling intake endpoint (for proxying profiling data to Datadog)
pub profiling_intake: Endpoint,
/// HTTP client timeout for proxy requests, in milliseconds
pub proxy_client_timeout: u64,
/// Individual request timeout for proxy requests, in seconds
pub proxy_request_timeout: u64,
/// Maximum number of retry attempts for failed proxy requests
pub proxy_max_retries: u32,
/// Base backoff duration for proxy retries, in milliseconds
pub proxy_retry_backoff_base_ms: u64,
/// timeout for environment verification, in milliseconds
pub verify_env_timeout: u64,
pub proxy_url: Option<String>,
Expand All @@ -112,6 +122,8 @@ impl Config {
let mut trace_intake_url = trace_intake_url(&dd_site);
let mut trace_stats_intake_url = trace_stats_url(&dd_site);

let profiling_intake_url = format!("https://intake.profile.{}/api/v2/profile", dd_site);

// DD_APM_DD_URL env var will primarily be used for integration tests
// overrides the entire trace/trace stats intake url prefix
if let Ok(endpoint_prefix) = env::var("DD_APM_DD_URL") {
Expand Down Expand Up @@ -139,6 +151,10 @@ impl Config {
max_request_content_length: 10 * 1024 * 1024, // 10MB in Bytes
trace_flush_interval: 3,
stats_flush_interval: 3,
proxy_client_timeout: 30,
proxy_request_timeout: 5,
proxy_max_retries: 3,
proxy_retry_backoff_base_ms: 100,
verify_env_timeout: 100,
dd_dogstatsd_port,
dd_site,
Expand All @@ -149,6 +165,11 @@ impl Config {
},
trace_stats_intake: Endpoint {
url: hyper::Uri::from_str(&trace_stats_intake_url).unwrap(),
api_key: Some(api_key.clone()),
..Default::default()
},
profiling_intake: Endpoint {
url: hyper::Uri::from_str(&profiling_intake_url).unwrap(),
api_key: Some(api_key),
..Default::default()
},
Expand Down
16 changes: 16 additions & 0 deletions crates/datadog-trace-agent/src/http_utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use core::time::Duration;
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
use ddcommon::hyper_migration;
use hyper::{
header,
http::{self, HeaderMap},
Response, StatusCode,
};
use serde_json::json;
use std::error::Error;
use tracing::{debug, error};

/// Does two things:
Expand Down Expand Up @@ -111,6 +114,19 @@ pub fn verify_request_content_length(
None
}

/// Builds a reqwest client with optional proxy configuration and timeout.
/// Uses FIPS-compliant TLS when the fips feature is enabled.
pub fn build_client(
    proxy_url: Option<&str>,
    timeout: Duration,
) -> Result<reqwest::Client, Box<dyn Error>> {
    // Start from the FIPS-aware builder so TLS backend selection is centralized.
    let base = create_reqwest_client_builder()?.timeout(timeout);
    // Attach an HTTPS proxy only when one was configured.
    let configured = match proxy_url {
        Some(proxy) => base.proxy(reqwest::Proxy::https(proxy)?),
        None => base,
    };
    Ok(configured.build()?)
}

#[cfg(test)]
mod tests {
use ddcommon::hyper_migration;
Expand Down
1 change: 1 addition & 0 deletions crates/datadog-trace-agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod config;
pub mod env_verifier;
pub mod http_utils;
pub mod mini_agent;
pub mod proxy_flusher;
pub mod stats_flusher;
pub mod stats_processor;
pub mod trace_flusher;
Expand Down
80 changes: 78 additions & 2 deletions crates/datadog-trace-agent/src/mini_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use ddcommon::hyper_migration;
use http_body_util::BodyExt;
use hyper::service::service_fn;
use hyper::{http, Method, Response, StatusCode};
use serde_json::json;
Expand All @@ -13,6 +14,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{debug, error};

use crate::http_utils::log_and_create_http_response;
use crate::proxy_flusher::{ProxyFlusher, ProxyRequest};
use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor};
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils;
Expand All @@ -22,8 +24,10 @@ const MINI_AGENT_PORT: usize = 8126;
const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces";
const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
const INFO_ENDPOINT_PATH: &str = "/info";
const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input";
const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
const PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;

pub struct MiniAgent {
pub config: Arc<config::Config>,
Expand All @@ -32,13 +36,14 @@ pub struct MiniAgent {
pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
pub stats_flusher: Arc<dyn stats_flusher::StatsFlusher + Send + Sync>,
pub env_verifier: Arc<dyn env_verifier::EnvVerifier + Send + Sync>,
pub proxy_flusher: Arc<ProxyFlusher>,
}

impl MiniAgent {
pub async fn start_mini_agent(&self) -> Result<(), Box<dyn std::error::Error>> {
let now = Instant::now();

// verify we are in a google cloud funtion environment. if not, shut down the mini agent.
// verify we are in a google cloud function environment. if not, shut down the mini agent.
let mini_agent_metadata = Arc::new(
self.env_verifier
.verify_environment(
Expand Down Expand Up @@ -84,6 +89,17 @@ impl MiniAgent {
.await;
});

// channels to send processed profiling requests to our proxy flusher
let (proxy_tx, proxy_rx): (Sender<ProxyRequest>, Receiver<ProxyRequest>) =
mpsc::channel(PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE);

// start our proxy flusher for profiling requests
let proxy_flusher = self.proxy_flusher.clone();
tokio::spawn(async move {
let proxy_flusher = proxy_flusher.clone();
proxy_flusher.start_proxy_flusher(proxy_rx).await;
});

// setup our hyper http server, where the endpoint_handler handles incoming requests
let trace_processor = self.trace_processor.clone();
let stats_processor = self.stats_processor.clone();
Expand All @@ -99,6 +115,8 @@ impl MiniAgent {
let endpoint_config = endpoint_config.clone();
let mini_agent_metadata = Arc::clone(&mini_agent_metadata);

let proxy_tx = proxy_tx.clone();

MiniAgent::trace_endpoint_handler(
endpoint_config.clone(),
req.map(hyper_migration::Body::incoming),
Expand All @@ -107,6 +125,7 @@ impl MiniAgent {
stats_processor.clone(),
stats_tx.clone(),
Arc::clone(&mini_agent_metadata),
proxy_tx.clone(),
)
});

Expand Down Expand Up @@ -170,6 +189,7 @@ impl MiniAgent {
stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
stats_tx: Sender<pb::ClientStatsPayload>,
mini_agent_metadata: Arc<trace_utils::MiniAgentMetadata>,
proxy_tx: Sender<ProxyRequest>,
) -> http::Result<hyper_migration::HttpResponse> {
match (req.method(), req.uri().path()) {
(&Method::PUT | &Method::POST, TRACE_ENDPOINT_PATH) => {
Expand All @@ -193,6 +213,15 @@ impl MiniAgent {
),
}
}
(&Method::POST, PROFILING_ENDPOINT_PATH) => {
match Self::profiling_proxy_handler(config, req, proxy_tx).await {
Ok(res) => Ok(res),
Err(err) => log_and_create_http_response(
&format!("Error processing profiling request: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) {
Ok(res) => Ok(res),
Err(err) => log_and_create_http_response(
Expand All @@ -208,13 +237,60 @@ impl MiniAgent {
}
}

/// Handles incoming proxy requests for profiling - can be abstracted into a generic proxy handler for other proxy requests in the future
async fn profiling_proxy_handler(
config: Arc<config::Config>,
request: hyper_migration::HttpRequest,
proxy_tx: Sender<ProxyRequest>,
) -> http::Result<hyper_migration::HttpResponse> {
debug!("Trace Agent | Received profiling request");

// Extract headers and body
let (parts, body) = request.into_parts();

let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return log_and_create_http_response(
&format!("Error reading profiling request body: {e}"),
StatusCode::BAD_REQUEST,
);
Comment on lines +240 to +257

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Enforce profiling payload size before buffering

The new profiling proxy endpoint reads the entire request body into memory (body.collect().await) without validating the Content-Length. The trace and stats handlers call verify_request_content_length to reject payloads larger than Config::max_request_content_length before allocation, but the profiling handler omits this guard. A client can therefore POST an arbitrarily large profiling payload and the mini agent will buffer it in RAM before forwarding, allowing a single request to exhaust memory and crash the function. Consider applying the same content length check as the other endpoints before collecting the body.

Useful? React with 👍 / 👎.

Copy link
Contributor Author

@kathiehuang kathiehuang Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lambda extension also doesn't validate the content-length, probably because profiling payloads are typically much larger and they're coming from the Datadog profiler

}
};

// Create proxy request
let proxy_request = ProxyRequest {
headers: parts.headers,
body: body_bytes,
target_url: config.profiling_intake.url.to_string(),
};
Comment on lines +248 to +266

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Forward profiling query string to Datadog

The profiling proxy handler builds the outgoing request URL from the static profiling_intake endpoint and ignores parts.uri.query(). Profiling clients pass service, environment, and other metadata via the query string when posting to /profiling/v1/input; dropping these parameters means the flusher forwards payloads without required metadata, causing ingestion failures or misattribution. Append the incoming query parameters to ProxyRequest.target_url before sending to the proxy flusher.

Useful? React with 👍 / 👎.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


debug!(
"Trace Agent | Sending profiling request to channel, target: {}",
proxy_request.target_url
);

// Send to channel
match proxy_tx.send(proxy_request).await {
Ok(_) => log_and_create_http_response(
"Successfully buffered profiling request to be flushed",
StatusCode::OK,
),
Err(err) => log_and_create_http_response(
&format!("Error sending profiling request to the proxy flusher: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}

fn info_handler(dd_dogstatsd_port: u16) -> http::Result<hyper_migration::HttpResponse> {
let response_json = json!(
{
"endpoints": [
TRACE_ENDPOINT_PATH,
STATS_ENDPOINT_PATH,
INFO_ENDPOINT_PATH
INFO_ENDPOINT_PATH,
PROFILING_ENDPOINT_PATH
],
"client_drop_p0s": true,
"config": {
Expand Down
Loading