Skip to content

Commit aab4546

Browse files
authored
Centralize metrics submission in the forwarder (#754)
* Centralize metrics submission in the forwarder * Add a check on lambda_stats import
1 parent 0c48e01 commit aab4546

File tree

12 files changed

+56
-73
lines changed

12 files changed

+56
-73
lines changed

aws/logs_monitoring/caching/base_tags_cache.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
DD_TAGS_CACHE_TTL_SECONDS,
1111
DD_S3_CACHE_LOCK_TTL_SECONDS,
1212
)
13-
from caching.common import get_last_modified_time, send_forwarder_internal_metrics
13+
from caching.common import get_last_modified_time
14+
from telemetry import send_forwarder_internal_metrics
1415

1516
JITTER_MIN = 1
1617
JITTER_MAX = 100

aws/logs_monitoring/caching/cloudwatch_log_group_cache.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import os
22
import boto3
33
from caching.base_tags_cache import BaseTagsCache
4-
from caching.common import sanitize_aws_tag_string, send_forwarder_internal_metrics
4+
from caching.common import sanitize_aws_tag_string
5+
from telemetry import send_forwarder_internal_metrics
56
from settings import (
67
DD_S3_LOG_GROUP_CACHE_FILENAME,
78
DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME,

aws/logs_monitoring/caching/common.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,40 +3,17 @@
33
import logging
44
import re
55
from collections import defaultdict
6-
from telemetry import (
7-
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
8-
get_forwarder_telemetry_tags,
9-
)
106

117
logger = logging.getLogger()
128
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
139

14-
try:
15-
from datadog_lambda.metric import lambda_stats
16-
17-
DD_SUBMIT_ENHANCED_METRICS = True
18-
except ImportError:
19-
logger.debug(
20-
"Could not import from the Datadog Lambda layer so enhanced metrics won't be submitted. "
21-
"Add the Datadog Lambda layer to this function to submit enhanced metrics."
22-
)
23-
DD_SUBMIT_ENHANCED_METRICS = False
2410

2511
_other_chars = r"\w:\-\.\/"
2612
Sanitize = re.compile(r"[^%s]" % _other_chars, re.UNICODE).sub
2713
Dedupe = re.compile(r"_+", re.UNICODE).sub
2814
FixInit = re.compile(r"^[_\d]*", re.UNICODE).sub
2915

3016

31-
def send_forwarder_internal_metrics(name, additional_tags=[]):
32-
"""Send forwarder's internal metrics to DD"""
33-
lambda_stats.distribution(
34-
"{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, name),
35-
1,
36-
tags=get_forwarder_telemetry_tags() + additional_tags,
37-
)
38-
39-
4017
def get_last_modified_time(s3_file):
4118
last_modified_str = s3_file["ResponseMetadata"]["HTTPHeaders"]["last-modified"]
4219
last_modified_date = datetime.datetime.strptime(

aws/logs_monitoring/caching/lambda_cache.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
import os
22
from botocore.exceptions import ClientError
33
from caching.base_tags_cache import BaseTagsCache
4-
from caching.common import (
5-
send_forwarder_internal_metrics,
6-
parse_get_resources_response_for_tags_by_arn,
7-
)
4+
from caching.common import parse_get_resources_response_for_tags_by_arn
5+
from telemetry import send_forwarder_internal_metrics
86
from settings import (
97
DD_S3_CACHE_FILENAME,
108
DD_S3_CACHE_LOCK_FILENAME,

aws/logs_monitoring/caching/s3_tags_cache.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
from botocore.exceptions import ClientError
22
from caching.base_tags_cache import BaseTagsCache
3-
from caching.common import (
4-
send_forwarder_internal_metrics,
5-
parse_get_resources_response_for_tags_by_arn,
6-
)
3+
from caching.common import parse_get_resources_response_for_tags_by_arn
4+
from telemetry import send_forwarder_internal_metrics
75
from settings import (
86
DD_S3_TAGS_CACHE_FILENAME,
97
DD_S3_TAGS_CACHE_LOCK_FILENAME,

aws/logs_monitoring/caching/step_functions_cache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from caching.common import (
55
sanitize_aws_tag_string,
66
parse_get_resources_response_for_tags_by_arn,
7-
send_forwarder_internal_metrics,
87
)
8+
from telemetry import send_forwarder_internal_metrics
99
from settings import (
1010
DD_S3_STEP_FUNCTIONS_CACHE_FILENAME,
1111
DD_S3_STEP_FUNCTIONS_CACHE_LOCK_FILENAME,

aws/logs_monitoring/forwarders.py

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,7 @@
22
import json
33
import os
44

5-
from telemetry import (
6-
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
7-
get_forwarder_telemetry_tags,
8-
)
9-
from datadog_lambda.metric import lambda_stats
5+
from telemetry import send_event_metric, send_log_metric
106
from trace_forwarder.connection import TraceConnection
117
from logs.logs import (
128
DatadogScrubber,
@@ -65,11 +61,7 @@ def forward_logs(logs):
6561
if logger.isEnabledFor(logging.DEBUG):
6662
logger.debug(f"Forwarded log batch: {json.dumps(batch)}")
6763

68-
lambda_stats.distribution(
69-
"{}.logs_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
70-
len(logs_to_forward),
71-
tags=get_forwarder_telemetry_tags(),
72-
)
64+
send_event_metric("logs_forwarded", len(logs_to_forward))
7365

7466

7567
def forward_metrics(metrics):
@@ -82,20 +74,14 @@ def forward_metrics(metrics):
8274

8375
for metric in metrics:
8476
try:
85-
lambda_stats.distribution(
86-
metric["m"], metric["v"], timestamp=metric["e"], tags=metric["t"]
87-
)
77+
send_log_metric(metric)
8878
except Exception:
8979
logger.exception(f"Exception while forwarding metric {json.dumps(metric)}")
9080
else:
9181
if logger.isEnabledFor(logging.DEBUG):
9282
logger.debug(f"Forwarded metric: {json.dumps(metric)}")
9383

94-
lambda_stats.distribution(
95-
"{}.metrics_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
96-
len(metrics),
97-
tags=get_forwarder_telemetry_tags(),
98-
)
84+
send_event_metric("metrics_forwarded", len(metrics))
9985

10086

10187
def forward_traces(trace_payloads):
@@ -112,8 +98,4 @@ def forward_traces(trace_payloads):
11298
if logger.isEnabledFor(logging.DEBUG):
11399
logger.debug(f"Forwarded traces: {json.dumps(trace_payloads)}")
114100

115-
lambda_stats.distribution(
116-
"{}.traces_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
117-
len(trace_payloads),
118-
tags=get_forwarder_telemetry_tags(),
119-
)
101+
send_event_metric("traces_forwarded", len(trace_payloads))

aws/logs_monitoring/steps/parsing.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,7 @@
77
import os
88
import itertools
99
import logging
10-
from datadog_lambda.metric import lambda_stats
11-
from telemetry import (
12-
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
13-
get_forwarder_telemetry_tags,
14-
set_forwarder_telemetry_tags,
15-
)
10+
from telemetry import set_forwarder_telemetry_tags, send_event_metric
1611
from steps.handlers.awslogs_handler import awslogs_handler
1712
from steps.handlers.s3_handler import s3_handler
1813
from steps.common import (
@@ -182,10 +177,6 @@ def normalize_events(events, metadata):
182177
continue
183178

184179
"""Submit count of total events"""
185-
lambda_stats.distribution(
186-
"{}.incoming_events".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
187-
events_counter,
188-
tags=get_forwarder_telemetry_tags(),
189-
)
180+
send_event_metric("incoming_events", events_counter)
190181

191182
return normalized

aws/logs_monitoring/telemetry.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
# This product includes software developed at Datadog (https://www.datadoghq.com/).
44
# Copyright 2021 Datadog, Inc.
55

6+
try:
7+
from datadog_lambda.metric import lambda_stats
8+
9+
DD_SUBMIT_ENHANCED_METRICS = True
10+
except ImportError:
11+
DD_SUBMIT_ENHANCED_METRICS = False
12+
613
from settings import DD_FORWARDER_VERSION
714

815
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX = "aws.dd_forwarder"
@@ -22,5 +29,33 @@ def set_forwarder_telemetry_tags(context, event_type):
2229
]
2330

2431

25-
def get_forwarder_telemetry_tags():
26-
return DD_FORWARDER_TELEMETRY_TAGS
32+
def send_forwarder_internal_metrics(name, additional_tags=[]):
33+
if not DD_SUBMIT_ENHANCED_METRICS:
34+
return
35+
36+
"""Send forwarder's internal metrics to DD"""
37+
lambda_stats.distribution(
38+
"{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, name),
39+
1,
40+
tags=DD_FORWARDER_TELEMETRY_TAGS + additional_tags,
41+
)
42+
43+
44+
def send_event_metric(metric_name, metric_value):
45+
if not DD_SUBMIT_ENHANCED_METRICS:
46+
return
47+
48+
lambda_stats.distribution(
49+
"{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, metric_name),
50+
metric_value,
51+
tags=DD_FORWARDER_TELEMETRY_TAGS,
52+
)
53+
54+
55+
def send_log_metric(metric):
56+
if not DD_SUBMIT_ENHANCED_METRICS:
57+
return
58+
59+
lambda_stats.distribution(
60+
metric["m"], metric["v"], timestamp=metric["e"], tags=metric["t"]
61+
)

aws/logs_monitoring/tests/test_awslogs_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from unittest.mock import patch, MagicMock
88
from approvaltests.approvals import verify_as_json
99
from approvaltests.namer import NamerFactory
10-
from caching.cache_layer import CacheLayer
1110

1211
sys.modules["trace_forwarder.connection"] = MagicMock()
1312
sys.modules["datadog_lambda.wrapper"] = MagicMock()
@@ -30,6 +29,7 @@
3029
get_lower_cased_lambda_function_name,
3130
)
3231
from steps.handlers.aws_attributes import AwsAttributes
32+
from caching.cache_layer import CacheLayer
3333

3434
env_patch.stop()
3535

0 commit comments

Comments
 (0)