Skip to content

Commit 3dbfaf3

Browse files
craig[bot]yuzefovich
andcommitted
155670: sql: minor cleanup around portal pausability r=yuzefovich a=yuzefovich This PR contains several commits that cleanup and harden code around pausable portals. See each commit for details. Informs: https://github.com/cockroachlabs/support/issues/3463. Epic: None Release note: None 155721: sql: add comment about skipped FKs r=yuzefovich a=yuzefovich We just made a change to include skipped FKs (those that we deemed "irrelevant") in the commented out form. This commit improves that logic by adding a comment for why these FKs are commented out, to reduce possible confusion. Epic: None Release note: None Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
3 parents eddf7cb + 623630f + 3393391 commit 3dbfaf3

File tree

4 files changed

+112
-95
lines changed

4 files changed

+112
-95
lines changed

pkg/sql/conn_executor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2455,7 +2455,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
24552455
// If this is the first-time execution of a portal without a limit set,
24562456
// it means all rows will be exhausted, so no need to pause this portal.
24572457
if tcmd.Limit == 0 && portal.pauseInfo != nil && portal.pauseInfo.curRes == nil {
2458-
portal.pauseInfo = nil
2458+
ex.disablePortalPausability(&portal)
24592459
}
24602460

24612461
stmtRes := ex.clientComm.CreateStatementResult(
@@ -2467,7 +2467,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
24672467
ex.sessionData().DataConversionConfig,
24682468
ex.sessionData().GetLocation(),
24692469
tcmd.Limit,
2470-
portalName,
2470+
portal.Name,
24712471
ex.implicitTxn(),
24722472
portal.portalPausablity,
24732473
)
@@ -2483,7 +2483,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
24832483
// followed by Sync (which is the common case), then we still can auto-commit,
24842484
// which allows the 1PC txn fast path to be used.
24852485
canAutoCommit := ex.implicitTxn() && tcmd.FollowedBySync
2486-
ev, payload, err = ex.execPortal(ctx, portal, portalName, stmtRes, pinfo, canAutoCommit)
2486+
ev, payload, err = ex.execPortal(ctx, portal, stmtRes, pinfo, canAutoCommit)
24872487
return err
24882488
}()
24892489
// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,

pkg/sql/conn_executor_exec.go

Lines changed: 66 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,6 @@ func (ex *connExecutor) recordFailure(p eventNonRetryableErrPayload) {
281281
func (ex *connExecutor) execPortal(
282282
ctx context.Context,
283283
portal PreparedPortal,
284-
portalName string,
285284
stmtRes CommandResult,
286285
pinfo *tree.PlaceholderInfo,
287286
canAutoCommit bool,
@@ -290,7 +289,7 @@ func (ex *connExecutor) execPortal(
290289
if portal.isPausable() {
291290
if !portal.pauseInfo.exhaustPortal.cleanup.isComplete {
292291
portal.pauseInfo.exhaustPortal.cleanup.appendFunc(func(_ context.Context) {
293-
ex.exhaustPortal(portalName)
292+
ex.exhaustPortal(portal.Name)
294293
})
295294
portal.pauseInfo.exhaustPortal.cleanup.isComplete = true
296295
}
@@ -330,8 +329,8 @@ func (ex *connExecutor) execPortal(
330329
// to re-execute the portal from scratch.
331330
// The current statement may have just closed and deleted the portal,
332331
// so only exhaust it if it still exists.
333-
if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok && !portal.isPausable() {
334-
defer ex.exhaustPortal(portalName)
332+
if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portal.Name]; ok && !portal.isPausable() {
333+
defer ex.exhaustPortal(portal.Name)
335334
}
336335
return ev, payload, retErr
337336

@@ -1695,6 +1694,9 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
16951694
}
16961695
}
16971696

1697+
// Note that here we process the cleanup function (which will append it
1698+
// to the cleanup queue) without a defer since there is no more code
1699+
// relevant to pausable portals model below.
16981700
processCleanupFunc(func() {
16991701
cancelQueryCtx := ctx
17001702
if portal.isPausable() {
@@ -2526,6 +2528,13 @@ func (ex *connExecutor) rollbackSQLTransaction(
25262528
return eventTxnFinishAborted{}, nil
25272529
}
25282530

2531+
func getPausablePortalInfo(p *planner) *portalPauseInfo {
2532+
if p != nil && p.pausablePortal != nil {
2533+
return p.pausablePortal.pauseInfo
2534+
}
2535+
return nil
2536+
}
2537+
25292538
// Each statement in an explicit READ COMMITTED transaction has a SAVEPOINT.
25302539
// This allows for TransactionRetry errors to be retried automatically. We don't
25312540
// do this for implicit transactions because the conn_executor state machine
@@ -2543,13 +2552,7 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
25432552
)
25442553
}
25452554

2546-
getPausablePortalInfo := func() *portalPauseInfo {
2547-
if p != nil && p.pausablePortal != nil {
2548-
return p.pausablePortal.pauseInfo
2549-
}
2550-
return nil
2551-
}
2552-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2555+
if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
25532556
p.autoRetryStmtReason = ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtReason
25542557
p.autoRetryStmtCounter = ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtCounter
25552558
}
@@ -2645,7 +2648,7 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
26452648
}
26462649
p.autoRetryStmtCounter++
26472650
p.autoRetryStmtReason = maybeRetryableErr
2648-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2651+
if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
26492652
ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtReason = p.autoRetryStmtReason
26502653
ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtCounter = p.autoRetryStmtCounter
26512654
}
@@ -2672,14 +2675,8 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
26722675
func (ex *connExecutor) dispatchToExecutionEngine(
26732676
ctx context.Context, planner *planner, res RestrictedCommandResult,
26742677
) (retErr error) {
2675-
getPausablePortalInfo := func() *portalPauseInfo {
2676-
if planner != nil && planner.pausablePortal != nil {
2677-
return planner.pausablePortal.pauseInfo
2678-
}
2679-
return nil
2680-
}
26812678
defer func() {
2682-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2679+
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
26832680
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
26842681
ppInfo.dispatchToExecutionEngine.cleanup.isComplete = true
26852682
}
@@ -2743,34 +2740,50 @@ func (ex *connExecutor) dispatchToExecutionEngine(
27432740
}
27442741

27452742
var err error
2746-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2743+
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
27472744
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
27482745
ctx, err = ex.makeExecPlan(ctx, planner)
2749-
// TODO(janexing): This is a temporary solution to disallow procedure
2750-
// call statements that contain mutations for pausable portals. Since
2751-
// relational.CanMutate is not yet propagated from the function body
2752-
// via builder.BuildCall(), we must temporarily disallow all
2753-
// TCL statements, which includes the CALL statements.
2754-
// This should be removed once CanMutate is fully propagated.
2755-
// (pending https://github.com/cockroachdb/cockroach/issues/147568)
2756-
isTCL := planner.curPlan.stmt.AST.StatementType() == tree.TypeTCL
2757-
if flags := planner.curPlan.flags; err == nil && (isTCL || flags.IsSet(planFlagContainsMutation) || flags.IsSet(planFlagIsDDL)) {
2758-
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
2759-
// We don't allow mutations in a pausable portal. Set it back to
2760-
// an un-pausable (normal) portal.
2761-
planner.pausablePortal.pauseInfo = nil
2762-
err = res.RevokePortalPausability()
2763-
// If this plan is a transaction control statement, we don't
2764-
// even execute it but just early exit.
2765-
if isTCL {
2766-
err = errors.CombineErrors(err, ErrStmtNotSupportedForPausablePortal)
2746+
if err == nil {
2747+
// TODO(janexing): This is a temporary solution to disallow procedure
2748+
// call statements that contain mutations for pausable portals. Since
2749+
// relational.CanMutate is not yet propagated from the function body
2750+
// via builder.BuildCall(), we must temporarily disallow all
2751+
// TCL statements, which includes the CALL statements.
2752+
// This should be removed once CanMutate is fully propagated.
2753+
// (pending https://github.com/cockroachdb/cockroach/issues/147568)
2754+
isTCL := planner.curPlan.stmt.AST.StatementType() == tree.TypeTCL
2755+
// We don't allow mutations in a pausable portal.
2756+
notReadOnly := isTCL || planner.curPlan.flags.IsSet(planFlagContainsMutation) || planner.curPlan.flags.IsSet(planFlagIsDDL)
2757+
// We don't allow sub / post queries for pausable portal.
2758+
hasSubOrPostQuery := len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.cascades) != 0 ||
2759+
len(planner.curPlan.checkPlans) != 0 || len(planner.curPlan.triggers) != 0
2760+
if notReadOnly || hasSubOrPostQuery {
2761+
if notReadOnly {
2762+
telemetry.Inc(sqltelemetry.NotReadOnlyStmtsTriedWithPausablePortals)
2763+
} else {
2764+
telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
2765+
}
2766+
// This stmt is not supported via the pausable portals model
2767+
// - set it back to an un-pausable (normal) portal.
2768+
ex.disablePortalPausability(planner.pausablePortal)
2769+
planner.pausablePortal = nil
2770+
err = res.RevokePortalPausability()
2771+
// If this plan is a transaction control statement, we don't
2772+
// even execute it but just early exit.
2773+
if isTCL {
2774+
err = errors.CombineErrors(err, ErrStmtNotSupportedForPausablePortal)
2775+
}
2776+
defer planner.curPlan.close(ctx)
2777+
} else {
2778+
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
2779+
defer func() {
2780+
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
2781+
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
2782+
})
2783+
}()
27672784
}
2768-
defer planner.curPlan.close(ctx)
27692785
} else {
2770-
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
2771-
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
2772-
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
2773-
})
2786+
defer planner.curPlan.close(ctx)
27742787
}
27752788
} else {
27762789
planner.curPlan = ppInfo.dispatchToExecutionEngine.planTop
@@ -2825,32 +2838,12 @@ func (ex *connExecutor) dispatchToExecutionEngine(
28252838
ex.sessionTracing.TracePlanCheckStart(ctx)
28262839

28272840
var afterGetPlanDistribution func()
2828-
if planner.pausablePortal != nil {
2829-
if len(planner.curPlan.subqueryPlans) == 0 &&
2830-
len(planner.curPlan.cascades) == 0 &&
2831-
len(planner.curPlan.checkPlans) == 0 &&
2832-
len(planner.curPlan.triggers) == 0 {
2833-
// We don't allow a distributed plan for pausable portals.
2834-
origDistSQLMode := ex.sessionData().DistSQLMode
2835-
ex.sessionData().DistSQLMode = sessiondatapb.DistSQLOff
2836-
afterGetPlanDistribution = func() {
2837-
ex.sessionData().DistSQLMode = origDistSQLMode
2838-
}
2839-
} else {
2840-
telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
2841-
// We don't allow sub / post queries for pausable portal. Set it back to an
2842-
// un-pausable (normal) portal.
2843-
// With pauseInfo is nil, no cleanup function will be added to the stack
2844-
// and all clean-up steps will be performed as for normal portals.
2845-
// TODO(#115887): We may need to move resetting pauseInfo before we add
2846-
// the pausable portal cleanup step above.
2847-
planner.pausablePortal.pauseInfo = nil
2848-
// We need this so that the result consumption for this portal cannot be
2849-
// paused either.
2850-
if err := res.RevokePortalPausability(); err != nil {
2851-
res.SetError(err)
2852-
return nil
2853-
}
2841+
if getPausablePortalInfo(planner) != nil {
2842+
// We don't allow a distributed plan for pausable portals.
2843+
origDistSQLMode := ex.sessionData().DistSQLMode
2844+
ex.sessionData().DistSQLMode = sessiondatapb.DistSQLOff
2845+
afterGetPlanDistribution = func() {
2846+
ex.sessionData().DistSQLMode = origDistSQLMode
28542847
}
28552848
}
28562849
distributePlan, distSQLProhibitedErr := planner.getPlanDistribution(ctx, planner.curPlan.main)
@@ -2903,7 +2896,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29032896
stats, err := ex.execWithDistSQLEngine(
29042897
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, distSQLProhibitedErr,
29052898
)
2906-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2899+
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
29072900
// For pausable portals, we log the stats when closing the portal, so we need
29082901
// to aggregate the stats for all executions.
29092902
ppInfo.dispatchToExecutionEngine.queryStats.add(&stats)
@@ -2930,10 +2923,12 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29302923
ex.extraTxnState.bytesRead += stats.bytesRead
29312924
ex.extraTxnState.rowsWritten += stats.rowsWritten
29322925

2933-
if ppInfo := getPausablePortalInfo(); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
2926+
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
29342927
// We need to ensure that we're using the planner bound to the first-time
29352928
// execution of a portal.
29362929
curPlanner := *planner
2930+
// Note that here we append the cleanup function without a defer since
2931+
// there is no more code relevant to pausable portals model below.
29372932
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
29382933
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
29392934
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(

pkg/sql/explain_bundle.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -894,9 +894,12 @@ func (b *stmtBundleBuilder) addEnv(ctx context.Context) {
894894
for _, addFK := range addFKs {
895895
fmt.Fprintf(&buf, "%s;\n", addFK)
896896
}
897-
// Include FK constraints that were skipped in commented out form.
898-
for _, skipFK := range skipFKs {
899-
fmt.Fprintf(&buf, "-- %s;\n", skipFK)
897+
if len(skipFKs) > 0 {
898+
// Include FK constraints that were skipped in commented out form.
899+
fmt.Fprintf(&buf, "-- NOTE: these FKs are active and are only commented out for ease of bundle recreation.\n--\n")
900+
for _, skipFK := range skipFKs {
901+
fmt.Fprintf(&buf, "-- %s;\n", skipFK)
902+
}
900903
}
901904
}
902905
for i := range views {

0 commit comments

Comments
 (0)