@@ -59,6 +59,7 @@ import (
5959 "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
6060 "github.com/cockroachdb/cockroach/pkg/roachprod/vm"
6161 roachprodaws "github.com/cockroachdb/cockroach/pkg/roachprod/vm/aws"
62+ "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
6263 "github.com/cockroachdb/cockroach/pkg/testutils"
6364 "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
6465 "github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -1899,7 +1900,182 @@ func configureDBForMultiTablePTSBenchmark(db *gosql.DB) error {
18991900 return nil
19001901}
19011902
1903+ func getDiagramProcessors (ctx context.Context , db * gosql.DB ) ([]any , error ) {
1904+ var diagramURL string
1905+ diagramQuery := `SELECT value
1906+ FROM system.job_info ji
1907+ INNER JOIN system.jobs j ON ji.job_id = j.id
1908+ WHERE j.job_type = 'CHANGEFEED' AND ji.info_key LIKE '~dsp-diag-url-%'`
1909+ if err := db .QueryRowContext (ctx , diagramQuery ).Scan (& diagramURL ); err != nil {
1910+ return nil , err
1911+ }
1912+ diagram , err := execinfrapb .FromURL (diagramURL )
1913+ if err != nil {
1914+ return nil , err
1915+ }
1916+ diagramJSON , err := json .Marshal (diagram )
1917+ if err != nil {
1918+ return nil , err
1919+ }
1920+ var flow map [string ]any
1921+ if err := json .Unmarshal (diagramJSON , & flow ); err != nil {
1922+ return nil , err
1923+ }
1924+ processors , ok := flow ["processors" ].([]any )
1925+ if ! ok {
1926+ return nil , fmt .Errorf ("processors not found in flow" )
1927+ }
1928+ return processors , nil
1929+ }
1930+
// ChangefeedDistribution summarizes how the spans watched by a changefeed's
// change aggregators are spread across nodes and zones.
type ChangefeedDistribution struct {
	// NodeToSpansWatched maps a node index to the number of spans watched by
	// the change aggregators placed on that node.
	NodeToSpansWatched map[int]int
	// ZoneToSpansWatched maps a zone name to the number of spans watched by
	// the change aggregators placed on nodes in that zone.
	ZoneToSpansWatched map[string]int
	// TotalSpansWatched is the number of watched spans summed over all
	// change aggregators.
	TotalSpansWatched int
	// TotalAggregators is the number of change aggregator processors seen.
	TotalAggregators int
	// TotalLeaseHolders is not populated by getChangefeedDistribution.
	TotalLeaseHolders int
	// TotalRanges is not populated by getChangefeedDistribution.
	TotalRanges int
	// NodeToZone maps a node index to the zone that node lives in.
	NodeToZone map[int]string
}
1940+
1941+ func getChangefeedDistribution (
1942+ processors []any , nodeToZone map [int ]string , t test.Test ,
1943+ ) ChangefeedDistribution {
1944+ changefeedDistribution := ChangefeedDistribution {
1945+ NodeToSpansWatched : make (map [int ]int ),
1946+ ZoneToSpansWatched : make (map [string ]int ),
1947+ TotalSpansWatched : 0 ,
1948+ TotalAggregators : 0 ,
1949+ TotalLeaseHolders : 0 ,
1950+ TotalRanges : 0 ,
1951+ NodeToZone : nodeToZone ,
1952+ }
1953+ for _ , p := range processors {
1954+ procMap , ok := p .(map [string ]any )
1955+ if ! ok {
1956+ t .Fatalf ("processor not a map" )
1957+ }
1958+ nodeIdx , ok := procMap ["nodeIdx" ].(float64 )
1959+ require .True (t , ok , "node idx not found in processor" )
1960+ core , ok := procMap ["core" ].(map [string ]any )
1961+ require .True (t , ok , "core not found in processor" )
1962+ title , ok := core ["title" ].(string )
1963+ require .True (t , ok , "title not found in core" )
1964+ if strings .HasPrefix (title , "ChangeAggregator" ) {
1965+ changefeedDistribution .TotalAggregators ++
1966+ details := core ["details" ].([]any )
1967+ for _ , detail := range details {
1968+ if strings .HasPrefix (detail .(string ), "Watches" ) {
1969+ re := regexp .MustCompile (`Watches \[(\d+)\]:` )
1970+ matches := re .FindStringSubmatch (detail .(string ))
1971+ if len (matches ) > 1 {
1972+ numWatches , err := strconv .Atoi (matches [1 ])
1973+ require .NoError (t , err )
1974+ changefeedDistribution .NodeToSpansWatched [int (nodeIdx )] += numWatches
1975+ changefeedDistribution .TotalSpansWatched += numWatches
1976+ changefeedDistribution .ZoneToSpansWatched [changefeedDistribution .NodeToZone [int (nodeIdx )]] += numWatches
1977+
1978+ }
1979+ }
1980+ }
1981+ }
1982+ }
1983+ return changefeedDistribution
1984+ }
1985+
1986+ func veryifyLeaseHolderDistribution (
1987+ db * gosql.DB , t test.Test , nodeToZone map [int ]string ,
1988+ ) map [string ]int {
1989+ var rows * gosql.Rows
1990+ // Get lease holders for all ranges in tpcc database.
1991+ leaseHolderQuery := `SELECT r.start_pretty, r.replicas, r.replica_localities, r.lease_holder
1992+ FROM crdb_internal.ranges r
1993+ JOIN crdb_internal.tables t ON r.start_pretty like concat('/Table/', t.table_id::STRING,'%')
1994+ WHERE t.database_name = 'tpcc'`
1995+ rows , err := db .Query (leaseHolderQuery )
1996+ zoneToLeaseHolderCount := make (map [string ]int )
1997+ require .NoError (t , err )
1998+ defer rows .Close ()
1999+ for rows .Next () {
2000+ var startKeyPretty string
2001+ var replicas []uint8
2002+ var replicaLocalities []uint8
2003+ var leaseHolder int
2004+ require .NoError (t , rows .Scan (& startKeyPretty , & replicas , & replicaLocalities , & leaseHolder ))
2005+ for indx := range replicas {
2006+ require .NotEqual (t , replicas [indx ], 0 )
2007+ replicas [indx ]--
2008+ }
2009+ leaseHolder --
2010+ zoneToLeaseHolderCount [nodeToZone [leaseHolder ]]++
2011+ }
2012+ return zoneToLeaseHolderCount
2013+ }
2014+
19022015func registerCDC (r registry.Registry ) {
2016+ r .Add (registry.TestSpec {
2017+ // This test
2018+ // 1. Creates a cluster with 3 nodes each in us-east and us-west
2019+ // 2. Runs a tpcc workload, then sets tpcc database to primary region us-west
2020+ // 3. Creates a changefeed with execution locality set to us-east
2021+ // 4. Gets the changefeed diagram and creates mappings
2022+
2023+ // This test is used to verify that ranges are evenly distributed across
2024+ // change aggregators in the execution_locality region while targeting tables
2025+ // whose primary region is different. In issue #2955, in that scenario,
2026+ // a single change aggregator (on the gateway node) would watch all the ranges.
2027+ // The above scenario occurred with the older bin-packing oracle rather than
2028+ // the bulk oracle.
2029+ Name : "cdc/multi-region-execution-locality-tpcc" ,
2030+ Owner : registry .OwnerCDC ,
2031+ Cluster : r .MakeClusterSpec (7 , spec .Geo (), spec .GatherCores (), spec .GCEZones ("us-east1-b,us-west1-b" )),
2032+ CompatibleClouds : registry .OnlyGCE ,
2033+ Suites : registry .Suites (),
2034+ Run : func (ctx context.Context , t test.Test , c cluster.Cluster ) {
2035+ nodeToZone := map [int ]string {
2036+ 0 : "us-east1-b" ,
2037+ 1 : "us-east1-b" ,
2038+ 2 : "us-east1-b" ,
2039+ 3 : "us-west1-b" ,
2040+ 4 : "us-west1-b" ,
2041+ 5 : "us-west1-b" ,
2042+ }
2043+ ct := newCDCTester (ctx , t , c )
2044+ defer ct .Close ()
2045+
2046+ ct .runTPCCWorkload (tpccArgs {warehouses : 100 })
2047+
2048+ var err error
2049+ _ , err = ct .DB ().Exec ("ALTER DATABASE tpcc SET PRIMARY REGION 'us-west1'" )
2050+ require .NoError (t , err )
2051+
2052+ feed := ct .newChangefeed (feedArgs {
2053+ sinkType : cloudStorageSink ,
2054+ targets : allTpccTargets ,
2055+ opts : map [string ]string {
2056+ "execution_locality" : "'region=us-east1'" ,
2057+ },
2058+ })
2059+ ct .waitForWorkload ()
2060+ feed .waitForCompletion ()
2061+
2062+ processors , err := getDiagramProcessors (ctx , ct .DB ())
2063+ require .NoError (t , err )
2064+
2065+ changefeedDistribution := getChangefeedDistribution (processors , nodeToZone , t )
2066+ require .Greater (t , changefeedDistribution .TotalAggregators , 1 )
2067+ for nodeIdx , spansWatched := range changefeedDistribution .NodeToSpansWatched {
2068+ require .LessOrEqual (t , spansWatched , changefeedDistribution .TotalSpansWatched / 2 , "nodeIdx %d watched %d spans, total spans watched %d" , nodeIdx , spansWatched , changefeedDistribution .TotalSpansWatched )
2069+ }
2070+ require .Equal (t , 1 , len (changefeedDistribution .ZoneToSpansWatched ))
2071+ require .Equal (t , changefeedDistribution .ZoneToSpansWatched ["us-east1-b" ], changefeedDistribution .TotalSpansWatched )
2072+ zoneToLeaseHolderCount := veryifyLeaseHolderDistribution (ct .DB (), t , nodeToZone )
2073+ // Majority of lease holders should be in us-west1-b. Some may not, but most should.
2074+ if zoneToLeaseHolderCount ["us-east1-b" ] != 0 {
2075+ require .Greater (t , zoneToLeaseHolderCount ["us-west1-b" ]/ zoneToLeaseHolderCount ["us-east1-b" ], 10 )
2076+ }
2077+ },
2078+ })
19032079 r .Add (registry.TestSpec {
19042080 Name : "cdc/initial-scan-only" ,
19052081 Owner : registry .OwnerCDC ,
0 commit comments