@@ -783,11 +783,17 @@ func newCDCTester(ctx context.Context, t test.Test, c cluster.Cluster, opts ...o
783783 settings .ClusterSettings ["changefeed.slow_span_log_threshold" ] = "30s"
784784 settings .ClusterSettings ["server.child_metrics.enabled" ] = "true"
785785
786- // Randomly set a quantization interval since metamorphic settings
787- // don't extend to roachtests.
788- quantization := fmt .Sprintf ("%ds" , rand .Intn (30 ))
789- settings .ClusterSettings ["changefeed.resolved_timestamp.granularity" ] = quantization
790- t .Status (fmt .Sprintf ("changefeed.resolved_timestamp.granularity: %s" , quantization ))
786+ // Set cluster settings that we want to test metamorphically to random values
787+ // since metamorphic settings don't extend to roachtests.
788+ {
789+ quantization := fmt .Sprintf ("%ds" , rand .Intn (30 ))
790+ settings .ClusterSettings ["changefeed.resolved_timestamp.granularity" ] = quantization
791+ t .Status (fmt .Sprintf ("changefeed.resolved_timestamp.granularity: %s" , quantization ))
792+
793+ perTableTracking := fmt .Sprintf ("%t" , rand .Intn (2 ) == 0 )
794+ settings .ClusterSettings ["changefeed.progress.per_table_tracking.enabled" ] = perTableTracking
795+ t .Status (fmt .Sprintf ("changefeed.progress.per_table_tracking.enabled: %s" , perTableTracking ))
796+ }
791797
792798 settings .Env = append (settings .Env , envVars ... )
793799
@@ -3106,7 +3112,6 @@ func registerCDC(r registry.Registry) {
31063112 CompatibleClouds : registry .AllExceptIBM ,
31073113 Run : runMessageTooLarge ,
31083114 })
3109-
31103115 for _ , perTablePTS := range []bool {false , true } {
31113116 for _ , config := range []struct {
31123117 numTables int
@@ -3138,17 +3143,30 @@ func registerCDC(r registry.Registry) {
31383143 })
31393144 }
31403145 }
3141- for _ , interval := range []string {"30s" , "5m" , "10m" } {
3142- for _ , perTableTracking := range []bool {false , true } {
3146+ for _ , interval := range []string {
3147+ "5s" , // min interval
3148+ "30s" , // default interval
3149+ "10m" , // max interval
3150+ } {
3151+ for _ , cfg := range []struct {
3152+ tables int
3153+ ranges int
3154+ }{
3155+ {tables : 1 , ranges : 10_000 },
3156+ {tables : 10 , ranges : 1_000 },
3157+ {tables : 100 , ranges : 100 },
3158+ {tables : 1_000 , ranges : 10 },
3159+ {tables : 10_000 , ranges : 1 },
3160+ } {
31433161 r .Add (registry.TestSpec {
31443162 Name : "cdc/frontier-persistence-benchmark" +
3145- fmt .Sprintf ("/interval=%s/per-table-tracking=%t " , interval , perTableTracking ),
3163+ fmt .Sprintf ("/interval=%s/tables=%d/ranges=%d " , interval , cfg . tables , cfg . ranges ),
31463164 Owner : registry .OwnerCDC ,
31473165 Benchmark : true ,
31483166 Cluster : r .MakeClusterSpec (4 , spec .CPU (16 ), spec .WorkloadNode ()),
31493167 CompatibleClouds : registry .AllClouds ,
31503168 Suites : registry .Suites (registry .Nightly ),
3151- Timeout : time .Hour ,
3169+ Timeout : 2 * time .Hour ,
31523170 Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
31533171 ct := newCDCTester (ctx , t , c )
31543172 defer ct .Close ()
@@ -3158,7 +3176,6 @@ func registerCDC(r registry.Registry) {
31583176 // Configure various cluster settings.
31593177 for name , value := range map [string ]string {
31603178 "changefeed.progress.frontier_persistence.interval" : fmt .Sprintf ("'%s'" , interval ),
3161- "changefeed.progress.per_table_tracking.enabled" : fmt .Sprintf ("%t" , perTableTracking ),
31623179 // Disable span-level checkpointing since it's not necessary
31633180 // when frontier persistence is on.
31643181 "changefeed.span_checkpoint.interval" : "'0'" ,
@@ -3171,13 +3188,13 @@ func registerCDC(r registry.Registry) {
31713188 }
31723189 }
31733190
3174- // Initialize bank workload with multiple tables.
3175- numTables := 1_000
3176- numRows := 1_000
3177- numRanges := 10
3191+ // Initialize bank workload with multiple tables with multiple ranges .
3192+ // Each range will have a single row (or 2 when there's a single range)
3193+ // to maximize the likelihood of unmerged spans in the span frontier.
3194+ rows := max ( cfg . ranges , 2 )
31783195 initCmd := fmt .Sprintf (
3179- "./cockroach workload init bank --rows =%d --ranges=%d --tables =%d {pgurl%s}" ,
3180- numRows , numRanges , numTables , ct .crdbNodes .RandNode ())
3196+ "./cockroach workload init bank --tables =%d --ranges=%d --rows =%d {pgurl%s}" ,
3197+ cfg . tables , cfg . ranges , rows , ct .crdbNodes .RandNode ())
31813198 if err := c .RunE (ctx , option .WithNodes (ct .workloadNode ), initCmd ); err != nil {
31823199 t .Fatalf ("failed to initialize bank tables: %v" , err )
31833200 }
@@ -3187,20 +3204,24 @@ func registerCDC(r registry.Registry) {
31873204 ct .mon .Go (func (ctx context.Context ) error {
31883205 defer ct .workloadWg .Done ()
31893206 workloadCmd := fmt .Sprintf (
3190- "./cockroach workload run bank --rows =%d --duration=30m --tables =%d {pgurl%s}" ,
3191- numRows , numTables , ct .crdbNodes )
3207+ "./cockroach workload run bank --tables =%d --ranges=%d --rows =%d --duration=30m {pgurl%s}" ,
3208+ cfg . tables , cfg . ranges , rows , ct .crdbNodes )
31923209 return c .RunE (ctx , option .WithNodes (ct .workloadNode ), workloadCmd )
31933210 })
31943211
31953212 // Create changefeed targeting all the bank tables.
3196- targetNames := make ([]string , 0 , numTables )
3197- for i := range numTables {
3198- targetNames = append (targetNames , fmt .Sprintf ("bank.bank_%d" , i ))
3213+ targets := make ([]string , cfg .tables )
3214+ if cfg .tables == 1 {
3215+ targets [0 ] = "bank.bank"
3216+ } else {
3217+ for i := range targets {
3218+ targets [i ] = fmt .Sprintf ("bank.bank_%d" , i )
3219+ }
31993220 }
32003221
32013222 feed := ct .newChangefeed (feedArgs {
32023223 sinkType : nullSink ,
3203- targets : targetNames ,
3224+ targets : targets ,
32043225 opts : map [string ]string {
32053226 "initial_scan" : "'no'" ,
32063227 "resolved" : "'3s'" ,
0 commit comments