Skip to content

Commit 3a33264

Browse files
committed
sql: cleanup the code around pausable portals a bit
Extract a common helper function as well as remove redundant argument for another function. Release note: None
1 parent fbdbcb8 commit 3a33264

File tree

2 files changed

+19
-25
lines changed

2 files changed

+19
-25
lines changed

pkg/sql/conn_executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2467,7 +2467,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
24672467
ex.sessionData().DataConversionConfig,
24682468
ex.sessionData().GetLocation(),
24692469
tcmd.Limit,
2470-
portalName,
2470+
portal.Name,
24712471
ex.implicitTxn(),
24722472
portal.portalPausablity,
24732473
)
@@ -2483,7 +2483,7 @@ func (ex *connExecutor) execCmd() (retErr error) {
24832483
// followed by Sync (which is the common case), then we still can auto-commit,
24842484
// which allows the 1PC txn fast path to be used.
24852485
canAutoCommit := ex.implicitTxn() && tcmd.FollowedBySync
2486-
ev, payload, err = ex.execPortal(ctx, portal, portalName, stmtRes, pinfo, canAutoCommit)
2486+
ev, payload, err = ex.execPortal(ctx, portal, stmtRes, pinfo, canAutoCommit)
24872487
return err
24882488
}()
24892489
// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,

pkg/sql/conn_executor_exec.go

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,6 @@ func (ex *connExecutor) recordFailure(p eventNonRetryableErrPayload) {
281281
func (ex *connExecutor) execPortal(
282282
ctx context.Context,
283283
portal PreparedPortal,
284-
portalName string,
285284
stmtRes CommandResult,
286285
pinfo *tree.PlaceholderInfo,
287286
canAutoCommit bool,
@@ -290,7 +289,7 @@ func (ex *connExecutor) execPortal(
290289
if portal.isPausable() {
291290
if !portal.pauseInfo.exhaustPortal.cleanup.isComplete {
292291
portal.pauseInfo.exhaustPortal.cleanup.appendFunc(func(_ context.Context) {
293-
ex.exhaustPortal(portalName)
292+
ex.exhaustPortal(portal.Name)
294293
})
295294
portal.pauseInfo.exhaustPortal.cleanup.isComplete = true
296295
}
@@ -330,8 +329,8 @@ func (ex *connExecutor) execPortal(
330329
// to re-execute the portal from scratch.
331330
// The current statement may have just closed and deleted the portal,
332331
// so only exhaust it if it still exists.
333-
if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok && !portal.isPausable() {
334-
defer ex.exhaustPortal(portalName)
332+
if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portal.Name]; ok && !portal.isPausable() {
333+
defer ex.exhaustPortal(portal.Name)
335334
}
336335
return ev, payload, retErr
337336

@@ -2526,6 +2525,13 @@ func (ex *connExecutor) rollbackSQLTransaction(
25262525
return eventTxnFinishAborted{}, nil
25272526
}
25282527

2528+
func getPausablePortalInfo(p *planner) *portalPauseInfo {
2529+
if p != nil && p.pausablePortal != nil {
2530+
return p.pausablePortal.pauseInfo
2531+
}
2532+
return nil
2533+
}
2534+
25292535
// Each statement in an explicit READ COMMITTED transaction has a SAVEPOINT.
25302536
// This allows for TransactionRetry errors to be retried automatically. We don't
25312537
// do this for implicit transactions because the conn_executor state machine
@@ -2543,13 +2549,7 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
25432549
)
25442550
}
25452551

2546-
getPausablePortalInfo := func() *portalPauseInfo {
2547-
if p != nil && p.pausablePortal != nil {
2548-
return p.pausablePortal.pauseInfo
2549-
}
2550-
return nil
2551-
}
2552-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2552+
if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
25532553
p.autoRetryStmtReason = ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtReason
25542554
p.autoRetryStmtCounter = ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtCounter
25552555
}
@@ -2645,7 +2645,7 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
26452645
}
26462646
p.autoRetryStmtCounter++
26472647
p.autoRetryStmtReason = maybeRetryableErr
2648-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2648+
if ppInfo := getPausablePortalInfo(p); ppInfo != nil {
26492649
ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtReason = p.autoRetryStmtReason
26502650
ppInfo.dispatchReadCommittedStmtToExecutionEngine.autoRetryStmtCounter = p.autoRetryStmtCounter
26512651
}
@@ -2672,14 +2672,8 @@ func (ex *connExecutor) dispatchReadCommittedStmtToExecutionEngine(
26722672
func (ex *connExecutor) dispatchToExecutionEngine(
26732673
ctx context.Context, planner *planner, res RestrictedCommandResult,
26742674
) (retErr error) {
2675-
getPausablePortalInfo := func() *portalPauseInfo {
2676-
if planner != nil && planner.pausablePortal != nil {
2677-
return planner.pausablePortal.pauseInfo
2678-
}
2679-
return nil
2680-
}
26812675
defer func() {
2682-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2676+
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
26832677
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
26842678
ppInfo.dispatchToExecutionEngine.cleanup.isComplete = true
26852679
}
@@ -2743,7 +2737,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
27432737
}
27442738

27452739
var err error
2746-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2740+
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
27472741
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
27482742
ctx, err = ex.makeExecPlan(ctx, planner)
27492743
// TODO(janexing): This is a temporary solution to disallow procedure
@@ -2825,7 +2819,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
28252819
ex.sessionTracing.TracePlanCheckStart(ctx)
28262820

28272821
var afterGetPlanDistribution func()
2828-
if planner.pausablePortal != nil {
2822+
if getPausablePortalInfo(planner) != nil {
28292823
if len(planner.curPlan.subqueryPlans) == 0 &&
28302824
len(planner.curPlan.cascades) == 0 &&
28312825
len(planner.curPlan.checkPlans) == 0 &&
@@ -2903,7 +2897,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29032897
stats, err := ex.execWithDistSQLEngine(
29042898
ctx, planner, stmt.AST.StatementReturnType(), res, distribute, progAtomic, distSQLProhibitedErr,
29052899
)
2906-
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
2900+
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil {
29072901
// For pausable portals, we log the stats when closing the portal, so we need
29082902
// to aggregate the stats for all executions.
29092903
ppInfo.dispatchToExecutionEngine.queryStats.add(&stats)
@@ -2930,7 +2924,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29302924
ex.extraTxnState.bytesRead += stats.bytesRead
29312925
ex.extraTxnState.rowsWritten += stats.rowsWritten
29322926

2933-
if ppInfo := getPausablePortalInfo(); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
2927+
if ppInfo := getPausablePortalInfo(planner); ppInfo != nil && !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
29342928
// We need to ensure that we're using the planner bound to the first-time
29352929
// execution of a portal.
29362930
curPlanner := *planner

0 commit comments

Comments
 (0)