Skip to content

Commit a7d5de0

Browse files
committed
Updated sqlite
1 parent bb89304 commit a7d5de0

File tree

4 files changed

+312
-56
lines changed

4 files changed

+312
-56
lines changed

cmd/indexer/main.go

Lines changed: 114 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"errors"
56
"flag"
67
"fmt"
78
"os"
@@ -12,15 +13,28 @@ import (
1213
"time"
1314
"unicode"
1415

16+
// Packages
1517
"github.com/mutablelogic/go-sqlite/pkg/indexer"
18+
"github.com/mutablelogic/go-sqlite/pkg/sqlite3"
19+
20+
// Namespace imports
21+
. "github.com/mutablelogic/go-sqlite"
22+
. "github.com/mutablelogic/go-sqlite/pkg/lang"
1623
)
1724

1825
var (
19-
flagInclude = flag.String("include", "", "Paths, names and extensions to include")
20-
flagExclude = flag.String("exclude", "", "Paths, names and extensions to exclude")
26+
flagName = flag.String("name", "index", "Index name")
27+
flagInclude = flag.String("include", "", "Paths, names and extensions to include")
28+
flagExclude = flag.String("exclude", "", "Paths, names and extensions to exclude")
29+
flagWorkers = flag.Uint("workers", 10, "Number of indexing workers")
30+
flagDatabase = flag.String("db", ":memory:", "Path to sqlite database")
2131
)
2232

2333
func main() {
34+
var wg sync.WaitGroup
35+
ctx := HandleSignal()
36+
37+
// Parse flags
2438
flag.Parse()
2539
if flag.NArg() != 1 {
2640
fmt.Fprintln(os.Stderr, "missing path argument")
@@ -34,32 +48,51 @@ func main() {
3448
fmt.Fprintln(os.Stderr, "Not a directory")
3549
os.Exit(-1)
3650
}
37-
indexer, err := indexer.NewIndexer("indexer", path)
51+
52+
// Open indexer to path
53+
indexer, err := indexer.NewIndexer(*flagName, path)
3854
if err != nil {
3955
fmt.Fprintln(os.Stderr, err)
4056
os.Exit(-1)
4157
}
4258

43-
// Inclusions
59+
// Indexer inclusions
4460
for _, include := range strings.FieldsFunc(*flagInclude, sep) {
4561
if err := indexer.Include(include); err != nil {
4662
fmt.Fprintln(os.Stderr, err)
4763
os.Exit(-1)
4864
}
4965
}
5066

51-
// Exclusions
67+
// Indexer exclusions
5268
for _, exclude := range strings.FieldsFunc(*flagExclude, sep) {
5369
if err := indexer.Exclude(exclude); err != nil {
5470
fmt.Fprintln(os.Stderr, err)
5571
os.Exit(-1)
5672
}
5773
}
5874

59-
var wg sync.WaitGroup
60-
61-
ctx := HandleSignal()
75+
// Create database pool
6276
errs := make(chan error)
77+
pool, err := sqlite3.OpenPool(sqlite3.PoolConfig{
78+
Max: int32(*flagWorkers),
79+
Create: true,
80+
Schemas: map[string]string{
81+
"main": *flagDatabase,
82+
},
83+
}, errs)
84+
if err != nil {
85+
fmt.Fprintln(os.Stderr, err)
86+
os.Exit(-1)
87+
}
88+
defer pool.Close()
89+
90+
// Create schema
91+
if err := CreateSchema(ctx, pool); err != nil {
92+
fmt.Fprintln(os.Stderr, err)
93+
os.Exit(-1)
94+
}
95+
6396
wg.Add(1)
6497
go func() {
6598
defer wg.Done()
@@ -85,11 +118,42 @@ func main() {
85118
go func() {
86119
defer wg.Done()
87120
<-time.After(time.Second)
88-
if err := indexer.Walk(ctx); err != nil {
89-
errs <- err
121+
err := indexer.Walk(ctx, func(err error) {
122+
if err != nil {
123+
errs <- fmt.Errorf("reindexing completed with errors: %w", err)
124+
} else {
125+
errs <- fmt.Errorf("reindexing completed")
126+
}
127+
})
128+
if err != nil {
129+
errs <- fmt.Errorf("reindexing cannot start: %w", err)
90130
}
91131
}()
92132

133+
for i := uint(0); i < *flagWorkers; i++ {
134+
wg.Add(1)
135+
go func(i uint) {
136+
defer wg.Done()
137+
conn := pool.Get(ctx)
138+
if conn == nil {
139+
return
140+
}
141+
defer pool.Put(conn)
142+
for {
143+
select {
144+
case <-ctx.Done():
145+
return
146+
default:
147+
if evt := indexer.Next(); evt != nil {
148+
if err := Process(ctx, conn, evt); err != nil {
149+
errs <- err
150+
}
151+
}
152+
}
153+
}
154+
}(i)
155+
}
156+
93157
// Wait for all goroutines to finish
94158
wg.Wait()
95159
os.Exit(0)
@@ -110,3 +174,43 @@ func HandleSignal() context.Context {
110174
func sep(r rune) bool {
111175
return r == ',' || unicode.IsSpace(r)
112176
}
177+
178+
func CreateSchema(ctx context.Context, pool SQPool) error {
179+
conn := pool.Get(ctx)
180+
if conn == nil {
181+
return errors.New("Unable to get a connection from pool")
182+
}
183+
defer pool.Put(conn)
184+
185+
// Create table
186+
return conn.Do(ctx, 0, func(txn SQTransaction) error {
187+
if _, err := txn.Query(Q(`CREATE TABLE IF NOT EXISTS files (
188+
name TEXT NOT NULL,
189+
path TEXT NOT NULL,
190+
PRIMARY KEY (name, path)
191+
)`)); err != nil {
192+
return err
193+
}
194+
return nil
195+
})
196+
}
197+
198+
func Process(ctx context.Context, conn SQConnection, evt *indexer.QueueEvent) error {
199+
return conn.Do(ctx, 0, func(txn SQTransaction) error {
200+
switch evt.Event {
201+
case indexer.EventAdd:
202+
if result, err := txn.Query(Q(`REPLACE INTO files (name, path) VALUES (?, ?)`), evt.Name, evt.Path); err != nil {
203+
return err
204+
} else if result.LastInsertId() > 0 {
205+
fmt.Println("ADDED:", evt)
206+
}
207+
case indexer.EventRemove:
208+
if result, err := txn.Query(Q(`DELETE FROM files WHERE name=? AND path=?`), evt.Name, evt.Path); err != nil {
209+
return err
210+
} else if result.RowsAffected() == 1 {
211+
fmt.Println("REMOVED:", evt)
212+
}
213+
}
214+
return nil
215+
})
216+
}

pkg/indexer/indexer.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,15 @@ import (
2323

2424
type Indexer struct {
2525
*walkfs.WalkFS
26+
*Queue
2627
name string
2728
path string
28-
walk chan struct{}
29+
walk chan WalkFunc
2930
}
3031

32+
// WalkFunc is called after a reindexing with any walk errors
33+
type WalkFunc func(err error)
34+
3135
///////////////////////////////////////////////////////////////////////////////
3236
// GLOBALS
3337

@@ -47,6 +51,8 @@ var (
4751
// and a channel to receive any errors
4852
func NewIndexer(name, path string) (*Indexer, error) {
4953
this := new(Indexer)
54+
this.Queue = NewQueueWithCapacity(defaultCapacity)
55+
this.WalkFS = walkfs.New(this.visit)
5056

5157
// Check path argument
5258
if stat, err := os.Stat(path); err != nil {
@@ -60,10 +66,11 @@ func NewIndexer(name, path string) (*Indexer, error) {
6066
} else {
6167
this.name = name
6268
this.path = abspath
63-
this.WalkFS = walkfs.New(this.visit)
64-
this.walk = make(chan struct{})
6569
}
6670

71+
// Channel to indicate we want to walk the index
72+
this.walk = make(chan WalkFunc)
73+
6774
// Return success
6875
return this, nil
6976
}
@@ -88,13 +95,12 @@ FOR_LOOP:
8895
if err := i.event(ctx, evt); err != nil {
8996
senderr(errs, err)
9097
}
91-
case <-i.walk:
98+
case fn := <-i.walk:
9299
walking.Lock()
93100
go func() {
94101
defer walking.Unlock()
95-
if err := i.WalkFS.Walk(ctx, i.path); err != nil {
96-
senderr(errs, err)
97-
}
102+
// Start the walk and return any errors
103+
fn(i.WalkFS.Walk(ctx, i.path))
98104
}()
99105
}
100106
}
@@ -140,11 +146,14 @@ func (i *Indexer) Path() string {
140146

141147
// Walk will initiate a walk of the index, and block until context is
142148
// cancelled or walk is started
143-
func (i *Indexer) Walk(ctx context.Context) error {
149+
func (i *Indexer) Walk(ctx context.Context, fn WalkFunc) error {
150+
if fn == nil {
151+
return ErrBadParameter.With("WalkFunc")
152+
}
144153
select {
145154
case <-ctx.Done():
146155
return ctx.Err()
147-
case i.walk <- struct{}{}:
156+
case i.walk <- fn:
148157
break
149158
}
150159
// Return success
@@ -164,15 +173,15 @@ func (i *Indexer) event(ctx context.Context, evt notify.EventInfo) error {
164173
case notify.Create, notify.Write:
165174
info, err := os.Stat(evt.Path())
166175
if err == nil && info.Mode().IsRegular() && i.ShouldVisit(relpath, info) {
167-
fmt.Println("INDEX ", relpath)
176+
i.Queue.Add(i.name, relpath)
168177
}
169178
case notify.Remove, notify.Rename:
170179
info, err := os.Stat(evt.Path())
171180
if err == nil && info.Mode().IsRegular() && i.ShouldVisit(relpath, info) {
172-
fmt.Println("INDEX ", relpath)
181+
i.Queue.Add(i.name, relpath)
173182
} else {
174183
// Always attempt removal from index
175-
fmt.Println("REMOVE ", relpath)
184+
i.Queue.Remove(i.name, relpath)
176185
}
177186
}
178187
// Return success
@@ -182,7 +191,7 @@ func (i *Indexer) event(ctx context.Context, evt notify.EventInfo) error {
182191
// visit is used to index a file from the indexer
183192
func (i *Indexer) visit(ctx context.Context, abspath, relpath string, info fs.FileInfo) error {
184193
if info.Mode().IsRegular() {
185-
fmt.Println("INDEX ", relpath)
194+
i.Queue.Add(i.name, relpath)
186195
}
187196
return nil
188197
}

0 commit comments

Comments
 (0)