Skip to content

Commit c9c1e24

Browse files
authored
Fix deadlock issue with OPFS and Rust client (#725)
1 parent c78071f commit c9c1e24

File tree

6 files changed

+90
-20
lines changed

6 files changed

+90
-20
lines changed

.changeset/funny-baboons-wonder.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/web': patch
3+
---
4+
5+
Fix issues with multiple tabs when the Rust client and OPFS is used.

packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export type SharedConnectionWorker = {
1515
export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions> = {
1616
baseConnection: AsyncDatabaseConnection;
1717
identifier: string;
18+
remoteCanCloseUnexpectedly: boolean;
1819
/**
1920
* Need a remote in order to keep a reference to the Proxied worker
2021
*/
@@ -29,10 +30,13 @@ export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOpti
2930
export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLOpenOptions = ResolvedWebSQLOpenOptions>
3031
implements AsyncDatabaseConnection
3132
{
32-
protected lockAbortController: AbortController;
33+
protected lockAbortController = new AbortController();
34+
protected notifyRemoteClosed: AbortController | undefined;
3335

3436
constructor(protected options: WrappedWorkerConnectionOptions<Config>) {
35-
this.lockAbortController = new AbortController();
37+
if (options.remoteCanCloseUnexpectedly) {
38+
this.notifyRemoteClosed = new AbortController();
39+
}
3640
}
3741

3842
protected get baseConnection() {
@@ -43,6 +47,48 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
4347
return this.baseConnection.init();
4448
}
4549

50+
/**
51+
* Marks the remote as closed.
52+
*
53+
* This can sometimes happen outside of our control, e.g. when a shared worker requests a connection from a tab. When
54+
* it happens, all methods on the {@link baseConnection} would never resolve. To avoid livelocks in this scenario, we
55+
* throw on all outstanding promises and forbid new calls.
56+
*/
57+
markRemoteClosed() {
58+
// Can non-null assert here because this function is only supposed to be called when remoteCanCloseUnexpectedly was
59+
// set.
60+
this.notifyRemoteClosed!.abort();
61+
}
62+
63+
private withRemote<T>(workerPromise: () => Promise<T>): Promise<T> {
64+
const controller = this.notifyRemoteClosed;
65+
if (controller) {
66+
return new Promise((resolve, reject) => {
67+
if (controller.signal.aborted) {
68+
reject(new Error('Called operation on closed remote'));
69+
}
70+
71+
function handleAbort() {
72+
reject(new Error('Remote peer closed with request in flight'));
73+
}
74+
75+
function completePromise(action: () => void) {
76+
controller!.signal.removeEventListener('abort', handleAbort);
77+
action();
78+
}
79+
80+
controller.signal.addEventListener('abort', handleAbort);
81+
82+
workerPromise()
83+
.then((data) => completePromise(() => resolve(data)))
84+
.catch((e) => completePromise(() => reject(e)));
85+
});
86+
} else {
87+
// Can't close, so just return the inner worker promise unguarded.
88+
return workerPromise();
89+
}
90+
}
91+
4692
/**
4793
* Get a MessagePort which can be used to share the internals of this connection.
4894
*/
@@ -103,24 +149,27 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
103149
async close(): Promise<void> {
104150
// Abort any pending lock requests.
105151
this.lockAbortController.abort();
106-
await this.baseConnection.close();
107-
this.options.remote[Comlink.releaseProxy]();
108-
this.options.onClose?.();
152+
try {
153+
await this.withRemote(() => this.baseConnection.close());
154+
} finally {
155+
this.options.remote[Comlink.releaseProxy]();
156+
this.options.onClose?.();
157+
}
109158
}
110159

111160
execute(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
112-
return this.baseConnection.execute(sql, params);
161+
return this.withRemote(() => this.baseConnection.execute(sql, params));
113162
}
114163

115164
executeRaw(sql: string, params?: any[]): Promise<any[][]> {
116-
return this.baseConnection.executeRaw(sql, params);
165+
return this.withRemote(() => this.baseConnection.executeRaw(sql, params));
117166
}
118167

119168
executeBatch(sql: string, params?: any[]): Promise<ProxiedQueryResult> {
120-
return this.baseConnection.executeBatch(sql, params);
169+
return this.withRemote(() => this.baseConnection.executeBatch(sql, params));
121170
}
122171

123172
getConfig(): Promise<ResolvedWebSQLOpenOptions> {
124-
return this.baseConnection.getConfig();
173+
return this.withRemote(() => this.baseConnection.getConfig());
125174
}
126175
}

packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export class WASQLiteDBAdapter extends LockedAsyncDatabaseAdapter {
5454
const remote = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
5555
return new WorkerWrappedAsyncDatabaseConnection({
5656
remote,
57+
remoteCanCloseUnexpectedly: false,
5758
identifier: options.dbFilename,
5859
baseConnection: await remote({
5960
...options,

packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory {
8282

8383
return new WorkerWrappedAsyncDatabaseConnection({
8484
remote: workerDBOpener,
85+
// This tab owns the worker, so we're guaranteed to outlive it.
86+
remoteCanCloseUnexpectedly: false,
8587
baseConnection: await workerDBOpener({
8688
dbFilename: this.options.dbFilename,
8789
vfs,

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ export type WrappedSyncPort = {
7575
clientProvider: Comlink.Remote<AbstractSharedSyncClientProvider>;
7676
db?: DBAdapter;
7777
currentSubscriptions: SubscribedStream[];
78+
closeListeners: (() => void)[];
7879
};
7980

8081
/**
@@ -274,7 +275,8 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
274275
const portProvider = {
275276
port,
276277
clientProvider: Comlink.wrap<AbstractSharedSyncClientProvider>(port),
277-
currentSubscriptions: []
278+
currentSubscriptions: [],
279+
closeListeners: []
278280
} satisfies WrappedSyncPort;
279281
this.ports.push(portProvider);
280282

@@ -331,10 +333,13 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
331333
return () => {};
332334
}
333335

336+
for (const closeListener of trackedPort.closeListeners) {
337+
closeListener();
338+
}
339+
334340
if (this.dbAdapter && this.dbAdapter == trackedPort.db) {
335-
if (shouldReconnect) {
336-
await this.connectionManager.disconnect();
337-
}
341+
// Unconditionally close the connection because the database it's writing to has just been closed.
342+
await this.connectionManager.disconnect();
338343

339344
// Clearing the adapter will result in a new one being opened in connect
340345
this.dbAdapter = null;
@@ -344,10 +349,6 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
344349
}
345350
}
346351

347-
if (trackedPort.db) {
348-
await trackedPort.db.close();
349-
}
350-
351352
// Re-index subscriptions, the subscriptions of the removed port would no longer be considered.
352353
this.collectActiveSubscriptions();
353354

@@ -473,11 +474,21 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
473474
const locked = new LockedAsyncDatabaseAdapter({
474475
name: identifier,
475476
openConnection: async () => {
476-
return new WorkerWrappedAsyncDatabaseConnection({
477+
const wrapped = new WorkerWrappedAsyncDatabaseConnection({
477478
remote,
478479
baseConnection: db,
479-
identifier
480+
identifier,
481+
// It's possible for this worker to outlive the client hosting the database for us. We need to be prepared for
482+
// that and ensure pending requests are aborted when the tab is closed.
483+
remoteCanCloseUnexpectedly: true
480484
});
485+
lastClient.closeListeners.push(() => {
486+
this.logger.info('Aborting open connection because associated tab closed.');
487+
wrapped.close();
488+
wrapped.markRemoteClosed();
489+
});
490+
491+
return wrapped;
481492
},
482493
logger: this.logger
483494
});

packages/web/src/worker/sync/WorkerClient.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ export class WorkerClient {
4141

4242
private async removePort() {
4343
if (this.resolvedPort) {
44-
const release = await this.sync.removePort(this.resolvedPort);
44+
const resolved = this.resolvedPort;
45+
this.resolvedPort = null;
46+
const release = await this.sync.removePort(resolved);
4547
this.resolvedPort = null;
4648
this.port.postMessage({
4749
event: SharedSyncClientEvent.CLOSE_ACK,

0 commit comments

Comments
 (0)