Skip to content

Commit 07231cb

Browse files
cleanup port assignments and init flow
1 parent 1e9fa3f commit 07231cb

File tree

4 files changed

+55
-103
lines changed

4 files changed

+55
-103
lines changed

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import {
22
PowerSyncConnectionOptions,
33
PowerSyncCredentials,
44
SubscribedStream,
5-
SyncStatus,
65
SyncStatusOptions
76
} from '@powersync/common';
87
import * as Comlink from 'comlink';
@@ -187,6 +186,8 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
187186
* - The shared worker can then request the same lock. The client has been closed if the shared worker can acquire the lock.
188187
* - Once the shared worker knows the client's lock, we can guarentee that the shared worker will detect if the client has been closed.
189188
* - This makes the client safe for the shared worker to use.
189+
* - The client is only added to the SharedSyncImplementation once the lock has been registered.
190+
* This ensures we don't ever keep track of dead clients (tabs that closed before the lock was registered).
190191
* - The client side lock is held until the client is disposed.
191192
* - We resolve the top-level promise after the lock has been registered with the shared worker.
192193
* - The client sends the params to the shared worker after locks have been registered.
@@ -288,12 +289,4 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
288289
updateSubscriptions(subscriptions: SubscribedStream[]): void {
289290
this.syncManager.updateSubscriptions(subscriptions);
290291
}
291-
292-
/**
293-
* Used in tests to force a connection states
294-
*/
295-
private async _testUpdateStatus(status: SyncStatus) {
296-
await this.isInitialized;
297-
return this.syncManager._testUpdateAllStatuses(status.toJSON());
298-
}
299292
}

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 48 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,6 @@ export type WrappedSyncPort = {
7676
db?: DBAdapter;
7777
currentSubscriptions: SubscribedStream[];
7878
closeListeners: (() => void | Promise<void>)[];
79-
/**
80-
* If we can use Navigator locks to detect if the client has closed.
81-
*/
82-
isProtectedFromClose: boolean;
8379
isClosing: boolean;
8480
};
8581

@@ -176,14 +172,16 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
176172
/**
177173
* Gets the last client port which we know is safe from unexpected closes.
178174
*/
179-
protected get lastWrappedPort(): WrappedSyncPort | undefined {
180-
// Find the last port which is protected from close
181-
for (let i = this.ports.length - 1; i >= 0; i--) {
182-
if (this.ports[i].isProtectedFromClose && !this.ports[i].isClosing) {
183-
return this.ports[i];
175+
protected async getLastWrappedPort(): Promise<WrappedSyncPort | undefined> {
176+
// Find the last port which is not closing
177+
return await this.portMutex.runExclusive(() => {
178+
for (let i = this.ports.length - 1; i >= 0; i--) {
179+
if (!this.ports[i].isClosing) {
180+
return this.ports[i];
181+
}
184182
}
185-
}
186-
return;
183+
return;
184+
});
187185
}
188186

189187
async waitForStatus(status: SyncStatusOptions): Promise<void> {
@@ -232,44 +230,45 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
232230
async setParams(params: SharedSyncInitOptions) {
233231
await this.portMutex.runExclusive(async () => {
234232
this.collectActiveSubscriptions();
235-
if (this.syncParams) {
236-
// Cannot modify already existing sync implementation params
237-
return;
238-
}
239-
240-
// First time setting params
241-
this.syncParams = params;
242-
if (params.streamOptions?.flags?.broadcastLogs) {
243-
this.logger = this.broadCastLogger;
244-
}
233+
});
245234

246-
const lockedAdapter = new LockedAsyncDatabaseAdapter({
247-
name: params.dbParams.dbFilename,
248-
openConnection: async () => {
249-
// Gets a connection from the clients when a new connection is requested.
250-
return await this.openInternalDB();
251-
},
252-
logger: this.logger,
253-
reOpenOnConnectionClosed: true
254-
});
255-
this.distributedDB = lockedAdapter;
256-
await lockedAdapter.init();
257-
258-
lockedAdapter.registerListener({
259-
databaseReOpened: () => {
260-
// We may have missed some table updates while the database was closed.
261-
// We can poke the crud in case we missed any updates.
262-
this.connectionManager.syncStreamImplementation?.triggerCrudUpload();
263-
}
264-
});
235+
if (this.syncParams) {
236+
// Cannot modify already existing sync implementation params
237+
return;
238+
}
265239

266-
self.onerror = (event) => {
267-
// Share any uncaught events on the broadcast logger
268-
this.logger.error('Uncaught exception in PowerSync shared sync worker', event);
269-
};
240+
// First time setting params
241+
this.syncParams = params;
242+
if (params.streamOptions?.flags?.broadcastLogs) {
243+
this.logger = this.broadCastLogger;
244+
}
270245

271-
this.iterateListeners((l) => l.initialized?.());
246+
const lockedAdapter = new LockedAsyncDatabaseAdapter({
247+
name: params.dbParams.dbFilename,
248+
openConnection: async () => {
249+
// Gets a connection from the clients when a new connection is requested.
250+
return await this.openInternalDB();
251+
},
252+
logger: this.logger,
253+
reOpenOnConnectionClosed: true
254+
});
255+
this.distributedDB = lockedAdapter;
256+
await lockedAdapter.init();
257+
258+
lockedAdapter.registerListener({
259+
databaseReOpened: () => {
260+
// We may have missed some table updates while the database was closed.
261+
// We can poke the crud in case we missed any updates.
262+
this.connectionManager.syncStreamImplementation?.triggerCrudUpload();
263+
}
272264
});
265+
266+
self.onerror = (event) => {
267+
// Share any uncaught events on the broadcast logger
268+
this.logger.error('Uncaught exception in PowerSync shared sync worker', event);
269+
};
270+
271+
this.iterateListeners((l) => l.initialized?.());
273272
}
274273

275274
async dispose() {
@@ -303,7 +302,6 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
303302
clientProvider: Comlink.wrap<AbstractSharedSyncClientProvider>(port),
304303
currentSubscriptions: [],
305304
closeListeners: [],
306-
isProtectedFromClose: false,
307305
isClosing: false
308306
} satisfies WrappedSyncPort;
309307
this.ports.push(portProvider);
@@ -409,7 +407,7 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
409407
remote: new WebRemote(
410408
{
411409
invalidateCredentials: async () => {
412-
const lastPort = this.lastWrappedPort;
410+
const lastPort = await this.getLastWrappedPort();
413411
if (!lastPort) {
414412
throw new Error('No client port found to invalidate credentials');
415413
}
@@ -421,7 +419,7 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
421419
}
422420
},
423421
fetchCredentials: async () => {
424-
const lastPort = this.lastWrappedPort;
422+
const lastPort = await this.getLastWrappedPort();
425423
if (!lastPort) {
426424
throw new Error('No client port found to fetch credentials');
427425
}
@@ -447,7 +445,7 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
447445
this.logger
448446
),
449447
uploadCrud: async () => {
450-
const lastPort = this.lastWrappedPort;
448+
const lastPort = await this.getLastWrappedPort();
451449
if (!lastPort) {
452450
throw new Error('No client port found to upload crud');
453451
}
@@ -483,7 +481,7 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
483481
protected async openInternalDB() {
484482
while (true) {
485483
try {
486-
const lastClient = this.lastWrappedPort;
484+
const lastClient = await this.getLastWrappedPort();
487485
if (!lastClient) {
488486
// Should not really happen in practice
489487
throw new Error(`Could not open DB connection since no client is connected.`);
@@ -563,19 +561,6 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
563561
this.syncStatus = new SyncStatus(status);
564562
this.ports.forEach((p) => p.clientProvider.statusChanged(status));
565563
}
566-
567-
/**
568-
* A function only used for unit tests which updates the internal
569-
* sync stream client and all tab client's sync status
570-
*/
571-
async _testUpdateAllStatuses(status: SyncStatusOptions) {
572-
if (!this.connectionManager.syncStreamImplementation) {
573-
throw new Error('Cannot update status without a sync stream implementation');
574-
}
575-
// Only assigning, don't call listeners for this test
576-
this.connectionManager.syncStreamImplementation!.syncStatus = new SyncStatus(status);
577-
this.updateAllStatuses(status);
578-
}
579564
}
580565

581566
/**

packages/web/src/worker/sync/SharedSyncImplementation.worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ const sharedSyncImplementation = new SharedSyncImplementation();
1010

1111
_self.onconnect = async function (event: MessageEvent<string>) {
1212
const port = event.ports[0];
13-
await new WorkerClient(sharedSyncImplementation, port).initialize();
13+
new WorkerClient(sharedSyncImplementation, port);
1414
};

packages/web/src/worker/sync/WorkerClient.ts

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ILogLevel, PowerSyncConnectionOptions, SubscribedStream, SyncStatusOptions } from '@powersync/common';
1+
import { ILogLevel, PowerSyncConnectionOptions, SubscribedStream } from '@powersync/common';
22
import * as Comlink from 'comlink';
33
import { getNavigatorLocks } from '../../shared/navigator';
44
import {
@@ -24,9 +24,6 @@ export class WorkerClient {
2424
private readonly port: MessagePort
2525
) {
2626
Comlink.expose(this, this.port);
27-
}
28-
29-
async initialize() {
3027
/**
3128
* Adds an extra listener which can remove this port
3229
* from the list of monitored ports.
@@ -37,16 +34,6 @@ export class WorkerClient {
3734
await this.removePort();
3835
}
3936
});
40-
41-
/**
42-
* Keep a reference to the resolved port promise.
43-
* The init timing is difficult to predict due to the async message passing.
44-
* We only want to use a port if we are know it's been protected from being closed.
45-
* The lock based close signal will be added asynchronously. We need to use the
46-
* added port once the lock is configured.
47-
*/
48-
this.resolvedPortPromise = this.sync.addPort(this.port);
49-
this.resolvedPort = await this.resolvedPortPromise;
5037
}
5138

5239
private async removePort() {
@@ -70,18 +57,9 @@ export class WorkerClient {
7057
* it can consider the connection to be closed.
7158
*/
7259
async addLockBasedCloseSignal(name: string) {
73-
if (!this.resolvedPortPromise) {
74-
// The init logic above is actually synchronous, so this should not happen.
75-
this.sync.broadCastLogger.warn('addLockBasedCloseSignal called before port promise registered');
76-
} else {
77-
const wrappedPort = await this.resolvedPortPromise;
78-
/**
79-
* The client registered a navigator lock. We now can guarantee detecting if the client has closed.
80-
* E.g. before this point: It's possible some ports might have been created and closed before the
81-
* lock based close signal is added. We should not trust those ports.
82-
*/
83-
wrappedPort.isProtectedFromClose = true;
84-
}
60+
// Only add the port once the lock has been obtained on the client.
61+
this.resolvedPort = await this.sync.addPort(this.port);
62+
// Don't await this lock request
8563
getNavigatorLocks().request(name, async () => {
8664
await this.removePort();
8765
});
@@ -121,8 +99,4 @@ export class WorkerClient {
12199
disconnect() {
122100
return this.sync.disconnect();
123101
}
124-
125-
async _testUpdateAllStatuses(status: SyncStatusOptions) {
126-
return this.sync._testUpdateAllStatuses(status);
127-
}
128102
}

0 commit comments

Comments
 (0)