@@ -245,7 +245,7 @@ func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) {
245245
246246 expectSuccess (`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo` )
247247 expectSuccess (`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo,foo2` )
248- expectSuccess (`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo .bar.fizz, foo.foo2, foo` )
248+ expectSuccess (`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d .bar.fizz, foo.foo2, foo` )
249249 expectErrCreatingFeed (t , f , `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*` ,
250250 `at or near "*": syntax error` )
251251 // TODO(#147421): Assert payload once the filter works
@@ -258,26 +258,85 @@ func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) {
258258 defer log .Scope (t ).Close (t )
259259
260260 testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
261- expectSuccess := func (stmt string ) {
262- successfulFeed := feed (t , f , stmt )
263- defer closeFeed (t , successfulFeed )
264- _ , err := successfulFeed .Next ()
265- require .NoError (t , err )
266- }
267261 sqlDB := sqlutils .MakeSQLRunner (s .DB )
268- sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)` )
269- sqlDB .Exec (t , `INSERT INTO foo VALUES (0, 'initial')` )
270- sqlDB .Exec (t , `UPSERT INTO foo VALUES (0, 'updated')` )
271- sqlDB .Exec (t , `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)` )
272- sqlDB .Exec (t , `INSERT INTO foo2 VALUES (0, 'initial')` )
273- sqlDB .Exec (t , `UPSERT INTO foo2 VALUES (0, 'updated')` )
262+ for i := range 4 {
263+ name := fmt .Sprintf ("foo%d" , i + 1 )
264+ sqlDB .Exec (t , fmt .Sprintf (`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)` , name ))
265+ sqlDB .Exec (t , fmt .Sprintf (`INSERT INTO %s VALUES (0, 'initial')` , name ))
266+ sqlDB .Exec (t , fmt .Sprintf (`UPSERT INTO %s VALUES (0, 'updated')` , name ))
267+ }
268+
269+ sqlDB .Exec (t , `CREATE SCHEMA private` )
270+ sqlDB .Exec (t , `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)` )
271+ sqlDB .Exec (t , `INSERT INTO private.foo1 VALUES (0, 'initial')` )
272+ // Test that if there are multiple tables with the same name the correct
273+ // one will still be found using the default schema of public.
274+ feed1 := feed (t , f , `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1` )
275+ defer closeFeed (t , feed1 )
276+ assertPayloads (t , feed1 , []string {
277+ `foo1: [0]->{"after": {"a": 0, "b": "initial"}}` ,
278+ `foo2: [0]->{"after": {"a": 0, "b": "updated"}}` ,
279+ `foo3: [0]->{"after": {"a": 0, "b": "updated"}}` ,
280+ `foo4: [0]->{"after": {"a": 0, "b": "updated"}}` ,
281+ })
282+
283+ feed2 := feed (t , f , `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2` )
284+ defer closeFeed (t , feed2 )
285+ assertPayloads (t , feed2 , []string {
286+ `foo1: [0]->{"after": {"a": 0, "b": "initial"}}` ,
287+ `foo1: [0]->{"after": {"a": 0, "b": "updated"}}` ,
288+ `foo3: [0]->{"after": {"a": 0, "b": "updated"}}` ,
289+ `foo4: [0]->{"after": {"a": 0, "b": "updated"}}` ,
290+ })
291+
292+ // Test that we can exclude multiple tables.
293+ feed3 := feed (t , f , `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2,foo3` )
294+ defer closeFeed (t , feed3 )
295+ assertPayloads (t , feed3 , []string {
296+ `foo1: [0]->{"after": {"a": 0, "b": "initial"}}` ,
297+ `foo1: [0]->{"after": {"a": 0, "b": "updated"}}` ,
298+ `foo4: [0]->{"after": {"a": 0, "b": "updated"}}` ,
299+ })
300+
301+ // Test that we can handle fully qualfied, partially qualified, and unqualified table names.
302+ feed4 := feed (t , f , `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4` )
303+ defer closeFeed (t , feed4 )
304+ assertPayloads (t , feed4 , []string {
305+ `foo1: [0]->{"after": {"a": 0, "b": "initial"}}` ,
306+ `foo1: [0]->{"after": {"a": 0, "b": "updated"}}` ,
307+ })
308+
309+ // Test that we can handle tables that don't exist.
310+ feed5 := feed (t , f , `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bob` )
311+ defer closeFeed (t , feed5 )
312+ assertPayloads (t , feed5 , []string {
313+ `foo1: [0]->{"after": {"a": 0, "b": "initial"}}` ,
314+ `foo1: [0]->{"after": {"a": 0, "b": "updated"}}` ,
315+ `foo2: [0]->{"after": {"a": 0, "b": "updated"}}` ,
316+ `foo3: [0]->{"after": {"a": 0, "b": "updated"}}` ,
317+ `foo4: [0]->{"after": {"a": 0, "b": "updated"}}` ,
318+ })
274319
275- expectSuccess (`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo` )
276- expectSuccess (`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo,foo2` )
277- expectSuccess (`CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.bar.fizz, foo.foo2, foo` )
320+ // Test that fully qualified table names outside of the target database will
321+ // cause an error.
322+ expectErrCreatingFeed (t , f , `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES fizz.buzz.foo` ,
323+ `table "fizz.buzz.foo" must be in target database "d"` )
324+
325+ // Table patterns are not supported.
278326 expectErrCreatingFeed (t , f , `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.*` ,
279327 `at or near "*": syntax error` )
280- // TODO(#147421): Assert payload once the filter works
328+
329+ // Test that name resolution is not dependent on search_path() or current DB
330+ sqlDB .Exec (t , `CREATE DATABASE notd` )
331+ sqlDB .Exec (t , `USE notd` )
332+ sqlDB .Exec (t , `SET search_path TO notpublic` )
333+ feed6 := feed (t , f , `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1, private.foo1` )
334+ defer closeFeed (t , feed6 )
335+ assertPayloads (t , feed6 , []string {
336+ `foo2: [0]->{"after": {"a": 0, "b": "updated"}}` ,
337+ `foo3: [0]->{"after": {"a": 0, "b": "updated"}}` ,
338+ `foo4: [0]->{"after": {"a": 0, "b": "updated"}}` ,
339+ })
281340 }
282341 cdcTest (t , testFn , feedTestEnterpriseSinks )
283342}
0 commit comments