Skip to content

Commit 7536bef

Browse files
committed
sql: clarify cleanup queue for pausable portals
We have several cleanup "stacks" for pausable portals model, but in reality we execute the functions in the "forward" direction, matching the behavior of a "queue", so this commit renames the struct accordingly. Only `execStmtInOpenState` and `dispatchToExecutionEngine` stages have multiple cleanup functions. Additionally, it fixes the inverted order of executing two cleanup functions in `dispatchToExecutionEngine` stage. Namely, previously we performed `planTop.Close` _before_ `recordStatementSummary` which doesn't match the behavior on the main path, outside the pausable portals. We now defer appending the former so that it's executed after the latter. Deferring `planTop.Close` was necessary for the following commit which is what prompted this one. Release note: None
1 parent a75f7a7 commit 7536bef

File tree

2 files changed

+33
-21
lines changed

2 files changed

+33
-21
lines changed

pkg/sql/conn_executor_exec.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1694,6 +1694,9 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
16941694
}
16951695
}
16961696

1697+
// Note that here we process the cleanup function (which will append it
1698+
// to the cleanup queue) without a defer since there is no more code
1699+
// relevant to pausable portals model below.
16971700
processCleanupFunc(func() {
16981701
cancelQueryCtx := ctx
16991702
if portal.isPausable() {
@@ -2772,9 +2775,11 @@ func (ex *connExecutor) dispatchToExecutionEngine(
27722775
defer planner.curPlan.close(ctx)
27732776
} else {
27742777
ppInfo.dispatchToExecutionEngine.planTop = planner.curPlan
2775-
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
2776-
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
2777-
})
2778+
defer func() {
2779+
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
2780+
ppInfo.dispatchToExecutionEngine.planTop.close(ctx)
2781+
})
2782+
}()
27782783
}
27792784
} else {
27802785
defer planner.curPlan.close(ctx)
@@ -2921,6 +2926,8 @@ func (ex *connExecutor) dispatchToExecutionEngine(
29212926
// We need to ensure that we're using the planner bound to the first-time
29222927
// execution of a portal.
29232928
curPlanner := *planner
2929+
// Note that here we append the cleanup function without a defer since
2930+
// there is no more code relevant to pausable portals model below.
29242931
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(func(ctx context.Context) {
29252932
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
29262933
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(

pkg/sql/prepared_stmt.go

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -175,23 +175,28 @@ func (p *PreparedPortal) isPausable() bool {
175175
return p != nil && p.pauseInfo != nil
176176
}
177177

178-
// cleanupFuncStack stores cleanup functions for a portal. The clean-up
178+
// cleanupFuncQueue stores cleanup functions for a portal. The clean-up
179179
// functions are added during the first-time execution of a portal. When the
180180
// first-time execution is finished, we mark isComplete to true.
181-
type cleanupFuncStack struct {
182-
stack []func(context.Context)
181+
//
182+
// Generally, cleanup functions should be added in a defer (assuming that
183+
// originally they were deferred as well). The functions will be appended to the
184+
// end of the queue, which preserves the order of their execution if pausable
185+
// portals model wasn't used.
186+
type cleanupFuncQueue struct {
187+
queue []func(context.Context)
183188
isComplete bool
184189
}
185190

186-
func (n *cleanupFuncStack) appendFunc(f func(context.Context)) {
187-
n.stack = append(n.stack, f)
191+
func (n *cleanupFuncQueue) appendFunc(f func(context.Context)) {
192+
n.queue = append(n.queue, f)
188193
}
189194

190-
func (n *cleanupFuncStack) run(ctx context.Context) {
191-
for i := 0; i < len(n.stack); i++ {
192-
n.stack[i](ctx)
195+
func (n *cleanupFuncQueue) run(ctx context.Context) {
196+
for i := 0; i < len(n.queue); i++ {
197+
n.queue[i](ctx)
193198
}
194-
*n = cleanupFuncStack{}
199+
*n = cleanupFuncQueue{}
195200
}
196201

197202
// instrumentationHelperWrapper wraps the instrumentation helper.
@@ -226,23 +231,23 @@ type portalPauseInfo struct {
226231
// When closing a portal, we need to follow the reverse order of its execution,
227232
// which means running the cleanup functions of the four structs in the
228233
// following order:
229-
// - exhaustPortal.cleanup
230-
// - execStmtInOpenState.cleanup
231-
// - dispatchToExecutionEngine.cleanup
232234
// - resumableFlow.cleanup
235+
// - dispatchToExecutionEngine.cleanup
236+
// - execStmtInOpenState.cleanup
237+
// - exhaustPortal.cleanup
233238
//
234239
// If an error occurs in any of these functions, we run the cleanup functions of
235240
// this layer and its children layers, and propagate the error to the parent
236241
// layer. For example, if an error occurs in execStmtInOpenStateCleanup(), we
237242
// run the cleanup functions in the following order:
238-
// - execStmtInOpenState.cleanup
239-
// - dispatchToExecutionEngine.cleanup
240243
// - resumableFlow.cleanup
244+
// - dispatchToExecutionEngine.cleanup
245+
// - execStmtInOpenState.cleanup
241246
//
242247
// When exiting connExecutor.execStmtInOpenState(), we finally run the
243248
// exhaustPortal.cleanup function in connExecutor.execPortal().
244249
exhaustPortal struct {
245-
cleanup cleanupFuncStack
250+
cleanup cleanupFuncQueue
246251
}
247252

248253
// TODO(sql-session): replace certain fields here with planner.
@@ -266,7 +271,7 @@ type portalPauseInfo struct {
266271
// retErr is needed for the cleanup steps as we will have to check the latest
267272
// encountered error, so this field should be updated for each execution.
268273
retErr error
269-
cleanup cleanupFuncStack
274+
cleanup cleanupFuncQueue
270275
}
271276

272277
dispatchReadCommittedStmtToExecutionEngine struct {
@@ -295,7 +300,7 @@ type portalPauseInfo struct {
295300
// queryStats stores statistics on query execution. It is incremented for
296301
// each execution of the portal.
297302
queryStats *topLevelQueryStats
298-
cleanup cleanupFuncStack
303+
cleanup cleanupFuncQueue
299304
}
300305

301306
resumableFlow struct {
@@ -307,7 +312,7 @@ type portalPauseInfo struct {
307312
// We need this as when re-executing the portal, we are reusing the flow
308313
// with the new receiver, but not re-generating the physical plan.
309314
outputTypes []*types.T
310-
cleanup cleanupFuncStack
315+
cleanup cleanupFuncQueue
311316
}
312317
}
313318

0 commit comments

Comments
 (0)