Skip to content

Commit bbbfdcd

Browse files
committed
chore: create shared lib for connections
1 parent bc33a3a commit bbbfdcd

File tree

11 files changed

+5231
-0
lines changed

11 files changed

+5231
-0
lines changed

spannerlib/backend/connection.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package backend
2+
3+
import (
4+
"database/sql"
5+
)
6+
7+
type SpannerConnection struct {
8+
Conn *sql.Conn
9+
}
10+
11+
func (conn *SpannerConnection) Close() error {
12+
return conn.Conn.Close()
13+
}

spannerlib/backend/db_pool.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"fmt"
8+
"sync"
9+
10+
spannerdriver "github.com/googleapis/go-sql-spanner"
11+
)
12+
13+
type Pool struct {
14+
Project string
15+
Instance string
16+
Database string
17+
18+
mu sync.Mutex
19+
entries map[string]*sql.DB
20+
}
21+
22+
func (pool *Pool) Close() (err error) {
23+
pool.mu.Lock()
24+
defer pool.mu.Unlock()
25+
for _, db := range pool.entries {
26+
err = errors.Join(err, db.Close())
27+
}
28+
return err
29+
}
30+
31+
func (pool *Pool) Conn(ctx context.Context, project, instance, database string) (*sql.Conn, error) {
32+
if project == "" {
33+
project = pool.Project
34+
}
35+
if instance == "" {
36+
instance = pool.Instance
37+
}
38+
if database == "" {
39+
database = pool.Database
40+
}
41+
key := fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, database)
42+
pool.mu.Lock()
43+
defer pool.mu.Unlock()
44+
if db, ok := pool.entries[key]; ok {
45+
return db.Conn(ctx)
46+
}
47+
config := spannerdriver.ConnectorConfig{
48+
Project: project,
49+
Instance: instance,
50+
Database: database,
51+
}
52+
connector, err := spannerdriver.CreateConnector(config)
53+
if err != nil {
54+
return nil, err
55+
}
56+
db := sql.OpenDB(connector)
57+
if pool.entries == nil {
58+
pool.entries = make(map[string]*sql.DB)
59+
}
60+
pool.entries[key] = db
61+
return db.Conn(ctx)
62+
}

spannerlib/exported/connection.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package exported
2+
3+
import "C"
4+
import (
5+
"cloud.google.com/go/spanner/apiv1/spannerpb"
6+
"context"
7+
"sync"
8+
"sync/atomic"
9+
10+
spannerdriver "github.com/googleapis/go-sql-spanner"
11+
"spannerlib/backend"
12+
)
13+
14+
type Connection struct {
15+
results *sync.Map
16+
resultsIdx atomic.Int64
17+
18+
backend *backend.SpannerConnection
19+
}
20+
21+
func (conn *Connection) close() *Message {
22+
conn.results.Range(func(key, value interface{}) bool {
23+
res := value.(*rows)
24+
res.Close()
25+
return true
26+
})
27+
err := conn.backend.Close()
28+
if err != nil {
29+
return errMessage(err)
30+
}
31+
return &Message{}
32+
}
33+
34+
func (conn *Connection) Execute(statement *spannerpb.ExecuteBatchDmlRequest_Statement) *Message {
35+
it, err := conn.backend.Conn.QueryContext(context.Background(), statement.Sql, spannerdriver.ExecOptions{DecodeOption: spannerdriver.DecodeOptionProto})
36+
if err != nil {
37+
return errMessage(err)
38+
}
39+
id := conn.resultsIdx.Add(1)
40+
res := &rows{
41+
backend: it,
42+
}
43+
conn.results.Store(id, res)
44+
return idMessage(id)
45+
}

spannerlib/exported/message.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package exported
2+
3+
import (
4+
"unsafe"
5+
6+
"github.com/google/uuid"
7+
"google.golang.org/grpc/status"
8+
)
9+
10+
type Message struct {
11+
Code int32
12+
ObjectId int64
13+
Res []byte
14+
}
15+
16+
func (m *Message) Length() int32 {
17+
if m.Res == nil {
18+
return 0
19+
}
20+
return int32(len(m.Res))
21+
}
22+
23+
func (m *Message) ResPointer() unsafe.Pointer {
24+
if m.Res == nil {
25+
return nil
26+
}
27+
return unsafe.Pointer(&(m.Res[0]))
28+
}
29+
30+
func generateId() (string, error) {
31+
id, err := uuid.NewRandom()
32+
if err != nil {
33+
return "", err
34+
}
35+
return id.String(), nil
36+
}
37+
38+
type BaseMsg struct {
39+
Allowed bool
40+
}
41+
42+
type OtherMsg struct {
43+
BaseMsg
44+
Foo string
45+
}
46+
47+
func idMessage(id int64) *Message {
48+
return &Message{ObjectId: id}
49+
}
50+
51+
func errMessage(err error) *Message {
52+
errCode := status.Code(err)
53+
b := []byte(err.Error())
54+
return &Message{Code: int32(errCode), Res: b}
55+
}

spannerlib/exported/pool.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package exported
2+
3+
import "C"
4+
import (
5+
"cloud.google.com/go/spanner/apiv1/spannerpb"
6+
"context"
7+
"fmt"
8+
"google.golang.org/protobuf/proto"
9+
"sync"
10+
"sync/atomic"
11+
12+
"spannerlib/backend"
13+
)
14+
15+
var pools = sync.Map{}
16+
var poolsIdx = atomic.Int64{}
17+
18+
type Pool struct {
19+
backend *backend.Pool
20+
connections *sync.Map
21+
connectionsIdx atomic.Int64
22+
}
23+
24+
func CreatePool() *Message {
25+
backendPool := &backend.Pool{}
26+
id := poolsIdx.Add(1)
27+
pool := &Pool{
28+
backend: backendPool,
29+
connections: &sync.Map{},
30+
}
31+
pools.Store(id, pool)
32+
return idMessage(id)
33+
}
34+
35+
func ClosePool(id int64) *Message {
36+
p, ok := pools.LoadAndDelete(id)
37+
if !ok {
38+
return errMessage(fmt.Errorf("pool %v not found", id))
39+
}
40+
pool := p.(*Pool)
41+
pool.connections.Range(func(key, value interface{}) bool {
42+
conn := value.(*Connection)
43+
conn.close()
44+
return true
45+
})
46+
return &Message{}
47+
}
48+
49+
func CreateConnection(poolId int64, database string) *Message {
50+
p, ok := pools.Load(poolId)
51+
if !ok {
52+
return errMessage(fmt.Errorf("pool %v not found", poolId))
53+
}
54+
pool := p.(*Pool)
55+
sqlConn, err := pool.backend.Conn(context.Background(), "appdev-soda-spanner-staging", "knut-test-ycsb", "knut-test-db")
56+
if err != nil {
57+
return errMessage(err)
58+
}
59+
id := poolsIdx.Add(1)
60+
conn := &Connection{
61+
backend: &backend.SpannerConnection{Conn: sqlConn},
62+
results: &sync.Map{},
63+
}
64+
pool.connections.Store(id, conn)
65+
66+
return idMessage(id)
67+
}
68+
69+
func CloseConnection(poolId, connId int64) *Message {
70+
conn, err := findConnection(poolId, connId)
71+
if err != nil {
72+
return errMessage(err)
73+
}
74+
return conn.close()
75+
}
76+
77+
func Execute(poolId, connId int64, statementBytes []byte) *Message {
78+
statement := spannerpb.ExecuteBatchDmlRequest_Statement{}
79+
if err := proto.Unmarshal(statementBytes, &statement); err != nil {
80+
return errMessage(err)
81+
}
82+
fmt.Printf("Statement: %v\n", statement.Sql)
83+
fmt.Printf("Params: %v\n", statement.Params)
84+
conn, err := findConnection(poolId, connId)
85+
if err != nil {
86+
return errMessage(err)
87+
}
88+
return conn.Execute(&statement)
89+
}
90+
91+
func Next(poolId, connId, rowsId int64) *Message {
92+
res, err := findRows(poolId, connId, rowsId)
93+
if err != nil {
94+
return errMessage(err)
95+
}
96+
return res.Next()
97+
}
98+
99+
func CloseRows(poolId, connId, rowsId int64) *Message {
100+
res, err := findRows(poolId, connId, rowsId)
101+
if err != nil {
102+
return errMessage(err)
103+
}
104+
return res.Close()
105+
}
106+
107+
func findConnection(poolId, connId int64) (*Connection, error) {
108+
p, ok := pools.Load(poolId)
109+
if !ok {
110+
return nil, fmt.Errorf("pool %v not found", poolId)
111+
}
112+
pool := p.(*Pool)
113+
c, ok := pool.connections.Load(connId)
114+
if !ok {
115+
return nil, fmt.Errorf("connection %v not found", connId)
116+
}
117+
conn := c.(*Connection)
118+
return conn, nil
119+
}
120+
121+
func findRows(poolId, connId, rowsId int64) (*rows, error) {
122+
conn, err := findConnection(poolId, connId)
123+
if err != nil {
124+
return nil, err
125+
}
126+
r, ok := conn.results.Load(rowsId)
127+
if !ok {
128+
return nil, fmt.Errorf("rows %v not found", rowsId)
129+
}
130+
res := r.(*rows)
131+
return res, nil
132+
}

spannerlib/exported/rows.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package exported
2+
3+
import (
4+
"database/sql"
5+
6+
"cloud.google.com/go/spanner"
7+
"google.golang.org/protobuf/proto"
8+
"google.golang.org/protobuf/types/known/structpb"
9+
)
10+
11+
type rows struct {
12+
backend *sql.Rows
13+
}
14+
15+
func (rows *rows) Close() *Message {
16+
err := rows.backend.Close()
17+
if err != nil {
18+
return errMessage(err)
19+
}
20+
return &Message{}
21+
}
22+
23+
func (rows *rows) Next() *Message {
24+
ok := rows.backend.Next()
25+
if !ok {
26+
// An empty message indicates no more rows.
27+
return &Message{}
28+
}
29+
cols, err := rows.backend.Columns()
30+
if err != nil {
31+
return errMessage(err)
32+
}
33+
buffer := make([]any, len(cols))
34+
for i := range buffer {
35+
buffer[i] = &spanner.GenericColumnValue{}
36+
}
37+
if err := rows.backend.Scan(buffer...); err != nil {
38+
return errMessage(err)
39+
}
40+
row := &structpb.ListValue{
41+
Values: make([]*structpb.Value, len(buffer)),
42+
}
43+
for i := range buffer {
44+
row.Values[i] = buffer[i].(*spanner.GenericColumnValue).Value
45+
}
46+
res, err := proto.Marshal(row)
47+
if err != nil {
48+
return errMessage(err)
49+
}
50+
return &Message{Res: res}
51+
}

0 commit comments

Comments
 (0)