Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@

- Sync options: `newClientImplementation` is now the default.
- Make `androidx.sqlite:sqlite-bundled` an API dependency of `:core` to avoid toolchain warnings.
- Add `DispatchStrategy` API for customizing how database operations are dispatched to coroutine contexts.
By default, operations use `Dispatchers.IO`, but you can now provide a custom `CoroutineDispatcher` or
a fully custom `DispatchFunction` for complete control over the execution context.

```kotlin
// Use default (Dispatchers.IO)
PowerSyncDatabase(factory, schema)

// Use a specific dispatcher
PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Dispatcher(Dispatchers.Default))

// Use a custom function
PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Custom(myCustomFunction))
```

## 1.8.1

Expand Down Expand Up @@ -389,4 +403,4 @@ params = params
* Replaced default Logger with [Kermit Logger](https://kermit.touchlab.co/) which allows users to
more easily use and/or change Logger settings
* Add `retryDelay` and `crudThrottle` options when setting up database connection
* Changed `_viewNameOverride` to `viewNameOverride`
* Changed `_viewNameOverride` to `viewNameOverride`
109 changes: 109 additions & 0 deletions common/src/commonMain/kotlin/com/powersync/DispatchStrategy.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.powersync

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.withContext

/**
* Function interface for dispatching database operations to a specific coroutine context.
*
* By default, operations are dispatched to [Dispatchers.IO]. Custom implementations
* can be provided to control the execution context of database operations.
*
* This interface supports the `operator invoke` syntax, allowing you to call it like:
* ```
* dispatchFunction { /* your code */ }
* ```
*
* **Design Note:** This must be an interface (not a function type) because Kotlin does not
* support function types with generic type parameters. Since the dispatch function needs to
* accept and return generic types `<R>`, an interface with an `operator invoke` method is
* the appropriate solution. This allows the same convenient syntax as function types while
* supporting generics.
*
* @see DispatchStrategy for dispatch strategy options
*/
public interface DispatchFunction {
/**
* Dispatches the given block to the appropriate coroutine context.
*
* @param block The suspend function to execute in the dispatch context.
* @return The result of executing the block.
*/
public suspend operator fun <R> invoke(block: suspend () -> R): R
}

/**
* Strategy for dispatching database operations to a specific coroutine context.
*
* This sealed class allows you to specify how database operations should be dispatched:
* - [Default]: Use the default dispatcher ([Dispatchers.IO])
* - [Dispatcher]: Use a specific [CoroutineDispatcher]
* - [Custom]: Use a custom [DispatchFunction] for full control
*
* Each variant provides a [dispatchFunction] that implements the actual dispatching logic.
*
* Example usage:
* ```
* // Use default (Dispatchers.IO) - this is the default if not specified
* PowerSyncDatabase(factory, schema)
* // or explicitly:
* PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Default)
*
* // Use a specific dispatcher
* PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Dispatcher(Dispatchers.Default))
*
* // Use a custom function
* PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Custom(myCustomFunction))
* ```
*
* @see DispatchFunction for the dispatch function interface
*/
public sealed class DispatchStrategy {
/**
* Returns the [DispatchFunction] that implements the dispatching logic for this strategy.
*/
public abstract val dispatchFunction: DispatchFunction

/**
* Use the default dispatcher ([Dispatchers.IO]) for database operations.
*
* This is the recommended default for most use cases, as it provides
* a dedicated thread pool for I/O-bound operations.
*/
public object Default : DispatchStrategy() {
override val dispatchFunction: DispatchFunction =
Dispatcher(Dispatchers.IO).dispatchFunction
}

/**
* Use a specific [CoroutineDispatcher] for database operations.
*
* This allows you to use any coroutine dispatcher, such as:
* - [Dispatchers.Default] for CPU-bound work
* - [Dispatchers.Main] for UI operations
* - A custom dispatcher for your specific needs
*
* @property dispatcher The coroutine dispatcher to use.
*/
public data class Dispatcher(
val dispatcher: CoroutineDispatcher,
) : DispatchStrategy() {
override val dispatchFunction: DispatchFunction =
object : DispatchFunction {
override suspend fun <R> invoke(block: suspend () -> R): R = withContext(dispatcher) { block() }
}
}

/**
* Use a custom [DispatchFunction] for full control over dispatching.
*
* @property function The custom dispatch function to use.
*/
public data class Custom(
val function: DispatchFunction,
) : DispatchStrategy() {
override val dispatchFunction: DispatchFunction = function
}
}
10 changes: 8 additions & 2 deletions common/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,10 @@ public interface PowerSyncDatabase : Queries {
schema: Schema,
identifier: String,
logger: Logger,
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
): PowerSyncDatabase {
val group = ActiveDatabaseGroup.referenceDatabase(logger, identifier)
return openedWithGroup(pool, scope, schema, logger, group)
return openedWithGroup(pool, scope, schema, logger, group, dispatchStrategy)
}

/**
Expand All @@ -268,18 +269,21 @@ public interface PowerSyncDatabase : Queries {
schema: Schema,
scope: CoroutineScope,
logger: Logger? = null,
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
): PowerSyncDatabase {
val logger = generateLogger(logger)
// Since this returns a fresh in-memory database every time, use a fresh group to avoid warnings about the
// same database being opened multiple times.
val collection = ActiveDatabaseGroup.GroupsCollection().referenceDatabase(logger, "test")
val collection =
ActiveDatabaseGroup.GroupsCollection().referenceDatabase(logger, "test")

return openedWithGroup(
SingleConnectionPool(factory.openInMemoryConnection()),
scope,
schema,
logger,
collection,
dispatchStrategy,
)
}

Expand All @@ -289,13 +293,15 @@ public interface PowerSyncDatabase : Queries {
schema: Schema,
logger: Logger,
group: Pair<ActiveDatabaseResource, Any>,
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
): PowerSyncDatabase =
PowerSyncDatabaseImpl(
schema,
scope,
pool,
logger,
group,
dispatchStrategy,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public fun PowerSyncDatabase(
dbFilename: String = DEFAULT_DB_FILENAME,
scope: CoroutineScope = GlobalScope,
logger: Logger? = null,
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
/**
* Optional database file directory path.
* This parameter is ignored for iOS.
Expand All @@ -40,17 +41,18 @@ public fun PowerSyncDatabase(
scope = scope,
logger = generatedLogger,
dbDirectory = dbDirectory,
dispatchStrategy = dispatchStrategy,
)
}

@OptIn(ExperimentalPowerSyncAPI::class)
internal fun createPowerSyncDatabaseImpl(
factory: PersistentConnectionFactory,
schema: Schema,
dbFilename: String,
scope: CoroutineScope,
logger: Logger,
dbDirectory: String?,
dispatchStrategy: DispatchStrategy = DispatchStrategy.Default,
): PowerSyncDatabaseImpl {
val identifier = dbDirectory + dbFilename
val activeDatabaseGroup = ActiveDatabaseGroup.referenceDatabase(logger, identifier)
Expand All @@ -72,5 +74,6 @@ internal fun createPowerSyncDatabaseImpl(
schema,
logger,
activeDatabaseGroup,
dispatchStrategy,
) as PowerSyncDatabaseImpl
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.powersync.db

import co.touchlab.kermit.Logger
import com.powersync.DispatchStrategy
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncDatabase
import com.powersync.PowerSyncException
Expand Down Expand Up @@ -61,6 +62,7 @@ internal class PowerSyncDatabaseImpl(
pool: SQLiteConnectionPool,
val logger: Logger,
private val activeDatabaseGroup: Pair<ActiveDatabaseResource, Any>,
dispatchStrategy: DispatchStrategy,
) : PowerSyncDatabase {
companion object {
internal val streamConflictMessage =
Expand All @@ -79,7 +81,7 @@ internal class PowerSyncDatabaseImpl(
private val resource = activeDatabaseGroup.first
private val streams = StreamTracker(this)

private val internalDb = InternalDatabaseImpl(pool, logger)
private val internalDb = InternalDatabaseImpl(pool, logger, dispatchStrategy = dispatchStrategy)

internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)

Expand Down Expand Up @@ -391,7 +393,7 @@ internal class PowerSyncDatabaseImpl(

override suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R {
waitReady()
return internalDb.writeTransaction(callback)
return internalDb.readTransaction(callback)
}

override suspend fun <R> writeLock(callback: ThrowableLockCallback<R>): R {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.powersync.db.internal

import co.touchlab.kermit.Logger
import com.powersync.DispatchFunction
import com.powersync.DispatchStrategy
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.db.SqlCursor
import com.powersync.db.ThrowableLockCallback
Expand All @@ -11,26 +13,21 @@ import com.powersync.db.runWrapped
import com.powersync.utils.AtomicMutableSet
import com.powersync.utils.JsonUtil
import com.powersync.utils.throttle
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.withContext
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalPowerSyncAPI::class)
internal class InternalDatabaseImpl(
private val pool: SQLiteConnectionPool,
private val logger: Logger,
dispatchStrategy: DispatchStrategy,
) : InternalDatabase {
// Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss.
private val dbContext = Dispatchers.IO

override suspend fun execute(
sql: String,
parameters: List<Any?>?,
Expand All @@ -39,8 +36,10 @@ internal class InternalDatabaseImpl(
context.execute(sql, parameters)
}

private val dispatch: DispatchFunction = dispatchStrategy.dispatchFunction

override suspend fun updateSchema(schemaJson: String) {
withContext(dbContext) {
dispatch {
runWrapped {
pool.withAllConnections { writer, readers ->
writer.runTransaction { tx ->
Expand Down Expand Up @@ -167,7 +166,7 @@ internal class InternalDatabaseImpl(
*/
@OptIn(ExperimentalPowerSyncAPI::class)
private suspend fun <R> internalReadLock(callback: suspend (SQLiteConnectionLease) -> R): R =
withContext(dbContext) {
dispatch {
runWrapped {
useConnection(true) { connection ->
callback(connection)
Expand All @@ -189,7 +188,7 @@ internal class InternalDatabaseImpl(

@OptIn(ExperimentalPowerSyncAPI::class)
private suspend fun <R> internalWriteLock(callback: suspend (SQLiteConnectionLease) -> R): R =
withContext(dbContext) {
dispatch {
pool.write { writer ->
runWrapped {
callback(writer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.powersync.pool

import co.touchlab.kermit.Logger
import com.powersync.DispatchFunction
import com.powersync.DispatchStrategy
import com.powersync.PowerSyncDatabase
import com.powersync.db.driver.SQLiteConnectionLease
import com.powersync.db.driver.SQLiteConnectionPool
Expand Down Expand Up @@ -104,4 +106,13 @@ public fun openPowerSyncWithPool(
schema = schema,
identifier = identifier,
logger = logger,
dispatchStrategy =
DispatchStrategy.Custom(
object : DispatchFunction {
override suspend fun <R> invoke(block: suspend () -> R): R {
// We leave the dispatching up to the pool
return block()
}
},
),
Comment on lines +110 to +117
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we used CoroutineContext here, this could e.g. be an EmptyCoroutineContext.

)
Loading