Skip to content

Commit ffe5abe

Browse files
Fire and forget close operations.
Serialize all sync status errors.
1 parent f881992 commit ffe5abe

File tree

9 files changed

+914
-559
lines changed

9 files changed

+914
-559
lines changed

package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,10 @@
4949
"rollup-plugin-dts": "^6.2.1",
5050
"typescript": "^5.7.2",
5151
"vitest": "^3.2.4"
52+
},
53+
"pnpm": {
54+
"overrides": {
55+
"@journeyapps/wa-sqlite": "0.0.0-dev-20251201120934"
56+
}
5257
}
5358
}

packages/common/src/db/crud/SyncStatus.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { CoreStreamSubscription } from '../../client/sync/stream/core-instruction.js';
21
import { SyncClientImplementation } from '../../client/sync/stream/AbstractStreamingSyncImplementation.js';
3-
import { InternalProgressInformation, ProgressWithOperations, SyncProgress } from './SyncProgress.js';
2+
import { CoreStreamSubscription } from '../../client/sync/stream/core-instruction.js';
43
import { SyncStreamDescription, SyncSubscriptionDescription } from '../../client/sync/sync-streams.js';
4+
import { InternalProgressInformation, ProgressWithOperations, SyncProgress } from './SyncProgress.js';
55

66
export type SyncDataFlowStatus = Partial<{
77
downloading: boolean;
@@ -250,13 +250,28 @@ export class SyncStatus {
250250
return {
251251
connected: this.connected,
252252
connecting: this.connecting,
253-
dataFlow: this.dataFlowStatus,
253+
dataFlow: {
254+
...this.dataFlowStatus,
255+
uploadError: this.serializeError(this.dataFlowStatus.uploadError),
256+
downloadError: this.serializeError(this.dataFlowStatus.downloadError)
257+
},
254258
lastSyncedAt: this.lastSyncedAt,
255259
hasSynced: this.hasSynced,
256260
priorityStatusEntries: this.priorityStatusEntries
257261
};
258262
}
259263

264+
protected serializeError(error?: Error) {
265+
if (typeof error == 'undefined') {
266+
return undefined;
267+
}
268+
return {
269+
name: error.name,
270+
message: error.message,
271+
stack: error.stack
272+
};
273+
}
274+
260275
private static comparePriorities(a: SyncPriorityStatus, b: SyncPriorityStatus) {
261276
return b.priority - a.priority; // Reverse because higher priorities have lower numbers
262277
}

packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,17 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
8080
return this.withRemote(() => this.baseConnection.isAutoCommit());
8181
}
8282

83-
private withRemote<T>(workerPromise: () => Promise<T>): Promise<T> {
83+
private withRemote<T>(workerPromise: () => Promise<T>, fireActionOnAbort = false): Promise<T> {
8484
const controller = this.notifyRemoteClosed;
8585
if (controller) {
8686
return new Promise((resolve, reject) => {
8787
if (controller.signal.aborted) {
8888
reject(new ConnectionClosedError('Called operation on closed remote'));
89-
// Don't run the operation if we're going to reject
90-
return;
89+
if (!fireActionOnAbort) {
90+
// Don't run the operation if we're going to reject
91+
// We might want to fire-and-forget the operation in some cases (like a close operation)
92+
return;
93+
}
9194
}
9295

9396
function handleAbort() {
@@ -172,7 +175,8 @@ export class WorkerWrappedAsyncDatabaseConnection<Config extends ResolvedWebSQLO
172175
// Abort any pending lock requests.
173176
this.lockAbortController.abort();
174177
try {
175-
await this.withRemote(() => this.baseConnection.close());
178+
// fire and forget the close operation
179+
await this.withRemote(() => this.baseConnection.close(), true);
176180
} finally {
177181
this.options.remote[Comlink.releaseProxy]();
178182
this.options.onClose?.();

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,16 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
184184
});
185185
}
186186

187+
/**
188+
* In some very rare cases a specific tab might not respond to requests.
189+
* This returns a random port which is not closing.
190+
*/
191+
protected async getRandomWrappedPort(): Promise<WrappedSyncPort | undefined> {
192+
return await this.portMutex.runExclusive(() => {
193+
return this.ports[Math.floor(Math.random() * this.ports.length)];
194+
});
195+
}
196+
187197
async waitForStatus(status: SyncStatusOptions): Promise<void> {
188198
return this.withSyncImplementation(async (sync) => {
189199
return sync.waitForStatus(status);
@@ -487,12 +497,17 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
487497
protected async openInternalDB() {
488498
while (true) {
489499
try {
490-
const lastClient = await this.getLastWrappedPort();
491-
if (!lastClient) {
500+
const client = await this.getRandomWrappedPort();
501+
if (!client) {
492502
// Should not really happen in practice
493503
throw new Error(`Could not open DB connection since no client is connected.`);
494504
}
495505

506+
// Fail-safe timeout for opening a database connection.
507+
const timeout = setTimeout(() => {
508+
abortController.abort();
509+
}, 10_000);
510+
496511
/**
497512
* Handle cases where the client might close while opening a connection.
498513
*/
@@ -502,21 +517,20 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
502517
};
503518

504519
const removeCloseListener = () => {
505-
const index = lastClient.closeListeners.indexOf(closeListener);
520+
const index = client.closeListeners.indexOf(closeListener);
506521
if (index >= 0) {
507-
lastClient.closeListeners.splice(index, 1);
522+
client.closeListeners.splice(index, 1);
508523
}
509524
};
510525

511-
lastClient.closeListeners.push(closeListener);
526+
client.closeListeners.push(closeListener);
512527

513-
const workerPort = await withAbort(
514-
() => lastClient.clientProvider.getDBWorkerPort(),
515-
abortController.signal
516-
).catch((ex) => {
517-
removeCloseListener();
518-
throw ex;
519-
});
528+
const workerPort = await withAbort(() => client.clientProvider.getDBWorkerPort(), abortController.signal).catch(
529+
(ex) => {
530+
removeCloseListener();
531+
throw ex;
532+
}
533+
);
520534

521535
const remote = Comlink.wrap<OpenAsyncDatabaseConnection>(workerPort);
522536
const identifier = this.syncParams!.dbParams.dbFilename;
@@ -532,6 +546,8 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
532546
removeCloseListener();
533547
});
534548

549+
clearTimeout(timeout);
550+
535551
const wrapped = new WorkerWrappedAsyncDatabaseConnection({
536552
remote,
537553
baseConnection: db,
@@ -540,15 +556,15 @@ export class SharedSyncImplementation extends BaseObserver<SharedSyncImplementat
540556
// that and ensure pending requests are aborted when the tab is closed.
541557
remoteCanCloseUnexpectedly: true
542558
});
543-
lastClient.closeListeners.push(async () => {
559+
client.closeListeners.push(async () => {
544560
this.logger.info('Aborting open connection because associated tab closed.');
545561
/**
546562
* Don't await this close operation. It might never resolve if the tab is closed.
547-
* We run the close operation first, before marking the remote as closed. This gives the database some chance
548-
* to close the connection.
563+
* We mark the remote as closed first, this will reject any pending requests.
564+
* We then call close. The close operation is configured to fire-and-forget, the main promise will reject immediately.
549565
*/
550-
wrapped.close().catch((ex) => this.logger.warn('error closing database connection', ex));
551566
wrapped.markRemoteClosed();
567+
wrapped.close().catch((ex) => this.logger.warn('error closing database connection', ex));
552568
});
553569

554570
return wrapped;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { SyncStreamConnectionMethod } from '@powersync/common';
2+
import { describe, expect } from 'vitest';
3+
import { sharedMockSyncServiceTest } from './utils/mockSyncServiceTest';
4+
5+
/**
6+
* Test to verify that Error instances are properly serialized when passed through MessagePorts.
7+
* When errors occur in the shared worker and are reported via statusChanged, they should
8+
* be properly serialized and deserialized to appear in the sync status.
9+
*/
10+
describe('Error Serialization through MessagePorts', { sequential: true }, () => {
11+
sharedMockSyncServiceTest(
12+
'should serialize and deserialize Error in sync status when connection fails',
13+
{ timeout: 10_000 },
14+
async ({ context: { database, mockService } }) => {
15+
await mockService.setAutomaticResponse({
16+
status: 401,
17+
headers: { 'Content-Type': 'application/json' },
18+
bodyLines: ['Unauthorized']
19+
});
20+
21+
// Start connection attempt
22+
await database.connect(
23+
{
24+
fetchCredentials: async () => {
25+
return {
26+
endpoint: 'http://localhost:3000',
27+
token: 'test-token'
28+
};
29+
},
30+
uploadData: async () => {}
31+
},
32+
{
33+
connectionMethod: SyncStreamConnectionMethod.HTTP
34+
}
35+
);
36+
37+
expect(database.currentStatus.dataFlowStatus?.downloadError).toBeDefined();
38+
expect(database.currentStatus.dataFlowStatus?.downloadError?.name).toBe('Error');
39+
expect(database.currentStatus.dataFlowStatus?.downloadError?.message).toBe('HTTP : "Unauthorized"\n');
40+
expect(database.currentStatus.dataFlowStatus?.downloadError?.stack).toBeDefined();
41+
}
42+
);
43+
});

packages/web/tests/mockSyncServiceExample.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ describe('Mock Sync Service Example', { timeout: 100000 }, () => {
1717
sharedMockSyncServiceTest(
1818
'should allow mocking sync responses in shared worker',
1919
{ timeout: 100000 },
20-
async ({ context: { database, connect } }) => {
20+
async ({ context: { database, connect, mockService } }) => {
2121
// Call connect to start the sync worker and get the sync service
22-
const { syncService, syncRequestId } = await connect();
22+
const { syncRequestId } = await connect();
2323

2424
// Push a checkpoint with buckets (following node test pattern)
2525
const checkpoint: StreamingSyncCheckpoint = {
@@ -37,10 +37,10 @@ describe('Mock Sync Service Example', { timeout: 100000 }, () => {
3737
}
3838
};
3939

40-
await syncService.pushBodyLine(syncRequestId, checkpoint);
40+
await mockService.pushBodyLine(syncRequestId, checkpoint);
4141

4242
// The connect call should resolve by now
43-
await syncService.pushBodyLine(syncRequestId, {
43+
await mockService.pushBodyLine(syncRequestId, {
4444
data: {
4545
bucket: 'a',
4646
data: [
@@ -57,14 +57,14 @@ describe('Mock Sync Service Example', { timeout: 100000 }, () => {
5757
});
5858

5959
// Push checkpoint_complete to finish the sync
60-
await syncService.pushBodyLine(syncRequestId, {
60+
await mockService.pushBodyLine(syncRequestId, {
6161
checkpoint_complete: {
6262
last_op_id: '1'
6363
}
6464
});
6565

6666
// Complete the response
67-
await syncService.completeResponse(syncRequestId);
67+
await mockService.completeResponse(syncRequestId);
6868

6969
// Wait for sync to complete and verify the data was saved
7070
await vi.waitFor(async () => {

packages/web/tests/multiple_instances.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ describe('Multiple Instances', { sequential: true }, () => {
243243

244244
sharedMockSyncServiceTest(
245245
'should trigger uploads from last connected clients',
246-
async ({ context: { database, openDatabase, connect, connector } }) => {
246+
async ({ context: { database, openDatabase, connect, connector, mockService } }) => {
247247
const secondDatabase = openDatabase();
248248

249249
expect(database.currentStatus.connected).false;
@@ -258,7 +258,7 @@ describe('Multiple Instances', { sequential: true }, () => {
258258
await database.execute('INSERT into lists (id, name) VALUES (uuid(), ?)', ['steven']);
259259

260260
// connect from the first database
261-
const { syncService } = await connect();
261+
await connect();
262262

263263
await vi.waitFor(() => expect(database.currentStatus.connected).true);
264264

@@ -275,13 +275,13 @@ describe('Multiple Instances', { sequential: true }, () => {
275275
const secondConnectPromise = secondDatabase.connect(secondConnector);
276276
let _pendingRequestId: string;
277277
await vi.waitFor(async () => {
278-
const requests = await syncService.getPendingRequests();
278+
const requests = await mockService.getPendingRequests();
279279
expect(requests.length).toBeGreaterThan(0);
280280
_pendingRequestId = requests[0].id;
281281
});
282282
const pendingRequestId = _pendingRequestId!;
283-
await syncService.createResponse(pendingRequestId, 200, { 'Content-Type': 'application/json' });
284-
await syncService.pushBodyLine(pendingRequestId, {
283+
await mockService.createResponse(pendingRequestId, 200, { 'Content-Type': 'application/json' });
284+
await mockService.pushBodyLine(pendingRequestId, {
285285
token_expires_in: 10000000
286286
});
287287
await secondConnectPromise;

packages/web/tests/utils/mockSyncServiceTest.ts

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ export function createTestConnector(): MockedTestConnector {
4141
* Result of calling the connect function
4242
*/
4343
export interface ConnectResult {
44-
syncService: MockSyncService;
4544
syncRequestId: string;
4645
}
4746

@@ -63,6 +62,7 @@ export const sharedMockSyncServiceTest = test.extend<{
6362
database: PowerSyncDatabase;
6463
databaseName: string;
6564
openDatabase: (customConfig?: Partial<WebPowerSyncDatabaseOptions>) => PowerSyncDatabase;
65+
mockService: MockSyncService;
6666
};
6767
}>({
6868
context: async ({}, use) => {
@@ -98,6 +98,15 @@ export const sharedMockSyncServiceTest = test.extend<{
9898

9999
const database = openDatabase();
100100

101+
// Get the identifier from the database.name property
102+
const identifier = database.database.name;
103+
104+
// Connect to the shared worker to get the mock service
105+
const mockService = await getMockSyncServiceFromWorker(identifier);
106+
if (!mockService) {
107+
throw new Error('Mock service not available');
108+
}
109+
101110
const connector = createTestConnector();
102111

103112
const connectFn = async (customConnector?: PowerSyncBackendConnector): Promise<ConnectResult> => {
@@ -116,18 +125,6 @@ export const sharedMockSyncServiceTest = test.extend<{
116125
{ timeout: 1000 }
117126
);
118127

119-
// Get the identifier from the database.name property
120-
const identifier = database.database.name;
121-
122-
// Connect to the shared worker to get the mock service
123-
const mockService = await getMockSyncServiceFromWorker(identifier);
124-
125-
if (!mockService) {
126-
throw new Error(
127-
'Mock service not available - ensure enableMultiTabs is true and running in a test environment'
128-
);
129-
}
130-
131128
let _syncRequestId: string;
132129
await vi.waitFor(async () => {
133130
const requests = await mockService.getPendingRequests();
@@ -147,7 +144,6 @@ export const sharedMockSyncServiceTest = test.extend<{
147144
await connectionPromise;
148145

149146
return {
150-
syncService: mockService,
151147
syncRequestId
152148
};
153149
};
@@ -157,7 +153,8 @@ export const sharedMockSyncServiceTest = test.extend<{
157153
connect: connectFn,
158154
database,
159155
databaseName: dbFilename,
160-
openDatabase
156+
openDatabase,
157+
mockService
161158
});
162159
}
163160
});

0 commit comments

Comments
 (0)