Skip to content

Commit b8adb07

Browse files
committed
sql: plumb StageID into table reader processors
This commit plumbs `StageID` from the general `ProcessorSpec` into table reader processors. Specifically, `StageID` is plumbed into `tableReader` for the row-based flow and `colBatchScanBase` (via `ColBatchScan` and `ColBatchDirectScan`) for the vectorized flow. Although `stageID` isn't used in these processors yet, it will be useful for aggregating row counts from metrics across distributed table readers for misestimate logging. Part of: #153748 Release note: None
1 parent 5761676 commit b8adb07

File tree

13 files changed

+50
-23
lines changed

13 files changed

+50
-23
lines changed

pkg/sql/colexec/colbuilder/execplan.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ func (r opResult) createAndWrapRowSource(
566566
core *execinfrapb.ProcessorCoreUnion,
567567
post *execinfrapb.PostProcessSpec,
568568
processorID int32,
569+
stageID int32,
569570
factory coldata.ColumnFactory,
570571
causeToWrap error,
571572
) error {
@@ -592,7 +593,7 @@ func (r opResult) createAndWrapRowSource(
592593
// here because when wrapping the processor, the materializer will
593594
// be its output, and it will be set up in wrapRowSources.
594595
proc, err := args.ProcessorConstructor(
595-
ctx, flowCtx, processorID, core, post, inputs, args.LocalProcessors,
596+
ctx, flowCtx, processorID, stageID, core, post, inputs, args.LocalProcessors,
596597
)
597598
if err != nil {
598599
return nil, err
@@ -819,7 +820,7 @@ func NewColOperator(
819820
post = &newPosts[1]
820821
err = result.createAndWrapRowSource(
821822
ctx, flowCtx, args, inputs, inputTypes, core,
822-
wrappingPost, spec.ProcessorID, factory, err,
823+
wrappingPost, spec.ProcessorID, spec.StageID, factory, err,
823824
)
824825
} else {
825826
switch {
@@ -969,7 +970,8 @@ func NewColOperator(
969970
if canUseDirectScan() {
970971
scanOp, resultTypes, err = colfetcher.NewColBatchDirectScan(
971972
ctx, colmem.NewAllocator(ctx, accounts[0], factory), accounts[1],
972-
flowCtx, spec.ProcessorID, core.TableReader, post, args.TypeResolver,
973+
flowCtx, spec.ProcessorID, spec.StageID, core.TableReader, post,
974+
args.TypeResolver,
973975
)
974976
if err != nil {
975977
return r, err
@@ -979,7 +981,8 @@ func NewColOperator(
979981
if scanOp == nil {
980982
scanOp, resultTypes, err = colfetcher.NewColBatchScan(
981983
ctx, colmem.NewAllocator(ctx, accounts[0], factory), accounts[1],
982-
flowCtx, spec.ProcessorID, core.TableReader, post, estimatedRowCount, args.TypeResolver,
984+
flowCtx, spec.ProcessorID, spec.StageID, core.TableReader, post,
985+
estimatedRowCount, args.TypeResolver,
983986
)
984987
if err != nil {
985988
return r, err
@@ -1027,7 +1030,7 @@ func NewColOperator(
10271030
result.ColumnTypes = spec.Input[0].ColumnTypes
10281031
result.Root = inputs[0].Root
10291032
if err := result.planAndMaybeWrapFilter(
1030-
ctx, flowCtx, args, spec.ProcessorID, core.Filterer.Filter, factory,
1033+
ctx, flowCtx, args, spec.ProcessorID, spec.StageID, core.Filterer.Filter, factory,
10311034
); err != nil {
10321035
return r, err
10331036
}
@@ -1309,7 +1312,7 @@ func NewColOperator(
13091312

13101313
if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == descpb.InnerJoin {
13111314
if err = result.planAndMaybeWrapFilter(
1312-
ctx, flowCtx, args, spec.ProcessorID, core.HashJoiner.OnExpr, factory,
1315+
ctx, flowCtx, args, spec.ProcessorID, spec.StageID, core.HashJoiner.OnExpr, factory,
13131316
); err != nil {
13141317
return r, err
13151318
}
@@ -1354,7 +1357,7 @@ func NewColOperator(
13541357

13551358
if onExpr != nil {
13561359
if err = result.planAndMaybeWrapFilter(
1357-
ctx, flowCtx, args, spec.ProcessorID, *onExpr, factory,
1360+
ctx, flowCtx, args, spec.ProcessorID, spec.StageID, *onExpr, factory,
13581361
); err != nil {
13591362
return r, err
13601363
}
@@ -1797,7 +1800,8 @@ func NewColOperator(
17971800
}
17981801
err = ppr.planPostProcessSpec(ctx, flowCtx, args, post, factory, &r.Releasables, args.Spec.EstimatedRowCount)
17991802
if err != nil {
1800-
err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, err)
1803+
err = result.wrapPostProcessSpec(ctx, flowCtx, args, post,
1804+
spec.ProcessorID, spec.StageID, factory, err)
18011805
} else {
18021806
// The result can be updated with the post process result.
18031807
r.Root = ppr.Op
@@ -1845,7 +1849,8 @@ func NewColOperator(
18451849
post.RenderExprs[i].LocalExpr = tree.NewTypedOrdinalReference(i, args.Spec.ResultTypes[i])
18461850
}
18471851
}
1848-
if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post, spec.ProcessorID, factory, errWrappedCast); err != nil {
1852+
if err = result.wrapPostProcessSpec(ctx, flowCtx, args, post,
1853+
spec.ProcessorID, spec.StageID, factory, errWrappedCast); err != nil {
18491854
return r, err
18501855
}
18511856
} else if numMismatchedTypes > 0 {
@@ -1909,6 +1914,7 @@ func (r opResult) planAndMaybeWrapFilter(
19091914
flowCtx *execinfra.FlowCtx,
19101915
args *colexecargs.NewColOperatorArgs,
19111916
processorID int32,
1917+
stageID int32,
19121918
filter execinfrapb.Expression,
19131919
factory coldata.ColumnFactory,
19141920
) error {
@@ -1928,7 +1934,7 @@ func (r opResult) planAndMaybeWrapFilter(
19281934
return r.createAndWrapRowSource(
19291935
ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
19301936
[][]*types.T{r.ColumnTypes}, filtererCore, &execinfrapb.PostProcessSpec{},
1931-
processorID, factory, err,
1937+
processorID, stageID, factory, err,
19321938
)
19331939
}
19341940
return nil
@@ -1945,6 +1951,7 @@ func (r opResult) wrapPostProcessSpec(
19451951
args *colexecargs.NewColOperatorArgs,
19461952
post *execinfrapb.PostProcessSpec,
19471953
processorID int32,
1954+
stageID int32,
19481955
factory coldata.ColumnFactory,
19491956
causeToWrap error,
19501957
) error {
@@ -1956,7 +1963,8 @@ func (r opResult) wrapPostProcessSpec(
19561963
// createAndWrapRowSource updates r.ColumnTypes accordingly.
19571964
return r.createAndWrapRowSource(
19581965
ctx, flowCtx, args, []colexecargs.OpWithMetaInfo{inputToMaterializer},
1959-
[][]*types.T{r.ColumnTypes}, noopCore, post, processorID, factory, causeToWrap,
1966+
[][]*types.T{r.ColumnTypes}, noopCore, post, processorID, stageID, factory,
1967+
causeToWrap,
19601968
)
19611969
}
19621970

pkg/sql/colexec/values_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ func BenchmarkValues(b *testing.B) {
150150
var core execinfrapb.ProcessorCoreUnion
151151
core.Values = spec
152152
proc, err := rowexec.NewProcessor(
153-
ctx, &flowCtx, 0 /* processorID */, &core, &post, nil /* inputs */, nil, /* localProcessors */
153+
ctx, &flowCtx, 0 /* processorID */, 0 /* stageID */, &core, &post,
154+
nil /* inputs */, nil, /* localProcessors */
154155
)
155156
if err != nil {
156157
b.Fatal(err)

pkg/sql/colfetcher/colbatch_direct_scan.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,13 @@ func NewColBatchDirectScan(
181181
kvFetcherMemAcc *mon.BoundAccount,
182182
flowCtx *execinfra.FlowCtx,
183183
processorID int32,
184+
stageID int32,
184185
spec *execinfrapb.TableReaderSpec,
185186
post *execinfrapb.PostProcessSpec,
186187
typeResolver *descs.DistSQLTypeResolver,
187188
) (*ColBatchDirectScan, []*types.T, error) {
188189
base, bsHeader, tableArgs, err := newColBatchScanBase(
189-
ctx, kvFetcherMemAcc, flowCtx, processorID, spec, post, typeResolver,
190+
ctx, kvFetcherMemAcc, flowCtx, processorID, stageID, spec, post, typeResolver,
190191
)
191192
if err != nil {
192193
return nil, nil, err

pkg/sql/colfetcher/colbatch_scan.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type colBatchScanBase struct {
3838

3939
flowCtx *execinfra.FlowCtx
4040
processorID int32
41+
stageID int32
4142
limitHint rowinfra.RowLimit
4243
batchBytesLimit rowinfra.BytesLimit
4344
parallelize bool
@@ -122,6 +123,7 @@ func newColBatchScanBase(
122123
kvFetcherMemAcc *mon.BoundAccount,
123124
flowCtx *execinfra.FlowCtx,
124125
processorID int32,
126+
stageID int32,
125127
spec *execinfrapb.TableReaderSpec,
126128
post *execinfrapb.PostProcessSpec,
127129
typeResolver *descs.DistSQLTypeResolver,
@@ -186,6 +188,7 @@ func newColBatchScanBase(
186188
SpansWithCopy: s.SpansWithCopy,
187189
flowCtx: flowCtx,
188190
processorID: processorID,
191+
stageID: stageID,
189192
limitHint: limitHint,
190193
batchBytesLimit: batchBytesLimit,
191194
parallelize: spec.Parallelize,
@@ -312,13 +315,14 @@ func NewColBatchScan(
312315
kvFetcherMemAcc *mon.BoundAccount,
313316
flowCtx *execinfra.FlowCtx,
314317
processorID int32,
318+
stageID int32,
315319
spec *execinfrapb.TableReaderSpec,
316320
post *execinfrapb.PostProcessSpec,
317321
estimatedRowCount uint64,
318322
typeResolver *descs.DistSQLTypeResolver,
319323
) (*ColBatchScan, []*types.T, error) {
320324
base, bsHeader, tableArgs, err := newColBatchScanBase(
321-
ctx, kvFetcherMemAcc, flowCtx, processorID, spec, post, typeResolver,
325+
ctx, kvFetcherMemAcc, flowCtx, processorID, stageID, spec, post, typeResolver,
322326
)
323327
if err != nil {
324328
return nil, nil, err

pkg/sql/distsql/columnar_utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func verifyColOperator(t *testing.T, args verifyColOperatorArgs) error {
115115
}
116116

117117
proc, err := rowexec.NewProcessor(
118-
ctx, flowCtx, 0, &args.pspec.Core, &args.pspec.Post, inputsProc, nil,
118+
ctx, flowCtx, 0, 0, &args.pspec.Core, &args.pspec.Post, inputsProc, nil,
119119
)
120120
if err != nil {
121121
return err

pkg/sql/execinfra/processorsbase.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ type ProcessorConstructor func(
326326
ctx context.Context,
327327
flowCtx *FlowCtx,
328328
processorID int32,
329+
stageID int32,
329330
core *execinfrapb.ProcessorCoreUnion,
330331
post *execinfrapb.PostProcessSpec,
331332
inputs []RowSource,

pkg/sql/rowexec/processor_utils_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func (p *ProcessorTest) RunTestCases(
153153
ctx,
154154
p.config.FlowCtx,
155155
processorID,
156+
0, /* stageID */
156157
&tc.ProcessorCore,
157158
&tc.Post,
158159
inputs,

pkg/sql/rowexec/processors.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func NewProcessor(
100100
ctx context.Context,
101101
flowCtx *execinfra.FlowCtx,
102102
processorID int32,
103+
stageID int32,
103104
core *execinfrapb.ProcessorCoreUnion,
104105
post *execinfrapb.PostProcessSpec,
105106
inputs []execinfra.RowSource,
@@ -121,7 +122,7 @@ func NewProcessor(
121122
if err := checkNumIn(inputs, 0); err != nil {
122123
return nil, err
123124
}
124-
return newTableReader(ctx, flowCtx, processorID, core.TableReader, post)
125+
return newTableReader(ctx, flowCtx, processorID, stageID, core.TableReader, post)
125126
}
126127
if core.Filterer != nil {
127128
if err := checkNumIn(inputs, 1); err != nil {

pkg/sql/rowexec/project_set_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func BenchmarkProjectSet(b *testing.B) {
166166
in := distsqlutils.NewRowBuffer(c.inputTypes, c.input, distsqlutils.RowBufferArgs{})
167167
out := &distsqlutils.RowBuffer{}
168168
p, err := NewProcessor(
169-
context.Background(), &flowCtx, 0, /* processorID */
169+
context.Background(), &flowCtx, 0 /* processorID */, 0, /* stageID */
170170
&execinfrapb.ProcessorCoreUnion{ProjectSet: &c.spec}, &execinfrapb.PostProcessSpec{},
171171
[]execinfra.RowSource{in}, []execinfra.LocalProcessor{})
172172
if err != nil {

pkg/sql/rowexec/tablereader.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type tableReader struct {
5454
contentionEventsListener execstats.ContentionEventsListener
5555
scanStatsListener execstats.ScanStatsListener
5656
tenantConsumptionListener execstats.TenantConsumptionListener
57+
58+
stageID int32
5759
}
5860

5961
var _ execinfra.Processor = &tableReader{}
@@ -74,6 +76,7 @@ func newTableReader(
7476
ctx context.Context,
7577
flowCtx *execinfra.FlowCtx,
7678
processorID int32,
79+
stageID int32,
7780
spec *execinfrapb.TableReaderSpec,
7881
post *execinfrapb.PostProcessSpec,
7982
) (*tableReader, error) {
@@ -93,6 +96,7 @@ func newTableReader(
9396
tr.limitHint = rowinfra.RowLimit(execinfra.LimitHint(spec.LimitHint, post))
9497
tr.parallelize = spec.Parallelize
9598
tr.maxTimestampAge = time.Duration(spec.MaxTimestampAgeNanos)
99+
tr.stageID = stageID
96100

97101
// Make sure the key column types are hydrated. The fetched column types
98102
// will be hydrated in ProcessorBase.Init below.

0 commit comments

Comments
 (0)