
Commit f2d1d9b

craig[bot] committed, with jbowens and miraradeva
152356: storage: add cluster setting to use old compaction eligibility r=RaduBerinde a=jbowens

In cockroachdb/pebble#5187 we altered the default compaction picking logic that determines whether a level is eligible for a default compaction. We added a knob to allow this value to be configured at runtime in case the change has some unforeseen impact on some workloads. This commit connects that knob to a new cluster setting. The cluster setting is just an escape hatch that we don't anticipate needing.

Epic: none
Release note: none

155712: kvnemesis: add node stopping and restarting r=stevendanna a=miraradeva

This commit adds two new kvnemesis operations: stop a running node, and restart a stopped node. As with previous fault-injecting operations, we need to be careful to either preserve quorum (liveness mode) or allow unavailability (safety mode). This commit adds only a liveness-mode variant of kvnemesis because the current behavior of `TestCluster` could lead to hanging goroutines if nodes are stopped sequentially and quorum is lost (see `TestCluster.stopServers` for more details).

Part of: #64828
Release note: None

Co-authored-by: Jackson Owens <jackson@cockroachlabs.com>
Co-authored-by: Mira Radeva <mira@cockroachlabs.com>
3 parents fbdbcb8 + a00d8f0 + 7dfba9d commit f2d1d9b
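The storage-side files for #152356 are not among the kvnemesis files shown below. As a rough sketch of the escape-hatch pattern that commit message describes, a boolean cluster setting in pkg/storage would typically be registered as follows; the setting name, class, default, and the Pebble knob it ultimately feeds are hypothetical here, not the ones the PR actually adds:

// Hypothetical sketch only; see the storage files in the full diff for the real setting.
package storage

import "github.com/cockroachdb/cockroach/pkg/settings"

// useOldCompactionEligibility would act as the escape hatch described in the
// commit message: flipping it at runtime reverts to the pre-pebble#5187
// heuristic for deciding whether a level is eligible for a default compaction.
var useOldCompactionEligibility = settings.RegisterBoolSetting(
    settings.SystemOnly,
    "storage.compaction.use_old_eligibility.enabled", // hypothetical name
    "revert to the previous compaction eligibility heuristic",
    false, // default: keep the new Pebble behavior
)

The value would then be read when the store's Pebble options are built (or propagated via an on-change hook) and assigned to the corresponding Pebble option; that wiring lives in the storage files of this commit and is not reproduced here.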

13 files changed: +284 / -32 lines

pkg/kv/kvnemesis/BUILD.bazel (3 additions, 0 deletions)

@@ -99,9 +99,11 @@ go_test(
         "//pkg/spanconfig",
         "//pkg/storage",
         "//pkg/storage/enginepb",
+        "//pkg/storage/fs",
         "//pkg/testutils",
         "//pkg/testutils/datapathutils",
         "//pkg/testutils/echotest",
+        "//pkg/testutils/listenerutil",
         "//pkg/testutils/serverutils",
         "//pkg/testutils/skip",
         "//pkg/testutils/sqlutils",
@@ -112,6 +114,7 @@ go_test(
         "//pkg/util/leaktest",
         "//pkg/util/log",
         "//pkg/util/randutil",
+        "//pkg/util/stop",
         "//pkg/util/syncutil",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_errors//:errors",

pkg/kv/kvnemesis/applier.go (24 additions, 12 deletions)

@@ -28,20 +28,22 @@ import (
 
 // Applier executes Steps.
 type Applier struct {
-    env *Env
-    dbs []*kv.DB
-    mu  struct {
+    env   *Env
+    dbs   []*kv.DB
+    nodes *nodes
+    mu    struct {
         dbIdx int
         syncutil.Mutex
         txns map[string]*kv.Txn
     }
 }
 
 // MakeApplier constructs an Applier that executes against the given DBs.
-func MakeApplier(env *Env, dbs ...*kv.DB) *Applier {
+func MakeApplier(env *Env, n *nodes, dbs ...*kv.DB) *Applier {
     a := &Applier{
-        env: env,
-        dbs: dbs,
+        env:   env,
+        dbs:   dbs,
+        nodes: n,
     }
     a.mu.txns = make(map[string]*kv.Txn)
     return a
@@ -63,7 +65,7 @@ func (a *Applier) Apply(ctx context.Context, step *Step) (trace tracingpb.Record
         }
         trace = collectAndFinish()
     }()
-    applyOp(recCtx, a.env, db, &step.Op)
+    a.applyOp(recCtx, db, &step.Op)
     return collectAndFinish(), nil
 }
 
@@ -120,7 +122,7 @@ func exceptContextCanceled(err error) bool {
         strings.Contains(err.Error(), "query execution canceled")
 }
 
-func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
+func (a *Applier) applyOp(ctx context.Context, db *kv.DB, op *Operation) {
     switch o := op.GetValue().(type) {
     case *GetOperation,
         *PutOperation,
@@ -146,10 +148,10 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
         err := db.AdminTransferLease(ctx, o.Key, o.Target)
         o.Result = resultInit(ctx, err)
     case *ChangeSettingOperation:
-        err := changeClusterSettingInEnv(ctx, env, o)
+        err := changeClusterSettingInEnv(ctx, a.env, o)
         o.Result = resultInit(ctx, err)
     case *ChangeZoneOperation:
-        err := updateZoneConfigInEnv(ctx, env, o.Type)
+        err := updateZoneConfigInEnv(ctx, a.env, o.Type)
         o.Result = resultInit(ctx, err)
     case *BarrierOperation:
         var err error
@@ -162,10 +164,20 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
     case *FlushLockTableOperation:
         o.Result = resultInit(ctx, db.FlushLockTable(ctx, o.Key, o.EndKey))
     case *AddNetworkPartitionOperation:
-        err := env.Partitioner.AddPartition(roachpb.NodeID(o.FromNode), roachpb.NodeID(o.ToNode))
+        err := a.env.Partitioner.AddPartition(roachpb.NodeID(o.FromNode), roachpb.NodeID(o.ToNode))
         o.Result = resultInit(ctx, err)
     case *RemoveNetworkPartitionOperation:
-        err := env.Partitioner.RemovePartition(roachpb.NodeID(o.FromNode), roachpb.NodeID(o.ToNode))
+        err := a.env.Partitioner.RemovePartition(roachpb.NodeID(o.FromNode), roachpb.NodeID(o.ToNode))
+        o.Result = resultInit(ctx, err)
+    case *StopNodeOperation:
+        serverID := int(o.NodeId) - 1
+        a.env.Restarter.StopServer(serverID)
+        a.nodes.setStopped(int(o.NodeId))
+        o.Result = resultInit(ctx, nil)
+    case *RestartNodeOperation:
+        serverID := int(o.NodeId) - 1
+        err := a.env.Restarter.RestartServer(serverID)
+        a.nodes.setRunning(int(o.NodeId))
         o.Result = resultInit(ctx, err)
     case *ClosureTxnOperation:
         // Use a backoff loop to avoid thrashing on txn aborts. Don't wait between

pkg/kv/kvnemesis/applier_test.go (1 addition, 1 deletion)

@@ -70,7 +70,7 @@ func TestApplier(t *testing.T) {
         require.NoError(t, w.Finish())
     }
 
-    a := MakeApplier(env, db)
+    a := MakeApplier(env, &nodes{}, db)
 
     tests := []testCase{
         {

pkg/kv/kvnemesis/env.go (6 additions, 0 deletions)

@@ -27,13 +27,19 @@ type Logger interface {
     WriteFile(basename string, contents string) string
 }
 
+type Restarter interface {
+    StopServer(idx int)
+    RestartServer(idx int) error
+}
+
 // Env manipulates the environment (cluster settings, zone configurations) that
 // the Applier operates in.
 type Env struct {
     SQLDBs      []*gosql.DB
     Tracker     *SeqTracker
     L           Logger
     Partitioner *rpc.Partitioner
+    Restarter   Restarter
 }
 
 func (e *Env) anyNode() *gosql.DB {
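The new Restarter interface is satisfied by the test harness rather than by anything in this package. Below is a minimal sketch, assuming the kvnemesis test hands its *testcluster.TestCluster straight to Env; TestCluster already exposes StopServer(idx int) and RestartServer(idx int) error, so no adapter is needed. The PR's actual wiring, including the sticky listeners/engines implied by the new listenerutil and fs dependencies in BUILD.bazel, may differ.

// Sketch only; not the PR's actual test setup.
package kvnemesis

import (
    "context"
    "testing"

    "github.com/cockroachdb/cockroach/pkg/base"
    "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func exampleRestarterWiring(t *testing.T) {
    ctx := context.Background()
    tc := testcluster.StartTestCluster(t, 4 /* nodes */, base.TestClusterArgs{})
    defer tc.Stopper().Stop(ctx)

    // *TestCluster's method set matches Restarter, so it can be plugged in
    // directly. Stop/restart operations applied by the Applier then act on
    // real servers in the cluster.
    env := &Env{Restarter: tc}
    _ = env
}

Note that the Applier converts 1-based kvnemesis node IDs to 0-based server indices (serverID := int(o.NodeId) - 1) before calling these methods.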

pkg/kv/kvnemesis/generator.go (86 additions, 4 deletions)

@@ -406,7 +406,11 @@ type FaultConfig struct {
     // RemoveNetworkPartition is an operation that simulates healing a network
     // partition.
     RemoveNetworkPartition int
-    // Disk stalls and node crashes belong here.
+    // StopNode is an operation that stops a randomly chosen node.
+    StopNode int
+    // RestartNode is an operation that restarts a randomly chosen node.
+    RestartNode int
+    // Disk stalls and other faults belong here.
 }
 
 // newAllOperationsConfig returns a GeneratorConfig that exercises *all*
@@ -531,6 +535,8 @@ func newAllOperationsConfig() GeneratorConfig {
         Fault: FaultConfig{
             AddNetworkPartition:    1,
             RemoveNetworkPartition: 1,
+            StopNode:               1,
+            RestartNode:            1,
         },
     }}
 }
@@ -625,10 +631,12 @@ func NewDefaultConfig() GeneratorConfig {
     config.Ops.ClosureTxn.TxnClientOps.FlushLockTable = 0
     config.Ops.ClosureTxn.TxnBatchOps.Ops.FlushLockTable = 0
 
-    // Network partitions can result in unavailability and need to be enabled with
-    // care by specific test variants.
+    // Network partitions and node restarts can result in unavailability and need
+    // to be enabled with care by specific test variants.
     config.Ops.Fault.AddNetworkPartition = 0
     config.Ops.Fault.RemoveNetworkPartition = 0
+    config.Ops.Fault.StopNode = 0
+    config.Ops.Fault.RestartNode = 0
     return config
 }
 
@@ -679,7 +687,7 @@ type Generator struct {
 
 // MakeGenerator constructs a Generator.
 func MakeGenerator(
-    config GeneratorConfig, replicasFn GetReplicasFn, mode TestMode,
+    config GeneratorConfig, replicasFn GetReplicasFn, mode TestMode, n *nodes,
 ) (*Generator, error) {
     if config.NumNodes <= 0 {
         return nil, errors.Errorf(`NumNodes must be positive got: %d`, config.NumNodes)
@@ -716,6 +724,7 @@
         currentSplits:    make(map[string]struct{}),
         historicalSplits: make(map[string]struct{}),
         partitions:       p,
+        nodes:            n,
         mode:             mode,
     }
     return g, nil
@@ -755,6 +764,9 @@ type generator struct {
     // between nodes.
     partitions
 
+    // nodes contains the sets of running and stopped nodes.
+    nodes *nodes
+
     // mode is the test mode (e.g. Liveness or Safety). The generator needs it in
     // order to set a timeout for range lookups under safety mode.
     mode TestMode
@@ -770,6 +782,52 @@ type partitions struct {
     partitioned map[connection]struct{}
 }
 
+// nodes contains the sets of running and stopped nodes. This struct is shared
+// between the generator and the applier to make sure nodes are promptly marked
+// as running/stopped when operations are generated/applied. The generator uses
+// removeRandRunning and removeRandStopped to pick nodes to stop/restart, and
+// the applier uses setRunning and setStopped to update the sets when operations
+// are actually applied. This is important because there could be a gap of
+// multiple seconds between generating a stop/restart operation and a node fully
+// stopping/restarting.
+type nodes struct {
+    mu      syncutil.RWMutex
+    running map[int]struct{}
+    stopped map[int]struct{}
+}
+
+func randNodeFromMap(m map[int]struct{}, rng *rand.Rand) int {
+    return maps.Keys(m)[rng.Intn(len(m))]
+}
+
+func (n *nodes) removeRandRunning(rng *rand.Rand) int {
+    n.mu.Lock()
+    defer n.mu.Unlock()
+    nodeID := randNodeFromMap(n.running, rng)
+    delete(n.running, nodeID)
+    return nodeID
+}
+
+func (n *nodes) removeRandStopped(rng *rand.Rand) int {
+    n.mu.Lock()
+    defer n.mu.Unlock()
+    nodeID := randNodeFromMap(n.stopped, rng)
+    delete(n.stopped, nodeID)
+    return nodeID
+}
+
+func (n *nodes) setRunning(nodeID int) {
+    n.mu.Lock()
+    defer n.mu.Unlock()
+    n.running[nodeID] = struct{}{}
+}
+
+func (n *nodes) setStopped(nodeID int) {
+    n.mu.Lock()
+    defer n.mu.Unlock()
+    n.stopped[nodeID] = struct{}{}
+}
+
 // RandStep returns a single randomly generated next operation to execute.
 //
 // RandStep is not concurrency safe.
@@ -851,6 +909,12 @@ func (g *generator) RandStep(rng *rand.Rand) Step {
     addOpGen(&allowed, toggleGlobalReads, g.Config.Ops.ChangeZone.ToggleGlobalReads)
     addOpGen(&allowed, addRandNetworkPartition, g.Config.Ops.Fault.AddNetworkPartition)
     addOpGen(&allowed, removeRandNetworkPartition, g.Config.Ops.Fault.RemoveNetworkPartition)
+    if len(g.nodes.running) > 0 {
+        addOpGen(&allowed, stopRandNode, g.Config.Ops.Fault.StopNode)
+    }
+    if len(g.nodes.stopped) > 0 {
+        addOpGen(&allowed, restartRandNode, g.Config.Ops.Fault.RestartNode)
+    }
 
     return step(g.selectOp(rng, allowed))
 }
@@ -1759,6 +1823,16 @@ func removeRandNetworkPartition(g *generator, rng *rand.Rand) Operation {
     return removeNetworkPartition(randConn.from, randConn.to)
 }
 
+func stopRandNode(g *generator, rng *rand.Rand) Operation {
+    randNode := g.nodes.removeRandRunning(rng)
+    return stopNode(randNode)
+}
+
+func restartRandNode(g *generator, rng *rand.Rand) Operation {
+    randNode := g.nodes.removeRandStopped(rng)
+    return restartNode(randNode)
+}
+
 func makeRandBatch(c *ClientOperationConfig) opGenFunc {
     return func(g *generator, rng *rand.Rand) Operation {
         var allowed []opGen
@@ -2380,6 +2454,14 @@ func removeNetworkPartition(from int, to int) Operation {
     }
 }
 
+func stopNode(nodeID int) Operation {
+    return Operation{StopNode: &StopNodeOperation{NodeId: int32(nodeID)}}
+}
+
+func restartNode(nodeID int) Operation {
+    return Operation{RestartNode: &RestartNodeOperation{NodeId: int32(nodeID)}}
+}
+
 type countingRandSource struct {
     count atomic.Uint64
     inner rand.Source64

pkg/kv/kvnemesis/generator_test.go (15 additions, 1 deletion)

@@ -77,7 +77,11 @@ func TestRandStep(t *testing.T) {
         return make([]roachpb.ReplicationTarget, rng.Intn(config.NumNodes)+1),
             make([]roachpb.ReplicationTarget, rng.Intn(config.NumNodes)+1)
     }
-    g, err := MakeGenerator(config, getReplicasFn, 0)
+    n := nodes{
+        running: map[int]struct{}{1: {}, 2: {}, 3: {}},
+        stopped: make(map[int]struct{}),
+    }
+    g, err := MakeGenerator(config, getReplicasFn, 0, &n)
     require.NoError(t, err)
 
     keys := make(map[string]struct{})
@@ -422,6 +426,16 @@ func TestRandStep(t *testing.T) {
             counts.Fault.AddNetworkPartition++
         case *RemoveNetworkPartitionOperation:
             counts.Fault.RemoveNetworkPartition++
+        case *StopNodeOperation:
+            counts.Fault.StopNode++
+            n.mu.Lock()
+            n.stopped[int(o.NodeId)] = struct{}{}
+            n.mu.Unlock()
+        case *RestartNodeOperation:
+            counts.Fault.RestartNode++
+            n.mu.Lock()
+            n.running[int(o.NodeId)] = struct{}{}
+            n.mu.Unlock()
         default:
             t.Fatalf("%T", o)
         }

pkg/kv/kvnemesis/kvnemesis.go (20 additions, 3 deletions)

@@ -106,9 +106,23 @@ func RunNemesis(
         return nil, fmt.Errorf("numSteps must be >0, got %v", numSteps)
     }
 
-    dataSpan := GeneratorDataSpan()
+    n := nodes{
+        running: make(map[int]struct{}),
+        stopped: make(map[int]struct{}),
+    }
+    for i := 1; i <= config.NumNodes; i++ {
+        // In liveness mode, we don't allow stopping and restarting the two
+        // protected nodes (node 1 and node 2), so we don't include them in the set
+        // of running nodes at all.
+        protectedNode := i == 1 || i == 2
+        if mode == Liveness && protectedNode {
+            continue
+        }
+        n.running[i] = struct{}{}
+    }
 
-    g, err := MakeGenerator(config, newGetReplicasFn(dbs...), mode)
+    dataSpan := GeneratorDataSpan()
+    g, err := MakeGenerator(config, newGetReplicasFn(dbs...), mode, &n)
     if err != nil {
         return nil, err
     }
@@ -118,7 +132,7 @@ func RunNemesis(
     if mode == Liveness && len(applierDBs) >= 2 {
         applierDBs = applierDBs[:2]
     }
-    a := MakeApplier(env, applierDBs...)
+    a := MakeApplier(env, &n, applierDBs...)
     w, err := Watch(ctx, env, dbs, dataSpan)
     if err != nil {
         return nil, err
@@ -182,6 +196,9 @@ func RunNemesis(
         return nil, err
     }
     env.Partitioner.EnablePartitions(false)
+    for i := 0; i < config.NumNodes; i++ {
+        _ = env.Restarter.RestartServer(i)
+    }
 
     allSteps := make(steps, 0, numSteps)
     for _, steps := range stepsByWorker {
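Since NewDefaultConfig zeroes out the new fault weights, a test variant has to opt in explicitly. Below is a hedged sketch of what such a liveness-mode variant's configuration tweak could look like; the variant name and its exact wiring in the PR's test file are assumptions, not the actual code:

// Sketch only; the real test variant added by the PR may differ.
package kvnemesis

func nodeRestartsConfig() GeneratorConfig {
    cfg := NewDefaultConfig()
    // Re-enable the stop/restart faults that NewDefaultConfig disables. This
    // is only safe in Liveness mode, where RunNemesis keeps nodes 1 and 2 out
    // of the running set so quorum is preserved.
    cfg.Ops.Fault.StopNode = 1
    cfg.Ops.Fault.RestartNode = 1
    return cfg
}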
