55from lambda_app .decorators import SQSEvent
66from lambda_app .decorators .events import SQSRecord
77from lambda_app .events .tracker import EventTracker
8+ from lambda_app .events_helper import get_records_from_sqs_event , read_event
89from lambda_app .helper import generate_hash
910from lambda_app .logging import get_logger
1011from lambda_app .repositories .mysql .ocoren_repository import OcorenRepository
@@ -31,13 +32,13 @@ def process(self, sqs_event):
3132
3233 event_tracker = EventTracker (self .logger )
3334 event_hash = None
34- # todo extrair essa logica daqui, o servico nao tem que fazer isso
35- records = self . get_records_from_sqs_event (sqs_event )
35+
36+ records = get_records_from_sqs_event (sqs_event , self . logger )
3637 if records is not None :
3738 process_counter = 0
3839 for record in records :
3940 process_counter += 1
40- event = self . _read_event (record )
41+ event = read_event (record , self . logger )
4142 if event is None :
4243 raise Exception ('Event is None' )
4344 event_hash = event ['hash' ] if 'hash' in event else generate_hash (event )
@@ -73,62 +74,3 @@ def process(self, sqs_event):
7374 event_tracker .track (event_hash , {'result' : result })
7475 self .logger .info ('Finishing the process' )
7576 return result
76-
77- def _read_event (self , record ):
78- self .logger .info ('try to reading event form record: {}' .format (record ))
79- self .logger .info ('Getting type of data: {}' .format (type (record )))
80- try :
81- self .logger .info ('dump: {}' .format (json .dumps (record )))
82- except Exception as err :
83- self .logger .error (err )
84- event_body = None
85- try :
86- if isinstance (record , dict ):
87- try :
88- event_body = json .loads (record ['body' ])
89- except Exception as err :
90- self .logger .error (err )
91- unescaped_str = ast .literal_eval (record ['body' ])
92- event_body = json .loads (unescaped_str )
93- elif isinstance (record , str ):
94- record = json .loads (record )
95- event_body = json .loads (record .body )
96- elif isinstance (record .body , str ):
97- event_body = json .loads (record .body )
98- else :
99- event_body = record .body
100- except Exception as err :
101- self .logger .error (err )
102- self .logger .info ('event_body: {}' .format (event_body ))
103- return event_body
104-
105- def get_records_from_sqs_event (self , sqs_event ):
106- records = []
107- try :
108- if isinstance (sqs_event , SQSEvent ):
109- self .logger .info ("SQSEvent instance" )
110- if not helper .empty (sqs_event .to_dict ()):
111- try :
112- sqs_event_dict = sqs_event .to_dict ()
113- if 'Records' in sqs_event_dict :
114- sqs_event_dict = sqs_event_dict ['Records' ]
115- for record in sqs_event_dict :
116- records .append (record )
117- except Exception as err :
118- self .logger .error (err )
119- records .append (sqs_event .to_dict ())
120- elif isinstance (sqs_event , SQSRecord ):
121- self .logger .info ("SQSRecord instance" )
122- if not helper .empty (sqs_event .to_dict ()):
123- records .append (sqs_event )
124- except Exception as err :
125- self .logger .error (err )
126- if isinstance (sqs_event , SQSEvent ) or isinstance (sqs_event , SQSRecord ):
127- self .logger .error (sqs_event .__dict__ )
128- else :
129- try :
130- self .logger .error (json .dumps (sqs_event ))
131- except Exception as err :
132- self .logger .error (err )
133- self .logger .error (str (sqs_event ))
134- return records
0 commit comments