@@ -84,21 +84,23 @@ func parseLine(line string) ([]interface{}, time.Time) {
8484 parsed [i ] = nil
8585 }
8686 }
87- return parsed , time . Now ()
87+ return parsed , reqTime
8888}
8989
90- func storeLogs (logs []string , conn * pgx.Conn ) {
90+ func storeLogs (logs []string , conn * pgx.Conn , lastEntryTime time. Time ) {
9191 rows := make ([][]interface {}, 0 , len (logs ))
9292 for _ , line := range logs {
9393 fmt .Println (line )
94- parsed , _ := parseLine (line )
95- rows = append (rows , parsed )
94+ parsed , entryTime := parseLine (line )
95+ if entryTime .After (lastEntryTime ) {
96+ rows = append (rows , parsed )
97+ }
9698 }
9799 if len (rows ) > 0 {
98100 _ , err := conn .CopyFrom (
99101 context .Background (),
100102 pgx.Identifier {os .Getenv ("TABLE_SCHEMA" ), os .Getenv ("TABLE_NAME" )},
101- []string {"time_local" , "path" , "ip" , "server_name" , "remote_user" , "remote_port" , "time" , "user_agent" , "user_id_got" , "user_id_set" , "request" , "status" , "body_bytes_sent" , "request_time" , "request_method" , "geoip_country_code" , "http_referrer" },
103+ []string {"time_local" , "path" , "ip" , "server_name" , "remote_user" , "remote_port" , "time" , "user_agent" , "user_id_got" , "user_id_set" , "request" , "status" , "body_bytes_sent" , "request_time" , "request_method" , "geoip_country_code" , "http_referrer" },
102104 pgx .CopyFromRows (rows ),
103105 )
104106 if err != nil {
@@ -107,6 +109,9 @@ func storeLogs(logs []string, conn *pgx.Conn) {
107109 }
108110 log .Printf ("Stored %d entries\n " , len (rows ))
109111 }
112+ if len (rows ) != len (logs ) {
113+ log .Printf ("Skipped %d entries\n " , len (logs ) - len (rows ))
114+ }
110115}
111116
112117func processLogs (conn * pgx.Conn ) {
@@ -115,12 +120,29 @@ func processLogs(conn *pgx.Conn) {
115120 buffer := make ([]string , 0 , 100 )
116121 var s string
117122 flushTime , _ := strconv .ParseInt (os .Getenv ("FLUSH_MS" ), 10 , 64 )
123+
124+ var lastEntryString string
125+ var lastEntryTime time.Time
126+ err := conn .QueryRow (
127+ context .Background (),
128+ fmt .Sprintf ("SELECT MAX(time)::text max FROM \" %s\" .\" %s\" " , os .Getenv ("TABLE_SCHEMA" ), os .Getenv ("TABLE_NAME" )),
129+ ).Scan (& lastEntryString )
130+ if err != nil {
131+ log .Println ("First time sync" )
132+ } else {
133+ log .Printf ("Last entry time: %s" , lastEntryString )
134+ lastEntryTime , err = time .Parse ("2006-01-02 15:04:05" , lastEntryString )
135+ if err != nil {
136+ panic (err )
137+ }
138+ }
139+
118140 for {
119141 s = <- logString
120142 now = time .Now ()
121143 buffer = append (buffer , s )
122- if now .Sub (lastSync ).Milliseconds () >= flushTime || now . Sub ( lastSync ). Milliseconds () < 100 {
123- storeLogs (buffer , conn )
144+ if now .Sub (lastSync ).Milliseconds () >= flushTime {
145+ storeLogs (buffer , conn , lastEntryTime )
124146 buffer = buffer [:0 ]
125147 lastSync = now
126148 }
0 commit comments