An implementation of IAsyncEnumerable<'T> as a computation expression: taskSeq { ... } with an accompanying TaskSeq module, that allows seamless use of asynchronous sequences similar to F#'s native seq and task CE's.
Latest version can be installed from Nuget.
- Overview
- Status & planning
- More information
- Building & testing
- Work in progress
- Current set of
TaskSequtility functions
The IAsyncEnumerable interface was added to .NET in .NET Core 3.0 and is part of .NET Standard 2.1. The main use-case was for iterative asynchronous enumeration over some resource. For instance, an event stream or a REST API interface with pagination, asynchronous reading over a list of files and accumulating the results, where each action can be modeled as a MoveNextAsync call on the IAsyncEnumerator<'T> given by a call to GetAsyncEnumerator().
Since the introduction of task in F# the call for a native implementation of task sequences has grown, in particular because proper iterating over an IAsyncEnumerable has proven challenging, especially if one wants to avoid mutable variables. This library is an answer to that call and implements the same resumable state machine approach with taskSeq.
As with seq and Seq, this library comes with a bunch of well-known collection functions, like TaskSeq.empty, isEmpty or TaskSeq.map, iter, collect, fold and TaskSeq.find, pick, choose, filter. Where applicable, these come with async variants, like TaskSeq.mapAsync iterAsync, collectAsync, foldAsync and TaskSeq.findAsync, pickAsync, chooseAsync, filterAsync, which allows the applied function to be asynchronous.
See below for a full list of currently implemented functions and their variants.
The taskSeq computation expression can be used just like using seq. On top of that, it adds support for working with tasks through let! and
looping over a normal or asynchronous sequence (one that implements IAsyncEnumerable<'T>'). You can use yield! and yield and there's support
for use and use!, try-with and try-finally and while loops within the task sequence expression:
Dotnet Nuget
dotnet add package FSharp.Control.TaskSeqFor a specific project:
dotnet add myproject.fsproj package FSharp.Control.TaskSeqF# Interactive (FSI):
// latest version
> #r "nuget: FSharp.Control.TaskSeq"
// or with specific version
> #r "nuget: FSharp.Control.TaskSeq, 0.2.2"Paket:
dotnet paket add FSharp.Control.TaskSeq --project <project>Package Manager:
PM> NuGet\Install-Package FSharp.Control.TaskSeqAs package reference in fsproj or csproj file:
<!-- replace version with most recent version -->
<PackageReference Include="FSharp.Control.TaskSeq" Version="0.2.2" />open System.IO
open FSharp.Control
// singleton is fine
let helloTs = taskSeq { yield "Hello, World!" }
// cold-started, that is, delay-executed
let f() = task {
// using toList forces execution of whole sequence
let! hello = TaskSeq.toList helloTs // toList returns a Task<'T list>
return List.head hello
}
// can be mixed with normal sequences
let oneToTen = taskSeq { yield! [1..10] }
// can be used with F#'s task and async in a for-loop
let f() = task { for x in oneToTen do printfn "Number %i" x }
let g() = async { for x in oneToTen do printfn "Number %i" x }
// returns a delayed sequence of IAsyncEnumerable<string>
let allFilesAsLines() = taskSeq {
let files = Directory.EnumerateFiles(@"c:\temp")
for file in files do
// await
let! contents = File.ReadAllLinesAsync file
// return all lines
yield! contents
}
let write file =
allFilesAsLines()
// synchronous map function on asynchronous task sequence
|> TaskSeq.map (fun x -> x.Replace("a", "b"))
// asynchronous map
|> TaskSeq.mapAsync (fun x -> task { return "hello: " + x })
// asynchronous iter
|> TaskSeq.iterAsync (fun data -> File.WriteAllTextAsync(fileName, data))
// infinite sequence
let feedFromTwitter user pwd = taskSeq {
do! loginToTwitterAsync(user, pwd)
while true do
let! message = getNextNextTwitterMessageAsync()
yield message
}This project has stable features currently, but before we go full "version one", we'd like to complete the surface area. This section covers the status of that, with a full list of implmented functions below. Here's the short list:
- Stabilize and battle-test
taskSeqresumable code. DONE - A growing set of module functions
TaskSeq, see below for progress. DONE & IN PROGRESS - Packaging and publishing on Nuget, DONE, PUBLISHED SINCE: 7 November 2022. See https://www.nuget.org/packages/FSharp.Control.TaskSeq
- Add
Asyncvariants for functions taking HOF arguments. DONE - Add generated docs to https://fsprojects.github.io
- Expand surface area based on
AsyncSeq. ONGOING
As of 9 November 2022: Nuget package available. In this phase, we will frequently update the package. Current:
The resumable state machine backing the taskSeq CE is now finished and restartability (not to be confused with resumability) has been implemented and stabilized. Full support for empty task sequences is done. Focus is now on adding functionality there, like adding more useful overloads for yield and let!. Suggestions are welcome!.
We are working hard on getting a full set of module functions on TaskSeq that can be used with IAsyncEnumerable sequences. Our guide is the set of F# Seq functions in F# Core and, where applicable, the functions provided from AsyncSeq. Each implemented function is documented through XML doc comments to provide the necessary context-sensitive help.
The following is the progress report:
| Done | Seq |
TaskSeq |
Variants | Remarks |
|---|---|---|---|---|
| ❓ | allPairs |
allPairs |
note #1 | |
| ✅ #81 | append |
append |
||
| ✅ #81 | appendSeq |
|||
| ✅ #81 | prependSeq |
|||
average |
average |
|||
averageBy |
averageBy |
averageByAsync |
||
| ❓ | cache |
cache |
note #1 | |
| ✅ #67 | cast |
cast |
||
| ✅ #67 | box |
|||
| ✅ #67 | unbox |
|||
| ✅ #23 | choose |
choose |
chooseAsync |
|
chunkBySize |
chunkBySize |
|||
| ✅ #11 | collect |
collect |
collectAsync |
|
| ✅ #11 | collectSeq |
collectSeqAsync |
||
compareWith |
compareWith |
compareWithAsync |
||
| ✅ #69 | concat |
concat |
||
| ✅ #70 | contains |
contains |
||
| ✅ #82 | delay |
delay |
||
distinct |
distinct |
|||
distinctBy |
dictinctBy |
distinctByAsync |
||
| ✅ #2 | empty |
empty |
||
| ✅ #23 | exactlyOne |
exactlyOne |
||
| ✅ #83 | except |
except |
||
| ✅ #83 | exceptOfSeq |
|||
| ✅ #70 | exists |
exists |
existsAsync |
|
exists2 |
exists2 |
|||
| ✅ #23 | filter |
filter |
filterAsync |
|
| ✅ #23 | find |
find |
findAsync |
|
| 🚫 | findBack |
note #2 | ||
| ✅ #68 | findIndex |
findIndex |
findIndexAsync |
|
| 🚫 | findIndexBack |
n/a | n/a | note #2 |
| ✅ #2 | fold |
fold |
foldAsync |
|
fold2 |
fold2 |
fold2Async |
||
| 🚫 | foldBack |
note #2 | ||
| 🚫 | foldBack2 |
note #2 | ||
forall |
forall |
forallAsync |
||
forall2 |
forall2 |
forall2Async |
||
| ❓ | groupBy |
groupBy |
groupByAsync |
note #1 |
| ✅ #23 | head |
head |
||
| ✅ #68 | indexed |
indexed |
||
| ✅ #69 | init |
init |
initAsync |
|
| ✅ #69 | initInfinite |
initInfinite |
initInfiniteAsync |
|
insertAt |
insertAt |
|||
insertManyAt |
insertManyAt |
|||
| ✅ #23 | isEmpty |
isEmpty |
||
| ✅ #23 | item |
item |
||
| ✅ #2 | iter |
iter |
iterAsync |
|
iter2 |
iter2 |
iter2Async |
||
| ✅ #2 | iteri |
iteri |
iteriAsync |
|
iteri2 |
iteri2 |
iteri2Async |
||
| ✅ #23 | last |
last |
||
| ✅ #53 | length |
length |
||
| ✅ #53 | lengthBy |
lengthByAsync |
||
| ✅ #2 | map |
map |
mapAsync |
|
map2 |
map2 |
map2Async |
||
map3 |
map3 |
map3Async |
||
mapFold |
mapFold |
mapFoldAsync |
||
| 🚫 | mapFoldBack |
note #2 | ||
| ✅ #2 | mapi |
mapi |
mapiAsync |
|
mapi2 |
mapi2 |
mapi2Async |
||
max |
max |
|||
maxBy |
maxBy |
maxByAsync |
||
min |
min |
|||
minBy |
minBy |
minByAsync |
||
| ✅ #2 | ofArray |
ofArray |
||
| ✅ #2 | ofAsyncArray |
|||
| ✅ #2 | ofAsyncList |
|||
| ✅ #2 | ofAsyncSeq |
|||
| ✅ #2 | ofList |
ofList |
||
| ✅ #2 | ofTaskList |
|||
| ✅ #2 | ofResizeArray |
|||
| ✅ #2 | ofSeq |
|||
| ✅ #2 | ofTaskArray |
|||
| ✅ #2 | ofTaskList |
|||
| ✅ #2 | ofTaskSeq |
|||
pairwise |
pairwise |
|||
permute |
permute |
permuteAsync |
||
| ✅ #23 | pick |
pick |
pickAsync |
|
| 🚫 | readOnly |
note #3 | ||
reduce |
reduce |
reduceAsync |
||
| 🚫 | reduceBack |
note #2 | ||
removeAt |
removeAt |
|||
removeManyAt |
removeManyAt |
|||
replicate |
replicate |
|||
| ❓ | rev |
note #1 | ||
scan |
scan |
scanAsync |
||
| 🚫 | scanBack |
note #2 | ||
| ✅ #90 | singleton |
singleton |
||
skip |
skip |
|||
skipWhile |
skipWhile |
skipWhileAsync |
||
| ❓ | sort |
note #1 | ||
| ❓ | sortBy |
note #1 | ||
| ❓ | sortByAscending |
note #1 | ||
| ❓ | sortByDescending |
note #1 | ||
| ❓ | sortWith |
note #1 | ||
splitInto |
splitInto |
|||
sum |
sum |
|||
sumBy |
sumBy |
sumByAsync |
||
| ✅ #76 | tail |
tail |
||
take |
take |
|||
takeWhile |
takeWhile |
takeWhileAsync |
||
| ✅ #2 | toArray |
toArray |
toArrayAsync |
|
| ✅ #2 | toIList |
toIListAsync |
||
| ✅ #2 | toList |
toList |
toListAsync |
|
| ✅ #2 | toResizeArray |
toResizeArrayAsync |
||
| ✅ #2 | toSeq |
toSeqAsync |
||
| […] | ||||
| ❓ | transpose |
note #1 | ||
truncate |
truncate |
|||
| ✅ #23 | tryExactlyOne |
tryExactlyOne |
tryExactlyOneAsync |
|
| ✅ #23 | tryFind |
tryFind |
tryFindAsync |
|
| 🚫 | tryFindBack |
note #2 | ||
| ✅ #68 | tryFindIndex |
tryFindIndex |
tryFindIndexAsync |
|
| 🚫 | tryFindIndexBack |
note #2 | ||
| ✅ #23 | tryHead |
tryHead |
||
| ✅ #23 | tryItem |
tryItem |
||
| ✅ #23 | tryLast |
tryLast |
||
| ✅ #23 | tryPick |
tryPick |
tryPickAsync |
|
| ✅ #76 | tryTail |
|||
unfold |
unfold |
unfoldAsync |
||
updateAt |
updateAt |
|||
where |
where |
whereAsync |
||
windowed |
windowed |
|||
| ✅ #2 | zip |
zip |
||
zip3 |
zip3 |
|||
zip4 |
¹⁾ These functions require a form of pre-materializing through TaskSeq.cache, similar to the approach taken in the corresponding Seq functions. It doesn't make much sense to have a cached async sequence. However, AsyncSeq does implement these, so we'll probably do so eventually as well.
²⁾ Because of the async nature of TaskSeq sequences, iterating from the back would be bad practice. Instead, materialize the sequence to a list or array and then apply the xxxBack iterators.
³⁾ The motivation for readOnly in Seq is that a cast from a mutable array or list to a seq<_> is valid and can be cast back, leading to a mutable sequence. Since TaskSeq doesn't implement IEnumerable<_>, such casts are not possible.
- A good C#-based introduction can be found in this blog.
- An MSDN article written shortly after it was introduced.
- Converting a
seqto anIAsyncEnumerabledemo gist as an example, thoughTaskSeqcontains many more utility functions and uses a slightly different approach. - If you're looking for using
IAsyncEnumerablewithasyncand nottask, the excellentAsyncSeqlibrary should be used. WhileTaskSeqis intended to consumeasyncjust liketaskdoes, it won't create anAsyncSeqtype (at least not yet). If you want classic Async and parallelism, you should get this library instead.
- A state machine from a monadic perspective in F# can be found here, which works with the pre-F# 6.0 non-resumable internals.
- The original RFC for F# 6.0 on resumable state machines
- The original RFC for introducing
taskto F# 6.0. - A pre F# 6.0
TaskBuilderthat motivated thetaskCE later added to F# Core. - MSDN Documentation on
taskandasync.
- Docs on MSDN form a good summary and starting point.
- Arguably the best step-by-step tutorial to using and building computation expressions by Scott Wlaschin.
TLDR: just run build. Or load the sln file in Visual Studio or VS Code and compile.
At the very least, to get the source to compile, you'll need:
- .NET 6 or .NET 7 Preview
- F# 6.0 or 7.0 compiler
- To use
build.cmd, thedotnetcommand must be accessible from your path.
Just check-out this repo locally. Then, from the root of the repo, you can do:
build [build] [release|debug]With no arguments, defaults to release.
build test [release|debug]With no arguments, defaults to release. By default, all tests are output to the console. If you don't want that, you can use --logger console;verbosity=summary.
Furthermore, no TRX file is generated and the --blame-xxx flags aren't set.
build ci [release|debug]With no arguments, defaults to release. This will run dotnet test with the --blame-xxx settings enabled to prevent hanging tests caused by
an xUnit runner bug.
There are no special CI environment variables that need to be set for running this locally.
You can pass any additional options that are valid for dotnet test and dotnet build respectively. However,
these cannot be the very first argument, so you should either use build build --myadditionalOptions fizz buzz, or
just specify the build-kind, i.e. this is fine:
build debug --verbosity detailed
build test --logger console;verbosity=summaryAt this moment, additional options cannot have quotes in them.
Command modifiers, like release and debug, can be specified with - or / if you so prefer: dotnet build /release.
build helpFor more info, see this PR: fsprojects#29.
The taskSeq CE using the statically compilable resumable state machine approach is based on, and draw heavily from Don Symes taskSeq.fs as used to test the resumable state machine in the F# core compiler.
On top of that, this library adds a set of TaskSeq module functions, with their Async variants, on par with Seq and AsyncSeq.
The following are the current surface area of the TaskSeq utility functions, ordered alphabetically.
module TaskSeq =
val append: source1: #taskSeq<'T> -> source2: #taskSeq<'T> -> taskSeq<'T>
val appendSeq: source1: #taskSeq<'T> -> source2: #seq<'T> -> taskSeq<'T>
val box: source: taskSeq<'T> -> taskSeq<obj>
val cast: source: taskSeq<obj> -> taskSeq<'T>
val choose: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> taskSeq<'U>
val chooseAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> taskSeq<'U>
val collect: binder: ('T -> #taskSeq<'U>) -> source: taskSeq<'T> -> taskSeq<'U>
val collectAsync: binder: ('T -> #Task<'TSeqU>) -> source: taskSeq<'T> -> taskSeq<'U> when 'TSeqU :> taskSeq<'U>
val collectSeq: binder: ('T -> #seq<'U>) -> source: taskSeq<'T> -> taskSeq<'U>
val collectSeqAsync: binder: ('T -> #Task<'SeqU>) -> source: taskSeq<'T> -> taskSeq<'U> when 'SeqU :> seq<'U>
val concat: sources: taskSeq<#taskSeq<'T>> -> taskSeq<'T>
val contains<'T when 'T: equality> : value: 'T -> source: taskSeq<'T> -> Task<bool>
val delay: generator: (unit -> taskSeq<'T>) -> taskSeq<'T>
val empty<'T> : taskSeq<'T>
val exactlyOne: source: taskSeq<'T> -> Task<'T>
val except<'T when 'T: equality> : itemsToExclude: taskSeq<'T> -> source: taskSeq<'T> -> taskSeq<'T>
val exceptOfSeq<'T when 'T: equality> : itemsToExclude: seq<'T> -> source: taskSeq<'T> -> taskSeq<'T>
val exists: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<bool>
val existsAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<bool>
val filter: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>
val filterAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>
val find: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<'T>
val findAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<'T>
val findIndex: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int>
val findIndexAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<int>
val fold: folder: ('State -> 'T -> 'State) -> state: 'State -> source: taskSeq<'T> -> Task<'State>
val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: taskSeq<'T> -> Task<'State>
val head: source: taskSeq<'T> -> Task<'T>
val indexed: source: taskSeq<'T> -> taskSeq<int * 'T>
val init: count: int -> initializer: (int -> 'T) -> taskSeq<'T>
val initAsync: count: int -> initializer: (int -> #Task<'T>) -> taskSeq<'T>
val initInfinite: initializer: (int -> 'T) -> taskSeq<'T>
val initInfiniteAsync: initializer: (int -> #Task<'T>) -> taskSeq<'T>
val isEmpty: source: taskSeq<'T> -> Task<bool>
val item: index: int -> source: taskSeq<'T> -> Task<'T>
val iter: action: ('T -> unit) -> source: taskSeq<'T> -> Task<unit>
val iterAsync: action: ('T -> #Task<unit>) -> source: taskSeq<'T> -> Task<unit>
val iteri: action: (int -> 'T -> unit) -> source: taskSeq<'T> -> Task<unit>
val iteriAsync: action: (int -> 'T -> #Task<unit>) -> source: taskSeq<'T> -> Task<unit>
val last: source: taskSeq<'T> -> Task<'T>
val length: source: taskSeq<'T> -> Task<int>
val lengthBy: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int>
val lengthByAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<int>
val lengthOrMax: max: int -> source: taskSeq<'T> -> Task<int>
val map: mapper: ('T -> 'U) -> source: taskSeq<'T> -> taskSeq<'U>
val mapAsync: mapper: ('T -> #Task<'U>) -> source: taskSeq<'T> -> taskSeq<'U>
val mapi: mapper: (int -> 'T -> 'U) -> source: taskSeq<'T> -> taskSeq<'U>
val mapiAsync: mapper: (int -> 'T -> #Task<'U>) -> source: taskSeq<'T> -> taskSeq<'U>
val ofArray: source: 'T[] -> taskSeq<'T>
val ofAsyncArray: source: Async<'T> array -> taskSeq<'T>
val ofAsyncList: source: Async<'T> list -> taskSeq<'T>
val ofAsyncSeq: source: seq<Async<'T>> -> taskSeq<'T>
val ofList: source: 'T list -> taskSeq<'T>
val ofResizeArray: source: ResizeArray<'T> -> taskSeq<'T>
val ofSeq: source: seq<'T> -> taskSeq<'T>
val ofTaskArray: source: #Task<'T> array -> taskSeq<'T>
val ofTaskList: source: #Task<'T> list -> taskSeq<'T>
val ofTaskSeq: source: seq<#Task<'T>> -> taskSeq<'T>
val pick: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> Task<'U>
val pickAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> Task<'U>
val prependSeq: source1: #seq<'T> -> source2: #taskSeq<'T> -> taskSeq<'T>
val singleton: source: 'T -> taskSeq<'T>
val tail: source: taskSeq<'T> -> Task<taskSeq<'T>>
val toArray: source: taskSeq<'T> -> 'T[]
val toArrayAsync: source: taskSeq<'T> -> Task<'T[]>
val toIListAsync: source: taskSeq<'T> -> Task<IList<'T>>
val toList: source: taskSeq<'T> -> 'T list
val toListAsync: source: taskSeq<'T> -> Task<'T list>
val toResizeArrayAsync: source: taskSeq<'T> -> Task<ResizeArray<'T>>
val toSeq: source: taskSeq<'T> -> seq<'T>
val tryExactlyOne: source: taskSeq<'T> -> Task<'T option>
val tryFind: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<'T option>
val tryFindAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<'T option>
val tryFindIndex: predicate: ('T -> bool) -> source: taskSeq<'T> -> Task<int option>
val tryFindIndexAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<int option>
val tryHead: source: taskSeq<'T> -> Task<'T option>
val tryItem: index: int -> source: taskSeq<'T> -> Task<'T option>
val tryLast: source: taskSeq<'T> -> Task<'T option>
val tryPick: chooser: ('T -> 'U option) -> source: taskSeq<'T> -> Task<'U option>
val tryPickAsync: chooser: ('T -> #Task<'U option>) -> source: taskSeq<'T> -> Task<'U option>
val tryTail: source: taskSeq<'T> -> Task<taskSeq<'T> option>
val unbox<'U when 'U: struct> : source: taskSeq<obj> -> taskSeq<'U>
val zip: source1: taskSeq<'T> -> source2: taskSeq<'U> -> taskSeq<'T * 'U>