diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go index 87c60d5b1..1245c7e3b 100644 --- a/internal/scraper/scraper.go +++ b/internal/scraper/scraper.go @@ -806,6 +806,10 @@ RECORD: switch key := dec.Key(); string(key) { case "ts": + if !t.IsZero() { + continue // already set by the 'time' field + } + var err error t, err = time.Parse(time.RFC3339Nano, string(value)) if err != nil { @@ -816,6 +820,15 @@ RECORD: continue RECORD } + // k6 sets a 'time' field on its logs using RFC3339; if present, report it instead of the default timestamp + case "time": + var err error + t, err = time.Parse(time.RFC3339, string(value)) + if err != nil { + s.logger.Warn().Err(err).Bytes("value", value).Msg("invalid timestamp scanning logs") + continue RECORD + } + default: if err := enc.EncodeKeyval(key, value); err != nil { // We should never hit this because all the entries are valid. diff --git a/internal/scraper/scraper_test.go b/internal/scraper/scraper_test.go index ec1ce2dc2..15c0babb1 100644 --- a/internal/scraper/scraper_test.go +++ b/internal/scraper/scraper_test.go @@ -2035,3 +2035,59 @@ func TestTickWithOffset(t *testing.T) { }) } } + +func TestExtractLogsK6TimeOverridesDefaultTimestamp(t *testing.T) { + // Test that k6's "time" field overrides the default "ts" timestamp from the logger + testCases := []struct { + name string + logs string + sharedLabels []labelPair + expected func(t *testing.T, streams Streams) + }{ + { + name: "k6 time field overrides default ts", + logs: `ts=2023-06-01T20:00:00Z time="2023-06-01T13:40:26-06:00" level=info msg="k6 message"`, + sharedLabels: []labelPair{ + {name: "probe", value: "test-probe"}, + }, + expected: func(t *testing.T, streams Streams) { + require.Len(t, streams, 1) + require.Len(t, streams[0].Entries, 1) + entry := streams[0].Entries[0] + + expectedTime, _ := time.Parse(time.RFC3339, "2023-06-01T13:40:26-06:00") + require.Equal(t, expectedTime, entry.Timestamp, "should use k6 time field, not default ts") + require.Equal(t, entry.Line, "level=info msg=\"k6 message\"\n") + }, + }, + { + name: "ts field only (no k6 time)", + logs: `ts=2023-06-01T20:00:00Z level=info msg="normal message"`, + sharedLabels: []labelPair{ + {name: "probe", value: "test-probe"}, + }, + expected: func(t *testing.T, streams Streams) { + require.Len(t, streams, 1) + require.Len(t, streams[0].Entries, 1) + entry := streams[0].Entries[0] + + expectedTime, _ := time.Parse(time.RFC3339Nano, "2023-06-01T20:00:00Z") + require.Equal(t, expectedTime, entry.Timestamp) + require.Equal(t, entry.Line, "level=info msg=\"normal message\"\n") + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + s := Scraper{ + logger: testhelper.Logger(t), + } + + streams := s.extractLogs(time.Now(), []byte(tc.logs), tc.sharedLabels) + tc.expected(t, streams) + }) + } +}