Skip to content

Commit d6d0f52

Browse files
committed
swap EventProducer for CompletableDeferred
This change fixes a rare bug where RecoverFromDroppedLoginBug may not run. While very rare this could happen if start was called on OperationRepo first and it finished before RecoverFromDroppedLoginBug.start was called. We swapped out EventProducer for a CompletableDeferred to address this issue. CompletableDeferred is a better fit as we simply need to wait until something is initialized, and it will never fire more than once. We also refactored RecoverFromDroppedLoginBugTests to account for the change, but improved the testing to ensure the operation itself is correct.
1 parent b04a96a commit d6d0f52

File tree

6 files changed

+76
-85
lines changed

6 files changed

+76
-85
lines changed

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepo.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ interface IOperationRepo {
3939
*/
4040
fun <T : Operation> containsInstanceOf(type: KClass<T>): Boolean
4141

42-
fun addOperationLoadedListener(handler: IOperationRepoLoadedListener)
42+
suspend fun awaitInitialized()
4343
}
4444

4545
// Extension function so the syntax containsInstanceOf<Operation>() can be used over

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/IOperationRepoLoadedListener.kt

Lines changed: 0 additions & 5 deletions
This file was deleted.

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
11
package com.onesignal.core.internal.operations.impl
22

3-
import com.onesignal.common.events.EventProducer
4-
import com.onesignal.common.events.IEventNotifier
53
import com.onesignal.common.threading.WaiterWithValue
64
import com.onesignal.core.internal.config.ConfigModelStore
75
import com.onesignal.core.internal.operations.ExecutionResult
86
import com.onesignal.core.internal.operations.GroupComparisonType
97
import com.onesignal.core.internal.operations.IOperationExecutor
108
import com.onesignal.core.internal.operations.IOperationRepo
11-
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
129
import com.onesignal.core.internal.operations.Operation
1310
import com.onesignal.core.internal.startup.IStartableService
1411
import com.onesignal.core.internal.time.ITime
1512
import com.onesignal.debug.LogLevel
1613
import com.onesignal.debug.internal.logging.Logging
1714
import com.onesignal.user.internal.operations.impl.states.NewRecordsState
15+
import kotlinx.coroutines.CompletableDeferred
1816
import kotlinx.coroutines.CoroutineScope
1917
import kotlinx.coroutines.delay
2018
import kotlinx.coroutines.launch
@@ -30,7 +28,7 @@ internal class OperationRepo(
3028
private val _configModelStore: ConfigModelStore,
3129
private val _time: ITime,
3230
private val _newRecordState: NewRecordsState,
33-
) : IOperationRepo, IStartableService, IEventNotifier<IOperationRepoLoadedListener> {
31+
) : IOperationRepo, IStartableService {
3432
internal class OperationQueueItem(
3533
val operation: Operation,
3634
val waiter: WaiterWithValue<Boolean>? = null,
@@ -52,17 +50,10 @@ internal class OperationRepo(
5250
private val waiter = WaiterWithValue<LoopWaiterMessage>()
5351
private var paused = false
5452
private var coroutineScope = CoroutineScope(newSingleThreadContext(name = "OpRepo"))
55-
private val loadedSubscription: EventProducer<IOperationRepoLoadedListener> = EventProducer()
53+
private val initialized = CompletableDeferred<Unit>()
5654

57-
override val hasSubscribers: Boolean
58-
get() = loadedSubscription.hasSubscribers
59-
60-
override fun unsubscribe(handler: IOperationRepoLoadedListener) {
61-
loadedSubscription.unsubscribe(handler)
62-
}
63-
64-
override fun subscribe(handler: IOperationRepoLoadedListener) {
65-
loadedSubscription.subscribe(handler)
55+
override suspend fun awaitInitialized() {
56+
initialized.await()
6657
}
6758

6859
/** *** Buckets ***
@@ -101,10 +92,6 @@ internal class OperationRepo(
10192
}
10293
}
10394

104-
override fun addOperationLoadedListener(handler: IOperationRepoLoadedListener) {
105-
subscribe(handler)
106-
}
107-
10895
override fun start() {
10996
paused = false
11097
coroutineScope.launch {
@@ -431,6 +418,6 @@ internal class OperationRepo(
431418
)
432419
if (successful) successfulIndex++
433420
}
434-
loadedSubscription.fire { it.onOperationRepoLoaded() }
421+
initialized.complete(Unit)
435422
}
436423
}

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/user/internal/migrations/RecoverFromDroppedLoginBug.kt

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package com.onesignal.user.internal.migrations
33
import com.onesignal.common.IDManager
44
import com.onesignal.core.internal.config.ConfigModelStore
55
import com.onesignal.core.internal.operations.IOperationRepo
6-
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
76
import com.onesignal.core.internal.operations.containsInstanceOf
87
import com.onesignal.core.internal.startup.IStartableService
98
import com.onesignal.debug.internal.logging.Logging
109
import com.onesignal.user.internal.identity.IdentityModelStore
1110
import com.onesignal.user.internal.operations.LoginUserOperation
11+
import kotlinx.coroutines.Dispatchers
12+
import kotlinx.coroutines.GlobalScope
13+
import kotlinx.coroutines.launch
1214

1315
/**
1416
* Purpose: Automatically recovers a stalled User in the OperationRepo due
@@ -31,21 +33,20 @@ class RecoverFromDroppedLoginBug(
3133
private val _operationRepo: IOperationRepo,
3234
private val _identityModelStore: IdentityModelStore,
3335
private val _configModelStore: ConfigModelStore,
34-
) : IStartableService, IOperationRepoLoadedListener {
36+
) : IStartableService {
3537
override fun start() {
36-
_operationRepo.addOperationLoadedListener(this)
37-
}
38-
39-
override fun onOperationRepoLoaded() {
40-
if (isInBadState()) {
41-
Logging.warn(
42-
"User with externalId:" +
43-
"${_identityModelStore.model.externalId} " +
44-
"was in a bad state, causing it to not update on OneSignal's " +
45-
"backend! We are recovering and replaying all unsent " +
46-
"operations now.",
47-
)
48-
recoverByAddingBackDroppedLoginOperation()
38+
GlobalScope.launch(Dispatchers.IO) {
39+
_operationRepo.awaitInitialized()
40+
if (isInBadState()) {
41+
Logging.warn(
42+
"User with externalId:" +
43+
"${_identityModelStore.model.externalId} " +
44+
"was in a bad state, causing it to not update on OneSignal's " +
45+
"backend! We are recovering and replaying all unsent " +
46+
"operations now.",
47+
)
48+
recoverByAddingBackDroppedLoginOperation()
49+
}
4950
}
5051
}
5152

OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import io.mockk.spyk
2525
import io.mockk.verify
2626
import kotlinx.coroutines.delay
2727
import kotlinx.coroutines.launch
28+
import kotlinx.coroutines.time.withTimeout
29+
import kotlinx.coroutines.withTimeout
2830
import kotlinx.coroutines.withTimeoutOrNull
2931
import kotlinx.coroutines.yield
3032
import java.util.UUID
@@ -599,24 +601,17 @@ class OperationRepoTests : FunSpec({
599601
result shouldBe null
600602
}
601603

602-
test("ensure onOperationRepoLoaded is called once loading is completed") {
604+
test("ensure awaitInitialized() unsuspends") {
603605
// Given
604606
val mocks = Mocks()
605-
val spyListener = spyk<IOperationRepoLoadedListener>()
606607

607608
// When
608-
mocks.operationRepo.addOperationLoadedListener(spyListener)
609609
mocks.operationRepo.start()
610610
// enqueueAndWait used to know we are fully loaded.
611611
mocks.operationRepo.enqueueAndWait(mockOperation())
612612

613613
// Then
614-
mocks.operationRepo.hasSubscribers shouldBe true
615-
coVerifyOrder {
616-
mocks.operationRepo.subscribe(any())
617-
mocks.operationModelStore.loadOperations()
618-
spyListener.onOperationRepoLoaded()
619-
}
614+
withTimeout(1_000) { mocks.operationRepo.awaitInitialized() }
620615
}
621616

622617
test("ensure loadSavedOperations doesn't duplicate existing OperationItems") {
Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package com.onesignal.user.internal.migrations
22

3-
import com.onesignal.common.threading.Waiter
4-
import com.onesignal.core.internal.config.ConfigModelStore
5-
import com.onesignal.core.internal.operations.IOperationRepoLoadedListener
63
import com.onesignal.core.internal.operations.impl.OperationModelStore
74
import com.onesignal.core.internal.operations.impl.OperationRepo
85
import com.onesignal.core.internal.time.impl.Time
6+
import com.onesignal.debug.LogLevel
7+
import com.onesignal.debug.internal.logging.Logging
98
import com.onesignal.mocks.MockHelper
109
import com.onesignal.user.internal.operations.ExecutorMocks
10+
import com.onesignal.user.internal.operations.LoginUserOperation
1111
import io.kotest.core.spec.style.FunSpec
12+
import io.kotest.matchers.shouldBe
1213
import io.mockk.every
1314
import io.mockk.just
1415
import io.mockk.mockk
@@ -24,9 +25,11 @@ private class Mocks {
2425
val mockOperationModelStore = mockk<OperationModelStore>()
2526
every { mockOperationModelStore.loadOperations() } just runs
2627
every { mockOperationModelStore.list() } returns listOf()
28+
every { mockOperationModelStore.add(any()) } just runs
29+
every { mockOperationModelStore.remove(any()) } just runs
2730
mockOperationModelStore
2831
}
29-
val configModelStore = mockk<ConfigModelStore>()
32+
val configModelStore = MockHelper.configModelStore()
3033
val operationRepo =
3134
spyk(
3235
OperationRepo(
@@ -38,36 +41,59 @@ private class Mocks {
3841
),
3942
)
4043

41-
val recovery = spyk(RecoverFromDroppedLoginBug(operationRepo, MockHelper.identityModelStore(), configModelStore))
44+
var oneSignalId = "local-id"
45+
val identityModelStore by lazy {
46+
MockHelper.identityModelStore {
47+
it.onesignalId = oneSignalId
48+
it.externalId = "myExtId"
49+
}
50+
}
51+
val recovery = spyk(RecoverFromDroppedLoginBug(operationRepo, identityModelStore, configModelStore))
52+
53+
val expectedOperation by lazy {
54+
LoginUserOperation(
55+
configModelStore.model.appId,
56+
identityModelStore.model.onesignalId,
57+
identityModelStore.model.externalId,
58+
null,
59+
)
60+
}
61+
62+
fun verifyExpectedLoginOperation(expectedOp: LoginUserOperation = expectedOperation) {
63+
verify(exactly = 1) {
64+
operationRepo.enqueue(
65+
withArg {
66+
(it is LoginUserOperation) shouldBe true
67+
val op = it as LoginUserOperation
68+
op.appId shouldBe expectedOp.appId
69+
op.externalId shouldBe expectedOp.externalId
70+
op.existingOnesignalId shouldBe expectedOp.existingOnesignalId
71+
op.onesignalId shouldBe expectedOp.onesignalId
72+
},
73+
)
74+
}
75+
}
4276
}
4377

4478
class RecoverFromDroppedLoginBugTests : FunSpec({
45-
test("ensure onOperationRepoLoaded callback fires from operationRepo") {
79+
beforeAny {
80+
Logging.logLevel = LogLevel.NONE
81+
}
82+
83+
test("ensure it adds missing operation") {
4684
// Given
4785
val mocks = Mocks()
4886

4987
// When
5088
mocks.recovery.start()
51-
val waiter = Waiter()
52-
mocks.operationRepo.addOperationLoadedListener(
53-
object : IOperationRepoLoadedListener {
54-
override fun onOperationRepoLoaded() {
55-
waiter.wake()
56-
}
57-
},
58-
)
5989
mocks.operationRepo.start()
60-
// Waiting here ensures recovery.onOperationRepoLoaded() is called consistently
61-
waiter.waitForWake()
90+
mocks.operationRepo.awaitInitialized()
6291

6392
// Then
64-
verify(exactly = 1) {
65-
mocks.operationRepo.subscribe(mocks.recovery)
66-
mocks.recovery.onOperationRepoLoaded()
67-
}
93+
mocks.verifyExpectedLoginOperation()
6894
}
6995

70-
test("ensure onOperationRepoLoaded callback fires from operationRepo, even if started first") {
96+
test("ensure it adds missing operation, even if operationRepo is already initialized") {
7197
// Given
7298
val mocks = Mocks()
7399

@@ -77,22 +103,9 @@ class RecoverFromDroppedLoginBugTests : FunSpec({
77103
delay(200)
78104

79105
mocks.recovery.start()
80-
81-
val waiter = Waiter()
82-
mocks.operationRepo.addOperationLoadedListener(
83-
object : IOperationRepoLoadedListener {
84-
override fun onOperationRepoLoaded() {
85-
waiter.wake()
86-
}
87-
},
88-
)
89-
// Waiting here ensures recovery.onOperationRepoLoaded() is called consistently
90-
withTimeout(1_000) { waiter.waitForWake() }
106+
withTimeout(1_000) { mocks.operationRepo.awaitInitialized() }
91107

92108
// Then
93-
verify(exactly = 1) {
94-
mocks.operationRepo.subscribe(mocks.recovery)
95-
mocks.recovery.onOperationRepoLoaded()
96-
}
109+
mocks.verifyExpectedLoginOperation()
97110
}
98111
})

0 commit comments

Comments
 (0)