
Commit f09626f

craig[bot] and yuzefovich committed
Merge #156110
156110: sql: harden DistSQL interactions with routines r=yuzefovich a=yuzefovich

**sql: unconditionally check DistSQL supportability**

This commit hardens c17591e. In that change we disabled the usage of the Streamer whenever some part of the plan isn't distributable, but the population of `distSQLProhibitedErr` was left on a best-effort basis: if we have `distsql=off` (as well as a couple of other conditions), we'd skip the supportability check. This meant that we could still end up using the Streamer in some illegal cases - one example is a routine (which currently prohibits DistSQL) that might access the RootTxn concurrently with the LeafTxn access by the Streamer, which is no bueno.

This commit makes the DistSQL supportability check unconditional. Note that this shouldn't really have a performance impact - after all, we expect this check to run pretty much all the time anyway (unless someone runs with `distsql=off`).

Additionally, this change happens to fix a nil pointer error around the recent fix to top-level query stats in the presence of routines. Namely, that patch only set the metadata forwarder for local plans if we end up using the RootTxn, but in a query that happened to use the Streamer with routines, we'd (before this patch) end up with a LeafTxn and an unset metadata forwarder. Now the Streamer will be disabled, so we'll have the RootTxn and the forwarder will be set.

Fixes: #155955.

**sql: harden recent fix to top-level query stats with routines**

This commit relaxes the requirements for when it's safe to set the routine metadata forwarder. Previously, we only set it when we end up using the RootTxn in a local plan; now we examine all possible kinds of concurrency within the local plan to see whether it's actually safe to set the metadata forwarder. This seems like a nice cleanup as well. Note that this commit independently fixes the issue mentioned in the previous commit.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
2 parents c1e97f7 + c1af51d commit f09626f
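
The first commit's mechanism replaces the old boolean flags (`HasConcurrency`, `MustUseLeaf`) with a single bit-mask of concurrency kinds. As orientation before the diffs, here is a minimal, self-contained Go sketch of that idea. The type and constant names mirror the new code in `pkg/sql/distsql/server.go` below, while `localState`, `addConcurrency`, and `mustUseLeafTxn` are simplified stand-ins for the real `LocalState` API, not the actual CockroachDB implementation.

```go
// Standalone sketch of bitmask-based concurrency tracking; illustration only.
package main

import "fmt"

// ConcurrencyKind is a bitmask of concurrency types present in a local flow.
type ConcurrencyKind uint32

const (
	ConcurrencyHasParallelProcessors ConcurrencyKind = 1 << iota
	ConcurrencyStreamer
	ConcurrencyParallelChecks
	ConcurrencyWithOuterPlan
)

// localState mimics the relevant parts of distsql.LocalState.
type localState struct {
	isLocal     bool
	concurrency ConcurrencyKind
}

// addConcurrency marks the given concurrency kinds as present.
func (l *localState) addConcurrency(kind ConcurrencyKind) { l.concurrency |= kind }

// mustUseLeafTxn reports whether a LeafTxn is required: any remote flows or
// any local concurrency rules out the RootTxn.
func (l localState) mustUseLeafTxn() bool { return !l.isLocal || l.concurrency != 0 }

func main() {
	ls := localState{isLocal: true}
	fmt.Println(ls.mustUseLeafTxn()) // false: fully local, no concurrency

	// A lookup join executed via the Streamer introduces concurrency, so the
	// flow must switch to a LeafTxn even though the plan is fully local.
	ls.addConcurrency(ConcurrencyStreamer)
	fmt.Println(ls.mustUseLeafTxn()) // true
}
```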

File tree

10 files changed: +161 −70 lines


pkg/sql/apply_join.go

Lines changed: 5 additions & 2 deletions
@@ -11,6 +11,7 @@ import (
 
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+    "github.com/cockroachdb/cockroach/pkg/sql/distsql"
     "github.com/cockroachdb/cockroach/pkg/sql/opt/exec"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -321,7 +322,7 @@ func runPlanInsidePlan(
         recv,
         &subqueryResultMemAcc,
         false, /* skipDistSQLDiagramGeneration */
-        params.p.mustUseLeafTxn(),
+        params.p.innerPlansMustUseLeafTxn(),
     ) {
         return recv.stats, resultWriter.Err()
     }
@@ -336,7 +337,9 @@ func runPlanInsidePlan(
     planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, &plannerCopy, plannerCopy.txn, distributeType)
     planCtx.distSQLProhibitedErr = distSQLProhibitedErr
     planCtx.stmtType = recv.stmtType
-    planCtx.mustUseLeafTxn = params.p.mustUseLeafTxn()
+    if params.p.innerPlansMustUseLeafTxn() {
+        planCtx.flowConcurrency = distsql.ConcurrencyWithOuterPlan
+    }
     planCtx.stmtForDistSQLDiagram = stmtForDistSQLDiagram
 
     // Wrap PlanAndRun in a function call so that we clean up immediately.

pkg/sql/distsql/server.go

Lines changed: 39 additions & 9 deletions
@@ -545,6 +545,28 @@ func newFlow(
     return rowflow.NewRowBasedFlow(base)
 }
 
+// ConcurrencyKind indicates which concurrency type is present within the local
+// DistSQL flow. Note that inter-node concurrency (i.e. whether we have a
+// distributed plan) is not reflected here.
+type ConcurrencyKind uint32
+
+const (
+    // ConcurrencyHasParallelProcessors, if set, indicates that we have multiple
+    // processors running for the same plan stage.
+    ConcurrencyHasParallelProcessors ConcurrencyKind = (1 << iota)
+    // ConcurrencyStreamer, if set, indicates we have concurrency due to usage
+    // of the Streamer API.
+    ConcurrencyStreamer
+    // ConcurrencyParallelChecks, if set, indicates that we're running
+    // post-query CHECKs in parallel with each other (i.e. the concurrency is
+    // with _other_ local flows).
+    ConcurrencyParallelChecks
+    // ConcurrencyWithOuterPlan, if set, indicates that - if we're running an
+    // "inner" plan (like an apply-join iteration or a routine) - we might have
+    // concurrency with the "outer" plan.
+    ConcurrencyWithOuterPlan
+)
+
 // LocalState carries information that is required to set up a flow with wrapped
 // planNodes.
 type LocalState struct {
@@ -559,13 +581,9 @@ type LocalState struct {
     // remote flows.
     IsLocal bool
 
-    // HasConcurrency indicates whether the local flow uses multiple goroutines.
-    HasConcurrency bool
-
-    // MustUseLeaf indicates whether the local flow must use the LeafTxn even if
-    // there is no concurrency in the flow on its own because there would be
-    // concurrency with other flows which prohibits the usage of the RootTxn.
-    MustUseLeaf bool
+    // concurrency tracks the types of concurrency present when accessing the
+    // Txn.
+    concurrency ConcurrencyKind
 
     // Txn is filled in on the gateway only. It is the RootTxn that the query is running in.
     // This will be used directly by the flow if the flow has no concurrency and IsLocal is set.
@@ -582,10 +600,22 @@ type LocalState struct {
     LocalVectorSources map[int32]any
 }
 
+// AddConcurrency marks the given concurrency kinds as present in the local
+// flow.
+func (l *LocalState) AddConcurrency(kind ConcurrencyKind) {
+    l.concurrency |= kind
+}
+
+// GetConcurrency returns the bit-mask representing all concurrency kinds
+// present in the local flow.
+func (l LocalState) GetConcurrency() ConcurrencyKind {
+    return l.concurrency
+}
+
 // MustUseLeafTxn returns true if a LeafTxn must be used. It is valid to call
-// this method only after IsLocal and HasConcurrency have been set correctly.
+// this method only after IsLocal and all concurrency kinds have been set.
 func (l LocalState) MustUseLeafTxn() bool {
-    return !l.IsLocal || l.HasConcurrency || l.MustUseLeaf
+    return !l.IsLocal || l.concurrency != 0
 }
 
 // SetupLocalSyncFlow sets up a synchronous flow on the current (planning) node,

pkg/sql/distsql_physical_planner.go

Lines changed: 4 additions & 3 deletions
@@ -977,7 +977,8 @@ type PlanningCtx struct {
     // isLocal is set to true if we're planning this query on a single node.
     isLocal bool
     // distSQLProhibitedErr, if set, indicates why the plan couldn't be
-    // distributed.
+    // distributed. If any part of the plan isn't distributable, then this is
+    // guaranteed to be non-nil.
     distSQLProhibitedErr error
     planner *planner
 
@@ -1019,11 +1020,11 @@
     // query).
     subOrPostQuery bool
 
-    // mustUseLeafTxn, if set, indicates that this PlanningCtx is used to handle
+    // flowConcurrency will be non-zero when this PlanningCtx is used to handle
     // one of the plans that will run in parallel with other plans. As such, the
     // DistSQL planner will need to use the LeafTxn (even if it's not needed
     // based on other "regular" factors).
-    mustUseLeafTxn bool
+    flowConcurrency distsql.ConcurrencyKind
 
     // onFlowCleanup contains non-nil functions that will be called after the
     // local flow finished running and is being cleaned up. It allows us to

pkg/sql/distsql_running.go

Lines changed: 53 additions & 24 deletions
@@ -754,7 +754,7 @@ func (dsp *DistSQLPlanner) Run(
     // the line.
     localState.EvalContext = evalCtx
     localState.IsLocal = planCtx.isLocal
-    localState.MustUseLeaf = planCtx.mustUseLeafTxn
+    localState.AddConcurrency(planCtx.flowConcurrency)
     localState.Txn = txn
     localState.LocalProcs = plan.LocalProcessors
     localState.LocalVectorSources = plan.LocalVectorSources
@@ -777,15 +777,19 @@
         // cannot create a LeafTxn, so we cannot parallelize scans.
         planCtx.parallelizeScansIfLocal = false
         for _, flow := range flows {
-            localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow)
+            if execinfra.HasParallelProcessors(flow) {
+                localState.AddConcurrency(distsql.ConcurrencyHasParallelProcessors)
+            }
         }
     } else {
         if planCtx.isLocal && noMutations && planCtx.parallelizeScansIfLocal {
            // Even though we have a single flow on the gateway node, we might
            // have decided to parallelize the scans. If that's the case, we
            // will need to use the Leaf txn.
            for _, flow := range flows {
-               localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow)
+               if execinfra.HasParallelProcessors(flow) {
+                   localState.AddConcurrency(distsql.ConcurrencyHasParallelProcessors)
+               }
            }
         }
         if noMutations {
@@ -875,23 +879,18 @@
         // that we might have a plan where some expression (e.g. a cast to
         // an Oid type) uses the planner's txn (which is the RootTxn), so
         // it'd be illegal to use LeafTxns for a part of such plan.
-        // TODO(yuzefovich): this check is both excessive and insufficient.
-        // For example:
-        // - it disables the usage of the Streamer when a subquery has an
-        //   Oid type, but that would have no impact on usage of the Streamer
-        //   in the main query;
-        // - it might allow the usage of the Streamer even when the internal
-        //   executor is used by a part of the plan, and the IE would use the
-        //   RootTxn. Arguably, this would be a bug in not prohibiting the
-        //   DistSQL altogether.
+        // TODO(yuzefovich): this check could be excessive. For example, it
+        // disables the usage of the Streamer when a subquery has an Oid
+        // type (due to a serialization issue), but that would have no
+        // impact on usage of the Streamer in the main query.
         if !containsLocking && !mustUseRootTxn && planCtx.distSQLProhibitedErr == nil {
             if evalCtx.SessionData().StreamerEnabled {
                 for _, proc := range plan.Processors {
                     if jr := proc.Spec.Core.JoinReader; jr != nil {
                         // Both index and lookup joins, with and without
                         // ordering, are executed via the Streamer API that has
                         // concurrency.
-                        localState.HasConcurrency = true
+                        localState.AddConcurrency(distsql.ConcurrencyStreamer)
                         break
                     }
                 }
@@ -1014,11 +1013,37 @@
         return
     }
 
-    if len(flows) == 1 && evalCtx.Txn != nil && evalCtx.Txn.Type() == kv.RootTxn {
-        // If we have a fully local plan and a RootTxn, we don't expect any
-        // concurrency, so it's safe to use the DistSQLReceiver to push the
-        // metadata into directly from routines.
-        if planCtx.planner != nil {
+    if len(flows) == 1 && planCtx.planner != nil {
+        // We have a fully local plan, so check whether it'll be safe to use the
+        // DistSQLReceiver to push the metadata into directly from routines
+        // (which is the case when we don't have any concurrency between
+        // routines themselves as well as a routine and the "head" processor -
+        // the one pushing into the DistSQLReceiver).
+        var safe bool
+        if evalCtx.Txn != nil && evalCtx.Txn.Type() == kv.RootTxn {
+            // We have a RootTxn, so we don't expect any concurrency whatsoever.
+            safe = true
+        } else {
+            // We have a LeafTxn, so we need to examine what kind of concurrency
+            // is present in the flow.
+            var safeConcurrency distsql.ConcurrencyKind
+            // We don't care whether we use the Streamer API - it has
+            // concurrency only at the KV client level and below.
+            safeConcurrency |= distsql.ConcurrencyStreamer
+            // If we have "outer plan" concurrency, the "inner" and the
+            // "outer" plans have their own DistSQLReceivers.
+            //
+            // Note that the same is the case with parallel CHECKs concurrency,
+            // but then planCtx.planner is shared between goroutines, so we'll
+            // avoid mutating it. (We can't have routines in post-query CHECKs
+            // since only FK and UNIQUE checks are run in parallel.)
+            safeConcurrency |= distsql.ConcurrencyWithOuterPlan
+            unsafeConcurrency := ^safeConcurrency
+            if localState.GetConcurrency()&unsafeConcurrency == 0 {
+                safe = true
+            }
+        }
+        if safe {
             planCtx.planner.routineMetadataForwarder = recv
         }
     }
@@ -1994,7 +2019,7 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
         // Skip the diagram generation since on this "main" query path we
         // can get it via the statement bundle.
         true, /* skipDistSQLDiagramGeneration */
-        false, /* mustUseLeafTxn */
+        false, /* innerPlansMustUseLeafTxn */
     ) {
         return recv.commErr
     }
@@ -2061,7 +2086,7 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
     recv *DistSQLReceiver,
     subqueryResultMemAcc *mon.BoundAccount,
     skipDistSQLDiagramGeneration bool,
-    mustUseLeafTxn bool,
+    innerPlansMustUseLeafTxn bool,
 ) bool {
     for planIdx, subqueryPlan := range subqueryPlans {
         if err := dsp.planAndRunSubquery(
@@ -2074,7 +2099,7 @@ func (dsp *DistSQLPlanner) PlanAndRunSubqueries(
             recv,
             subqueryResultMemAcc,
             skipDistSQLDiagramGeneration,
-            mustUseLeafTxn,
+            innerPlansMustUseLeafTxn,
         ); err != nil {
             recv.SetError(err)
             return false
@@ -2098,7 +2123,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
     recv *DistSQLReceiver,
    subqueryResultMemAcc *mon.BoundAccount,
    skipDistSQLDiagramGeneration bool,
-   mustUseLeafTxn bool,
+   innerPlansMustUseLeafTxn bool,
 ) error {
    subqueryDistribution, distSQLProhibitedErr := planner.getPlanDistribution(ctx, subqueryPlan.plan)
    distribute := DistributionType(LocalDistribution)
@@ -2110,7 +2135,9 @@
    subqueryPlanCtx.stmtType = tree.Rows
    subqueryPlanCtx.skipDistSQLDiagramGeneration = skipDistSQLDiagramGeneration
    subqueryPlanCtx.subOrPostQuery = true
-   subqueryPlanCtx.mustUseLeafTxn = mustUseLeafTxn
+   if innerPlansMustUseLeafTxn {
+       subqueryPlanCtx.flowConcurrency = distsql.ConcurrencyWithOuterPlan
+   }
    if planner.instrumentation.ShouldSaveFlows() {
        subqueryPlanCtx.saveFlows = getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeSubquery)
    }
@@ -2729,7 +2756,9 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
    }
    postqueryPlanCtx.associateNodeWithComponents = associateNodeWithComponents
    postqueryPlanCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
-   postqueryPlanCtx.mustUseLeafTxn = parallelCheck
+   if parallelCheck {
+       postqueryPlanCtx.flowConcurrency = distsql.ConcurrencyParallelChecks
+   }
 
    postqueryPhysPlan, physPlanCleanup, err := dsp.createPhysPlan(ctx, postqueryPlanCtx, postqueryPlan)
    defer physPlanCleanup()
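
The last `Run` hunk above is where the second commit decides whether the routine metadata forwarder may point directly at the `DistSQLReceiver`. The following is a hedged, standalone sketch of that mask computation: the helper `safeToSetForwarder` is hypothetical and only the concurrency-kind names come from the diff. It reproduces the decision that a RootTxn is always safe, while with a LeafTxn only Streamer and outer-plan concurrency are tolerated.

```go
// Standalone sketch of the forwarder-safety check; illustration only. The
// real code operates on distsql.LocalState and PlanningCtx.
package main

import "fmt"

type ConcurrencyKind uint32

const (
	ConcurrencyHasParallelProcessors ConcurrencyKind = 1 << iota
	ConcurrencyStreamer
	ConcurrencyParallelChecks
	ConcurrencyWithOuterPlan
)

// safeToSetForwarder reports whether the DistSQLReceiver may be used directly
// as the routine metadata forwarder for a fully local, single-flow plan.
func safeToSetForwarder(usesRootTxn bool, concurrency ConcurrencyKind) bool {
	if usesRootTxn {
		// With the RootTxn there is no concurrency at all.
		return true
	}
	// With a LeafTxn, only some kinds of concurrency are harmless: Streamer
	// concurrency lives at the KV client level and below, and "outer plan"
	// concurrency means the inner and outer plans have their own receivers.
	// Anything else makes pushing metadata directly unsafe.
	safe := ConcurrencyStreamer | ConcurrencyWithOuterPlan
	return concurrency&^safe == 0
}

func main() {
	fmt.Println(safeToSetForwarder(true, 0))                                 // true
	fmt.Println(safeToSetForwarder(false, ConcurrencyStreamer))              // true
	fmt.Println(safeToSetForwarder(false, ConcurrencyHasParallelProcessors)) // false
	fmt.Println(safeToSetForwarder(false, ConcurrencyParallelChecks))        // false
}
```

Parallel-processor and parallel-CHECK concurrency leave the forwarder unset, matching the comment in the diff about `planCtx.planner` being shared between goroutines in the parallel-CHECK case.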

pkg/sql/distsql_running_test.go

Lines changed: 29 additions & 5 deletions
@@ -1221,12 +1221,13 @@ func TestTopLevelQueryStats(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
 
+    ctx := context.Background()
     // testQuery will be updated throughout the test to the current target.
     var testQuery atomic.Value
     // The callback will send number of rows read and rows written (for each
     // ProducerMetadata.Metrics object) on these channels, respectively.
     rowsReadCh, rowsWrittenCh := make(chan int64), make(chan int64)
-    s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+    srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
         Knobs: base.TestingKnobs{
             SQLExecutor: &ExecutorTestingKnobs{
                 DistSQLReceiverPushCallbackFactory: func(_ context.Context, query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
@@ -1244,11 +1245,13 @@ func TestTopLevelQueryStats(t *testing.T) {
             },
         },
     })
-    defer s.Stopper().Stop(context.Background())
+    defer srv.Stopper().Stop(ctx)
+    conn, err := sqlDB.Conn(ctx)
+    require.NoError(t, err)
 
     if _, err := sqlDB.Exec(`
-CREATE TABLE t (k INT PRIMARY KEY);
-INSERT INTO t SELECT generate_series(1, 10);
+CREATE TABLE t (k INT PRIMARY KEY, i INT, v INT, INDEX(i));
+INSERT INTO t SELECT i, 1, 1 FROM generate_series(1, 10) AS g(i);
 CREATE FUNCTION no_reads() RETURNS INT AS 'SELECT 1' LANGUAGE SQL;
 CREATE FUNCTION reads() RETURNS INT AS 'SELECT count(*) FROM t' LANGUAGE SQL;
 CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x' LANGUAGE SQL;
@@ -1259,6 +1262,7 @@ CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x'
     for _, tc := range []struct {
         name           string
         query          string
+        setup, cleanup string // optional
         expRowsRead    int64
         expRowsWritten int64
     }{
@@ -1268,6 +1272,16 @@ CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x'
             expRowsRead:    10,
             expRowsWritten: 0,
         },
+        {
+            name:    "routine and index join (used to be powered by streamer)",
+            query:   "SELECT v FROM t@t_i_idx WHERE reads() > 0",
+            setup:   "SET distsql=off",
+            cleanup: "RESET distsql",
+            // 10 rows for secondary index, 10 for index join into primary, and
+            // then for each row do ten-row-scan in the routine.
+            expRowsRead:    120,
+            expRowsWritten: 0,
+        },
         {
             name:  "simple write",
             query: "INSERT INTO t SELECT generate_series(11, 42)",
@@ -1309,13 +1323,23 @@ CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x'
         },
     } {
         t.Run(tc.name, func(t *testing.T) {
+            if tc.setup != "" {
+                _, err := conn.ExecContext(ctx, tc.setup)
+                require.NoError(t, err)
+            }
+            if tc.cleanup != "" {
+                defer func() {
+                    _, err := conn.ExecContext(ctx, tc.cleanup)
+                    require.NoError(t, err)
+                }()
+            }
             testQuery.Store(tc.query)
             errCh := make(chan error)
             // Spin up the worker goroutine which will actually execute the
             // query.
             go func() {
                 defer close(errCh)
-                _, err := sqlDB.Exec(tc.query)
+                _, err := conn.ExecContext(ctx, tc.query)
                 errCh <- err
             }()
             // In the main goroutine, loop until the query is completed while
