Skip to content

Commit 4b80cd5

Browse files
craig[bot]mw5hspilchen
committed
155284: backfill: add a separate control for number of vectors merged per batch r=mw5h a=mw5h Add the 'bulkio.index_backfill.vector_merge_batch_size', which controls the number of rows to merge into vector indexes per transaction while the index is being created. This is analogous to the 'bulkio.index_backfill.merge_batch_size' setting, but it only applies when the target index is a vector index. By default, 3 vectors will be merged per transaction to reduce contention with fixup tasks. Fixes: #155283 Release note (sql change): Added the bulkio.index_backfill.vector_merge_batch_size cluster setting to control how many vectors to merge into a vector index per transaction during create operations. By default, this defaults to 3. 155439: roachtest/inspect: use INSPECT instead of SCRUB in inspect test r=spilchen a=spilchen Now that the INSPECT command has been implemented, we can use INSPECT instead of SCRUB to start inspect in the inspect throughput test. Informs #154457 Epic: CRDB-30356 Release note: none Co-authored-by: Matt White <matt.white@cockroachlabs.com> Co-authored-by: Matt Spilchen <matt.spilchen@cockroachlabs.com>
3 parents 9df3295 + 61efdca + 53f7a91 commit 4b80cd5

File tree

4 files changed

+32
-23
lines changed

4 files changed

+32
-23
lines changed

pkg/cmd/roachtest/tests/inspect_throughput.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@ func registerInspectThoughput(r registry.Registry) {
3333
r.Add(makeInspectThroughputTest(r, 12, 8, 500_000_000, 3*time.Hour, 1))
3434

3535
// Long run: 12 nodes × 8 CPUs, 1B rows, 2 index checks (runs INSPECT twice: 1 index, then 2 indexes), ~5 hours (in v25.4)
36-
// TODO(148365): Until we have INSPECT syntax, we cannot execute the INSPECT
37-
// job on a subset of indexes. Set this to 2 when INSPECT SQL is available.
38-
const indexesForLongRun = 1
36+
const indexesForLongRun = 2
3937
r.Add(makeInspectThroughputTest(r, 12, 8, 1_000_000_000, 8*time.Hour, indexesForLongRun))
4038
}
4139

@@ -212,8 +210,8 @@ func makeInspectThroughputTest(
212210
t.Fatal(err)
213211
}
214212

215-
// Enable scrub jobs for EXPERIMENTAL SCRUB to use job system.
216-
if _, err := db.Exec("SET enable_scrub_job = on"); err != nil {
213+
// Enable INSPECT feature flag.
214+
if _, err := db.Exec("USE bulkingest; SET enable_inspect_command = true"); err != nil {
217215
t.Fatal(err)
218216
}
219217

@@ -226,9 +224,8 @@ func makeInspectThroughputTest(
226224
tickHistogram(cfg.metricName)
227225
before := timeutil.Now()
228226

229-
// TODO(148365): Update to use INSPECT syntax when SQL is ready.
230-
scrubSQL := fmt.Sprintf("EXPERIMENTAL SCRUB TABLE bulkingest.bulkingest WITH OPTIONS INDEX (%s)", cfg.indexListSQL)
231-
if _, err := db.Exec(scrubSQL); err != nil {
227+
inspectSQL := fmt.Sprintf("INSPECT TABLE bulkingest.bulkingest WITH OPTIONS INDEX (%s)", cfg.indexListSQL)
228+
if _, err := db.Exec(inspectSQL); err != nil {
232229
t.Fatal(err)
233230
}
234231

pkg/sql/backfill/mvcc_index_merger.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,17 @@ var indexBackfillMergeBatchSize = settings.RegisterIntSetting(
4747
settings.NonNegativeInt, /* validateFn */
4848
)
4949

50+
// indexBackfillVectorMergeBatchSize is the maximum number of vectors we attempt to merge
51+
// in a single transaction during the merging process. This is a much smaller number because
52+
// fixups create a lot of conflicts with big batches.
53+
var indexBackfillVectorMergeBatchSize = settings.RegisterIntSetting(
54+
settings.ApplicationLevel,
55+
"bulkio.index_backfill.vector_merge_batch_size",
56+
"the number of vectors we merge between temporary and adding indexes in a single batch",
57+
3,
58+
settings.NonNegativeInt, /* validateFn */
59+
)
60+
5061
// indexBackfillMergeBatchBytes is the maximum number of bytes we attempt to
5162
// merge from the temporary index in a single transaction during the merging
5263
// process.
@@ -315,7 +326,12 @@ func (ibm *IndexBackfillMerger) scan(
315326
}
316327
}
317328
}
318-
chunkSize := indexBackfillMergeBatchSize.Get(&ibm.flowCtx.Cfg.Settings.SV)
329+
var chunkSize int64
330+
if _, ok := ibm.VectorIndexes[ibm.spec.AddedIndexes[spanIdx]]; ok {
331+
chunkSize = indexBackfillVectorMergeBatchSize.Get(&ibm.flowCtx.Cfg.Settings.SV)
332+
} else {
333+
chunkSize = indexBackfillMergeBatchSize.Get(&ibm.flowCtx.Cfg.Settings.SV)
334+
}
319335
chunkBytes := indexBackfillMergeBatchBytes.Get(&ibm.flowCtx.Cfg.Settings.SV)
320336

321337
var br *kvpb.BatchResponse

pkg/sql/inspect/inspect_job_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ import (
3131
// triggered by statements that run in implicit transactions. It verifies that the job
3232
// starts correctly, that errors or timeouts propagate to the user, and that
3333
// client-visible semantics (like statement timeout or job failure) behave as expected.
34-
//
35-
// Note: This test currently uses SCRUB to trigger a job, but is not testing SCRUB
36-
// itself. The goal is to verify general execution semantics for async job statements.
3734
func TestInspectJobImplicitTxnSemantics(t *testing.T) {
3835
defer leaktest.AfterTest(t)()
3936
defer log.Scope(t).Close(t)
@@ -71,7 +68,7 @@ func TestInspectJobImplicitTxnSemantics(t *testing.T) {
7168

7269
runner.Exec(t, `
7370
CREATE DATABASE db;
74-
SET enable_scrub_job = true;
71+
SET enable_inspect_command = true;
7572
CREATE TABLE db.t (
7673
id INT PRIMARY KEY,
7774
val INT
@@ -115,7 +112,7 @@ func TestInspectJobImplicitTxnSemantics(t *testing.T) {
115112
onInspectErrorToReturn.Store(&tc.onStartError)
116113
defer func() { onInspectErrorToReturn.Store(nil) }()
117114
}
118-
_, err := db.Exec("EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1us'")
115+
_, err := db.Exec("INSPECT TABLE db.t AS OF SYSTEM TIME '-1us'")
119116
pauseJobStart.Store(false)
120117
if tc.expectedErrRegex != "" {
121118
require.Error(t, err)
@@ -202,7 +199,7 @@ func TestInspectJobProtectedTimestamp(t *testing.T) {
202199
runner := sqlutils.MakeSQLRunner(db)
203200
runner.Exec(t, `
204201
CREATE DATABASE db;
205-
SET enable_scrub_job = true;
202+
SET enable_inspect_command = true;
206203
CREATE TABLE db.t (
207204
id INT PRIMARY KEY,
208205
val INT
@@ -216,7 +213,7 @@ func TestInspectJobProtectedTimestamp(t *testing.T) {
216213
// Start INSPECT job with AS OF timestamp in a goroutine
217214
errCh := make(chan error, 1)
218215
go func() {
219-
_, err := db.Exec("EXPERIMENTAL SCRUB TABLE db.t AS OF SYSTEM TIME '-1us'")
216+
_, err := db.Exec("INSPECT TABLE db.t AS OF SYSTEM TIME '-1us'")
220217
errCh <- err
221218
}()
222219

@@ -368,12 +365,11 @@ func TestInspectProgressWithMultiRangeTable(t *testing.T) {
368365
})
369366

370367
// Start the INSPECT job.
371-
// TODO(148365): Run INSPECT instead of SCRUB.
372368
t.Logf("Starting INSPECT job on table with %d ranges distributed across %d nodes", rangeCount, nodeCount)
373369
_, err := db.Exec(`
374-
SET enable_scrub_job = true;
370+
SET enable_inspect_command = true;
375371
COMMIT;
376-
EXPERIMENTAL SCRUB TABLE multi_range_table`)
372+
INSPECT TABLE multi_range_table`)
377373
require.NoError(t, err)
378374

379375
var jobID int64

pkg/sql/inspect/inspect_metrics_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestInspectMetrics(t *testing.T) {
3535
runner := sqlutils.MakeSQLRunner(db)
3636
runner.Exec(t, `
3737
CREATE DATABASE db;
38-
SET enable_scrub_job = true;
38+
SET enable_inspect_command = true;
3939
CREATE TABLE db.t (
4040
id INT PRIMARY KEY,
4141
val INT
@@ -52,7 +52,7 @@ func TestInspectMetrics(t *testing.T) {
5252
initialIssuesFound := metrics.IssuesFound.Count()
5353

5454
// First run: no corruption, should succeed without issues
55-
runner.Exec(t, "EXPERIMENTAL SCRUB TABLE db.t")
55+
runner.Exec(t, "INSPECT TABLE db.t")
5656
require.Equal(t, initialRuns+1, metrics.Runs.Count(), "Runs counter should increment")
5757
require.Equal(t, initialRunsWithIssues, metrics.RunsWithIssues.Count(), "RunsWithIssues should not increment")
5858
require.Equal(t, initialIssuesFound, metrics.IssuesFound.Count(), "IssuesFound should not increment")
@@ -72,7 +72,7 @@ func TestInspectMetrics(t *testing.T) {
7272
require.NoError(t, err)
7373

7474
// Second run: with corruption, should detect the missing index entry.
75-
_, err = db.Exec("EXPERIMENTAL SCRUB TABLE db.t")
75+
_, err = db.Exec("INSPECT TABLE db.t")
7676
require.Error(t, err, "INSPECT should fail when corruption is detected")
7777
require.Contains(t, err.Error(), "INSPECT found inconsistencies")
7878
require.Equal(t, initialRuns+2, metrics.Runs.Count(),
@@ -91,7 +91,7 @@ func TestInspectMetrics(t *testing.T) {
9191
CREATE INDEX i2 on db.t2 (val);
9292
INSERT INTO db.t2 VALUES (1, 2), (2, 3);
9393
`)
94-
runner.Exec(t, "EXPERIMENTAL SCRUB TABLE db.t2")
94+
runner.Exec(t, "INSPECT TABLE db.t2")
9595
require.Equal(t, initialRuns+3, metrics.Runs.Count(),
9696
"Runs counter should increment for third job execution")
9797
require.Equal(t, initialRunsWithIssues+1, metrics.RunsWithIssues.Count(),

0 commit comments

Comments
 (0)