Skip to content

Commit d73d9d2

Browse files
Cleanup aborted operations if aborted. Try and gracefully handle Accesshandle errors.
1 parent 1ef44a3 commit d73d9d2

File tree

3 files changed

+39
-40
lines changed

3 files changed

+39
-40
lines changed

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -282,14 +282,10 @@ export class LockedAsyncDatabaseAdapter
282282
try {
283283
// The database is being opened in the background. Wait for it here.
284284
if (this.databaseOpenPromise) {
285-
try {
286-
await this.databaseOpenPromise;
287-
} catch (ex) {
288-
// This will cause a retry of opening the database.
289-
const wrappedError = new ConnectionClosedError('Could not open database');
290-
wrappedError.cause = ex;
291-
throw wrappedError;
292-
}
285+
/**
286+
* We can't await this since it uses the same lock as we're in now.
287+
*/
288+
throw new ConnectionClosedError('Connection is busy re-opening');
293289
}
294290

295291
holdId = this.requiresHolds ? await this.baseDB.markHold() : null;

packages/web/src/worker/db/opfs.ts

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -259,19 +259,7 @@ export class OPFSCoopSyncVFS extends FacadeVFS {
259259
this.mapIdToFile.delete(fileId);
260260

261261
if (file?.flags & VFS.SQLITE_OPEN_MAIN_DB) {
262-
// Always close handles directly as well to ensure cleanup
263-
// This handles edge cases where the lock mechanism might not work properly
264-
if (file.persistentFile) {
265-
DB_RELATED_FILE_SUFFIXES.forEach((suffix) => {
266-
const persistentFile = this.persistentFiles.get(file.path + suffix);
267-
if (persistentFile?.accessHandle) {
268-
persistentFile.accessHandle.close();
269-
persistentFile.accessHandle = null;
270-
}
271-
});
272-
}
273262
if (file.persistentFile?.handleLockReleaser) {
274-
// Normal case: release via the lock mechanism
275263
this.#releaseAccessHandle(file);
276264
}
277265
} else if (file?.flags & VFS.SQLITE_OPEN_DELETEONCLOSE) {
@@ -532,7 +520,6 @@ export class OPFSCoopSyncVFS extends FacadeVFS {
532520
(async () => {
533521
// Acquire the Web Lock.
534522
file.persistentFile.handleLockReleaser = await this.#acquireLock(file.persistentFile);
535-
536523
try {
537524
// Get access handles for the database and releated files in parallel.
538525
this.log?.(`creating access handles for ${file.path}`);
@@ -544,11 +531,13 @@ export class OPFSCoopSyncVFS extends FacadeVFS {
544531
}
545532
})
546533
);
547-
file.persistentFile.isRequestInProgress = false;
548534
} catch (e) {
549535
this.log?.(`failed to create access handles for ${file.path}`, e);
536+
// Release the lock, if we failed here, we'd need to obtain the lock later in order to retry
550537
file.persistentFile.handleLockReleaser();
551538
throw e;
539+
} finally {
540+
file.persistentFile.isRequestInProgress = false;
552541
}
553542
})()
554543
);
@@ -581,6 +570,7 @@ export class OPFSCoopSyncVFS extends FacadeVFS {
581570
*/
582571
#acquireLock(persistentFile) {
583572
return new Promise((resolve) => {
573+
// Tell other connections we want the access handle.
584574
const lockName = persistentFile.handleRequestChannel.name;
585575
const notify = () => {
586576
this.log?.(`notifying for ${lockName}`);
@@ -590,19 +580,11 @@ export class OPFSCoopSyncVFS extends FacadeVFS {
590580
setTimeout(notify);
591581

592582
this.log?.(`lock requested: ${lockName}`);
593-
let releaseLock; // This will hold the resolver for the lock Promise
594-
const lockPromise = new Promise((release) => {
595-
releaseLock = release; // Capture the resolver
596-
});
597-
598583
navigator.locks.request(lockName, (lock) => {
584+
// We have the lock. Stop asking other connections for it.
599585
this.log?.(`lock acquired: ${lockName}`, lock);
600586
clearInterval(notifyId);
601-
// Resolve the outer Promise with a function that releases the lock
602-
resolve(() => {
603-
releaseLock(); // This resolves lockPromise, releasing the lock
604-
});
605-
return lockPromise; // Return the Promise that controls the lock
587+
return new Promise(resolve);
606588
});
607589
});
608590
}

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -525,12 +525,16 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
525525

526526
client.closeListeners.push(closeListener);
527527

528-
const workerPort = await withAbort(() => client.clientProvider.getDBWorkerPort(), abortController.signal).catch(
529-
(ex) => {
530-
removeCloseListener();
531-
throw ex;
528+
const workerPort = await withAbort({
529+
action: () => client.clientProvider.getDBWorkerPort(),
530+
signal: abortController.signal,
531+
cleanupOnAbort: (port) => {
532+
port.close();
532533
}
533-
);
534+
}).catch((ex) => {
535+
removeCloseListener();
536+
throw ex;
537+
});
534538

535539
const remote = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
536540
const identifier = this.syncParams!.dbParams.dbFilename;
@@ -541,7 +545,13 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
541545
* We typically execute the closeListeners using the portMutex in a different context.
542546
* We can't rely on the closeListeners to abort the operation if the tab is closed.
543547
*/
544-
const db = await withAbort(() => remote(this.syncParams!.dbParams), abortController.signal).finally(() => {
548+
const db = await withAbort({
549+
action: () => remote(this.syncParams!.dbParams),
550+
signal: abortController.signal,
551+
cleanupOnAbort: (db) => {
552+
db.close();
553+
}
554+
}).finally(() => {
545555
// We can remove the close listener here since we no longer need it past this point.
546556
removeCloseListener();
547557
});
@@ -588,7 +598,12 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
588598
/**
589599
* Runs the action with an abort controller.
590600
*/
591-
function withAbort<T>(action: () => Promise<T>, signal: AbortSignal): Promise<T> {
601+
function withAbort<T>(options: {
602+
action: () => Promise<T>;
603+
signal: AbortSignal;
604+
cleanupOnAbort?: (result: T) => void;
605+
}): Promise<T> {
606+
const { action, signal, cleanupOnAbort } = options;
592607
return new Promise((resolve, reject) => {
593608
if (signal.aborted) {
594609
reject(new AbortOperation('Operation aborted by abort controller'));
@@ -608,7 +623,13 @@ function withAbort<T>(action: () => Promise<T>, signal: AbortSignal): Promise<T>
608623
}
609624

610625
action()
611-
.then((data) => completePromise(() => resolve(data)))
626+
.then((data) => {
627+
completePromise(() => resolve(data));
628+
// We already rejected due to the abort, allow for cleanup
629+
if (signal.aborted) {
630+
cleanupOnAbort?.(data);
631+
}
632+
})
612633
.catch((e) => completePromise(() => reject(e)));
613634
});
614635
}

0 commit comments

Comments
 (0)