@@ -64,37 +64,59 @@ fun main() = runBlocking {
6464 */
6565@FlowPreview
6666public fun <T > Flow<T>.debounce (timeoutMillis : Long ): Flow <T > {
67- require(timeoutMillis > 0 ) { " Debounce timeout should be positive" }
68- return scopedFlow { downstream ->
69- // Actually Any, KT-30796
70- val values = produce<Any ?>(capacity = Channel .CONFLATED ) {
71- collect { value -> send(value ? : NULL ) }
72- }
73- var lastValue: Any? = null
74- while (lastValue != = DONE ) {
75- select<Unit > {
76- // Should be receiveOrClosed when boxing issues are fixed
77- values.onReceiveOrNull {
78- if (it == null ) {
79- if (lastValue != null ) downstream.emit(NULL .unbox(lastValue))
80- lastValue = DONE
81- } else {
82- lastValue = it
83- }
84- }
85-
86- lastValue?.let { value ->
87- // set timeout when lastValue != null
88- onTimeout(timeoutMillis) {
89- lastValue = null // Consume the value
90- downstream.emit(NULL .unbox(value))
91- }
92- }
93- }
94- }
95- }
67+ require(timeoutMillis >= 0L ) { " Debounce timeout should not be negative" }
68+ if (timeoutMillis == 0L ) return this
69+ return debounceInternal { timeoutMillis }
9670}
9771
72+ /* *
73+ * Returns a flow that mirrors the original flow, but filters out values
74+ * that are followed by the newer values within the given [timeout][timeoutMillis].
75+ * The latest value is always emitted.
76+ *
77+ * A variation of [debounce] that allows specifying the timeout value dynamically.
78+ *
79+ * Example:
80+ *
81+ * ```kotlin
82+ * flow {
83+ * emit(1)
84+ * delay(90)
85+ * emit(2)
86+ * delay(90)
87+ * emit(3)
88+ * delay(1010)
89+ * emit(4)
90+ * delay(1010)
91+ * emit(5)
92+ * }.debounce {
93+ * if (it == 1) {
94+ * 0L
95+ * } else {
96+ * 1000L
97+ * }
98+ * }
99+ * ```
100+ * <!--- KNIT example-delay-02.kt -->
101+ *
102+ * produces the following emissions
103+ *
104+ * ```text
105+ * 1, 3, 4, 5
106+ * ```
107+ * <!--- TEST -->
108+ *
109+ * Note that the resulting flow does not emit anything as long as the original flow emits
110+ * items faster than every [timeoutMillis] milliseconds.
111+ *
112+ * @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds.
113+ */
114+ @FlowPreview
115+ @OptIn(kotlin.experimental.ExperimentalTypeInference ::class )
116+ @OverloadResolutionByLambdaReturnType
117+ public fun <T > Flow<T>.debounce (timeoutMillis : (T ) -> Long ): Flow <T > =
118+ debounceInternal(timeoutMillis)
119+
98120/* *
99121 * Returns a flow that mirrors the original flow, but filters out values
100122 * that are followed by the newer values within the given [timeout].
@@ -129,7 +151,104 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
129151 */
130152@ExperimentalTime
131153@FlowPreview
132- public fun <T > Flow<T>.debounce (timeout : Duration ): Flow <T > = debounce(timeout.toDelayMillis())
154+ public fun <T > Flow<T>.debounce (timeout : Duration ): Flow <T > =
155+ debounce(timeout.toDelayMillis())
156+
157+ /* *
158+ * Returns a flow that mirrors the original flow, but filters out values
159+ * that are followed by the newer values within the given [timeout].
160+ * The latest value is always emitted.
161+ *
162+ * A variation of [debounce] that allows specifying the timeout value dynamically.
163+ *
164+ * Example:
165+ *
166+ * ```kotlin
167+ * flow {
168+ * emit(1)
169+ * delay(90.milliseconds)
170+ * emit(2)
171+ * delay(90.milliseconds)
172+ * emit(3)
173+ * delay(1010.milliseconds)
174+ * emit(4)
175+ * delay(1010.milliseconds)
176+ * emit(5)
177+ * }.debounce {
178+ * if (it == 1) {
179+ * 0.milliseconds
180+ * } else {
181+ * 1000.milliseconds
182+ * }
183+ * }
184+ * ```
185+ * <!--- KNIT example-delay-duration-02.kt -->
186+ *
187+ * produces the following emissions
188+ *
189+ * ```text
190+ * 1, 3, 4, 5
191+ * ```
192+ * <!--- TEST -->
193+ *
194+ * Note that the resulting flow does not emit anything as long as the original flow emits
195+ * items faster than every [timeout] unit.
196+ *
197+ * @param timeout [T] is the emitted value and the return value is timeout in [Duration].
198+ */
199+ @ExperimentalTime
200+ @FlowPreview
201+ @JvmName(" debounceDuration" )
202+ @OptIn(kotlin.experimental.ExperimentalTypeInference ::class )
203+ @OverloadResolutionByLambdaReturnType
204+ public fun <T > Flow<T>.debounce (timeout : (T ) -> Duration ): Flow <T > =
205+ debounceInternal { emittedItem ->
206+ timeout(emittedItem).toDelayMillis()
207+ }
208+
209+ private fun <T > Flow<T>.debounceInternal (timeoutMillisSelector : (T ) -> Long ) : Flow <T > =
210+ scopedFlow { downstream ->
211+ // Produce the values using the default (rendezvous) channel
212+ // Note: the actual type is Any, KT-30796
213+ val values = produce<Any ?> {
214+ collect { value -> send(value ? : NULL ) }
215+ }
216+ // Now consume the values
217+ var lastValue: Any? = null
218+ while (lastValue != = DONE ) {
219+ var timeoutMillis = 0L // will be always computed when lastValue != null
220+ // Compute timeout for this value
221+ if (lastValue != null ) {
222+ timeoutMillis = timeoutMillisSelector(NULL .unbox(lastValue))
223+ require(timeoutMillis >= 0L ) { " Debounce timeout should not be negative" }
224+ if (timeoutMillis == 0L ) {
225+ downstream.emit(NULL .unbox(lastValue))
226+ lastValue = null // Consume the value
227+ }
228+ }
229+ // assert invariant: lastValue != null implies timeoutMillis > 0
230+ assert { lastValue == null || timeoutMillis > 0 }
231+ // wait for the next value with timeout
232+ select<Unit > {
233+ // Set timeout when lastValue exists and is not consumed yet
234+ if (lastValue != null ) {
235+ onTimeout(timeoutMillis) {
236+ downstream.emit(NULL .unbox(lastValue))
237+ lastValue = null // Consume the value
238+ }
239+ }
240+ // Should be receiveOrClosed when boxing issues are fixed
241+ values.onReceiveOrNull { value ->
242+ if (value == null ) {
243+ if (lastValue != null ) downstream.emit(NULL .unbox(lastValue))
244+ lastValue = DONE
245+ } else {
246+ lastValue = value
247+ }
248+ }
249+ }
250+ }
251+ }
133252
134253/* *
135254 * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
@@ -144,15 +263,15 @@ public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.t
144263 * }
145264 * }.sample(200)
146265 * ```
147- * <!--- KNIT example-delay-02 .kt -->
266+ * <!--- KNIT example-delay-03 .kt -->
148267 *
149268 * produces the following emissions
150269 *
151270 * ```text
152271 * 1, 3, 5, 7, 9
153272 * ```
154273 * <!--- TEST -->
155- *
274+ *
156275 * Note that the latest element is not emitted if it does not fit into the sampling window.
157276 */
158277@FlowPreview
@@ -215,7 +334,7 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
215334 * }
216335 * }.sample(200.milliseconds)
217336 * ```
218- * <!--- KNIT example-delay-duration-02 .kt -->
337+ * <!--- KNIT example-delay-duration-03 .kt -->
219338 *
220339 * produces the following emissions
221340 *
0 commit comments