Skip to content

Commit f902e55

Browse files
committed
workload/changefeed: add more logs
1 parent 7a64926 commit f902e55

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

pkg/workload/changefeeds/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//pkg/util/hlc",
10+
"//pkg/util/log",
1011
"//pkg/util/timeutil",
1112
"//pkg/workload",
1213
"//pkg/workload/histogram",

pkg/workload/changefeeds/changefeeds.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@ import (
1313
"time"
1414

1515
"github.com/cockroachdb/cockroach/pkg/util/hlc"
16+
"github.com/cockroachdb/cockroach/pkg/util/log"
1617
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1718
"github.com/cockroachdb/cockroach/pkg/workload"
1819
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
1920
"github.com/cockroachdb/errors"
2021
"github.com/jackc/pgx/v5"
2122
)
2223

24+
var logResolvedEvery = log.Every(10 * time.Second)
25+
2326
// AddChangefeedToQueryLoad augments the passed QueryLoad to contain an extra
2427
// worker to run a changefeed over the tables of the generator.
2528
func AddChangefeedToQueryLoad(
@@ -83,6 +86,11 @@ func AddChangefeedToQueryLoad(
8386
}
8487
}
8588

89+
epoch, err := hlc.ParseHLC(cursorStr)
90+
if err != nil {
91+
return err
92+
}
93+
8694
tableNames := strings.Builder{}
8795
for i, table := range gen.Tables() {
8896
if i == 0 {
@@ -122,10 +130,15 @@ func AddChangefeedToQueryLoad(
122130
return true
123131
}
124132
var rows pgx.Rows
133+
var changefeedStartTime time.Time
125134
maybeSetupRows := func() (done bool) {
126135
if rows != nil {
127136
return false
128137
}
138+
if changefeedStartTime.IsZero() {
139+
changefeedStartTime = timeutil.Now()
140+
}
141+
log.Dev.Infof(ctx, "creating changefeed after %s with stmt: %s with args %v", timeutil.Since(epoch.GoTime()), stmt, args)
129142
var err error
130143
rows, err = conn.Query(cfCtx, stmt, args...)
131144
return maybeMarkDone(err)
@@ -169,6 +182,11 @@ func AddChangefeedToQueryLoad(
169182
return errors.Errorf("resolved timestamp %s is less than last resolved timestamp %s", resolved, lastResolved)
170183
}
171184
lastResolved = resolved
185+
if !lastResolved.IsEmpty() {
186+
if logResolvedEvery.ShouldLog() {
187+
log.Dev.Infof(ctx, "received resolved timestamp: lag=%s, ts=%s, sinceStart=%s", timeutil.Since(lastResolved.GoTime()), lastResolved, timeutil.Since(changefeedStartTime))
188+
}
189+
}
172190
} else {
173191
return errors.Errorf("failed to parse CHANGEFEED event: %s", values[2])
174192
}

0 commit comments

Comments
 (0)