Skip to content

Commit 3d78753

Browse files
committed
Updated for notify locking
1 parent a7d5de0 commit 3d78753

File tree

2 files changed

+73
-50
lines changed

2 files changed

+73
-50
lines changed

cmd/indexer/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ func CreateSchema(ctx context.Context, pool SQPool) error {
185185
// Create table
186186
return conn.Do(ctx, 0, func(txn SQTransaction) error {
187187
if _, err := txn.Query(Q(`CREATE TABLE IF NOT EXISTS files (
188-
name TEXT NOT NULL,
189-
path TEXT NOT NULL,
188+
name TEXT NOT NULL,
189+
path TEXT NOT NULL,
190190
PRIMARY KEY (name, path)
191191
)`)); err != nil {
192192
return err

sys/sqlite3/statement.go

Lines changed: 71 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,73 +2,96 @@ package sqlite3
22

33
/*
44
#cgo pkg-config: sqlite3
5-
#include <sqlite3.h>
65
#include <stdlib.h>
6+
#include <sqlite3.h>
77
#include <pthread.h>
88
#include <assert.h>
99
10+
// sqlite library needs to be compiled with -DSQLITE_ENABLE_UNLOCK_NOTIFY
1011
// https://www.sqlite.org/unlock_notify.html
1112
1213
typedef struct UnlockNotification UnlockNotification;
1314
1415
struct UnlockNotification {
15-
int fired; // True after unlock event has occurred
16-
pthread_cond_t cond; // Condition variable to wait on
17-
pthread_mutex_t mutex; // Mutex to protect structure
16+
int fired; // True after unlock event has occurred
17+
pthread_cond_t cond; // Condition variable to wait on
18+
pthread_mutex_t mutex; // Mutex to protect structure
1819
};
1920
2021
static void _unlock_notify_cb(void** apArg, int nArg) {
21-
int i;
22-
for(i = 0; i < nArg; i++) {
23-
UnlockNotification *p = (UnlockNotification* )apArg[i];
24-
pthread_mutex_lock(&p->mutex);
25-
p->fired = 1;
26-
pthread_cond_signal(&p->cond);
27-
pthread_mutex_unlock(&p->mutex);
28-
}
22+
int i;
23+
for(i = 0; i < nArg; i++) {
24+
UnlockNotification *p = (UnlockNotification* )apArg[i];
25+
pthread_mutex_lock(&p->mutex);
26+
p->fired = 1;
27+
pthread_cond_signal(&p->cond);
28+
pthread_mutex_unlock(&p->mutex);
29+
}
2930
}
3031
3132
int _wait_for_unlock_notify(sqlite3* db) {
32-
int rc;
33-
UnlockNotification un;
34-
35-
// Initialize the UnlockNotification structure
36-
un.fired = 0;
37-
pthread_mutex_init(&un.mutex, 0);
38-
pthread_cond_init(&un.cond, 0);
39-
40-
// Register for an unlock-notify callback
41-
rc = sqlite3_unlock_notify(db, _unlock_notify_cb, (void* )&un);
42-
assert(rc==SQLITE_LOCKED || rc==SQLITE_OK);
43-
44-
// The call to sqlite3_unlock_notify() always returns either SQLITE_LOCKED
45-
// or SQLITE_OK
46-
if(rc == SQLITE_OK) {
47-
pthread_mutex_lock(&un.mutex);
48-
if (!un.fired) {
49-
pthread_cond_wait(&un.cond, &un.mutex);
50-
}
51-
pthread_mutex_unlock(&un.mutex);
52-
}
53-
54-
// Destroy the mutex and condition variables
55-
pthread_cond_destroy(&un.cond);
56-
pthread_mutex_destroy(&un.mutex);
57-
58-
return rc;
59-
}
33+
int rc;
34+
UnlockNotification un;
35+
36+
// Initialize the UnlockNotification structure
37+
un.fired = 0;
38+
pthread_mutex_init(&un.mutex, 0);
39+
pthread_cond_init(&un.cond, 0);
40+
41+
// Register for an unlock-notify callback
42+
// The call to sqlite3_unlock_notify() always returns either SQLITE_LOCKED
43+
// or SQLITE_OK
44+
rc = sqlite3_unlock_notify(db, _unlock_notify_cb, (void* )&un);
45+
assert(rc == SQLITE_LOCKED || rc == SQLITE_OK);
46+
if(rc == SQLITE_OK) {
47+
pthread_mutex_lock(&un.mutex);
48+
if (!un.fired) {
49+
pthread_cond_wait(&un.cond, &un.mutex);
50+
}
51+
pthread_mutex_unlock(&un.mutex);
52+
}
6053
54+
// Destroy the mutex and condition variables
55+
pthread_cond_destroy(&un.cond);
56+
pthread_mutex_destroy(&un.mutex);
57+
return rc;
58+
}
6159
60+
// This code is a wrapper around sqlite3_step
6261
static int _sqlite3_blocking_step(sqlite3_stmt* stmt) {
63-
int rc;
64-
while(SQLITE_LOCKED == (rc = sqlite3_step(stmt))) {
65-
rc = _wait_for_unlock_notify(sqlite3_db_handle(stmt));
66-
if(rc != SQLITE_OK) break;
67-
sqlite3_reset(stmt);
68-
}
69-
return rc;
62+
while (1) {
63+
int rc = sqlite3_step(stmt);
64+
if ((rc & 0xFF) != SQLITE_LOCKED) {
65+
return rc;
66+
}
67+
rc = _wait_for_unlock_notify(sqlite3_db_handle(stmt));
68+
if (rc != SQLITE_OK) {
69+
return rc;
70+
}
71+
sqlite3_reset(stmt);
72+
}
7073
}
7174
75+
// This code is a wrapper around sqlite3_prepare_v2
76+
static int _sqlite3_blocking_prepare_v2(
77+
sqlite3* db, // Database handle
78+
const char* sql, // UTF-8 encoded SQL statement
79+
int nSql, // Length of zSql in bytes
80+
sqlite3_stmt** stmt, // OUT: A pointer to the prepared statement
81+
const char** pz // OUT: End of parsed string
82+
){
83+
int rc;
84+
while (1) {
85+
int rc = sqlite3_prepare_v2(db, sql, nSql, stmt, pz);
86+
if ((rc & 0xFF) != SQLITE_LOCKED) {
87+
return rc;
88+
}
89+
rc = _wait_for_unlock_notify(sqlite3_db_handle(stmt));
90+
if (rc != SQLITE_OK) {
91+
return rc;
92+
}
93+
}
94+
}
7295
*/
7396
import "C"
7497

@@ -149,7 +172,7 @@ func (c *Conn) Prepare(query string) (*Statement, string, error) {
149172
}
150173

151174
// Prepare statement
152-
if err := SQError(C.sqlite3_prepare_v2((*C.sqlite3)(c), cQuery, -1, &s, &cExtra)); err != SQLITE_OK {
175+
if err := SQError(C._sqlite3_blocking_prepare_v2((*C.sqlite3)(c), cQuery, -1, &s, &cExtra)); err != SQLITE_OK {
153176
return nil, "", err.With(C.GoString(C.sqlite3_errmsg((*C.sqlite3)(c))))
154177
}
155178

0 commit comments

Comments
 (0)