@@ -419,6 +419,33 @@ func (bo replicateBulkOps) runDriver(
 	return nil
 }
 
+type replicateSchemaChange struct {
+}
+
+func (sc replicateSchemaChange) sourceInitCmd(
+	tenantName string, nodes option.NodeListOption,
+) string {
+	return roachtestutil.NewCommand("./workload init schemachange").
+		Arg("{pgurl%s:%s}", nodes, tenantName).String()
+}
+
+func (sc replicateSchemaChange) sourceRunCmd(
+	tenantName string, nodes option.NodeListOption,
+) string {
+	return roachtestutil.NewCommand("./workload run schemachange").
+		Flag("verbose", 1).
+		Flag("max-ops", 1000).
+		Flag("concurrency", 5).
+		Arg("{pgurl%s:%s}", nodes, tenantName).String()
+}
+
+func (sc replicateSchemaChange) runDriver(
+	workloadCtx context.Context, c cluster.Cluster, t test.Test, setup *c2cSetup,
+) error {
+	// The schema change workload does not need to run the init step.
+	return defaultWorkloadDriver(workloadCtx, setup, c, sc)
+}
+
 // replicationSpec are inputs to a c2c roachtest set during roachtest
 // registration, and can not be modified during roachtest execution.
 type replicationSpec struct {
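For illustration only: assuming a hypothetical four-node source cluster and a source tenant named src-tenant, the two command builders above would render roughly the following invocations. The {pgurl...} placeholder is expanded into real connection URLs by roachprod at run time, and the exact flag formatting depends on roachtestutil's command builder:

    ./workload init schemachange {pgurl:1-4:src-tenant}
    ./workload run schemachange --verbose 1 --max-ops 1000 --concurrency 5 {pgurl:1-4:src-tenant}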
@@ -748,6 +775,50 @@ func (rd *replicationDriver) getReplicationRetainedTime() hlc.Timestamp {
 	return hlc.Timestamp{WallTime: retainedTime.UnixNano()}
 }
 
+// ensureStandbyPollerAdvances ensures that the standby poller job is advancing.
+func (rd *replicationDriver) ensureStandbyPollerAdvances(ctx context.Context, ingestionJobID int) {
+	if rd.rs.withReaderWorkload == nil {
+		return
+	}
+
+	info, err := getStreamIngestionJobInfo(rd.setup.dst.db, ingestionJobID)
+	require.NoError(rd.t, err)
+	pcrReplicatedTime := info.GetHighWater()
+	require.False(rd.t, pcrReplicatedTime.IsZero(), "PCR job has no replicated time")
+
+	// Connect to the reader tenant.
+	readerTenantName := fmt.Sprintf("%s-readonly", rd.setup.dst.name)
+	readerTenantConn := rd.c.Conn(ctx, rd.t.L(), rd.setup.dst.gatewayNodes[0], option.VirtualClusterName(readerTenantName))
+	defer readerTenantConn.Close()
+	readerTenantSQL := sqlutils.MakeSQLRunner(readerTenantConn)
+
+	// Poll the standby poller job until its high water timestamp catches up to the PCR job's replicated time.
+	testutils.SucceedsWithin(rd.t, func() error {
+		var standbyHighWaterStr string
+		readerTenantSQL.QueryRow(rd.t,
+			`SELECT COALESCE(high_water_timestamp, '0')
+			FROM crdb_internal.jobs
+			WHERE job_type = 'STANDBY READ TS POLLER'`).Scan(&standbyHighWaterStr)
+
+		if standbyHighWaterStr == "0" {
+			return errors.New("standby poller job not found or has no high water timestamp")
+		}
+
+		standbyHighWater := DecimalTimeToHLC(rd.t, standbyHighWaterStr)
+		standbyHighWaterTime := standbyHighWater.GoTime()
+
+		rd.t.L().Printf("Standby poller high water: %s; replicated time %s", standbyHighWaterTime, pcrReplicatedTime)
+
+		if standbyHighWaterTime.Compare(pcrReplicatedTime) >= 0 {
+			rd.t.L().Printf("Standby poller has advanced to PCR replicated time")
+			return nil
+		}
+
+		return errors.Newf("standby poller high water %s not yet at PCR replicated time %s",
+			standbyHighWaterTime, pcrReplicatedTime)
+	}, 5*time.Minute)
+}
+
 func DecimalTimeToHLC(t test.Test, s string) hlc.Timestamp {
 	d, _, err := apd.NewFromString(s)
 	require.NoError(t, err)
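For background (illustrative value, not from this change): the high_water_timestamp column read by ensureStandbyPollerAdvances is a decimal encoding of an HLC timestamp, with the wall time in nanoseconds before the decimal point and the logical counter in the fractional digits, which DecimalTimeToHLC decodes, e.g. roughly:

    1712345678901234567.0000000001  ->  hlc.Timestamp{WallTime: 1712345678901234567, Logical: 1}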
@@ -919,6 +990,34 @@ func (rd *replicationDriver) maybeRunReaderTenantWorkload(
 	}
 }
 
+// maybeRunSchemaChangeWorkload runs the schema change workload on the source
+// tenant if we've set up a standby tenant. This workload tests that the standby
+// poller job will continue to advance even if we're replicating random schema
+// changes.
+func (rd *replicationDriver) maybeRunSchemaChangeWorkload(
+	ctx context.Context, workloadMonitor cluster.Monitor,
+) {
+	if rd.rs.withReaderWorkload != nil {
+
+		rd.t.Status("running schema change workload on source")
+		schemaChangeDriver := replicateSchemaChange{}
+		err := rd.c.RunE(ctx, option.WithNodes(rd.setup.workloadNode), schemaChangeDriver.sourceInitCmd(rd.setup.src.name, rd.setup.src.gatewayNodes))
+		require.NoError(rd.t, err, "failed to initialize schema change workload on source tenant")
+
+		workloadMonitor.Go(func(ctx context.Context) error {
+			err := rd.c.RunE(ctx, option.WithNodes(rd.setup.workloadNode), schemaChangeDriver.sourceRunCmd(rd.setup.src.name, rd.setup.src.gatewayNodes))
+			// The workload should only return an error if the roachtest driver cancels
+			// the ctx after rd.rs.additionalDuration has elapsed following the initial scan.
+			if err != nil && ctx.Err() == nil {
+				// Implies the workload context was not cancelled and the workload cmd
+				// returned a different error.
+				return errors.Wrapf(err, "schema change workload failed before the workload context was cancelled")
+			}
+			return nil
+		})
+	}
+}
+
 // checkParticipatingNodes asserts that multiple nodes in the source and dest cluster are
 // participating in the replication stream.
 //
@@ -1036,6 +1135,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
 	rd.t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`,
 		rd.rs.additionalDuration))
 
+	rd.maybeRunSchemaChangeWorkload(ctx, workloadMonitor)
 	rd.maybeRunReaderTenantWorkload(ctx, workloadMonitor)
 
 	select {
@@ -1051,6 +1151,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
 		rd.t.L().Printf(`roachtest context cancelled while waiting for workload duration to complete`)
 		return
 	}
+	rd.ensureStandbyPollerAdvances(ctx, ingestionJobID)
 
 	rd.checkParticipatingNodes(ctx, ingestionJobID)
 
@@ -1122,6 +1223,13 @@ func c2cRegisterWrapper(
 		clusterOps = append(clusterOps, spec.Geo())
 	}
 
+	nativeLibs := []string{}
+	if sp.withReaderWorkload != nil {
+		// Read from standby tests also spin up the schema change workload, which
+		// requires LibGEOS.
+		nativeLibs = registry.LibGEOS
+	}
+
 	r.Add(registry.TestSpec{
 		Name:                       sp.name,
 		Owner:                      registry.OwnerDisasterRecovery,
@@ -1134,6 +1242,10 @@ func c2cRegisterWrapper(
 		Suites:                     sp.suites,
 		TestSelectionOptOutSuites:  sp.suites,
 		Run:                        run,
+		// Read from standby tests also spin up the schema change workload, which
+		// uses the workload binary.
+		RequiresDeprecatedWorkload: sp.withReaderWorkload != nil,
+		NativeLibs:                 nativeLibs,
 	})
 }
 