Skip to content

Commit 5b61ffe

Browse files
committed
more wip
1 parent 3013a0f commit 5b61ffe

File tree

2 files changed

+41
-42
lines changed

2 files changed

+41
-42
lines changed

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ func TestChangefeedBasics(t *testing.T) {
122122
defer leaktest.AfterTest(t)()
123123
defer log.Scope(t).Close(t)
124124

125-
skip.WithIssue(t, 148858) // initial scan?
126-
127125
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
128126
sqlDB := sqlutils.MakeSQLRunner(s.DB)
129127
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -138,7 +136,7 @@ func TestChangefeedBasics(t *testing.T) {
138136
`foo: [0]->{"after": {"a": 0, "b": "updated"}}`,
139137
})
140138

141-
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) //, (2, 'b')`)
139+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`)
142140
assertPayloads(t, foo, []string{
143141
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
144142
`foo: [2]->{"after": {"a": 2, "b": "b"}}`,
@@ -901,7 +899,6 @@ func TestChangefeedIdleness(t *testing.T) {
901899

902900
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
903901
sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`)
904-
// TODO(#148858): Add coverage for DB-level changefeed testing.
905902
cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo WITH resolved='10ms'", // higher resolved frequency for faster test
906903
optOutOfDBLevelChangefeedUnwatchedTables)
907904
cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar WITH resolved='10ms'",
@@ -2458,7 +2455,6 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
24582455
if _, ok := f.(*webhookFeedFactory); ok {
24592456
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
24602457
}
2461-
// TODO(#148858): Add coverage for DB-level changefeed testing.
24622458
args = append(args, optOutOfDBLevelChangefeedUnwatchedTables)
24632459

24642460
// Open up the changefeed.
@@ -3350,7 +3346,6 @@ WITH resolved='100ms', min_checkpoint_frequency='1ns', no_initial_scan`)
33503346
// TODO: remove this test when the legacy schema changer is deprecated.
33513347
func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
33523348
defer leaktest.AfterTest(t)()
3353-
skip.WithIssue(t, 148858) // Also uses initial scan.
33543349
defer log.Scope(t).Close(t)
33553350

33563351
testutils.SetVModule(t, "kv_feed=2,changefeed_processors=2")
@@ -3414,7 +3409,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34143409
sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`)
34153410
sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`)
34163411
sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`)
3417-
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`)
3412+
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`,
3413+
optOutOfDBLevelChangefeedUnwatchedTables)
34183414
defer closeFeed(t, addColComp)
34193415
assertPayloadsStripTs(t, addColComp, []string{
34203416
`add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`,
@@ -3440,7 +3436,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34403436
sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`)
34413437
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`)
34423438
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`)
3443-
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`)
3439+
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`,
3440+
optOutOfDBLevelChangefeedUnwatchedTables)
34443441
defer closeFeed(t, dropColumn)
34453442
assertPayloadsStripTs(t, dropColumn, []string{
34463443
`drop_column: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3482,7 +3479,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {
34823479
Changefeed.(*TestingKnobs)
34833480
knobs.BeforeEmitRow = waitSinkHook
34843481

3485-
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`)
3482+
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`,
3483+
optOutOfDBLevelChangefeedUnwatchedTables)
34863484
defer closeFeed(t, multipleAlters)
34873485
assertPayloadsStripTs(t, multipleAlters, []string{
34883486
`multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3551,7 +3549,6 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
35513549
defer leaktest.AfterTest(t)()
35523550
defer log.Scope(t).Close(t)
35533551

3554-
skip.WithIssue(t, 148858) // Also uses initial scan.
35553552
testutils.SetVModule(t, "kv_feed=2,changefeed_processors=2")
35563553

35573554
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -3598,7 +3595,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
35983595
sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`)
35993596
sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`)
36003597
sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`)
3601-
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`)
3598+
addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`,
3599+
optOutOfDBLevelChangefeedUnwatchedTables)
36023600
defer closeFeed(t, addColComp)
36033601
assertPayloadsStripTs(t, addColComp, []string{
36043602
`add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`,
@@ -3618,7 +3616,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
36183616
sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`)
36193617
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`)
36203618
sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`)
3621-
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`)
3619+
dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`,
3620+
optOutOfDBLevelChangefeedUnwatchedTables)
36223621
defer closeFeed(t, dropColumn)
36233622
assertPayloadsStripTs(t, dropColumn, []string{
36243623
`drop_column: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -3658,7 +3657,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
36583657
Changefeed.(*TestingKnobs)
36593658
knobs.BeforeEmitRow = waitSinkHook
36603659

3661-
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`)
3660+
multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`,
3661+
optOutOfDBLevelChangefeedUnwatchedTables)
36623662
defer closeFeed(t, multipleAlters)
36633663
assertPayloadsStripTs(t, multipleAlters, []string{
36643664
`multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`,
@@ -4252,13 +4252,12 @@ func TestChangefeedJobControl(t *testing.T) {
42524252
defer leaktest.AfterTest(t)()
42534253
defer log.Scope(t).Close(t)
42544254

4255-
skip.WithIssue(t, 148858) // ?
4256-
42574255
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
42584256
ChangefeedJobPermissionsTestSetup(t, s)
42594257

42604258
createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
4261-
successfulFeed := feed(t, f, stmt)
4259+
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
4260+
reason: "tests table level changefeed permissions"})
42624261
closeCf := func() {
42634262
closeFeed(t, successfulFeed)
42644263
}
@@ -5668,7 +5667,6 @@ func TestChangefeedResolvedNotice(t *testing.T) {
56685667

56695668
func TestChangefeedLowFrequencyNotices(t *testing.T) {
56705669
defer leaktest.AfterTest(t)()
5671-
skip.WithIssue(t, 148858) // I think these notices have an issue
56725670
defer log.Scope(t).Close(t)
56735671

56745672
cluster, _, cleanup := startTestCluster(t)
@@ -5697,43 +5695,47 @@ func TestChangefeedLowFrequencyNotices(t *testing.T) {
56975695
t.Run("no options specified", func(t *testing.T) {
56985696
actual = "(no notice)"
56995697
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5700-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
5698+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`,
5699+
optOutOfMetamorphicDBLevelChangefeed{
5700+
reason: "test requires split_column_families NOT to be set",
5701+
})
57015702
defer closeFeed(t, testFeed)
57025703
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
57035704
})
57045705
t.Run("normal resolved and min_checkpoint_frequency", func(t *testing.T) {
57055706
actual = "(no notice)"
57065707
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5707-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`)
5708+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`,
5709+
optOutOfMetamorphicDBLevelChangefeed{
5710+
reason: "test requires split_column_families NOT to be set",
5711+
})
57085712
defer closeFeed(t, testFeed)
57095713
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
57105714
})
57115715
t.Run("low resolved timestamp", func(t *testing.T) {
57125716
actual = "(no notice)"
57135717
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5714-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`)
5718+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`,
5719+
optOutOfMetamorphicDBLevelChangefeed{
5720+
reason: "test requires split_column_families NOT to be set",
5721+
})
57155722
defer closeFeed(t, testFeed)
57165723
require.Equal(t, `the 'resolved' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual)
57175724
})
57185725
t.Run("low min_checkpoint_frequency timestamp", func(t *testing.T) {
57195726
actual = "(no notice)"
57205727
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5721-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`)
5728+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`,
5729+
optOutOfMetamorphicDBLevelChangefeed{
5730+
reason: "test requires split_column_families NOT to be set",
5731+
})
57225732
defer closeFeed(t, testFeed)
57235733
require.Equal(t, `the 'min_checkpoint_frequency' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual)
57245734
})
57255735
}
57265736

5727-
// Fails: this could be something real
5728-
// test logs left over in: /tmp/cockroach/_tmp/f5cb02fa45891c8256821a187ced6fe9/logTestChangefeedOutputTopics3499146109
5729-
// --- FAIL: TestChangefeedOutputTopics (7.67s)
5730-
//
5731-
// --- FAIL: TestChangefeedOutputTopics/kafka (0.25s)
5732-
// --- FAIL: TestChangefeedOutputTopics/pubsub_v2 (0.22s)
5733-
// --- PASS: TestChangefeedOutputTopics/webhooks_does_not_emit_anything (0.24s)
57345737
func TestChangefeedOutputTopics(t *testing.T) {
57355738
defer leaktest.AfterTest(t)()
5736-
skip.WithIssue(t, 148858) // I don't know exactly what this is, could be real.
57375739
defer log.Scope(t).Close(t)
57385740

57395741
cluster, _, cleanup := startTestCluster(t)
@@ -5762,15 +5764,21 @@ func TestChangefeedOutputTopics(t *testing.T) {
57625764
t.Run("kafka", func(t *testing.T) {
57635765
actual = "(no notice)"
57645766
f := makeKafkaFeedFactory(t, s, dbWithHandler)
5765-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`)
5767+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`,
5768+
optOutOfMetamorphicDBLevelChangefeed{
5769+
reason: "test requires split_column_families NOT to be set",
5770+
})
57665771
defer closeFeed(t, testFeed)
57675772
require.Equal(t, `changefeed will emit to topic _u2603_`, actual)
57685773
})
57695774

57705775
t.Run("pubsub v2", func(t *testing.T) {
57715776
actual = "(no notice)"
57725777
f := makePubsubFeedFactory(s, dbWithHandler)
5773-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`)
5778+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`,
5779+
optOutOfMetamorphicDBLevelChangefeed{
5780+
reason: "test requires split_column_families NOT to be set",
5781+
})
57745782
defer closeFeed(t, testFeed)
57755783
// Pubsub doesn't sanitize the topic name.
57765784
require.Equal(t, `changefeed will emit to topic ☃`, actual)
@@ -6557,7 +6565,6 @@ func TestChangefeedTruncateOrDrop(t *testing.T) {
65576565
sqlDB.Exec(t, `CREATE TABLE truncate_cascade (b INT PRIMARY KEY REFERENCES truncate (a)) WITH (schema_locked=false)`)
65586566
sqlDB.Exec(t,
65596567
`BEGIN; INSERT INTO truncate VALUES (1); INSERT INTO truncate_cascade VALUES (1); COMMIT`)
6560-
// TODO(#148858): Add coverage for DB-level changefeed testing.
65616568
truncate := feed(t, f, `CREATE CHANGEFEED FOR truncate`,
65626569
optOutOfDBLevelChangefeedUnwatchedTables)
65636570
defer closeFeed(t, truncate)
@@ -8207,8 +8214,7 @@ func TestChangefeedTelemetry(t *testing.T) {
82078214
// Reset the counts.
82088215
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
82098216

8210-
// Start some feeds (and read from them to make sure they've started.
8211-
// TODO(#148858): Add coverage for DB-level changefeed testing.
8217+
// Start some feeds (and read from them to make sure they've started).
82128218
foo := feed(t, f, `CREATE CHANGEFEED FOR foo`,
82138219
optOutOfDBLevelChangefeedUnwatchedTables)
82148220
defer closeFeed(t, foo)
@@ -9125,7 +9131,6 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
91259131
defer log.Scope(t).Close(t)
91269132

91279133
skip.UnderRace(t)
9128-
skip.WithIssue(t, 148858) // has unwatched tables? complicated, look
91299134
skip.UnderShort(t)
91309135

91319136
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -12187,7 +12192,6 @@ func TestChangefeedAvroDecimalColumnWithDiff(t *testing.T) {
1218712192
func TestChangefeedProtectedTimestampUpdate(t *testing.T) {
1218812193
defer leaktest.AfterTest(t)()
1218912194
defer log.Scope(t).Close(t)
12190-
skip.WithIssue(t, 148858) // ?
1219112195

1219212196
verifyFunc := func() {}
1219312197
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
@@ -12320,13 +12324,11 @@ WITH resolved='10ms', min_checkpoint_frequency='10ms', no_initial_scan`
1232012324
cdcTest(t, testFn, feedTestForceSink("kafka"), withTxnRetries)
1232112325
}
1232212326

12323-
// Fails: this could be something real
1232412327
// TestChangefeedProtectedTimestampUpdateError tests that a changefeed that
1232512328
// errors while managing its protected timestamp records will increment the
1232612329
// manage PTS error counter.
1232712330
func TestChangefeedProtectedTimestampUpdateError(t *testing.T) {
1232812331
defer leaktest.AfterTest(t)()
12329-
skip.WithIssue(t, 148858) // ?
1233012332
defer log.Scope(t).Close(t)
1233112333

1233212334
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {

pkg/ccl/changefeedccl/protected_timestamps_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ import (
4545
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
4646
"github.com/cockroachdb/cockroach/pkg/testutils"
4747
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
48-
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
4948
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
5049
"github.com/cockroachdb/cockroach/pkg/util/hlc"
5150
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -370,21 +369,19 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
370369
}))
371370
}
372371

373-
// Fails for DB level changefeeds: this could be real.
374372
// TestChangefeedAlterPTS is a regression test for (#103855).
375373
// It verifies that we do not lose track of existing PTS records nor create
376374
// extraneous PTS records when altering a changefeed by adding a table.
377375
func TestChangefeedAlterPTS(t *testing.T) {
378376
defer leaktest.AfterTest(t)()
379-
skip.WithIssue(t, 148858) // This is an issue with having too many PTS records.
380377
defer log.Scope(t).Close(t)
381378

382379
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
383380
sqlDB := sqlutils.MakeSQLRunner(s.DB)
384381
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
385382
sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`)
386383
f2 := feed(t, f, `CREATE CHANGEFEED FOR table foo with protect_data_from_gc_on_pause,
387-
resolved='1s', min_checkpoint_frequency='1s'`)
384+
resolved='1s', min_checkpoint_frequency='1s'`, optOutOfDBLevelChangefeedUnwatchedTables)
388385
defer closeFeed(t, f2)
389386

390387
getNumPTSRecords := func() int {

0 commit comments

Comments
 (0)