Skip to content

Commit 2de8b88

Browse files
committed
feat: support timestampbound for single-use read-only tx
1 parent f9ff983 commit 2de8b88

File tree

8 files changed

+267
-44
lines changed

8 files changed

+267
-44
lines changed

conn.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,9 @@ func (c *conn) inReadWriteTransaction() bool {
11171117
}
11181118

11191119
func queryInSingleUse(ctx context.Context, c *spanner.Client, statement spanner.Statement, tb spanner.TimestampBound, options ExecOptions) *spanner.RowIterator {
1120+
if options.TimestampBound != nil {
1121+
tb = *options.TimestampBound
1122+
}
11201123
return c.Single().WithTimestampBound(tb).QueryWithOptions(ctx, statement, options.QueryOptions)
11211124
}
11221125

driver.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ type ExecOptions struct {
167167
TransactionOptions spanner.TransactionOptions
168168
// QueryOptions are the query options that will be used for the statement.
169169
QueryOptions spanner.QueryOptions
170+
// TimestampBound is the timestamp bound that will be used for the statement
171+
// if it is a query outside a transaction. Setting this option will override
172+
// the default TimestampBound that is set on the connection.
173+
TimestampBound *spanner.TimestampBound
170174

171175
// PartitionedQueryOptions are used for partitioned queries, and ignored
172176
// for all other statements.

spannerlib/exported/connection.go

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ func BeginTransaction(poolId, connId int64, txOptsBytes []byte) *Message {
3737
return conn.BeginTransaction(&txOpts)
3838
}
3939

40-
func Execute(poolId, connId int64, statementBytes []byte) *Message {
41-
statement := spannerpb.ExecuteBatchDmlRequest_Statement{}
42-
if err := proto.Unmarshal(statementBytes, &statement); err != nil {
40+
func Execute(poolId, connId int64, executeSqlRequestBytes []byte) *Message {
41+
statement := spannerpb.ExecuteSqlRequest{}
42+
if err := proto.Unmarshal(executeSqlRequestBytes, &statement); err != nil {
4343
return errMessage(err)
4444
}
4545
conn, err := findConnection(poolId, connId)
@@ -163,15 +163,15 @@ func convertIsolationLevel(level spannerpb.TransactionOptions_IsolationLevel) sq
163163
return sql.LevelDefault
164164
}
165165

166-
func (conn *Connection) Execute(statement *spannerpb.ExecuteBatchDmlRequest_Statement) *Message {
166+
func (conn *Connection) Execute(statement *spannerpb.ExecuteSqlRequest) *Message {
167167
return execute(conn, conn.backend.Conn, statement)
168168
}
169169

170170
func (conn *Connection) ExecuteBatchDml(statements []*spannerpb.ExecuteBatchDmlRequest_Statement) *Message {
171171
return executeBatchDml(conn, conn.backend.Conn, statements)
172172
}
173173

174-
func execute(conn *Connection, executor queryExecutor, statement *spannerpb.ExecuteBatchDmlRequest_Statement) *Message {
174+
func execute(conn *Connection, executor queryExecutor, statement *spannerpb.ExecuteSqlRequest) *Message {
175175
params := extractParams(statement)
176176
it, err := executor.QueryContext(context.Background(), statement.Sql, params...)
177177
if err != nil {
@@ -210,7 +210,12 @@ func executeBatchDml(conn *Connection, executor queryExecutor, statements []*spa
210210
return errMessage(err)
211211
}
212212
for _, statement := range statements {
213-
params := extractParams(statement)
213+
request := &spannerpb.ExecuteSqlRequest{
214+
Sql: statement.Sql,
215+
Params: statement.Params,
216+
ParamTypes: statement.ParamTypes,
217+
}
218+
params := extractParams(request)
214219
_, err := executor.ExecContext(context.Background(), statement.Sql, params...)
215220
if err != nil {
216221
return errMessage(err)
@@ -240,17 +245,18 @@ func executeBatchDml(conn *Connection, executor queryExecutor, statements []*spa
240245
return &Message{Res: res}
241246
}
242247

243-
func extractParams(statement *spannerpb.ExecuteBatchDmlRequest_Statement) []any {
248+
func extractParams(statement *spannerpb.ExecuteSqlRequest) []any {
244249
paramsLen := 1
245250
if statement.Params != nil {
246251
paramsLen = 1 + len(statement.Params.Fields)
247252
}
248253
params := make([]any, paramsLen)
249254
params = append(params, spannerdriver.ExecOptions{
250255
DecodeOption: spannerdriver.DecodeOptionProto,
256+
TimestampBound: extractTimestampBound(statement),
251257
ReturnResultSetMetadata: true,
252258
ReturnResultSetStats: true,
253-
DirectExecute: true,
259+
DirectExecuteQuery: true,
254260
})
255261
if statement.Params != nil {
256262
if statement.ParamTypes == nil {
@@ -266,3 +272,23 @@ func extractParams(statement *spannerpb.ExecuteBatchDmlRequest_Statement) []any
266272
}
267273
return params
268274
}
275+
276+
func extractTimestampBound(statement *spannerpb.ExecuteSqlRequest) *spanner.TimestampBound {
277+
if statement.Transaction != nil && statement.Transaction.GetSingleUse() != nil && statement.Transaction.GetSingleUse().GetReadOnly() != nil {
278+
ro := statement.Transaction.GetSingleUse().GetReadOnly()
279+
var t spanner.TimestampBound
280+
if ro.GetStrong() {
281+
t = spanner.StrongRead()
282+
} else if ro.GetMaxStaleness() != nil {
283+
t = spanner.MaxStaleness(ro.GetMaxStaleness().AsDuration())
284+
} else if ro.GetExactStaleness() != nil {
285+
t = spanner.ExactStaleness(ro.GetExactStaleness().AsDuration())
286+
} else if ro.GetMinReadTimestamp() != nil {
287+
t = spanner.MinReadTimestamp(ro.GetMinReadTimestamp().AsTime())
288+
} else if ro.GetReadTimestamp() != nil {
289+
t = spanner.ReadTimestamp(ro.GetReadTimestamp().AsTime())
290+
}
291+
return &t
292+
}
293+
return nil
294+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package exported
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"reflect"
8+
"testing"
9+
10+
"cloud.google.com/go/spanner"
11+
"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
12+
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
13+
"github.com/googleapis/go-sql-spanner/testutil"
14+
"google.golang.org/api/option"
15+
"google.golang.org/protobuf/proto"
16+
"google.golang.org/protobuf/types/known/durationpb"
17+
"google.golang.org/protobuf/types/known/structpb"
18+
)
19+
20+
func TestSimpleQuery(t *testing.T) {
21+
t.Parallel()
22+
23+
dsn, _, teardown := setupTestDBConnection(t)
24+
defer teardown()
25+
26+
pool := CreatePool(dsn)
27+
conn := CreateConnection(pool.ObjectId)
28+
statement := sppb.ExecuteSqlRequest{
29+
Sql: testutil.SelectFooFromBar,
30+
}
31+
statementBytes, err := proto.Marshal(&statement)
32+
if err != nil {
33+
t.Fatalf("failed to marshal statement: %v", err)
34+
}
35+
results := Execute(pool.ObjectId, conn.ObjectId, statementBytes)
36+
metadata := Metadata(pool.ObjectId, conn.ObjectId, results.ObjectId)
37+
if metadata.Code != 0 {
38+
t.Fatalf("metadata.Code: %v", metadata.Code)
39+
}
40+
for {
41+
row := Next(pool.ObjectId, conn.ObjectId, results.ObjectId)
42+
if row.Length() == 0 {
43+
break
44+
}
45+
values := structpb.ListValue{}
46+
if err := proto.Unmarshal(row.Res, &values); err != nil {
47+
t.Fatalf("failed to unmarshal row: %v", err)
48+
}
49+
}
50+
stats := ResultSetStats(pool.ObjectId, conn.ObjectId, results.ObjectId)
51+
if stats.Code != 0 {
52+
t.Fatalf("stats.Code: %v", stats.Code)
53+
}
54+
CloseRows(pool.ObjectId, conn.ObjectId, results.ObjectId)
55+
CloseConnection(pool.ObjectId, conn.ObjectId)
56+
ClosePool(pool.ObjectId)
57+
}
58+
59+
func TestQueryWithTimestampBound(t *testing.T) {
60+
t.Parallel()
61+
62+
dsn, server, teardown := setupTestDBConnection(t)
63+
defer teardown()
64+
65+
pool := CreatePool(dsn)
66+
conn := CreateConnection(pool.ObjectId)
67+
statement := sppb.ExecuteSqlRequest{
68+
Sql: testutil.SelectFooFromBar,
69+
Transaction: &sppb.TransactionSelector{
70+
Selector: &sppb.TransactionSelector_SingleUse{
71+
SingleUse: &sppb.TransactionOptions{
72+
Mode: &sppb.TransactionOptions_ReadOnly_{
73+
ReadOnly: &sppb.TransactionOptions_ReadOnly{
74+
TimestampBound: &sppb.TransactionOptions_ReadOnly_MaxStaleness{
75+
MaxStaleness: &durationpb.Duration{Seconds: 10},
76+
},
77+
},
78+
},
79+
},
80+
},
81+
},
82+
}
83+
statementBytes, err := proto.Marshal(&statement)
84+
if err != nil {
85+
t.Fatalf("failed to marshal statement: %v", err)
86+
}
87+
results := Execute(pool.ObjectId, conn.ObjectId, statementBytes)
88+
metadata := Metadata(pool.ObjectId, conn.ObjectId, results.ObjectId)
89+
if metadata.Code != 0 {
90+
t.Fatalf("metadata.Code: %v", metadata.Code)
91+
}
92+
for {
93+
row := Next(pool.ObjectId, conn.ObjectId, results.ObjectId)
94+
if row.Length() == 0 {
95+
break
96+
}
97+
values := structpb.ListValue{}
98+
if err := proto.Unmarshal(row.Res, &values); err != nil {
99+
t.Fatalf("failed to unmarshal row: %v", err)
100+
}
101+
}
102+
stats := ResultSetStats(pool.ObjectId, conn.ObjectId, results.ObjectId)
103+
if stats.Code != 0 {
104+
t.Fatalf("stats.Code: %v", stats.Code)
105+
}
106+
CloseRows(pool.ObjectId, conn.ObjectId, results.ObjectId)
107+
CloseConnection(pool.ObjectId, conn.ObjectId)
108+
ClosePool(pool.ObjectId)
109+
110+
requests := drainRequestsFromServer(server.TestSpanner)
111+
sqlRequests := requestsOfType(requests, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
112+
if g, w := len(sqlRequests), 1; g != w {
113+
t.Fatalf("sql requests count mismatch\nGot: %v\nWant: %v", g, w)
114+
}
115+
req := sqlRequests[0].(*sppb.ExecuteSqlRequest)
116+
if req.Transaction == nil {
117+
t.Fatalf("missing transaction for ExecuteSqlRequest")
118+
}
119+
if req.Transaction.GetSingleUse() == nil {
120+
t.Fatalf("missing single use selector for ExecuteSqlRequest")
121+
}
122+
if req.Transaction.GetSingleUse().GetReadOnly() == nil {
123+
t.Fatalf("missing read-only option for ExecuteSqlRequest")
124+
}
125+
if req.Transaction.GetSingleUse().GetReadOnly().GetMaxStaleness() == nil {
126+
t.Fatalf("missing max staleness timestampbound for ExecuteSqlRequest")
127+
}
128+
if g, w := req.Transaction.GetSingleUse().GetReadOnly().GetMaxStaleness().GetSeconds(), int64(10); g != w {
129+
t.Fatalf("max staleness seconds mismatch\n Got: %v\nWant: %v", g, w)
130+
}
131+
}
132+
133+
func setupTestDBConnection(t *testing.T) (dsn string, server *testutil.MockedSpannerInMemTestServer, teardown func()) {
134+
return setupTestDBConnectionWithParams(t, "")
135+
}
136+
137+
func setupTestDBConnectionWithDialect(t *testing.T, dialect databasepb.DatabaseDialect) (dsn string, server *testutil.MockedSpannerInMemTestServer, teardown func()) {
138+
return setupTestDBConnectionWithParamsAndDialect(t, "", dialect)
139+
}
140+
141+
func setupTestDBConnectionWithParams(t *testing.T, params string) (dsn string, server *testutil.MockedSpannerInMemTestServer, teardown func()) {
142+
return setupTestDBConnectionWithParamsAndDialect(t, params, databasepb.DatabaseDialect_GOOGLE_STANDARD_SQL)
143+
}
144+
145+
func setupMockedTestServerWithDialect(t *testing.T, dialect databasepb.DatabaseDialect) (server *testutil.MockedSpannerInMemTestServer, client *spanner.Client, teardown func()) {
146+
return setupMockedTestServerWithConfigAndClientOptionsAndDialect(t, spanner.ClientConfig{}, []option.ClientOption{}, dialect)
147+
}
148+
149+
func setupTestDBConnectionWithParamsAndDialect(t *testing.T, params string, dialect databasepb.DatabaseDialect) (dsn string, server *testutil.MockedSpannerInMemTestServer, teardown func()) {
150+
server, _, serverTeardown := setupMockedTestServerWithDialect(t, dialect)
151+
dsn = fmt.Sprintf("%s/projects/p/instances/i/databases/d?useplaintext=true;%s", server.Address, params)
152+
db, err := sql.Open("spanner", dsn)
153+
if err != nil {
154+
serverTeardown()
155+
t.Fatal(err)
156+
}
157+
return dsn, server, func() {
158+
_ = db.Close()
159+
serverTeardown()
160+
}
161+
}
162+
163+
func setupMockedTestServerWithConfigAndClientOptionsAndDialect(t *testing.T, config spanner.ClientConfig, clientOptions []option.ClientOption, dialect databasepb.DatabaseDialect) (server *testutil.MockedSpannerInMemTestServer, client *spanner.Client, teardown func()) {
164+
server, opts, serverTeardown := testutil.NewMockedSpannerInMemTestServer(t)
165+
server.SetupSelectDialectResult(dialect)
166+
167+
opts = append(opts, clientOptions...)
168+
ctx := context.Background()
169+
formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]")
170+
config.DisableNativeMetrics = true
171+
client, err := spanner.NewClientWithConfig(ctx, formattedDatabase, config, opts...)
172+
if err != nil {
173+
t.Fatal(err)
174+
}
175+
return server, client, func() {
176+
client.Close()
177+
serverTeardown()
178+
}
179+
}
180+
181+
func requestsOfType(requests []interface{}, t reflect.Type) []interface{} {
182+
res := make([]interface{}, 0)
183+
for _, req := range requests {
184+
if reflect.TypeOf(req) == t {
185+
res = append(res, req)
186+
}
187+
}
188+
return res
189+
}
190+
191+
func drainRequestsFromServer(server testutil.InMemSpannerServer) []interface{} {
192+
var reqs []interface{}
193+
loop:
194+
for {
195+
select {
196+
case req := <-server.ReceivedRequests():
197+
reqs = append(reqs, req)
198+
default:
199+
break loop
200+
}
201+
}
202+
return reqs
203+
}

spannerlib/exported/pool_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
)
1111

1212
func TestExecute(t *testing.T) {
13-
pool := CreatePool()
14-
conn := CreateConnection(pool.ObjectId, "appdev-soda-spanner-staging", "knut-test-ycsb", "knut-test-db")
13+
pool := CreatePool("projects/appdev-soda-spanner-staging/instances/knut-test-ycsb/databases/knut-test-db")
14+
conn := CreateConnection(pool.ObjectId)
1515
stmt := spannerpb.ExecuteBatchDmlRequest_Statement{
1616
Sql: "select * from all_types where col_varchar=$1 /*and col_bigint=@id*/ limit 10",
1717
Params: &structpb.Struct{
@@ -39,8 +39,8 @@ func TestExecute(t *testing.T) {
3939
}
4040

4141
func TestExecuteDml(t *testing.T) {
42-
pool := CreatePool()
43-
conn := CreateConnection(pool.ObjectId, "appdev-soda-spanner-staging", "knut-test-ycsb", "knut-test-db")
42+
pool := CreatePool("projects/appdev-soda-spanner-staging/instances/knut-test-ycsb/databases/knut-test-db")
43+
conn := CreateConnection(pool.ObjectId)
4444
txOpts := &spannerpb.TransactionOptions{
4545
Mode: &spannerpb.TransactionOptions_ReadOnly_{
4646
ReadOnly: &spannerpb.TransactionOptions_ReadOnly{},

spannerlib/exported/transaction.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func ExecuteTransaction(poolId, connId, txId int64, statementBytes []byte) *Message {
11-
statement := spannerpb.ExecuteBatchDmlRequest_Statement{}
11+
statement := spannerpb.ExecuteSqlRequest{}
1212
if err := proto.Unmarshal(statementBytes, &statement); err != nil {
1313
return errMessage(err)
1414
}
@@ -62,7 +62,7 @@ func (tx *transaction) Close() *Message {
6262
return &Message{}
6363
}
6464

65-
func (tx *transaction) Execute(statement *spannerpb.ExecuteBatchDmlRequest_Statement) *Message {
65+
func (tx *transaction) Execute(statement *spannerpb.ExecuteSqlRequest) *Message {
6666
return execute(tx.conn, tx.backend, statement)
6767
}
6868

spannerlib/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
cloud.google.com/go/spanner v1.82.0
99
github.com/google/uuid v1.6.0
1010
github.com/googleapis/go-sql-spanner v1.13.2
11+
google.golang.org/api v0.237.0
1112
google.golang.org/grpc v1.73.0
1213
google.golang.org/protobuf v1.36.6
1314
)
@@ -32,6 +33,7 @@ require (
3233
github.com/go-logr/logr v1.4.2 // indirect
3334
github.com/go-logr/stdr v1.2.2 // indirect
3435
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
36+
github.com/golang/protobuf v1.5.4 // indirect
3537
github.com/google/s2a-go v0.1.9 // indirect
3638
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
3739
github.com/googleapis/gax-go/v2 v2.14.2 // indirect
@@ -56,7 +58,6 @@ require (
5658
golang.org/x/sys v0.33.0 // indirect
5759
golang.org/x/text v0.26.0 // indirect
5860
golang.org/x/time v0.12.0 // indirect
59-
google.golang.org/api v0.237.0 // indirect
6061
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 // indirect
6162
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 // indirect
6263
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect

0 commit comments

Comments
 (0)