Skip to content

Commit b7255b7

Browse files
authored
Fix connect() resolving early with Rust client (#643)
1 parent ffe3095 commit b7255b7

File tree

3 files changed

+64
-19
lines changed

3 files changed

+64
-19
lines changed

.changeset/new-readers-dance.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Rust sync client: Fix `connect()` resolving before a connection is made.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -342,17 +342,18 @@ export abstract class AbstractStreamingSyncImplementation
342342
let checkedCrudItem: CrudEntry | undefined;
343343

344344
while (true) {
345-
this.updateSyncStatus({
346-
dataFlow: {
347-
uploading: true
348-
}
349-
});
350345
try {
351346
/**
352347
* This is the first item in the FIFO CRUD queue.
353348
*/
354349
const nextCrudItem = await this.options.adapter.nextCrudItem();
355350
if (nextCrudItem) {
351+
this.updateSyncStatus({
352+
dataFlow: {
353+
uploading: true
354+
}
355+
});
356+
356357
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
357358
// This will force a higher log level than exceptions which are caught here.
358359
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
@@ -410,23 +411,15 @@ The next upload iteration will be delayed.`);
410411
this.abortController = controller;
411412
this.streamingSyncPromise = this.streamingSync(this.abortController.signal, options);
412413

413-
// Return a promise that resolves when the connection status is updated
414+
// Return a promise that resolves when the connection status is updated to indicate that we're connected.
414415
return new Promise<void>((resolve) => {
415416
const disposer = this.registerListener({
416-
statusUpdated: (update) => {
417-
// This is triggered as soon as a connection is read from
418-
if (typeof update.connected == 'undefined') {
419-
// only concern with connection updates
420-
return;
421-
}
422-
423-
if (update.connected == false) {
424-
/**
425-
* This function does not reject if initial connect attempt failed.
426-
* Connected can be false if the connection attempt was aborted or if the initial connection
427-
* attempt failed.
428-
*/
417+
statusChanged: (status) => {
418+
if (status.dataFlowStatus.downloadError != null) {
429419
this.logger.warn('Initial connect attempt did not successfully connect to server');
420+
} else if (status.connecting) {
421+
// Still connecting.
422+
return;
430423
}
431424

432425
disposer();
@@ -889,6 +882,10 @@ The next upload iteration will be delayed.`);
889882
);
890883
}
891884

885+
// The rust client will set connected: true after the first sync line because that's when it gets invoked, but
886+
// we're already connected here and can report that.
887+
syncImplementation.updateSyncStatus({ connected: true });
888+
892889
try {
893890
while (!controlInvocations.closed) {
894891
const line = await controlInvocations.read();

packages/node/tests/sync.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,49 @@ function defineSyncTests(impl: SyncClientImplementation) {
135135
expect(Math.abs(lastSyncedAt - now)).toBeLessThan(5000);
136136
});
137137

138+
mockSyncServiceTest('connect() waits for connection', async ({ syncService }) => {
139+
const database = await syncService.createDatabase();
140+
let connectCompleted = false;
141+
database.connect(new TestConnector(), options).then(() => {
142+
connectCompleted = true;
143+
});
144+
expect(connectCompleted).toBeFalsy();
145+
146+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
147+
// We want connected: true once we have a connection
148+
await vi.waitFor(() => connectCompleted);
149+
expect(database.currentStatus.dataFlowStatus.downloading).toBeFalsy();
150+
151+
syncService.pushLine({
152+
checkpoint: {
153+
last_op_id: '10',
154+
buckets: [bucket('a', 10)]
155+
}
156+
});
157+
158+
await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy());
159+
});
160+
161+
mockSyncServiceTest('does not set uploading status without local writes', async ({ syncService }) => {
162+
const database = await syncService.createDatabase();
163+
database.registerListener({
164+
statusChanged(status) {
165+
expect(status.dataFlowStatus.uploading).toBeFalsy();
166+
}
167+
});
168+
169+
database.connect(new TestConnector(), options);
170+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
171+
172+
syncService.pushLine({
173+
checkpoint: {
174+
last_op_id: '10',
175+
buckets: [bucket('a', 10)]
176+
}
177+
});
178+
await vi.waitFor(() => expect(database.currentStatus.dataFlowStatus.downloading).toBeTruthy());
179+
});
180+
138181
describe('reports progress', () => {
139182
let lastOpId = 0;
140183

0 commit comments

Comments
 (0)