@@ -86,12 +86,6 @@ type streamIngestionFrontier struct {
8686 replicatedTimeAtLastPositiveLagNodeCheck hlc.Timestamp
8787
8888 rangeStats replicationutils.AggregateRangeStatsCollector
89-
90- // This stores the last aggregate stats we computed. Because stats are only
91- // updated on a checkpoint event, the stats will be stale until the next
92- // checkpoint and should not be used to update job statuses. Only on a fresh
93- // checkpoint should we update job statuses.
94- lastAggStats streampb.StreamEvent_RangeStats
9589}
9690
9791var _ execinfra.Processor = & streamIngestionFrontier {}
@@ -345,7 +339,7 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
345339 sf .lastPartitionUpdate = timeutil .Now ()
346340 log .Dev .VInfof (ctx , 2 , "persisting replicated time of %s" , replicatedTime )
347341
348- statusByStats := sf .aggregateAndUpdateRangeMetrics ()
342+ sf .aggregateAndUpdateRangeMetrics ()
349343
350344 if err := registry .UpdateJobWithTxn (ctx , jobID , nil /* txn */ , func (
351345 txn isql.Txn , md jobs.JobMetadata , ju * jobs.JobUpdater ,
@@ -361,8 +355,6 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
361355 if replicatedTime .IsSet () && streamProgress .ReplicationStatus == jobspb .InitialScan {
362356 streamProgress .ReplicationStatus = jobspb .Replicating
363357 md .Progress .StatusMessage = streamProgress .ReplicationStatus .String ()
364- } else if statusByStats != "" {
365- md .Progress .StatusMessage = statusByStats
366358 }
367359
368360 // Keep the recorded replicatedTime empty until some advancement has been made
@@ -447,24 +439,13 @@ func (sf *streamIngestionFrontier) maybeCollectRangeStats(
447439}
448440
449441// aggregateAndUpdateRangeMetrics aggregates the range stats collected from each
450- // of the ingestion processors and updates the corresponding metrics. If the
451- // stats have changed since the last aggregation, it returns a status message
452- // to update the job status with. We do this to avoid overwriting job statuses
453- // with stale stats as the stats will be the same until the next checkpoint
454- // event.
455- func (sf * streamIngestionFrontier ) aggregateAndUpdateRangeMetrics () string {
456- aggRangeStats , _ , statusMsg := sf .rangeStats .RollupStats ()
442+ // of the ingestion processors and updates the corresponding metrics.
443+ func (sf * streamIngestionFrontier ) aggregateAndUpdateRangeMetrics () {
444+ aggRangeStats , _ , _ := sf .rangeStats .RollupStats ()
457445 if aggRangeStats .RangeCount != 0 {
458446 sf .metrics .ScanningRanges .Update (aggRangeStats .ScanningRangeCount )
459447 sf .metrics .CatchupRanges .Update (aggRangeStats .LaggingRangeCount )
460448 }
461- if sf .lastAggStats == aggRangeStats {
462- // This is the same stats as last time, so we don't need to update the job
463- // status.
464- return ""
465- }
466- sf .lastAggStats = aggRangeStats
467- return statusMsg
468449}
469450
470451// maybePersistFrontierEntries periodically persists the current state of the
0 commit comments