Skip to content

Commit 52aff4c

Browse files
committed
* Fixed goroutine leak on closing database/sql driver
1 parent e7acaa3 commit 52aff4c

File tree

8 files changed

+49
-25
lines changed

8 files changed

+49
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed goroutine leak on closing `database/sql` driver
12
* "No endpoints" is retriable error now
23

34
## v3.99.3

internal/repeater/repeater.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,10 @@ func (r *repeater) wakeUp(e Event) (err error) {
163163
}
164164

165165
func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) {
166-
defer close(r.stopped)
167-
defer tick.Stop()
166+
defer func() {
167+
close(r.stopped)
168+
tick.Stop()
169+
}()
168170

169171
// force returns backoff with delays [500ms...32s]
170172
force := backoff.New(

internal/xsql/connector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type (
4343
LegacyOpts []legacy.Option
4444
Options []propose.Option
4545
disableServerBalancer bool
46-
onCLose []func(*Connector)
46+
onClose []func(*Connector)
4747

4848
clock clockwork.Clock
4949
idleThreshold time.Duration
@@ -204,7 +204,7 @@ func (c *Connector) Close() error {
204204
default:
205205
close(c.done)
206206

207-
for _, onClose := range c.onCLose {
207+
for _, onClose := range c.onClose {
208208
onClose(c)
209209
}
210210

internal/xsql/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (opt traceRetryOption) Apply(c *Connector) error {
7676
}
7777

7878
func (onClose onCloseOption) Apply(c *Connector) error {
79-
c.onCLose = append(c.onCLose, onClose)
79+
c.onClose = append(c.onClose, onClose)
8080

8181
return nil
8282
}

sql.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,6 @@ var (
4141
_ driver.DriverContext = &sqlDriver{}
4242
)
4343

44-
func (d *sqlDriver) Close() error {
45-
var errs []error
46-
d.connectors.Range(func(c *xsql.Connector, _ *Driver) bool {
47-
if err := c.Close(); err != nil {
48-
errs = append(errs, err)
49-
}
50-
51-
return true
52-
})
53-
if len(errs) > 0 {
54-
return xerrors.NewWithIssues("ydb legacy driver close failed", errs...)
55-
}
56-
57-
return nil
58-
}
59-
6044
// Open returns a new Driver to the ydb.
6145
func (d *sqlDriver) Open(string) (driver.Conn, error) {
6246
return nil, xsql.ErrUnsupported
@@ -68,15 +52,25 @@ func (d *sqlDriver) OpenConnector(dataSourceName string) (driver.Connector, erro
6852
return nil, xerrors.WithStackTrace(fmt.Errorf("failed to connect by data source name '%s': %w", dataSourceName, err))
6953
}
7054

71-
return Connector(db, db.databaseSQLOptions...)
55+
c, err := connector(db, db.databaseSQLOptions...)
56+
if err != nil {
57+
return nil, xerrors.WithStackTrace(fmt.Errorf("failed to create connector: %w", err))
58+
}
59+
60+
d.attach(c, db)
61+
62+
return c, nil
7263
}
7364

7465
func (d *sqlDriver) attach(c *xsql.Connector, parent *Driver) {
7566
d.connectors.Set(c, parent)
7667
}
7768

7869
func (d *sqlDriver) detach(c *xsql.Connector) {
79-
d.connectors.Delete(c)
70+
parent, _ := d.connectors.Extract(c)
71+
if d.connectors.Len() == 0 && parent != nil {
72+
_ = parent.Close(context.Background())
73+
}
8074
}
8175

8276
type QueryMode int
@@ -235,7 +229,7 @@ type SQLConnector interface {
235229
Close() error
236230
}
237231

238-
func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) {
232+
func connector(parent *Driver, opts ...ConnectorOption) (*xsql.Connector, error) {
239233
c, err := xsql.Open(parent, parent.metaBalancer,
240234
append(
241235
append(
@@ -250,7 +244,17 @@ func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) {
250244
if err != nil {
251245
return nil, xerrors.WithStackTrace(err)
252246
}
253-
d.attach(c, parent)
247+
248+
return c, nil
249+
}
250+
251+
func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) {
252+
c, err := connector(parent, opts...)
253+
if err != nil {
254+
return nil, xerrors.WithStackTrace(err)
255+
}
256+
257+
d.attach(c, nil)
254258

255259
return c, nil
256260
}

tests/integration/basic_example_database_sql_bindings_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
)
2727

2828
func TestBasicExampleDatabaseSqlBindings(t *testing.T) {
29+
defer simpleDetectGoroutineLeak(t)
30+
2931
folder := t.Name()
3032

3133
ctx, cancel := context.WithTimeout(xtest.Context(t), 42*time.Second)

tests/integration/basic_example_database_sql_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
)
2727

2828
func TestBasicExampleDatabaseSql(t *testing.T) {
29+
defer simpleDetectGoroutineLeak(t)
30+
2931
folder := t.Name()
3032

3133
ctx, cancel := context.WithTimeout(xtest.Context(t), 42*time.Second)

tests/integration/helpers_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"os"
1212
"path"
13+
"runtime"
1314
"strings"
1415
"testing"
1516
"text/template"
@@ -468,3 +469,15 @@ func driverEngine(db *sql.DB) (engine xsql.Engine) {
468469

469470
return engine
470471
}
472+
473+
func simpleDetectGoroutineLeak(t *testing.T) {
474+
// 1) testing.go => main.main()
475+
// 2) current test
476+
const expectedGoroutinesCount = 2
477+
if num := runtime.NumGoroutine(); num > expectedGoroutinesCount {
478+
bb := make([]byte, 2<<32)
479+
if n := runtime.Stack(bb, true); n < len(bb) {
480+
t.Error(fmt.Sprintf("unexpected goroutines:\n%s\n", string(bb[:n])))
481+
}
482+
}
483+
}

0 commit comments

Comments
 (0)