@@ -224,7 +224,36 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) {
224224 cdcTest (t , testFn )
225225}
226226
227- func TestDatabaseLevelChangefeedWithFilter (t * testing.T ) {
227+ func TestDatabaseLevelChangefeedWithIncludeFilter (t * testing.T ) {
228+ defer leaktest .AfterTest (t )()
229+ defer log .Scope (t ).Close (t )
230+
231+ testFn := func (t * testing.T , s TestServer , f cdctest.TestFeedFactory ) {
232+ expectSuccess := func (stmt string ) {
233+ successfulFeed := feed (t , f , stmt )
234+ defer closeFeed (t , successfulFeed )
235+ _ , err := successfulFeed .Next ()
236+ require .NoError (t , err )
237+ }
238+ sqlDB := sqlutils .MakeSQLRunner (s .DB )
239+ sqlDB .Exec (t , `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)` )
240+ sqlDB .Exec (t , `INSERT INTO foo VALUES (0, 'initial')` )
241+ sqlDB .Exec (t , `UPSERT INTO foo VALUES (0, 'updated')` )
242+ sqlDB .Exec (t , `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)` )
243+ sqlDB .Exec (t , `INSERT INTO foo2 VALUES (0, 'initial')` )
244+ sqlDB .Exec (t , `UPSERT INTO foo2 VALUES (0, 'updated')` )
245+
246+ expectSuccess (`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo` )
247+ expectSuccess (`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo,foo2` )
248+ expectSuccess (`CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.bar.fizz, foo.foo2, foo` )
249+ expectErrCreatingFeed (t , f , `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*` ,
250+ `at or near "*": syntax error` )
251+ // TODO(#147421): Assert payload once the filter works
252+ }
253+ cdcTest (t , testFn , feedTestEnterpriseSinks )
254+ }
255+
256+ func TestDatabaseLevelChangefeedWithExcludeFilter (t * testing.T ) {
228257 defer leaktest .AfterTest (t )()
229258 defer log .Scope (t ).Close (t )
230259
0 commit comments