diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 93e4f4d8ec2b..b0e695788cbf 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -18,7 +18,6 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" @@ -461,13 +460,9 @@ func (dsp *DistSQLPlanner) setupFlows( if len(statementSQL) > setupFlowRequestStmtMaxLength { statementSQL = statementSQL[:setupFlowRequestStmtMaxLength] } - v := execversion.V25_2 - if dsp.st.Version.IsActive(ctx, clusterversion.V25_4) { - v = execversion.V25_4 - } setupReq := execinfrapb.SetupFlowRequest{ LeafTxnInputState: leafInputState, - Version: v, + Version: execversion.V25_4, TraceKV: recv.tracing.KVTracingEnabled(), CollectStats: planCtx.collectExecStats, StatementSQL: statementSQL, diff --git a/pkg/sql/execversion/version.go b/pkg/sql/execversion/version.go index efca23163404..4e086753bd87 100644 --- a/pkg/sql/execversion/version.go +++ b/pkg/sql/execversion/version.go @@ -17,17 +17,13 @@ import ( // consulting the cluster version. type V uint32 -// V25_2 is the exec version of all binaries of 25.2 cockroach versions. It can -// only be used by the flows once the cluster has upgraded to 25.2. -const V25_2 = V(73) - -// V25_4 is the exec version of all binaries of 25.4 cockroach versions. It can +// V25_4 is the exec version of all binaries of 25.4+ cockroach versions. It can // only be used by the flows once the cluster has upgraded to 25.4. const V25_4 = V(74) // MinAccepted is the oldest version that the server is compatible with. A // server will not accept flows with older versions. -const MinAccepted = V25_2 +const MinAccepted = V25_4 // Latest is the latest exec version supported by this binary. const Latest = V25_4 diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel index a24cb1c830ae..517c4ed839fe 100644 --- a/pkg/sql/ttl/ttljob/BUILD.bazel +++ b/pkg/sql/ttl/ttljob/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", - "//pkg/sql/execversion", "//pkg/sql/isql", "//pkg/sql/lexbase", "//pkg/sql/pgwire/pgcode", diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go index 8c0aa3357a30..3182dc24baee 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor.go @@ -9,12 +9,10 @@ import ( "bytes" "context" "math" - "math/rand" "runtime" "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -25,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/execversion" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/lexbase" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -87,40 +84,6 @@ type ttlProgressUpdater interface { FinalizeProgress(ctx context.Context, output execinfra.RowReceiver) error } -// directJobProgressUpdater handles TTL progress updates by writing directly to -// the jobs table from this processor. This is the legacy model and exists to -// support mixed-version scenarios. -// -// This can be removed once version 25.4 is the minimum supported version. -type directJobProgressUpdater struct { - // proc references the running TTL processor. - proc *ttlProcessor - - // updateEvery is the number of spans that must be processed before triggering a progress update. - updateEvery int64 - - // updateEveryDuration is the minimum amount of time that must pass between progress updates. - updateEveryDuration time.Duration - - // lastUpdated records the time of the last progress update. - lastUpdated time.Time - - // totalSpanCount is the total number of spans assigned to this processor. - totalSpanCount int64 - - // rowsProcessed is the cumulative number of rows deleted. - rowsProcessed atomic.Int64 - - // rowsProcessedSinceLastUpdate is the number of rows deleted since the last progress update. - rowsProcessedSinceLastUpdate atomic.Int64 - - // spansProcessed is the cumulative number of spans processed. - spansProcessed atomic.Int64 - - // spansProcessedSinceLastUpdate is the number of spans processed since the last progress update. - spansProcessedSinceLastUpdate atomic.Int64 -} - // coordinatorStreamUpdater handles TTL progress updates by flowing the // information back to the coordinator. The coordinator is then responsible for // writing that back to the jobs table. @@ -155,17 +118,6 @@ func (t *ttlProcessor) Start(context.Context) {} // Run implements the execinfra.Processor interface. func (t *ttlProcessor) Run(ctx context.Context, output execinfra.RowReceiver) { ctx = t.StartInternal(ctx, "ttl") - v := execversion.FromContext(ctx) - // TTL processors support two progress update models. The legacy model (used in V25_2) - // has each processor write progress directly to the job table. The newer model flows - // progress metadata back to the coordinator, which handles the job table updates centrally. - // The selected behavior is gated on the active cluster version. - // TODO(spilchen): remove directJobProgerssUpdater once 25.4 is the minimum supported version. - if v == execversion.V25_2 { - t.progressUpdater = &directJobProgressUpdater{proc: t} - } else { - t.progressUpdater = &coordinatorStreamUpdater{proc: t, progressLogger: log.Every(1 * time.Minute)} - } err := t.work(ctx, output) if err != nil { output.Push(nil, &execinfrapb.ProducerMetadata{Err: err}) @@ -538,6 +490,7 @@ func newTTLProcessor( ); err != nil { return nil, err } + ttlProcessor.progressUpdater = &coordinatorStreamUpdater{proc: ttlProcessor, progressLogger: log.Every(1 * time.Minute)} return ttlProcessor, nil } @@ -605,121 +558,6 @@ func (c *coordinatorStreamUpdater) FinalizeProgress( return c.UpdateProgress(ctx, output) } -// InitProgress implements the ttlProgressUpdater interface. -func (d *directJobProgressUpdater) InitProgress(totalSpanCount int64) { - // Update progress for approximately every 1% of spans processed, at least - // 60 seconds apart with jitter. - d.totalSpanCount = totalSpanCount - d.updateEvery = max(1, totalSpanCount/100) - d.updateEveryDuration = 60*time.Second + time.Duration(rand.Int63n(10*1000))*time.Millisecond - d.lastUpdated = timeutil.Now() -} - -// OnSpanProcessed implements the ttlProgressUpdater interface. -func (d *directJobProgressUpdater) OnSpanProcessed(span roachpb.Span, deletedRowCount int64) { - d.rowsProcessed.Add(deletedRowCount) - d.rowsProcessedSinceLastUpdate.Add(deletedRowCount) - d.spansProcessed.Add(1) - d.spansProcessedSinceLastUpdate.Add(1) -} - -func (d *directJobProgressUpdater) updateFractionCompleted(ctx context.Context) error { - jobID := d.proc.ttlSpec.JobID - d.lastUpdated = timeutil.Now() - spansToAdd := d.spansProcessedSinceLastUpdate.Swap(0) - rowsToAdd := d.rowsProcessedSinceLastUpdate.Swap(0) - - var deletedRowCount, processedSpanCount, totalSpanCount int64 - var fractionCompleted float32 - - err := d.proc.FlowCtx.Cfg.JobRegistry.UpdateJobWithTxn( - ctx, - jobID, - nil, /* txn */ - func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - progress := md.Progress - rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL - rowLevelTTL.JobProcessedSpanCount += spansToAdd - rowLevelTTL.JobDeletedRowCount += rowsToAdd - deletedRowCount = rowLevelTTL.JobDeletedRowCount - processedSpanCount = rowLevelTTL.JobProcessedSpanCount - totalSpanCount = rowLevelTTL.JobTotalSpanCount - - fractionCompleted = float32(rowLevelTTL.JobProcessedSpanCount) / float32(rowLevelTTL.JobTotalSpanCount) - progress.Progress = &jobspb.Progress_FractionCompleted{ - FractionCompleted: fractionCompleted, - } - - ju.UpdateProgress(progress) - return nil - }, - ) - if err != nil { - return err - } - log.Dev.Infof( - ctx, - "TTL fractionCompleted updated processorID=%d tableID=%d deletedRowCount=%d processedSpanCount=%d totalSpanCount=%d fractionCompleted=%.3f", - d.proc.ProcessorID, d.proc.ttlSpec.RowLevelTTLDetails.TableID, deletedRowCount, processedSpanCount, totalSpanCount, fractionCompleted, - ) - return nil -} - -// UpdateProgress implements the ttlProgressUpdater interface. -func (d *directJobProgressUpdater) UpdateProgress( - ctx context.Context, _ execinfra.RowReceiver, -) error { - if d.spansProcessedSinceLastUpdate.Load() >= d.updateEvery && - timeutil.Since(d.lastUpdated) >= d.updateEveryDuration { - if err := d.updateFractionCompleted(ctx); err != nil { - return err - } - } - return nil -} - -// FinalizeProgress implements the ttlProgressUpdater interface. -func (d *directJobProgressUpdater) FinalizeProgress( - ctx context.Context, _ execinfra.RowReceiver, -) error { - if err := d.updateFractionCompleted(ctx); err != nil { - return err - } - - sqlInstanceID := d.proc.FlowCtx.NodeID.SQLInstanceID() - jobID := d.proc.ttlSpec.JobID - return d.proc.FlowCtx.Cfg.JobRegistry.UpdateJobWithTxn( - ctx, - jobID, - nil, /* txn */ - func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - progress := md.Progress - rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL - processorID := d.proc.ProcessorID - rowLevelTTL.ProcessorProgresses = append(rowLevelTTL.ProcessorProgresses, jobspb.RowLevelTTLProcessorProgress{ - ProcessorID: processorID, - SQLInstanceID: sqlInstanceID, - DeletedRowCount: d.rowsProcessed.Load(), - ProcessedSpanCount: d.spansProcessed.Load(), - ProcessorConcurrency: d.proc.processorConcurrency, - }) - var fractionCompleted float32 - if f, ok := progress.Progress.(*jobspb.Progress_FractionCompleted); ok { - fractionCompleted = f.FractionCompleted - } - ju.UpdateProgress(progress) - log.Dev.VInfof( - ctx, - 2, /* level */ - "TTL processorRowCount updated processorID=%d sqlInstanceID=%d tableID=%d jobRowCount=%d processorRowCount=%d fractionCompleted=%.3f", - processorID, sqlInstanceID, d.proc.ttlSpec.RowLevelTTLDetails.TableID, rowLevelTTL.JobDeletedRowCount, - d.rowsProcessed.Load(), fractionCompleted, - ) - return nil - }, - ) -} - func init() { rowexec.NewTTLProcessor = newTTLProcessor }