|
15 | 15 |
|
16 | 16 | package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics; |
17 | 17 |
|
18 | | -import static org.junit.jupiter.api.Assertions.*; |
| 18 | +import static org.junit.jupiter.api.Assertions.assertEquals; |
| 19 | +import static org.junit.jupiter.api.Assertions.assertThrows; |
19 | 20 | import static org.mockito.Mockito.*; |
20 | 21 |
|
21 | | -import io.opentelemetry.api.common.Attributes; |
22 | | -import io.opentelemetry.sdk.metrics.data.DoublePointData; |
23 | | -import io.opentelemetry.sdk.metrics.data.GaugeData; |
24 | | -import io.opentelemetry.sdk.metrics.data.MetricData; |
25 | | -import io.opentelemetry.sdk.metrics.data.SumData; |
26 | 22 | import io.opentelemetry.sdk.metrics.export.MetricExporter; |
27 | | -import io.opentelemetry.sdk.resources.Resource; |
28 | | -import java.util.ArrayList; |
29 | | -import java.util.List; |
30 | | -import java.util.Map; |
31 | | -import org.junit.jupiter.api.BeforeEach; |
| 23 | +import java.util.*; |
| 24 | +import java.util.stream.Stream; |
32 | 25 | import org.junit.jupiter.api.Test; |
33 | | -import org.junit.jupiter.api.extension.ExtendWith; |
34 | | -import org.mockito.junit.jupiter.MockitoExtension; |
35 | | -import org.mockito.junit.jupiter.MockitoSettings; |
36 | | -import org.mockito.quality.Strictness; |
| 26 | +import org.junit.jupiter.params.ParameterizedTest; |
| 27 | +import org.junit.jupiter.params.provider.Arguments; |
| 28 | +import org.junit.jupiter.params.provider.MethodSource; |
37 | 29 | import software.amazon.opentelemetry.javaagent.providers.exporter.aws.common.LogEventEmitter; |
38 | 30 |
|
39 | | -@ExtendWith(MockitoExtension.class) |
40 | | -@MockitoSettings(strictness = Strictness.LENIENT) |
41 | | -public class AwsCloudWatchEmfExporterTest { |
| 31 | +public class AwsCloudWatchEmfExporterTest extends BaseEmfExporterTest { |
42 | 32 | private static final String NAMESPACE = "test-namespace"; |
43 | | - private MetricExporter exporter; |
44 | | - private LogEventEmitter mockEmitter; |
45 | | - private List<Map<String, Object>> capturedLogEvents; |
46 | | - |
47 | | - @BeforeEach |
48 | | - void setup() { |
49 | | - mockEmitter = mock(LogEventEmitter.class); |
50 | | - capturedLogEvents = new ArrayList<>(); |
51 | | - |
52 | | - doAnswer( |
53 | | - invocation -> { |
54 | | - capturedLogEvents.add(invocation.getArgument(0)); |
55 | | - return null; |
56 | | - }) |
57 | | - .when(mockEmitter) |
58 | | - .emit(any()); |
59 | | - |
60 | | - exporter = new AwsCloudWatchEmfExporter(NAMESPACE, mockEmitter); |
| 33 | + |
| 34 | + @Override |
| 35 | + protected MetricExporter createExporter() { |
| 36 | + return new AwsCloudWatchEmfExporter(NAMESPACE, mockEmitter); |
| 37 | + } |
| 38 | + |
| 39 | + @Override |
| 40 | + protected Optional<Map<String, Object>> validateEmfStructure( |
| 41 | + Map<String, Object> logEvent, String metricName) { |
| 42 | + Optional<Map<String, Object>> emfLogOpt = super.validateEmfStructure(logEvent, metricName); |
| 43 | + |
| 44 | + if (emfLogOpt.isPresent()) { |
| 45 | + Map<String, Object> emfLog = emfLogOpt.get(); |
| 46 | + Map<String, Object> aws = (Map<String, Object>) emfLog.get("_aws"); |
| 47 | + List<Map<String, Object>> cloudWatchMetrics = |
| 48 | + (List<Map<String, Object>>) aws.get("CloudWatchMetrics"); |
| 49 | + assertEquals(NAMESPACE, cloudWatchMetrics.get(0).get("Namespace")); |
| 50 | + } |
| 51 | + |
| 52 | + return emfLogOpt; |
61 | 53 | } |
62 | 54 |
|
63 | 55 | @Test |
64 | | - void testExporterCreation() {} |
| 56 | + void testBatchActiveNewBatch() { |
| 57 | + LogEventEmitter mockEmitter = mock(LogEventEmitter.class); |
| 58 | + AwsCloudWatchEmfExporter exporter = new AwsCloudWatchEmfExporter(NAMESPACE, mockEmitter); |
| 59 | + |
| 60 | + long currentTime = System.currentTimeMillis(); |
| 61 | + Map<String, Object> logEvent = new HashMap<>(); |
| 62 | + logEvent.put("message", "test"); |
| 63 | + logEvent.put("timestamp", currentTime); |
| 64 | + |
| 65 | + // Send multiple events with same timestamp - should be batched together |
| 66 | + exporter.emit(logEvent); |
| 67 | + exporter.emit(logEvent); |
| 68 | + exporter.emit(logEvent); |
| 69 | + |
| 70 | + // Should only call emit once per event since batch is active |
| 71 | + verify(mockEmitter, times(3)).emit(logEvent); |
| 72 | + } |
65 | 73 |
|
66 | 74 | @Test |
67 | | - void testGaugeAndSumMetricProcessing() { |
68 | | - MetricData gaugeMetric = createGaugeMetricWithPoint("test.gauge", 42.0); |
69 | | - MetricData sumMetric = createSumMetricWithPoint("test.sum", 100L); |
| 75 | + void testBatchInactiveAfter24Hours() { |
| 76 | + LogEventEmitter mockEmitter = mock(LogEventEmitter.class); |
| 77 | + AwsCloudWatchEmfExporter exporter = new AwsCloudWatchEmfExporter(NAMESPACE, mockEmitter); |
| 78 | + |
| 79 | + long baseTime = System.currentTimeMillis(); |
| 80 | + Map<String, Object> firstEvent = new HashMap<>(); |
| 81 | + firstEvent.put("message", "test1"); |
| 82 | + firstEvent.put("timestamp", baseTime); |
70 | 83 |
|
71 | | - exporter.export(List.of(gaugeMetric, sumMetric)); |
| 84 | + Map<String, Object> secondEvent = new HashMap<>(); |
| 85 | + secondEvent.put("message", "test2"); |
| 86 | + secondEvent.put("timestamp", baseTime + (25L * 60 * 60 * 1000)); // 25 hours later |
72 | 87 |
|
73 | | - // Validate that log events were emitted |
74 | | - assertEquals(2, capturedLogEvents.size()); |
| 88 | + exporter.emit(firstEvent); |
| 89 | + exporter.emit(secondEvent); |
75 | 90 |
|
76 | | - for (Map<String, Object> logEvent : capturedLogEvents) { |
77 | | - assertNotNull(logEvent.get("message")); |
78 | | - assertNotNull(logEvent.get("timestamp")); |
| 91 | + // Should trigger 2 separate batch sends due to 24-hour span limit |
| 92 | + verify(mockEmitter, times(1)).emit(firstEvent); |
| 93 | + verify(mockEmitter, times(1)).emit(secondEvent); |
| 94 | + } |
| 95 | + |
| 96 | + @ParameterizedTest |
| 97 | + @MethodSource("batchLimitScenarios") |
| 98 | + void testEventBatchLimits( |
| 99 | + Map<String, Object> logEvent, int eventCount, boolean shouldExceedLimit) { |
| 100 | + LogEventEmitter mockEmitter = mock(LogEventEmitter.class); |
| 101 | + AwsCloudWatchEmfExporter exporter = new AwsCloudWatchEmfExporter(NAMESPACE, mockEmitter); |
| 102 | + |
| 103 | + for (int i = 0; i < eventCount; i++) { |
| 104 | + exporter.emit(logEvent); |
| 105 | + } |
| 106 | + |
| 107 | + if (shouldExceedLimit) { |
| 108 | + verify(mockEmitter, atLeast(2)).emit(logEvent); |
| 109 | + } else { |
| 110 | + verify(mockEmitter, times(eventCount)).emit(logEvent); |
79 | 111 | } |
80 | 112 | } |
81 | 113 |
|
82 | | - private MetricData createGaugeMetricWithPoint(String name, double value) { |
83 | | - MetricData metricData = mock(MetricData.class); |
84 | | - GaugeData gaugeData = mock(GaugeData.class); |
| 114 | + @ParameterizedTest |
| 115 | + @MethodSource("invalidLogEvents") |
| 116 | + void testValidateLogEventInvalid(Map<String, Object> logEvent) { |
| 117 | + AwsCloudWatchEmfExporter exporter = |
| 118 | + new AwsCloudWatchEmfExporter(NAMESPACE, "test-log-group", "test-stream", "us-east-1"); |
85 | 119 |
|
86 | | - DoublePointData pointData = mock(DoublePointData.class); |
| 120 | + assertThrows(IllegalArgumentException.class, () -> exporter.emit(logEvent)); |
| 121 | + } |
87 | 122 |
|
88 | | - when(metricData.getName()).thenReturn(name); |
89 | | - when(metricData.getUnit()).thenReturn("1"); |
90 | | - when(metricData.getData()).thenReturn(gaugeData); |
91 | | - when(metricData.getResource()).thenReturn(Resource.getDefault()); |
92 | | - when(gaugeData.getPoints()).thenReturn(List.of(pointData)); |
| 123 | + static Stream<Arguments> batchLimitScenarios() { |
| 124 | + Map<String, Object> smallEvent = new HashMap<>(); |
| 125 | + smallEvent.put("message", "test"); |
| 126 | + smallEvent.put("timestamp", System.currentTimeMillis()); |
93 | 127 |
|
94 | | - when(pointData.getValue()).thenReturn(value); |
95 | | - when(pointData.getAttributes()).thenReturn(Attributes.empty()); |
96 | | - when(pointData.getEpochNanos()).thenReturn(System.nanoTime()); |
| 128 | + Map<String, Object> largeEvent = new HashMap<>(); |
| 129 | + largeEvent.put("message", "x".repeat(1024 * 1024)); |
| 130 | + largeEvent.put("timestamp", System.currentTimeMillis()); |
97 | 131 |
|
98 | | - return metricData; |
| 132 | + return Stream.of( |
| 133 | + Arguments.of(smallEvent, 10001, true), // count limit exceeded |
| 134 | + Arguments.of(largeEvent, 2, true), // size limit exceeded |
| 135 | + Arguments.of(smallEvent, 10, false) // within limits |
| 136 | + ); |
99 | 137 | } |
100 | 138 |
|
101 | | - private MetricData createSumMetricWithPoint(String name, long value) { |
102 | | - MetricData metricData = mock(MetricData.class); |
103 | | - SumData sumData = mock(SumData.class); |
104 | | - DoublePointData pointData = mock(DoublePointData.class); |
105 | | - |
106 | | - when(metricData.getName()).thenReturn(name); |
107 | | - when(metricData.getUnit()).thenReturn("1"); |
108 | | - when(metricData.getData()).thenReturn(sumData); |
109 | | - when(metricData.getResource()).thenReturn(Resource.getDefault()); |
110 | | - when(sumData.getPoints()).thenReturn(List.of(pointData)); |
111 | | - when(pointData.getValue()).thenReturn((double) value); |
112 | | - when(pointData.getAttributes()).thenReturn(Attributes.empty()); |
113 | | - when(pointData.getEpochNanos()).thenReturn(System.nanoTime()); |
114 | | - |
115 | | - return metricData; |
| 139 | + static Stream<Arguments> invalidLogEvents() { |
| 140 | + long currentTime = System.currentTimeMillis(); |
| 141 | + Map<String, Object> oldTimestampEvent = new HashMap<>(); |
| 142 | + oldTimestampEvent.put("message", "{\"test\":\"data\"}"); |
| 143 | + oldTimestampEvent.put("timestamp", currentTime - (15L * 24 * 60 * 60 * 1000)); |
| 144 | + |
| 145 | + Map<String, Object> futureTimestampEvent = new HashMap<>(); |
| 146 | + futureTimestampEvent.put("message", "{\"test\":\"data\"}"); |
| 147 | + futureTimestampEvent.put("timestamp", currentTime + (3L * 60 * 60 * 1000)); |
| 148 | + |
| 149 | + Map<String, Object> emptyMessageEvent = new HashMap<>(); |
| 150 | + emptyMessageEvent.put("message", ""); |
| 151 | + emptyMessageEvent.put("timestamp", currentTime); |
| 152 | + |
| 153 | + Map<String, Object> whitespaceMessageEvent = new HashMap<>(); |
| 154 | + whitespaceMessageEvent.put("message", " "); |
| 155 | + whitespaceMessageEvent.put("timestamp", currentTime); |
| 156 | + |
| 157 | + Map<String, Object> missingMessageEvent = new HashMap<>(); |
| 158 | + missingMessageEvent.put("timestamp", currentTime); |
| 159 | + |
| 160 | + return Stream.of( |
| 161 | + Arguments.of(oldTimestampEvent), |
| 162 | + Arguments.of(futureTimestampEvent), |
| 163 | + Arguments.of(emptyMessageEvent), |
| 164 | + Arguments.of(whitespaceMessageEvent), |
| 165 | + Arguments.of(missingMessageEvent)); |
116 | 166 | } |
117 | 167 | } |
0 commit comments