Skip to content

Commit 785aa77

Browse files
authored
chore: pull retry_aborts_internally when needed (#518)
Pull the value of retry_aborts_internally once from the connecion state when needed, instead of trying to set it (and keep it in sync) on the current transaction of a connection. This also enables the use of `SET LOCAL retry_aborts_internally = true|false`
1 parent e88c303 commit 785aa77

File tree

4 files changed

+47
-35
lines changed

4 files changed

+47
-35
lines changed

conn.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"log/slog"
2323
"slices"
24+
"sync"
2425
"time"
2526

2627
"cloud.google.com/go/spanner"
@@ -359,11 +360,6 @@ func (c *conn) SetRetryAbortsInternally(retry bool) error {
359360
}
360361

361362
func (c *conn) setRetryAbortsInternally(retry bool) (driver.Result, error) {
362-
if c.inTransaction() {
363-
if _, err := c.tx.setRetryAbortsInternally(retry); err != nil {
364-
return nil, err
365-
}
366-
}
367363
if err := propertyRetryAbortsInternally.SetValue(c.state, retry, connectionstate.ContextUser); err != nil {
368364
return nil, err
369365
}
@@ -1175,9 +1171,8 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver
11751171
if c.tempTransactionOptions != nil && c.tempTransactionOptions.close != nil {
11761172
tempCloseFunc = c.tempTransactionOptions.close
11771173
}
1178-
disableInternalRetries := !c.RetryAbortsInternally()
1179-
if c.tempTransactionOptions != nil {
1180-
disableInternalRetries = c.tempTransactionOptions.DisableInternalRetries
1174+
if !disableRetryAborts && c.tempTransactionOptions != nil {
1175+
disableRetryAborts = c.tempTransactionOptions.DisableInternalRetries
11811176
}
11821177

11831178
tx, err := spanner.NewReadWriteStmtBasedTransactionWithCallbackForOptions(ctx, c.client, opts, func() spanner.TransactionOptions {
@@ -1212,7 +1207,9 @@ func (c *conn) BeginTx(ctx context.Context, driverOpts driver.TxOptions) (driver
12121207
}
12131208
},
12141209
// Disable internal retries if any of these options have been set.
1215-
retryAborts: !disableInternalRetries && !disableRetryAborts,
1210+
retryAborts: sync.OnceValue(func() bool {
1211+
return c.RetryAbortsInternally() && !disableRetryAborts
1212+
}),
12161213
}
12171214
c.clearCommitResponse()
12181215
return c.tx, nil

conn_with_mockserver_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/googleapis/go-sql-spanner/connectionstate"
2929
"github.com/googleapis/go-sql-spanner/testutil"
3030
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/status"
3132
"google.golang.org/protobuf/proto"
3233
"google.golang.org/protobuf/types/known/anypb"
3334
"google.golang.org/protobuf/types/known/emptypb"
@@ -809,6 +810,35 @@ func TestSetRetryAbortsInternallyInActiveTransaction(t *testing.T) {
809810
_ = tx.Rollback()
810811
}
811812

813+
func TestSetLocalRetryAbortsInternally(t *testing.T) {
814+
t.Parallel()
815+
816+
db, server, teardown := setupTestDBConnection(t)
817+
defer teardown()
818+
ctx := context.Background()
819+
820+
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
821+
if err != nil {
822+
t.Fatal(err)
823+
}
824+
if _, err := tx.ExecContext(ctx, "set local retry_aborts_internally = false"); err != nil {
825+
t.Fatal(err)
826+
}
827+
if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
828+
t.Fatal(err)
829+
}
830+
// Instruct the mock server to return Aborted when Commit is called.
831+
// This should cause the transaction to fail, and as internal retries have been
832+
// disabled, it should not be retried.
833+
server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{
834+
Errors: []error{status.Error(codes.Aborted, "Aborted")},
835+
})
836+
err = tx.Commit()
837+
if g, w := spanner.ErrCode(err), codes.Aborted; g != w {
838+
t.Fatalf("error mismatch\n Got: %v\nWant: %v", g, w)
839+
}
840+
}
841+
812842
func TestSetAutocommitDMLMode(t *testing.T) {
813843
t.Parallel()
814844

driver_with_mockserver_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2118,8 +2118,8 @@ func TestQueryWithDuplicateNamedParameterStartingWithUnderscore(t *testing.T) {
21182118
t.Fatal(err)
21192119
}
21202120
// Verify that 'bar' is used for both instances of the parameter @__name.
2121-
requests := drainRequestsFromServer(server.TestSpanner)
2122-
sqlRequests := requestsOfType(requests, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
2121+
requests := server.TestSpanner.DrainRequestsFromServer()
2122+
sqlRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
21232123
if len(sqlRequests) != 1 {
21242124
t.Fatalf("sql requests count mismatch\nGot: %v\nWant: %v", len(sqlRequests), 1)
21252125
}
@@ -2178,8 +2178,8 @@ func TestQueryWithReusedNamedParameterStartingWithUnderscore(t *testing.T) {
21782178
t.Fatal(err)
21792179
}
21802180
// Verify that 'foo' is used for both instances of the parameter @__name.
2181-
requests := drainRequestsFromServer(server.TestSpanner)
2182-
sqlRequests := requestsOfType(requests, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
2181+
requests := server.TestSpanner.DrainRequestsFromServer()
2182+
sqlRequests := testutil.RequestsOfType(requests, reflect.TypeOf(&sppb.ExecuteSqlRequest{}))
21832183
if len(sqlRequests) != 1 {
21842184
t.Fatalf("sql requests count mismatch\nGot: %v\nWant: %v", len(sqlRequests), 1)
21852185
}
@@ -4567,7 +4567,7 @@ func TestRunTransaction(t *testing.T) {
45674567
if g, w := rwTx.ctx, ctx; g != w {
45684568
return fmt.Errorf("getting the transaction through reflection failed")
45694569
}
4570-
if rwTx.retryAborts {
4570+
if rwTx.retryAborts() {
45714571
return fmt.Errorf("internal retries should be disabled during RunTransaction")
45724572
}
45734573

@@ -5028,7 +5028,7 @@ func TestBeginReadWriteTransaction(t *testing.T) {
50285028
if g, w := rwTx.ctx, ctx; g != w {
50295029
t.Fatal("getting the transaction through reflection failed")
50305030
}
5031-
if rwTx.retryAborts {
5031+
if rwTx.retryAborts() {
50325032
t.Fatal("internal retries should be disabled during this transaction")
50335033
}
50345034

transaction.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ type contextTransaction interface {
5454
AbortBatch() (driver.Result, error)
5555

5656
BufferWrite(ms []*spanner.Mutation) error
57-
58-
setRetryAbortsInternally(retry bool) (driver.Result, error)
5957
}
6058

6159
type rowIterator interface {
@@ -205,11 +203,6 @@ func (tx *readOnlyTransaction) BufferWrite([]*spanner.Mutation) error {
205203
return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "read-only transactions cannot write"))
206204
}
207205

208-
func (tx *readOnlyTransaction) setRetryAbortsInternally(_ bool) (driver.Result, error) {
209-
// no-op, ignore
210-
return driver.ResultNoRows, nil
211-
}
212-
213206
// ErrAbortedDueToConcurrentModification is returned by a read/write transaction
214207
// that was aborted by Cloud Spanner, and where the internal retry attempt
215208
// failed because it detected that the results during the retry were different
@@ -244,7 +237,7 @@ type readWriteTransaction struct {
244237
close func(result txResult, commitResponse *spanner.CommitResponse, commitErr error)
245238
// retryAborts indicates whether this transaction will automatically retry
246239
// the transaction if it is aborted by Spanner. The default is true.
247-
retryAborts bool
240+
retryAborts func() bool
248241

249242
// statements contains the list of statements that has been executed on this
250243
// transaction so far. These statements will be replayed on a new read write
@@ -415,7 +408,7 @@ func (tx *readWriteTransaction) Commit() (err error) {
415408
}
416409
var commitResponse spanner.CommitResponse
417410
if tx.rwTx != nil {
418-
if !tx.retryAborts {
411+
if !tx.retryAborts() {
419412
ts, err := tx.rwTx.CommitWithReturnResp(tx.ctx)
420413
tx.close(txResultCommit, &ts, err)
421414
return err
@@ -471,7 +464,7 @@ func (tx *readWriteTransaction) Query(ctx context.Context, stmt spanner.Statemen
471464
}
472465
// If internal retries have been disabled, we don't need to keep track of a
473466
// running checksum for all results that we have seen.
474-
if !tx.retryAborts {
467+
if !tx.retryAborts() {
475468
return &readOnlyRowIterator{tx.rwTx.QueryWithOptions(ctx, stmt, execOptions.QueryOptions)}, nil
476469
}
477470

@@ -509,7 +502,7 @@ func (tx *readWriteTransaction) ExecContext(ctx context.Context, stmt spanner.St
509502
return &result{rowsAffected: updateCount}, nil
510503
}
511504

512-
if !tx.retryAborts {
505+
if !tx.retryAborts() {
513506
return execTransactionalDML(ctx, tx.rwTx, stmt, statementInfo, options)
514507
}
515508

@@ -608,7 +601,7 @@ func (tx *readWriteTransaction) runDmlBatch(ctx context.Context) (*result, error
608601
options := tx.batch.options
609602
tx.batch = nil
610603

611-
if !tx.retryAborts {
604+
if !tx.retryAborts() {
612605
affected, err := tx.rwTx.BatchUpdateWithOptions(ctx, statements, options.QueryOptions)
613606
return &result{rowsAffected: sum(affected)}, err
614607
}
@@ -650,11 +643,3 @@ func errorsEqualForRetry(err1, err2 error) bool {
650643
}
651644
return false
652645
}
653-
654-
func (tx *readWriteTransaction) setRetryAbortsInternally(retry bool) (driver.Result, error) {
655-
if tx.active {
656-
return nil, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "cannot change retry mode while a transaction is active"))
657-
}
658-
tx.retryAborts = retry
659-
return driver.ResultNoRows, nil
660-
}

0 commit comments

Comments
 (0)