Skip to content

Commit 44f53e8

Browse files
committed
kvnemesis: add CPut
This commit adds the following CPut operations to kvnemesis: - `CPutMatchExisting`: the CPut condition matches an existing key's value. Both the write and the preceding read are validated. - `CPutMatchMissing`: the CPut condition matches a missing key's nil value. This corresponds to the CPut generated by a typical `INSERT` statement. Again both the write and the preceding read are validated. - `CPutNoMatch`: the CPut condition is not satisfied and the request/batch/transaction fails. These errors are expected and ignored by the test, but we still do some validation on the observed values for the condition to fail. - `CPutAllowIfDoesNotExist`: the CPut condition is not satisfied but the operation succeeds if the value does not exist. The test is set up in a way that the value most likely does not exist. Fixes: #64810 Release note: None
1 parent 3f6a4f4 commit 44f53e8

File tree

8 files changed

+242
-42
lines changed

8 files changed

+242
-42
lines changed

pkg/kv/kvnemesis/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ go_library(
5555
"@com_github_cockroachdb_pebble//:pebble",
5656
"@com_github_cockroachdb_pebble//vfs",
5757
"@org_golang_google_protobuf//proto",
58+
"@org_golang_x_exp//maps",
5859
],
5960
)
6061

@@ -112,6 +113,7 @@ go_test(
112113
"@com_github_stretchr_testify//assert",
113114
"@com_github_stretchr_testify//require",
114115
"@org_golang_google_protobuf//proto",
116+
"@org_golang_x_exp//maps",
115117
],
116118
)
117119

pkg/kv/kvnemesis/applier.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,15 @@ func exceptDelRangeUsingTombstoneStraddlesRangeBoundary(err error) bool {
105105
return errors.Is(err, errDelRangeUsingTombstoneStraddlesRangeBoundary)
106106
}
107107

108+
func exceptConditionFailed(err error) bool {
109+
return errors.HasType(err, (*kvpb.ConditionFailedError)(nil))
110+
}
111+
108112
func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
109113
switch o := op.GetValue().(type) {
110114
case *GetOperation,
111115
*PutOperation,
116+
*CPutOperation,
112117
*ScanOperation,
113118
*BatchOperation,
114119
*DeleteOperation,
@@ -342,6 +347,23 @@ func applyClientOp(
342347
return
343348
}
344349
o.Result.OptionalTimestamp = ts
350+
case *CPutOperation:
351+
_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
352+
expVal := roachpb.MakeValueFromBytes(o.ExpVal)
353+
if o.AllowIfDoesNotExist {
354+
b.CPutAllowingIfNotExists(o.Key, o.Value(), expVal.TagAndDataBytes())
355+
} else {
356+
b.CPut(o.Key, o.Value(), expVal.TagAndDataBytes())
357+
}
358+
setLastReqSeq(b, o.Seq)
359+
})
360+
o.Result = resultInit(ctx, err)
361+
// If the CPut failed with ConditionFailedError, we still want to record the
362+
// timestamp and do some validation later.
363+
if err != nil && !exceptConditionFailed(err) {
364+
return
365+
}
366+
o.Result.OptionalTimestamp = ts
345367
case *ScanOperation:
346368
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
347369
if o.SkipLocked {
@@ -561,6 +583,14 @@ func applyBatchOp(
561583
b.Put(subO.Key, subO.Value())
562584
}
563585
setLastReqSeq(b, subO.Seq)
586+
case *CPutOperation:
587+
expVal := roachpb.MakeValueFromBytes(subO.ExpVal)
588+
if subO.AllowIfDoesNotExist {
589+
b.CPutAllowingIfNotExists(subO.Key, subO.Value(), expVal.TagAndDataBytes())
590+
} else {
591+
b.CPut(subO.Key, subO.Value(), expVal.TagAndDataBytes())
592+
}
593+
setLastReqSeq(b, subO.Seq)
564594
case *ScanOperation:
565595
if subO.SkipLocked {
566596
panic(errors.AssertionFailedf(`SkipLocked cannot be used in batches`))
@@ -643,6 +673,10 @@ func applyBatchOp(
643673
res := b.Results[resultIdx]
644674
subO.Result = resultInit(ctx, res.Err)
645675
subO.Result.ResumeSpan = res.ResumeSpan
676+
case *CPutOperation:
677+
res := b.Results[resultIdx]
678+
subO.Result = resultInit(ctx, res.Err)
679+
subO.Result.ResumeSpan = res.ResumeSpan
646680
case *ScanOperation:
647681
res := b.Results[resultIdx]
648682
if res.Err != nil {

0 commit comments

Comments
 (0)