From 7ad84700f3e974be8ef28bd11026ccb13d5f4bf8 Mon Sep 17 00:00:00 2001 From: James Cor Date: Tue, 29 Jul 2025 14:59:59 -0700 Subject: [PATCH 1/6] testing --- sql/aggregates.go | 2 +- sql/rowexec/builder.go | 17 +++++++++++++++++ sql/rowexec/merge_join.go | 4 +++- sql/rows.go | 6 ++---- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/sql/aggregates.go b/sql/aggregates.go index ab09f44afe..ad793348e3 100644 --- a/sql/aggregates.go +++ b/sql/aggregates.go @@ -109,7 +109,7 @@ type WindowFrame interface { StartCurrentRow() bool // EndCurrentRow returns whether a frame end is CURRENT ROW EndCurrentRow() bool - // StartNFollowing returns a frame's start preceding Expression or nil + // StartNPreceding returns a frame's start preceding Expression or nil StartNPreceding() Expression // StartNFollowing returns a frame's start following Expression or nil StartNFollowing() Expression diff --git a/sql/rowexec/builder.go b/sql/rowexec/builder.go index 9133b1c443..06ea5ff4c8 100644 --- a/sql/rowexec/builder.go +++ b/sql/rowexec/builder.go @@ -16,6 +16,7 @@ package rowexec import ( "runtime/trace" + "sync" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/plan" @@ -58,3 +59,19 @@ func FinalizeIters(ctx *sql.Context, analyzed sql.Node, qFlags *sql.QueryFlags, iter = AddExpressionCloser(analyzed, iter) return iter, sch, nil } + +// TODO: find a proper place for this +var rowBuffers = sync.Pool{ + New: func() interface{} { + return make(sql.Row, 0, 4096) // TODO: this is apparently max number of columns + }, +} + +func GetRow(length int) sql.Row { + row := rowBuffers.Get().(sql.Row) + return row[:length] +} + +func PutRow(row sql.Row) { + rowBuffers.Put(row[:0]) +} diff --git a/sql/rowexec/merge_join.go b/sql/rowexec/merge_join.go index 5a92bcd9f2..3a6bdae81d 100644 --- a/sql/rowexec/merge_join.go +++ b/sql/rowexec/merge_join.go @@ -46,7 +46,8 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, return nil, err } - fullRow := make(sql.Row, len(row)+len(j.Left().Schema())+len(j.Right().Schema())) + fullRow := GetRow(len(row) + len(j.Left().Schema()) + len(j.Right().Schema())) + // TODO: what is this for fullRow[0] = row if len(row) > 0 { copy(fullRow[0:], row[:]) @@ -586,5 +587,6 @@ func (i *mergeJoinIter) Close(ctx *sql.Context) (err error) { } } + PutRow(i.fullRow) return err } diff --git a/sql/rows.go b/sql/rows.go index faf3a738bf..57f1ca048d 100644 --- a/sql/rows.go +++ b/sql/rows.go @@ -16,12 +16,10 @@ package sql import ( "fmt" + "github.com/dolthub/go-mysql-server/sql/values" + "github.com/dolthub/vitess/go/vt/proto/query" "io" "strings" - - "github.com/dolthub/vitess/go/vt/proto/query" - - "github.com/dolthub/go-mysql-server/sql/values" ) // Row is a tuple of values. From 7442b9229d0744d36f53e19f9b36d4b392b0c0ba Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 30 Jul 2025 11:47:09 -0700 Subject: [PATCH 2/6] some buffer? --- sql/rowexec/agg.go | 3 ++- sql/rowexec/builder.go | 20 +------------------- sql/rowexec/merge_join.go | 8 ++++---- sql/rows.go | 17 +++++++++++++++++ 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/sql/rowexec/agg.go b/sql/rowexec/agg.go index 384e8efe91..548c27a9a8 100644 --- a/sql/rowexec/agg.go +++ b/sql/rowexec/agg.go @@ -238,8 +238,9 @@ func (i *groupByGroupingIter) Dispose() { } func groupingKey(ctx *sql.Context, exprs []sql.Expression, row sql.Row) (uint64, error) { - var keyRow = make(sql.Row, len(exprs)) var keySch = make(sql.Schema, len(exprs)) + keyRow := sql.GetRow(len(exprs)) + defer sql.PutRow(keyRow) for i, expr := range exprs { v, err := expr.Eval(ctx, row) if err != nil { diff --git a/sql/rowexec/builder.go b/sql/rowexec/builder.go index 06ea5ff4c8..6ee8d25241 100644 --- a/sql/rowexec/builder.go +++ b/sql/rowexec/builder.go @@ -15,11 +15,9 @@ package rowexec import ( - "runtime/trace" - "sync" - "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/plan" + "runtime/trace" ) var DefaultBuilder = &BaseBuilder{} @@ -59,19 +57,3 @@ func FinalizeIters(ctx *sql.Context, analyzed sql.Node, qFlags *sql.QueryFlags, iter = AddExpressionCloser(analyzed, iter) return iter, sch, nil } - -// TODO: find a proper place for this -var rowBuffers = sync.Pool{ - New: func() interface{} { - return make(sql.Row, 0, 4096) // TODO: this is apparently max number of columns - }, -} - -func GetRow(length int) sql.Row { - row := rowBuffers.Get().(sql.Row) - return row[:length] -} - -func PutRow(row sql.Row) { - rowBuffers.Put(row[:0]) -} diff --git a/sql/rowexec/merge_join.go b/sql/rowexec/merge_join.go index 3a6bdae81d..0cb6bc2600 100644 --- a/sql/rowexec/merge_join.go +++ b/sql/rowexec/merge_join.go @@ -46,8 +46,8 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, return nil, err } - fullRow := GetRow(len(row) + len(j.Left().Schema()) + len(j.Right().Schema())) - // TODO: what is this for + fullRow := sql.GetRow(len(row) + len(j.Left().Schema()) + len(j.Right().Schema())) + // TODO: what is this for? fullRow[0] = row if len(row) > 0 { copy(fullRow[0:], row[:]) @@ -264,7 +264,7 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { } nextState = msExhaustCheck case msSelect: - ret = i.copyReturnRow() + ret = i.copyReturnRow() // TODO: only copy once we know we're going to return this currLeftMatched := i.leftMatched ok, err := i.sel(ctx, ret) @@ -587,6 +587,6 @@ func (i *mergeJoinIter) Close(ctx *sql.Context) (err error) { } } - PutRow(i.fullRow) + sql.PutRow(i.fullRow) return err } diff --git a/sql/rows.go b/sql/rows.go index 57f1ca048d..98108f4422 100644 --- a/sql/rows.go +++ b/sql/rows.go @@ -20,6 +20,7 @@ import ( "github.com/dolthub/vitess/go/vt/proto/query" "io" "strings" + "sync" ) // Row is a tuple of values. @@ -67,6 +68,22 @@ func (r Row) Equals(ctx *Context, row Row, schema Schema) (bool, error) { return true, nil } +// TODO: find a proper place for this +var rowBuffers = sync.Pool{ + New: func() interface{} { + return make(Row, 0, 4096) // max number of columns for a table + }, +} + +func GetRow(length int) Row { + row := rowBuffers.Get().(Row) + return row[:length] +} + +func PutRow(row Row) { + rowBuffers.Put(row[:0]) +} + // FormatRow returns a formatted string representing this row's values func FormatRow(row Row) string { var sb strings.Builder From e59c03e024bc6062d380a3ea493a928207024720 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 30 Jul 2025 14:24:41 -0700 Subject: [PATCH 3/6] copy less in merge join --- sql/rowexec/merge_join.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/rowexec/merge_join.go b/sql/rowexec/merge_join.go index 0cb6bc2600..bd6df7c699 100644 --- a/sql/rowexec/merge_join.go +++ b/sql/rowexec/merge_join.go @@ -47,8 +47,6 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, } fullRow := sql.GetRow(len(row) + len(j.Left().Schema()) + len(j.Right().Schema())) - // TODO: what is this for? - fullRow[0] = row if len(row) > 0 { copy(fullRow[0:], row[:]) } @@ -202,7 +200,6 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { nextState = msExhaustCheck case msExhaustCheck: if i.lojFinalize() { - ret = i.copyReturnRow() nextState = msRetLeft } else if i.exhausted() { return nil, io.EOF @@ -228,7 +225,6 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { if i.leftMatched { nextState = msIncLeft } else { - ret = i.copyReturnRow() nextState = msRetLeft } } else { @@ -243,7 +239,6 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { left, _ := i.cmp.Left().Eval(ctx, i.fullRow) if left == nil { if i.typ.IsLeftOuter() && !i.leftMatched { - ret = i.copyReturnRow() nextState = msRetLeft } else { nextState = msIncLeft @@ -264,10 +259,9 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { } nextState = msExhaustCheck case msSelect: - ret = i.copyReturnRow() // TODO: only copy once we know we're going to return this currLeftMatched := i.leftMatched - ok, err := i.sel(ctx, ret) + ok, err := i.sel(ctx, i.fullRow) if err != nil { return nil, err } @@ -296,14 +290,17 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { // |i.incMatch| call incremented the left row. // |currLeftMatched| indicates whether |ret| has already // successfully met a join condition. + ret = i.copyReturnRow() return i.removeParentRow(i.nullifyRightRow(ret)), nil } else { nextState = msExhaustCheck } case msRet: + ret = i.copyReturnRow() return i.removeParentRow(ret), nil case msRetLeft: + ret = i.copyReturnRow() ret = i.removeParentRow(i.nullifyRightRow(ret)) err = i.incLeft(ctx) if err != nil { From af5352e14cee71cc6bf8a1abef2591df07d900c1 Mon Sep 17 00:00:00 2001 From: jycor Date: Wed, 30 Jul 2025 18:51:36 +0000 Subject: [PATCH 4/6] [ga-format-pr] Run ./format_repo.sh to fix formatting --- sql/rowexec/builder.go | 3 ++- sql/rows.go | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/rowexec/builder.go b/sql/rowexec/builder.go index 6ee8d25241..9133b1c443 100644 --- a/sql/rowexec/builder.go +++ b/sql/rowexec/builder.go @@ -15,9 +15,10 @@ package rowexec import ( + "runtime/trace" + "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/plan" - "runtime/trace" ) var DefaultBuilder = &BaseBuilder{} diff --git a/sql/rows.go b/sql/rows.go index 98108f4422..3c5c5e4746 100644 --- a/sql/rows.go +++ b/sql/rows.go @@ -16,11 +16,13 @@ package sql import ( "fmt" - "github.com/dolthub/go-mysql-server/sql/values" - "github.com/dolthub/vitess/go/vt/proto/query" "io" "strings" "sync" + + "github.com/dolthub/vitess/go/vt/proto/query" + + "github.com/dolthub/go-mysql-server/sql/values" ) // Row is a tuple of values. From 700988d66259ea5fa176483c088d03db4d1c0b04 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 30 Jul 2025 15:23:36 -0700 Subject: [PATCH 5/6] copies are necessary because of lookaheads --- sql/rowexec/merge_join.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/rowexec/merge_join.go b/sql/rowexec/merge_join.go index bd6df7c699..04785df0b3 100644 --- a/sql/rowexec/merge_join.go +++ b/sql/rowexec/merge_join.go @@ -46,7 +46,7 @@ func newMergeJoinIter(ctx *sql.Context, b sql.NodeExecBuilder, j *plan.JoinNode, return nil, err } - fullRow := sql.GetRow(len(row) + len(j.Left().Schema()) + len(j.Right().Schema())) + fullRow := make(sql.Row, len(row)+len(j.Left().Schema())+len(j.Right().Schema())) if len(row) > 0 { copy(fullRow[0:], row[:]) } @@ -200,6 +200,7 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { nextState = msExhaustCheck case msExhaustCheck: if i.lojFinalize() { + ret = i.copyReturnRow() nextState = msRetLeft } else if i.exhausted() { return nil, io.EOF @@ -225,6 +226,7 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { if i.leftMatched { nextState = msIncLeft } else { + ret = i.copyReturnRow() nextState = msRetLeft } } else { @@ -239,6 +241,7 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { left, _ := i.cmp.Left().Eval(ctx, i.fullRow) if left == nil { if i.typ.IsLeftOuter() && !i.leftMatched { + ret = i.copyReturnRow() nextState = msRetLeft } else { nextState = msIncLeft @@ -259,9 +262,10 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { } nextState = msExhaustCheck case msSelect: + ret = i.copyReturnRow() currLeftMatched := i.leftMatched - ok, err := i.sel(ctx, i.fullRow) + ok, err := i.sel(ctx, ret) if err != nil { return nil, err } @@ -290,17 +294,14 @@ func (i *mergeJoinIter) Next(ctx *sql.Context) (sql.Row, error) { // |i.incMatch| call incremented the left row. // |currLeftMatched| indicates whether |ret| has already // successfully met a join condition. - ret = i.copyReturnRow() return i.removeParentRow(i.nullifyRightRow(ret)), nil } else { nextState = msExhaustCheck } case msRet: - ret = i.copyReturnRow() return i.removeParentRow(ret), nil case msRetLeft: - ret = i.copyReturnRow() ret = i.removeParentRow(i.nullifyRightRow(ret)) err = i.incLeft(ctx) if err != nil { @@ -584,6 +585,5 @@ func (i *mergeJoinIter) Close(ctx *sql.Context) (err error) { } } - sql.PutRow(i.fullRow) return err } From a37e01595edd9dbd43acf920acf23c1348d573a9 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 30 Jul 2025 16:08:31 -0700 Subject: [PATCH 6/6] use copy instead of custom function --- sql/rowexec/merge_join.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/rowexec/merge_join.go b/sql/rowexec/merge_join.go index 04785df0b3..cc63b7c8e4 100644 --- a/sql/rowexec/merge_join.go +++ b/sql/rowexec/merge_join.go @@ -375,7 +375,7 @@ func (i *mergeJoinIter) incMatch(ctx *sql.Context) error { if !i.leftDone { // rightBuf has already been validated, we don't need compare - copySubslice(i.fullRow, i.rightBuf[i.bufI], i.scopeLen+i.parentLen+i.leftRowLen) + copy(i.fullRow[i.scopeLen+i.parentLen+i.leftRowLen:], i.rightBuf[i.bufI]) i.bufI++ return nil } @@ -393,7 +393,7 @@ func (i *mergeJoinIter) incMatch(ctx *sql.Context) error { if i.lojFinalize() { // left joins expect the left row in |i.fullRow| as long // as the left iter is not exhausted. - copySubslice(i.fullRow, i.leftPeek, i.scopeLen+i.parentLen) + copy(i.fullRow[i.scopeLen+i.parentLen:], i.leftPeek) } return nil } @@ -401,8 +401,8 @@ func (i *mergeJoinIter) incMatch(ctx *sql.Context) error { // both lookaheads fail the join condition. Drain // lookahead rows / increment both iterators. i.matchIncLeft = true - copySubslice(i.fullRow, i.leftPeek, i.scopeLen+i.parentLen) - copySubslice(i.fullRow, i.rightPeek, i.scopeLen+i.parentLen+i.leftRowLen) + copy(i.fullRow[i.scopeLen+i.parentLen:], i.leftPeek) + copy(i.fullRow[i.scopeLen+i.parentLen+i.leftRowLen:], i.rightPeek) return nil } @@ -453,7 +453,7 @@ func (i *mergeJoinIter) resetMatchState() { // no match, no lookahead row, and no error. func (i *mergeJoinIter) peekMatch(ctx *sql.Context, iter sql.RowIter) (bool, sql.Row, error) { var off int - var restore sql.Row + var restore sql.Row // TODO: rowBuffer? switch iter { case i.left: off = i.scopeLen + i.parentLen @@ -476,17 +476,17 @@ func (i *mergeJoinIter) peekMatch(ctx *sql.Context, iter sql.RowIter) (bool, sql } // check if lookahead valid - copySubslice(i.fullRow, peek, off) + copy(i.fullRow[off:], peek) res, err := i.cmp.Compare(ctx, i.fullRow) if expression.ErrNilOperand.Is(err) { // revert change to output row if no match - copySubslice(i.fullRow, restore, off) + copy(i.fullRow[off:], restore) } else if err != nil { return false, nil, err } if res != 0 { // revert change to output row if no match - copySubslice(i.fullRow, restore, off) + copy(i.fullRow[off:], restore) } return res == 0, peek, nil }