Skip to content

Commit 129248c

Browse files
authored
fix: avoid session leak on transaction retry (#300) (#301)
When `(*readWriteTransaction).retry()` is used, there is already an existing pending (errored) transaction. It gets overwritten with the new transaction object without closing the old one. This change causes the old transaction to be closed (rollback'd) before the new one overwrites it, thus avoiding a potentially large session leak.
1 parent b74db54 commit 129248c

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

aborted_transactions_test.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"database/sql"
2020
"reflect"
2121
"testing"
22+
"time"
2223

2324
"cloud.google.com/go/spanner"
2425
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
@@ -31,10 +32,11 @@ import (
3132
func TestCommitAborted(t *testing.T) {
3233
t.Parallel()
3334

34-
db, server, teardown := setupTestDBConnection(t)
35+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
3536
defer teardown()
3637

37-
ctx := context.Background()
38+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
39+
defer cancel()
3840
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
3941
if err != nil {
4042
t.Fatalf("begin failed: %v", err)
@@ -56,10 +58,11 @@ func TestCommitAborted(t *testing.T) {
5658
func TestCommitAbortedWithInternalRetriesDisabled(t *testing.T) {
5759
t.Parallel()
5860

59-
db, server, teardown := setupTestDBConnectionWithParams(t, "retryAbortsInternally=false")
61+
db, server, teardown := setupTestDBConnectionWithParams(t, "retryAbortsInternally=false;minSessions=1;maxSessions=1")
6062
defer teardown()
6163

62-
ctx := context.Background()
64+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
65+
defer cancel()
6366
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
6467
if err != nil {
6568
t.Fatalf("begin failed: %v", err)
@@ -82,10 +85,11 @@ func TestCommitAbortedWithInternalRetriesDisabled(t *testing.T) {
8285
func TestUpdateAborted(t *testing.T) {
8386
t.Parallel()
8487

85-
db, server, teardown := setupTestDBConnection(t)
88+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
8689
defer teardown()
8790

88-
ctx := context.Background()
91+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
92+
defer cancel()
8993
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
9094
if err != nil {
9195
t.Fatalf("begin failed: %v", err)
@@ -122,10 +126,11 @@ func TestUpdateAborted(t *testing.T) {
122126
func TestBatchUpdateAborted(t *testing.T) {
123127
t.Parallel()
124128

125-
db, server, teardown := setupTestDBConnection(t)
129+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
126130
defer teardown()
127131

128-
ctx := context.Background()
132+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
133+
defer cancel()
129134
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
130135
if err != nil {
131136
t.Fatalf("begin failed: %v", err)
@@ -359,13 +364,14 @@ func testRetryReadWriteTransactionWithQuery(t *testing.T, setupServer func(serve
359364

360365
t.Parallel()
361366

362-
db, server, teardown := setupTestDBConnection(t)
367+
db, server, teardown := setupTestDBConnectionWithParams(t, "minSessions=1;maxSessions=1")
363368
defer teardown()
364369

365370
if setupServer != nil {
366371
setupServer(server.TestSpanner)
367372
}
368-
ctx := context.Background()
373+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
374+
defer cancel()
369375
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
370376
if err != nil {
371377
t.Fatalf("begin failed: %v", err)

transaction.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,10 @@ func (tx *readWriteTransaction) runWithRetry(ctx context.Context, f func(ctx con
246246
// retry retries the entire read/write transaction on a new Spanner transaction.
247247
// It will return ErrAbortedDueToConcurrentModification if the retry fails.
248248
func (tx *readWriteTransaction) retry(ctx context.Context) (err error) {
249+
// TODO: This should use t.ResetForRetry(ctx) instead when that function is available.
250+
if tx.rwTx != nil {
251+
tx.rwTx.Rollback(tx.ctx)
252+
}
249253
tx.rwTx, err = spanner.NewReadWriteStmtBasedTransaction(ctx, tx.client)
250254
if err != nil {
251255
return err

0 commit comments

Comments
 (0)