Skip to content

Commit 46d4032

Browse files
committed
new version released: 1.0.3
1 parent 160a067 commit 46d4032

File tree

3 files changed

+188
-2
lines changed

3 files changed

+188
-2
lines changed

build/lib/fastapi_elasticsearch_middleware/__init__.py

Whitespace-only changes.
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import json
2+
import logging
3+
import time
4+
from fastapi import Request
5+
from datetime import datetime, timezone
6+
from fastapi.datastructures import Headers
7+
from starlette.types import ASGIApp, Receive, Scope, Send
8+
from elasticsearch import Elasticsearch
9+
10+
class LogLevelEnum:
11+
level_mapping = {
12+
2: "Info",
13+
3: "Warning",
14+
4: "Warning",
15+
5: "Error"
16+
}
17+
18+
class ElasticsearchLoggerMiddleware:
19+
def __init__(self, app: ASGIApp, config: dict) -> None:
20+
"""
21+
Initializes an Elasticsearch Logger Middleware for FastAPI.
22+
23+
Args:
24+
app (ASGIApp): The FastAPI ASGI application.
25+
config (dict): Configuration settings for Elasticsearch logging.
26+
{
27+
'url': str, # Elasticsearch server URL
28+
'user': str, # Elasticsearch API user
29+
'password': str # Elasticsearch API password
30+
'index': str, # Elasticsearch index name
31+
'environment': str, # Environment identifier for logs
32+
'limit': bool, # Limit Elasticsearch payload array and string lenght
33+
'debug': bool # When True logs aren't sent to Elasticsearch
34+
}
35+
"""
36+
elastic_config = config
37+
elastic_user = config.get('user')
38+
elastic_password = config.get('password')
39+
basic_auth = (elastic_user, elastic_password) if elastic_user and elastic_password else None
40+
self.elasticsearch_client = Elasticsearch([elastic_config.get('url')], basic_auth=basic_auth)
41+
self.index = elastic_config.get('index')
42+
self.limit = elastic_config.get('limit', False)
43+
self.environment = elastic_config.get('environment', 'Development')
44+
self.debug = elastic_config.get('debug', False)
45+
self.app = app
46+
47+
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
48+
49+
if scope.get('type') == 'http' and not self.debug:
50+
51+
request = Request(scope)
52+
53+
async def intercept_send(response):
54+
nonlocal log_data
55+
if response['type'] == 'http.response.body' and "response" not in log_data.keys(): # Streaming response, we don't want to log this
56+
await send(response)
57+
return
58+
if response['type'] == 'http.response.body' and "response" in log_data.keys(): # Response part
59+
60+
# Finishes telemetry
61+
end_time = time.time()
62+
elapsed_time = end_time - start_time
63+
64+
log_data["elapsed_time"] = elapsed_time
65+
66+
if log_data["response"]["headers"].get('content-type') == 'application/json':
67+
response_body = json.loads(response.get('body'))
68+
response_body = self.limit_array_length(response_body)
69+
70+
response_body = json.dumps(response_body, ensure_ascii=False) if 'body' in response.keys() else None
71+
log_data["response"]["body"] = response_body
72+
73+
elif log_data["response"]["headers"].get('content-type') == 'application/octet-stream':
74+
log_data["response"]["body"] = str(response.get('body')) if 'body' in response.keys() else None
75+
76+
self.log_to_elasticsearch(log_data)
77+
78+
if response['type'] == 'http.response.start': # Request part
79+
80+
request_headers = dict(request.headers) if 'headers' in request.keys() else None
81+
request_query_parameters = dict(request.query_params) if len(request.query_params._list) > 0 else None
82+
83+
response_headers = dict(Headers(raw=response.get('headers'))) if 'headers' in response.keys() else None
84+
85+
log_data["status_code"] = response['status']
86+
log_data["level"] = LogLevelEnum.level_mapping.get(int(str(response['status'])[0]))
87+
log_data["request"]["headers"] = request_headers
88+
log_data["request"]["query_parameters"] = request_query_parameters
89+
log_data["response"]["headers"] = response_headers
90+
91+
await send(response)
92+
93+
start_time = time.time()
94+
95+
log_data = {
96+
"@timestamp": datetime.now().replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
97+
"environment": self.environment,
98+
"method": request.method,
99+
"path": request.url.path,
100+
"request": {},
101+
"response": {}
102+
}
103+
104+
# Starts telemetry
105+
start_time = time.time()
106+
107+
async def intercept_receive():
108+
message = await receive()
109+
110+
more_body = message.get("more_body", False)
111+
body = message.get("body", "")
112+
while more_body:
113+
message = await receive()
114+
body += message.get("body", b"")
115+
more_body = message.get("more_body", False)
116+
117+
message["body"] = body
118+
request_body = ''
119+
120+
if len(message["body"]) > 0:
121+
request_body = json.loads(body.decode('utf-8'))
122+
request_body = self.limit_string_length(request_body)
123+
request_body = json.dumps(request_body, ensure_ascii=False) if len(body.decode('utf-8')) > 0 else None
124+
125+
log_data["request"]["body"] = request_body
126+
127+
return message
128+
129+
await self.app(scope, intercept_receive, intercept_send)
130+
131+
else:
132+
await self.app(scope, receive, send)
133+
134+
def log_to_elasticsearch(self, log_data) -> None:
135+
try:
136+
self.elasticsearch_client.index(index=self.index, body=log_data)
137+
log_data.clear()
138+
except Exception as e:
139+
logging.error(f"Failed to log to Elasticsearch: {str(e)}")
140+
141+
def limit_string_length(self, data, max_lines=50):
142+
if not self.limit:
143+
return data
144+
if isinstance(data, dict):
145+
for key, value in data.items():
146+
data[key] = self.limit_string_length(value, max_lines)
147+
elif isinstance(data, list):
148+
for i in range(len(data)):
149+
data[i] = self.limit_string_length(data[i], max_lines)
150+
elif isinstance(data, str) and len(data.split('\n')) > max_lines:
151+
data_splitted = data.split('\n')[:max_lines]
152+
data_splitted.append(f' [...] value limited in {max_lines} lines')
153+
data = '\n'.join(data_splitted)
154+
elif isinstance(data, str) and len(data.split('/')) > max_lines: # Base64 files
155+
data_splitted = data.split('/')[:max_lines]
156+
data_splitted.append(f' [...] value limited in {max_lines} lines')
157+
data = '/'.join(data_splitted)
158+
return data
159+
160+
def limit_array_length(self, data, max_length=3):
161+
if not self.limit:
162+
return data
163+
if isinstance(data, dict):
164+
for key, value in data.items():
165+
data[key] = self.limit_array_length(value, max_length)
166+
elif isinstance(data, list) and len(data) > max_length:
167+
data = data[:max_length]
168+
data.append(f'[...] array limited in {max_length} objects')
169+
return data

setup.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,33 @@
22

33
setup(
44
name='fastapi_elasticsearch_middleware',
5-
version='1.0.2',
5+
version='1.0.3',
66
url='https://github.com/GGontijo/fastapi-elasticsearch-middleware.git',
77
description='Elasticsearch Logger Middleware for FastAPI',
88
long_description_content_type='text/markdown',
99
long_description=open('README.md').read(),
1010
author='Gabriel Gontijo',
1111
author_email='gabrieldercilio08@gmail.com',
1212
packages=find_packages(),
13+
keywords=['python', 'middleware', 'fastapi', 'elasticsearch', 'kibana', 'logstash', 'fastapi-middleware'],
1314
install_requires=[
1415
'fastapi',
1516
'elasticsearch',
16-
],
17+
],
18+
classifiers=[
19+
"Programming Language :: Python :: 3",
20+
"License :: OSI Approved :: MIT License",
21+
"Operating System :: OS Independent",
22+
"Development Status :: 4 - Beta",
23+
"Intended Audience :: Developers",
24+
"Topic :: Software Development :: Libraries :: Python Modules",
25+
"Topic :: Internet :: WWW/HTTP :: HTTP Servers",
26+
"Topic :: Internet :: WWW/HTTP :: ASGI",
27+
"Topic :: Internet :: WWW/HTTP :: ASGI :: Middleware",
28+
"Topic :: Internet :: WWW/HTTP :: ASGI :: Server",
29+
"Topic :: Internet :: WWW/HTTP :: ASGI :: Application",
30+
"Topic :: Internet :: WWW/HTTP :: ASGI :: Server",
31+
"Topic :: Internet :: WWW/HTTP :: ASGI :: Middleware",
32+
"Topic :: Internet :: WWW/HTTP :: ASGI :: Application",
33+
]
1734
)

0 commit comments

Comments
 (0)