Skip to content

Commit 8f5f026

Browse files
committed
Add concurrent result sets in db.Query.Query(...)
1 parent 8f235f3 commit 8f5f026

File tree

5 files changed

+282
-18
lines changed

5 files changed

+282
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
* Supported `sql.Null*` from `database/sql` as query params in `toValue` func
2+
* Added `WithConcurrentResultSets` option for `db.Query().Query()`
23

34
## v3.118.0
45
* Added support for nullable `Date32`, `Datetime64`, `Timestamp64`, and `Interval64` types in the `optional` parameter builder

internal/query/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
404404
if err != nil {
405405
return xerrors.WithStackTrace(err)
406406
}
407+
407408
defer func() {
408409
_ = streamResult.Close(ctx)
409410
}()

internal/query/client_test.go

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1087,6 +1087,241 @@ func TestClient(t *testing.T) {
10871087
require.Nil(t, r3)
10881088
}
10891089
})
1090+
t.Run("ConcurrentResultSets", func(t *testing.T) {
1091+
ctrl := gomock.NewController(t)
1092+
r, err := clientQuery(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
1093+
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
1094+
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
1095+
Status: Ydb.StatusIds_SUCCESS,
1096+
TxMeta: &Ydb_Query.TransactionMeta{
1097+
Id: "456",
1098+
},
1099+
ResultSetIndex: 0,
1100+
ResultSet: &Ydb.ResultSet{
1101+
Columns: []*Ydb.Column{
1102+
{
1103+
Name: "a",
1104+
Type: &Ydb.Type{
1105+
Type: &Ydb.Type_TypeId{
1106+
TypeId: Ydb.Type_UINT64,
1107+
},
1108+
},
1109+
},
1110+
{
1111+
Name: "b",
1112+
Type: &Ydb.Type{
1113+
Type: &Ydb.Type_TypeId{
1114+
TypeId: Ydb.Type_UTF8,
1115+
},
1116+
},
1117+
},
1118+
},
1119+
Rows: []*Ydb.Value{
1120+
{
1121+
Items: []*Ydb.Value{{
1122+
Value: &Ydb.Value_Uint64Value{
1123+
Uint64Value: 1,
1124+
},
1125+
}, {
1126+
Value: &Ydb.Value_TextValue{
1127+
TextValue: "1",
1128+
},
1129+
}},
1130+
},
1131+
{
1132+
Items: []*Ydb.Value{{
1133+
Value: &Ydb.Value_Uint64Value{
1134+
Uint64Value: 2,
1135+
},
1136+
}, {
1137+
Value: &Ydb.Value_TextValue{
1138+
TextValue: "2",
1139+
},
1140+
}},
1141+
},
1142+
{
1143+
Items: []*Ydb.Value{{
1144+
Value: &Ydb.Value_Uint64Value{
1145+
Uint64Value: 3,
1146+
},
1147+
}, {
1148+
Value: &Ydb.Value_TextValue{
1149+
TextValue: "3",
1150+
},
1151+
}},
1152+
},
1153+
},
1154+
},
1155+
}, nil)
1156+
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
1157+
Status: Ydb.StatusIds_SUCCESS,
1158+
ResultSetIndex: 1,
1159+
ResultSet: &Ydb.ResultSet{
1160+
Columns: []*Ydb.Column{
1161+
{
1162+
Name: "c",
1163+
Type: &Ydb.Type{
1164+
Type: &Ydb.Type_TypeId{
1165+
TypeId: Ydb.Type_UINT64,
1166+
},
1167+
},
1168+
},
1169+
{
1170+
Name: "d",
1171+
Type: &Ydb.Type{
1172+
Type: &Ydb.Type_TypeId{
1173+
TypeId: Ydb.Type_UTF8,
1174+
},
1175+
},
1176+
},
1177+
{
1178+
Name: "e",
1179+
Type: &Ydb.Type{
1180+
Type: &Ydb.Type_TypeId{
1181+
TypeId: Ydb.Type_BOOL,
1182+
},
1183+
},
1184+
},
1185+
},
1186+
Rows: []*Ydb.Value{
1187+
{
1188+
Items: []*Ydb.Value{{
1189+
Value: &Ydb.Value_Uint64Value{
1190+
Uint64Value: 1,
1191+
},
1192+
}, {
1193+
Value: &Ydb.Value_TextValue{
1194+
TextValue: "1",
1195+
},
1196+
}, {
1197+
Value: &Ydb.Value_BoolValue{
1198+
BoolValue: true,
1199+
},
1200+
}},
1201+
},
1202+
{
1203+
Items: []*Ydb.Value{{
1204+
Value: &Ydb.Value_Uint64Value{
1205+
Uint64Value: 2,
1206+
},
1207+
}, {
1208+
Value: &Ydb.Value_TextValue{
1209+
TextValue: "2",
1210+
},
1211+
}, {
1212+
Value: &Ydb.Value_BoolValue{
1213+
BoolValue: false,
1214+
},
1215+
}},
1216+
},
1217+
},
1218+
},
1219+
}, nil)
1220+
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
1221+
Status: Ydb.StatusIds_SUCCESS,
1222+
ResultSetIndex: 0,
1223+
ResultSet: &Ydb.ResultSet{
1224+
Rows: []*Ydb.Value{
1225+
{
1226+
Items: []*Ydb.Value{{
1227+
Value: &Ydb.Value_Uint64Value{
1228+
Uint64Value: 4,
1229+
},
1230+
}, {
1231+
Value: &Ydb.Value_TextValue{
1232+
TextValue: "4",
1233+
},
1234+
}},
1235+
},
1236+
{
1237+
Items: []*Ydb.Value{{
1238+
Value: &Ydb.Value_Uint64Value{
1239+
Uint64Value: 5,
1240+
},
1241+
}, {
1242+
Value: &Ydb.Value_TextValue{
1243+
TextValue: "5",
1244+
},
1245+
}},
1246+
},
1247+
},
1248+
},
1249+
}, nil)
1250+
stream.EXPECT().Recv().Return(nil, io.EOF)
1251+
client := NewMockQueryServiceClient(ctrl)
1252+
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
1253+
1254+
return newTestSessionWithClient("123", client, true), nil
1255+
}), "", query.WithConcurrentResultSets(true))
1256+
require.NoError(t, err)
1257+
{
1258+
rs, err := r.NextResultSet(ctx)
1259+
require.NoError(t, err)
1260+
r1, err := rs.NextRow(ctx)
1261+
require.NoError(t, err)
1262+
var (
1263+
a uint64
1264+
b string
1265+
)
1266+
err = r1.Scan(&a, &b)
1267+
require.NoError(t, err)
1268+
require.EqualValues(t, 1, a)
1269+
require.EqualValues(t, "1", b)
1270+
r2, err := rs.NextRow(ctx)
1271+
require.NoError(t, err)
1272+
err = r2.Scan(&a, &b)
1273+
require.NoError(t, err)
1274+
require.EqualValues(t, 2, a)
1275+
require.EqualValues(t, "2", b)
1276+
r3, err := rs.NextRow(ctx)
1277+
require.NoError(t, err)
1278+
err = r3.Scan(&a, &b)
1279+
require.NoError(t, err)
1280+
require.EqualValues(t, 3, a)
1281+
require.EqualValues(t, "3", b)
1282+
r4, err := rs.NextRow(ctx)
1283+
require.NoError(t, err)
1284+
err = r4.Scan(&a, &b)
1285+
require.NoError(t, err)
1286+
require.EqualValues(t, 4, a)
1287+
require.EqualValues(t, "4", b)
1288+
r5, err := rs.NextRow(ctx)
1289+
require.NoError(t, err)
1290+
err = r5.Scan(&a, &b)
1291+
require.NoError(t, err)
1292+
require.EqualValues(t, 5, a)
1293+
require.EqualValues(t, "5", b)
1294+
r6, err := rs.NextRow(ctx)
1295+
require.ErrorIs(t, err, io.EOF)
1296+
require.Nil(t, r6)
1297+
}
1298+
{
1299+
rs, err := r.NextResultSet(ctx)
1300+
require.NoError(t, err)
1301+
r1, err := rs.NextRow(ctx)
1302+
require.NoError(t, err)
1303+
var (
1304+
a uint64
1305+
b string
1306+
c bool
1307+
)
1308+
err = r1.Scan(&a, &b, &c)
1309+
require.NoError(t, err)
1310+
require.EqualValues(t, 1, a)
1311+
require.EqualValues(t, "1", b)
1312+
require.EqualValues(t, true, c)
1313+
r2, err := rs.NextRow(ctx)
1314+
require.NoError(t, err)
1315+
err = r2.Scan(&a, &b, &c)
1316+
require.NoError(t, err)
1317+
require.EqualValues(t, 2, a)
1318+
require.EqualValues(t, "2", b)
1319+
require.EqualValues(t, false, c)
1320+
r3, err := rs.NextRow(ctx)
1321+
require.ErrorIs(t, err, io.EOF)
1322+
require.Nil(t, r3)
1323+
}
1324+
})
10901325
t.Run("AllowImplicitSessions", func(t *testing.T) {
10911326
_, err := mockClientForImplicitSessionTest(ctx, t).
10921327
Query(ctx, "SELECT 1")

internal/query/result.go

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
"time"
99

1010
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
11+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1112
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
1213
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1314

1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
1719
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1820
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
1921
"github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -433,34 +435,59 @@ func exactlyOneResultSetFromResult(ctx context.Context, r result.Result) (rs res
433435
return MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows), nil
434436
}
435437

436-
func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Result, error) {
437-
var resultSets []result.Set
438+
func resultToMaterializedResult(ctx context.Context, r *streamResult) (result.Result, error) {
439+
type resultSet struct {
440+
rows []query.Row
441+
columns []*Ydb.Column
442+
}
443+
resultSetByIndex := make(map[int64]resultSet)
438444

439445
for {
440-
rs, err := r.NextResultSet(ctx)
446+
if ctx.Err() != nil {
447+
return nil, xerrors.WithStackTrace(ctx.Err())
448+
}
449+
if r.closer.Err() != nil {
450+
return nil, xerrors.WithStackTrace(r.closer.Err())
451+
}
452+
453+
rs := resultSetByIndex[r.lastPart.GetResultSetIndex()]
454+
if len(rs.columns) == 0 {
455+
rs.columns = r.lastPart.GetResultSet().GetColumns()
456+
}
457+
458+
rows := make([]query.Row, len(r.lastPart.GetResultSet().GetRows()))
459+
for i := range r.lastPart.GetResultSet().GetRows() {
460+
rows[i] = NewRow(rs.columns, r.lastPart.GetResultSet().GetRows()[i])
461+
}
462+
rs.rows = append(rs.rows, rows...)
463+
464+
resultSetByIndex[r.lastPart.GetResultSetIndex()] = rs
465+
466+
var err error
467+
r.lastPart, err = r.nextPart(ctx)
441468
if err != nil {
442469
if xerrors.Is(err, io.EOF) {
443470
break
444471
}
445472

446473
return nil, xerrors.WithStackTrace(err)
447474
}
475+
if r.lastPart.GetExecStats() != nil && r.statsCallback != nil {
476+
r.statsCallback(stats.FromQueryStats(r.lastPart.GetExecStats()))
477+
}
478+
}
448479

449-
var rows []query.Row
450-
for {
451-
row, err := rs.NextRow(ctx)
452-
if err != nil {
453-
if xerrors.Is(err, io.EOF) {
454-
break
455-
}
456-
457-
return nil, xerrors.WithStackTrace(err)
458-
}
480+
resultSets := make([]result.Set, len(resultSetByIndex))
481+
for rsIndex, rs := range resultSetByIndex {
482+
columnNames := make([]string, len(rs.columns))
483+
columnTypes := make([]types.Type, len(rs.columns))
459484

460-
rows = append(rows, row)
485+
for i := range rs.columns {
486+
columnNames[i] = rs.columns[i].GetName()
487+
columnTypes[i] = types.TypeFromYDB(rs.columns[i].GetType())
461488
}
462489

463-
resultSets = append(resultSets, MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows))
490+
resultSets[rsIndex] = MaterializedResultSet(int(rsIndex), columnNames, columnTypes, rs.rows)
464491
}
465492

466493
return &materializedResult{

query/execute_options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ func WithResponsePartLimitSizeBytes(size int64) ExecuteOption {
7070
return options.WithResponsePartLimitSizeBytes(size)
7171
}
7272

73-
//func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
74-
// return options.WithConcurrentResultSets(isEnabled)
75-
//}
73+
func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
74+
return options.WithConcurrentResultSets(isEnabled)
75+
}
7676

7777
func WithCallOptions(opts ...grpc.CallOption) ExecuteOption {
7878
return options.WithCallOptions(opts...)

0 commit comments

Comments
 (0)