Skip to content

Commit de428c1

Browse files
committed
Updated caching
1 parent df7479a commit de428c1

File tree

12 files changed

+244
-122
lines changed

12 files changed

+244
-122
lines changed

pkg/sqlite3/cache.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,70 @@
11
package sqlite3
22

3+
import (
4+
"sync"
5+
6+
// Namespace Imports
7+
. "github.com/djthorpe/go-errors"
8+
sqlite3 "github.com/djthorpe/go-sqlite/sys/sqlite3"
9+
multierror "github.com/hashicorp/go-multierror"
10+
)
11+
312
////////////////////////////////////////////////////////////////////////////////
413
// TYPES
514

615
// PoolCache caches prepared statements and profiling information for
716
// statements so it's possible to see slow queries, etc.
8-
type PoolCache struct {
17+
type PoolCache struct{}
18+
19+
type ConnCache struct {
20+
sync.Mutex
21+
sync.Map
922
}
1023

24+
////////////////////////////////////////////////////////////////////////////////
25+
// GLOBALS
26+
27+
const (
28+
defaultCapacity = 100
29+
)
30+
1131
////////////////////////////////////////////////////////////////////////////////
1232
// PUBLIC METHODS
1333

14-
// Return a prepared statement from the cache
15-
func (cache *PoolCache) Prepare(q string) (*Results, error) {
34+
// Return a prepared statement from the cache, or prepare a new statement
35+
// and put it in the cache before returning
36+
func (cache *ConnCache) Prepare(conn *sqlite3.ConnEx, q string) (*Results, error) {
37+
if conn == nil {
38+
return nil, ErrInternalAppError
39+
}
40+
st, _ := cache.Map.Load(q)
41+
if st == nil {
42+
// Prepare a statement and store in cache
43+
var err error
44+
cache.Mutex.Lock()
45+
defer cache.Mutex.Unlock()
46+
if st, err = conn.Prepare(q); err != nil {
47+
return nil, err
48+
} else {
49+
cache.Map.Store(q, st)
50+
}
51+
} else {
52+
// Increment counter by one
53+
st.(*sqlite3.StatementEx).Inc(1)
54+
}
55+
return NewResults(st.(*sqlite3.StatementEx)), nil
56+
}
57+
58+
// Close all conn cache prepared statements
59+
func (cache *ConnCache) Close() error {
60+
var result error
61+
cache.Map.Range(func(key, value interface{}) bool {
62+
if err := value.(*sqlite3.StatementEx).Close(); err != nil {
63+
result = multierror.Append(result, err)
64+
}
65+
return true
66+
})
1667

68+
// Return any errors
69+
return result
1770
}

pkg/sqlite3/cache_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package sqlite3_test
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"sync"
7+
"testing"
8+
9+
// Module imports
10+
sqlite3 "github.com/djthorpe/go-sqlite/sys/sqlite3"
11+
12+
// Namespace Imports
13+
. "github.com/djthorpe/go-sqlite"
14+
. "github.com/djthorpe/go-sqlite/pkg/lang"
15+
. "github.com/djthorpe/go-sqlite/pkg/sqlite3"
16+
)
17+
18+
func Test_Cache_001(t *testing.T) {
19+
// Create a connection
20+
conn, err := OpenPath(":memory:", sqlite3.DefaultFlags)
21+
if err != nil {
22+
t.Fatal(err)
23+
}
24+
defer conn.Close()
25+
26+
// Perform caching in a transaction
27+
conn.Do(context.Background(), 0, func(txn SQTransaction) error {
28+
// Read values from the cache
29+
var wg sync.WaitGroup
30+
for i := 0; i < 99; i++ {
31+
wg.Add(1)
32+
go func() {
33+
defer wg.Done()
34+
n := rand.Uint32() % 5
35+
if r, err := txn.Query(Q("SELECT ", n)); err != nil {
36+
t.Error(err)
37+
} else {
38+
t.Log(r)
39+
}
40+
}()
41+
}
42+
43+
// Wait for all goroutines to complete
44+
wg.Wait()
45+
46+
// Return success
47+
return nil
48+
})
49+
50+
// Close cache, release resources
51+
if err := conn.Close(); err != nil {
52+
t.Fatal(err)
53+
}
54+
}

pkg/sqlite3/conn.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
type Conn struct {
2323
sync.Mutex
2424
*sqlite3.ConnEx
25+
ConnCache
26+
2527
c chan struct{}
2628
ctx context.Context
2729
}
@@ -150,23 +152,21 @@ func (conn *Conn) Do(ctx context.Context, flag SQTxnFlag, fn func(SQTransaction)
150152
}
151153

152154
// Execute SQL statement and invoke a callback for each row of results which may return true to abort
153-
func (txn *Txn) Query(st SQStatement, v ...interface{}) (SQResult, error) {
155+
func (txn *Txn) Query(st SQStatement, v ...interface{}) (SQResults, error) {
154156
if st == nil {
155157
return nil, ErrBadParameter.With("Query")
156158
}
157159

158-
// TODO: Get prepared version of query from the cache
159-
stx, err := txn.Conn.Prepare(st.Query())
160+
// Get a results object
161+
r, err := txn.ConnCache.Prepare(txn.Conn.ConnEx, st.Query())
160162
if err != nil {
161163
return nil, err
162164
}
163165

164-
// Create a results object and return it
165-
results, err := stx.Exec(0, v...)
166-
if err != nil {
167-
stx.Close()
166+
// Execute first query
167+
if err := r.NextQuery(v...); err != nil {
168168
return nil, err
169+
} else {
170+
return r, nil
169171
}
170-
171-
return NewResults(results), nil
172172
}

pkg/sqlite3/pool.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ type Pool struct {
4040
sync.WaitGroup
4141
sync.Pool
4242
PoolConfig
43-
PoolCache
4443

4544
errs chan<- error
4645
ctx context.Context
@@ -301,7 +300,11 @@ func (p *Pool) put(conn *Conn) {
301300
n := atomic.AddInt32(&p.n, -1)
302301
if n < p.Max() {
303302
p.Pool.Put(conn)
304-
} else if err := conn.Close(); err != nil {
303+
return
304+
}
305+
306+
// Close connection and remove from cache
307+
if err := conn.Close(); err != nil {
305308
p.err(err)
306309
}
307310
}

pkg/sqlite3/result.go

Lines changed: 0 additions & 67 deletions
This file was deleted.

pkg/sqlite3/results.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package sqlite3
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
8+
// Packages
9+
sqlite3 "github.com/djthorpe/go-sqlite/sys/sqlite3"
10+
// Namespace imports
11+
//. "github.com/djthorpe/go-errors"
12+
)
13+
14+
////////////////////////////////////////////////////////////////////////////////
15+
// TYPES
16+
17+
// PoolConfig is the starting configuration for a pool
18+
type Results struct {
19+
st *sqlite3.StatementEx
20+
results *sqlite3.Results
21+
n uint // next statement to execute
22+
}
23+
24+
////////////////////////////////////////////////////////////////////////////////
25+
// LIFECYCLE
26+
27+
func NewResults(st *sqlite3.StatementEx) *Results {
28+
r := new(Results)
29+
r.st, r.n = st, 0
30+
return r
31+
}
32+
33+
////////////////////////////////////////////////////////////////////////////////
34+
// STRINGIFY
35+
36+
func (r *Results) String() string {
37+
str := "<results"
38+
if n := r.st.Count(); n > 0 {
39+
str += fmt.Sprint(" cached (", n, ")")
40+
}
41+
str += fmt.Sprint(" ", r.st)
42+
return str + ">"
43+
}
44+
45+
////////////////////////////////////////////////////////////////////////////////
46+
// PUBLIC METHODS
47+
48+
// NextQuery will execute the next query in the statement, return io.EOF if there
49+
// are no more statements. In order to read the rows, repeatedly read the rows
50+
// using the Next function.
51+
func (r *Results) NextQuery(v ...interface{}) error {
52+
if results, err := r.st.Exec(r.n, v...); errors.Is(err, sqlite3.SQLITE_DONE) {
53+
return io.EOF
54+
} else if err != nil {
55+
return err
56+
} else {
57+
r.results = results
58+
r.n++
59+
// TODO: Set LastInsertId, Changes, Columns, etc.
60+
fmt.Println(r.results)
61+
return nil
62+
}
63+
}

sqlite.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,28 +88,25 @@ type SQConnection interface {
8888
// SQTransaction is an sqlite transaction
8989
type SQTransaction interface {
9090
// Query and return a set of results
91-
Query(SQStatement, ...interface{}) (SQResult, error)
91+
Query(SQStatement, ...interface{}) (SQResults, error)
9292
}
9393

94-
// SQResult increments over returned rows from a query
95-
type SQResult interface {
94+
// SQResults increments over returned rows from a query
95+
type SQResults interface {
9696
// Return next row, returns nil when all rows consumed
97-
Next() []interface{}
97+
//Next() []interface{}
9898

9999
// Return next map of values, or nil if no more rows
100-
NextMap() map[string]interface{}
100+
//NextMap() map[string]interface{}
101101

102102
// NextQuery executes the next query or returns io.EOF
103103
NextQuery(...interface{}) error
104104

105-
// Close the rows, and free up any resources
106-
Close() error
107-
108105
// Return Last RowID inserted of last statement
109-
LastInsertId() int64
106+
//LastInsertId() int64
110107

111108
// Return number of changes made of last statement
112-
RowsAffected() int64
109+
//RowsAffected() int64
113110
}
114111

115112
// SQAuth is an interface for authenticating an action

0 commit comments

Comments
 (0)