@@ -24,10 +24,11 @@ import com.powersync.utils.JsonParam
2424import com.powersync.utils.JsonUtil
2525import com.powersync.utils.throttle
2626import com.powersync.utils.toJsonObject
27+ import kotlinx.coroutines.CancellationException
2728import kotlinx.coroutines.CoroutineScope
2829import kotlinx.coroutines.FlowPreview
2930import kotlinx.coroutines.Job
30- import kotlinx.coroutines.cancelAndJoin
31+ import kotlinx.coroutines.SupervisorJob
3132import kotlinx.coroutines.ensureActive
3233import kotlinx.coroutines.flow.Flow
3334import kotlinx.coroutines.flow.filter
@@ -95,9 +96,7 @@ internal class PowerSyncDatabaseImpl(
9596 override val currentStatus: SyncStatus = SyncStatus ()
9697
9798 private val mutex = Mutex ()
98- private var syncStream: SyncStream ? = null
99- private var syncJob: Job ? = null
100- private var uploadJob: Job ? = null
99+ private var syncSupervisorJob: Job ? = null
101100
102101 // This is set in the init
103102 private lateinit var powerSyncVersion: String
@@ -123,7 +122,7 @@ internal class PowerSyncDatabaseImpl(
123122 override suspend fun updateSchema (schema : Schema ) =
124123 runWrappedSuspending {
125124 mutex.withLock {
126- if (this .syncStream != null ) {
125+ if (this .syncSupervisorJob != null ) {
127126 throw PowerSyncException (
128127 " Cannot update schema while connected" ,
129128 cause = Exception (" PowerSync client is already connected" ),
@@ -161,12 +160,11 @@ internal class PowerSyncDatabaseImpl(
161160 stream : SyncStream ,
162161 crudThrottleMs : Long ,
163162 ) {
164- this .syncStream = stream
165-
166163 val db = this
167-
168- syncJob =
169- scope.launch {
164+ val job = SupervisorJob (scope.coroutineContext[Job ])
165+ syncSupervisorJob = job
166+ scope.launch(job) {
167+ launch {
170168 // Get a global lock for checking mutex maps
171169 val streamMutex = resource.group.syncMutex
172170
@@ -181,7 +179,7 @@ internal class PowerSyncDatabaseImpl(
181179 // (The tryLock should throw if this client already holds the lock).
182180 logger.w(streamConflictMessage)
183181 }
184- } catch (ex : IllegalStateException ) {
182+ } catch (_ : IllegalStateException ) {
185183 logger.e { " The streaming sync client did not disconnect before connecting" }
186184 }
187185
@@ -194,40 +192,46 @@ internal class PowerSyncDatabaseImpl(
194192 // We have a lock if we reached here
195193 try {
196194 ensureActive()
197- syncStream !! .streamingSync()
195+ stream .streamingSync()
198196 } finally {
199197 streamMutex.unlock(db)
200198 }
201199 }
202200
203- scope.launch {
204- syncStream!! .status.asFlow().collect {
205- currentStatus.update(
206- connected = it.connected,
207- connecting = it.connecting,
208- uploading = it.uploading,
209- downloading = it.downloading,
210- lastSyncedAt = it.lastSyncedAt,
211- hasSynced = it.hasSynced,
212- uploadError = it.uploadError,
213- downloadError = it.downloadError,
214- clearDownloadError = it.downloadError == null ,
215- clearUploadError = it.uploadError == null ,
216- priorityStatusEntries = it.priorityStatusEntries,
217- )
201+ launch {
202+ stream.status.asFlow().collect {
203+ currentStatus.update(
204+ connected = it.connected,
205+ connecting = it.connecting,
206+ uploading = it.uploading,
207+ downloading = it.downloading,
208+ lastSyncedAt = it.lastSyncedAt,
209+ hasSynced = it.hasSynced,
210+ uploadError = it.uploadError,
211+ downloadError = it.downloadError,
212+ clearDownloadError = it.downloadError == null ,
213+ clearUploadError = it.uploadError == null ,
214+ priorityStatusEntries = it.priorityStatusEntries,
215+ )
216+ }
218217 }
219- }
220218
221- uploadJob =
222- scope.launch {
219+ launch {
223220 internalDb
224221 .updatesOnTables()
225222 .filter { it.contains(InternalTable .CRUD .toString()) }
226223 .throttle(crudThrottleMs)
227224 .collect {
228- syncStream !! .triggerCrudUpload()
225+ stream .triggerCrudUpload()
229226 }
230227 }
228+ }
229+
230+ job.invokeOnCompletion {
231+ if (it is DisconnectRequestedException ) {
232+ stream.invalidateCredentials()
233+ }
234+ }
231235 }
232236
233237 override suspend fun getCrudBatch (limit : Int ): CrudBatch ? {
@@ -364,17 +368,12 @@ internal class PowerSyncDatabaseImpl(
364368 override suspend fun disconnect () = mutex.withLock { disconnectInternal() }
365369
366370 private suspend fun disconnectInternal () {
367- if (syncJob != null && syncJob!! .isActive) {
368- syncJob?.cancelAndJoin()
369- }
370-
371- if (uploadJob != null && uploadJob!! .isActive) {
372- uploadJob?.cancelAndJoin()
373- }
374-
375- if (syncStream != null ) {
376- syncStream?.invalidateCredentials()
377- syncStream = null
371+ val syncJob = syncSupervisorJob
372+ if (syncJob != null && syncJob.isActive) {
373+ // Using this exception type will also make the sync job invalidate credentials.
374+ syncJob.cancel(DisconnectRequestedException )
375+ syncJob.join()
376+ syncSupervisorJob = null
378377 }
379378
380379 currentStatus.update(
@@ -470,7 +469,7 @@ internal class PowerSyncDatabaseImpl(
470469 /* *
471470 * Check that a supported version of the powersync extension is loaded.
472471 */
473- private suspend fun checkVersion (powerSyncVersion : String ) {
472+ private fun checkVersion (powerSyncVersion : String ) {
474473 // Parse version
475474 val versionInts: List <Int > =
476475 try {
@@ -488,3 +487,5 @@ internal class PowerSyncDatabaseImpl(
488487 }
489488 }
490489}
490+
491+ internal object DisconnectRequestedException : CancellationException(" disconnect() called" )
0 commit comments