Skip to content

Commit b116054

Browse files
authored
Refactor FlowRedux (#52)
* refactor FlowReduxStore * refactor FlowReduxStore * refactor FlowReduxStore * refactor FlowReduxStore * refactor FlowReduxStore * refactor FlowReduxStore * refactor FlowReduxStore * refactor FlowReduxStore * refactor FlowReduxStore * Update flowredux/src/commonMain/kotlin/com/hoc081098/flowredux/FlowReduxStore.kt * Update GithubSearchViewModel.kt * Update buildSrc/src/main/kotlin/deps.kt [skip ci] * Update buildSrc/src/main/kotlin/deps.kt [skip ci] * Update buildSrc/src/main/kotlin/deps.kt * Apply suggestions from code review * refactor FlowReduxStore * flip
1 parent 833027a commit b116054

File tree

10 files changed

+203
-71
lines changed

10 files changed

+203
-71
lines changed
468 Bytes
Binary file not shown.
468 Bytes
Binary file not shown.

flowredux/src/commonMain/kotlin/com/hoc081098/flowredux/DefaultFlowReduxStore.kt

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,33 @@
11
package com.hoc081098.flowredux
22

3+
import kotlin.coroutines.CoroutineContext
34
import kotlinx.coroutines.CoroutineScope
5+
import kotlinx.coroutines.Job
46
import kotlinx.coroutines.async
57
import kotlinx.coroutines.awaitAll
8+
import kotlinx.coroutines.cancel
69
import kotlinx.coroutines.channels.Channel
710
import kotlinx.coroutines.coroutineScope
8-
import kotlinx.coroutines.flow.MutableSharedFlow
911
import kotlinx.coroutines.flow.MutableStateFlow
10-
import kotlinx.coroutines.flow.SharedFlow
1112
import kotlinx.coroutines.flow.StateFlow
12-
import kotlinx.coroutines.flow.asSharedFlow
1313
import kotlinx.coroutines.flow.asStateFlow
14+
import kotlinx.coroutines.flow.buffer
1415
import kotlinx.coroutines.flow.consumeAsFlow
1516
import kotlinx.coroutines.flow.launchIn
1617
import kotlinx.coroutines.flow.merge
1718
import kotlinx.coroutines.flow.onEach
19+
import kotlinx.coroutines.job
1820

1921
internal class DefaultFlowReduxStore<Action, State>(
20-
override val coroutineScope: CoroutineScope,
22+
coroutineContext: CoroutineContext,
2123
initialState: State,
22-
sideEffects: List<SideEffect<State, Action>>,
23-
reducer: Reducer<State, Action>
24+
sideEffects: List<SideEffect<Action, State>>,
25+
reducer: Reducer<Action, State>
2426
) : FlowReduxStore<Action, State> {
27+
private val coroutineScope = CoroutineScope(coroutineContext + Job())
28+
2529
private val _stateFlow = MutableStateFlow(initialState)
2630
private val _actionChannel = Channel<Action>(Channel.UNLIMITED)
27-
private val _actionSharedFlow = MutableSharedFlow<Action>(Channel.UNLIMITED)
2831

2932
override val stateFlow: StateFlow<State> = _stateFlow.asStateFlow()
3033

@@ -42,19 +45,25 @@ internal class DefaultFlowReduxStore<Action, State>(
4245
)
4346
}
4447
add(_actionChannel.consumeAsFlow())
45-
}.merge()
48+
}
49+
.merge()
50+
.buffer(Channel.UNLIMITED) // buffer all actions, we don't want to miss any action.
4651

4752
actionFlow
4853
.onEach { action ->
54+
// update state
4955
_stateFlow.value = reducer(_stateFlow.value, action)
5056

57+
// send action to loopbacks
5158
loopbacks.sendAll(action)
52-
check(_actionSharedFlow.tryEmit(action)) { "Cannot send $action" }
5359
}
5460
.launchIn(coroutineScope)
5561
}
5662

57-
override val actionSharedFlow: SharedFlow<Action> = _actionSharedFlow.asSharedFlow()
63+
override fun close() = coroutineScope.cancel()
64+
65+
override fun isClosed() = coroutineScope.coroutineContext.job.isCancelled
66+
5867
override fun dispatch(action: Action): Boolean = _actionChannel
5968
.trySend(action)
6069
.isSuccess

flowredux/src/commonMain/kotlin/com/hoc081098/flowredux/FlowReduxStore.kt

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,112 @@
11
package com.hoc081098.flowredux
22

3+
import kotlin.coroutines.CoroutineContext
34
import kotlinx.coroutines.CoroutineScope
4-
import kotlinx.coroutines.coroutineScope
5+
import kotlinx.coroutines.Job
6+
import kotlinx.coroutines.channels.Channel
7+
import kotlinx.coroutines.channels.ReceiveChannel
58
import kotlinx.coroutines.flow.Flow
6-
import kotlinx.coroutines.flow.SharedFlow
79
import kotlinx.coroutines.flow.StateFlow
10+
import kotlinx.coroutines.flow.emptyFlow
11+
import kotlinx.coroutines.flow.launchIn
12+
import kotlinx.coroutines.flow.mapNotNull
13+
import kotlinx.coroutines.flow.onCompletion
14+
import kotlinx.coroutines.flow.onEach
15+
import kotlinx.coroutines.job
816

17+
// TODO: Consider using AutoCloseable since Kotlin 1.8.20
918
public sealed interface FlowReduxStore<Action, State> {
10-
public val coroutineScope: CoroutineScope
11-
19+
/**
20+
* The state of this store.
21+
*/
1222
public val stateFlow: StateFlow<State>
1323

14-
/** Get streams of actions.
15-
*
16-
* This [Flow] includes dispatched [Action]s (via [dispatch] function)
17-
* and [Action]s returned from [SideEffect]s.
24+
/**
25+
* @return false if cannot dispatch action (this store was closed).
1826
*/
19-
public val actionSharedFlow: SharedFlow<Action>
27+
public fun dispatch(action: Action): Boolean
2028

2129
/**
22-
* @return false if cannot dispatch action ([coroutineScope] was cancelled).
30+
* Call this method to close this store.
31+
* A closed store will not accept any action anymore, thus state will not change anymore.
32+
* All [SideEffect]s will be cancelled.
2333
*/
24-
public fun dispatch(action: Action): Boolean
34+
public fun close()
35+
36+
/**
37+
* After calling [close] method, this function will return true.
38+
*
39+
* @return true if this store was closed.
40+
*/
41+
public fun isClosed(): Boolean
2542
}
2643

44+
/**
45+
* Create a [FlowReduxStore] with [sideEffects] and [reducer].
46+
*
47+
* The [FlowReduxStore] will be closed when the [CoroutineScope] is cancelled.
48+
* That requires the [CoroutineScope] has a [Job] in its context.
49+
* And you don't need to call [FlowReduxStore.close] manually.
50+
*
51+
* @receiver The [CoroutineScope] of the state machine. This scope must have a [Job] in its context.
52+
* @param initialState The initial state of the state machine.
53+
* @param sideEffects A list of [SideEffect]s.
54+
* @param reducer A [Reducer] function.
55+
*/
2756
public fun <Action, State> CoroutineScope.createFlowReduxStore(
2857
initialState: State,
29-
sideEffects: List<SideEffect<State, Action>>,
30-
reducer: Reducer<State, Action>
58+
sideEffects: List<SideEffect<Action, State>>,
59+
reducer: Reducer<Action, State>,
60+
): FlowReduxStore<Action, State> {
61+
val store = DefaultFlowReduxStore(
62+
coroutineContext = coroutineContext,
63+
initialState = initialState,
64+
sideEffects = sideEffects,
65+
reducer = reducer
66+
)
67+
coroutineContext.job.invokeOnCompletion {
68+
store.close()
69+
}
70+
return store
71+
}
72+
73+
/**
74+
* Create a [SideEffect] that maps all actions to [Output]s and send them to a [Channel].
75+
* The result [Channel] will be closed when the [SideEffect] is cancelled (when calling [FlowReduxStore.close]).
76+
*
77+
* @param capacity The capacity of the [Channel].
78+
* @param transformActionToOutput A function that maps an [Action] to an [Output].
79+
* If the function returns `null`, the [Action] will be ignored.
80+
* Otherwise, the [Action] will be mapped to an [Output] and sent to the [Channel].
81+
* @return A [Pair] of the [SideEffect] and a [Flow] of [Output]s.
82+
*/
83+
public fun <Action, State, Output> allActionsToOutputChannelSideEffect(
84+
capacity: Int = Channel.UNLIMITED,
85+
transformActionToOutput: (Action) -> Output?,
86+
): Pair<SideEffect<Action, State>, ReceiveChannel<Output>> {
87+
val actionChannel = Channel<Output>(capacity)
88+
89+
val sideEffect = SideEffect<Action, State> { actionFlow, _, coroutineScope ->
90+
actionFlow
91+
.mapNotNull(transformActionToOutput)
92+
.onEach(actionChannel::send)
93+
.onCompletion { actionChannel.close() }
94+
.launchIn(coroutineScope)
95+
96+
emptyFlow()
97+
}
98+
99+
return sideEffect to actionChannel
100+
}
101+
102+
@Suppress("FunctionName") // Factory function
103+
public fun <Action, State> FlowReduxStore(
104+
coroutineContext: CoroutineContext,
105+
initialState: State,
106+
sideEffects: List<SideEffect<Action, State>>,
107+
reducer: Reducer<Action, State>,
31108
): FlowReduxStore<Action, State> = DefaultFlowReduxStore(
32-
coroutineScope = this,
109+
coroutineContext = coroutineContext,
33110
initialState = initialState,
34111
sideEffects = sideEffects,
35112
reducer = reducer

flowredux/src/commonMain/kotlin/com/hoc081098/flowredux/Reducer.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ package com.hoc081098.flowredux
66
*
77
* If a reducer should not react on a Action, just return the old State.
88
*
9-
* @param S The type of the state
10-
* @param A The type of the Actions
9+
* @param State The type of the state
10+
* @param Action The type of the Actions
1111
*/
12-
public fun interface Reducer<S, A> {
13-
public operator fun invoke(state: S, action: A): S
12+
public fun interface Reducer<Action, State> {
13+
public operator fun invoke(state: State, action: Action): State
1414
}

flowredux/src/commonMain/kotlin/com/hoc081098/flowredux/SideEffect.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,18 @@ import kotlinx.coroutines.flow.StateFlow
88
* It is a function which takes a stream of actions and returns a stream of actions. Actions in, actions out
99
* (concept borrowed from redux-observable.js.or - so called epics).
1010
*/
11-
public fun interface SideEffect<S, A> {
11+
public fun interface SideEffect<Action, State> {
1212
/**
1313
* @param actionFlow Input action. Every SideEffect should be responsible to handle a single Action
1414
* (i.e using [kotlinx.coroutines.flow.filter] or [kotlinx.coroutines.flow.filterIsInstance] operator)
1515
* @param stateFlow Allow getting the latest state of the state machine or reacting to state changes.
1616
* @param coroutineScope The scope of [FlowReduxStore]. It can be used to start a coroutine or
1717
* share [actionFlow] via [kotlinx.coroutines.flow.shareIn] operator.
18+
* @return A Flow of actions. It can be empty if no action should be dispatched.
1819
*/
1920
public operator fun invoke(
20-
actionFlow: Flow<A>,
21-
stateFlow: StateFlow<S>,
21+
actionFlow: Flow<Action>,
22+
stateFlow: StateFlow<State>,
2223
coroutineScope: CoroutineScope
23-
): Flow<A>
24+
): Flow<Action>
2425
}

0 commit comments

Comments
 (0)