Skip to content

Commit b7405a9

Browse files
committed
Update
1 parent 53c6dcc commit b7405a9

File tree

4 files changed

+140
-8
lines changed

4 files changed

+140
-8
lines changed

.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Base64 "user:password" token only: get_headers() in service/es_search_handler.py
# already prepends the "Basic " scheme, so repeating it here would produce
# an invalid "Basic Basic ..." Authorization header.
# NOTE(review): avoid committing real credentials; keep this file out of version control.
BASIC_AUTH="ZXNfYWRtaW46MQ=="

controller/index_controller.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,32 @@ async def index_mapping_compare(index_name="test", source_cluster="http://localh
8888
logger.info(f"Delay Time : {Delay_Time}")
8989

9090

91+
@app.get(
    "/all_indices_mapping_compare",
    status_code=StatusHanlder.HTTP_STATUS_200,
    responses={
        200: {"description" : "OK"},
        404: {"description" : "URl not found"},
    },
    description="Sample Payload : http://localhost:8001/index/all_indices_mapping_compare?source_cluster=http://localhost:9200&target_cluster=http://localhost:9292",
    summary="* Return All Index Mapping Compare",
)
async def all_index_mapping_compare(source_cluster="http://localhost:9200", target_cluster="http://localhost:9200"):
    ''' Compare the index mappings of two clusters and return the handler's report.

    The work is delegated to SearchAPIHandlerInject.get_index_all_mapping_compare();
    its return value is passed back unchanged. A dict result is also written to
    the log as indented JSON. The elapsed time is always logged on exit and
    stays at 0 when the call did not complete.
    '''
    elapsed_label = 0
    try:
        began_at = datetime.datetime.now()

        report = SearchAPIHandlerInject.get_index_all_mapping_compare(source_cluster, target_cluster)
        if isinstance(report, dict):
            logger.info(f"SearchOmniHandler:all_index_mapping_compare - {json.dumps(report, indent=2)}")

        took = datetime.datetime.now() - began_at
        # "<seconds>.<first two digits of the zero-padded microseconds>"
        elapsed_label = f"{took.seconds}.{str(took.microseconds).zfill(6)[:2]}"

        return report
    finally:
        logger.info(f"Delay Time : {elapsed_label}")
115+
116+
91117

92118
"""
93119
@app.get("/mapping_compare_all",

service-start.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@ else
1717
source $VENV/Scripts/activate
1818
fi
1919

20-
python -m uvicorn main:app --reload --host=0.0.0.0 --port=8001 --workers 4
20+
python -m uvicorn main:app --reload --host=0.0.0.0 --port=8001 --workers 1
2121
# gunicorn -k uvicorn.workers.UvicornWorker main:app --bind 0.0.0.0:8001 --workers 4
2222
# poetry run uvicorn main:app --reload --host=0.0.0.0 --port=8001 --workers 4

service/es_search_handler.py

Lines changed: 112 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from fastapi import Depends
88
import jsondiff
99
import requests
10+
import os
1011

1112

1213
class SearchCommonHandler(object):
@@ -16,7 +17,7 @@ def get_headers():
1617
''' Elasticsearch Header '''
1718
return {
1819
'Content-type': 'application/json',
19-
'Authorization' : 'Basic YWRtaW46cG9zY28xMjM=',
20+
'Authorization' : 'Basic {}'.format(os.getenv('BASIC_AUTH')),
2021
'Connection': 'close'
2122
}
2223

@@ -95,6 +96,8 @@ class SearchAPIHandler(object):
9596
def __init__(self, logger, hosts):
    ''' Keep the injected logger and hosts and start with empty comparison state '''
    self.logger, self.hosts = logger, hosts
    # Accumulators filled by compare_mapping(): one boolean per compared index
    # and the per-index report keyed by index name.
    self.all_same_mapping, self.response = [], {}
98101

99102

100103
def get_es_health(self, es_host):
@@ -230,8 +233,10 @@ def get_index_mapping_compare(self, index_name, source, target):
230233
verify_certs=False,
231234
max_retries=0,
232235
timeout=5)
236+
233237
try:
234238
source_mapping = self.es_client_source.indices.get_mapping(index=index_name)
239+
self.logger.info(source_mapping)
235240
except Exception as e:
236241
# return {"error" : 'Index [{}]was not found in {} [Source:Elasticsearch Cluster]'.format(index_name, source)}
237242
return StatusException.raise_exception('Index [{}]was not found in {} [Source:Elasticsearch Cluster]'.format(index_name, source))
@@ -243,19 +248,119 @@ def get_index_mapping_compare(self, index_name, source, target):
243248
return StatusException.raise_exception('Index [{}]was not found in {} [Target:Elasticsearch Cluster]'.format(index_name, target))
244249

245250
# Compare JSON objects using jsondiff
246-
# diff = jsondiff.diff(source_mapping, target_mapping, marshal=True, syntax="symmetric")
247-
diff = jsondiff.diff(source_mapping, target_mapping, marshal=True)
251+
diff = jsondiff.diff(source_mapping, target_mapping, marshal=True, syntax="symmetric")
252+
# diff = jsondiff.diff(source_mapping, target_mapping, marshal=True)
248253

249254
# Print the difference between the two JSON objects
250-
# print(json.dumps(diff, indent=2))
255+
# self.logger.info(json.dumps(diff, indent=2))
256+
251257
return response_payload_transform(diff)
252258
except Exception as e:
253259
return {"error" : str(e)}
254-
finally:
255-
self.es_client_source.close()
256-
self.es_client_target.close()
257260

258261

262+
def compare_mapping(self, index_name, diff):
    ''' Record the comparison outcome of one index from its jsondiff result.

    An empty (falsy) diff is recorded as "Same mapping"; anything else is
    recorded as "Different mapping" together with the diff itself.
    Returns the accumulated (response, all_same_mapping) pair.
    '''
    is_identical = not diff
    self.all_same_mapping.append(is_identical)

    if is_identical:
        outcome = {'diff' : 'Same mapping'}
    else:
        outcome = {'diff' : 'Different mapping', 'result' : diff}
    self.response[index_name] = outcome

    return self.response, self.all_same_mapping
271+
272+
273+
def get_mapping_from_properties(self, mapping, es_v5=False):
    ''' Reduce a get_mapping() response to a {"properties": ...} dict.

    mapping is walked as {index: {section: {key: value}}}.

    - es_v5=True : the keys of a section are treated as mapping type names;
      the "properties" of a type accepted by lookup_type_in_indices() are
      returned (None when that type has no "properties").
    - es_v5=False: the section's own "properties" entry is returned. Other
      keys of the section (e.g. "dynamic") are ignored, whatever their order.

    When several candidates exist the last one wins. Returns {} when nothing
    matches.
    '''
    extracted = {}
    for index_body in mapping.values():
        for section in index_body.values():
            if es_v5:
                for type_name, type_mapping in section.items():
                    if self.lookup_type_in_indices(type_name):
                        extracted = {"properties": type_mapping.get("properties")}
            elif "properties" in section:
                # Pick the entry by name instead of taking whichever key
                # happens to come last in the section.
                extracted = {"properties": section["properties"]}
    return extracted
278+
279+
280+
def es_version_verify(self, es_client):
    ''' Return True when the cluster behind es_client reports a 5.x version.

    The version is read from es_client.info()['version']['number']. Only the
    major component is compared, so versions such as "8.15.0" or "7.5.2" are
    not mistaken for 5.x.
    '''
    version_number = str(es_client.info()['version']['number'])
    major = version_number.split(".", 1)[0]
    return major == "5"
287+
288+
def lookup_type_in_indices(self, key):
289+
''' lookup type we want to compare from the source es cluster '''
290+
if "OM_" in key or "WX_" in key or "ES_" in key or "ARCHIVE_" in key:
291+
return True
292+
return False
293+
294+
295+
def get_index_all_mapping_compare(self, source, target):
296+
''' Compare all index mapping between two clusters for a given ES indices'''
297+
source_idx_cnt, target_idx_cnt = 0, 0
298+
try:
299+
self.es_client_source = Elasticsearch(hosts=source,
300+
headers=SearchCommonHandler.get_headers(),
301+
verify_certs=False,
302+
max_retries=0,
303+
timeout=5)
304+
self.es_client_target = Elasticsearch(hosts=target,
305+
headers=SearchCommonHandler.get_headers(),
306+
verify_certs=False,
307+
max_retries=0,
308+
timeout=5)
309+
try:
310+
source_idx_lists = list(self.es_client_source.indices.get("*"))
311+
312+
for index_name in source_idx_lists:
313+
''' real index '''
314+
if index_name.startswith("wx_") or index_name.startswith("om_") or index_name.startswith("es_") or index_name.startswith("archive_es_"):
315+
source_idx_cnt +=1
316+
try:
317+
source_mapping = self.es_client_source.indices.get_mapping(index=index_name)
318+
target_mapping = self.es_client_target.indices.get_mapping(index=index_name)
319+
320+
''' Get ES v.5.6.4 mapping '''
321+
source_mapping = self.get_mapping_from_properties(source_mapping, es_v5=self.es_version_verify(self.es_client_source))
322+
# print(source_mapping)
323+
324+
''' Get ES v.8.17 mapping if es_version_verify False '''
325+
target_mapping = self.get_mapping_from_properties(target_mapping, es_v5=self.es_version_verify(self.es_client_target))
326+
# print(target_mapping)
327+
328+
''' Get ES v.8.17 mapping '''
329+
# target_mapping = get_mapping_from_properties(target_mapping)
330+
# print(target_mapping)
331+
332+
# Compare JSON objects using jsondiff
333+
diff = jsondiff.diff(source_mapping, target_mapping, marshal=True, syntax="symmetric")
334+
# diff = jsondiff.diff(source_mapping, target_mapping, marshal=True)
335+
336+
''' Compare mapping the specific index_name between source/target cluster '''
337+
self.compare_mapping(index_name, diff)
338+
target_idx_cnt += 1
339+
except Exception as e:
340+
print(e)
341+
# return StatusException.raise_exception('Index [{}]was not found in {} [Source:Elasticsearch Cluster]'.format(index_name, source))
342+
# pass
343+
except Exception as e:
344+
print(e)
345+
# pass
346+
return {"error" : str(e)}
347+
348+
resp = {
349+
"mappings_same" : all(self.all_same_mapping),
350+
"mapping_details" : self.response,
351+
"source_idx_total_cnt" : source_idx_cnt,
352+
"target_idx_total_cnt" : target_idx_cnt
353+
}
354+
355+
# return self.response, self.all_same_mapping
356+
return resp
357+
# return all(self.all_same_mapping)
358+
359+
except Exception as e:
360+
# return StatusException.raise_exception(str(e))
361+
return {"error" : str(e)}
362+
363+
259364
async def get_index_mapping_compare_test(self, source_mapping, target_mapping):
260365
''' Compare index mapping as test between two clusters'''
261366
try:

0 commit comments

Comments
 (0)