@@ -7,6 +7,8 @@ import com.powersync.bucket.BucketPriority
77import com.powersync.bucket.Checkpoint
88import com.powersync.bucket.OpType
99import com.powersync.bucket.OplogEntry
10+ import com.powersync.bucket.WriteCheckpointData
11+ import com.powersync.bucket.WriteCheckpointResponse
1012import com.powersync.db.PowerSyncDatabaseImpl
1113import com.powersync.db.schema.Schema
1214import com.powersync.sync.SyncLine
@@ -17,6 +19,7 @@ import com.powersync.utils.JsonUtil
1719import dev.mokkery.verify
1820import io.kotest.matchers.collections.shouldHaveSize
1921import io.kotest.matchers.shouldBe
22+ import kotlinx.coroutines.CompletableDeferred
2023import kotlinx.coroutines.DelicateCoroutinesApi
2124import kotlinx.serialization.encodeToString
2225import kotlin.test.Test
@@ -35,7 +38,7 @@ class SyncIntegrationTest {
3538 @OptIn(DelicateCoroutinesApi ::class )
3639 fun closesResponseStreamOnDatabaseClose () =
3740 databaseTest {
38- val syncStream = syncStream()
41+ val syncStream = database. syncStream()
3942 database.connectInternal(syncStream, 1000L )
4043
4144 turbineScope(timeout = 10.0 .seconds) {
@@ -55,7 +58,7 @@ class SyncIntegrationTest {
5558 @OptIn(DelicateCoroutinesApi ::class )
5659 fun cleansResourcesOnDisconnect () =
5760 databaseTest {
58- val syncStream = syncStream()
61+ val syncStream = database. syncStream()
5962 database.connectInternal(syncStream, 1000L )
6063
6164 turbineScope(timeout = 10.0 .seconds) {
@@ -77,7 +80,7 @@ class SyncIntegrationTest {
7780 @Test
7881 fun cannotUpdateSchemaWhileConnected () =
7982 databaseTest {
80- val syncStream = syncStream()
83+ val syncStream = database. syncStream()
8184 database.connectInternal(syncStream, 1000L )
8285
8386 turbineScope(timeout = 10.0 .seconds) {
@@ -96,7 +99,7 @@ class SyncIntegrationTest {
9699 @Test
97100 fun testPartialSync () =
98101 databaseTest {
99- val syncStream = syncStream()
102+ val syncStream = database. syncStream()
100103 database.connectInternal(syncStream, 1000L )
101104
102105 val checksums =
@@ -188,7 +191,7 @@ class SyncIntegrationTest {
188191 @Test
189192 fun testRemembersLastPartialSync () =
190193 databaseTest {
191- val syncStream = syncStream()
194+ val syncStream = database. syncStream()
192195 database.connectInternal(syncStream, 1000L )
193196
194197 syncLines.send(
@@ -225,7 +228,7 @@ class SyncIntegrationTest {
225228 @Test
226229 fun setsDownloadingState () =
227230 databaseTest {
228- val syncStream = syncStream()
231+ val syncStream = database. syncStream()
229232 database.connectInternal(syncStream, 1000L )
230233
231234 turbineScope(timeout = 10.0 .seconds) {
@@ -258,7 +261,7 @@ class SyncIntegrationTest {
258261 fun setsConnectingState () =
259262 databaseTest {
260263 turbineScope(timeout = 10.0 .seconds) {
261- val syncStream = syncStream()
264+ val syncStream = database. syncStream()
262265 val turbine = database.currentStatus.asFlow().testIn(this )
263266
264267 database.connectInternal(syncStream, 1000L )
@@ -274,7 +277,7 @@ class SyncIntegrationTest {
274277 @Test
275278 fun testMultipleSyncsDoNotCreateMultipleStatusEntries () =
276279 databaseTest {
277- val syncStream = syncStream()
280+ val syncStream = database. syncStream()
278281 database.connectInternal(syncStream, 1000L )
279282
280283 turbineScope(timeout = 10.0 .seconds) {
@@ -404,4 +407,109 @@ class SyncIntegrationTest {
404407 turbine.cancel()
405408 }
406409 }
410+
411+ @Test
412+ @OptIn(ExperimentalKermitApi ::class )
413+ fun `handles checkpoints during uploads` () =
414+ databaseTest {
415+ val testConnector = TestConnector ()
416+ connector = testConnector
417+ database.connectInternal(database.syncStream(), 1000L )
418+
419+ suspend fun expectUserRows (amount : Int ) {
420+ val row = database.get(" SELECT COUNT(*) FROM users" ) { it.getLong(0 )!! }
421+ assertEquals(amount, row.toInt())
422+ }
423+
424+ val completeUpload = CompletableDeferred <Unit >()
425+ val uploadStarted = CompletableDeferred <Unit >()
426+ testConnector.uploadDataCallback = { db ->
427+ db.getCrudBatch()?.let { batch ->
428+ uploadStarted.complete(Unit )
429+ completeUpload.await()
430+ batch.complete.invoke(null )
431+ }
432+ }
433+
434+ // Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
435+ // connected).
436+ database.execute(" INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)" , listOf (" local" , " local@example.org" ))
437+ syncLines.send(SyncLine .KeepAlive (1234 ))
438+ expectUserRows(1 )
439+ uploadStarted.await()
440+
441+ // Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns.
442+ syncLines.send(
443+ SyncLine .FullCheckpoint (
444+ Checkpoint (
445+ writeCheckpoint = " 1" ,
446+ lastOpId = " 2" ,
447+ checksums = listOf (BucketChecksum (" a" , checksum = 0 )),
448+ ),
449+ ),
450+ )
451+ turbineScope {
452+ val turbine = database.currentStatus.asFlow().testIn(this )
453+ turbine.waitFor { it.downloading }
454+ turbine.cancelAndIgnoreRemainingEvents()
455+ }
456+
457+ syncLines.send(
458+ SyncLine .SyncDataBucket (
459+ bucket = " a" ,
460+ data =
461+ listOf (
462+ OplogEntry (
463+ checksum = 0 ,
464+ opId = " 1" ,
465+ op = OpType .PUT ,
466+ rowId = " 1" ,
467+ rowType = " users" ,
468+ data = """ {"id": "test1", "name": "from local", "email": ""}""" ,
469+ ),
470+ OplogEntry (
471+ checksum = 0 ,
472+ opId = " 2" ,
473+ op = OpType .PUT ,
474+ rowId = " 2" ,
475+ rowType = " users" ,
476+ data = """ {"id": "test1", "name": "additional entry", "email": ""}""" ,
477+ ),
478+ ),
479+ after = null ,
480+ nextAfter = null ,
481+ hasMore = false ,
482+ ),
483+ )
484+ syncLines.send(SyncLine .CheckpointComplete (lastOpId = " 2" ))
485+
486+ // Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
487+ waitFor {
488+ assertNotNull(
489+ logWriter.logs.find {
490+ it.message.contains(" Could not apply checkpoint due to local data" )
491+ },
492+ )
493+ }
494+ database.expectUserCount(1 )
495+
496+ // Mark the upload as completed, this should trigger a write_checkpoint.json request
497+ val requestedCheckpoint = CompletableDeferred <Unit >()
498+ checkpointResponse = {
499+ requestedCheckpoint.complete(Unit )
500+ WriteCheckpointResponse (WriteCheckpointData (" 1" ))
501+ }
502+ completeUpload.complete(Unit )
503+ requestedCheckpoint.await()
504+
505+ // This should apply the checkpoint
506+ turbineScope {
507+ val turbine = database.currentStatus.asFlow().testIn(this )
508+ turbine.waitFor { ! it.downloading }
509+ turbine.cancelAndIgnoreRemainingEvents()
510+ }
511+
512+ // Meaning that the two rows are now visible
513+ database.expectUserCount(2 )
514+ }
407515}
0 commit comments