|
15 | 15 |
|
16 | 16 | package software.amazon.opentelemetry.javaagent.providers; |
17 | 17 |
|
| 18 | +import com.fasterxml.jackson.core.JsonProcessingException; |
| 19 | +import com.fasterxml.jackson.core.type.TypeReference; |
| 20 | +import com.fasterxml.jackson.databind.ObjectMapper; |
| 21 | +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; |
18 | 22 | import io.opentelemetry.api.common.Attributes; |
19 | 23 | import io.opentelemetry.api.common.AttributesBuilder; |
20 | | -import io.opentelemetry.contrib.awsxray.AlwaysRecordSampler; |
| 24 | +import io.opentelemetry.contrib.awsxray.AwsXrayAdaptiveSamplingConfig; |
| 25 | +import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler; |
21 | 26 | import io.opentelemetry.contrib.awsxray.ResourceHolder; |
22 | 27 | import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter; |
23 | 28 | import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; |
|
42 | 47 | import io.opentelemetry.sdk.trace.SpanProcessor; |
43 | 48 | import io.opentelemetry.sdk.trace.export.SpanExporter; |
44 | 49 | import io.opentelemetry.sdk.trace.samplers.Sampler; |
| 50 | +import java.io.IOException; |
| 51 | +import java.nio.charset.StandardCharsets; |
| 52 | +import java.nio.file.Files; |
| 53 | +import java.nio.file.Path; |
| 54 | +import java.nio.file.Paths; |
45 | 55 | import java.time.Duration; |
46 | 56 | import java.util.ArrayList; |
47 | 57 | import java.util.Arrays; |
@@ -142,11 +152,16 @@ public final class AwsApplicationSignalsCustomizerProvider |
142 | 152 | private static final String OTEL_EXPORTER_OTLP_LOGS_COMPRESSION_CONFIG = |
143 | 153 | "otel.exporter.otlp.logs.compression"; |
144 | 154 |
|
| 155 | + private static final String AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG = |
| 156 | + "aws.xray.adaptive.sampling.config"; |
| 157 | + |
145 | 158 | // UDP packet can be upto 64KB. To limit the packet size, we limit the exported batch size. |
146 | 159 | // This is a bit of a magic number, as there is no simple way to tell how many spans can make a |
147 | 160 | // 64KB batch since spans can vary in size. |
148 | 161 | private static final int LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10; |
149 | 162 |
|
| 163 | + private Sampler sampler; |
| 164 | + |
150 | 165 | public void customize(AutoConfigurationCustomizer autoConfiguration) { |
151 | 166 | autoConfiguration.addPropertiesCustomizer(this::customizeProperties); |
152 | 167 | autoConfiguration.addPropertiesCustomizer(this::customizeLambdaEnvProperties); |
@@ -281,6 +296,27 @@ private Resource customizeResource(Resource resource, ConfigProperties configPro |
281 | 296 | } |
282 | 297 |
|
283 | 298 | private Sampler customizeSampler(Sampler sampler, ConfigProperties configProps) { |
| 299 | + if (sampler instanceof AwsXrayRemoteSampler) { |
| 300 | + String config = configProps.getString(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG); |
| 301 | + AwsXrayAdaptiveSamplingConfig parsedConfig = null; |
| 302 | + |
| 303 | + try { |
| 304 | + parsedConfig = parseConfigString(config); |
| 305 | + } catch (Exception e) { |
| 306 | + logger.log( |
| 307 | + Level.WARNING, "Failed to parse adaptive sampling configuration: {0}", e.getMessage()); |
| 308 | + } |
| 309 | + |
| 310 | + if (parsedConfig != null) { |
| 311 | + try { |
| 312 | + ((AwsXrayRemoteSampler) sampler).setAdaptiveSamplingConfig(parsedConfig); |
| 313 | + } catch (Exception e) { |
| 314 | + logger.log( |
| 315 | + Level.WARNING, "Error processing adaptive sampling config: {0}", e.getMessage()); |
| 316 | + } |
| 317 | + } |
| 318 | + this.sampler = sampler; |
| 319 | + } |
284 | 320 | if (isApplicationSignalsEnabled(configProps)) { |
285 | 321 | return AlwaysRecordSampler.create(sampler); |
286 | 322 | } |
@@ -344,10 +380,13 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder( |
344 | 380 | .build(); |
345 | 381 |
|
346 | 382 | // Construct and set application signals metrics processor |
347 | | - SpanProcessor spanMetricsProcessor = |
| 383 | + AwsSpanMetricsProcessorBuilder awsSpanMetricsProcessorBuilder = |
348 | 384 | AwsSpanMetricsProcessorBuilder.create( |
349 | | - meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush) |
350 | | - .build(); |
| 385 | + meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush); |
| 386 | + if (this.sampler != null) { |
| 387 | + awsSpanMetricsProcessorBuilder.setSampler(this.sampler); |
| 388 | + } |
| 389 | + SpanProcessor spanMetricsProcessor = awsSpanMetricsProcessorBuilder.build(); |
351 | 390 | tracerProviderBuilder.addSpanProcessor(spanMetricsProcessor); |
352 | 391 | } |
353 | 392 | return tracerProviderBuilder; |
@@ -423,11 +462,14 @@ SpanExporter customizeSpanExporter(SpanExporter spanExporter, ConfigProperties c |
423 | 462 | } |
424 | 463 |
|
425 | 464 | if (isApplicationSignalsEnabled(configProps)) { |
426 | | - return AwsMetricAttributesSpanExporterBuilder.create( |
427 | | - spanExporter, ResourceHolder.getResource()) |
428 | | - .build(); |
| 465 | + spanExporter = |
| 466 | + AwsMetricAttributesSpanExporterBuilder.create(spanExporter, ResourceHolder.getResource()) |
| 467 | + .build(); |
429 | 468 | } |
430 | 469 |
|
| 470 | + if (this.sampler instanceof AwsXrayRemoteSampler) { |
| 471 | + ((AwsXrayRemoteSampler) this.sampler).setSpanExporter(spanExporter); |
| 472 | + } |
431 | 473 | return spanExporter; |
432 | 474 | } |
433 | 475 |
|
@@ -467,6 +509,44 @@ LogRecordExporter customizeLogsExporter( |
467 | 509 | return logsExporter; |
468 | 510 | } |
469 | 511 |
|
| 512 | + static AwsXrayAdaptiveSamplingConfig parseConfigString(String config) |
| 513 | + throws JsonProcessingException { |
| 514 | + if (config == null) { |
| 515 | + return null; |
| 516 | + } |
| 517 | + |
| 518 | + // Check if the config is a file path and the file exists |
| 519 | + Path path = Paths.get(config); |
| 520 | + if (Files.exists(path)) { |
| 521 | + try { |
| 522 | + config = String.join("\n", Files.readAllLines(path, StandardCharsets.UTF_8)); |
| 523 | + } catch (IOException e) { |
| 524 | + throw new IllegalArgumentException( |
| 525 | + "Failed to read adaptive sampling configuration file: " + e.getMessage(), e); |
| 526 | + } |
| 527 | + } |
| 528 | + |
| 529 | + ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); |
| 530 | + Map<String, Object> configMap = |
| 531 | + yamlMapper.readValue(config, new TypeReference<Map<String, Object>>() {}); |
| 532 | + |
| 533 | + Object versionObj = configMap.get("version"); |
| 534 | + if (versionObj == null) { |
| 535 | + throw new IllegalArgumentException( |
| 536 | + "Missing required 'version' field in adaptive sampling configuration"); |
| 537 | + } |
| 538 | + |
| 539 | + double version = ((Number) versionObj).doubleValue(); |
| 540 | + if (version >= 2L) { |
| 541 | + throw new IllegalArgumentException( |
| 542 | + "Incompatible adaptive sampling config version: " |
| 543 | + + version |
| 544 | + + ". This version of the AWS X-Ray remote sampler only supports versions strictly below 2.0."); |
| 545 | + } |
| 546 | + |
| 547 | + return yamlMapper.readValue(config, AwsXrayAdaptiveSamplingConfig.class); |
| 548 | + } |
| 549 | + |
470 | 550 | private enum ApplicationSignalsExporterProvider { |
471 | 551 | INSTANCE; |
472 | 552 |
|
|
0 commit comments