
Commit b739945

craig[bot] and wenyihu6 committed
154912: roachtest: metamorphically enable decommissioning nudger r=wenyihu6 a=wenyihu6

Epic: none
Release note: none

---

**roachtest: merge runDecommissionBench and runDecommissionBenchLong**

Previously, decommission roachtests with duration > 0 used runDecommissionBenchLong, while the default used runDecommissionBench. It's unclear to me why both were needed, since most of the logic was duplicated apart from minor differences. As a result, registering a decommission roachtest with a longer duration and setting whileUpreplicating = true or slowWrites = true would not work, because the long version missed the setup steps for additional nodes and slow writes. It's unclear whether this was intentional. This commit merges the two functions, keeping things more consistent.

---

**roachtest: metamorphically enable decommissioning nudger**

This commit enables the decommissioning nudger metamorphically for two roachtests. I selected these two ad hoc: one is labeled as slow, and the other has been intermittently failing on master, so the extra coverage should improve observability and help root-cause the failures.

155095: mmaintegration: improve logging around replace lease changes r=wenyihu6 a=wenyihu6

Epic: CRDB-55052
Release note: none

---

**mmaintegration: improve logging around replace lease changes**

This commit improves logging around allocator sync to make it easier to observe replica lease changes and understand thrashing issues.

---

**asim: improve logging with asim/MMAStoreRebalancer**

Previously, some log lines printed the entire struct, making the output unreadable. This commit improves the logging to print the relevant change info instead.

Co-authored-by: wenyihu6 <wenyi@cockroachlabs.com>
3 parents 10e3b11 + 40ab8f6 + 84f34b9 commit b739945
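For context on what "metamorphically enable" means here: the multi_store_remove change flips a coin off a logged random seed and, on heads, turns the nudger on by writing the kv.enqueue_in_replicate_queue_on_problem.interval cluster setting, while decommissionbench registers dedicated use-nudger test variants that always set it. Below is a minimal standalone sketch of the coin-flip form, outside the roachtest harness; the database/sql + lib/pq driver, the connection URL, and the '10m' interval are illustrative assumptions, not part of this commit.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"math/rand"
	"time"

	_ "github.com/lib/pq" // pgwire driver; CockroachDB speaks the Postgres wire protocol.
)

func main() {
	ctx := context.Background()

	// Log the seed so a metamorphic run can be reproduced later.
	seed := time.Now().UnixNano()
	log.Printf("seed: %d", seed)
	rng := rand.New(rand.NewSource(seed))

	// Hypothetical connection URL; point it at whatever test cluster you have.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Coin flip: roughly half of the runs turn the decommissioning nudger on,
	// which periodically re-enqueues decommissioning ranges in the replicate
	// queue at their leaseholders. The '10m' interval mirrors the roachtest
	// change below but is otherwise an arbitrary choice.
	if rng.Intn(2) == 0 {
		if _, err := db.ExecContext(ctx,
			`SET CLUSTER SETTING kv.enqueue_in_replicate_queue_on_problem.interval = '10m'`,
		); err != nil {
			log.Fatal(err)
		}
		fmt.Println("metamorphically enabled decommissioning nudger")
	}
}

The actual tests do the same thing through the test harness connection (conn.ExecContext / db.ExecContext) and the test logger, as the diffs below show.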

18 files changed: +146 -173 lines


pkg/cmd/roachtest/tests/decommissionbench.go

Lines changed: 61 additions & 144 deletions
@@ -89,6 +89,12 @@ type decommissionBenchSpec struct {
   // instead of a random node.
   decommissionNode int

+  // When true, the decommissioning nudger
+  // (kv.enqueue_in_replicate_queue_on_problem.interval) will be used to
+  // periodically enqueue decommissioning ranges at its leaseholders every 5
+  // minutes.
+  useDecommissioningNudger bool
+
   skip string
 }
@@ -198,6 +204,16 @@ func registerDecommissionBench(r registry.Registry) {
       multiregion: true,
       decommissionNode: 2,
     },
+    {
+      // Same as above but using the decommissioning nudger.
+      nodes: 6,
+      warehouses: 1000,
+      whileUpreplicating: true,
+      drainFirst: true,
+      multiregion: true,
+      decommissionNode: 2,
+      useDecommissioningNudger: true,
+    },
     {
       // Multiregion decommission, and add a new node in a different region.
       nodes: 6,
@@ -207,6 +223,16 @@ func registerDecommissionBench(r registry.Registry) {
       multiregion: true,
       decommissionNode: 3,
     },
+    {
+      // Same as above but using the decommissioning nudger.
+      nodes: 6,
+      warehouses: 1000,
+      whileUpreplicating: true,
+      drainFirst: true,
+      multiregion: true,
+      decommissionNode: 3,
+      useDecommissioningNudger: true,
+    },
   } {
     registerDecommissionBenchSpec(r, benchSpec)
   }
@@ -274,6 +300,10 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
     extraNameParts = append(extraNameParts, "multi-region")
   }

+  if benchSpec.useDecommissioningNudger {
+    extraNameParts = append(extraNameParts, "use-nudger")
+  }
+
   // Save some money and CPU quota by using a smaller workload CPU. Only
   // do this for cluster of size 3 or smaller to avoid regressions.
   specOptions = append(specOptions, spec.WorkloadNode())
@@ -299,7 +329,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
       specOptions...,
     ),
     CompatibleClouds: registry.OnlyGCE,
-    Suites: registry.Suites(registry.Nightly),
+    Suites: registry.Suites(registry.Weekly),
     SkipPostValidations: registry.PostValidationNoDeadNodes,
     Timeout: timeout,
     NonReleaseBlocker: true,
@@ -336,11 +366,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
       return aggregatedPerfMetrics, nil
     },
     Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-      if benchSpec.duration > 0 {
-        runDecommissionBenchLong(ctx, t, c, benchSpec, timeout)
-      } else {
-        runDecommissionBench(ctx, t, c, benchSpec, timeout)
-      }
+      runDecommissionBench(ctx, t, c, benchSpec, timeout)
     },
   })
 }
@@ -431,6 +457,15 @@ func setupDecommissionBench(
   db := c.Conn(ctx, t.L(), pinnedNode)
   defer db.Close()

+  // Metamorphically enable the decommissioning nudger to get more test
+  // coverage on decommissioning nudger.
+  if benchSpec.useDecommissioningNudger {
+    if _, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.enqueue_in_replicate_queue_on_problem.interval = '5m'`); err != nil {
+      t.Fatal(err)
+    }
+    t.L().Printf("enabled decommissioning nudger")
+  }
+
   // Note that we are waiting for 3 replicas only. We can't assume 5 replicas
   // here because 5 only applies to system ranges so we will never reach this
   // number globally. We also don't know if all upreplication succeeded, but
@@ -709,151 +744,34 @@ func runDecommissionBench(
       time.Sleep(1 * time.Minute)
     }

-    m.ExpectDeath()
-    defer m.ResetDeaths()
-    err := runSingleDecommission(ctx, c, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
-      benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating,
-      true /* estimateDuration */, benchSpec.slowWrites, tickByName,
-    )
-
-    // Include an additional minute of buffer time post-decommission to gather
-    // workload stats.
-    time.Sleep(1 * time.Minute)
-
-    return err
-  })
-
-  m.Go(func(ctx context.Context) error {
-    hists := reg.GetHandle()
-
-    db := c.Conn(ctx, t.L(), pinnedNode)
-    defer db.Close()
-
-    return trackBytesUsed(ctx, db, &targetNodeAtomic, hists, tickByName)
-  })
-
-  if err := m.WaitE(); err != nil {
-    t.Fatal(err)
-  }
-}
-
-// runDecommissionBenchLong initializes a cluster with TPCC and attempts to
-// benchmark the decommissioning of nodes picked at random before subsequently
-// wiping them and re-adding them to the cluster to continually execute the
-// decommissioning process over the runtime of the test. The cluster may or may
-// not be running under load.
-func runDecommissionBenchLong(
-  ctx context.Context,
-  t test.Test,
-  c cluster.Cluster,
-  benchSpec decommissionBenchSpec,
-  testTimeout time.Duration,
-) {
-  // node1 is kept pinned (i.e. not decommissioned/restarted), and is the node
-  // through which we run decommissions. The last node is used for the workload.
-  pinnedNode := 1
-  workloadNode := benchSpec.nodes + 1
-  crdbNodes := c.Range(pinnedNode, benchSpec.nodes)
-  t.L().Printf("nodes %d - %d are crdb nodes", crdbNodes[0], crdbNodes[len(crdbNodes)-1])
-  t.L().Printf("node %d is the workload node", workloadNode)
-
-  maxRate := tpccMaxRate(benchSpec.warehouses)
-  rampDuration := 3 * time.Minute
-  rampStarted := make(chan struct{})
-  importCmd := fmt.Sprintf(
-    `./cockroach workload fixtures import tpcc --warehouses=%d`,
-    benchSpec.warehouses,
-  )
-  workloadCmd := fmt.Sprintf("./cockroach workload run tpcc --warehouses=%d --max-rate=%d --duration=%s "+
-    "%s --ramp=%s --tolerate-errors {pgurl:1-%d}", maxRate, benchSpec.warehouses,
-    testTimeout, roachtestutil.GetWorkloadHistogramString(t, c, nil, true), rampDuration, benchSpec.nodes)
-
-  setupDecommissionBench(ctx, t, c, benchSpec, pinnedNode, importCmd)
-
-  workloadCtx, workloadCancel := context.WithCancel(ctx)
-  m := c.NewDeprecatedMonitor(workloadCtx, crdbNodes)
-
-  if !benchSpec.noLoad {
-    m.Go(
-      func(ctx context.Context) error {
-        close(rampStarted)
-
-        // Run workload indefinitely, to be later killed by context
-        // cancellation once decommission has completed.
-        err := c.RunE(ctx, option.WithNodes(c.Node(workloadNode)), workloadCmd)
-        if errors.Is(ctx.Err(), context.Canceled) {
-          // Workload intentionally cancelled via context, so don't return error.
-          return nil
-        }
+    if benchSpec.duration > 0 {
+      for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; {
+        m.ExpectDeath()
+        err := runSingleDecommission(ctx, c, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
+          benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating,
+          true /* estimateDuration */, benchSpec.slowWrites, tickByName,
+        )
+        m.ResetDeaths()
         if err != nil {
-          t.L().Printf("workload error: %s", err)
+          return err
         }
-        return err
-      },
-    )
-  }
-
-  // Setup Prometheus/Grafana using workload node.
-  cleanupFunc := setupGrafana(ctx, t, c, crdbNodes, workloadNode)
-  defer cleanupFunc()
-
-  // Create a histogram registry for recording multiple decommission metrics.
-  // Note that "decommission.*" metrics are special in that they are
-  // long-running metrics measured by the elapsed time between each tick,
-  // as opposed to the histograms of workload operation latencies or other
-  // recorded values that are typically output in a "tick" each second.
-  reg, tickByName, perfBuf, exporter := createDecommissionBenchPerfArtifacts(t, c,
-    decommissionMetric, upreplicateMetric, bytesUsedMetric,
-  )
-
-  defer func() {
-    if err := exporter.Close(func() error {
-      uploadPerfArtifacts(ctx, t, c, workloadNode, perfBuf)
-      return nil
-    }); err != nil {
-      t.Errorf("error closing perf exporter: %s", err)
-    }
-  }()
-
-  // The logical node id of the current decommissioning node.
-  var targetNodeAtomic uint32
-
-  m.Go(func(ctx context.Context) error {
-    defer workloadCancel()
-
-    h := newDecommTestHelper(t, c)
-    h.blockFromRandNode(workloadNode)
-
-    // If we are running a workload, wait until it has started and completed its
-    // ramp time before initiating a decommission.
-    if !benchSpec.noLoad {
-      <-rampStarted
-      t.Status("Waiting for workload to ramp up...")
-      select {
-      case <-ctx.Done():
-        return ctx.Err()
-      case <-time.After(rampDuration + 1*time.Minute):
-        // Workload ramp-up complete, plus 1 minute of recording workload stats.
       }
-    }
-
-    for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; {
+      // Include an additional minute of buffer time post-decommission to gather
+      // workload stats.
+      time.Sleep(1 * time.Minute)
+      return nil
+    } else {
       m.ExpectDeath()
+      defer m.ResetDeaths()
       err := runSingleDecommission(ctx, c, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
-        benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating,
+        benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating,
         true /* estimateDuration */, benchSpec.slowWrites, tickByName,
       )
-      m.ResetDeaths()
-      if err != nil {
-        return err
-      }
+      // Include an additional minute of buffer time post-decommission to gather
+      // workload stats.
+      time.Sleep(1 * time.Minute)
+      return err
     }
-
-    // Include an additional minute of buffer time post-decommission to gather
-    // workload stats.
-    time.Sleep(1 * time.Minute)
-
-    return nil
   })

   m.Go(func(ctx context.Context) error {
@@ -868,7 +786,6 @@ func runDecommissionBenchLong(
   if err := m.WaitE(); err != nil {
     t.Fatal(err)
   }
-
 }

 // runSingleDecommission picks a random node and attempts to decommission that

pkg/cmd/roachtest/tests/multi_store_remove.go

Lines changed: 16 additions & 0 deletions
@@ -7,6 +7,7 @@ package tests

 import (
   "context"
+  "math/rand"
   "time"

   "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
@@ -104,6 +105,21 @@ func runMultiStoreRemove(ctx context.Context, t test.Test, c cluster.Cluster) {
     t.Fatal(err)
   }

+  // Metamorphically enable the decommissioning nudger to get more test coverage
+  // on decommissioning nudger.
+  {
+    seed := timeutil.Now().UnixNano()
+    t.L().Printf("seed: %d", seed)
+    rng := rand.New(rand.NewSource(seed))
+
+    if rng.Intn(2) == 0 {
+      if _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.enqueue_in_replicate_queue_on_problem.interval = '10m'`); err != nil {
+        t.Fatal(err)
+      }
+      t.L().Printf("metamorphically enabled decommissioning nudger")
+    }
+  }
+
   // Bring down node 1.
   t.Status("removing store from n1")
   node := c.Node(1)

pkg/kv/kvserver/asim/asim.go

Lines changed: 2 additions & 2 deletions
@@ -309,7 +309,7 @@ func (s *Simulator) tickWorkload(ctx context.Context, tick time.Time) {
 // each store ticks the pending operations such as relocate range and lease
 // transfers.
 func (s *Simulator) tickStateChanges(ctx context.Context, tick time.Time) {
-  s.changer.Tick(tick, s.state)
+  s.changer.Tick(ctx, tick, s.state)
   stores := s.state.Stores()
   s.shuffler(len(stores), func(i, j int) { stores[i], stores[j] = stores[j], stores[i] })
   for _, store := range stores {
@@ -371,7 +371,7 @@ func (s *Simulator) tickQueues(ctx context.Context, tick time.Time, state state.

   // Tick changes that may have been enqueued with a lower completion
   // than the current tick, from the queues.
-  s.changer.Tick(tick, state)
+  s.changer.Tick(ctx, tick, state)

   // Try adding suggested load splits that are pending for this store.
   for _, rangeID := range state.LoadSplitterFor(storeID).ClearSplitKeys() {

pkg/kv/kvserver/asim/metrics/metrics_test.go

Lines changed: 2 additions & 2 deletions
@@ -90,7 +90,7 @@ func Example_leaseTransfer() {
     Author: 1,
     Wait: 0,
   })
-  changer.Tick(state.TestingStartTime(), s)
+  changer.Tick(ctx, state.TestingStartTime(), s)
   m.Tick(ctx, start, s)
   // Output:
   //tick,c_ranges,c_write,c_write_b,c_read,c_read_b,s_ranges,s_write,s_write_b,s_read,s_read_b,c_lease_moves,c_replica_moves,c_replica_b_moves
@@ -120,7 +120,7 @@ func Example_rebalance() {
     })...),
     Wait: 0,
   }
-  c.Apply(s)
+  c.Apply(ctx, s)

   m.Tick(ctx, start, s)
   // Output:

pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go

Lines changed: 8 additions & 7 deletions
@@ -132,15 +132,15 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state
       msr.pendingTicket = -1
       success := true
       if err := op.Errors(); err != nil {
-        log.KvDistribution.Infof(ctx, "operation for pendingChange=%v failed: %v", curChange, err)
+        log.KvDistribution.Infof(ctx, "operation for pendingChange=%v failed: %v", curChange.change, err)
         success = false
       } else {
-        log.KvDistribution.VInfof(ctx, 1, "operation for pendingChange=%v completed successfully", curChange)
+        log.KvDistribution.VInfof(ctx, 1, "operation for pendingChange=%v completed successfully", curChange.change)
       }
       msr.as.PostApply(curChange.syncChangeID, success)
       msr.pendingChangeIdx++
     } else {
-      log.KvDistribution.VInfof(ctx, 1, "operation for pendingChange=%v is still in progress", curChange)
+      log.KvDistribution.VInfof(ctx, 1, "operation for pendingChange=%v is still in progress", curChange.change)
       // Operation is still in progress, nothing to do this tick.
       return
     }
@@ -156,14 +156,15 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state
   pendingChanges := msr.allocator.ComputeChanges(ctx, &storeLeaseholderMsg, mmaprototype.ChangeOptions{
     LocalStoreID: roachpb.StoreID(msr.localStoreID),
   })
-  for _, change := range pendingChanges {
+  log.KvDistribution.Infof(ctx, "store %d: computed %d changes", msr.localStoreID, len(pendingChanges))
+  for i, change := range pendingChanges {
     usageInfo := s.RangeUsageInfo(state.RangeID(change.RangeID), msr.localStoreID)
     msr.pendingChanges = append(msr.pendingChanges, pendingChangeAndRangeUsageInfo{
       change: change,
       usage: usageInfo,
     })
+    log.KvDistribution.Infof(ctx, "%v-th change: %v", i+1, change)
   }
-  log.KvDistribution.Infof(ctx, "store %d: computed %d changes %v", msr.localStoreID, len(msr.pendingChanges), msr.pendingChanges)
   if len(msr.pendingChanges) == 0 {
     // Nothing to do, there were no changes returned.
     msr.currentlyRebalancing = false
@@ -202,9 +203,9 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state
   } else {
     panic(fmt.Sprintf("unexpected pending change type: %v", curChange))
   }
-  log.KvDistribution.VInfof(ctx, 1, "dispatching operation for pendingChange=%v", curChange)
+  log.KvDistribution.VInfof(ctx, 1, "dispatching operation for pendingChange=%v", curChange.change)
   msr.pendingChanges[msr.pendingChangeIdx].syncChangeID =
-    msr.as.MMAPreApply(curChange.usage, curChange.change)
+    msr.as.MMAPreApply(ctx, curChange.usage, curChange.change)
   msr.pendingTicket = msr.controller.Dispatch(ctx, tick, s, curOp)
 }
