Skip to content

Commit 219fe6e

Browse files
authored
feat: support default isolation level for connection (#404)
* feat: support isolation level REPEATABLE READ Add support for isolation level REPEATABLE READ with the BeginTx function. This allows the caller to specify the isolation level for a single transaction. A follow-up pull request will add support for setting the default isolation level that should be used by a connection. * feat: support default isolation level for connection Support setting a default isolation level for a connection and connector. All read/write ransactions on a connection will use the default isolation level, unless an isolation level is specified in the BeginTx function call.
1 parent 95ef5f3 commit 219fe6e

File tree

4 files changed

+272
-1
lines changed

4 files changed

+272
-1
lines changed

conn.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ type SpannerConn interface {
114114
// mode and for read-only transaction.
115115
SetReadOnlyStaleness(staleness spanner.TimestampBound) error
116116

117+
// IsolationLevel returns the current default isolation level that is
118+
// used for read/write transactions on this connection.
119+
IsolationLevel() sql.IsolationLevel
120+
// SetIsolationLevel sets the default isolation level to use for read/write
121+
// transactions on this connection.
122+
SetIsolationLevel(level sql.IsolationLevel) error
123+
117124
// TransactionTag returns the transaction tag that will be applied to the next
118125
// read/write transaction on this connection. The transaction tag that is set
119126
// on the connection is cleared when a read/write transaction is started.
@@ -235,6 +242,10 @@ type conn struct {
235242
autocommitDMLMode AutocommitDMLMode
236243
// readOnlyStaleness is used for queries in autocommit mode and for read-only transactions.
237244
readOnlyStaleness spanner.TimestampBound
245+
// isolationLevel determines the default isolation level that is used for read/write
246+
// transactions on this connection. This default is ignored if the BeginTx function is
247+
// called with an isolation level other than sql.LevelDefault.
248+
isolationLevel sql.IsolationLevel
238249

239250
// execOptions are applied to the next statement or transaction that is executed
240251
// on this connection. It can also be set by passing it in as an argument to
@@ -309,6 +320,15 @@ func (c *conn) setReadOnlyStaleness(staleness spanner.TimestampBound) (driver.Re
309320
return driver.ResultNoRows, nil
310321
}
311322

323+
func (c *conn) IsolationLevel() sql.IsolationLevel {
324+
return c.isolationLevel
325+
}
326+
327+
func (c *conn) SetIsolationLevel(level sql.IsolationLevel) error {
328+
c.isolationLevel = level
329+
return nil
330+
}
331+
312332
func (c *conn) MaxCommitDelay() time.Duration {
313333
return *c.execOptions.TransactionOptions.CommitOptions.MaxCommitDelay
314334
}
@@ -633,6 +653,7 @@ func (c *conn) ResetSession(_ context.Context) error {
633653
c.autoBatchDmlUpdateCount = c.connector.connectorConfig.AutoBatchDmlUpdateCount
634654
c.autoBatchDmlUpdateCountVerification = !c.connector.connectorConfig.DisableAutoBatchDmlUpdateCountVerification
635655
c.retryAborts = c.connector.retryAbortsInternally
656+
c.isolationLevel = c.connector.connectorConfig.IsolationLevel
636657
// TODO: Reset the following fields to the connector default
637658
c.autocommitDMLMode = Transactional
638659
c.readOnlyStaleness = spanner.TimestampBound{}
@@ -888,10 +909,20 @@ func (c *conn) getTransactionOptions() ReadWriteTransactionOptions {
888909
defer func() {
889910
c.execOptions.TransactionOptions.TransactionTag = ""
890911
}()
891-
return ReadWriteTransactionOptions{
912+
txOpts := ReadWriteTransactionOptions{
892913
TransactionOptions: c.execOptions.TransactionOptions,
893914
DisableInternalRetries: !c.retryAborts,
894915
}
916+
// Only use the default isolation level from the connection if the ExecOptions
917+
// did not contain a more specific isolation level.
918+
if txOpts.TransactionOptions.IsolationLevel == spannerpb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED {
919+
// This should never really return an error, but we check just to be absolutely sure.
920+
level, err := toProtoIsolationLevel(c.isolationLevel)
921+
if err == nil {
922+
txOpts.TransactionOptions.IsolationLevel = level
923+
}
924+
}
925+
return txOpts
895926
}
896927

897928
func (c *conn) withTempReadOnlyTransactionOptions(options *ReadOnlyTransactionOptions) {

conn_with_mockserver_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ package spannerdriver
1717
import (
1818
"context"
1919
"database/sql"
20+
"fmt"
2021
"reflect"
2122
"testing"
2223

24+
"cloud.google.com/go/spanner"
2325
"cloud.google.com/go/spanner/apiv1/spannerpb"
2426
"github.com/googleapis/go-sql-spanner/testutil"
2527
)
@@ -115,3 +117,125 @@ func TestBeginTxWithInvalidIsolationLevel(t *testing.T) {
115117
}
116118
}
117119
}
120+
121+
func TestDefaultIsolationLevel(t *testing.T) {
122+
t.Parallel()
123+
124+
for _, level := range []sql.IsolationLevel{
125+
sql.LevelDefault,
126+
sql.LevelSnapshot,
127+
sql.LevelRepeatableRead,
128+
sql.LevelSerializable,
129+
} {
130+
db, server, teardown := setupTestDBConnectionWithParams(t, fmt.Sprintf("isolationLevel=%v", level))
131+
defer teardown()
132+
ctx := context.Background()
133+
134+
conn, err := db.Conn(ctx)
135+
if err != nil {
136+
t.Fatal(err)
137+
}
138+
if err := conn.Raw(func(driverConn interface{}) error {
139+
spannerConn, ok := driverConn.(SpannerConn)
140+
if !ok {
141+
return fmt.Errorf("expected spanner conn, got %T", driverConn)
142+
}
143+
if spannerConn.IsolationLevel() != level {
144+
return fmt.Errorf("expected isolation level %v, got %v", level, spannerConn.IsolationLevel())
145+
}
146+
return nil
147+
}); err != nil {
148+
t.Fatal(err)
149+
}
150+
151+
originalLevel := level
152+
for _, disableRetryAborts := range []bool{true, false} {
153+
if disableRetryAborts {
154+
level = WithDisableRetryAborts(originalLevel)
155+
} else {
156+
level = originalLevel
157+
}
158+
// Note: No isolation level is passed in here, so it will use the default.
159+
tx, _ := db.BeginTx(ctx, &sql.TxOptions{})
160+
_, _ = tx.ExecContext(ctx, testutil.UpdateBarSetFoo)
161+
_ = tx.Rollback()
162+
163+
requests := drainRequestsFromServer(server.TestSpanner)
164+
beginRequests := requestsOfType(requests, reflect.TypeOf(&spannerpb.BeginTransactionRequest{}))
165+
if g, w := len(beginRequests), 1; g != w {
166+
t.Fatalf("begin requests count mismatch\n Got: %v\nWant: %v", g, w)
167+
}
168+
request := beginRequests[0].(*spannerpb.BeginTransactionRequest)
169+
wantIsolationLevel, _ := toProtoIsolationLevel(originalLevel)
170+
if g, w := request.Options.GetIsolationLevel(), wantIsolationLevel; g != w {
171+
t.Fatalf("begin isolation level mismatch\n Got: %v\nWant: %v", g, w)
172+
}
173+
}
174+
}
175+
}
176+
177+
func TestSetIsolationLevel(t *testing.T) {
178+
t.Parallel()
179+
180+
db, _, teardown := setupTestDBConnection(t)
181+
defer teardown()
182+
ctx := context.Background()
183+
184+
// Repeat twice to ensure that the state is reset after closing the connection.
185+
for i := 0; i < 2; i++ {
186+
conn, err := db.Conn(ctx)
187+
if err != nil {
188+
t.Fatal(err)
189+
}
190+
var level sql.IsolationLevel
191+
_ = conn.Raw(func(driverConn interface{}) error {
192+
level = driverConn.(SpannerConn).IsolationLevel()
193+
return nil
194+
})
195+
if g, w := level, sql.LevelDefault; g != w {
196+
t.Fatalf("isolation level mismatch\n Got: %v\nWant: %v", g, w)
197+
}
198+
_ = conn.Raw(func(driverConn interface{}) error {
199+
return driverConn.(SpannerConn).SetIsolationLevel(sql.LevelSnapshot)
200+
})
201+
_ = conn.Raw(func(driverConn interface{}) error {
202+
level = driverConn.(SpannerConn).IsolationLevel()
203+
return nil
204+
})
205+
if g, w := level, sql.LevelSnapshot; g != w {
206+
t.Fatalf("isolation level mismatch\n Got: %v\nWant: %v", g, w)
207+
}
208+
conn.Close()
209+
}
210+
}
211+
212+
func TestIsolationLevelAutoCommit(t *testing.T) {
213+
t.Parallel()
214+
215+
db, server, teardown := setupTestDBConnection(t)
216+
defer teardown()
217+
ctx := context.Background()
218+
219+
for _, level := range []sql.IsolationLevel{
220+
sql.LevelDefault,
221+
sql.LevelSnapshot,
222+
sql.LevelRepeatableRead,
223+
sql.LevelSerializable,
224+
} {
225+
spannerLevel, _ := toProtoIsolationLevel(level)
226+
_, _ = db.ExecContext(ctx, testutil.UpdateBarSetFoo, ExecOptions{TransactionOptions: spanner.TransactionOptions{
227+
IsolationLevel: spannerLevel,
228+
}})
229+
230+
requests := drainRequestsFromServer(server.TestSpanner)
231+
executeRequests := requestsOfType(requests, reflect.TypeOf(&spannerpb.ExecuteSqlRequest{}))
232+
if g, w := len(executeRequests), 1; g != w {
233+
t.Fatalf("execute requests count mismatch\n Got: %v\nWant: %v", g, w)
234+
}
235+
request := executeRequests[0].(*spannerpb.ExecuteSqlRequest)
236+
wantIsolationLevel, _ := toProtoIsolationLevel(level)
237+
if g, w := request.Transaction.GetBegin().GetIsolationLevel(), wantIsolationLevel; g != w {
238+
t.Fatalf("begin isolation level mismatch\n Got: %v\nWant: %v", g, w)
239+
}
240+
}
241+
}

driver.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ type ConnectorConfig struct {
231231
AutoBatchDmlUpdateCount int64
232232
DisableAutoBatchDmlUpdateCountVerification bool
233233

234+
// IsolationLevel is the default isolation level for read/write transactions.
235+
IsolationLevel sql.IsolationLevel
236+
234237
// DecodeToNativeArrays determines whether arrays that have a Go native
235238
// type should be decoded to those types rather than the corresponding
236239
// spanner.NullTypeName type.
@@ -472,6 +475,11 @@ func createConnector(d *Driver, connectorConfig ConnectorConfig) (*connector, er
472475
connectorConfig.AutoConfigEmulator = val
473476
}
474477
}
478+
if strval, ok := connectorConfig.Params[strings.ToLower("IsolationLevel")]; ok {
479+
if val, err := parseIsolationLevel(strval); err == nil {
480+
connectorConfig.IsolationLevel = val
481+
}
482+
}
475483

476484
// Check if it is Spanner gorm that is creating the connection.
477485
// If so, we should set a different user-agent header than the
@@ -1058,6 +1066,28 @@ func checkIsValidType(v driver.Value) bool {
10581066
return true
10591067
}
10601068

1069+
func parseIsolationLevel(val string) (sql.IsolationLevel, error) {
1070+
switch strings.Replace(strings.ToLower(strings.TrimSpace(val)), " ", "_", 1) {
1071+
case "default":
1072+
return sql.LevelDefault, nil
1073+
case "read_uncommitted":
1074+
return sql.LevelReadUncommitted, nil
1075+
case "read_committed":
1076+
return sql.LevelReadCommitted, nil
1077+
case "write_committed":
1078+
return sql.LevelWriteCommitted, nil
1079+
case "repeatable_read":
1080+
return sql.LevelRepeatableRead, nil
1081+
case "snapshot":
1082+
return sql.LevelSnapshot, nil
1083+
case "serializable":
1084+
return sql.LevelSerializable, nil
1085+
case "linearizable":
1086+
return sql.LevelLinearizable, nil
1087+
}
1088+
return sql.LevelDefault, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "invalid or unsupported isolation level: %v", val))
1089+
}
1090+
10611091
func toProtoIsolationLevel(level sql.IsolationLevel) (spannerpb.TransactionOptions_IsolationLevel, error) {
10621092
switch level {
10631093
case sql.LevelSerializable:

driver_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,21 @@ func TestExtractDnsParts(t *testing.T) {
160160
DisableNativeMetrics: true,
161161
},
162162
},
163+
{
164+
input: "projects/p/instances/i/databases/d?isolationLevel=repeatable_read;",
165+
wantConnectorConfig: ConnectorConfig{
166+
Project: "p",
167+
Instance: "i",
168+
Database: "d",
169+
Params: map[string]string{
170+
"isolationlevel": "repeatable_read",
171+
},
172+
},
173+
wantSpannerConfig: spanner.ClientConfig{
174+
SessionPoolConfig: spanner.DefaultSessionPoolConfig,
175+
UserAgent: userAgent,
176+
},
177+
},
163178
{
164179
input: "spanner.googleapis.com/projects/p/instances/i/databases/d?minSessions=200;maxSessions=1000;numChannels=10;disableRouteToLeader=true;enableEndToEndTracing=true;disableNativeMetrics=true;rpcPriority=Medium;optimizerVersion=1;optimizerStatisticsPackage=latest;databaseRole=child",
165180
wantConnectorConfig: ConnectorConfig{
@@ -291,6 +306,77 @@ func TestToProtoIsolationLevel(t *testing.T) {
291306
}
292307
}
293308

309+
func TestParseIsolationLevel(t *testing.T) {
310+
tests := []struct {
311+
input string
312+
want sql.IsolationLevel
313+
wantErr bool
314+
}{
315+
{
316+
input: "default",
317+
want: sql.LevelDefault,
318+
},
319+
{
320+
input: " DEFAULT ",
321+
want: sql.LevelDefault,
322+
},
323+
{
324+
input: "read uncommitted",
325+
want: sql.LevelReadUncommitted,
326+
},
327+
{
328+
input: " read_uncommitted\n",
329+
want: sql.LevelReadUncommitted,
330+
},
331+
{
332+
input: "read committed",
333+
want: sql.LevelReadCommitted,
334+
},
335+
{
336+
input: "write committed",
337+
want: sql.LevelWriteCommitted,
338+
},
339+
{
340+
input: "repeatable read",
341+
want: sql.LevelRepeatableRead,
342+
},
343+
{
344+
input: "snapshot",
345+
want: sql.LevelSnapshot,
346+
},
347+
{
348+
input: "serializable",
349+
want: sql.LevelSerializable,
350+
},
351+
{
352+
input: "linearizable",
353+
want: sql.LevelLinearizable,
354+
},
355+
{
356+
input: "read serializable",
357+
wantErr: true,
358+
},
359+
{
360+
input: "",
361+
wantErr: true,
362+
},
363+
{
364+
input: "read-committed",
365+
wantErr: true,
366+
},
367+
}
368+
for _, tc := range tests {
369+
level, err := parseIsolationLevel(tc.input)
370+
if tc.wantErr && err == nil {
371+
t.Errorf("parseIsolationLevel(%q): expected error", tc.input)
372+
} else if !tc.wantErr && err != nil {
373+
t.Errorf("parseIsolationLevel(%q): unexpected error: %v", tc.input, err)
374+
} else if level != tc.want {
375+
t.Errorf("parseIsolationLevel(%q): got %v, want %v", tc.input, level, tc.want)
376+
}
377+
}
378+
}
379+
294380
func ExampleCreateConnector() {
295381
connectorConfig := ConnectorConfig{
296382
Project: "my-project",

0 commit comments

Comments
 (0)