Commit b54ce7f

ge0Aja and tyrcho authored
Add a prefix to tags cache file names (#751)
* Add a prefix for tags cache files: use the Lambda function name as a prefix when creating the tags cache files accessed by the forwarder. This allows the same S3 bucket to store cache files from different forwarders.
* Update the readme file to include a section about troubleshooting S3 trigger creation
* Update aws/logs_monitoring/README.md (Co-authored-by: Michel Daviot <michel@daviot.info>)
* Add cache prefix on init
* Update readme
* Update comment

---------

Co-authored-by: Michel Daviot <michel@daviot.info>
1 parent 4ed92d9 commit b54ce7f

26 files changed: +571 -679 lines changed

aws/logs_monitoring/README.md

Lines changed: 12 additions & 0 deletions

````diff
@@ -138,6 +138,10 @@ If you can't install the Forwarder using the provided CloudFormation template, y
 
 If you encounter issues upgrading to the latest version, check the Troubleshooting section.
 
+### Upgrade an older version to +3.106.0
+Since version 3.106.0 the Lambda function has been updated to add a prefix to cache filenames stored in the S3 bucket configured in `DD_S3_BUCKET_NAME`.
+This allows the same bucket to be used to store cache files from several functions.
+
 ### Upgrade an older version to +3.99.0
 
 Since version 3.99.0 the Lambda function has been updated to require **Python 3.11**. If upgrading an older forwarder installation to +3.99.0 or above, ensure the AWS Lambda function is configured to use Python 3.11
@@ -204,6 +208,14 @@ If you still couldn't figure out, please create a ticket for [Datadog Support][1
 ### JSON-formatted logs are not appearing in Datadog
 If your logs contain an attribute that Datadog parses as a timestamp, you need to make sure that the timestamp is both current and in the correct format. See [Log Date Remapper][24] to learn about which attributes are parsed as timestamps and how to make sure that the timestamp is valid.
 
+### Issue creating S3 triggers
+If you encounter the following error when creating S3 triggers, consider the fanout architecture proposed by AWS [in this article](https://aws.amazon.com/blogs/compute/fanout-s3-event-notifications-to-multiple-endpoints/):
+
+```
+An error occurred when creating the trigger: Configuration is ambiguously defined. Cannot have overlapping suffixes in two rules if the prefixes are overlapping for the same event type.
+```
+
+
 ## Contributing
 
 We love pull requests. Here's a quick guide.
````
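For context on the troubleshooting entry added above: the fanout workaround replaces the direct S3-to-Lambda trigger with S3 publishing to an SNS topic, and SNS invoking each consumer. Below is a minimal boto3 sketch of that setup; every name and ARN is a hypothetical placeholder, not a resource this commit creates.

```python
import boto3

# Hypothetical placeholders; substitute your own resources.
BUCKET = "my-log-bucket"
TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:my-fanout-topic"
FORWARDER_ARN = "arn:aws:lambda:us-east-1:123456789012:function:datadog-forwarder"

s3 = boto3.client("s3")
sns = boto3.client("sns")

# A single bucket notification publishes to SNS instead of targeting a
# Lambda function directly, so bucket rules no longer overlap.
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "TopicConfigurations": [
            {"TopicArn": TOPIC_ARN, "Events": ["s3:ObjectCreated:*"]}
        ]
    },
)

# Each consumer, the forwarder included, subscribes to the topic.
sns.subscribe(TopicArn=TOPIC_ARN, Protocol="lambda", Endpoint=FORWARDER_ARN)
```

The full setup also requires the topic's access policy to allow `s3.amazonaws.com` to publish and a Lambda resource policy permitting SNS to invoke the function; the linked AWS article walks through both.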

aws/logs_monitoring/caching/base_tags_cache.py

Lines changed: 35 additions & 19 deletions

```diff
@@ -14,41 +14,55 @@
 
 JITTER_MIN = 1
 JITTER_MAX = 100
-
 DD_TAGS_CACHE_TTL_SECONDS = DD_TAGS_CACHE_TTL_SECONDS + randint(JITTER_MIN, JITTER_MAX)
-s3_client = boto3.resource("s3")
-logger = logging.getLogger()
-logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
 
 
 class BaseTagsCache(object):
-    CACHE_FILENAME = None
-    CACHE_LOCK_FILENAME = None
-
-    def get_resources_paginator(self):
-        return self.resource_tagging_client.get_paginator("get_resources")
-
-    def __init__(self, tags_ttl_seconds=DD_TAGS_CACHE_TTL_SECONDS):
+    def __init__(
+        self,
+        prefix,
+        cache_filename,
+        cache_lock_filename,
+        tags_ttl_seconds=DD_TAGS_CACHE_TTL_SECONDS,
+    ):
         self.tags_ttl_seconds = tags_ttl_seconds
         self.tags_by_id = {}
         self.last_tags_fetch_time = 0
-        self.logger = logger
+        self.cache_prefix = prefix
+        self.cache_filename = cache_filename
+        self.cache_lock_filename = cache_lock_filename
+        self.logger = logging.getLogger()
+        self.logger.setLevel(
+            logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())
+        )
         self.resource_tagging_client = boto3.client("resourcegroupstaggingapi")
+        self.s3_client = boto3.resource("s3")
+
+    def get_resources_paginator(self):
+        return self.resource_tagging_client.get_paginator("get_resources")
+
+    def get_cache_name_with_prefix(self):
+        return f"{self.cache_prefix}_{self.cache_filename}"
+
+    def get_cache_lock_with_prefix(self):
+        return f"{self.cache_prefix}_{self.cache_lock_filename}"
 
     def write_cache_to_s3(self, data):
         """Writes tags cache to s3"""
         try:
             self.logger.debug("Trying to write data to s3: {}".format(data))
-            s3_object = s3_client.Object(DD_S3_BUCKET_NAME, self.CACHE_FILENAME)
+            s3_object = self.s3_client.Object(
+                DD_S3_BUCKET_NAME, self.get_cache_name_with_prefix()
+            )
             s3_object.put(Body=(bytes(json.dumps(data).encode("UTF-8"))))
         except ClientError:
             send_forwarder_internal_metrics("s3_cache_write_failure")
             self.logger.debug("Unable to write new cache to S3", exc_info=True)
 
     def acquire_s3_cache_lock(self):
         """Acquire cache lock"""
-        cache_lock_object = s3_client.Object(
-            DD_S3_BUCKET_NAME, self.CACHE_LOCK_FILENAME
+        cache_lock_object = self.s3_client.Object(
+            DD_S3_BUCKET_NAME, self.get_cache_lock_with_prefix()
         )
         try:
             file_content = cache_lock_object.get()
@@ -74,8 +88,8 @@ def acquire_s3_cache_lock(self):
     def release_s3_cache_lock(self):
         """Release cache lock"""
         try:
-            cache_lock_object = s3_client.Object(
-                DD_S3_BUCKET_NAME, self.CACHE_LOCK_FILENAME
+            cache_lock_object = self.s3_client.Object(
+                DD_S3_BUCKET_NAME, self.get_cache_lock_with_prefix()
             )
             cache_lock_object.delete()
             send_forwarder_internal_metrics("s3_cache_lock_released")
@@ -87,7 +101,9 @@ def release_s3_cache_lock(self):
     def get_cache_from_s3(self):
         """Retrieves tags cache from s3 and returns the body along with
         the last modified datetime for the cache"""
-        cache_object = s3_client.Object(DD_S3_BUCKET_NAME, self.CACHE_FILENAME)
+        cache_object = self.s3_client.Object(
+            DD_S3_BUCKET_NAME, self.get_cache_name_with_prefix()
+        )
         try:
             file_content = cache_object.get()
             tags_cache = json.loads(file_content["Body"].read().decode("utf-8"))
@@ -109,7 +125,7 @@ def _refresh(self):
         if not self.should_fetch_tags():
             self.logger.debug(
                 "Not fetching custom tags because the env variable for the cache {} is not set to true".format(
-                    self.CACHE_FILENAME
+                    self.cache_filename
                 )
             )
             return
```
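To make the new naming scheme concrete, here is a minimal sketch of what `get_cache_name_with_prefix` returns; the prefix and filename values are hypothetical stand-ins for the sha1-derived prefix and the `DD_S3_*` settings.

```python
# Hypothetical values: the real prefix is a sha1 hex digest of the
# function ARN (see lambda_function.py) and the filename comes from settings.
cache_prefix = "0a1b2c3d4e5f"
cache_filename = "cache.json"

# Mirrors BaseTagsCache.get_cache_name_with_prefix()
s3_key = f"{cache_prefix}_{cache_filename}"
print(s3_key)  # 0a1b2c3d4e5f_cache.json
```

Two forwarders pointed at the same `DD_S3_BUCKET_NAME` therefore write distinct objects instead of clobbering each other's cache.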
aws/logs_monitoring/caching/cache_layer.py

Lines changed: 24 additions & 0 deletions

```diff
@@ -0,0 +1,24 @@
+from caching.cloudwatch_log_group_cache import CloudwatchLogGroupTagsCache
+from caching.step_functions_cache import StepFunctionsTagsCache
+from caching.s3_tags_cache import S3TagsCache
+from caching.lambda_cache import LambdaTagsCache
+
+
+class CacheLayer:
+    def __init__(self, prefix):
+        self._cloudwatch_log_group_cache = CloudwatchLogGroupTagsCache(prefix)
+        self._s3_tags_cache = S3TagsCache(prefix)
+        self._step_functions_cache = StepFunctionsTagsCache(prefix)
+        self._lambda_cache = LambdaTagsCache(prefix)
+
+    def get_cloudwatch_log_group_tags_cache(self):
+        return self._cloudwatch_log_group_cache
+
+    def get_s3_tags_cache(self):
+        return self._s3_tags_cache
+
+    def get_step_functions_tags_cache(self):
+        return self._step_functions_cache
+
+    def get_lambda_tags_cache(self):
+        return self._lambda_cache
```
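A short usage sketch of the new `CacheLayer`, with a hypothetical prefix string:

```python
from caching.cache_layer import CacheLayer

# One CacheLayer bundles all four tag caches under a shared prefix,
# replacing the former module-level cache singletons.
cache_layer = CacheLayer("0a1b2c3d4e5f")  # hypothetical prefix

lambda_tags = cache_layer.get_lambda_tags_cache()
log_group_tags = cache_layer.get_cloudwatch_log_group_tags_cache()
```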

aws/logs_monitoring/caching/cloudwatch_log_group_cache.py

Lines changed: 6 additions & 4 deletions

```diff
@@ -9,8 +9,11 @@
 
 
 class CloudwatchLogGroupTagsCache(BaseTagsCache):
-    CACHE_FILENAME = DD_S3_LOG_GROUP_CACHE_FILENAME
-    CACHE_LOCK_FILENAME = DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME
+    def __init__(self, prefix):
+        super().__init__(
+            prefix, DD_S3_LOG_GROUP_CACHE_FILENAME, DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME
+        )
+        self.cloudwatch_logs_client = boto3.client("logs")
 
     def should_fetch_tags(self):
         return os.environ.get("DD_FETCH_LOG_GROUP_TAGS", "false").lower() == "true"
@@ -69,11 +72,10 @@ def get(self, log_group):
         return log_group_tags
 
     def _get_log_group_tags(self, log_group):
-        cloudwatch_logs_client = boto3.client("logs")
         response = None
         try:
             send_forwarder_internal_metrics("list_tags_log_group_api_call")
-            response = cloudwatch_logs_client.list_tags_log_group(
+            response = self.cloudwatch_logs_client.list_tags_log_group(
                 logGroupName=log_group
             )
         except Exception as e:
```

aws/logs_monitoring/caching/lambda_cache.py

Lines changed: 2 additions & 3 deletions

```diff
@@ -1,6 +1,5 @@
 import os
 from botocore.exceptions import ClientError
-
 from caching.base_tags_cache import BaseTagsCache
 from caching.common import (
     send_forwarder_internal_metrics,
@@ -14,8 +13,8 @@
 
 
 class LambdaTagsCache(BaseTagsCache):
-    CACHE_FILENAME = DD_S3_CACHE_FILENAME
-    CACHE_LOCK_FILENAME = DD_S3_CACHE_LOCK_FILENAME
+    def __init__(self, prefix):
+        super().__init__(prefix, DD_S3_CACHE_FILENAME, DD_S3_CACHE_LOCK_FILENAME)
 
     def should_fetch_tags(self):
         return os.environ.get("DD_FETCH_LAMBDA_TAGS", "false").lower() == "true"
```

aws/logs_monitoring/caching/s3_tags_cache.py

Lines changed: 4 additions & 2 deletions

```diff
@@ -12,8 +12,10 @@
 
 
 class S3TagsCache(BaseTagsCache):
-    CACHE_FILENAME = DD_S3_TAGS_CACHE_FILENAME
-    CACHE_LOCK_FILENAME = DD_S3_TAGS_CACHE_LOCK_FILENAME
+    def __init__(self, prefix):
+        super().__init__(
+            prefix, DD_S3_TAGS_CACHE_FILENAME, DD_S3_TAGS_CACHE_LOCK_FILENAME
+        )
 
     def should_fetch_tags(self):
         return True
```

aws/logs_monitoring/caching/step_functions_cache.py

Lines changed: 6 additions & 3 deletions

```diff
@@ -1,6 +1,5 @@
 import os
 from botocore.exceptions import ClientError
-
 from caching.base_tags_cache import BaseTagsCache
 from caching.common import (
     sanitize_aws_tag_string,
@@ -15,8 +14,12 @@
 
 
 class StepFunctionsTagsCache(BaseTagsCache):
-    CACHE_FILENAME = DD_S3_STEP_FUNCTIONS_CACHE_FILENAME
-    CACHE_LOCK_FILENAME = DD_S3_STEP_FUNCTIONS_CACHE_LOCK_FILENAME
+    def __init__(self, prefix):
+        super().__init__(
+            prefix,
+            DD_S3_STEP_FUNCTIONS_CACHE_FILENAME,
+            DD_S3_STEP_FUNCTIONS_CACHE_LOCK_FILENAME,
+        )
 
     def should_fetch_tags(self):
         return os.environ.get("DD_FETCH_STEP_FUNCTIONS_TAGS", "false").lower() == "true"
```

aws/logs_monitoring/enhanced_lambda_metrics.py

Lines changed: 2 additions & 26 deletions

```diff
@@ -7,7 +7,6 @@
 import re
 import datetime
 from time import time
-from caching.lambda_cache import LambdaTagsCache
 
 ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"
 
@@ -82,10 +81,6 @@
 )
 DD_SUBMIT_ENHANCED_METRICS = False
 
-# Store the cache in the global scope so that it will be reused as long as
-# the log forwarder Lambda container is running
-account_lambda_custom_tags_cache = LambdaTagsCache()
-
 
 class DatadogMetricPoint(object):
     """Holds a datapoint's data so that it can be prepared for submission to DD
@@ -141,7 +136,7 @@ def get_last_modified_time(s3_file):
     return last_modified_unix_time
 
 
-def parse_and_submit_enhanced_metrics(logs):
+def parse_and_submit_enhanced_metrics(logs, cache_layer):
     """Parses enhanced metrics from logs and submits them to DD with tags
 
     Args:
@@ -155,7 +150,7 @@ def parse_and_submit_enhanced_metrics(logs):
     for log in logs:
         try:
             enhanced_metrics = generate_enhanced_lambda_metrics(
-                log, account_lambda_custom_tags_cache
+                log, cache_layer.get_lambda_tags_cache()
            )
             for enhanced_metric in enhanced_metrics:
                 enhanced_metric.submit_to_dd()
@@ -334,25 +329,6 @@ def calculate_estimated_cost(billed_duration_ms, memory_allocated):
     return BASE_LAMBDA_INVOCATION_PRICE + gb_seconds * LAMBDA_PRICE_PER_GB_SECOND
 
 
-def get_enriched_lambda_log_tags(log_event):
-    """Retrieves extra tags from lambda, either read from the function arn, or by fetching lambda tags from the function itself.
-
-    Args:
-        log (dict<str, str | dict | int>): a log parsed from the event in the split method
-    """
-    # Note that this arn attribute has been lowercased already
-    log_function_arn = log_event.get("lambda", {}).get("arn")
-
-    if not log_function_arn:
-        return []
-    tags_from_arn = parse_lambda_tags_from_arn(log_function_arn)
-    lambda_custom_tags = account_lambda_custom_tags_cache.get(log_function_arn)
-
-    # Combine and dedup tags
-    tags = list(set(tags_from_arn + lambda_custom_tags))
-    return tags
-
-
 def create_timeout_enhanced_metric(log_line):
     """Parses and returns a value of 1 if a timeout occurred for the function
 
```
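With the module-level `LambdaTagsCache` removed, callers now inject the cache explicitly. A sketch of the new calling convention, mirroring what `lambda_function.py` does in the next file; the prefix and `logs` values are placeholders.

```python
from caching.cache_layer import CacheLayer
from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics

cache_layer = CacheLayer("0a1b2c3d4e5f")  # hypothetical prefix
logs = []  # placeholder for logs produced by the forwarder's split step

# The cache is injected per call instead of living in this module's globals.
parse_and_submit_enhanced_metrics(logs, cache_layer)
```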

aws/logs_monitoring/lambda_function.py

Lines changed: 24 additions & 3 deletions

```diff
@@ -8,13 +8,15 @@
 import boto3
 import logging
 import requests
+from hashlib import sha1
 from datadog_lambda.wrapper import datadog_lambda_wrapper
 from datadog import api
 from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics
 from steps.parsing import parse
 from steps.enrichment import enrich
 from steps.transformation import transform
 from steps.splitting import split
+from caching.cache_layer import CacheLayer
 from forwarders import (
     forward_metrics,
     forward_traces,
@@ -29,6 +31,7 @@
     DD_ADDITIONAL_TARGET_LAMBDAS,
 )
 
+
 logger = logging.getLogger()
 logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
 
@@ -56,6 +59,8 @@
 api._api_host = DD_API_URL
 api._cacert = not DD_SKIP_SSL_VALIDATION
 
+cache_layer = None
+
 
 def datadog_forwarder(event, context):
     """The actual lambda function entry point"""
@@ -66,8 +71,10 @@ def datadog_forwarder(event, context):
     if DD_ADDITIONAL_TARGET_LAMBDAS:
         invoke_additional_target_lambdas(event)
 
-    parsed = parse(event, context)
-    enriched = enrich(parsed)
+    init_cache_layer(context)
+
+    parsed = parse(event, context, cache_layer)
+    enriched = enrich(parsed, cache_layer)
     transformed = transform(enriched)
     metrics, logs, trace_payloads = split(transformed)
 
@@ -79,7 +86,21 @@ def datadog_forwarder(event, context):
     if len(trace_payloads) > 0:
         forward_traces(trace_payloads)
 
-    parse_and_submit_enhanced_metrics(logs)
+    parse_and_submit_enhanced_metrics(logs, cache_layer)
+
+
+def init_cache_layer(context):
+    global cache_layer
+    if cache_layer is None:
+        # set the prefix for cache layer
+        try:
+            if not cache_layer:
+                function_arn = context.invoked_function_arn.lower()
+                prefix = sha1(function_arn.encode("UTF-8")).hexdigest()
+                cache_layer = CacheLayer(prefix)
+        except Exception as e:
+            logger.exception(f"Failed to create cache layer due to {e}")
+            raise
 
 
 def invoke_additional_target_lambdas(event):
```
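The prefix derivation in `init_cache_layer` can be exercised in isolation; a small sketch with a made-up ARN:

```python
from hashlib import sha1

# Same derivation as init_cache_layer above: lowercase the invoked
# function ARN, then take its sha1 hex digest as the cache prefix.
arn = "arn:aws:lambda:us-east-1:123456789012:function:datadog-forwarder"
prefix = sha1(arn.lower().encode("UTF-8")).hexdigest()
print(prefix)  # a stable 40-character hex string per function ARN
```

Because the prefix is stable per function ARN, a warm Lambda container reuses the same cache objects across invocations, while distinct forwarders sharing one bucket never collide.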
