@@ -784,8 +784,7 @@ func (rd *replicationDriver) ensureStandbyPollerAdvances(ctx context.Context, in
784784
785785 info , err := getStreamIngestionJobInfo (rd .setup .dst .db , ingestionJobID )
786786 require .NoError (rd .t , err )
787- pcrReplicatedTime := info .GetHighWater ()
788- require .False (rd .t , pcrReplicatedTime .IsZero (), "PCR job has no replicated time" )
787+ initialPCRReplicatedTime := info .GetHighWater ()
789788
790789 // Connect to the reader tenant
791790 readerTenantName := fmt .Sprintf ("%s-readonly" , rd .setup .dst .name )
@@ -795,28 +794,28 @@ func (rd *replicationDriver) ensureStandbyPollerAdvances(ctx context.Context, in
795794
796795 // Poll the standby poller job until its high water timestamp matches the PCR job's replicated time
797796 testutils .SucceedsWithin (rd .t , func () error {
798- var standbyHighWaterStr string
797+ var standbyTimeStr string
799798 readerTenantSQL .QueryRow (rd .t ,
800799 `SELECT COALESCE(high_water_timestamp, '0')
801800 FROM crdb_internal.jobs
802- WHERE job_type = 'STANDBY READ TS POLLER'` ).Scan (& standbyHighWaterStr )
801+ WHERE job_type = 'STANDBY READ TS POLLER'` ).Scan (& standbyTimeStr )
803802
804- if standbyHighWaterStr == "0" {
803+ if standbyTimeStr == "0" {
805804 return errors .New ("standby poller job not found or has no high water timestamp" )
806805 }
807806
808- standbyHighWater := DecimalTimeToHLC (rd .t , standbyHighWaterStr )
809- standbyHighWaterTime := standbyHighWater .GoTime ()
807+ standbyHLC := DecimalTimeToHLC (rd .t , standbyTimeStr )
808+ standbyTime := standbyHLC .GoTime ()
810809
811- rd .t .L ().Printf ("Standby poller high water: %s; replicated time %s" , standbyHighWaterTime , pcrReplicatedTime )
810+ rd .t .L ().Printf ("Standby poller high water: %s; replicated time %s" , standbyTime , initialPCRReplicatedTime )
812811
813- if standbyHighWaterTime .Compare (pcrReplicatedTime ) >= 0 {
812+ if standbyTime .Compare (initialPCRReplicatedTime ) >= 0 {
814813 rd .t .L ().Printf ("Standby poller has advanced to PCR replicated time" )
815814 return nil
816815 }
817816
818817 return errors .Newf ("standby poller high water %s not yet at PCR replicated time %s" ,
819- standbyHighWaterTime , pcrReplicatedTime )
818+ standbyTime , initialPCRReplicatedTime )
820819 }, 5 * time .Minute )
821820}
822821
@@ -1019,6 +1018,61 @@ func (rd *replicationDriver) maybeRunSchemaChangeWorkload(
10191018 }
10201019}
10211020
1021+ // maybeRestartReaderTenantService restarts the reader tenant service if
1022+ // physical_cluster_replication.reader_system_table_id_offset was set, as the
1023+ // namespace cache needs to be rehydrated after the reader tenant ingests the
1024+ // priviledge table at a higher id.
1025+ func (rd * replicationDriver ) maybeRestartReaderTenantService (ctx context.Context ) {
1026+ if rd .rs .withReaderWorkload == nil {
1027+ // No reader tenant configured, nothing to do
1028+ return
1029+ }
1030+
1031+ // Check if the reader system table ID offset setting is configured
1032+ var offsetValue int
1033+ rd .setup .dst .sysSQL .QueryRow (rd .t , "SHOW CLUSTER SETTING physical_cluster_replication.reader_system_table_id_offset" ).Scan (& offsetValue )
1034+
1035+ if offsetValue == 0 {
1036+ rd .t .L ().Printf ("reader_system_table_id_offset not set, skipping reader tenant service restart" )
1037+ return
1038+ }
1039+ readerTenantName := fmt .Sprintf ("%s-readonly" , rd .setup .dst .name )
1040+
1041+ // Wait for the reader tenant to be in the correct data state and service mode before restarting.
1042+ testutils .SucceedsSoon (rd .t , func () error {
1043+ var dataState , serviceMode string
1044+ rd .setup .dst .sysSQL .QueryRow (rd .t , fmt .Sprintf ("SELECT data_state, service_mode FROM [SHOW TENANTS] WHERE name = '%s'" , readerTenantName )).Scan (& dataState , & serviceMode )
1045+ if dataState != "ready" {
1046+ return errors .Newf ("reader tenant %q data state is %q, expected 'ready'" , readerTenantName , dataState )
1047+ }
1048+ if serviceMode != "shared" {
1049+ return errors .Newf ("reader tenant %q service mode is %q, expected 'shared'" , readerTenantName , serviceMode )
1050+ }
1051+ return nil
1052+ })
1053+
1054+ rd .t .Status ("restarting reader tenant service" )
1055+
1056+ // Stop the reader tenant service
1057+ rd .setup .dst .sysSQL .Exec (rd .t , fmt .Sprintf ("ALTER VIRTUAL CLUSTER '%s' STOP SERVICE" , readerTenantName ))
1058+
1059+ // Wait for the service to fully stop
1060+ testutils .SucceedsSoon (rd .t , func () error {
1061+ // Try to connect to the reader tenant - if it fails, the service is stopped
1062+ conn := rd .c .Conn (ctx , rd .t .L (), rd .setup .dst .gatewayNodes [0 ], option .VirtualClusterName (readerTenantName ))
1063+ defer conn .Close ()
1064+ if err := conn .Ping (); err == nil {
1065+ return errors .Newf ("reader tenant %q still accepting connections" , readerTenantName )
1066+ }
1067+ return nil
1068+ })
1069+
1070+ // Start the service back up
1071+ rd .setup .dst .sysSQL .Exec (rd .t , fmt .Sprintf ("ALTER VIRTUAL CLUSTER '%s' START SERVICE SHARED" , readerTenantName ))
1072+
1073+ rd .t .L ().Printf ("successfully restarted reader tenant service" )
1074+ }
1075+
10221076// checkParticipatingNodes asserts that multiple nodes in the source and dest cluster are
10231077// participating in the replication stream.
10241078//
@@ -1136,6 +1190,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
11361190 rd .t .Status (fmt .Sprintf (`initial scan complete. run workload and repl. stream for another %s minutes` ,
11371191 rd .rs .additionalDuration ))
11381192
1193+ rd .maybeRestartReaderTenantService (ctx )
11391194 rd .maybeRunSchemaChangeWorkload (ctx , workloadMonitor )
11401195 rd .maybeRunReaderTenantWorkload (ctx , workloadMonitor )
11411196
@@ -1153,7 +1208,6 @@ func (rd *replicationDriver) main(ctx context.Context) {
11531208 return
11541209 }
11551210 rd .ensureStandbyPollerAdvances (ctx , ingestionJobID )
1156-
11571211 rd .checkParticipatingNodes (ctx , ingestionJobID )
11581212
11591213 retainedTime := rd .getReplicationRetainedTime ()
0 commit comments