Skip to content

Commit 602001f

Browse files
authored
Merge pull request #156812 from kev-cao/blathers/backport-release-25.4-156003
release-25.4: physical: fix blocked span reconciliation jobs after stream of system
2 parents a79421e + 1d74cf9 commit 602001f

File tree

3 files changed

+96
-16
lines changed

3 files changed

+96
-16
lines changed

pkg/crosscluster/physical/stream_ingestion_job_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"net"
1313
"net/url"
1414
"testing"
15+
"time"
1516

1617
"github.com/cockroachdb/cockroach/pkg/base"
1718
_ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl"
@@ -742,3 +743,69 @@ func TestPhysicalReplicationGatewayRoute(t *testing.T) {
742743
progress := jobutils.GetJobProgress(t, systemDB, jobspb.JobID(jobID))
743744
require.Empty(t, progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.PartitionConnUris)
744745
}
746+
747+
func TestPhysicalReplicationCancelsProducerOnCutoverFromSystem(t *testing.T) {
748+
defer leaktest.AfterTest(t)()
749+
defer log.Scope(t).Close(t)
750+
751+
// This test verifies that the span reconciliation job on a promoted standby
752+
// tenant that was streaming from a system tenant is able to make progress.
753+
// See https://github.com/cockroachdb/cockroach/issues/155444 for more
754+
// details.
755+
ctx := context.Background()
756+
args := replicationtestutils.DefaultTenantStreamingClustersArgs
757+
args.MultitenantSingleClusterNumNodes = 1
758+
args.SrcTenantID = roachpb.SystemTenantID
759+
args.SrcTenantName = "system"
760+
761+
c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
762+
defer cleanup()
763+
764+
producerJobID, consumerJobID := c.StartStreamReplication(ctx)
765+
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
766+
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(consumerJobID))
767+
768+
replicationtestutils.WaitUntilStartTimeReached(t, c.DestSysSQL, jobspb.JobID(consumerJobID))
769+
srcTime := c.SrcCluster.Server(0).Clock().Now()
770+
c.Cutover(ctx, producerJobID, consumerJobID, srcTime.GoTime(), false /* async */)
771+
destCleanup := c.StartDestTenant(ctx, nil /* withTestingKnobs */, 0 /* server */)
772+
defer destCleanup()
773+
774+
// We speed up the span config reconciliation job manager so that it will
775+
// quickly detect the missing reconciliation job on the tenant (the previous
776+
// one was canceled as it was replicated) and spin up a new one.
777+
c.DestSysSQL.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.check_interval = '10ms'")
778+
779+
registry := c.DestTenantServer.JobRegistry().(*jobs.Registry)
780+
// We nudge the adoption queue to pick up the replicated stream producer job
781+
// so that it will be dropped.
782+
registry.TestingNudgeAdoptionQueue()
783+
784+
now := c.DestSysServer.Clock().Now()
785+
testutils.SucceedsWithin(t, func() error {
786+
var spanConfigJobID jobspb.JobID
787+
rows := c.DestTenantSQL.Query(
788+
t, `SELECT id FROM system.jobs WHERE job_type = 'AUTO SPAN CONFIG RECONCILIATION' AND status = 'running'`,
789+
)
790+
defer rows.Close()
791+
if !rows.Next() {
792+
return errors.New("no running span config reconciliation job found")
793+
}
794+
if err := rows.Scan(&spanConfigJobID); err != nil {
795+
return err
796+
}
797+
spanConfigJob, err := registry.LoadJob(ctx, spanConfigJobID)
798+
require.NoError(t, err)
799+
spanConfigProg := spanConfigJob.Progress().
800+
Details.(*jobspb.Progress_AutoSpanConfigReconciliation).
801+
AutoSpanConfigReconciliation
802+
803+
if spanConfigProg.Checkpoint.Less(now) {
804+
return errors.Newf(
805+
"waiting for span config reconciliation job to checkpoint past %v, at %v",
806+
now, spanConfigProg.Checkpoint,
807+
)
808+
}
809+
return nil
810+
}, 2*time.Minute)
811+
}

pkg/crosscluster/producer/producer_job.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,19 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
133133
jobExec := execCtx.(sql.JobExecContext)
134134
execCfg := jobExec.ExecCfg()
135135

136+
// If the source tenant of a PCR stream is the system tenant, then the
137+
// producer job will also be copied to the destination tenant, along with the
138+
// associated PTS. After cutover, this blocks span config reconciliation since
139+
// a tenant cannot lay a PTS on spans that another tenant owns. The producer
140+
// job of course is not needed on the destination tenant, so we check for a
141+
// mismatched cluster ID here and fast-exit the job.
142+
if p.job.Payload().CreationClusterID != execCfg.NodeInfo.LogicalClusterID() {
143+
return jobs.MarkAsPermanentJobError(errors.Newf(
144+
"replication stream %d belongs to cluster %s, cannot resume on cluster %s",
145+
p.job.ID(), p.job.Payload().CreationClusterID, execCfg.NodeInfo.LogicalClusterID(),
146+
))
147+
}
148+
136149
// Fire the timer immediately to start an initial progress check
137150
p.timer.Reset(0)
138151
for {

pkg/crosscluster/replicationtestutils/testutils.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,12 @@ type TenantStreamingClusters struct {
131131
SrcURL url.URL
132132
SrcCleanup func()
133133

134-
DestCluster *testcluster.TestCluster
135-
DestSysServer serverutils.ApplicationLayerInterface
136-
DestSysSQL *sqlutils.SQLRunner
137-
DestTenantConn *gosql.DB
138-
DestTenantSQL *sqlutils.SQLRunner
134+
DestCluster *testcluster.TestCluster
135+
DestSysServer serverutils.ApplicationLayerInterface
136+
DestSysSQL *sqlutils.SQLRunner
137+
DestTenantConn *gosql.DB
138+
DestTenantSQL *sqlutils.SQLRunner
139+
DestTenantServer serverutils.ApplicationLayerInterface
139140

140141
ReaderTenantSQL *sqlutils.SQLRunner
141142

@@ -186,19 +187,18 @@ func (c *TenantStreamingClusters) init(ctx context.Context) {
186187
func (c *TenantStreamingClusters) StartDestTenant(
187188
ctx context.Context, withTestingKnobs *base.TestingKnobs, server int,
188189
) func() {
190+
var err error
191+
var testKnobs base.TestingKnobs
189192
if withTestingKnobs != nil {
190-
var err error
191-
_, c.DestTenantConn, err = c.DestCluster.Server(server).TenantController().StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
192-
TenantID: c.Args.DestTenantID,
193-
TenantName: c.Args.DestTenantName,
194-
Knobs: *withTestingKnobs,
195-
UseDatabase: "defaultdb",
196-
})
197-
require.NoError(c.T, err)
198-
} else {
199-
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
200-
c.DestTenantConn = c.DestCluster.Server(server).SystemLayer().SQLConn(c.T, serverutils.DBName("cluster:"+string(c.Args.DestTenantName)+"/defaultdb"))
193+
testKnobs = *withTestingKnobs
201194
}
195+
c.DestTenantServer, c.DestTenantConn, err = c.DestCluster.Server(server).TenantController().StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
196+
TenantID: c.Args.DestTenantID,
197+
TenantName: c.Args.DestTenantName,
198+
Knobs: testKnobs,
199+
UseDatabase: "defaultdb",
200+
})
201+
require.NoError(c.T, err)
202202

203203
c.DestTenantSQL = sqlutils.MakeSQLRunner(c.DestTenantConn)
204204
testutils.SucceedsSoon(c.T, func() error {

0 commit comments

Comments
 (0)