-
Notifications
You must be signed in to change notification settings - Fork 0
[SVLS-7018] Support profiling in Azure Functions #40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 22 commits
3be23ab
d9ea13b
601c893
516bc94
435363c
9502707
a381612
0e45726
420e805
4698d0f
3ec3d73
e63d8ac
638acb3
d529656
574b371
6feb3b1
a8a5a6b
5a8984a
6a64f40
4d2ddc2
6c8a8db
0045627
8522ac0
c774b3b
cca8317
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| use ddcommon::hyper_migration; | ||
| use http_body_util::BodyExt; | ||
| use hyper::service::service_fn; | ||
| use hyper::{http, Method, Response, StatusCode}; | ||
| use serde_json::json; | ||
|
|
@@ -13,6 +14,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; | |
| use tracing::{debug, error}; | ||
|
|
||
| use crate::http_utils::log_and_create_http_response; | ||
| use crate::proxy_flusher::{ProxyFlusher, ProxyRequest}; | ||
| use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor}; | ||
| use datadog_trace_protobuf::pb; | ||
| use datadog_trace_utils::trace_utils; | ||
|
|
@@ -22,8 +24,10 @@ const MINI_AGENT_PORT: usize = 8126; | |
| const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces"; | ||
| const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; | ||
| const INFO_ENDPOINT_PATH: &str = "/info"; | ||
| const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input"; | ||
| const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; | ||
| const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; | ||
| const PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; | ||
|
|
||
| pub struct MiniAgent { | ||
| pub config: Arc<config::Config>, | ||
|
|
@@ -32,13 +36,14 @@ pub struct MiniAgent { | |
| pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>, | ||
| pub stats_flusher: Arc<dyn stats_flusher::StatsFlusher + Send + Sync>, | ||
| pub env_verifier: Arc<dyn env_verifier::EnvVerifier + Send + Sync>, | ||
| pub proxy_flusher: Arc<ProxyFlusher>, | ||
| } | ||
|
|
||
| impl MiniAgent { | ||
| pub async fn start_mini_agent(&self) -> Result<(), Box<dyn std::error::Error>> { | ||
| let now = Instant::now(); | ||
|
|
||
| // verify we are in a google cloud funtion environment. if not, shut down the mini agent. | ||
| // verify we are in a google cloud function environment. if not, shut down the mini agent. | ||
| let mini_agent_metadata = Arc::new( | ||
| self.env_verifier | ||
| .verify_environment( | ||
|
|
@@ -84,6 +89,17 @@ impl MiniAgent { | |
| .await; | ||
| }); | ||
|
|
||
| // channels to send processed profiling requests to our proxy flusher | ||
| let (proxy_tx, proxy_rx): (Sender<ProxyRequest>, Receiver<ProxyRequest>) = | ||
| mpsc::channel(PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE); | ||
|
|
||
| // start our proxy flusher for profiling requests | ||
| let proxy_flusher = self.proxy_flusher.clone(); | ||
| tokio::spawn(async move { | ||
| let proxy_flusher = proxy_flusher.clone(); | ||
| proxy_flusher.start_proxy_flusher(proxy_rx).await; | ||
| }); | ||
|
|
||
| // setup our hyper http server, where the endpoint_handler handles incoming requests | ||
| let trace_processor = self.trace_processor.clone(); | ||
| let stats_processor = self.stats_processor.clone(); | ||
|
|
@@ -99,6 +115,8 @@ impl MiniAgent { | |
| let endpoint_config = endpoint_config.clone(); | ||
| let mini_agent_metadata = Arc::clone(&mini_agent_metadata); | ||
|
|
||
| let proxy_tx = proxy_tx.clone(); | ||
|
|
||
| MiniAgent::trace_endpoint_handler( | ||
| endpoint_config.clone(), | ||
| req.map(hyper_migration::Body::incoming), | ||
|
|
@@ -107,6 +125,7 @@ impl MiniAgent { | |
| stats_processor.clone(), | ||
| stats_tx.clone(), | ||
| Arc::clone(&mini_agent_metadata), | ||
| proxy_tx.clone(), | ||
| ) | ||
| }); | ||
|
|
||
|
|
@@ -170,6 +189,7 @@ impl MiniAgent { | |
| stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>, | ||
| stats_tx: Sender<pb::ClientStatsPayload>, | ||
| mini_agent_metadata: Arc<trace_utils::MiniAgentMetadata>, | ||
| proxy_tx: Sender<ProxyRequest>, | ||
| ) -> http::Result<hyper_migration::HttpResponse> { | ||
| match (req.method(), req.uri().path()) { | ||
| (&Method::PUT | &Method::POST, TRACE_ENDPOINT_PATH) => { | ||
|
|
@@ -193,6 +213,15 @@ impl MiniAgent { | |
| ), | ||
| } | ||
| } | ||
| (&Method::POST, PROFILING_ENDPOINT_PATH) => { | ||
| match Self::profiling_proxy_handler(config, req, proxy_tx).await { | ||
| Ok(res) => Ok(res), | ||
| Err(err) => log_and_create_http_response( | ||
| &format!("Error processing profiling request: {err}"), | ||
| StatusCode::INTERNAL_SERVER_ERROR, | ||
| ), | ||
| } | ||
| } | ||
| (_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) { | ||
| Ok(res) => Ok(res), | ||
| Err(err) => log_and_create_http_response( | ||
|
|
@@ -208,13 +237,60 @@ impl MiniAgent { | |
| } | ||
| } | ||
|
|
||
| /// Handles incoming proxy requests for profiling - can be abstracted into a generic proxy handler for other proxy requests in the future | ||
| async fn profiling_proxy_handler( | ||
| config: Arc<config::Config>, | ||
| request: hyper_migration::HttpRequest, | ||
| proxy_tx: Sender<ProxyRequest>, | ||
| ) -> http::Result<hyper_migration::HttpResponse> { | ||
| debug!("Trace Agent | Received profiling request"); | ||
|
|
||
| // Extract headers and body | ||
| let (parts, body) = request.into_parts(); | ||
|
|
||
| let body_bytes = match body.collect().await { | ||
| Ok(collected) => collected.to_bytes(), | ||
| Err(e) => { | ||
| return log_and_create_http_response( | ||
| &format!("Error reading profiling request body: {e}"), | ||
| StatusCode::BAD_REQUEST, | ||
| ); | ||
|
Comment on lines
+240
to
+257
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The new profiling proxy endpoint reads the entire request body into memory ( Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The lambda extension also doesn't validate the content-length, probably because profiling payloads are typically much larger and they're coming from the Datadog profiler |
||
| } | ||
| }; | ||
|
|
||
| // Create proxy request | ||
| let proxy_request = ProxyRequest { | ||
| headers: parts.headers, | ||
| body: body_bytes, | ||
| target_url: config.profiling_intake.url.to_string(), | ||
kathiehuang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }; | ||
|
Comment on lines
+248
to
+266
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The profiling proxy handler builds the outgoing request URL from the static Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The lambda extension doesn't preserve query parameters: https://github.com/DataDog/datadog-lambda-extension/blob/main/bottlecap/src/traces/proxy_flusher.rs#L119 |
||
|
|
||
| debug!( | ||
| "Trace Agent | Sending profiling request to channel, target: {}", | ||
| proxy_request.target_url | ||
| ); | ||
|
|
||
| // Send to channel | ||
| match proxy_tx.send(proxy_request).await { | ||
| Ok(_) => log_and_create_http_response( | ||
| "Successfully buffered profiling request to be flushed", | ||
| StatusCode::OK, | ||
| ), | ||
| Err(err) => log_and_create_http_response( | ||
| &format!("Error sending profiling request to the proxy flusher: {err}"), | ||
| StatusCode::INTERNAL_SERVER_ERROR, | ||
| ), | ||
| } | ||
| } | ||
|
|
||
| fn info_handler(dd_dogstatsd_port: u16) -> http::Result<hyper_migration::HttpResponse> { | ||
| let response_json = json!( | ||
| { | ||
| "endpoints": [ | ||
| TRACE_ENDPOINT_PATH, | ||
| STATS_ENDPOINT_PATH, | ||
| INFO_ENDPOINT_PATH | ||
| INFO_ENDPOINT_PATH, | ||
| PROFILING_ENDPOINT_PATH | ||
| ], | ||
| "client_drop_p0s": true, | ||
| "config": { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.