Skip to content

Commit 13e1c86

Browse files
craig[bot]fqaziDarrylWongrafiss
committed
154879: changefeedccl: deflake TestChangefeedSchemaTTL r=fqazi a=fqazi Previously, TestChangefeedSchemaTTL started flaking when create_table_with_schema_locked was enabled by default in the test suite. This happened because schema_locked tables can cause historical versions of descriptors to be cached in the lease manager, as the changefeed logic intentionally queries prior versions. As a result, even after the test garbage-collected the descriptors table, these old versions could remain in the in-memory cache, leading to flakes. To address this, this patch adds a canary schema change, which bumps the descriptor's version. When a descriptor's version is bumped, all older cached versions are purged. Fixes: #149167 Informs: #153138 Informs: #154229 Informs: #154304 Informs: #154931 Release note: None 155775: mixedversion: fix panic in refresh{Cluster,Binary}Version r=golgeek a=DarrylWong A recent change #155713 fixed the indexing when nodes were unavailable. However, this change did not account for multi cluster tests where the roachprod node ID is not the same as the CRDB node ID, i.e. we can't assume the 4th VM is the same as the 4th node in the cluster. Fixes: none Epic: none Release note: none 155794: logictest: finalize upgrade explicitly in mixed_version_upgrade_preserve_ttl r=rafiss a=rafiss Replace retry-based waiting with explicit version finalization using SET CLUSTER SETTING version = crdb_internal.node_executable_version(). Resolves: #155429 Epic: None Release note: None 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com> Co-authored-by: DarrylWong <darryl@cockroachlabs.com> Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
4 parents e3c856b + 2a6a541 + 10da9b4 + 4e8c6d2 commit 13e1c86

File tree

4 files changed

+50
-8
lines changed

4 files changed

+50
-8
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7044,6 +7044,7 @@ func TestChangefeedSchemaTTL(t *testing.T) {
70447044
// Create the data table; it will only contain a single row with multiple
70457045
// versions.
70467046
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
7047+
sqlDB.Exec(t, `CREATE USER joe`)
70477048

70487049
counter := 0
70497050
upsertRow := func() {
@@ -7074,6 +7075,15 @@ func TestChangefeedSchemaTTL(t *testing.T) {
70747075
// table to be deleted, with the middle version being lost to the changefeed.
70757076
forceTableGC(t, s.SystemServer, sqlDB, "system", "descriptor")
70767077

7078+
// Do an unnecessary version bump on the descriptor, which will purge old
7079+
// versions of the descriptor that the lease manager may have cached for
7080+
// historical queries. When schema_locked support is active, the act of
7081+
// detecting polling inside pauseOrResumePolling can cause old versions
7082+
// to be cached. The historical descriptors are cleared every time a
7083+
// version bump occurs, otherwise this test can flake if the prior versions
7084+
// needed to decode a row are already cached.
7085+
waitForSchemaChange(t, sqlDB, "ALTER TABLE foo OWNER TO joe")
7086+
70777087
// Resume our changefeed normally.
70787088
atomic.StoreInt32(&shouldWait, 0)
70797089
resume <- struct{}{}

pkg/cmd/roachtest/option/node_list_option.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ func (n NodeListOption) Equals(o NodeListOption) bool {
4242
return true
4343
}
4444

45+
func (n NodeListOption) Contains(node int) bool {
46+
for _, n := range n {
47+
if n == node {
48+
return true
49+
}
50+
}
51+
return false
52+
}
53+
4554
// Option implements Option.
4655
func (n NodeListOption) Option() {}
4756

pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,16 +525,26 @@ func (tr *testRunner) refreshBinaryVersions(ctx context.Context, service *servic
525525
defer cancel()
526526

527527
group := ctxgroup.WithContext(connectionCtx)
528-
for _, node := range tr.getAvailableNodes(service.descriptor) {
528+
// We still attempt to refresh binary versions on nodes we expect to be unavailable:
529+
// 1. Some failure injections are overly conservative in marking nodes as
530+
// unavailable out of caution (e.g. network partitions).
531+
// 2. The monitor returns the roachprod node ID, which may be offset compared
532+
// to what the mixed-version test expects, e.g. a multi cluster test where
533+
// node 1 may be the 5th VM in the roachprod cluster.
534+
availableNodes := tr.getAvailableNodes(service.descriptor)
535+
for j, node := range service.descriptor.Nodes {
529536
group.GoCtx(func(ctx context.Context) error {
530537
bv, err := clusterupgrade.BinaryVersion(ctx, tr.conn(node, service.descriptor.Name))
531538
if err != nil {
539+
if !availableNodes.Contains(node) {
540+
return nil
541+
}
532542
return fmt.Errorf(
533543
"failed to get binary version for node %d (%s): %w",
534544
node, service.descriptor.Name, err,
535545
)
536546
}
537-
newBinaryVersions[node-1] = bv
547+
newBinaryVersions[j] = bv
538548
return nil
539549
})
540550
}
@@ -556,17 +566,28 @@ func (tr *testRunner) refreshClusterVersions(ctx context.Context, service *servi
556566
defer cancel()
557567

558568
group := ctxgroup.WithContext(connectionCtx)
559-
for _, node := range tr.getAvailableNodes(service.descriptor) {
569+
// We still attempt to refresh cluster versions on nodes we expect to be unavailable:
570+
// 1. Some failure injections are overly conservative in marking nodes as
571+
// unavailable out of caution (e.g. network partitions).
572+
// 2. The monitor returns the roachprod node ID, which may be offset compared
573+
// to what the mixed-version test expects, e.g. a multi cluster test where
574+
// node 1 may be the 5th VM in the roachprod cluster.
575+
availableNodes := tr.getAvailableNodes(service.descriptor)
576+
for j, node := range service.descriptor.Nodes {
560577
group.GoCtx(func(ctx context.Context) error {
561578
cv, err := clusterupgrade.ClusterVersion(ctx, tr.conn(node, service.descriptor.Name))
562579
if err != nil {
580+
if !availableNodes.Contains(node) {
581+
return nil
582+
}
583+
563584
return fmt.Errorf(
564585
"failed to get cluster version for node %d (%s): %w",
565586
node, service.descriptor.Name, err,
566587
)
567588
}
568589

569-
newClusterVersions[node-1] = cv
590+
newClusterVersions[j] = cv
570591
return nil
571592
})
572593
}

pkg/sql/logictest/testdata/logic_test/mixed_version_upgrade_preserve_ttl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ CREATE TABLE tbl (
1313

1414
upgrade all
1515

16-
# Verify that the cluster version upgrades have begun by asserting we're no
17-
# longer on the previous version. Note that the first cluster upgrade is the
18-
# one that repairs all descriptors.
19-
query B retry retry_duration 90s
16+
statement ok
17+
SET CLUSTER SETTING version = crdb_internal.node_executable_version()
18+
19+
# Verify that the cluster version has been upgraded and is no longer on the
20+
# previous version.
21+
query B
2022
SELECT version != '$initial_version' FROM [SHOW CLUSTER SETTING version]
2123
----
2224
true

0 commit comments

Comments
 (0)