Skip to content

Commit ed54aa9

Browse files
authored
Refactor forwarding and logs (#755)
1 parent 30dadb6 commit ed54aa9

13 files changed

+342
-286
lines changed

aws/logs_monitoring/customized_log_group.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
# Unless explicitly stated otherwise all files in this repository are licensed
2+
# under the Apache License Version 2.0.
3+
# This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
# Copyright 2021 Datadog, Inc.
5+
6+
17
import re
28

39
"""

aws/logs_monitoring/forwarders.py renamed to aws/logs_monitoring/forwarder.py

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,33 @@
1+
# Unless explicitly stated otherwise all files in this repository are licensed
2+
# under the Apache License Version 2.0.
3+
# This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
# Copyright 2021 Datadog, Inc.
5+
6+
17
import logging
28
import json
39
import os
410

511
from telemetry import send_event_metric, send_log_metric
612
from trace_forwarder.connection import TraceConnection
7-
from logs.logs import (
8-
DatadogScrubber,
9-
DatadogBatcher,
10-
DatadogClient,
11-
DatadogHTTPClient,
12-
DatadogTCPClient,
13-
)
14-
from logs.logs_helpers import filter_logs
13+
from logs.datadog_http_client import DatadogHTTPClient
14+
from logs.datadog_batcher import DatadogBatcher
15+
from logs.datadog_client import DatadogClient
16+
from logs.datadog_tcp_client import DatadogTCPClient
17+
from logs.datadog_scrubber import DatadogScrubber
18+
from logs.helpers import filter_logs
1519
from settings import (
1620
DD_API_KEY,
1721
DD_USE_TCP,
1822
DD_NO_SSL,
1923
DD_SKIP_SSL_VALIDATION,
2024
DD_URL,
2125
DD_PORT,
26+
DD_TRACE_INTAKE_URL,
27+
DD_FORWARD_LOG,
2228
SCRUBBING_RULE_CONFIGS,
2329
INCLUDE_AT_MATCH,
2430
EXCLUDE_AT_MATCH,
25-
DD_TRACE_INTAKE_URL,
2631
)
2732

2833
logger = logging.getLogger()
@@ -32,7 +37,20 @@
3237
)
3338

3439

35-
def forward_logs(logs):
40+
def forward(logs, metrics, traces):
    """
    Dispatch logs, metrics, and traces to their Datadog forwarders.

    Log forwarding is gated on the DD_FORWARD_LOG setting; traces are
    only sent when there is at least one payload.
    """
    if DD_FORWARD_LOG:
        _forward_logs(logs)

    _forward_metrics(metrics)

    if len(traces) > 0:
        _forward_traces(traces)
51+
52+
53+
def _forward_logs(logs):
3654
"""Forward logs to Datadog"""
3755
if logger.isEnabledFor(logging.DEBUG):
3856
logger.debug(f"Forwarding {len(logs)} logs")
@@ -56,15 +74,15 @@ def forward_logs(logs):
5674
try:
5775
client.send(batch)
5876
except Exception:
59-
logger.exception(f"Exception while forwarding log batch {batch}")
60-
else:
77+
logger.exception("Exception while forwarding log batch")
78+
finally:
6179
if logger.isEnabledFor(logging.DEBUG):
6280
logger.debug(f"Forwarded log batch: {json.dumps(batch)}")
6381

6482
send_event_metric("logs_forwarded", len(logs_to_forward))
6583

6684

67-
def forward_metrics(metrics):
85+
def _forward_metrics(metrics):
6886
"""
6987
Forward custom metrics submitted via logs to Datadog in a background thread
7088
using `lambda_stats` that is provided by the Datadog Python Lambda Layer.
@@ -76,25 +94,23 @@ def forward_metrics(metrics):
7694
try:
7795
send_log_metric(metric)
7896
except Exception:
79-
logger.exception(f"Exception while forwarding metric {json.dumps(metric)}")
80-
else:
97+
logger.exception("Exception while forwarding metrics")
98+
finally:
8199
if logger.isEnabledFor(logging.DEBUG):
82100
logger.debug(f"Forwarded metric: {json.dumps(metric)}")
83101

84102
send_event_metric("metrics_forwarded", len(metrics))
85103

86104

87-
def forward_traces(trace_payloads):
105+
def _forward_traces(trace_payloads):
    """Send a batch of trace payloads through the shared trace connection."""
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        logger.debug(f"Forwarding {len(trace_payloads)} traces")

    try:
        trace_connection.send_traces(trace_payloads)
    except Exception:
        logger.exception("Exception while forwarding traces")
    finally:
        # Log the payloads whether or not the send succeeded.
        if debug_enabled:
            logger.debug(f"Forwarded traces: {json.dumps(trace_payloads)}")
100116

aws/logs_monitoring/lambda_function.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,9 @@
1717
from steps.transformation import transform
1818
from steps.splitting import split
1919
from caching.cache_layer import CacheLayer
20-
from forwarders import (
21-
forward_metrics,
22-
forward_traces,
23-
forward_logs,
24-
)
20+
from forwarder import forward
2521
from settings import (
2622
DD_API_KEY,
27-
DD_FORWARD_LOG,
2823
DD_SKIP_SSL_VALIDATION,
2924
DD_API_URL,
3025
DD_FORWARDER_VERSION,
@@ -78,14 +73,7 @@ def datadog_forwarder(event, context):
7873
transformed = transform(enriched)
7974
metrics, logs, trace_payloads = split(transformed)
8075

81-
if DD_FORWARD_LOG:
82-
forward_logs(logs)
83-
84-
forward_metrics(metrics)
85-
86-
if len(trace_payloads) > 0:
87-
forward_traces(trace_payloads)
88-
76+
forward(logs, metrics, trace_payloads)
8977
parse_and_submit_enhanced_metrics(logs, cache_layer)
9078

9179

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Unless explicitly stated otherwise all files in this repository are licensed
2+
# under the Apache License Version 2.0.
3+
# This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
# Copyright 2021 Datadog, Inc.
5+
6+
7+
class DatadogBatcher(object):
    """Splits a sequence of log items into size- and count-bounded batches."""

    def __init__(self, max_item_size_bytes, max_batch_size_bytes, max_items_count):
        self._max_item_size_bytes = max_item_size_bytes
        self._max_batch_size_bytes = max_batch_size_bytes
        self._max_items_count = max_items_count

    def _sizeof_bytes(self, item):
        # Size of the item measured as UTF-8 encoded text.
        return len(str(item).encode("UTF-8"))

    def batch(self, items):
        """
        Returns an array of batches.
        Each batch contains at most max_items_count items and
        is not strictly greater than max_batch_size_bytes.
        All items strictly greater than max_item_size_bytes are dropped.
        """
        batches = []
        current = []
        current_bytes = 0
        for item in items:
            item_bytes = self._sizeof_bytes(item)
            count_full = len(current) >= self._max_items_count
            would_overflow = current_bytes + item_bytes > self._max_batch_size_bytes
            # Flush the open batch before this item would violate a limit.
            if current and (count_full or would_overflow):
                batches.append(current)
                current = []
                current_bytes = 0
            # Items exceeding max_item_size_bytes are silently dropped here.
            if item_bytes <= self._max_item_size_bytes:
                current.append(item)
                current_bytes += item_bytes
        if current:
            batches.append(current)
        return batches
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Unless explicitly stated otherwise all files in this repository are licensed
2+
# under the Apache License Version 2.0.
3+
# This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
# Copyright 2021 Datadog, Inc.
5+
6+
7+
import time
8+
from logs.exceptions import RetriableException
9+
10+
11+
class DatadogClient(object):
    """
    Client that implements an exponential retrying logic to send a batch of logs.
    """

    def __init__(self, client, max_backoff=30):
        self._client = client
        self._max_backoff = max_backoff

    def send(self, logs):
        """Send logs, retrying forever on RetriableException with backoff."""
        backoff = 1
        while True:
            try:
                self._client.send(logs)
            except RetriableException:
                # Wait, then double the delay while below the cap.
                time.sleep(backoff)
                if backoff < self._max_backoff:
                    backoff *= 2
            else:
                return

    def __enter__(self):
        # Delegate context management to the wrapped client.
        self._client.__enter__()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        self._client.__exit__(ex_type, ex_value, traceback)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Unless explicitly stated otherwise all files in this repository are licensed
2+
# under the Apache License Version 2.0.
3+
# This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
# Copyright 2021 Datadog, Inc.
5+
6+
7+
import os
8+
import logging
9+
10+
from concurrent.futures import as_completed
11+
from requests_futures.sessions import FuturesSession
12+
from logs.helpers import compress_logs
13+
from logs.exceptions import ScrubbingException
14+
15+
from settings import (
16+
DD_USE_COMPRESSION,
17+
DD_COMPRESSION_LEVEL,
18+
DD_MAX_WORKERS,
19+
DD_FORWARDER_VERSION,
20+
)
21+
22+
logger = logging.getLogger()
23+
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
24+
25+
26+
class DatadogHTTPClient(object):
    """
    Client that sends a batch of logs over HTTP.

    Batches are posted asynchronously through a FuturesSession; pending
    futures are resolved (and failures logged) when the client is closed.
    """

    _POST = "POST"
    # Base headers shared by all instances. Per-instance headers (the API
    # key) are copied from this dict in __init__ — see the bug-fix note there.
    if DD_USE_COMPRESSION:
        _HEADERS = {"Content-type": "application/json", "Content-Encoding": "gzip"}
    else:
        _HEADERS = {"Content-type": "application/json"}

    _HEADERS["DD-EVP-ORIGIN"] = "aws_forwarder"
    _HEADERS["DD-EVP-ORIGIN-VERSION"] = DD_FORWARDER_VERSION

    def __init__(
        self, host, port, no_ssl, skip_ssl_validation, api_key, scrubber, timeout=10
    ):
        """
        Args:
            host, port: logs intake endpoint.
            no_ssl: use plain http instead of https when truthy.
            skip_ssl_validation: disable TLS certificate verification.
            api_key: Datadog API key sent in the DD-API-KEY header.
            scrubber: object whose scrub(payload) is applied to every batch.
            timeout: per-request timeout in seconds.
        """
        # Bug fix: the original called self._HEADERS.update({"DD-API-KEY": ...}),
        # which mutates the CLASS-level dict shared by every instance — the API
        # key leaked into shared state and a second instance with a different
        # key clobbered the first. Use a per-instance copy instead.
        self._headers = dict(self._HEADERS)
        self._headers["DD-API-KEY"] = api_key
        protocol = "http" if no_ssl else "https"
        self._url = "{}://{}:{}/api/v2/logs".format(protocol, host, port)
        self._scrubber = scrubber
        self._timeout = timeout
        self._session = None
        self._ssl_validation = not skip_ssl_validation
        self._futures = []
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                f"Initialized http client for logs intake: "
                f"<host: {host}, port: {port}, url: {self._url}, no_ssl: {no_ssl}, "
                f"skip_ssl_validation: {skip_ssl_validation}, timeout: {timeout}>"
            )

    def _connect(self):
        # One session per client; worker count bounds concurrent POSTs.
        self._session = FuturesSession(max_workers=DD_MAX_WORKERS)
        self._session.headers.update(self._headers)

    def _close(self):
        # Resolve all the futures and log exceptions if any
        for future in as_completed(self._futures):
            try:
                future.result()
            except Exception:
                logger.exception("Exception while forwarding logs")

        self._session.close()

    def send(self, logs):
        """
        Sends a batch of log, only retry on server and network errors.
        """
        try:
            # Logs are pre-serialized JSON strings; join them into an array.
            data = self._scrubber.scrub("[{}]".format(",".join(logs)))
        except ScrubbingException:
            raise Exception("could not scrub the payload")
        if DD_USE_COMPRESSION:
            data = compress_logs(data, DD_COMPRESSION_LEVEL)

        # FuturesSession returns immediately with a future object
        future = self._session.post(
            self._url, data, timeout=self._timeout, verify=self._ssl_validation
        )
        self._futures.append(future)

    def __enter__(self):
        self._connect()
        return self

    def __exit__(self, ex_type, ex_value, traceback):
        self._close()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Unless explicitly stated otherwise all files in this repository are licensed
2+
# under the Apache License Version 2.0.
3+
# This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
# Copyright 2021 Datadog, Inc.
5+
6+
7+
import os
8+
from logs.exceptions import ScrubbingException
9+
from logs.helpers import compileRegex
10+
11+
12+
class DatadogScrubber(object):
    """Applies configured regex scrubbing rules to a log payload."""

    def __init__(self, configs):
        # A rule is active only when its config's env var is set.
        self._rules = [
            ScrubbingRule(
                compileRegex(config.name, config.pattern), config.placeholder
            )
            for config in configs
            if config.name in os.environ
        ]

    def scrub(self, payload):
        """Run every active rule over the payload, in order."""
        for rule in self._rules:
            try:
                payload = rule.regex.sub(rule.placeholder, payload)
            except Exception:
                raise ScrubbingException()
        return payload
31+
32+
33+
class ScrubbingRule(object):
    """A compiled scrubbing regex paired with its replacement placeholder."""

    def __init__(self, regex, placeholder):
        self.regex = regex
        self.placeholder = placeholder

0 commit comments

Comments
 (0)