Skip to content

Commit efb9d09

Browse files
committed
Debug node test failures
1 parent 66f1076 commit efb9d09

File tree

3 files changed

+50
-18
lines changed

3 files changed

+50
-18
lines changed

packages/node/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
"async-mutex": "^0.5.0",
7171
"comlink": "^4.4.2",
7272
"undici": "^7.11.0",
73-
"bson": "^6.10.4"
73+
"bson": "^6.10.4"
7474
},
7575
"devDependencies": {
7676
"@powersync/drizzle-driver": "workspace:*",

packages/node/src/db/RemoteConnection.ts

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,49 +11,73 @@ export class RemoteConnection implements LockContext {
1111

1212
private readonly worker: Worker;
1313
private readonly comlink: Remote<AsyncDatabaseOpener>;
14-
readonly database: Remote<AsyncDatabase>;
14+
private readonly database: Remote<AsyncDatabase>;
15+
16+
private readonly notifyWorkerClosed = new AbortController();
1517

1618
constructor(worker: Worker, comlink: Remote<AsyncDatabaseOpener>, database: Remote<AsyncDatabase>) {
1719
this.worker = worker;
1820
this.comlink = comlink;
1921
this.database = database;
22+
23+
this.worker.once('exit', (_) => {
24+
this.notifyWorkerClosed.abort();
25+
});
2026
}
2127

2228
/**
2329
* Runs the inner function, but appends the stack trace where this function was called. This is useful for workers
2430
* because stack traces from worker errors are otherwise unrelated to the application issue that has caused them.
2531
*/
26-
private async recoverTrace<T>(inner: () => Promise<T>): Promise<T> {
32+
private withRemote<T>(inner: () => Promise<T>): Promise<T> {
2733
const trace = {};
2834
Error.captureStackTrace(trace);
35+
const controller = this.notifyWorkerClosed;
2936

30-
try {
31-
return await inner();
32-
} catch (e) {
33-
if (e instanceof Error && e.stack) {
34-
e.stack += (trace as any).stack;
37+
return new Promise((resolve, reject) => {
38+
if (controller.signal.aborted) {
39+
reject(new Error('Called operation on closed remote'));
3540
}
3641

37-
throw e;
38-
}
42+
function handleAbort() {
43+
reject(new Error('Remote peer closed with request in flight'));
44+
}
45+
46+
function completePromise(action: () => void) {
47+
controller!.signal.removeEventListener('abort', handleAbort);
48+
action();
49+
}
50+
51+
controller.signal.addEventListener('abort', handleAbort);
52+
53+
inner()
54+
.then((data) => completePromise(() => resolve(data)))
55+
.catch((e) => {
56+
if (e instanceof Error && e.stack) {
57+
e.stack += (trace as any).stack;
58+
}
59+
60+
return completePromise(() => reject(e));
61+
});
62+
});
3963
}
4064

4165
executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
42-
return this.recoverTrace(async () => {
66+
return this.withRemote(async () => {
4367
const result = await this.database.executeBatch(query, params ?? []);
4468
return RemoteConnection.wrapQueryResult(result);
4569
});
4670
}
4771

4872
execute(query: string, params?: any[] | undefined): Promise<QueryResult> {
49-
return this.recoverTrace(async () => {
73+
return this.withRemote(async () => {
5074
const result = await this.database.execute(query, params ?? []);
5175
return RemoteConnection.wrapQueryResult(result);
5276
});
5377
}
5478

5579
executeRaw(query: string, params?: any[] | undefined): Promise<any[][]> {
56-
return this.recoverTrace(async () => {
80+
return this.withRemote(async () => {
5781
return await this.database.executeRaw(query, params ?? []);
5882
});
5983
}

packages/node/src/db/WorkerConnectionPool.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,22 @@ export class WorkerConnectionPool extends BaseObserver<DBAdapterListener> implem
9292
const listeners = new WeakMap<EventListenerOrEventListenerObject, (e: any) => void>();
9393

9494
const comlink = Comlink.wrap<AsyncDatabaseOpener>({
95-
postMessage: worker.postMessage.bind(worker),
95+
postMessage: (message: any, transfer?: any) => {
96+
console.log('to worker', message);
97+
return worker.postMessage(message, transfer);
98+
},
9699
addEventListener: (type, listener) => {
97100
let resolved: (event: any) => void =
98101
'handleEvent' in listener ? listener.handleEvent.bind(listener) : listener;
99102

103+
{
104+
const original = resolved;
105+
resolved = (event) => {
106+
console.log('from worker', event);
107+
return original(event);
108+
};
109+
}
110+
100111
// Comlink wants message events, but the message event on workers in Node returns the data only.
101112
if (type === 'message') {
102113
const original = resolved;
@@ -213,10 +224,7 @@ export class WorkerConnectionPool extends BaseObserver<DBAdapterListener> implem
213224
try {
214225
return await fn(this.writeConnection);
215226
} finally {
216-
const serializedUpdates = await this.writeConnection.database.executeRaw(
217-
"SELECT powersync_update_hooks('get');",
218-
[]
219-
);
227+
const serializedUpdates = await this.writeConnection.executeRaw("SELECT powersync_update_hooks('get');", []);
220228
const updates = JSON.parse(serializedUpdates[0][0] as string) as string[];
221229

222230
if (updates.length > 0) {

0 commit comments

Comments
 (0)