From decbb43be6ada4180937671f7a063b64355647a2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 10 Sep 2025 16:01:44 +0200 Subject: [PATCH] Support asynchronous transactions --- CHANGELOG.md | 1 + .../kotlin/com/powersync/CrudTest.kt | 2 +- .../kotlin/com/powersync/DatabaseTest.kt | 104 +++++++++++-- .../com/powersync/testutils/TestUtils.kt | 2 +- .../com/powersync/bucket/BucketStorage.kt | 6 +- .../com/powersync/bucket/BucketStorageImpl.kt | 26 ++-- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 61 +------- .../kotlin/com/powersync/db/Queries.kt | 142 +++++++++++++++--- .../powersync/db/driver/RawConnectionLease.kt | 9 +- .../db/internal/ConnectionContext.kt | 4 +- .../db/internal/InternalDatabaseImpl.kt | 82 +--------- .../db/internal/PowerSyncTransaction.kt | 6 +- .../ScopedWriteQueriesImplementation.kt | 81 ++++++++++ .../com/powersync/bucket/BucketStorageTest.kt | 2 +- 14 files changed, 331 insertions(+), 197 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/db/internal/ScopedWriteQueriesImplementation.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 32e0d7bd..69468d72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Remove internal SQLDelight and SQLiter dependencies. * Add `rawConnection` getter to `ConnectionContext`, which is a `SQLiteConnection` instance from `androidx.sqlite` that can be used to step through statements in a custom way. +* Support asynchronous transactions and locks. ## 1.5.1 diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/CrudTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/CrudTest.kt index 566a2ae6..d31597ab 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/CrudTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/CrudTest.kt @@ -108,7 +108,7 @@ class CrudTest { ), ) - database.writeTransaction { tx -> + database.writeTransactionAsync { tx -> tx.execute( "INSERT INTO foo (id,a,b,c) VALUES (uuid(), ?, ?, ?)", listOf( diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index ee91a2d5..d8512c66 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -1,7 +1,5 @@ package com.powersync -import androidx.sqlite.SQLiteConnection -import androidx.sqlite.execSQL import app.cash.turbine.test import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi @@ -80,6 +78,7 @@ class DatabaseTest { // Start a long running writeTransaction val transactionJob = scope.async { + @Suppress("DEPRECATION") database.writeTransaction { tx -> // Create another user // External readers should not see this user while the transaction is open @@ -115,6 +114,57 @@ class DatabaseTest { assertEquals(afterTx.size, 2) } + @Test + fun testConcurrentReadsAsync() = + databaseTest { + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf( + "steven", + "s@journeyapps.com", + ), + ) + + val pausedTransaction = CompletableDeferred() + val transactionItemCreated = CompletableDeferred() + // Start a long running writeTransaction + val transactionJob = + scope.async { + database.writeTransactionAsync { tx -> + // Create another user + // External readers should not see this user while the transaction is open + tx.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf( + "steven", + "s@journeyapps.com", + ), + ) + + transactionItemCreated.complete(Unit) + + // Block this transaction until we free it + runBlocking { + pausedTransaction.await() + } + } + } + + // Make sure to wait for the item to have been created in the transaction + transactionItemCreated.await() + // Try and read while the write transaction is busy + val result = database.getAll("SELECT * FROM users") { UserRow.from(it) } + // The transaction is not commited yet, we should only read 1 user + assertEquals(result.size, 1) + + // Let the transaction complete + pausedTransaction.complete(Unit) + transactionJob.await() + + val afterTx = database.getAll("SELECT * FROM users") { UserRow.from(it) } + assertEquals(afterTx.size, 2) + } + @Test fun testTransactionReads() = databaseTest { @@ -126,6 +176,7 @@ class DatabaseTest { ), ) + @Suppress("DEPRECATION") database.writeTransaction { tx -> val userCount = tx.getAll("SELECT COUNT(*) as count FROM users") { cursor -> cursor.getLong(0)!! } @@ -146,6 +197,37 @@ class DatabaseTest { } } + @Test + fun testTransactionReadsAsync() = + databaseTest { + database.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf( + "steven", + "s@journeyapps.com", + ), + ) + + database.writeTransactionAsync { tx -> + val userCount = + tx.getAll("SELECT COUNT(*) as count FROM users") { cursor -> cursor.getLong(0)!! } + assertEquals(userCount[0], 1) + + tx.execute( + "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", + listOf( + "steven", + "s@journeyapps.com", + ), + ) + + // Getters inside the transaction should be able to see the latest update + val userCount2 = + tx.getAll("SELECT COUNT(*) as count FROM users") { cursor -> cursor.getLong(0)!! } + assertEquals(userCount2[0], 2) + } + } + @Test fun testTableUpdates() = databaseTest { @@ -161,7 +243,7 @@ class DatabaseTest { ) query.awaitItem() shouldHaveSize 1 - database.writeTransaction { + database.writeTransactionAsync { it.execute( "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("Test2", "test2@example.org"), @@ -175,11 +257,11 @@ class DatabaseTest { query.awaitItem() shouldHaveSize 3 try { - database.writeTransaction { + database.writeTransactionAsync { it.execute("DELETE FROM users;") it.execute("syntax error, revert please (this is intentional from the unit test)") } - } catch (e: Exception) { + } catch (_: Exception) { // Ignore } @@ -229,7 +311,7 @@ class DatabaseTest { // Request a lock val lockJob = scope.async { - database.readLock { + database.readLockAsync { inLock.complete(Unit) runBlocking { pausedLock.await() @@ -255,7 +337,7 @@ class DatabaseTest { assertEquals(actual = database.closed, expected = false) // Any new readLocks should throw - val exception = shouldThrow { database.readLock {} } + val exception = shouldThrow { database.readLockAsync {} } exception.message shouldBe "Cannot process connection pool request" // Release the lock @@ -327,7 +409,7 @@ class DatabaseTest { fun basicReadTransaction() = databaseTest { val count = - database.readTransaction { it -> + database.readTransactionAsync { it -> it.get("SELECT COUNT(*) from users") { it.getLong(0)!! } } count shouldBe 0 @@ -416,7 +498,7 @@ class DatabaseTest { listOf("a", "a@example.org"), ) - database.writeTransaction { + database.writeTransactionAsync { it.execute( "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("b", "b@example.org"), @@ -442,7 +524,7 @@ class DatabaseTest { fun testCrudTransactions() = databaseTest { suspend fun insertInTransaction(size: Int) { - database.writeTransaction { tx -> + database.writeTransactionAsync { tx -> repeat(size) { tx.execute("INSERT INTO users (id, name, email) VALUES (uuid(), null, null)") } @@ -478,7 +560,7 @@ class DatabaseTest { listOf("a", "a@example.org"), ) - database.writeTransaction { + database.writeTransactionAsync { it.execute( "INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("b", "b@example.org"), diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 0b533cfd..a695d5e5 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -117,7 +117,7 @@ internal class ActiveDatabaseTest( return db } - suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLock { } } + suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLockAsync { } } @OptIn(ExperimentalPowerSyncAPI::class) fun createSyncClient(): HttpClient { diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 0937cdac..50f44993 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,8 +1,8 @@ package com.powersync.bucket +import com.powersync.db.ScopedWriteQueries import com.powersync.db.SqlCursor import com.powersync.db.crud.CrudEntry -import com.powersync.db.internal.PowerSyncTransaction import com.powersync.db.schema.SerializableSchema import com.powersync.sync.Instruction import com.powersync.sync.LegacySyncImplementation @@ -18,11 +18,11 @@ internal interface BucketStorage { suspend fun nextCrudItem(): CrudEntry? - fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? + suspend fun nextCrudItem(transaction: ScopedWriteQueries): CrudEntry? suspend fun hasCrud(): Boolean - fun hasCrud(transaction: PowerSyncTransaction): Boolean + suspend fun hasCrud(transaction: ScopedWriteQueries): Boolean fun mapCrudEntry(row: SqlCursor): CrudEntry diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 00331c37..6237ef39 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -2,12 +2,12 @@ package com.powersync.bucket import co.touchlab.kermit.Logger import co.touchlab.stately.concurrency.AtomicBoolean +import com.powersync.db.ScopedWriteQueries import com.powersync.db.SqlCursor import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.CrudRow import com.powersync.db.internal.InternalDatabase import com.powersync.db.internal.InternalTable -import com.powersync.db.internal.PowerSyncTransaction import com.powersync.sync.Instruction import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch @@ -37,7 +37,7 @@ internal class BucketStorageImpl( override suspend fun nextCrudItem(): CrudEntry? = db.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry) - override fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? = + override suspend fun nextCrudItem(transaction: ScopedWriteQueries): CrudEntry? = transaction.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry) private val nextCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1" @@ -56,7 +56,7 @@ internal class BucketStorageImpl( return res == 1L } - override fun hasCrud(transaction: PowerSyncTransaction): Boolean { + override suspend fun hasCrud(transaction: ScopedWriteQueries): Boolean { val res = transaction.getOptional(sql = hasCrudQuery, mapper = hasCrudMapper) return res == 1L } @@ -85,10 +85,10 @@ internal class BucketStorageImpl( logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" } - return db.writeTransaction { tx -> + return db.writeTransactionAsync { tx -> if (hasCrud(tx)) { logger.w { "[updateLocalTarget] ps crud is not empty" } - return@writeTransaction false + return@writeTransactionAsync false } val seqAfter = @@ -101,7 +101,7 @@ internal class BucketStorageImpl( if (seqAfter != seqBefore) { logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore") // New crud data may have been uploaded since we got the checkpoint. Abort. - return@writeTransaction false + return@writeTransactionAsync false } tx.execute( @@ -109,13 +109,13 @@ internal class BucketStorageImpl( listOf(opId), ) - return@writeTransaction true + return@writeTransactionAsync true } } @LegacySyncImplementation override suspend fun saveSyncData(syncDataBatch: SyncDataBatch) { - db.writeTransaction { tx -> + db.writeTransactionAsync { tx -> val jsonString = JsonUtil.json.encodeToString(syncDataBatch) tx.execute( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", @@ -155,7 +155,7 @@ internal class BucketStorageImpl( @LegacySyncImplementation private suspend fun deleteBucket(bucketName: String) { - db.writeTransaction { tx -> + db.writeTransactionAsync { tx -> tx.execute( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", listOf("delete_bucket", bucketName), @@ -219,7 +219,7 @@ internal class BucketStorageImpl( } }.map { it.bucket } - db.writeTransaction { tx -> + db.writeTransactionAsync { tx -> tx.execute( "UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))", listOf(targetCheckpoint.lastOpId, JsonUtil.json.encodeToString(bucketNames)), @@ -306,7 +306,7 @@ internal class BucketStorageImpl( "" } - return db.writeTransaction { tx -> + return db.writeTransactionAsync { tx -> tx.execute( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", listOf("sync_local", args), @@ -338,7 +338,7 @@ internal class BucketStorageImpl( ) } - return@writeTransaction didApply + return@writeTransactionAsync didApply } } @@ -355,7 +355,7 @@ internal class BucketStorageImpl( } override suspend fun control(args: PowerSyncControlArguments): List = - db.writeTransaction { tx -> + db.writeTransactionAsync { tx -> logger.v { "powersync_control: $args" } val (op: String, data: Any?) = diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 92ec9cf2..a01d3241 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -106,7 +106,7 @@ internal class PowerSyncDatabaseImpl( checkVersion(powerSyncVersion) logger.d { "PowerSyncVersion: $powerSyncVersion" } - internalDb.writeTransaction { tx -> + internalDb.writeTransactionAsync { tx -> tx.getOptional("SELECT powersync_init()") {} } @@ -331,33 +331,6 @@ internal class PowerSyncDatabaseImpl( return internalDb.useConnection(readOnly, block) } - override suspend fun get( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType, - ): RowType { - waitReady() - return internalDb.get(sql, parameters, mapper) - } - - override suspend fun getAll( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType, - ): List { - waitReady() - return internalDb.getAll(sql, parameters, mapper) - } - - override suspend fun getOptional( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType, - ): RowType? { - waitReady() - return internalDb.getOptional(sql, parameters, mapper) - } - override fun onChange( tables: Set, throttleMs: Long, @@ -381,39 +354,11 @@ internal class PowerSyncDatabaseImpl( emitAll(internalDb.watch(sql, parameters, throttleMs, mapper)) } - override suspend fun readLock(callback: ThrowableLockCallback): R { - waitReady() - return internalDb.readLock(callback) - } - - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R { - waitReady() - return internalDb.writeTransaction(callback) - } - - override suspend fun writeLock(callback: ThrowableLockCallback): R { - waitReady() - return internalDb.writeLock(callback) - } - - override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R { - waitReady() - return internalDb.writeTransaction(callback) - } - - override suspend fun execute( - sql: String, - parameters: List?, - ): Long { - waitReady() - return internalDb.execute(sql, parameters) - } - private suspend fun handleWriteCheckpoint( lastTransactionId: Int, writeCheckpoint: String?, ) { - internalDb.writeTransaction { transaction -> + internalDb.writeTransactionAsync { transaction -> transaction.execute( "DELETE FROM ps_crud WHERE id <= ?", listOf(lastTransactionId.toLong()), @@ -460,7 +405,7 @@ internal class PowerSyncDatabaseImpl( override suspend fun disconnectAndClear(clearLocal: Boolean) { disconnect() - internalDb.writeTransaction { tx -> + internalDb.writeTransactionAsync { tx -> tx.getOptional("SELECT powersync_clear(?)", listOf(if (clearLocal) "1" else "0")) {} } currentStatus.update { copy(lastSyncedAt = null, hasSynced = false) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt index 90e955f5..c694688b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt @@ -4,46 +4,34 @@ import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncException import com.powersync.db.driver.SQLiteConnectionLease import com.powersync.db.internal.ConnectionContext +import com.powersync.db.internal.ConnectionContextImplementation import com.powersync.db.internal.PowerSyncTransaction +import com.powersync.db.internal.ScopedWriteQueriesImplementation +import com.powersync.db.internal.runTransaction import kotlinx.coroutines.flow.Flow import kotlin.coroutines.cancellation.CancellationException import kotlin.native.HiddenFromObjC import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds +@Deprecated(message = "Use suspending callback instead", level = DeprecationLevel.WARNING) public fun interface ThrowableTransactionCallback { @Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class) public fun execute(transaction: PowerSyncTransaction): R } +@Deprecated(message = "Use suspending callback instead", level = DeprecationLevel.WARNING) public fun interface ThrowableLockCallback { @Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class) public fun execute(context: ConnectionContext): R } -public interface Queries { - public companion object { - /** - * The default throttle duration for [onChange] and [watch] operations. - */ - public val DEFAULT_THROTTLE: Duration = 30.milliseconds +public interface ScopedReadQueries { + public fun interface Callback { + @Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class) + public suspend fun execute(context: ScopedReadQueries): R } - /** - * Executes a write query (INSERT, UPDATE, DELETE). - * - * @param sql The SQL query to execute. - * @param parameters The parameters for the query, or an empty list if none. - * @return The number of rows affected by the query. - * @throws PowerSyncException If a database error occurs. - * @throws CancellationException If the operation is cancelled. - */ - @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun execute( - sql: String, - parameters: List? = listOf(), - ): Long - /** * Executes a read-only (SELECT) query and returns a single result. * @@ -94,6 +82,61 @@ public interface Queries { parameters: List? = listOf(), mapper: (SqlCursor) -> RowType, ): RowType? +} + +public interface ScopedWriteQueries : ScopedReadQueries { + public fun interface Callback { + @Throws(PowerSyncException::class, kotlinx.coroutines.CancellationException::class) + public suspend fun execute(context: ScopedWriteQueries): R + } + + /** + * Executes a write query (INSERT, UPDATE, DELETE). + * + * @param sql The SQL query to execute. + * @param parameters The parameters for the query, or an empty list if none. + * @return The number of rows affected by the query. + * @throws PowerSyncException If a database error occurs. + * @throws CancellationException If the operation is cancelled. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun execute( + sql: String, + parameters: List? = listOf(), + ): Long +} + +@OptIn(ExperimentalPowerSyncAPI::class) +public interface Queries : ScopedWriteQueries { + public companion object { + /** + * The default throttle duration for [onChange] and [watch] operations. + */ + public val DEFAULT_THROTTLE: Duration = 30.milliseconds + } + + override suspend fun execute( + sql: String, + parameters: List?, + ): Long = writeLockAsync { it.execute(sql, parameters) } + + override suspend fun get( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, + ): RowType = readLockAsync { it.get(sql, parameters, mapper) } + + override suspend fun getAll( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, + ): List = readLockAsync { it.getAll(sql, parameters, mapper) } + + override suspend fun getOptional( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, + ): RowType? = readLockAsync { it.getOptional(sql, parameters, mapper) } /** * Returns a [Flow] that emits whenever the source tables are modified. @@ -131,6 +174,13 @@ public interface Queries { mapper: (SqlCursor) -> RowType, ): Flow> + @Throws(PowerSyncException::class, CancellationException::class) + @Deprecated(message = "Use suspending callback instead", replaceWith = ReplaceWith("writeLockAsync")) + public suspend fun writeLock(callback: ThrowableLockCallback): R = + useConnection(readOnly = false) { conn -> + callback.execute(ConnectionContextImplementation(conn)) + } + /** * Takes a global lock without starting a transaction. * @@ -144,7 +194,17 @@ public interface Queries { * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun writeLock(callback: ThrowableLockCallback): R + public suspend fun writeLockAsync(callback: ScopedWriteQueries.Callback): R = + asyncCallback(readonly = false, transaction = false) { callback.execute(it) } + + @Throws(PowerSyncException::class, CancellationException::class) + @Deprecated(message = "Use suspending callback instead", replaceWith = ReplaceWith("writeTransactionAsync")) + public suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = + useConnection(readOnly = false) { conn -> + conn.runTransaction { + callback.execute(it) + } + } /** * Opens a read-write transaction. @@ -159,7 +219,15 @@ public interface Queries { * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun writeTransaction(callback: ThrowableTransactionCallback): R + public suspend fun writeTransactionAsync(callback: ScopedWriteQueries.Callback): R = + asyncCallback(readonly = false, transaction = true) { callback.execute(it) } + + @Throws(PowerSyncException::class, CancellationException::class) + @Deprecated(message = "Use suspending callback instead", replaceWith = ReplaceWith("readLockAsync")) + public suspend fun readLock(callback: ThrowableLockCallback): R = + useConnection(readOnly = true) { conn -> + callback.execute(ConnectionContextImplementation(conn)) + } /** * Takes a read lock without starting a transaction. @@ -172,7 +240,15 @@ public interface Queries { * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun readLock(callback: ThrowableLockCallback): R + public suspend fun readLockAsync(callback: ScopedReadQueries.Callback): R = + asyncCallback(readonly = true, transaction = false) { callback.execute(it) } + + @Throws(PowerSyncException::class, CancellationException::class) + @Deprecated(message = "Use suspending callback instead", replaceWith = ReplaceWith("readTransactionAsync")) + public suspend fun readTransaction(callback: ThrowableTransactionCallback): R = + useConnection(readOnly = true) { conn -> + conn.runTransaction { callback.execute(it) } + } /** * Opens a read-only transaction. @@ -185,7 +261,23 @@ public interface Queries { * @throws CancellationException If the operation is cancelled. */ @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun readTransaction(callback: ThrowableTransactionCallback): R + public suspend fun readTransactionAsync(callback: ScopedReadQueries.Callback): R = + asyncCallback(readonly = false, transaction = true) { callback.execute(it) } + + private suspend fun asyncCallback( + readonly: Boolean, + transaction: Boolean, + callback: suspend (ScopedWriteQueries) -> R, + ): R = + useConnection(readonly) { conn -> + if (transaction) { + conn.runTransaction { tx -> + callback(ScopedWriteQueriesImplementation(tx.lease, true)) + } + } else { + callback(ScopedWriteQueriesImplementation(conn, false)) + } + } /** * Obtains a connection from the read pool or an exclusive reference on the write connection. diff --git a/core/src/commonMain/kotlin/com/powersync/db/driver/RawConnectionLease.kt b/core/src/commonMain/kotlin/com/powersync/db/driver/RawConnectionLease.kt index 2ca56b8a..d1b229b9 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/driver/RawConnectionLease.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/driver/RawConnectionLease.kt @@ -3,6 +3,7 @@ package com.powersync.db.driver import androidx.sqlite.SQLiteConnection import androidx.sqlite.SQLiteStatement import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.db.runWrapped /** * A temporary view / lease of an inner [androidx.sqlite.SQLiteConnection] managed by the PowerSync @@ -22,7 +23,9 @@ internal class RawConnectionLease( override fun isInTransactionSync(): Boolean { checkNotCompleted() - return connection.inTransaction() + return runWrapped { + connection.inTransaction() + } } override suspend fun usePrepared( @@ -35,6 +38,8 @@ internal class RawConnectionLease( block: (SQLiteStatement) -> R, ): R { checkNotCompleted() - return connection.prepare(sql).use(block) + return runWrapped { + connection.prepare(sql).use(block) + } } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/ConnectionContext.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/ConnectionContext.kt index e80f2044..9c53ce78 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/ConnectionContext.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/ConnectionContext.kt @@ -7,9 +7,8 @@ import com.powersync.db.SqlCursor import com.powersync.db.StatementBasedCursor import com.powersync.db.driver.SQLiteConnectionLease +@Deprecated("Use suspending callback instead") public interface ConnectionContext { - // TODO (breaking): Make asynchronous, create shared superinterface with Queries - @Throws(PowerSyncException::class) public fun execute( sql: String, @@ -38,6 +37,7 @@ public interface ConnectionContext { ): RowType } +@Deprecated("Use suspending callback instead") @ExperimentalPowerSyncAPI internal class ConnectionContextImplementation( private val rawConnection: SQLiteConnectionLease, diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 0e902533..04a319ac 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -1,10 +1,7 @@ package com.powersync.db.internal import com.powersync.ExperimentalPowerSyncAPI -import com.powersync.PowerSyncException import com.powersync.db.SqlCursor -import com.powersync.db.ThrowableLockCallback -import com.powersync.db.ThrowableTransactionCallback import com.powersync.db.driver.SQLiteConnectionLease import com.powersync.db.driver.SQLiteConnectionPool import com.powersync.db.runWrapped @@ -29,14 +26,6 @@ internal class InternalDatabaseImpl( // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. private val dbContext = Dispatchers.IO - override suspend fun execute( - sql: String, - parameters: List?, - ): Long = - writeLock { context -> - context.execute(sql, parameters) - } - override suspend fun updateSchema(schemaJson: String) { withContext(dbContext) { runWrapped { @@ -57,24 +46,6 @@ internal class InternalDatabaseImpl( } } - override suspend fun get( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType, - ): RowType = readLock { connection -> connection.get(sql, parameters, mapper) } - - override suspend fun getAll( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType, - ): List = readLock { connection -> connection.getAll(sql, parameters, mapper) } - - override suspend fun getOptional( - sql: String, - parameters: List?, - mapper: (SqlCursor) -> RowType, - ): RowType? = readLock { connection -> connection.getOptional(sql, parameters, mapper) } - override fun onChange( tables: Set, throttleMs: Long, @@ -154,56 +125,11 @@ internal class InternalDatabaseImpl( readOnly: Boolean, block: suspend (SQLiteConnectionLease) -> T, ): T = - if (readOnly) { - pool.read(block) - } else { - pool.write(block) - } - - /** - * Creates a read lock while providing an internal transactor for transactions - */ - @OptIn(ExperimentalPowerSyncAPI::class) - private suspend fun internalReadLock(callback: suspend (SQLiteConnectionLease) -> R): R = withContext(dbContext) { - runWrapped { - useConnection(true) { connection -> - callback(connection) - } - } - } - - override suspend fun readLock(callback: ThrowableLockCallback): R = - internalReadLock { - callback.execute(ConnectionContextImplementation(it)) - } - - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = - internalReadLock { - it.runTransaction { tx -> - callback.execute(tx) - } - } - - @OptIn(ExperimentalPowerSyncAPI::class) - private suspend fun internalWriteLock(callback: suspend (SQLiteConnectionLease) -> R): R = - withContext(dbContext) { - pool.write { writer -> - runWrapped { - callback(writer) - } - } - } - - override suspend fun writeLock(callback: ThrowableLockCallback): R = - internalWriteLock { - callback.execute(ConnectionContextImplementation(it)) - } - - override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = - internalWriteLock { - it.runTransaction { tx -> - callback.execute(tx) + if (readOnly) { + pool.read(block) + } else { + pool.write(block) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransaction.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransaction.kt index 3cccaab9..a9f87eb7 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransaction.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransaction.kt @@ -5,11 +5,13 @@ import com.powersync.PowerSyncException import com.powersync.db.SqlCursor import com.powersync.db.driver.SQLiteConnectionLease +@Deprecated("Use suspending callback instead") public interface PowerSyncTransaction : ConnectionContext @ExperimentalPowerSyncAPI +@Deprecated("Use suspending callback instead") internal class PowerSyncTransactionImpl( - private val lease: SQLiteConnectionLease, + val lease: SQLiteConnectionLease, ) : PowerSyncTransaction, ConnectionContext { private val delegate = ConnectionContextImplementation(lease) @@ -57,7 +59,7 @@ internal class PowerSyncTransactionImpl( } @ExperimentalPowerSyncAPI -internal suspend fun SQLiteConnectionLease.runTransaction(cb: suspend (PowerSyncTransaction) -> T): T { +internal suspend fun SQLiteConnectionLease.runTransaction(cb: suspend (PowerSyncTransactionImpl) -> T): T { execSQL("BEGIN") return try { val result = cb(PowerSyncTransactionImpl(this)) diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/ScopedWriteQueriesImplementation.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/ScopedWriteQueriesImplementation.kt new file mode 100644 index 00000000..8d658d80 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/ScopedWriteQueriesImplementation.kt @@ -0,0 +1,81 @@ +package com.powersync.db.internal + +import androidx.sqlite.SQLiteStatement +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.PowerSyncException +import com.powersync.db.ScopedWriteQueries +import com.powersync.db.SqlCursor +import com.powersync.db.StatementBasedCursor +import com.powersync.db.driver.SQLiteConnectionLease + +@OptIn(ExperimentalPowerSyncAPI::class) +internal class ScopedWriteQueriesImplementation( + private val rawConnection: SQLiteConnectionLease, + private val isTransaction: Boolean = false, +) : ScopedWriteQueries { + override suspend fun execute( + sql: String, + parameters: List?, + ): Long { + withStatement(sql, parameters) { + while (it.step()) { + // Iterate through the statement + } + } + + return withStatement("SELECT changes()", null) { + check(it.step()) + it.getLong(0) + } + } + + override suspend fun get( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, + ): RowType = getOptional(sql, parameters, mapper) ?: throw PowerSyncException("get() called with query that returned no rows", null) + + override suspend fun getAll( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, + ): List = + withStatement(sql, parameters) { stmt -> + buildList { + val cursor = StatementBasedCursor(stmt) + while (stmt.step()) { + add(mapper(cursor)) + } + } + } + + override suspend fun getOptional( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, + ): RowType? = + withStatement(sql, parameters) { stmt -> + if (stmt.step()) { + mapper(StatementBasedCursor(stmt)) + } else { + null + } + } + + private suspend fun withStatement( + sql: String, + parameters: List?, + block: (SQLiteStatement) -> T, + ): T { + if (isTransaction) { + if (!rawConnection.isInTransactionSync()) { + throw PowerSyncException("Tried executing statement on a transaction that has been rolled back", cause = null) + } + } + + return rawConnection.usePrepared(sql) { stmt -> + stmt.bind(parameters) + block(stmt) + } + } +} diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt index 512a90e1..a0dba0ef 100644 --- a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -136,7 +136,7 @@ class BucketStorageTest { any(), ) } returns 1L - everySuspend { writeTransaction(any()) } returns true + everySuspend { writeTransactionAsync(any()) } returns true } bucketStorage = BucketStorageImpl(mockDb, Logger)