@@ -1800,3 +1800,62 @@ func splitPrimaryKeyIndexSpan(
18001800 require .NoError (t , db .AdminSplit (ctx , pkStartKey , hlc .MaxTimestamp ))
18011801 require .NoError (t , db .AdminSplit (ctx , pkEndKey , hlc .MaxTimestamp ))
18021802}
1803+
1804+ func TestAlterExternalConnection (t * testing.T ) {
1805+ defer leaktest .AfterTest (t )()
1806+ skip .UnderDeadlock (t )
1807+ skip .UnderRace (t )
1808+ defer log .Scope (t ).Close (t )
1809+
1810+ ctx := context .Background ()
1811+ pollingInterval := 100 * time .Millisecond
1812+
1813+ var alreadyReplanned atomic.Int32
1814+
1815+ args := replicationtestutils .DefaultTenantStreamingClustersArgs
1816+ args .TestingKnobs = & sql.StreamingTestingKnobs {
1817+ ExternalConnectionPollingInterval : & pollingInterval ,
1818+ AfterRetryIteration : func (err error ) {
1819+ alreadyReplanned .Add (1 )
1820+ },
1821+ }
1822+
1823+ c , cleanup := replicationtestutils .CreateTenantStreamingClusters (ctx , t , args )
1824+ defer cleanup ()
1825+
1826+ externalConnection := "replication-source-addr"
1827+ ogConnection := c .SrcURL .String ()
1828+ c .DestSysSQL .Exec (c .T , fmt .Sprintf (`CREATE EXTERNAL CONNECTION "%s" AS "%s"` ,
1829+ externalConnection , ogConnection ))
1830+ c .DestSysSQL .Exec (c .T , c .BuildCreateTenantQuery (externalConnection ))
1831+ streamProducerJobID , ingestionJobID := replicationtestutils .GetStreamJobIds (c .T , ctx , c .DestSysSQL , c .Args .DestTenantName )
1832+
1833+ jobutils .WaitForJobToRun (c .T , c .SrcSysSQL , jobspb .JobID (streamProducerJobID ))
1834+ jobutils .WaitForJobToRun (c .T , c .DestSysSQL , jobspb .JobID (ingestionJobID ))
1835+ c .WaitUntilStartTimeReached (jobspb .JobID (ingestionJobID ))
1836+
1837+ // Alter the external connection to break the stream, to test that pcr watches for new connection changes
1838+ c .SrcSysSQL .Exec (t , fmt .Sprintf ("CREATE USER %s" , username .TestUser ))
1839+ srcAppURL , cleanupSinkCert := pgurlutils .PGUrl (t , c .SrcSysServer .AdvSQLAddr (), t .Name (), url .User (username .TestUser ))
1840+ defer cleanupSinkCert ()
1841+
1842+ beforeReplanCount := alreadyReplanned .Load ()
1843+ c .DestSysSQL .Exec (c .T , fmt .Sprintf (`ALTER EXTERNAL CONNECTION "%s" AS "%s"` ,
1844+ externalConnection , & srcAppURL ))
1845+
1846+ testutils .SucceedsSoon (t , func () error {
1847+ if alreadyReplanned .Load () <= beforeReplanCount + 2 {
1848+ return errors .New ("not yet replanned twice" )
1849+ }
1850+ return nil
1851+ })
1852+
1853+ // Alter the external connection to fix the stream, and ensure replication resumes
1854+ c .DestSysSQL .Exec (c .T , fmt .Sprintf (`ALTER EXTERNAL CONNECTION "%s" AS "%s"` ,
1855+ externalConnection , ogConnection ))
1856+ c .DestSysSQL .Exec (c .T , fmt .Sprintf ("RESUME JOB %d" , ingestionJobID ))
1857+ jobutils .WaitForJobToRun (c .T , c .DestSysSQL , jobspb .JobID (ingestionJobID ))
1858+ // ensure the stream advances
1859+ srcTime := c .SrcCluster .Server (0 ).Clock ().Now ()
1860+ c .WaitUntilReplicatedTime (srcTime , jobspb .JobID (ingestionJobID ))
1861+ }
0 commit comments