Skip to content

Commit 5b53bfe

Browse files
committed
more wip
1 parent 3013a0f commit 5b53bfe

File tree

2 files changed

+19
-22
lines changed

2 files changed

+19
-22
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ func TestChangefeedBasics(t *testing.T) {
122122
defer leaktest.AfterTest(t)()
123123
defer log.Scope(t).Close(t)
124124

125-
skip.WithIssue(t, 148858) // initial scan?
126-
127125
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
128126
sqlDB := sqlutils.MakeSQLRunner(s.DB)
129127
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -138,7 +136,7 @@ func TestChangefeedBasics(t *testing.T) {
138136
`foo: [0]->{"after": {"a": 0, "b": "updated"}}`,
139137
})
140138

141-
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) //, (2, 'b')`)
139+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`)
142140
assertPayloads(t, foo, []string{
143141
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
144142
`foo: [2]->{"after": {"a": 2, "b": "b"}}`,
@@ -3350,7 +3348,6 @@ WITH resolved='100ms', min_checkpoint_frequency='1ns', no_initial_scan`)
33503348
// TODO: remove this test when the legacy schema changer is deprecated.
33513349
func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
33523350
defer leaktest.AfterTest(t)()
3353-
skip.WithIssue(t, 148858) // Also uses initial scan.
33543351
defer log.Scope(t).Close(t)
33553352

33563353
testutils.SetVModule(t, "kv_feed=2,changefeed_processors=2")
@@ -3414,7 +3411,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34143411
sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`)
34153412
sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`)
34163413
sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`)
3417-
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`)
3414+
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`,
3415+
optOutOfDBLevelChangefeedUnwatchedTables)
34183416
defer closeFeed(t, addColComp)
34193417
assertPayloadsStripTs(t, addColComp, []string{
34203418
`add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`,
@@ -3440,7 +3438,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34403438
sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`)
34413439
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`)
34423440
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`)
3443-
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`)
3441+
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`,
3442+
optOutOfDBLevelChangefeedUnwatchedTables)
34443443
defer closeFeed(t, dropColumn)
34453444
assertPayloadsStripTs(t, dropColumn, []string{
34463445
`drop_column: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3482,7 +3481,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34823481
Changefeed.(*TestingKnobs)
34833482
knobs.BeforeEmitRow = waitSinkHook
34843483

3485-
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`)
3484+
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`,
3485+
optOutOfDBLevelChangefeedUnwatchedTables)
34863486
defer closeFeed(t, multipleAlters)
34873487
assertPayloadsStripTs(t, multipleAlters, []string{
34883488
`multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3551,7 +3551,6 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
35513551
defer leaktest.AfterTest(t)()
35523552
defer log.Scope(t).Close(t)
35533553

3554-
skip.WithIssue(t, 148858) // Also uses initial scan.
35553554
testutils.SetVModule(t, "kv_feed=2,changefeed_processors=2")
35563555

35573556
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -3598,7 +3597,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
35983597
sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`)
35993598
sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`)
36003599
sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`)
3601-
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`)
3600+
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`,
3601+
optOutOfDBLevelChangefeedUnwatchedTables)
36023602
defer closeFeed(t, addColComp)
36033603
assertPayloadsStripTs(t, addColComp, []string{
36043604
`add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`,
@@ -3618,7 +3618,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
36183618
sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`)
36193619
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`)
36203620
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`)
3621-
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`)
3621+
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`,
3622+
optOutOfDBLevelChangefeedUnwatchedTables)
36223623
defer closeFeed(t, dropColumn)
36233624
assertPayloadsStripTs(t, dropColumn, []string{
36243625
`drop_column: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3658,7 +3659,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
36583659
Changefeed.(*TestingKnobs)
36593660
knobs.BeforeEmitRow = waitSinkHook
36603661

3661-
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`)
3662+
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`,
3663+
optOutOfDBLevelChangefeedUnwatchedTables)
36623664
defer closeFeed(t, multipleAlters)
36633665
assertPayloadsStripTs(t, multipleAlters, []string{
36643666
`multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -4252,13 +4254,12 @@ func TestChangefeedJobControl(t *testing.T) {
42524254
defer leaktest.AfterTest(t)()
42534255
defer log.Scope(t).Close(t)
42544256

4255-
skip.WithIssue(t, 148858) // ?
4256-
42574257
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
42584258
ChangefeedJobPermissionsTestSetup(t, s)
42594259

42604260
createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
4261-
successfulFeed := feed(t, f, stmt)
4261+
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
4262+
reason: "tests table level changefeed permissions"})
42624263
closeCf := func() {
42634264
closeFeed(t, successfulFeed)
42644265
}
@@ -5668,7 +5669,7 @@ func TestChangefeedResolvedNotice(t *testing.T) {
56685669

56695670
func TestChangefeedLowFrequencyNotices(t *testing.T) {
56705671
defer leaktest.AfterTest(t)()
5671-
skip.WithIssue(t, 148858) // I think these notices have an issue
5672+
skip.WithIssue(t, 148858) // REAL ISSUE 2: These notices have an issue
56725673
defer log.Scope(t).Close(t)
56735674

56745675
cluster, _, cleanup := startTestCluster(t)
@@ -5733,7 +5734,7 @@ func TestChangefeedLowFrequencyNotices(t *testing.T) {
57335734
// --- PASS: TestChangefeedOutputTopics/webhooks_does_not_emit_anything (0.24s)
57345735
func TestChangefeedOutputTopics(t *testing.T) {
57355736
defer leaktest.AfterTest(t)()
5736-
skip.WithIssue(t, 148858) // I don't know exactly what this is, could be real.
5737+
skip.WithIssue(t, 148858) // REAL ISSUE 2: Notices have an issue
57375738
defer log.Scope(t).Close(t)
57385739

57395740
cluster, _, cleanup := startTestCluster(t)
@@ -9125,7 +9126,6 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
91259126
defer log.Scope(t).Close(t)
91269127

91279128
skip.UnderRace(t)
9128-
skip.WithIssue(t, 148858) // has unwatched tables? complicated, look
91299129
skip.UnderShort(t)
91309130

91319131
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -12187,7 +12187,7 @@ func TestChangefeedAvroDecimalColumnWithDiff(t *testing.T) {
1218712187
func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1218812188
defer leaktest.AfterTest(t)()
1218912189
defer log.Scope(t).Close(t)
12190-
skip.WithIssue(t, 148858) // ?
12190+
skip.WithIssue(t, 148858) // REAL ISSUE 1: still pts? confirm
1219112191

1219212192
verifyFunc := func() {}
1219312193
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -12320,13 +12320,11 @@ WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1232012320
cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
1232112321
}
1232212322

12323-
// Fails: this could be something real
1232412323
// TestChangefeedProtectedTimestampUpdateError tests that a changefeed that
1232512324
// errors while managing its protected timestamp records will increment the
1232612325
// manage PTS error counter.
1232712326
func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1232812327
defer leaktest.AfterTest(t)()
12329-
skip.WithIssue(t, 148858) // ?
1233012328
defer log.Scope(t).Close(t)
1233112329

1233212330
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,13 +370,12 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
370370
}))
371371
}
372372

373-
// Fails for DB level changefeeds: this could be real.
374373
// TestChangefeedAlterPTS is a regression test for (#103855).
375374
// It verifies that we do not lose track of existing PTS records nor create
376375
// extraneous PTS records when altering a changefeed by adding a table.
377376
func TestChangefeedAlterPTS(t *testing.T) {
378377
defer leaktest.AfterTest(t)()
379-
skip.WithIssue(t, 148858) // This is an issue with having too many PTS records.
378+
skip.WithIssue(t, 148858) // REAL ISSUE 1: This is an issue with having too many PTS records.
380379
defer log.Scope(t).Close(t)
381380

382381
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {

0 commit comments

Comments
 (0)