Skip to content

Commit 0fc0d73

Browse files
Use a distributed database adapter instead of reconnecting. Retry opening internal worker connections.
1 parent 62b04de commit 0fc0d73

File tree

4 files changed

+104
-109
lines changed

4 files changed

+104
-109
lines changed

packages/common/src/client/ConnectionManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { ILogger } from 'js-logger';
2+
import { SyncStatus } from '../db/crud/SyncStatus.js';
23
import { BaseListener, BaseObserver } from '../utils/BaseObserver.js';
34
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
45
import {
@@ -13,7 +14,6 @@ import {
1314
SyncStreamSubscribeOptions,
1415
SyncStreamSubscription
1516
} from './sync/sync-streams.js';
16-
import { SyncStatus } from '../db/crud/SyncStatus.js';
1717

1818
/**
1919
* @internal

packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import {
1313
import { getNavigatorLocks } from '../..//shared/navigator';
1414
import { AsyncDatabaseConnection } from './AsyncDatabaseConnection';
1515
import { SharedConnectionWorker, WebDBAdapter } from './WebDBAdapter';
16-
import { WorkerWrappedAsyncDatabaseConnection } from './WorkerWrappedAsyncDatabaseConnection';
16+
import {
17+
WorkerConnectionClosedError,
18+
WorkerWrappedAsyncDatabaseConnection
19+
} from './WorkerWrappedAsyncDatabaseConnection';
1720
import { WASQLiteVFS } from './wa-sqlite/WASQLiteConnection';
1821
import { ResolvedWASQLiteOpenFactoryOptions } from './wa-sqlite/WASQLiteOpenFactory';
1922
import { ResolvedWebSQLOpenOptions } from './web-sql-flags';
@@ -52,6 +55,7 @@ export class LockedAsyncDatabaseAdapter
5255
private _config: ResolvedWebSQLOpenOptions | null = null;
5356
protected pendingAbortControllers: Set<AbortController>;
5457
protected requiresHolds: boolean | null;
58+
protected requiresReOpen: boolean;
5559

5660
closing: boolean;
5761
closed: boolean;
@@ -64,6 +68,7 @@ export class LockedAsyncDatabaseAdapter
6468
this.closed = false;
6569
this.closing = false;
6670
this.requiresHolds = null;
71+
this.requiresReOpen = false;
6772
// Set the name if provided. We can query for the name if not available yet
6873
this.debugMode = options.debugMode ?? false;
6974
if (this.debugMode) {
@@ -106,18 +111,26 @@ export class LockedAsyncDatabaseAdapter
106111
return this.initPromise;
107112
}
108113

109-
protected async _init() {
114+
protected async openInternalDB() {
115+
// Dispose any previous table change listener.
116+
this._disposeTableChangeListener?.();
117+
this._disposeTableChangeListener = null;
118+
110119
this._db = await this.options.openConnection();
111120
await this._db.init();
112121
this._config = await this._db.getConfig();
113122
await this.registerOnChangeListener(this._db);
114-
this.iterateListeners((cb) => cb.initialized?.());
115123
/**
116124
* This is only required for the long-lived shared IndexedDB connections.
117125
*/
118126
this.requiresHolds = (this._config as ResolvedWASQLiteOpenFactoryOptions).vfs == WASQLiteVFS.IDBBatchAtomicVFS;
119127
}
120128

129+
protected async _init() {
130+
await this.openInternalDB();
131+
this.iterateListeners((cb) => cb.initialized?.());
132+
}
133+
121134
getConfiguration(): ResolvedWebSQLOpenOptions {
122135
if (!this._config) {
123136
throw new Error(`Cannot get config before initialization is completed`);
@@ -223,7 +236,7 @@ export class LockedAsyncDatabaseAdapter
223236
this.pendingAbortControllers.add(abortController);
224237
const { timeoutMs } = options ?? {};
225238

226-
const timoutId = timeoutMs
239+
const timeoutId = timeoutMs
227240
? setTimeout(() => {
228241
abortController.abort(`Timeout after ${timeoutMs}ms`);
229242
this.pendingAbortControllers.delete(abortController);
@@ -235,12 +248,21 @@ export class LockedAsyncDatabaseAdapter
235248
{ signal: abortController.signal },
236249
async () => {
237250
this.pendingAbortControllers.delete(abortController);
238-
if (timoutId) {
239-
clearTimeout(timoutId);
251+
if (timeoutId) {
252+
clearTimeout(timeoutId);
240253
}
241254
const holdId = this.requiresHolds ? await this.baseDB.markHold() : null;
242255
try {
256+
if (this.requiresReOpen) {
257+
await this.openInternalDB();
258+
this.requiresReOpen = false;
259+
}
243260
return await callback();
261+
} catch (ex) {
262+
if (ex instanceof WorkerConnectionClosedError) {
263+
this.requiresReOpen = true;
264+
}
265+
throw ex;
244266
} finally {
245267
if (holdId) {
246268
await this.baseDB.releaseHold(holdId);

packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ export type WrappedWorkerConnectionOptions<Config extends ResolvedWebSQLOpenOpti
2323
onClose?: () => void;
2424
};
2525

26+
export class WorkerConnectionClosedError extends Error {
27+
constructor(message: string) {
28+
super(message);
29+
this.name = 'WorkerConnectionClosedError';
30+
}
31+
}
32+
2633
/**
2734
* Wraps a provided instance of {@link AsyncDatabaseConnection}, providing necessary proxy
2835
* functions for worker listeners.
@@ -77,13 +84,13 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
7784
if (controller) {
7885
return new Promise((resolve, reject) => {
7986
if (controller.signal.aborted) {
80-
reject(new Error('Called operation on closed remote'));
87+
reject(new WorkerConnectionClosedError('Called operation on closed remote'));
8188
// Don't run the operation if we're going to reject
8289
return;
8390
}
8491

8592
function handleAbort() {
86-
reject(new Error('Remote peer closed with request in flight'));
93+
reject(new WorkerConnectionClosedError('Remote peer closed with request in flight'));
8794
}
8895

8996
function completePromise(action: () => void) {

0 commit comments

Comments
 (0)