
Commit 1d74cf9

physical: fix blocked span reconciliation jobs after stream of system tenant
If PCR streams from a system tenant, the span config reconciliation job on the destination tenant repeatedly fails after cutover. This is because the producer job on the source system tenant is replicated to the destination tenant along with its corresponding PTS record. Since that PTS specifically targets the system tenant, the span config reconciliation job on the destination tenant cannot validate the span configuration: a non-system tenant appears to be applying a configuration to another tenant.

Fixes: #155444

Release note: Span config reconciliation jobs no longer fail on the destination after cutover from a PCR stream of a system tenant.
1 parent 5c3e649 commit 1d74cf9
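
For illustration, here is a minimal, self-contained Go sketch of the guard this commit adds in pkg/crosscluster/producer/producer_job.go: on resume, the producer job compares the cluster ID recorded in its payload against the local logical cluster ID and fails permanently on a mismatch, so the replicated copy of the job (and its PTS) is dropped instead of retrying forever. The types and names below (clusterID, jobPayload, permanentJobError, resumeGuard) are hypothetical stand-ins rather than CockroachDB APIs; only the shape of the check mirrors the actual diff.

package main

import "fmt"

type clusterID string

// jobPayload stands in for the job's persisted payload, which records the
// cluster that originally created the job.
type jobPayload struct {
	ID                uint64
	CreationClusterID clusterID
}

// permanentJobError marks an error as non-retryable, analogous in spirit to
// jobs.MarkAsPermanentJobError in the real code.
type permanentJobError struct{ err error }

func (e permanentJobError) Error() string { return e.err.Error() }

// resumeGuard fast-exits a replicated producer job: if the job was created by
// a different cluster (e.g. it was copied over by PCR from a system tenant),
// it must not run here, so it is failed permanently instead of being retried
// while holding a protected timestamp.
func resumeGuard(p jobPayload, localClusterID clusterID) error {
	if p.CreationClusterID != localClusterID {
		return permanentJobError{fmt.Errorf(
			"replication stream %d belongs to cluster %s, cannot resume on cluster %s",
			p.ID, p.CreationClusterID, localClusterID,
		)}
	}
	return nil // same cluster: let the producer job resume normally
}

func main() {
	// The replicated copy of the producer job carries the source cluster's ID,
	// which no longer matches the promoted standby's logical cluster ID.
	err := resumeGuard(jobPayload{ID: 42, CreationClusterID: "source-cluster"}, "destination-cluster")
	fmt.Println(err)
}

The real change returns jobs.MarkAsPermanentJobError(errors.Newf(...)) from producerJobResumer.Resume, as shown in the pkg/crosscluster/producer/producer_job.go diff below.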

File tree

3 files changed: +96, -16 lines


pkg/crosscluster/physical/stream_ingestion_job_test.go

Lines changed: 67 additions & 0 deletions
@@ -12,6 +12,7 @@ import (
     "net"
     "net/url"
     "testing"
+    "time"

     "github.com/cockroachdb/cockroach/pkg/base"
     _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl"
@@ -742,3 +743,69 @@ func TestPhysicalReplicationGatewayRoute(t *testing.T) {
     progress := jobutils.GetJobProgress(t, systemDB, jobspb.JobID(jobID))
     require.Empty(t, progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.PartitionConnUris)
 }
+
+func TestPhysicalReplicationCancelsProducerOnCutoverFromSystem(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    // This test verifies that the span reconciliation job on a promoted standby
+    // tenant that was streaming from a system tenant is able to make progress.
+    // See https://github.com/cockroachdb/cockroach/issues/155444 for more
+    // details.
+    ctx := context.Background()
+    args := replicationtestutils.DefaultTenantStreamingClustersArgs
+    args.MultitenantSingleClusterNumNodes = 1
+    args.SrcTenantID = roachpb.SystemTenantID
+    args.SrcTenantName = "system"
+
+    c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
+    defer cleanup()
+
+    producerJobID, consumerJobID := c.StartStreamReplication(ctx)
+    jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
+    jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(consumerJobID))
+
+    replicationtestutils.WaitUntilStartTimeReached(t, c.DestSysSQL, jobspb.JobID(consumerJobID))
+    srcTime := c.SrcCluster.Server(0).Clock().Now()
+    c.Cutover(ctx, producerJobID, consumerJobID, srcTime.GoTime(), false /* async */)
+    destCleanup := c.StartDestTenant(ctx, nil /* withTestingKnobs */, 0 /* server */)
+    defer destCleanup()
+
+    // We speed up the span config reconciliation job manager so that it will
+    // quickly detect the missing reconciliation job on the tenant (the previous
+    // one was canceled as it was replicated) and spin up a new one.
+    c.DestSysSQL.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.check_interval = '10ms'")
+
+    registry := c.DestTenantServer.JobRegistry().(*jobs.Registry)
+    // We nudge the adoption queue to pick up the replicated stream producer job
+    // so that it will be dropped.
+    registry.TestingNudgeAdoptionQueue()
+
+    now := c.DestSysServer.Clock().Now()
+    testutils.SucceedsWithin(t, func() error {
+        var spanConfigJobID jobspb.JobID
+        rows := c.DestTenantSQL.Query(
+            t, `SELECT id FROM system.jobs WHERE job_type = 'AUTO SPAN CONFIG RECONCILIATION' AND status = 'running'`,
+        )
+        defer rows.Close()
+        if !rows.Next() {
+            return errors.New("no running span config reconciliation job found")
+        }
+        if err := rows.Scan(&spanConfigJobID); err != nil {
+            return err
+        }
+        spanConfigJob, err := registry.LoadJob(ctx, spanConfigJobID)
+        require.NoError(t, err)
+        spanConfigProg := spanConfigJob.Progress().
+            Details.(*jobspb.Progress_AutoSpanConfigReconciliation).
+            AutoSpanConfigReconciliation
+
+        if spanConfigProg.Checkpoint.Less(now) {
+            return errors.Newf(
+                "waiting for span config reconciliation job to checkpoint past %v, at %v",
+                now, spanConfigProg.Checkpoint,
+            )
+        }
+        return nil
+    }, 2*time.Minute)
+}

pkg/crosscluster/producer/producer_job.go

Lines changed: 13 additions & 0 deletions
@@ -133,6 +133,19 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) error {
     jobExec := execCtx.(sql.JobExecContext)
     execCfg := jobExec.ExecCfg()

+    // If the source tenant of a PCR stream is the system tenant, then the
+    // producer job will also be copied to the destination tenant, along with the
+    // associated PTS. After cutover, this blocks span config reconciliation since
+    // a tenant cannot lay a PTS on spans that another tenant owns. The producer
+    // job of course is not needed on the destination tenant, so we check for a
+    // mismatched cluster ID here and fast-exit the job.
+    if p.job.Payload().CreationClusterID != execCfg.NodeInfo.LogicalClusterID() {
+        return jobs.MarkAsPermanentJobError(errors.Newf(
+            "replication stream %d belongs to cluster %s, cannot resume on cluster %s",
+            p.job.ID(), p.job.Payload().CreationClusterID, execCfg.NodeInfo.LogicalClusterID(),
+        ))
+    }
+
     // Fire the timer immediately to start an initial progress check
     p.timer.Reset(0)
     for {

pkg/crosscluster/replicationtestutils/testutils.go

Lines changed: 16 additions & 16 deletions
@@ -131,11 +131,12 @@ type TenantStreamingClusters struct {
     SrcURL url.URL
     SrcCleanup func()

-    DestCluster    *testcluster.TestCluster
-    DestSysServer  serverutils.ApplicationLayerInterface
-    DestSysSQL     *sqlutils.SQLRunner
-    DestTenantConn *gosql.DB
-    DestTenantSQL  *sqlutils.SQLRunner
+    DestCluster      *testcluster.TestCluster
+    DestSysServer    serverutils.ApplicationLayerInterface
+    DestSysSQL       *sqlutils.SQLRunner
+    DestTenantConn   *gosql.DB
+    DestTenantSQL    *sqlutils.SQLRunner
+    DestTenantServer serverutils.ApplicationLayerInterface

     ReaderTenantSQL *sqlutils.SQLRunner

@@ -186,19 +187,18 @@ func (c *TenantStreamingClusters) init(ctx context.Context) {
 func (c *TenantStreamingClusters) StartDestTenant(
     ctx context.Context, withTestingKnobs *base.TestingKnobs, server int,
 ) func() {
+    var err error
+    var testKnobs base.TestingKnobs
     if withTestingKnobs != nil {
-        var err error
-        _, c.DestTenantConn, err = c.DestCluster.Server(server).TenantController().StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
-            TenantID:    c.Args.DestTenantID,
-            TenantName:  c.Args.DestTenantName,
-            Knobs:       *withTestingKnobs,
-            UseDatabase: "defaultdb",
-        })
-        require.NoError(c.T, err)
-    } else {
-        c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
-        c.DestTenantConn = c.DestCluster.Server(server).SystemLayer().SQLConn(c.T, serverutils.DBName("cluster:"+string(c.Args.DestTenantName)+"/defaultdb"))
+        testKnobs = *withTestingKnobs
     }
+    c.DestTenantServer, c.DestTenantConn, err = c.DestCluster.Server(server).TenantController().StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
+        TenantID:    c.Args.DestTenantID,
+        TenantName:  c.Args.DestTenantName,
+        Knobs:       testKnobs,
+        UseDatabase: "defaultdb",
+    })
+    require.NoError(c.T, err)

     c.DestTenantSQL = sqlutils.MakeSQLRunner(c.DestTenantConn)
     testutils.SucceedsSoon(c.T, func() error {
