Skip to content

Commit 5010b88

Browse files
committed
Show warning if crud transactions are not completed
1 parent 8f71635 commit 5010b88

File tree

5 files changed

+142
-29
lines changed

5 files changed

+142
-29
lines changed

packages/powersync/lib/src/bucket_storage.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,12 @@ class BucketStorage {
270270
});
271271
}
272272

273+
Future<CrudEntry?> nextCrudItem() async {
274+
var next = await _internalDb
275+
.getOptional('SELECT * FROM ps_crud ORDER BY id ASC LIMIT 1');
276+
return next == null ? null : CrudEntry.fromRow(next);
277+
}
278+
273279
Future<bool> hasCrud() async {
274280
final anyData = await select('SELECT 1 FROM ps_crud LIMIT 1');
275281
return anyData.isNotEmpty;

packages/powersync/lib/src/streaming_sync.dart

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import 'package:sqlite_async/mutex.dart';
1010

1111
import 'bucket_storage.dart';
1212
import 'connector.dart';
13+
import 'crud.dart';
1314
import 'stream_utils.dart';
1415
import 'sync_status.dart';
1516
import 'sync_types.dart';
@@ -102,6 +103,10 @@ class StreamingSyncImplementation {
102103
return _abort?.aborted ?? false;
103104
}
104105

106+
bool get isConnected {
107+
return lastStatus.connected;
108+
}
109+
105110
Future<void> streamingSync() async {
106111
try {
107112
_abort = AbortController();
@@ -159,40 +164,49 @@ class StreamingSyncImplementation {
159164
}
160165

161166
Future<void> uploadAllCrud() async {
167+
// Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
168+
CrudEntry? checkedCrudItem;
169+
162170
while (true) {
163171
try {
164-
bool done = await uploadCrudBatch();
165-
_updateStatus(uploadError: _noError);
166-
if (done) {
172+
// This is the first item in the FIFO CRUD queue.
173+
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
174+
if (nextCrudItem != null) {
175+
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
176+
// This will force a higher log level than exceptions which are caught here.
177+
isolateLogger.warning(
178+
"""Potentially previously uploaded CRUD entries are still present in the upload queue.
179+
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
180+
The next upload iteration will be delayed.""");
181+
throw Exception(
182+
'Delaying due to previously encountered CRUD item.');
183+
}
184+
185+
checkedCrudItem = nextCrudItem;
186+
await uploadCrud();
187+
_updateStatus(uploadError: _noError);
188+
} else {
189+
// Uploading is completed
190+
await adapter.updateLocalTarget(() => getWriteCheckpoint());
167191
break;
168192
}
169193
} catch (e, stacktrace) {
194+
checkedCrudItem = null;
170195
isolateLogger.warning('Data upload error', e, stacktrace);
171196
_updateStatus(uploading: false, uploadError: e);
172197
await Future.delayed(retryDelay);
173-
}
174-
}
175-
_updateStatus(uploading: false);
176-
}
177-
178-
Future<bool> uploadCrudBatch() async {
179-
return crudMutex.lock(() async {
180-
if ((await adapter.hasCrud())) {
181-
_updateStatus(uploading: true);
182-
await uploadCrud();
183-
return false;
184-
} else {
185-
// This isolate is the only one triggering
186-
final updated = await adapter.updateLocalTarget(() async {
187-
return getWriteCheckpoint();
188-
});
189-
if (updated) {
190-
_localPingController.add(null);
198+
if (!isConnected) {
199+
// Exit the upload loop if the sync stream is no longer connected
200+
break;
191201
}
192-
193-
return true;
202+
isolateLogger.warning(
203+
"Caught exception when uploading. Upload will retry after a delay",
204+
e,
205+
stacktrace);
206+
} finally {
207+
_updateStatus(uploading: false);
194208
}
195-
}, timeout: retryDelay);
209+
}
196210
}
197211

198212
Future<String> getWriteCheckpoint() async {
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import 'package:logging/logging.dart';
2+
import 'package:powersync/powersync.dart';
3+
import 'package:test/test.dart';
4+
5+
import 'test_server.dart';
6+
import 'utils/abstract_test_utils.dart';
7+
import 'utils/test_utils_impl.dart';
8+
9+
final testUtils = TestUtils();
10+
const testId = "2290de4f-0488-4e50-abed-f8e8eb1d0b42";
11+
const testId2 = "2290de4f-0488-4e50-abed-f8e8eb1d0b43";
12+
const partialWarning =
13+
'Potentially previously uploaded CRUD entries are still present';
14+
15+
class TestConnector extends PowerSyncBackendConnector {
16+
final Function _fetchCredentials;
17+
final Future<void> Function(PowerSyncDatabase database) _uploadData;
18+
19+
TestConnector(this._fetchCredentials, this._uploadData);
20+
21+
@override
22+
Future<PowerSyncCredentials?> fetchCredentials() {
23+
return _fetchCredentials();
24+
}
25+
26+
@override
27+
Future<void> uploadData(PowerSyncDatabase database) async {
28+
return _uploadData(database);
29+
}
30+
}
31+
32+
void main() {
33+
group('CRUD Tests', () {
34+
late PowerSyncDatabase powersync;
35+
late String path;
36+
37+
setUp(() async {
38+
path = testUtils.dbPath();
39+
await testUtils.cleanDb(path: path);
40+
});
41+
42+
tearDown(() async {
43+
// await powersync.disconnectAndClear();
44+
await powersync.close();
45+
});
46+
47+
test('should warn for missing upload operations in uploadData', () async {
48+
var server = await createServer();
49+
50+
credentialsCallback() async {
51+
return PowerSyncCredentials(
52+
endpoint: server.endpoint,
53+
token: 'token',
54+
userId: 'userId',
55+
);
56+
}
57+
58+
uploadData(PowerSyncDatabase db) async {
59+
// Do nothing
60+
}
61+
62+
final records = <LogRecord>[];
63+
final sub = testWarningLogger.onRecord.listen(records.add);
64+
65+
powersync =
66+
await testUtils.setupPowerSync(path: path, logger: testWarningLogger);
67+
powersync.retryDelay = Duration(milliseconds: 0);
68+
var connector = TestConnector(credentialsCallback, uploadData);
69+
powersync.connect(connector: connector);
70+
71+
// Create something with CRUD in it.
72+
await powersync.execute(
73+
'INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test']);
74+
75+
// Wait for the uploadData to be called.
76+
await Future.delayed(Duration(milliseconds: 100));
77+
78+
// Create something else with CRUD in it.
79+
await powersync.execute(
80+
'INSERT INTO assets(id, description) VALUES(?, ?)',
81+
[testId2, 'test2']);
82+
83+
sub.cancel();
84+
85+
var warningLogs = records.where((r) => r.level == Level.WARNING).toList();
86+
expect(warningLogs, hasLength(2));
87+
expect(warningLogs[0].message, contains(partialWarning));
88+
});
89+
});
90+
}

packages/powersync/test/utils/abstract_test_utils.dart

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ const defaultSchema = schema;
2424

2525
final testLogger = _makeTestLogger();
2626

27-
Logger _makeTestLogger() {
27+
final testWarningLogger = _makeTestLogger(level: Level.WARNING);
28+
29+
Logger _makeTestLogger({Level level = Level.ALL}) {
2830
final logger = Logger.detached('PowerSync Tests');
29-
logger.level = Level.ALL;
31+
logger.level = level;
3032
logger.onRecord.listen((record) {
3133
print(
3234
'[${record.loggerName}] ${record.level.name}: ${record.time}: ${record.message}');
@@ -70,9 +72,9 @@ abstract class AbstractTestUtils {
7072

7173
/// Creates a SqliteDatabaseConnection
7274
Future<PowerSyncDatabase> setupPowerSync(
73-
{String? path, Schema? schema}) async {
75+
{String? path, Schema? schema, Logger? logger}) async {
7476
final db = PowerSyncDatabase.withFactory(await testFactory(path: path),
75-
schema: schema ?? defaultSchema, logger: testLogger);
77+
schema: schema ?? defaultSchema, logger: logger ?? testLogger);
7678
await db.initialize();
7779
return db;
7880
}

packages/powersync/test/utils/web_test_utils.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:async';
22
import 'dart:html';
33

44
import 'package:js/js.dart';
5+
import 'package:logging/logging.dart';
56
import 'package:powersync/powersync.dart';
67
import 'package:sqlite_async/sqlite3_common.dart';
78
import 'package:sqlite_async/sqlite_async.dart';
@@ -51,7 +52,7 @@ class TestUtils extends AbstractTestUtils {
5152

5253
@override
5354
Future<PowerSyncDatabase> setupPowerSync(
54-
{String? path, Schema? schema}) async {
55+
{String? path, Schema? schema, Logger? logger}) async {
5556
await _isInitialized;
5657
return super.setupPowerSync(path: path, schema: schema);
5758
}

0 commit comments

Comments
 (0)