Skip to content

Commit 10e2308

Browse files
committed
now we're cooking
1 parent 7ba2b99 commit 10e2308

File tree

1 file changed

+53
-2
lines changed

1 file changed

+53
-2
lines changed

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,8 +1188,59 @@ func maybeForceDBLevelChangefeed(
11881188
t.Logf("opted out of DB level changefeed for %s: %s", create, o.reason)
11891189
}
11901190
}
1191-
// return create, nil
1192-
return `CREATE CHANGEFEED FOR DATABASE d`, nil
1191+
1192+
switch f := f.(type) {
1193+
// Don't do it for sinkless feeds.
1194+
// Skip external connection feeds for now. (TODO: revisit this.)
1195+
case *externalConnectionFeedFactory, *sinklessFeedFactory:
1196+
t.Logf("did not force DB level changefeed for %s because %T is not supported", create, f)
1197+
return create, nil
1198+
}
1199+
1200+
// Don't do it if it's a CDC query.
1201+
createStmt, err := parser.ParseOne(create)
1202+
if err != nil {
1203+
return "", err
1204+
}
1205+
createAST := createStmt.AST.(*tree.CreateChangefeed)
1206+
if createAST.Select != nil {
1207+
t.Logf("did not force DB level changefeed for %s because it is a CDC query", create)
1208+
return create, nil
1209+
}
1210+
1211+
// Don't do it if it's already a DB level changefeed.
1212+
if createAST.Level == tree.ChangefeedLevelDatabase {
1213+
t.Logf("did not force DB level changefeed for %s because it is already a DB level changefeed", create)
1214+
return create, nil
1215+
}
1216+
1217+
opts := createAST.Options
1218+
for _, opt := range opts {
1219+
// Don't do it if there's an initial scan explicitly requested.
1220+
// TODO: double check that this doesn't exclude the default case where the initial scan is not specified.
1221+
// Do I need this equalFold that copilot added?
1222+
key := opt.Key.String()
1223+
if strings.EqualFold(key, "initial_scan") {
1224+
if opt.Value.String() != "yes" || opt.Value.String() != "only" {
1225+
t.Logf("did not force DB level changefeed for %s because it has an initial scan", create)
1226+
return create, nil
1227+
}
1228+
}
1229+
if strings.EqualFold(key, "initial_scan_only") {
1230+
t.Logf("did not force DB level changefeed for %s because it set initial scan only", create)
1231+
return create, nil
1232+
}
1233+
}
1234+
1235+
// Keep the options as is but make it a DB level changefeed.
1236+
createStmt.AST.(*tree.CreateChangefeed).Level = tree.ChangefeedLevelDatabase
1237+
createStmt.AST.(*tree.CreateChangefeed).TableTargets = nil
1238+
createStmt.AST.(*tree.CreateChangefeed).DatabaseTarget = tree.ChangefeedDatabaseTarget("d")
1239+
t.Logf("forcing DB level changefeed for %s", create)
1240+
create = tree.AsStringWithFlags(createStmt.AST, tree.FmtShowPasswords)
1241+
1242+
t.Logf("forced DB level changefeed result: %s", create)
1243+
return create, nil
11931244
}
11941245

11951246
func maybeForceEnrichedEnvelope(

0 commit comments

Comments
 (0)