
Commit 23886fb

Store and retry forwarding events in case of exceptions (#759)

* Refactor forwarding and logs
* Store and retry forwarding events in case of exceptions
* Add a retry mechanism to store events on S3 in case of failure
* Multiple functions can use the same bucket to store events
* Address comments
* Change retry execution to be triggered on custom invocations only

1 parent 4559eb1 · commit 23886fb

File tree: 10 files changed, +306 −84 lines

aws/logs_monitoring/README.md

Lines changed: 10 additions & 1 deletion

@@ -120,7 +120,13 @@
 4. Set the environment variable `DD_ENHANCED_METRICS` to `false` on the Forwarder. This stops the Forwarder from generating enhanced metrics itself, but it will still forward custom metrics from other lambdas.
 5. Some AWS accounts are configured such that triggers will not automatically create resource-based policies allowing Cloudwatch log groups to invoke the forwarder. Reference the [CloudWatchLogPermissions][103] to see which permissions are required for the forwarder to be invoked by Cloudwatch Log Events.
 6. [Configure triggers][104].
-7. Create an S3 bucket, and set environment variable `DD_S3_BUCKET_NAME` to the bucket name. Also provide `s3:GetObject`, `s3:PutObject`, and `s3:DeleteObject` permissions on this bucket to the Lambda execution role. This bucket is used to store the Lambda tags cache.
+7. Create an S3 bucket, and set the environment variable `DD_S3_BUCKET_NAME` to the bucket name. Also grant `s3:GetObject`, `s3:PutObject`, `s3:ListBucket`, and `s3:DeleteObject` permissions on this bucket to the Lambda execution role. This bucket is used to store the different tags caches, i.e. Lambda, S3, Step Function, and Log Group. Additionally, this bucket is used to store unforwarded events in case of forwarding exceptions.
+8. Set the environment variable `DD_STORE_FAILED_EVENTS` to `true` to enable the forwarder to also store event data in the S3 bucket. In case of exceptions when sending logs, metrics, or traces to the intake, the forwarder will store the relevant data in the S3 bucket. On custom invocations, i.e. on receiving an event with the `retry` keyword set to a non-empty string (which can be triggered manually, see below), the forwarder will retry sending the stored events and clear the storage in the bucket on success.
+
+   ```bash
+   aws lambda invoke --function-name <function-name> --payload '{"retry":"true"}' out
+   ```
+
 
 [101]: https://github.com/DataDog/datadog-serverless-functions/releases
 [102]: https://app.datadoghq.com/organization-settings/api-keys

@@ -138,6 +144,9 @@
 
 If you encounter issues upgrading to the latest version, check the Troubleshooting section.
 
+### Upgrade an older version to +3.107.0
+Starting with version 3.107.0, a new feature enables the Lambda function to store unforwarded events in case of exceptions at the intake point. If the feature is enabled via the `DD_STORE_FAILED_EVENTS` env var, failing events are stored under a dedicated directory in the same S3 bucket used to store the tags cache. The same bucket can be used to store logs from several Lambda functions under unique subdirectories.
+
 ### Upgrade an older version to +3.106.0
 Starting version 3.106.0 Lambda function has been updated to add a prefix to cache filenames stored in the S3 bucket configured in `DD_S3_BUCKET_NAME`.
 This allows to use the same bucket to store cache files from several functions.
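
For reference, the same manual retry invocation can be issued programmatically. A minimal boto3 sketch, equivalent to the documented CLI call; the function name is a placeholder:

```python
# Hypothetical equivalent of the documented `aws lambda invoke` call;
# "datadog-forwarder" is a placeholder for the deployed forwarder's name.
import json

import boto3

lambda_client = boto3.client("lambda")
response = lambda_client.invoke(
    FunctionName="datadog-forwarder",
    # Any non-empty string for the "retry" key triggers a retry of stored events.
    Payload=json.dumps({"retry": "true"}),
)
print(response["StatusCode"])  # 200 on a successful synchronous invocation
```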

aws/logs_monitoring/forwarder.py

Lines changed: 129 additions & 72 deletions

@@ -15,7 +15,9 @@
 from logs.datadog_client import DatadogClient
 from logs.datadog_tcp_client import DatadogTCPClient
 from logs.datadog_scrubber import DatadogScrubber
-from logs.helpers import filter_logs
+from logs.helpers import filter_logs, add_retry_tag
+from retry.storage import Storage
+from retry.enums import RetryPrefix
 from settings import (
     DD_API_KEY,
     DD_USE_TCP,
@@ -25,92 +27,147 @@
     DD_PORT,
     DD_TRACE_INTAKE_URL,
     DD_FORWARD_LOG,
+    DD_STORE_FAILED_EVENTS,
     SCRUBBING_RULE_CONFIGS,
     INCLUDE_AT_MATCH,
     EXCLUDE_AT_MATCH,
 )
 
 logger = logging.getLogger()
 logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
-trace_connection = TraceConnection(
-    DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION
-)
 
 
-def forward(logs, metrics, traces):
-    """
-    Forward logs, metrics, and traces to Datadog in a background thread.
-    """
-    if DD_FORWARD_LOG:
-        _forward_logs(logs)
-
-    _forward_metrics(metrics)
-
-    if len(traces) > 0:
-        _forward_traces(traces)
-
-
-def _forward_logs(logs):
-    """Forward logs to Datadog"""
-    if logger.isEnabledFor(logging.DEBUG):
-        logger.debug(f"Forwarding {len(logs)} logs")
-    logs_to_forward = filter_logs(
-        [json.dumps(log, ensure_ascii=False) for log in logs],
-        include_pattern=INCLUDE_AT_MATCH,
-        exclude_pattern=EXCLUDE_AT_MATCH,
-    )
-    scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
-    if DD_USE_TCP:
-        batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1)
-        cli = DatadogTCPClient(DD_URL, DD_PORT, DD_NO_SSL, DD_API_KEY, scrubber)
-    else:
-        batcher = DatadogBatcher(512 * 1000, 4 * 1000 * 1000, 400)
-        cli = DatadogHTTPClient(
-            DD_URL, DD_PORT, DD_NO_SSL, DD_SKIP_SSL_VALIDATION, DD_API_KEY, scrubber
-        )
-
-    with DatadogClient(cli) as client:
-        try:
-            for batch in batcher.batch(logs_to_forward):
-                client.send(batch)
-                if logger.isEnabledFor(logging.DEBUG):
-                    logger.debug(f"Forwarded log batch: {json.dumps(batch)}")
-        except Exception:
-            logger.exception(
-                f"Exception while forwarding log batch {json.dumps(batch)}"
-            )
-        else:
-            send_event_metric("logs_forwarded", len(logs_to_forward))
-
-
-def _forward_metrics(metrics):
-    """
-    Forward custom metrics submitted via logs to Datadog in a background thread
-    using `lambda_stats` that is provided by the Datadog Python Lambda Layer.
-    """
-    if logger.isEnabledFor(logging.DEBUG):
-        logger.debug(f"Forwarding {len(metrics)} metrics")
-    try:
-        for metric in metrics:
-            send_log_metric(metric)
-            if logger.isEnabledFor(logging.DEBUG):
-                logger.debug(f"Forwarded metric: {json.dumps(metric)}")
-    except Exception:
-        logger.exception(f"Exception while forwarding metric {json.dumps(metric)}")
-    else:
-        send_event_metric("metrics_forwarded", len(metrics))
-
-
-def _forward_traces(trace_payloads):
-    if logger.isEnabledFor(logging.DEBUG):
-        logger.debug(f"Forwarding {len(trace_payloads)} traces")
-    try:
-        trace_connection.send_traces(trace_payloads)
-    except Exception:
-        logger.exception(
-            f"Exception while forwarding traces {json.dumps(trace_payloads)}"
-        )
-    else:
-        if logger.isEnabledFor(logging.DEBUG):
-            logger.debug(f"Forwarded traces: {json.dumps(trace_payloads)}")
-        send_event_metric("traces_forwarded", len(trace_payloads))
+class Forwarder(object):
+    def __init__(self, function_prefix):
+        self.trace_connection = TraceConnection(
+            DD_TRACE_INTAKE_URL, DD_API_KEY, DD_SKIP_SSL_VALIDATION
+        )
+        self.storage = Storage(function_prefix)
+
+    def forward(self, logs, metrics, traces):
+        """
+        Forward logs, metrics, and traces to Datadog in a background thread.
+        """
+        if DD_FORWARD_LOG:
+            self._forward_logs(logs)
+        self._forward_metrics(metrics)
+        self._forward_traces(traces)
+
+    def retry(self):
+        """
+        Retry forwarding logs, metrics, and traces to Datadog.
+        """
+        for prefix in RetryPrefix:
+            self._retry_prefix(prefix)
+
+    def _retry_prefix(self, prefix):
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"Retrying {prefix} data")
+
+        key_data = self.storage.get_data(prefix)
+
+        for k, d in key_data.items():
+            if d is None:
+                continue
+            match prefix:
+                case RetryPrefix.LOGS:
+                    self._forward_logs(d, key=k)
+                case RetryPrefix.METRICS:
+                    self._forward_metrics(d, key=k)
+                case RetryPrefix.TRACES:
+                    self._forward_traces(d, key=k)
+
+    def _forward_logs(self, logs, key=None):
+        """Forward logs to Datadog"""
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"Forwarding {len(logs)} logs")
+
+        logs_to_forward = []
+        for log in logs:
+            if key:
+                log = add_retry_tag(log)
+            logs_to_forward.append(json.dumps(log, ensure_ascii=False))
+
+        logs_to_forward = filter_logs(
+            logs_to_forward, INCLUDE_AT_MATCH, EXCLUDE_AT_MATCH
+        )
+
+        scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
+        if DD_USE_TCP:
+            batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1)
+            cli = DatadogTCPClient(DD_URL, DD_PORT, DD_NO_SSL, DD_API_KEY, scrubber)
+        else:
+            batcher = DatadogBatcher(512 * 1000, 4 * 1000 * 1000, 400)
+            cli = DatadogHTTPClient(
+                DD_URL, DD_PORT, DD_NO_SSL, DD_SKIP_SSL_VALIDATION, DD_API_KEY, scrubber
+            )
+
+        failed_logs = []
+        with DatadogClient(cli) as client:
+            for batch in batcher.batch(logs_to_forward):
+                try:
+                    client.send(batch)
+                except Exception:
+                    logger.exception(f"Exception while forwarding log batch {batch}")
+                    failed_logs.extend(batch)
+                else:
+                    if logger.isEnabledFor(logging.DEBUG):
+                        logger.debug(f"Forwarded log batch: {batch}")
+                    if key:
+                        self.storage.delete_data(key)
+
+        if DD_STORE_FAILED_EVENTS and len(failed_logs) > 0 and not key:
+            self.storage.store_data(RetryPrefix.LOGS, failed_logs)
+
+        send_event_metric("logs_forwarded", len(logs_to_forward) - len(failed_logs))
+
+    def _forward_metrics(self, metrics, key=None):
+        """
+        Forward custom metrics submitted via logs to Datadog in a background thread
+        using `lambda_stats` that is provided by the Datadog Python Lambda Layer.
+        """
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"Forwarding {len(metrics)} metrics")
+
+        failed_metrics = []
+        for metric in metrics:
+            try:
+                send_log_metric(metric)
+            except Exception:
+                logger.exception(
+                    f"Exception while forwarding metric {json.dumps(metric)}"
+                )
+                failed_metrics.append(metric)
+            else:
+                if logger.isEnabledFor(logging.DEBUG):
+                    logger.debug(f"Forwarded metric: {json.dumps(metric)}")
+                if key:
+                    self.storage.delete_data(key)
+
+        if DD_STORE_FAILED_EVENTS and len(failed_metrics) > 0 and not key:
+            self.storage.store_data(RetryPrefix.METRICS, failed_metrics)
+
+        send_event_metric("metrics_forwarded", len(metrics) - len(failed_metrics))
+
+    def _forward_traces(self, traces, key=None):
+        if not len(traces) > 0:
+            return
+
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"Forwarding {len(traces)} traces")
+
+        try:
+            serialized_trace_payloads = json.dumps(traces)
+            self.trace_connection.send_traces(serialized_trace_payloads)
+        except Exception:
+            logger.exception(
+                f"Exception while forwarding traces {serialized_trace_payloads}"
+            )
+            if DD_STORE_FAILED_EVENTS and not key:
+                self.storage.store_data(RetryPrefix.TRACES, traces)
+        else:
+            if logger.isEnabledFor(logging.DEBUG):
+                logger.debug(f"Forwarded traces: {serialized_trace_payloads}")
+            if key:
+                self.storage.delete_data(key)
+            send_event_metric("traces_forwarded", len(traces))

aws/logs_monitoring/lambda_function.py

Lines changed: 30 additions & 8 deletions

@@ -17,13 +17,14 @@
 from steps.transformation import transform
 from steps.splitting import split
 from caching.cache_layer import CacheLayer
-from forwarder import forward
+from forwarder import Forwarder
 from settings import (
     DD_API_KEY,
     DD_SKIP_SSL_VALIDATION,
     DD_API_URL,
     DD_FORWARDER_VERSION,
     DD_ADDITIONAL_TARGET_LAMBDAS,
+    DD_RETRY_KEYWORD,
 )
 
@@ -55,6 +56,7 @@
 api._cacert = not DD_SKIP_SSL_VALIDATION
 
 cache_layer = None
+forwarder = None
 
 
 def datadog_forwarder(event, context):
@@ -66,31 +68,51 @@ def datadog_forwarder(event, context):
     if DD_ADDITIONAL_TARGET_LAMBDAS:
         invoke_additional_target_lambdas(event)
 
-    init_cache_layer(context)
+    function_prefix = get_function_arn_digest(context)
+    init_cache_layer(function_prefix)
+    init_forwarder(function_prefix)
 
     parsed = parse(event, context, cache_layer)
     enriched = enrich(parsed, cache_layer)
     transformed = transform(enriched)
     metrics, logs, trace_payloads = split(transformed)
 
-    forward(logs, metrics, trace_payloads)
+    forwarder.forward(logs, metrics, trace_payloads)
     parse_and_submit_enhanced_metrics(logs, cache_layer)
 
+    try:
+        if bool(event.get(DD_RETRY_KEYWORD, False)) is True:
+            forwarder.retry()
+    except Exception as e:
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"Failed to retry forwarding {e}")
+        pass
 
-def init_cache_layer(context):
+
+def init_cache_layer(function_prefix):
     global cache_layer
     if cache_layer is None:
         # set the prefix for cache layer
         try:
-            if not cache_layer:
-                function_arn = context.invoked_function_arn.lower()
-                prefix = sha1(function_arn.encode("UTF-8")).hexdigest()
-                cache_layer = CacheLayer(prefix)
+            if cache_layer is None:
+                cache_layer = CacheLayer(function_prefix)
         except Exception as e:
            logger.exception(f"Failed to create cache layer due to {e}")
            raise
 
 
+def init_forwarder(function_prefix):
+    global forwarder
+    if forwarder is None:
+        forwarder = Forwarder(function_prefix)
+
+
+def get_function_arn_digest(context):
+    function_arn = context.invoked_function_arn.lower()
+    prefix = sha1(function_arn.encode("UTF-8")).hexdigest()
+    return prefix
+
+
 def invoke_additional_target_lambdas(event):
     lambda_client = boto3.client("lambda")
     lambda_arns = DD_ADDITIONAL_TARGET_LAMBDAS.split(",")
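
The per-function prefix returned by `get_function_arn_digest` is a SHA-1 hex digest of the lowercased function ARN; it keys both the cache layer and the retry storage. A small illustration, using a made-up ARN:

```python
# Mirrors get_function_arn_digest above; the ARN below is fabricated.
from hashlib import sha1

function_arn = "arn:aws:lambda:us-east-1:123456789012:function:datadog-forwarder"
prefix = sha1(function_arn.lower().encode("UTF-8")).hexdigest()
print(prefix)  # a stable 40-character hex string, unique per function ARN
```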

aws/logs_monitoring/logs/helpers.py

Lines changed: 13 additions & 0 deletions

@@ -8,8 +8,11 @@
 import re
 import gzip
 import os
+import json
 from logs.exceptions import ScrubbingException
 
+from settings import DD_CUSTOM_TAGS, DD_RETRY_KEYWORD
+
 logger = logging.getLogger()
 logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
 
@@ -75,3 +78,13 @@ def compileRegex(rule, pattern):
         raise Exception(
             "could not compile {} regex with pattern: {}".format(rule, pattern)
         )
+
+
+def add_retry_tag(log):
+    try:
+        log = json.loads(log)
+        log[DD_CUSTOM_TAGS] = log.get(DD_CUSTOM_TAGS, "") + f",{DD_RETRY_KEYWORD}:true"
+    except Exception:
+        logger.warning(f"cannot add retry tag for log {log}")
+
+    return log
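
For illustration, a round trip through `add_retry_tag` on a stored (JSON-serialized) log might look as follows, assuming `DD_CUSTOM_TAGS` resolves to `"ddtags"` and `DD_RETRY_KEYWORD` to `"retry"` in `settings.py`:

```python
# Assumes DD_CUSTOM_TAGS == "ddtags" and DD_RETRY_KEYWORD == "retry".
from logs.helpers import add_retry_tag

stored_log = '{"message": "hello", "ddtags": "env:prod"}'
tagged = add_retry_tag(stored_log)
# tagged == {"message": "hello", "ddtags": "env:prod,retry:true"}
print(tagged)
```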

aws/logs_monitoring/retry/enums.py

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+from enum import Enum
+
+
+class RetryPrefix(Enum):
+    LOGS = "logs"
+    METRICS = "metrics"
+    TRACES = "traces"
+
+    def __str__(self):
+        return self.value
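
Because `RetryPrefix` implements `__str__`, iterating the enum (as `Forwarder.retry` does) yields the plain storage prefix names; a quick illustration:

```python
from retry.enums import RetryPrefix

for prefix in RetryPrefix:
    print(prefix)  # prints "logs", "metrics", "traces"
```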
