Skip to content

Commit 5364c6a

Browse files
committed
Intial queue setup
1 parent 30825f3 commit 5364c6a

File tree

2 files changed

+49
-23
lines changed

2 files changed

+49
-23
lines changed

packages/powersync-op-sqlite/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,13 @@
7272
},
7373
"dependencies": {
7474
"@powersync/common": "workspace:*",
75+
"async": "^3.2.6",
7576
"async-lock": "^1.4.0"
7677
},
7778
"devDependencies": {
7879
"@op-engineering/op-sqlite": "^9.2.1",
7980
"@react-native/eslint-config": "^0.73.1",
81+
"@types/async": "^3.2.24",
8082
"@types/async-lock": "^1.4.0",
8183
"@types/react": "^18.2.44",
8284
"del-cli": "^5.1.0",

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import Lock from 'async-lock';
1212
import { OPSQLiteConnection } from './OPSQLiteConnection';
1313
import { NativeModules, Platform } from 'react-native';
1414
import { SqliteOptions } from './SqliteOptions';
15+
import queue from 'async/queue';
1516

1617
/**
1718
* Adapter for React Native Quick SQLite
@@ -39,6 +40,8 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
3940

4041
protected writeConnection: OPSQLiteConnection | null;
4142

43+
private readQueue: any;
44+
4245
constructor(protected options: OPSQLiteAdapterOptions) {
4346
super();
4447
this.name = this.options.name;
@@ -90,6 +93,28 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
9093
await conn.execute('PRAGMA query_only = true');
9194
this.readConnections.push(conn);
9295
}
96+
97+
this.readQueue = queue(
98+
async (
99+
{ connection, fn }: { connection: OPSQLiteConnection; fn: (tx: OPSQLiteConnection) => Promise<any> },
100+
callback
101+
) => {
102+
try {
103+
// console.log('Starting fn(connection)', this.currentDate());
104+
let timeOut = Math.random() * 2000;
105+
console.log('Before sleep:', performance.now());
106+
await this.delay(timeOut);
107+
// timeout(fn(connection), options?.timeoutMs);
108+
const result = await fn(connection);
109+
console.log('After sleep:', performance.now());
110+
callback(null, result);
111+
} catch (error) {
112+
console.log('Error in fn(connection)', error);
113+
callback(error);
114+
}
115+
},
116+
READ_CONNECTIONS
117+
);
93118
}
94119

95120
protected async openConnection(filenameOverride?: string): Promise<OPSQLiteConnection> {
@@ -149,31 +174,30 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
149174
});
150175
}
151176

177+
async delay(msecs: number) {
178+
return new Promise((resolve) => setTimeout(resolve, msecs));
179+
}
180+
181+
currentDate() {
182+
let today = new Date();
183+
return today.toISOString();
184+
}
185+
152186
async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
153187
await this.initialized;
154-
// TODO: Use async queues to handle multiple read connections
155-
const sortedConnections = this.readConnections!.map((connection, index) => ({
156-
lockKey: `${LockType.READ}-${index}`,
157-
connection
158-
})).sort((a, b) => {
159-
const aBusy = this.locks.isBusy(a.lockKey);
160-
const bBusy = this.locks.isBusy(b.lockKey);
161-
// Sort by ones which are not busy
162-
return aBusy > bBusy ? 1 : 0;
163-
});
164-
165-
return new Promise(async (resolve, reject) => {
166-
try {
167-
await this.locks.acquire(
168-
sortedConnections[0].lockKey,
169-
async () => {
170-
resolve(await fn(sortedConnections[0].connection));
171-
},
172-
{ timeout: options?.timeoutMs }
173-
);
174-
} catch (ex) {
175-
reject(ex);
176-
}
188+
console.log('Read lock');
189+
return new Promise((resolve, reject) => {
190+
this.readConnections!.forEach((connection, index) => {
191+
this.readQueue.push({ connection, fn }, (err, result) => {
192+
if (err) {
193+
console.log('Rejecting q push', err);
194+
reject(err);
195+
}
196+
console.log('Connection at', index);
197+
console.log('Resolve q', result);
198+
resolve(result);
199+
});
200+
});
177201
});
178202
}
179203

0 commit comments

Comments
 (0)