@@ -10,23 +10,17 @@ SQLiteOPResult genericSqliteOpenDb(string const dbName, string const docPath,
1010ConnectionState::ConnectionState (const std::string dbName,
1111 const std::string docPath, int SQLFlags) {
1212 auto result = genericSqliteOpenDb (dbName, docPath, &connection, SQLFlags);
13-
14- this ->clearLock ();
15- threadDone = false ;
16- thread = new std::thread (&ConnectionState::doWork, this );
13+ if (result.type != SQLiteOk) {
14+ throw std::runtime_error (" Failed to open SQLite database: " + result.errorMessage );
15+ }
16+ thread = std::thread (&ConnectionState::doWork, this );
17+ this ->clearLock ();
1718}
1819
1920ConnectionState::~ConnectionState () {
20- // So threads know it's time to shut down
21- threadDone = true ;
22-
23- // Wake up all the threads, so they can finish and be joined
24- workQueueConditionVariable.notify_all ();
25- if (thread->joinable ()) {
26- thread->join ();
21+ if (!isClosed) {
22+ close ();
2723 }
28-
29- delete thread;
3024}
3125
3226void ConnectionState::clearLock () {
@@ -64,21 +58,41 @@ std::future<void> ConnectionState::refreshSchema() {
6458}
6559
6660void ConnectionState::close () {
61+ {
62+ std::unique_lock<std::mutex> g (workQueueMutex);
63+ // prevent any new work from being queued
64+ isClosed = true ;
65+ }
66+
67+ // Wait for the work queue to empty
6768 waitFinished ();
68- // So that the thread can stop (if not already)
69- threadDone = true ;
69+
70+ {
71+ // Now signal the thread to stop and notify it
72+ std::unique_lock<std::mutex> g (workQueueMutex);
73+ threadDone = true ;
74+ workQueueConditionVariable.notify_all ();
75+ }
76+
77+ // Join the worker thread
78+ if (thread.joinable ()) {
79+ thread.join ();
80+ }
81+
82+ // Safely close the SQLite connection
7083 sqlite3_close_v2 (connection);
7184}
7285
7386void ConnectionState::queueWork (std::function<void (sqlite3 *)> task) {
74- // Grab the mutex
75- std::lock_guard<std::mutex> g (workQueueMutex);
76-
77- // Push the request to the queue
78- workQueue.push (task);
87+ {
88+ std::unique_lock<std::mutex> g (workQueueMutex);
89+ if (isClosed) {
90+ throw std::runtime_error (" Connection is not open. Connection has been closed before queueing work." );
91+ }
92+ workQueue.push (task);
93+ }
7994
80- // Notify one thread that there are requests to process
81- workQueueConditionVariable.notify_all ();
95+ workQueueConditionVariable.notify_all ();
8296}
8397
8498void ConnectionState::doWork () {
@@ -104,9 +118,9 @@ void ConnectionState::doWork() {
104118 workQueue.pop ();
105119 }
106120
107- ++ threadBusy;
121+ threadBusy = true ;
108122 task (connection);
109- -- threadBusy;
123+ threadBusy = false ;
110124 // Need to notify in order for waitFinished to be updated when
111125 // the queue is empty and not busy
112126 {
@@ -118,11 +132,8 @@ void ConnectionState::doWork() {
118132
119133void ConnectionState::waitFinished () {
120134 std::unique_lock<std::mutex> g (workQueueMutex);
121- if (workQueue.empty ()) {
122- return ;
123- }
124135 workQueueConditionVariable.wait (
125- g, [&] { return workQueue.empty () && ( threadBusy == 0 ) ; });
136+ g, [&] { return workQueue.empty () && ! threadBusy; });
126137}
127138
128139SQLiteOPResult genericSqliteOpenDb (string const dbName, string const docPath,
0 commit comments