Skip to content

Commit ad9b0b2

Browse files
craig[bot]ajwerner
andcommitted
Merge #155516
155516: workload: add mechanism to run changefeed alongside workloads r=wenyihu6 a=stevendanna This is a manual foward-port of #73034 with a few small modifications from me. This allows a user to run a changefeed with any workload which listens for writes to all tables. It adds a `--with-changefeed` flag to `workload <name> run` commands and then outputs a time series which indicates the number of rows and the difference in time between when the client sees the row and its commit timestamp. Note that using the commit timestamp may be controversial. That timestamp will generally include all of the client latency. That's got some nice properties. If the latency added no additional latency, then the duration distribution would be exactly that of the client's writes. Example: ```bash ./bin/workload run kv --concurrency 10 --with-changefeed=true ``` ``` _elapsed___errors__ops/sec(inst)___ops/sec(cum)__p50(ms)__p95(ms)__p99(ms)_pMax(ms) 31.0s 0 15544.3 12910.1 3221.2 5905.6 6442.5 6710.9 changefeed 31.0s 0 7504.6 7405.3 1.2 2.0 9.4 17.8 write 32.0s 0 14963.0 12974.2 2952.8 5905.6 6174.0 6710.9 changefeed 32.0s 0 7408.5 7405.4 1.2 1.9 8.9 19.9 write 33.0s 0 14197.2 13011.3 2818.6 6174.0 6442.5 6979.3 changefeed 33.0s 0 7039.6 7394.3 1.2 2.2 7.6 37.7 write 34.0s 0 15786.2 13092.9 2550.1 5637.1 6174.0 6442.5 changefeed 34.0s 0 7162.6 7387.5 1.2 2.4 7.6 35.7 write 35.0s 0 10789.0 13027.1 2818.6 6174.0 6442.5 6710.9 changefeed 35.0s 0 3513.0 7276.8 1.2 2.5 7.6 352.3 write 36.0s 0 16003.6 13109.7 3221.2 6174.0 7247.8 7516.2 changefeed 36.0s 0 8260.3 7304.1 1.1 1.7 4.5 16.3 write 37.0s 0 15547.7 13175.6 3087.0 5100.3 6174.0 6710.9 changefeed 37.0s 0 8218.4 7328.8 1.0 1.5 5.8 18.9 write 38.0s 0 14046.1 13198.5 2684.4 6710.9 6979.3 7516.2 changefeed 38.0s 0 8062.1 7348.1 1.1 1.6 5.2 16.8 write 39.0s 0 12096.6 13170.3 1677.7 1879.0 2080.4 2147.5 changefeed 39.0s 0 8285.4 7372.1 1.1 1.5 4.7 17.8 write 40.0s 0 11868.9 13137.8 1208.0 1409.3 1409.3 1409.3 changefeed 40.0s 0 8661.9 7404.4 1.0 1.5 3.4 12.6 write _elapsed___errors__ops/sec(inst)___ops/sec(cum)__p50(ms)__p95(ms)__p99(ms)_pMax(ms) 41.0s 0 8675.6 13028.9 838.9 1006.6 1006.6 1006.6 changefeed 41.0s 0 8846.6 7439.6 1.0 1.4 3.3 12.1 write ``` Release note (cli change): `cockroach workload <name> run` commands now offer a `--with-changefeed` flag to additionally run a changefeed watching for writes to the workload's tables. Co-authored-by: Andrew Werner <awerner32@gmail.com>
2 parents 5b5330b + bf84cf5 commit ad9b0b2

File tree

5 files changed

+186
-4
lines changed

5 files changed

+186
-4
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2865,6 +2865,7 @@ GO_TARGETS = [
28652865
"//pkg/workload/bank:bank",
28662866
"//pkg/workload/bank:bank_test",
28672867
"//pkg/workload/bulkingest:bulkingest",
2868+
"//pkg/workload/changefeeds:changefeeds",
28682869
"//pkg/workload/cli:cli",
28692870
"//pkg/workload/cli:cli_test",
28702871
"//pkg/workload/conflict:conflict",
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "changefeeds",
5+
srcs = ["changefeeds.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/workload/changefeeds",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/util/hlc",
10+
"//pkg/util/timeutil",
11+
"//pkg/workload",
12+
"//pkg/workload/histogram",
13+
"@com_github_cockroachdb_errors//:errors",
14+
"@com_github_jackc_pgx_v5//:pgx",
15+
],
16+
)
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package changefeeds
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"fmt"
12+
"strings"
13+
14+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
15+
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
16+
"github.com/cockroachdb/cockroach/pkg/workload"
17+
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
18+
"github.com/cockroachdb/errors"
19+
"github.com/jackc/pgx/v5"
20+
)
21+
22+
// AddChangefeedToQueryLoad augments the passed QueryLoad to contain an extra
23+
// worker to run a changefeed over the tables of the generator.
24+
func AddChangefeedToQueryLoad(
25+
ctx context.Context,
26+
gen workload.ConnFlagser,
27+
dbName string,
28+
urls []string,
29+
reg *histogram.Registry,
30+
ql *workload.QueryLoad,
31+
) error {
32+
cfg := workload.MultiConnPoolCfg{
33+
Method: gen.ConnFlags().Method,
34+
MaxTotalConnections: 2,
35+
}
36+
37+
mcp, err := workload.NewMultiConnPool(ctx, cfg, urls...)
38+
if err != nil {
39+
return err
40+
}
41+
conn, err := mcp.Get().Acquire(ctx)
42+
if err != nil {
43+
return err
44+
}
45+
if _, err := conn.Exec(ctx, fmt.Sprintf("USE %q", dbName)); err != nil {
46+
return err
47+
}
48+
49+
setAppName := fmt.Sprintf("SET application_name='%s_changefeed'", gen.Meta().Name)
50+
if _, err := conn.Exec(ctx, setAppName); err != nil {
51+
return err
52+
}
53+
54+
cfLatency := reg.GetHandle().Get("changefeed")
55+
56+
var sessionID string
57+
if err := conn.QueryRow(ctx, "SHOW session_id").Scan(&sessionID); err != nil {
58+
return errors.Wrap(err, "getting session_id")
59+
}
60+
61+
// Create a second connection to close the first connection by issuing a
62+
// cancel request.
63+
closeConn, err := mcp.Get().Acquire(ctx)
64+
if err != nil {
65+
return err
66+
}
67+
if _, err := closeConn.Exec(ctx, setAppName); err != nil {
68+
return err
69+
}
70+
71+
tableNames := strings.Builder{}
72+
for i, table := range gen.Tables() {
73+
if i == 0 {
74+
fmt.Fprintf(&tableNames, "%q", table.Name)
75+
} else {
76+
fmt.Fprintf(&tableNames, ", %q", table.Name)
77+
}
78+
}
79+
80+
stmt := fmt.Sprintf("EXPERIMENTAL CHANGEFEED FOR %s WITH updated, no_initial_scan, schema_change_policy=nobackfill",
81+
tableNames.String())
82+
cfCtx, cancel := context.WithCancel(ctx)
83+
84+
var doneErr error
85+
maybeMarkDone := func(err error) (done bool) {
86+
if err == nil {
87+
return false
88+
}
89+
cancel()
90+
_ = conn.Conn().Close(ctx)
91+
doneErr, conn = err, nil
92+
return true
93+
}
94+
var rows pgx.Rows
95+
maybeSetupRows := func() (done bool) {
96+
if rows != nil {
97+
return false
98+
}
99+
var err error
100+
rows, err = conn.Query(cfCtx, stmt)
101+
return maybeMarkDone(err)
102+
}
103+
ql.WorkerFns = append(ql.WorkerFns, func(ctx context.Context) error {
104+
if doneErr != nil {
105+
return doneErr
106+
}
107+
if maybeSetupRows() {
108+
return doneErr
109+
}
110+
if rows.Next() {
111+
values, err := rows.Values()
112+
if maybeMarkDone(err) {
113+
return doneErr
114+
}
115+
type updatedJSON struct {
116+
Updated string `json:"updated"`
117+
}
118+
var v updatedJSON
119+
if maybeMarkDone(json.Unmarshal(values[2].([]byte), &v)) {
120+
return doneErr
121+
}
122+
updated, err := hlc.ParseHLC(v.Updated)
123+
if maybeMarkDone(err) {
124+
return doneErr
125+
}
126+
cfLatency.Record(timeutil.Since(updated.GoTime()))
127+
return nil
128+
}
129+
if maybeMarkDone(rows.Err()) {
130+
return doneErr
131+
}
132+
maybeMarkDone(errors.New("changefeed ended"))
133+
return doneErr
134+
})
135+
136+
prevClose := ql.Close
137+
ql.Close = func(ctx context.Context) error {
138+
cancel()
139+
_, _ = closeConn.Exec(ctx, "CANCEL SESSION $1", sessionID)
140+
if err := closeConn.Conn().Close(ctx); err != nil {
141+
return err
142+
}
143+
if conn != nil {
144+
if err := conn.Conn().Close(ctx); err != nil {
145+
return err
146+
}
147+
}
148+
if err := prevClose(ctx); err != nil {
149+
return err
150+
}
151+
return nil
152+
}
153+
return nil
154+
}

pkg/workload/cli/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ go_library(
2424
"//pkg/util/retry",
2525
"//pkg/util/timeutil",
2626
"//pkg/workload",
27+
"//pkg/workload/changefeeds",
2728
"//pkg/workload/histogram",
2829
"//pkg/workload/histogram/exporter",
2930
"//pkg/workload/workloadsql",

pkg/workload/cli/run.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cockroachdb/cockroach/pkg/util/retry"
3030
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3131
"github.com/cockroachdb/cockroach/pkg/workload"
32+
"github.com/cockroachdb/cockroach/pkg/workload/changefeeds"
3233
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
3334
"github.com/cockroachdb/cockroach/pkg/workload/histogram/exporter"
3435
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
@@ -70,6 +71,8 @@ var prometheusPort = sharedFlags.Int(
7071
2112,
7172
"Port to expose prometheus metrics if the workload has a prometheus gatherer set.",
7273
)
74+
var withChangefeed = runFlags.Bool("with-changefeed", false,
75+
"Optionally run a changefeed over the tables")
7376

7477
// individualOperationReceiverAddr is an address to send latency
7578
// measurements to. By default it will not send anything.
@@ -484,13 +487,20 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
484487
log.Dev.Warningf(ctx, "retrying after error while creating load: %v", err)
485488
}
486489
ops, err = o.Ops(ctx, urls, reg)
490+
if err != nil && !*tolerateErrors {
491+
return errors.Wrapf(err, "failed to initialize the load generator")
492+
}
493+
494+
if *withChangefeed {
495+
log.Dev.Infof(ctx, "adding changefeed to query load...")
496+
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, urls, reg, &ops)
497+
if err != nil && !*tolerateErrors {
498+
return errors.Wrapf(err, "failed to initialize changefeed")
499+
}
500+
}
487501
if err == nil {
488502
return nil
489503
}
490-
err = errors.Wrapf(err, "failed to initialize the load generator")
491-
if !*tolerateErrors {
492-
return err
493-
}
494504
}
495505
if ctx.Err() != nil {
496506
// Don't retry endlessly. Note that this retry loop is not under the

0 commit comments

Comments
 (0)