Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Since the introduction of `task` in F# the call for a native implementation of _

### Module functions

As with `seq` and `Seq`, this library comes with a bunch of well-known collection functions, like `TaskSeq.empty`, `isEmpty` or `TaskSeq.map`, `iter`, `collect`, `fold` and `TaskSeq.find`, `pick`, `choose`, `filter`. Where applicable, these come with async variants, like `TaskSeq.mapAsync` `iterAsync`, `collectAsync`, `foldAsync` and `TaskSeq.findAsync`, `pickAsync`, `chooseAsync`, `filterAsync`, which allows the applied function to be asynchronous.
As with `seq` and `Seq`, this library comes with a bunch of well-known collection functions, like `TaskSeq.empty`, `isEmpty` or `TaskSeq.map`, `iter`, `collect`, `fold` and `TaskSeq.find`, `pick`, `choose`, `filter`, `takeWhile`. Where applicable, these come with async variants, like `TaskSeq.mapAsync` `iterAsync`, `collectAsync`, `foldAsync` and `TaskSeq.findAsync`, `pickAsync`, `chooseAsync`, `filterAsync`, `takeWhileAsync` which allows the applied function to be asynchronous.

[See below](#current-set-of-taskseq-utility-functions) for a full list of currently implemented functions and their variants.

Expand Down Expand Up @@ -289,7 +289,7 @@ The following is the progress report:
| | `sumBy` | `sumBy` | `sumByAsync` | |
| ✅ [#76][] | `tail` | `tail` | | |
| | `take` | `take` | | |
| | `takeWhile` | `takeWhile` | `takeWhileAsync` | |
| ✅ [#126][] | `takeWhile` | `takeWhile` | `takeWhileAsync`, `takeWhileInclusive`, `takeWhileInclusiveAsync` | |
| ✅ [#2][] | `toArray` | `toArray` | `toArrayAsync` | |
| ✅ [#2][] | | `toIList` | `toIListAsync` | |
| ✅ [#2][] | `toList` | `toList` | `toListAsync` | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>

<IsPackable>false</IsPackable>
<GenerateProgramFile>false</GenerateProgramFile>
</PropertyGroup>

<ItemGroup>
Expand All @@ -23,6 +20,7 @@
<Compile Include="TaskSeq.Except.Tests.fs" />
<Compile Include="TaskSeq.Exists.Tests.fs" />
<Compile Include="TaskSeq.Filter.Tests.fs" />
<Compile Include="TaskSeq.TakeWhile.Tests.fs" />
<Compile Include="TaskSeq.FindIndex.Tests.fs" />
<Compile Include="TaskSeq.Find.Tests.fs" />
<Compile Include="TaskSeq.Fold.Tests.fs" />
Expand Down
219 changes: 219 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.TakeWhile.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
module TaskSeq.Tests.TakeWhile

open System
open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharp.Control

//
// TaskSeq.takeWhile
// TaskSeq.takeWhileAsync
// TaskSeq.takeWhileInclusive
// TaskSeq.takeWhileInclusiveAsync
//

module EmptySeq =
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-takeWhile has no effect`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.takeWhile ((=) 12)
|> TaskSeq.toListAsync
|> Task.map (List.isEmpty >> should be True)

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-takeWhileAsync has no effect`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.takeWhileAsync (fun x -> task { return x = 12 })
|> TaskSeq.toListAsync
|> Task.map (List.isEmpty >> should be True)

// The primary requirement is that items after the item failing the predicate must be excluded
module FiltersAfterFail =
[<Theory; InlineData false; InlineData true>]
let ``TaskSeq-takeWhile(Inclusive)? excludes all items after predicate fails`` inclusive =
// The only real difference in semantics between the base and the *Inclusive variant lies in whether the final item is returned
// NOTE the semantics are very clear on only propagating a single failing item in the inclusive case
let f, expected =
if inclusive then TaskSeq.takeWhileInclusive, "ABBC"
else TaskSeq.takeWhile, "ABB"
seq { 1; 2; 2; 3; 3; 2; 1 }
|> TaskSeq.ofSeq
|> f (fun x -> x <= 2)
|> TaskSeq.map char
|> TaskSeq.map ((+) '@')
|> TaskSeq.toArrayAsync
|> Task.map (String >> should equal expected)

// Same as preceding test, just with Async functions
[<Theory; InlineData false; InlineData true>]
let ``TaskSeq-takeWhile(Inclusive)?Async excludes all items after after predicate fails`` inclusive =
let f, expected =
if inclusive then TaskSeq.takeWhileInclusiveAsync, "ABBC"
else TaskSeq.takeWhileAsync, "ABB"
taskSeq { 1; 2; 2; 3; 3; 2; 1 }
|> f (fun x -> task { return x <= 2 })
|> TaskSeq.map char
|> TaskSeq.map ((+) '@')
|> TaskSeq.toArrayAsync
|> Task.map (String >> should equal expected)

// Covers the fact that it's not sufficient to merely exclude successor items - it's also critical that the enumeration terminates
module StopsEnumeratingAfterFail =
[<Theory; InlineData false; InlineData true>]
let ``TaskSeq-takeWhile(Inclusive)? stops consuming after predicate fails`` inclusive =
let f, expected =
if inclusive then TaskSeq.takeWhileInclusive, "ABBC"
else TaskSeq.takeWhile, "ABB"
seq { 1; 2; 2; 3; 3; failwith "Too far" }
|> TaskSeq.ofSeq
|> f (fun x -> x <= 2)
|> TaskSeq.map char
|> TaskSeq.map ((+) '@')
|> TaskSeq.toArrayAsync
|> Task.map (String >> should equal expected)

[<Theory; InlineData false; InlineData true>]
let ``TaskSeq-takeWhile(Inclusive)?Async stops consuming after predicate fails`` inclusive =
let f, expected =
if inclusive then TaskSeq.takeWhileInclusiveAsync, "ABBC"
else TaskSeq.takeWhileAsync, "ABB"
taskSeq { 1; 2; 2; 3; 3; failwith "Too far" }
|> f (fun x -> task { return x <= 2 })
|> TaskSeq.map char
|> TaskSeq.map ((+) '@')
|> TaskSeq.toArrayAsync
|> Task.map (String >> should equal expected)

/// This is the base condition as one would expect in actual code
let inline cond x = x <> 6

/// For each of the tests below, we add a guard that will trigger if the predicate is passed items known to be beyond the
/// first failing item in the known sequence (which is 1..10)
let inline condWithGuard x =
let res = cond x
if x > 6 then failwith "Test sequence should not be enumerated beyond the first item failing the predicate"
res

module Immutable =
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-takeWhile filters correctly`` variant =
Gen.getSeqImmutable variant
|> TaskSeq.takeWhile condWithGuard
|> TaskSeq.map char
|> TaskSeq.map ((+) '@')
|> TaskSeq.toArrayAsync
|> Task.map (String >> should equal "ABCDE")

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-takeWhileAsync filters correctly`` variant =
Gen.getSeqImmutable variant
|> TaskSeq.takeWhileAsync (fun x -> task { return condWithGuard x })
|> TaskSeq.map char
|> TaskSeq.map ((+) '@')
|> TaskSeq.toArrayAsync
|> Task.map (String >> should equal "ABCDE")

module SideEffects =
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-takeWhile filters correctly`` variant =
Gen.getSeqWithSideEffect variant
|> TaskSeq.takeWhile condWithGuard
|> TaskSeq.map char
|> TaskSeq.map ((+) '@')
|> TaskSeq.toArrayAsync
|> Task.map (String >> should equal "ABCDE")

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-takeWhileAsync filters correctly`` variant =
Gen.getSeqWithSideEffect variant
|> TaskSeq.takeWhileAsync (fun x -> task { return condWithGuard x })
|> TaskSeq.map char
|> TaskSeq.map ((+) '@')
|> TaskSeq.toArrayAsync
|> Task.map (String >> should equal "ABCDE")

[<Theory; InlineData(false, false); InlineData(true, false); InlineData(false, true); InlineData(true, true)>]
let ``TaskSeq-takeWhile(Inclusive)?(Async)? __special-case__ prove it does not read beyond the failing yield`` (inclusive, async) = task {
let mutable x = 42 // for this test, the potential mutation should not actually occur

let items = taskSeq {
yield x // Always passes the test; always returned
yield x * 2 // the failing item (which will also be yielded in the result when using *Inclusive)
x <- x + 1 // we are proving we never get here
}

let f =
match inclusive, async with
| false, false -> TaskSeq.takeWhile (fun x -> x = 42)
| true, false -> TaskSeq.takeWhileInclusive (fun x -> x = 42)
| false, true -> TaskSeq.takeWhileAsync (fun x -> task { return x = 42 })
| true, true -> TaskSeq.takeWhileInclusiveAsync (fun x -> task { return x = 42 })

let expected = if inclusive then [| 42; 84 |] else [| 42 |]

let! first = items |> f |> TaskSeq.toArrayAsync
let! repeat = items |> f |> TaskSeq.toArrayAsync

first |> should equal expected
repeat |> should equal expected
x |> should equal 42
}

[<Theory; InlineData(false, false); InlineData(true, false); InlineData(false, true); InlineData(true, true)>]
let ``TaskSeq-takeWhile(Inclusive)?(Async)? __special-case__ prove side effects are executed`` (inclusive, async) = task {
let mutable x = 41

let items = taskSeq {
x <- x + 1
yield x
x <- x + 2
yield x * 2
x <- x + 200 // as previously proven, we should not trigger this
}

let f =
match inclusive, async with
| false, false -> TaskSeq.takeWhile (fun x -> x < 50)
| true, false -> TaskSeq.takeWhileInclusive (fun x -> x < 50)
| false, true -> TaskSeq.takeWhileAsync (fun x -> task { return x < 50 })
| true, true -> TaskSeq.takeWhileInclusiveAsync (fun x -> task { return x < 50 })

let expectedFirst = if inclusive then [| 42; 44*2 |] else [| 42 |]
let expectedRepeat = if inclusive then [| 45; 47*2 |] else [| 45 |]

let! first = items |> f |> TaskSeq.toArrayAsync
x |> should equal 44
let! repeat = items |> f |> TaskSeq.toArrayAsync
x |> should equal 47

first |> should equal expectedFirst
repeat |> should equal expectedRepeat
}

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-takeWhile consumes the prefix of a longer sequence, with mutation`` variant = task {
let ts = Gen.getSeqWithSideEffect variant

let! first = TaskSeq.takeWhile (fun x -> x < 5) ts |> TaskSeq.toArrayAsync
let expected = [| 1..4 |]
first |> should equal expected

// side effect, reiterating causes it to resume from where we left it (minus the failing item)
let! repeat = TaskSeq.takeWhile (fun x -> x < 5) ts |> TaskSeq.toArrayAsync
repeat |> should not' (equal expected)
}

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-takeWhileInclusiveAsync consumes the prefix for a longer sequence, with mutation`` variant = task {
let ts = Gen.getSeqWithSideEffect variant

let! first = TaskSeq.takeWhileInclusiveAsync (fun x -> task { return x < 5 }) ts |> TaskSeq.toArrayAsync
let expected = [| 1..5 |]
first |> should equal expected

// side effect, reiterating causes it to resume from where we left it (minus the failing item)
let! repeat = TaskSeq.takeWhileInclusiveAsync (fun x -> task { return x < 5 }) ts |> TaskSeq.toArrayAsync
repeat |> should not' (equal expected)
}
4 changes: 4 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ module TaskSeq =
let chooseAsync chooser source = Internal.choose (TryPickAsync chooser) source
let filter predicate source = Internal.filter (Predicate predicate) source
let filterAsync predicate source = Internal.filter (PredicateAsync predicate) source
let takeWhile predicate source = Internal.takeWhile (*inclusive:*)false (Predicate predicate) source
let takeWhileAsync predicate source = Internal.takeWhile false (PredicateAsync predicate) source
let takeWhileInclusive predicate source = Internal.takeWhile true (Predicate predicate) source
let takeWhileInclusiveAsync predicate source = Internal.takeWhile true (PredicateAsync predicate) source
let tryPick chooser source = Internal.tryPick (TryPick chooser) source
let tryPickAsync chooser source = Internal.tryPick (TryPickAsync chooser) source
let tryFind predicate source = Internal.tryFind (Predicate predicate) source
Expand Down
30 changes: 30 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,36 @@ module TaskSeq =
/// </summary>
val filter: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Yields items from the source while the <paramref name="predicate" /> function returns <see cref="true" />.
/// The first <see cref="false" /> result concludes consumption of the source.
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.takeWhileAsync" />.
/// </summary>
val takeWhile: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Yields items from the source while the <paramref name="predicate" /> asynchronous function returns <see cref="true" />.
/// The first <see cref="false" /> result concludes consumption of the source.
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.takeWhile" />.
/// </summary>
val takeWhileAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Yields items from the source while the <paramref name="predicate" /> function returns <see cref="true" />.
/// The first <see cref="false" /> result concludes consumption of the source, but is included in the result.
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.takeWhileInclusiveAsync" />.
/// If the final item is not desired, consider using <see cref="TaskSeq.takeWhile" />.
/// </summary>
val takeWhileInclusive: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Yields items from the source while the <paramref name="predicate" /> asynchronous function returns <see cref="true" />.
/// The first <see cref="false" /> result concludes consumption of the source, but is included in the result.
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.takeWhileInclusive" />.
/// If the final item is not desired, consider using <see cref="TaskSeq.takeWhileAsync" />.
/// </summary>
val takeWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Returns a new collection containing only the elements of the collection
/// for which the given asynchronous function <paramref name="predicate" /> returns <see cref="true" />.
Expand Down
28 changes: 28 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,34 @@ module internal TaskSeqInternal =
| true -> yield item
| false -> ()
}

let takeWhile inclusive predicate (source: taskSeq<_>) = taskSeq {
use e = source.GetAsyncEnumerator(CancellationToken())
let! step = e.MoveNextAsync()
let mutable more = step

match predicate with
| Predicate predicate ->
while more do
let value = e.Current
more <- predicate value
if more || inclusive then
yield value
if more then
let! ok = e.MoveNextAsync()
more <- ok
| PredicateAsync predicate ->
while more do
let value = e.Current
let! passed = predicate value
more <- passed
if more || inclusive then
yield value
if more then
let! ok = e.MoveNextAsync()
more <- ok
}

// Consider turning using an F# version of this instead?
// https://github.com/i3arnon/ConcurrentHashSet
type ConcurrentHashSet<'T when 'T: equality>(ct) =
Expand Down