Skip to content

Commit 02bdcbc

Browse files
committed
logical: add initial/catchup scan metrics to offline scan
This patch teaches the LDR offline scan processor to emit checkpoint range stats and update the `logical_replication.scanning_ranges` and `logical_replication.catchup_ranges` metrics. Informs: #152273 Release note: LDR now updates the `logical_replication.scanning_ranges` and `logical_replication.catchup_ranges` metrics during fast initial scan.
1 parent 321213f commit 02bdcbc

File tree

2 files changed

+53
-15
lines changed

2 files changed

+53
-15
lines changed

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -456,9 +456,11 @@ type logicalReplicationPlanner struct {
456456
}
457457

458458
type logicalReplicationPlanInfo struct {
459-
sourceSpans []roachpb.Span
460-
partitionPgUrls []string
461-
destTableBySrcID map[descpb.ID]dstTableMetadata
459+
sourceSpans []roachpb.Span
460+
partitionPgUrls []string
461+
destTableBySrcID map[descpb.ID]dstTableMetadata
462+
// Number of processors writing data on the destination cluster (offline or
463+
// otherwise).
462464
writeProcessorCount int
463465
}
464466

@@ -738,6 +740,7 @@ func (p *logicalReplicationPlanner) planOfflineInitialScan(
738740
SQLInstanceID: instanceID,
739741
Core: execinfrapb.ProcessorCoreUnion{LogicalReplicationOfflineScan: &spec},
740742
})
743+
info.writeProcessorCount++
741744
}
742745
}
743746

pkg/crosscluster/logical/offline_initial_scan_processor.go

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cockroachdb/cockroach/pkg/backup"
1515
"github.com/cockroachdb/cockroach/pkg/crosscluster"
16+
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
1617
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
1718
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1819
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
@@ -65,6 +66,8 @@ type offlineInitialScanProcessor struct {
6566

6667
checkpointCh chan offlineCheckpoint
6768

69+
rangeStatsCh chan *streampb.StreamEvent_RangeStats
70+
6871
rekey *backup.KeyRewriter
6972

7073
batcher *bulk.SSTBatcher
@@ -104,6 +107,7 @@ func newNewOfflineInitialScanProcessor(
104107
processorID: processorID,
105108
stopCh: make(chan struct{}),
106109
checkpointCh: make(chan offlineCheckpoint),
110+
rangeStatsCh: make(chan *streampb.StreamEvent_RangeStats),
107111
errCh: make(chan error, 1),
108112
rekey: rekeyer,
109113
lastKeyAdded: roachpb.Key{},
@@ -220,6 +224,7 @@ func (o *offlineInitialScanProcessor) Start(ctx context.Context) {
220224
})
221225
o.workerGroup.GoCtx(func(ctx context.Context) error {
222226
defer close(o.checkpointCh)
227+
defer close(o.rangeStatsCh)
223228
pprof.Do(ctx, pprof.Labels("proc", fmt.Sprintf("%d", o.ProcessorID)), func(ctx context.Context) {
224229
for event := range o.subscription.Events() {
225230
if err := o.handleEvent(ctx, event); err != nil {
@@ -245,16 +250,8 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.P
245250
case checkpoint, ok := <-o.checkpointCh:
246251
switch {
247252
case !ok:
248-
select {
249-
case err := <-o.errCh:
250-
o.MoveToDrainingAndLogError(err)
251-
return nil, o.DrainHelper()
252-
case <-time.After(10 * time.Second):
253-
logcrash.ReportOrPanic(o.Ctx(), &o.FlowCtx.Cfg.Settings.SV,
254-
"event channel closed but no error found on err channel after 10 seconds")
255-
o.MoveToDrainingAndLogError(nil /* error */)
256-
return nil, o.DrainHelper()
257-
}
253+
o.MoveToDrainingAndLogError(o.waitForErr())
254+
return nil, o.DrainHelper()
258255
case checkpoint.afterInitialScanCompletion:
259256
// The previous checkpoint completed the initial scan and was already
260257
// ingested by the coordinator, so we can gracefully shut down the
@@ -273,6 +270,18 @@ func (o *offlineInitialScanProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.P
273270
}
274271
return row, nil
275272
}
273+
case stats, ok := <-o.rangeStatsCh:
274+
if !ok {
275+
o.MoveToDrainingAndLogError(o.waitForErr())
276+
return nil, o.DrainHelper()
277+
}
278+
279+
meta, err := replicationutils.StreamRangeStatsToProgressMeta(o.FlowCtx, o.ProcessorID, stats)
280+
if err != nil {
281+
o.MoveToDrainingAndLogError(err)
282+
return nil, o.DrainHelper()
283+
}
284+
return nil, meta
276285
case err := <-o.errCh:
277286
o.MoveToDrainingAndLogError(err)
278287
return nil, o.DrainHelper()
@@ -345,7 +354,7 @@ func (o *offlineInitialScanProcessor) handleEvent(
345354
return err
346355
}
347356
case crosscluster.CheckpointEvent:
348-
if err := o.checkpoint(ctx, event.GetCheckpoint().ResolvedSpans); err != nil {
357+
if err := o.checkpoint(ctx, event.GetCheckpoint()); err != nil {
349358
return err
350359
}
351360
case crosscluster.SSTableEvent, crosscluster.DeleteRangeEvent:
@@ -358,9 +367,26 @@ func (o *offlineInitialScanProcessor) handleEvent(
358367
return nil
359368
}
360369

370+
// waitForErr waits for an error to be sent on the error channel and returns the
371+
// error if one is found within the timeout.
372+
func (o *offlineInitialScanProcessor) waitForErr() error {
373+
select {
374+
case err := <-o.errCh:
375+
return err
376+
case <-time.After(10 * time.Second):
377+
logcrash.ReportOrPanic(o.Ctx(), &o.FlowCtx.Cfg.Settings.SV,
378+
"event channel closed but no error found on err channel after 10 seconds")
379+
return nil
380+
}
381+
}
382+
361383
func (o *offlineInitialScanProcessor) checkpoint(
362-
ctx context.Context, resolvedSpans []jobspb.ResolvedSpan,
384+
ctx context.Context, checkpoint *streampb.StreamEvent_StreamCheckpoint,
363385
) error {
386+
if checkpoint == nil {
387+
return errors.New("nil checkpoint event")
388+
}
389+
resolvedSpans := checkpoint.ResolvedSpans
364390
if resolvedSpans == nil {
365391
return errors.New("checkpoint event expected to have resolved spans")
366392
}
@@ -406,6 +432,15 @@ func (o *offlineInitialScanProcessor) checkpoint(
406432
// shutdown the processor.
407433
o.initialScanCompleted = true
408434
}
435+
436+
if checkpoint.RangeStats != nil {
437+
select {
438+
case o.rangeStatsCh <- checkpoint.RangeStats:
439+
case <-o.stopCh:
440+
case <-ctx.Done():
441+
return ctx.Err()
442+
}
443+
}
409444
return nil
410445
}
411446

0 commit comments

Comments
 (0)