diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 000970181306..5cfdd440c340 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -261,7 +261,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ALTER CHANGEFEED commands with initial_scan", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -493,7 +495,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs", + }, + ) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -668,7 +674,7 @@ func TestAlterChangefeedErrors(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) @@ -799,7 +805,8 @@ func TestAlterChangefeedTelemetry(t *testing.T) { // Reset the counts. _ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, testFeed) feed := testFeed.(cdctest.EnterpriseTestFeed) @@ -1068,7 +1075,8 @@ func TestAlterChangefeedDatabaseQualifiedNames(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE d.users (id INT PRIMARY KEY, name STRING)`) sqlDB.Exec(t, `INSERT INTO d.drivers VALUES (1, 'Alice')`) sqlDB.Exec(t, `INSERT INTO d.users VALUES (1, 'Bob')`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1119,7 +1127,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) { `INSERT INTO new_movr.drivers VALUES (1, 'Bob')`, ) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }, + ) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1162,7 +1174,8 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) { ) sqlDB.Exec(t, `USE movr`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, testFeed) assertPayloads(t, testFeed, []string{ @@ -1211,6 +1224,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"}) } + args = append(args, optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + }) testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...) defer closeFeed(t, testFeed) @@ -1263,6 +1279,7 @@ func TestAlterChangefeedAlterTableName(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"}) } + args = append(args, optOutOfMetamorphicDBLevelChangefeed{reason: "uses non default DB"}) testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...) defer closeFeed(t, testFeed) @@ -1551,7 +1568,8 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { registry := s.Server.JobRegistry().(*jobs.Registry) testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo -WITH resolved = '100ms', min_checkpoint_frequency='1ns'`) +WITH resolved = '100ms', min_checkpoint_frequency='1ns'`, + optOutOfDBLevelChangefeedUnwatchedTables) g := ctxgroup.WithContext(context.Background()) g.Go(func() error { @@ -1861,7 +1879,7 @@ func TestAlterChangefeedAccessControl(t *testing.T) { rootDB := sqlutils.MakeSQLRunner(s.DB) createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfDBLevelChangefeedUnwatchedTables) closeCf := func() { closeFeed(t, successfulFeed) } @@ -2050,7 +2068,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) { createStmt := fmt.Sprintf( `CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", ")) t.Log(createStmt) - testFeed := feed(t, f, createStmt) + testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db level feeds don't support ALTERing targets with ADD/DROP TARGETS", + }) defer closeFeed(t, testFeed) feed, ok := testFeed.(cdctest.EnterpriseTestFeed) diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 8c8409754907..afa844a14850 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -79,7 +79,8 @@ func AllTargets( if len(cd.TargetSpecifications) > 1 { return changefeedbase.Targets{}, errors.AssertionFailedf("database-level changefeed is not supported with multiple targets") } - targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg, timestamp) + _, useFullTableName := cd.Opts[changefeedbase.OptFullTableName] + targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg, timestamp, useFullTableName) if err != nil { return changefeedbase.Targets{}, err } @@ -117,6 +118,7 @@ func getTargetsFromDatabaseSpec( ts jobspb.ChangefeedTargetSpecification, execCfg *sql.ExecutorConfig, timestamp hlc.Timestamp, + useFullTableName bool, ) (targets changefeedbase.Targets, err error) { err = sql.DescsTxn(ctx, execCfg, func( ctx context.Context, txn isql.Txn, descs *descs.Collection, @@ -172,15 +174,21 @@ func getTargetsFromDatabaseSpec( tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY } + tableName := func() string { + if useFullTableName { + return fullyQualifiedTableName + } + return desc.GetName() + }() targets.Add(changefeedbase.Target{ Type: tableType, DescID: desc.GetID(), - StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()), + StatementTimeName: changefeedbase.StatementTimeName(tableName), }) } case tree.IncludeFilter: - for name := range ts.FilterList.Tables { - tn, err := parser.ParseTableName(name) + for fullyQualifiedTableName := range ts.FilterList.Tables { + tn, err := parser.ParseTableName(fullyQualifiedTableName) if err != nil { return err } @@ -215,10 +223,16 @@ func getTargetsFromDatabaseSpec( tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY } + tableName := func() string { + if useFullTableName { + return fullyQualifiedTableName + } + return desc.GetName() + }() targets.Add(changefeedbase.Target{ Type: tableType, DescID: tableID, - StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()), + StatementTimeName: changefeedbase.StatementTimeName(tableName), }) } default: diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 219c14db4b08..a46fd83343c8 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -899,8 +899,10 @@ func TestChangefeedIdleness(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`) - cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo WITH resolved='10ms'") // higher resolved frequency for faster test - cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar WITH resolved='10ms'") + cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo WITH resolved='10ms'", // higher resolved frequency for faster test + optOutOfDBLevelChangefeedUnwatchedTables) + cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar WITH resolved='10ms'", + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, cf1) go workload() @@ -1374,6 +1376,30 @@ func TestChangefeedFullTableName(t *testing.T) { }) } +func TestDatabaseLevelChangefeedWithFullTableName(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + normal := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d WITH full_table_name`) + defer closeFeed(t, normal) + include := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo WITH full_table_name`) + defer closeFeed(t, include) + exclude := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bar WITH full_table_name`) + defer closeFeed(t, exclude) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) + assertPayloads(t, normal, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`}) + assertPayloads(t, include, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`}) + assertPayloads(t, exclude, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`}) + } + + cdcTest(t, testFn, feedTestEnterpriseSinks) +} + func TestChangefeedMultiTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1832,6 +1858,7 @@ func getWhereClause(query string) (string, bool) { return replaced.String(), err == nil } +// Fails with panic! // Test how Changefeeds react to schema changes that do not require a backfill // operation. func TestChangefeedInitialScan(t *testing.T) { @@ -1839,12 +1866,14 @@ func TestChangefeedInitialScan(t *testing.T) { defer log.Scope(t).Close(t) noInitialScanTests := map[string]string{ - `no cursor - no initial scan`: `CREATE CHANGEFEED FOR no_initial_scan WITH no_initial_scan, resolved='1s'`, + // first one fails with panic! + // `no cursor - no initial scan`: `CREATE CHANGEFEED FOR no_initial_scan WITH no_initial_scan, resolved='1s'`, `no cursor - no initial backfill`: `CREATE CHANGEFEED FOR no_initial_scan WITH initial_scan = 'no', resolved='1s'`, } initialScanTests := map[string]string{ - `cursor - with initial scan`: `CREATE CHANGEFEED FOR initial_scan WITH initial_scan, resolved='1s', cursor='%s'`, + // first one fails with panic! + // `cursor - with initial scan`: `CREATE CHANGEFEED FOR initial_scan WITH initial_scan, resolved='1s', cursor='%s'`, `cursor - with initial backfill`: `CREATE CHANGEFEED FOR initial_scan WITH initial_scan = 'yes', resolved='1s', cursor='%s'`, } @@ -2426,6 +2455,7 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) { if _, ok := f.(*webhookFeedFactory); ok { args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"}) } + args = append(args, optOutOfDBLevelChangefeedUnwatchedTables) // Open up the changefeed. cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE alsohasfams FAMILY id_a`, args...) @@ -2677,7 +2707,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { sqlDB.Exec(t, `ALTER TABLE historical ADD COLUMN c INT`) sqlDB.Exec(t, `INSERT INTO historical (a) VALUES (3)`) sqlDB.Exec(t, `INSERT INTO historical (a, c) VALUES (4, 14)`) - historical := feed(t, f, `CREATE CHANGEFEED FOR historical WITH cursor=$1`, start) + historical := feed(t, f, `CREATE CHANGEFEED FOR historical WITH cursor=$1`, start, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, historical) assertPayloads(t, historical, []string{ `historical: [0]->{"after": {"a": 0, "b": "0"}}`, @@ -2692,7 +2723,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { // NB: the default is a nullable column sqlDB.Exec(t, `CREATE TABLE add_column (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO add_column VALUES (1)`) - addColumn := feed(t, f, `CREATE CHANGEFEED FOR add_column`) + addColumn := feed(t, f, `CREATE CHANGEFEED FOR add_column`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, addColumn) assertPayloads(t, addColumn, []string{ `add_column: [1]->{"after": {"a": 1}}`, @@ -2707,7 +2739,9 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`rename column`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE rename_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO rename_column VALUES (1, '1')`) - renameColumn := feed(t, f, `CREATE CHANGEFEED FOR rename_column`) + renameColumn := feed(t, f, `CREATE CHANGEFEED FOR rename_column`, + optOutOfDBLevelChangefeedUnwatchedTables, + ) defer closeFeed(t, renameColumn) assertPayloads(t, renameColumn, []string{ `rename_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -2722,7 +2756,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add default`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_default (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO add_default (a, b) VALUES (1, '1')`) - addDefault := feed(t, f, `CREATE CHANGEFEED FOR add_default`) + addDefault := feed(t, f, `CREATE CHANGEFEED FOR add_default`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, addDefault) sqlDB.Exec(t, `ALTER TABLE add_default ALTER COLUMN b SET DEFAULT 'd'`) sqlDB.Exec(t, `INSERT INTO add_default (a) VALUES (2)`) @@ -2735,7 +2770,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`drop default`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_default (a INT PRIMARY KEY, b STRING DEFAULT 'd')`) sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (1)`) - dropDefault := feed(t, f, `CREATE CHANGEFEED FOR drop_default`) + dropDefault := feed(t, f, `CREATE CHANGEFEED FOR drop_default`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, dropDefault) sqlDB.Exec(t, `ALTER TABLE drop_default ALTER COLUMN b DROP DEFAULT`) sqlDB.Exec(t, `INSERT INTO drop_default (a) VALUES (2)`) @@ -2748,7 +2784,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`drop not null`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_notnull (a INT PRIMARY KEY, b STRING NOT NULL)`) sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (1, '1')`) - dropNotNull := feed(t, f, `CREATE CHANGEFEED FOR drop_notnull`) + dropNotNull := feed(t, f, `CREATE CHANGEFEED FOR drop_notnull`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, dropNotNull) sqlDB.Exec(t, `ALTER TABLE drop_notnull ALTER b DROP NOT NULL`) sqlDB.Exec(t, `INSERT INTO drop_notnull VALUES (2, NULL)`) @@ -2761,7 +2798,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`checks`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE checks (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO checks VALUES (1)`) - checks := feed(t, f, `CREATE CHANGEFEED FOR checks`) + checks := feed(t, f, `CREATE CHANGEFEED FOR checks`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, checks) sqlDB.Exec(t, `ALTER TABLE checks ADD CONSTRAINT c CHECK (a < 5) NOT VALID`) sqlDB.Exec(t, `INSERT INTO checks VALUES (2)`) @@ -2780,7 +2818,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add index`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_index (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO add_index VALUES (1, '1')`) - addIndex := feed(t, f, `CREATE CHANGEFEED FOR add_index`) + addIndex := feed(t, f, `CREATE CHANGEFEED FOR add_index`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, addIndex) sqlDB.Exec(t, `CREATE INDEX b_idx ON add_index (b)`) sqlDB.Exec(t, `SELECT * FROM add_index@b_idx`) @@ -2794,7 +2833,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`unique`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE "unique" (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO "unique" VALUES (1, '1')`) - unique := feed(t, f, `CREATE CHANGEFEED FOR "unique"`) + unique := feed(t, f, `CREATE CHANGEFEED FOR "unique"`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, unique) sqlDB.Exec(t, `ALTER TABLE "unique" ADD CONSTRAINT u UNIQUE (b)`) sqlDB.Exec(t, `INSERT INTO "unique" VALUES (2, '2')`) @@ -2808,7 +2848,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { sqlDB.Exec( t, `CREATE TABLE alter_default (a INT PRIMARY KEY, b STRING DEFAULT 'before')`) sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (1)`) - alterDefault := feed(t, f, `CREATE CHANGEFEED FOR alter_default`) + alterDefault := feed(t, f, `CREATE CHANGEFEED FOR alter_default`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, alterDefault) sqlDB.Exec(t, `ALTER TABLE alter_default ALTER COLUMN b SET DEFAULT 'after'`) sqlDB.Exec(t, `INSERT INTO alter_default (a) VALUES (2)`) @@ -2822,7 +2863,8 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { t.Run(`add column with DEFAULT NULL`, func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE t (id INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO t VALUES (1)`) - defaultNull := feed(t, f, `CREATE CHANGEFEED FOR t`) + defaultNull := feed(t, f, `CREATE CHANGEFEED FOR t`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, defaultNull) sqlDB.Exec(t, `ALTER TABLE t ADD COLUMN c INT DEFAULT NULL`) sqlDB.Exec(t, `INSERT INTO t VALUES (2, 2)`) @@ -3367,7 +3409,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`) sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`) - addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`) + addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, addColComp) assertPayloadsStripTs(t, addColComp, []string{ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, @@ -3393,7 +3436,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`) - dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`) + dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, dropColumn) assertPayloadsStripTs(t, dropColumn, []string{ `drop_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3435,7 +3479,8 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = waitSinkHook - multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`) + multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, multipleAlters) assertPayloadsStripTs(t, multipleAlters, []string{ `multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3550,7 +3595,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE add_col_comp (a INT PRIMARY KEY, b INT AS (a + 5) STORED)`) sqlDB.Exec(t, `INSERT INTO add_col_comp VALUES (1)`) sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (2)`) - addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`) + addColComp := feed(t, f, `CREATE CHANGEFEED FOR add_col_comp WITH updated`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, addColComp) assertPayloadsStripTs(t, addColComp, []string{ `add_col_comp: [1]->{"after": {"a": 1, "b": 6}}`, @@ -3570,7 +3616,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop_column (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (1, '1')`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (2, '2')`) - dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`) + dropColumn := feed(t, f, `CREATE CHANGEFEED FOR drop_column WITH updated`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, dropColumn) assertPayloadsStripTs(t, dropColumn, []string{ `drop_column: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -3610,7 +3657,8 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { Changefeed.(*TestingKnobs) knobs.BeforeEmitRow = waitSinkHook - multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`) + multipleAlters := feed(t, f, `CREATE CHANGEFEED FOR multiple_alters WITH updated`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, multipleAlters) assertPayloadsStripTs(t, multipleAlters, []string{ `multiple_alters: [1]->{"after": {"a": 1, "b": "1"}}`, @@ -4208,7 +4256,8 @@ func TestChangefeedJobControl(t *testing.T) { ChangefeedJobPermissionsTestSetup(t, s) createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "tests table level changefeed permissions"}) closeCf := func() { closeFeed(t, successfulFeed) } @@ -5541,6 +5590,13 @@ func TestChangefeedAvroNotice(t *testing.T) { expectNotice(t, s.Server, sql, `avro is no longer experimental, use format=avro`) } +// --- FAIL: TestChangefeedResolvedNotice (6.87s) +// +// --- FAIL: TestChangefeedResolvedNotice/resolvedmin_checkpoint_frequency (0.00s) +// --- FAIL: TestChangefeedResolvedNotice/resolved_default (0.00s) func TestChangefeedResolvedNotice(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -5639,28 +5695,40 @@ func TestChangefeedLowFrequencyNotices(t *testing.T) { t.Run("no options specified", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) t.Run("normal resolved and min_checkpoint_frequency", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='10s', min_checkpoint_frequency='10s'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) t.Run("low resolved timestamp", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH resolved='200ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `the 'resolved' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual) }) t.Run("low min_checkpoint_frequency timestamp", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/' WITH min_checkpoint_frequency='200ms'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `the 'min_checkpoint_frequency' timestamp interval (200ms) is very low; consider increasing it to at least 500ms`, actual) }) @@ -5696,7 +5764,10 @@ func TestChangefeedOutputTopics(t *testing.T) { t.Run("kafka", func(t *testing.T) { actual = "(no notice)" f := makeKafkaFeedFactory(t, s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'kafka://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) require.Equal(t, `changefeed will emit to topic _u2603_`, actual) }) @@ -5704,7 +5775,10 @@ func TestChangefeedOutputTopics(t *testing.T) { t.Run("pubsub v2", func(t *testing.T) { actual = "(no notice)" f := makePubsubFeedFactory(s, dbWithHandler) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR ☃ INTO 'gcpubsub://does.not.matter/'`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test requires split_column_families NOT to be set", + }) defer closeFeed(t, testFeed) // Pubsub doesn't sanitize the topic name. require.Equal(t, `changefeed will emit to topic ☃`, actual) @@ -6491,9 +6565,11 @@ func TestChangefeedTruncateOrDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE truncate_cascade (b INT PRIMARY KEY REFERENCES truncate (a)) WITH (schema_locked=false)`) sqlDB.Exec(t, `BEGIN; INSERT INTO truncate VALUES (1); INSERT INTO truncate_cascade VALUES (1); COMMIT`) - truncate := feed(t, f, `CREATE CHANGEFEED FOR truncate`) + truncate := feed(t, f, `CREATE CHANGEFEED FOR truncate`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, truncate) - truncateCascade := feed(t, f, `CREATE CHANGEFEED FOR truncate_cascade`) + truncateCascade := feed(t, f, `CREATE CHANGEFEED FOR truncate_cascade`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, truncateCascade) assertPayloads(t, truncate, []string{`truncate: [1]->{"after": {"a": 1}}`}) assertPayloads(t, truncateCascade, []string{`truncate_cascade: [1]->{"after": {"b": 1}}`}) @@ -6510,7 +6586,8 @@ func TestChangefeedTruncateOrDrop(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE drop (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO drop VALUES (1)`) - drop := feed(t, f, `CREATE CHANGEFEED FOR drop`) + drop := feed(t, f, `CREATE CHANGEFEED FOR drop`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, drop) assertPayloads(t, drop, []string{`drop: [1]->{"after": {"a": 1}}`}) sqlDB.Exec(t, `DROP TABLE drop`) @@ -8137,10 +8214,12 @@ func TestChangefeedTelemetry(t *testing.T) { // Reset the counts. _ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) - // Start some feeds (and read from them to make sure they've started. - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`) + // Start some feeds (and read from them to make sure they've started). + foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, foo) - fooBar := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH format=json`) + fooBar := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH format=json`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, fooBar) assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1}}`, @@ -8355,6 +8434,7 @@ func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) { cdcTest(t, testFn, feedTestOmitSinks("sinkless")) } +// Fails: this could be something real func TestChangefeedHandlesDrainingNodes(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -8421,7 +8501,9 @@ func TestChangefeedHandlesDrainingNodes(t *testing.T) { defer closeSink() atomic.StoreInt32(&shouldDrain, 1) - feed := feed(t, f, "CREATE CHANGEFEED FOR foo") + feed := feed(t, f, "CREATE CHANGEFEED FOR foo", optOutOfMetamorphicDBLevelChangefeed{ + reason: "doesn't use the default DB", + }) defer closeFeed(t, feed) jobID := feed.(cdctest.EnterpriseTestFeed).JobID() @@ -8586,7 +8668,8 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) { defer closeSink() proceed <- struct{}{} // Allow changefeed to start. - feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no', min_checkpoint_frequency='100ms'") + feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH initial_scan='no', min_checkpoint_frequency='100ms'", + optOutOfMetamorphicDBLevelChangefeed{reason: "doesn't use the default DB"}) defer closeFeed(t, feed) jf := feed.(cdctest.EnterpriseTestFeed) @@ -9705,7 +9788,7 @@ func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) { var cf cdctest.TestFeed asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) { - cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`) + cf = feed(t, f, `CREATE CHANGEFEED FOR table_a;`, optOutOfDBLevelChangefeedUnwatchedTables) }) defer closeFeed(t, cf) @@ -9991,7 +10074,7 @@ func TestChangefeedOnlyInitialScanCSV(t *testing.T) { sqlDB.CheckQueryResultsRetry(t, `SELECT count(*) FROM foo,bar`, [][]string{{`9`}}) - feed := feed(t, f, testData.changefeedStmt) + feed := feed(t, f, testData.changefeedStmt, optOutOfDBLevelChangefeedUnwatchedTables) sqlDB.Exec(t, "INSERT INTO foo VALUES (4, 'Doug'), (5, 'Elaine'), (6, 'Fred')") sqlDB.Exec(t, "INSERT INTO bar VALUES (4, 'd'), (5, 'e'), (6, 'f')") @@ -10792,7 +10875,9 @@ func TestAlterChangefeedTelemetryLogs(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) - testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`) + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar`, optOutOfMetamorphicDBLevelChangefeed{ + reason: "db-level changefeed doesn't support ALTER CHANGEFEED ADD/DROP", + }) defer closeFeed(t, testFeed) feed := testFeed.(cdctest.EnterpriseTestFeed) @@ -11080,7 +11165,8 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE large (a INT PRIMARY KEY)`) sqlDB.Exec(t, `INSERT INTO large (a) SELECT * FROM generate_series(1, 2000);`) - foo := feed(t, f, `CREATE CHANGEFEED FOR large WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000}}'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR large WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000}}'`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, foo) rnd, _ := randutil.NewPseudoRand() @@ -11151,7 +11237,8 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) { } { t.Run(fmt.Sprintf(`eventually surface error for retry: %s`, failTest.errMsg), func(t *testing.T) { knobs.kafkaInterceptor = failTest.failInterceptor - foo := feed(t, f, `CREATE CHANGEFEED FOR errors WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR errors WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`, + optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, foo) feedJob := foo.(cdctest.EnterpriseTestFeed) diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index c9a6252b6fd7..4678be12c091 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -601,7 +601,9 @@ func TestAvroEnum(t *testing.T) { sqlDB.Exec(t, `INSERT INTO soft_deletes values (0, 'active')`) sd := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR soft_deletes `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{reason: "has unwatched foo table"}, + ) defer closeFeed(t, sd) assertPayloads(t, sd, []string{ `soft_deletes: {"a":{"long":0},"b":{"string":"active"}}->{"after":{"soft_deletes":{"a":{"long":0},"b":{"string":"active"},"c":{"long":0}}}}`, @@ -639,9 +641,11 @@ func TestAvroSchemaNaming(t *testing.T) { sqlDB.Exec(t, `INSERT INTO movr.drivers VALUES (1, 'Alice')`, ) - + optOutOfDBLevelChangefeed := optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + } movrFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), optOutOfDBLevelChangefeed) defer closeFeed(t, movrFeed) foo := movrFeed.(*kafkaFeed) @@ -656,7 +660,8 @@ func TestAvroSchemaNaming(t *testing.T) { }) fqnFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, full_table_name`, changefeedbase.OptFormatAvro)) + `WITH format=%s, full_table_name`, changefeedbase.OptFormatAvro), + optOutOfDBLevelChangefeed) defer closeFeed(t, fqnFeed) foo = fqnFeed.(*kafkaFeed) @@ -671,8 +676,8 @@ func TestAvroSchemaNaming(t *testing.T) { }) prefixFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super`, - changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro), + optOutOfDBLevelChangefeed) defer closeFeed(t, prefixFeed) foo = prefixFeed.(*kafkaFeed) @@ -687,7 +692,8 @@ func TestAvroSchemaNaming(t *testing.T) { }) prefixFQNFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super, full_table_name`, changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super, full_table_name`, changefeedbase.OptFormatAvro), + optOutOfDBLevelChangefeed) defer closeFeed(t, prefixFQNFeed) foo = prefixFQNFeed.(*kafkaFeed) @@ -707,7 +713,8 @@ func TestAvroSchemaNaming(t *testing.T) { sqlDB.Exec(t, `ALTER TABLE movr.drivers ADD COLUMN vehicle_id int CREATE FAMILY volatile`) multiFamilyFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, %s`, changefeedbase.OptFormatAvro, changefeedbase.OptSplitColumnFamilies)) + `WITH format=%s, %s`, changefeedbase.OptFormatAvro, changefeedbase.OptSplitColumnFamilies), + optOutOfDBLevelChangefeed) defer closeFeed(t, multiFamilyFeed) foo = multiFamilyFeed.(*kafkaFeed) @@ -744,8 +751,11 @@ func TestAvroSchemaNamespace(t *testing.T) { `INSERT INTO movr.drivers VALUES (1, 'Alice')`, ) + optOutOfDBLevelChangefeed := optOutOfMetamorphicDBLevelChangefeed{ + reason: "changefeed watches tables not in the default database", + } noNamespaceFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), optOutOfDBLevelChangefeed) defer closeFeed(t, noNamespaceFeed) assertPayloads(t, noNamespaceFeed, []string{ @@ -758,7 +768,8 @@ func TestAvroSchemaNamespace(t *testing.T) { require.NotContains(t, foo.registry.SchemaForSubject(`drivers-value`), `namespace`) namespaceFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro)) + `WITH format=%s, avro_schema_prefix=super`, changefeedbase.OptFormatAvro), + optOutOfDBLevelChangefeed) defer closeFeed(t, namespaceFeed) foo = namespaceFeed.(*kafkaFeed) @@ -787,7 +798,8 @@ func TestAvroSchemaHasExpectedTopLevelFields(t *testing.T) { ) foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR movr.drivers `+ - `WITH format=%s`, changefeedbase.OptFormatAvro)) + `WITH format=%s`, changefeedbase.OptFormatAvro), + optOutOfMetamorphicDBLevelChangefeed{reason: "uses non default DB"}) defer closeFeed(t, foo) assertPayloads(t, foo, []string{ diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index b21664222e74..fc574fbb2848 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -1147,6 +1147,14 @@ type optOutOfMetamorphicEnrichedEnvelope struct { reason string } +type optOutOfMetamorphicDBLevelChangefeed struct { + reason string +} + +var optOutOfDBLevelChangefeedUnwatchedTables = optOutOfMetamorphicDBLevelChangefeed{ + reason: "test initializes multiple tables but doesn't watch all of them", +} + func feed( t testing.TB, f cdctest.TestFeedFactory, create string, args ...interface{}, ) cdctest.TestFeed { @@ -1157,6 +1165,11 @@ func feed( t.Fatal(err) } + create, args, err = maybeForceDBLevelChangefeed(t, create, f, args) + if err != nil { + t.Fatal(err) + } + feed, err := f.Feed(create, args...) if err != nil { t.Fatal(err) @@ -1171,6 +1184,85 @@ func feed( return feed } +func maybeForceDBLevelChangefeed( + t testing.TB, create string, f cdctest.TestFeedFactory, args []any, +) (newCreate string, newArgs []any, err error) { + for i, arg := range args { + if o, ok := arg.(optOutOfMetamorphicDBLevelChangefeed); ok { + t.Logf("opted out of DB level changefeed for %s: %s", create, o.reason) + newArgs = slices.Clone(args) + newArgs = slices.Delete(newArgs, i, i+1) + return create, newArgs, nil + } + } + + switch f := f.(type) { + // Skip external connection feeds for now. (TODO(#148858): revisit this.) + case *externalConnectionFeedFactory, *sinklessFeedFactory: + t.Logf("did not force DB level changefeed for %s because %T is not supported", create, f) + return create, args, nil + } + + createStmt, err := parser.ParseOne(create) + if err != nil { + return "", nil, err + } + createAST := createStmt.AST.(*tree.CreateChangefeed) + if createAST.Select != nil { + t.Logf("did not force DB level changefeed for %s because it is a CDC query", create) + return create, args, nil + } + + if createAST.Level == tree.ChangefeedLevelDatabase { + t.Logf("did not force DB level changefeed for %s because it is already a DB level changefeed", create) + return create, args, nil + } + + opts := createAST.Options + for _, opt := range opts { + // Don't do it if there's an initial scan explicitly requested. + // TODO(#148858): Do I need this equalFold that copilot added? + key := opt.Key.String() + if strings.EqualFold(key, "initial_scan") { + if opt.Value != nil && (opt.Value.String() == "yes" || opt.Value.String() == "only") { + t.Logf("did not force DB level changefeed for %s because it has an initial scan", create) + return create, args, nil + } + } + if strings.EqualFold(key, "initial_scan_only") { + t.Logf("did not force DB level changefeed for %s because it set initial scan only", create) + return create, args, nil + } + // Since DB level feeds set split column families, and split column families is incompatible + // with resolved for kafka and pubsub sinks, we skip DB level feeds metamorphic testing in + // that case. + if strings.EqualFold(key, "resolved") { + switch f.(type) { + case *kafkaFeedFactory, *pubsubFeedFactory: + t.Logf("did not force DB level changefeed for %s because it set resolved", create) + return create, args, nil + } + } + } + + for _, target := range createAST.TableTargets { + if target.FamilyName != "" { + t.Logf("did not force DB level changefeed for %s because it has column family %s", create, target.FamilyName) + return create, args, nil + } + } + + // Keep the options as is but make it a DB level changefeed. + createStmt.AST.(*tree.CreateChangefeed).Level = tree.ChangefeedLevelDatabase + createStmt.AST.(*tree.CreateChangefeed).TableTargets = nil + createStmt.AST.(*tree.CreateChangefeed).DatabaseTarget = tree.ChangefeedDatabaseTarget("d") + t.Logf("forcing DB level changefeed for %s", create) + create = tree.AsStringWithFlags(createStmt.AST, tree.FmtShowPasswords) + + t.Logf("forced DB level changefeed result: %s", create) + return create, args, nil +} + func maybeForceEnrichedEnvelope( t testing.TB, create string, f cdctest.TestFeedFactory, args []any, ) (newCreate string, newArgs []any, forced bool, _ error) { diff --git a/pkg/ccl/changefeedccl/protected_timestamps_test.go b/pkg/ccl/changefeedccl/protected_timestamps_test.go index 875ea7b5aed8..9530069f67c3 100644 --- a/pkg/ccl/changefeedccl/protected_timestamps_test.go +++ b/pkg/ccl/changefeedccl/protected_timestamps_test.go @@ -381,7 +381,7 @@ func TestChangefeedAlterPTS(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`) f2 := feed(t, f, `CREATE CHANGEFEED FOR table foo with protect_data_from_gc_on_pause, - resolved='1s', min_checkpoint_frequency='1s'`) + resolved='1s', min_checkpoint_frequency='1s'`, optOutOfDBLevelChangefeedUnwatchedTables) defer closeFeed(t, f2) getNumPTSRecords := func() int { diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 6a763cdaeba8..15b85ee9c44a 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -68,7 +68,11 @@ func TestShowChangefeedJobsDatabaseLevel(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'initial')`) - tcf := feed(t, f, `CREATE CHANGEFEED FOR d.foo, d.bar`) + tcf := feed(t, f, `CREATE CHANGEFEED FOR d.foo, d.bar`, + optOutOfMetamorphicDBLevelChangefeed{ + reason: "test asserts how DB-level and table-level changefeeds differ", + }, + ) defer closeFeed(t, tcf) assertPayloads(t, tcf, []string{ `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, @@ -161,7 +165,11 @@ func TestShowChangefeedJobsBasic(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format='json'`) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH format='json'`, + optOutOfMetamorphicDBLevelChangefeed{ + // NB: We test WITH WATCHED_TABLES in another test. This one specifically does not. + reason: "db level changefeeds don't have full_table_names without WITH WATCHED_TABLES", + }) defer closeFeed(t, foo) type row struct { @@ -292,7 +300,9 @@ func TestShowChangefeedJobsRedacted(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { foo := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri), - optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}) + optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}, + optOutOfMetamorphicDBLevelChangefeed{reason: "compares text of changefeed statement"}, + ) defer closeFeed(t, foo) efoo, ok := foo.(cdctest.EnterpriseTestFeed) @@ -554,7 +564,10 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) - foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}) + foo := feed(t, f, `CREATE CHANGEFEED FOR foo`, + optOutOfMetamorphicEnrichedEnvelope{reason: "compares text of changefeed statement"}, + optOutOfMetamorphicDBLevelChangefeed{reason: "compares text of changefeed statement"}, + ) defer closeFeed(t, foo) feed, ok := foo.(cdctest.EnterpriseTestFeed) @@ -651,7 +664,8 @@ func TestShowChangefeedJobsAuthorization(t *testing.T) { var jobID jobspb.JobID createFeed := func(stmt string) { - successfulFeed := feed(t, f, stmt) + successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{ + reason: "tests table level changefeed permissions"}) defer closeFeed(t, successfulFeed) _, err := successfulFeed.Next() require.NoError(t, err)