@@ -2941,6 +2941,85 @@ func registerCDC(r registry.Registry) {
29412941 })
29422942 }
29432943 }
2944+ for _ , interval := range []string {"30s" , "5m" , "10m" } {
2945+ for _ , perTableTracking := range []bool {false , true } {
2946+ r .Add (registry.TestSpec {
2947+ Name : "cdc/frontier-persistence-benchmark" +
2948+ fmt .Sprintf ("/interval=%s/per-table-tracking=%t" , interval , perTableTracking ),
2949+ Owner : registry .OwnerCDC ,
2950+ Benchmark : true ,
2951+ Cluster : r .MakeClusterSpec (4 , spec .CPU (16 ), spec .WorkloadNode ()),
2952+ CompatibleClouds : registry .AllClouds ,
2953+ Suites : registry .Suites (registry .Nightly ),
2954+ Timeout : time .Hour ,
2955+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
2956+ ct := newCDCTester (ctx , t , c )
2957+ defer ct .Close ()
2958+
2959+ db := ct .DB ()
2960+
2961+ // Configure various cluster settings.
2962+ for name , value := range map [string ]string {
2963+ "changefeed.progress.frontier_persistence.interval" : fmt .Sprintf ("'%s'" , interval ),
2964+ "changefeed.progress.per_table_tracking.enabled" : fmt .Sprintf ("%t" , perTableTracking ),
2965+ // Disable span-level checkpointing since it's not necessary
2966+ // when frontier persistence is on.
2967+ "changefeed.span_checkpoint.interval" : "'0'" ,
2968+ // Disable per-table PTS to avoid impact on results.
2969+ "changefeed.protected_timestamp.per_table.enabled" : "false" ,
2970+ } {
2971+ stmt := fmt .Sprintf (`SET CLUSTER SETTING %s = %s` , name , value )
2972+ if _ , err := db .ExecContext (ctx , stmt ); err != nil {
2973+ t .Fatalf ("failed to run %q: %v" , stmt , err )
2974+ }
2975+ }
2976+
2977+ // Initialize bank workload with multiple tables.
2978+ numTables := 1_000
2979+ numRows := 1_000
2980+ numRanges := 10
2981+ initCmd := fmt .Sprintf (
2982+ "./cockroach workload init bank --rows=%d --ranges=%d --tables=%d {pgurl%s}" ,
2983+ numRows , numRanges , numTables , ct .crdbNodes .RandNode ())
2984+ if err := c .RunE (ctx , option .WithNodes (ct .workloadNode ), initCmd ); err != nil {
2985+ t .Fatalf ("failed to initialize bank tables: %v" , err )
2986+ }
2987+
2988+ // Run bank workload.
2989+ ct .workloadWg .Add (1 )
2990+ ct .mon .Go (func (ctx context.Context ) error {
2991+ defer ct .workloadWg .Done ()
2992+ workloadCmd := fmt .Sprintf (
2993+ "./cockroach workload run bank --rows=%d --duration=30m --tables=%d {pgurl%s}" ,
2994+ numRows , numTables , ct .crdbNodes )
2995+ return c .RunE (ctx , option .WithNodes (ct .workloadNode ), workloadCmd )
2996+ })
2997+
2998+ // Create changefeed targeting all the bank tables.
2999+ targetNames := make ([]string , 0 , numTables )
3000+ for i := range numTables {
3001+ targetNames = append (targetNames , fmt .Sprintf ("bank.bank_%d" , i ))
3002+ }
3003+
3004+ feed := ct .newChangefeed (feedArgs {
3005+ sinkType : nullSink ,
3006+ targets : targetNames ,
3007+ opts : map [string ]string {
3008+ "initial_scan" : "'no'" ,
3009+ "resolved" : "'3s'" ,
3010+ "min_checkpoint_frequency" : "'30s'" ,
3011+ },
3012+ })
3013+
3014+ ct .runFeedLatencyVerifier (feed , latencyTargets {
3015+ steadyLatency : 2 * time .Minute ,
3016+ })
3017+
3018+ ct .waitForWorkload ()
3019+ },
3020+ })
3021+ }
3022+ }
29443023}
29453024
29463025const (
0 commit comments