2424
2525from caso import exception
2626import caso .messenger
27-
27+ #add json lib
28+ import json
29+ #add datetime lib
30+ import datetime
2831
2932opts = [
3033 cfg .StrOpt ("host" , default = "localhost" , help = "Logstash host to send records to." ),
@@ -49,11 +52,29 @@ def __init__(self, host=CONF.logstash.host, port=CONF.logstash.port):
4952 self .sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
5053
5154 def push (self , records ):
55+
56+ # NOTE(acostantini): code for the serialization and push of the
57+ # records in logstash. JSON format to be used and encoding UTF-8
58+ """Serialization of records to be sent to logstash"""
59+ if not records :
60+ return
61+
62+ #Actual timestamp to be added on each record
63+ cdt = datetime .datetime .now ()
64+ ct = int (datetime .datetime .now ().timestamp ())
65+
66+ #Open the connection with LS
67+ self .sock .connect ((self .host , self .port ))
68+
5269 """Push records to logstash using tcp."""
5370 try :
54- self .sock .connect ((self .host , self .port ))
55- for _ , record in six .iteritems (records ):
56- self .sock .sendall (record .as_json () + "\n " )
71+ for record in records :
72+ #serialization of record
73+ rec = record .logstash_message ()
74+ #cASO timestamp added to each record
75+ rec ['caso-timestamp' ]= ct
76+ #Send the record to LS
77+ self .sock .send ((json .dumps (rec )+ '\n ' ).encode ('utf-8' ))
5778 except socket .error as e :
5879 raise exception .LogstashConnectionError (
5980 host = self .host , port = self .port , exception = e
0 commit comments