Skip to content

Commit 89ec658

Browse files
committed
more wip
1 parent 3013a0f commit 89ec658

File tree

2 files changed

+18
-31
lines changed

2 files changed

+18
-31
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ func TestChangefeedBasics(t *testing.T) {
122122
defer leaktest.AfterTest(t)()
123123
defer log.Scope(t).Close(t)
124124

125-
skip.WithIssue(t, 148858) // initial scan?
126-
127125
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
128126
sqlDB := sqlutils.MakeSQLRunner(s.DB)
129127
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -138,7 +136,7 @@ func TestChangefeedBasics(t *testing.T) {
138136
`foo: [0]->{"after": {"a": 0, "b": "updated"}}`,
139137
})
140138

141-
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) //, (2, 'b')`)
139+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`)
142140
assertPayloads(t, foo, []string{
143141
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
144142
`foo: [2]->{"after": {"a": 2, "b": "b"}}`,
@@ -3350,7 +3348,6 @@ WITH resolved='100ms', min_checkpoint_frequency='1ns', no_initial_scan`)
33503348
// TODO: remove this test when the legacy schema changer is deprecated.
33513349
func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
33523350
defer leaktest.AfterTest(t)()
3353-
skip.WithIssue(t, 148858) // Also uses initial scan.
33543351
defer log.Scope(t).Close(t)
33553352

33563353
testutils.SetVModule(t, "kv_feed=2,changefeed_processors=2")
@@ -3414,7 +3411,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34143411
sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`)
34153412
sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`)
34163413
sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`)
3417-
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`)
3414+
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`,
3415+
optOutOfDBLevelChangefeedUnwatchedTables)
34183416
defer closeFeed(t, addColComp)
34193417
assertPayloadsStripTs(t, addColComp, []string{
34203418
`add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`,
@@ -3440,7 +3438,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34403438
sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`)
34413439
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`)
34423440
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`)
3443-
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`)
3441+
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`,
3442+
optOutOfDBLevelChangefeedUnwatchedTables)
34443443
defer closeFeed(t, dropColumn)
34453444
assertPayloadsStripTs(t, dropColumn, []string{
34463445
`drop_column: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3482,7 +3481,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34823481
Changefeed.(*TestingKnobs)
34833482
knobs.BeforeEmitRow = waitSinkHook
34843483

3485-
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`)
3484+
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`,
3485+
optOutOfDBLevelChangefeedUnwatchedTables)
34863486
defer closeFeed(t, multipleAlters)
34873487
assertPayloadsStripTs(t, multipleAlters, []string{
34883488
`multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3551,7 +3551,6 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
35513551
defer leaktest.AfterTest(t)()
35523552
defer log.Scope(t).Close(t)
35533553

3554-
skip.WithIssue(t, 148858) // Also uses initial scan.
35553554
testutils.SetVModule(t, "kv_feed=2,changefeed_processors=2")
35563555

35573556
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -3598,7 +3597,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
35983597
sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`)
35993598
sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`)
36003599
sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`)
3601-
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`)
3600+
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`,
3601+
optOutOfDBLevelChangefeedUnwatchedTables)
36023602
defer closeFeed(t, addColComp)
36033603
assertPayloadsStripTs(t, addColComp, []string{
36043604
`add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`,
@@ -3618,7 +3618,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
36183618
sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`)
36193619
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`)
36203620
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`)
3621-
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`)
3621+
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`,
3622+
optOutOfDBLevelChangefeedUnwatchedTables)
36223623
defer closeFeed(t, dropColumn)
36233624
assertPayloadsStripTs(t, dropColumn, []string{
36243625
`drop_column: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3658,7 +3659,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
36583659
Changefeed.(*TestingKnobs)
36593660
knobs.BeforeEmitRow = waitSinkHook
36603661

3661-
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`)
3662+
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`,
3663+
optOutOfDBLevelChangefeedUnwatchedTables)
36623664
defer closeFeed(t, multipleAlters)
36633665
assertPayloadsStripTs(t, multipleAlters, []string{
36643666
`multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -4252,13 +4254,12 @@ func TestChangefeedJobControl(t *testing.T) {
42524254
defer leaktest.AfterTest(t)()
42534255
defer log.Scope(t).Close(t)
42544256

4255-
skip.WithIssue(t, 148858) // ?
4256-
42574257
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
42584258
ChangefeedJobPermissionsTestSetup(t, s)
42594259

42604260
createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
4261-
successfulFeed := feed(t, f, stmt)
4261+
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
4262+
reason: "tests table level changefeed permissions"})
42624263
closeCf := func() {
42634264
closeFeed(t, successfulFeed)
42644265
}
@@ -5668,7 +5669,7 @@ func TestChangefeedResolvedNotice(t *testing.T) {
56685669

56695670
func TestChangefeedLowFrequencyNotices(t *testing.T) {
56705671
defer leaktest.AfterTest(t)()
5671-
skip.WithIssue(t, 148858) // I think these notices have an issue
5672+
skip.WithIssue(t, 148858) // REAL ISSUE 2: These notices have an issue
56725673
defer log.Scope(t).Close(t)
56735674

56745675
cluster, _, cleanup := startTestCluster(t)
@@ -5724,16 +5725,9 @@ func TestChangefeedLowFrequencyNotices(t *testing.T) {
57245725
})
57255726
}
57265727

5727-
// Fails: this could be something real
5728-
// test logs left over in: /tmp/cockroach/_tmp/f5cb02fa45891c8256821a187ced6fe9/logTestChangefeedOutputTopics3499146109
5729-
// --- FAIL: TestChangefeedOutputTopics (7.67s)
5730-
//
5731-
// --- FAIL: TestChangefeedOutputTopics/kafka (0.25s)
5732-
// --- FAIL: TestChangefeedOutputTopics/pubsub_v2 (0.22s)
5733-
// --- PASS: TestChangefeedOutputTopics/webhooks_does_not_emit_anything (0.24s)
57345728
func TestChangefeedOutputTopics(t *testing.T) {
57355729
defer leaktest.AfterTest(t)()
5736-
skip.WithIssue(t, 148858) // I don't know exactly what this is, could be real.
5730+
skip.WithIssue(t, 148858) // REAL ISSUE 2: Notices have an issue
57375731
defer log.Scope(t).Close(t)
57385732

57395733
cluster, _, cleanup := startTestCluster(t)
@@ -9125,7 +9119,6 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
91259119
defer log.Scope(t).Close(t)
91269120

91279121
skip.UnderRace(t)
9128-
skip.WithIssue(t, 148858) // has unwatched tables? complicated, look
91299122
skip.UnderShort(t)
91309123

91319124
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -12187,7 +12180,6 @@ func TestChangefeedAvroDecimalColumnWithDiff(t *testing.T) {
1218712180
func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1218812181
defer leaktest.AfterTest(t)()
1218912182
defer log.Scope(t).Close(t)
12190-
skip.WithIssue(t, 148858) // ?
1219112183

1219212184
verifyFunc := func() {}
1219312185
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -12320,13 +12312,11 @@ WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1232012312
cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
1232112313
}
1232212314

12323-
// Fails: this could be something real
1232412315
// TestChangefeedProtectedTimestampUpdateError tests that a changefeed that
1232512316
// errors while managing its protected timestamp records will increment the
1232612317
// manage PTS error counter.
1232712318
func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1232812319
defer leaktest.AfterTest(t)()
12329-
skip.WithIssue(t, 148858) // ?
1233012320
defer log.Scope(t).Close(t)
1233112321

1233212322
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ import (
4545
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
4646
"github.com/cockroachdb/cockroach/pkg/testutils"
4747
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
48-
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
4948
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
5049
"github.com/cockroachdb/cockroach/pkg/util/hlc"
5150
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -370,21 +369,19 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
370369
}))
371370
}
372371

373-
// Fails for DB level changefeeds: this could be real.
374372
// TestChangefeedAlterPTS is a regression test for (#103855).
375373
// It verifies that we do not lose track of existing PTS records nor create
376374
// extraneous PTS records when altering a changefeed by adding a table.
377375
func TestChangefeedAlterPTS(t *testing.T) {
378376
defer leaktest.AfterTest(t)()
379-
skip.WithIssue(t, 148858) // This is an issue with having too many PTS records.
380377
defer log.Scope(t).Close(t)
381378

382379
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
383380
sqlDB := sqlutils.MakeSQLRunner(s.DB)
384381
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
385382
sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`)
386383
f2 := feed(t, f, `CREATE CHANGEFEED FOR table foo with protect_data_from_gc_on_pause,
387-
resolved='1s', min_checkpoint_frequency='1s'`)
384+
resolved='1s', min_checkpoint_frequency='1s'`, optOutOfDBLevelChangefeedUnwatchedTables)
388385
defer closeFeed(t, f2)
389386

390387
getNumPTSRecords := func() int {

0 commit comments

Comments
 (0)