Skip to content

Commit 02b5891

Browse files
authored
feat(otlp): support OTLP trace receiver (#654)
# What? Creates an OTLP receiver for traces so customers using OTEL can see traces in Datadog when using this extension. <img width="1339" alt="Screenshot 2025-04-23 at 3 10 55 PM" src="https://github.com/user-attachments/assets/29ca68c8-7ce4-4f43-80d1-04b5060b321f" /> # Motivation Support OTLP and [SVLS-6563](https://datadoghq.atlassian.net/browse/SVLS-6563) # How - Copied most of the `transform.rs` code from https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/transform/transform.go but did not include any v1 references. # Notes - Support is limited to traces and HTTP - Eventually will use some methods from https://github.com/DataDog/dd-trace-rs/blob/main/datadog-opentelemetry/src/span_conversion.rs if possible, as types might differ. [SVLS-6563]: https://datadoghq.atlassian.net/browse/SVLS-6563?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
1 parent 3b10c17 commit 02b5891

File tree

10 files changed

+2889
-209
lines changed

10 files changed

+2889
-209
lines changed

bottlecap/Cargo.lock

Lines changed: 120 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottlecap/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,12 @@ base64 = { version = "0.22", default-features = false }
4747
rmp-serde = { version = "1.3.0", default-features = false }
4848
rustls = { version = "0.23.18", default-features = false, features = ["aws-lc-rs"] }
4949
rand = { version = "0.8", default-features = false }
50-
prost = { version = "0.11.6", default-features = false }
50+
prost = { version = "0.13", default-features = false }
5151
zstd = { version = "0.13.3", default-features = false }
5252
futures = { version = "0.3.31", default-features = false }
5353
serde-aux = { version = "4.7", default-features = false }
54+
opentelemetry-proto = { version = "0.29", features = ["trace", "with-serde", "gen-tonic"] }
55+
opentelemetry-semantic-conventions = { version = "0.29", features = ["semconv_experimental"] }
5456

5557
[dev-dependencies]
5658
figment = { version = "0.10", default-features = false, features = ["yaml", "env", "test"] }

bottlecap/LICENSE-3rdparty.yml

Lines changed: 1185 additions & 188 deletions
Large diffs are not rendered by default.

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#![deny(missing_copy_implementations)]
1010
#![deny(missing_debug_implementations)]
1111

12-
use bottlecap::lwa::proxy::start_lwa_proxy;
1312
use bottlecap::{
1413
base_url,
1514
config::{
@@ -25,16 +24,17 @@ use bottlecap::{
2524
},
2625
logger,
2726
logs::{agent::LogsAgent, flusher::Flusher as LogsFlusher},
27+
lwa::proxy::start_lwa_proxy,
28+
otlp::agent::Agent as OtlpAgent,
2829
secrets::decrypt,
2930
tags::{
3031
lambda::{self, tags::EXTENSION_VERSION},
3132
provider::Provider as TagProvider,
3233
},
3334
telemetry::{
34-
self,
3535
client::TelemetryApiClient,
3636
events::{TelemetryEvent, TelemetryRecord},
37-
listener::TelemetryListener,
37+
listener::{TelemetryListener, TelemetryListenerConfig},
3838
},
3939
traces::{
4040
stats_aggregator::StatsAggregator,
@@ -48,6 +48,7 @@ use bottlecap::{
4848
LAMBDA_RUNTIME_SLUG, TELEMETRY_PORT,
4949
};
5050
use datadog_trace_obfuscation::obfuscation_config;
51+
use datadog_trace_utils::send_data::SendData;
5152
use decrypt::resolve_secrets;
5253
use dogstatsd::{
5354
aggregator::Aggregator as MetricsAggregator,
@@ -72,7 +73,6 @@ use std::{
7273
sync::{Arc, Mutex},
7374
time::{Duration, Instant},
7475
};
75-
use telemetry::listener::TelemetryListenerConfig;
7676
use tokio::{sync::mpsc::Sender, sync::Mutex as TokioMutex};
7777
use tokio_util::sync::CancellationToken;
7878
use tracing::{debug, error};
@@ -326,11 +326,13 @@ async fn extension_loop_active(
326326
Arc::clone(&metrics_aggr),
327327
)));
328328

329+
let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default()));
329330
let (trace_agent_channel, trace_flusher, trace_processor, stats_flusher) = start_trace_agent(
330331
config,
331332
resolved_api_key.clone(),
332333
&tags_provider,
333334
Arc::clone(&invocation_processor),
335+
Arc::clone(&trace_aggregator),
334336
);
335337

336338
let lwa_proxy_stopper = start_lwa_proxy(Arc::clone(&invocation_processor));
@@ -351,6 +353,13 @@ async fn extension_loop_active(
351353
let telemetry_listener_cancel_token =
352354
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;
353355

356+
start_otlp_agent(
357+
config,
358+
tags_provider.clone(),
359+
trace_processor.clone(),
360+
trace_agent_channel.clone(),
361+
);
362+
354363
let mut flush_control = FlushControl::new(config.serverless_flush_strategy);
355364

356365
let mut race_flush_interval = flush_control.get_flush_interval();
@@ -687,6 +696,7 @@ fn start_trace_agent(
687696
resolved_api_key: String,
688697
tags_provider: &Arc<TagProvider>,
689698
invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
699+
trace_aggregator: Arc<TokioMutex<trace_aggregator::TraceAggregator>>,
690700
) -> (
691701
Sender<datadog_trace_utils::send_data::SendData>,
692702
Arc<trace_flusher::ServerlessTraceFlusher>,
@@ -704,7 +714,6 @@ fn start_trace_agent(
704714
let stats_processor = Arc::new(stats_processor::ServerlessStatsProcessor {});
705715

706716
// Traces
707-
let trace_aggregator = Arc::new(TokioMutex::new(trace_aggregator::TraceAggregator::default()));
708717
let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {
709718
aggregator: trace_aggregator.clone(),
710719
config: Arc::clone(config),
@@ -793,3 +802,22 @@ async fn setup_telemetry_client(
793802
.map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
794803
Ok(telemetry_listener_cancel_token)
795804
}
805+
806+
fn start_otlp_agent(
807+
config: &Arc<Config>,
808+
tags_provider: Arc<TagProvider>,
809+
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
810+
trace_tx: Sender<SendData>,
811+
) {
812+
if !config.otlp_config_traces_enabled {
813+
return;
814+
}
815+
816+
let agent = OtlpAgent::new(config.clone(), tags_provider, trace_processor, trace_tx);
817+
818+
tokio::spawn(async move {
819+
if let Err(e) = agent.start().await {
820+
error!("Error starting OTLP agent: {e:?}");
821+
}
822+
});
823+
}

bottlecap/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub mod logger;
2626
pub mod logs;
2727
pub mod lwa;
2828
pub mod metrics;
29+
pub mod otlp;
2930
pub mod proc;
3031
pub mod secrets;
3132
pub mod tags;

0 commit comments

Comments
 (0)