Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
Expand Down Expand Up @@ -461,13 +460,9 @@ func (dsp *DistSQLPlanner) setupFlows(
if len(statementSQL) > setupFlowRequestStmtMaxLength {
statementSQL = statementSQL[:setupFlowRequestStmtMaxLength]
}
v := execversion.V25_2
if dsp.st.Version.IsActive(ctx, clusterversion.V25_4) {
v = execversion.V25_4
}
setupReq := execinfrapb.SetupFlowRequest{
LeafTxnInputState: leafInputState,
Version: v,
Version: execversion.V25_4,
TraceKV: recv.tracing.KVTracingEnabled(),
CollectStats: planCtx.collectExecStats,
StatementSQL: statementSQL,
Expand Down
8 changes: 2 additions & 6 deletions pkg/sql/execversion/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@ import (
// consulting the cluster version.
type V uint32

// V25_2 is the exec version of all binaries of 25.2 cockroach versions. It can
// only be used by the flows once the cluster has upgraded to 25.2.
const V25_2 = V(73)

// V25_4 is the exec version of all binaries of 25.4 cockroach versions. It can
// V25_4 is the exec version of all binaries of 25.4+ cockroach versions. It can
// only be used by the flows once the cluster has upgraded to 25.4.
const V25_4 = V(74)

// MinAccepted is the oldest version that the server is compatible with. A
// server will not accept flows with older versions.
const MinAccepted = V25_2
const MinAccepted = V25_4

// Latest is the latest exec version supported by this binary.
const Latest = V25_4
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ go_library(
"//pkg/sql/catalog/descs",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/execversion",
"//pkg/sql/isql",
"//pkg/sql/lexbase",
"//pkg/sql/pgwire/pgcode",
Expand Down
164 changes: 1 addition & 163 deletions pkg/sql/ttl/ttljob/ttljob_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ import (
"bytes"
"context"
"math"
"math/rand"
"runtime"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -25,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execversion"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/lexbase"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand Down Expand Up @@ -87,40 +84,6 @@ type ttlProgressUpdater interface {
FinalizeProgress(ctx context.Context, output execinfra.RowReceiver) error
}

// directJobProgressUpdater handles TTL progress updates by writing directly to
// the jobs table from this processor. This is the legacy model and exists to
// support mixed-version scenarios.
//
// This can be removed once version 25.4 is the minimum supported version.
type directJobProgressUpdater struct {
// proc references the running TTL processor.
proc *ttlProcessor

// updateEvery is the number of spans that must be processed before triggering a progress update.
updateEvery int64

// updateEveryDuration is the minimum amount of time that must pass between progress updates.
updateEveryDuration time.Duration

// lastUpdated records the time of the last progress update.
lastUpdated time.Time

// totalSpanCount is the total number of spans assigned to this processor.
totalSpanCount int64

// rowsProcessed is the cumulative number of rows deleted.
rowsProcessed atomic.Int64

// rowsProcessedSinceLastUpdate is the number of rows deleted since the last progress update.
rowsProcessedSinceLastUpdate atomic.Int64

// spansProcessed is the cumulative number of spans processed.
spansProcessed atomic.Int64

// spansProcessedSinceLastUpdate is the number of spans processed since the last progress update.
spansProcessedSinceLastUpdate atomic.Int64
}

// coordinatorStreamUpdater handles TTL progress updates by flowing the
// information back to the coordinator. The coordinator is then responsible for
// writing that back to the jobs table.
Expand Down Expand Up @@ -155,17 +118,6 @@ func (t *ttlProcessor) Start(context.Context) {}
// Run implements the execinfra.Processor interface.
func (t *ttlProcessor) Run(ctx context.Context, output execinfra.RowReceiver) {
ctx = t.StartInternal(ctx, "ttl")
v := execversion.FromContext(ctx)
// TTL processors support two progress update models. The legacy model (used in V25_2)
// has each processor write progress directly to the job table. The newer model flows
// progress metadata back to the coordinator, which handles the job table updates centrally.
// The selected behavior is gated on the active cluster version.
// TODO(spilchen): remove directJobProgerssUpdater once 25.4 is the minimum supported version.
if v == execversion.V25_2 {
t.progressUpdater = &directJobProgressUpdater{proc: t}
} else {
t.progressUpdater = &coordinatorStreamUpdater{proc: t, progressLogger: log.Every(1 * time.Minute)}
}
err := t.work(ctx, output)
if err != nil {
output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
Expand Down Expand Up @@ -538,6 +490,7 @@ func newTTLProcessor(
); err != nil {
return nil, err
}
ttlProcessor.progressUpdater = &coordinatorStreamUpdater{proc: ttlProcessor, progressLogger: log.Every(1 * time.Minute)}
return ttlProcessor, nil
}

Expand Down Expand Up @@ -605,121 +558,6 @@ func (c *coordinatorStreamUpdater) FinalizeProgress(
return c.UpdateProgress(ctx, output)
}

// InitProgress implements the ttlProgressUpdater interface.
func (d *directJobProgressUpdater) InitProgress(totalSpanCount int64) {
// Update progress for approximately every 1% of spans processed, at least
// 60 seconds apart with jitter.
d.totalSpanCount = totalSpanCount
d.updateEvery = max(1, totalSpanCount/100)
d.updateEveryDuration = 60*time.Second + time.Duration(rand.Int63n(10*1000))*time.Millisecond
d.lastUpdated = timeutil.Now()
}

// OnSpanProcessed implements the ttlProgressUpdater interface.
func (d *directJobProgressUpdater) OnSpanProcessed(span roachpb.Span, deletedRowCount int64) {
d.rowsProcessed.Add(deletedRowCount)
d.rowsProcessedSinceLastUpdate.Add(deletedRowCount)
d.spansProcessed.Add(1)
d.spansProcessedSinceLastUpdate.Add(1)
}

func (d *directJobProgressUpdater) updateFractionCompleted(ctx context.Context) error {
jobID := d.proc.ttlSpec.JobID
d.lastUpdated = timeutil.Now()
spansToAdd := d.spansProcessedSinceLastUpdate.Swap(0)
rowsToAdd := d.rowsProcessedSinceLastUpdate.Swap(0)

var deletedRowCount, processedSpanCount, totalSpanCount int64
var fractionCompleted float32

err := d.proc.FlowCtx.Cfg.JobRegistry.UpdateJobWithTxn(
ctx,
jobID,
nil, /* txn */
func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL
rowLevelTTL.JobProcessedSpanCount += spansToAdd
rowLevelTTL.JobDeletedRowCount += rowsToAdd
deletedRowCount = rowLevelTTL.JobDeletedRowCount
processedSpanCount = rowLevelTTL.JobProcessedSpanCount
totalSpanCount = rowLevelTTL.JobTotalSpanCount

fractionCompleted = float32(rowLevelTTL.JobProcessedSpanCount) / float32(rowLevelTTL.JobTotalSpanCount)
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: fractionCompleted,
}

ju.UpdateProgress(progress)
return nil
},
)
if err != nil {
return err
}
log.Dev.Infof(
ctx,
"TTL fractionCompleted updated processorID=%d tableID=%d deletedRowCount=%d processedSpanCount=%d totalSpanCount=%d fractionCompleted=%.3f",
d.proc.ProcessorID, d.proc.ttlSpec.RowLevelTTLDetails.TableID, deletedRowCount, processedSpanCount, totalSpanCount, fractionCompleted,
)
return nil
}

// UpdateProgress implements the ttlProgressUpdater interface.
func (d *directJobProgressUpdater) UpdateProgress(
ctx context.Context, _ execinfra.RowReceiver,
) error {
if d.spansProcessedSinceLastUpdate.Load() >= d.updateEvery &&
timeutil.Since(d.lastUpdated) >= d.updateEveryDuration {
if err := d.updateFractionCompleted(ctx); err != nil {
return err
}
}
return nil
}

// FinalizeProgress implements the ttlProgressUpdater interface.
func (d *directJobProgressUpdater) FinalizeProgress(
ctx context.Context, _ execinfra.RowReceiver,
) error {
if err := d.updateFractionCompleted(ctx); err != nil {
return err
}

sqlInstanceID := d.proc.FlowCtx.NodeID.SQLInstanceID()
jobID := d.proc.ttlSpec.JobID
return d.proc.FlowCtx.Cfg.JobRegistry.UpdateJobWithTxn(
ctx,
jobID,
nil, /* txn */
func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL
processorID := d.proc.ProcessorID
rowLevelTTL.ProcessorProgresses = append(rowLevelTTL.ProcessorProgresses, jobspb.RowLevelTTLProcessorProgress{
ProcessorID: processorID,
SQLInstanceID: sqlInstanceID,
DeletedRowCount: d.rowsProcessed.Load(),
ProcessedSpanCount: d.spansProcessed.Load(),
ProcessorConcurrency: d.proc.processorConcurrency,
})
var fractionCompleted float32
if f, ok := progress.Progress.(*jobspb.Progress_FractionCompleted); ok {
fractionCompleted = f.FractionCompleted
}
ju.UpdateProgress(progress)
log.Dev.VInfof(
ctx,
2, /* level */
"TTL processorRowCount updated processorID=%d sqlInstanceID=%d tableID=%d jobRowCount=%d processorRowCount=%d fractionCompleted=%.3f",
processorID, sqlInstanceID, d.proc.ttlSpec.RowLevelTTLDetails.TableID, rowLevelTTL.JobDeletedRowCount,
d.rowsProcessed.Load(), fractionCompleted,
)
return nil
},
)
}

func init() {
rowexec.NewTTLProcessor = newTTLProcessor
}
Loading