Skip to content

Commit 53df0cc

Browse files
author
Jamil Maqdis Anton
committed
WIP
1 parent 717a73e commit 53df0cc

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

src/Read.fs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,16 @@ type ReadingDirection =
1010
| Backward
1111

1212
module Read =
13-
type private StartPositionInclusive = int64
14-
let readFromAllStreamAsync: IStreamStore -> ReadingDirection -> StartPositionInclusive -> int -> Async<ReadAllPage> =
13+
type StartPositionInclusive = int64
14+
15+
let readFromAllStreamAsync: IStreamStore -> ReadingDirection -> StartPositionInclusive -> MessageCount -> Async<ReadAllPage> =
1516
fun store readingDirection startPositionInclusive msgCount ->
1617
match readingDirection with
1718
| Forward -> store.ReadAllForwards(startPositionInclusive, msgCount)
1819
| Backward -> store.ReadAllBackwards(startPositionInclusive, msgCount)
1920
|> Async.AwaitTask
2021

21-
let readFromStreamAsync: IStreamStore -> ReadingDirection -> StreamDetails -> int -> Async<ReadStreamPage> =
22+
let readFromStreamAsync: IStreamStore -> ReadingDirection -> StreamDetails -> MessageCount -> Async<ReadStreamPage> =
2223
fun store readingDirection streamDetails msgCount ->
2324
match readingDirection with
2425
| Forward ->
@@ -27,7 +28,7 @@ module Read =
2728
store.ReadStreamBackwards(streamDetails.streamName, Helpers.toVersion streamDetails.version, msgCount)
2829
|> Async.AwaitTask
2930

30-
let readFromStreamAsync': IStreamStore -> ReadingDirection -> StreamDetails -> int -> CancellationToken -> Async<ReadStreamPage> =
31+
let readFromStreamAsync': IStreamStore -> ReadingDirection -> StreamDetails -> MessageCount -> CancellationToken -> Async<ReadStreamPage> =
3132
fun store readingDirection streamDetails msgCount cancellationToken ->
3233
match readingDirection with
3334
| Forward ->
@@ -39,7 +40,21 @@ module Read =
3940
|> Async.AwaitTask
4041

4142
module ReadExtras =
42-
let readStreamMessages: IStreamStore -> ReadingDirection -> StreamDetails -> int -> AsyncResult<List<StreamMessage>, string> =
43+
let readAllStreamMessages: IStreamStore -> ReadingDirection -> StartPositionInclusive -> MessageCount -> AsyncResult<List<StreamMessage>, string> =
44+
fun store readingDirection startPositionInclusive msgCount ->
45+
Read.readFromAllStreamAsync store readingDirection startPositionInclusive msgCount
46+
|> Async.bind (fun readAllPage ->
47+
readAllPage.Messages
48+
|> Seq.toList
49+
|> fun messageList ->
50+
if messageList.Length = msgCount then
51+
Ok messageList
52+
else
53+
Error
54+
(sprintf "Failed to retrieve all messages. Retrieved messages count: %d" messageList.Length)
55+
|> AsyncResult.fromResult)
56+
57+
let readStreamMessages: IStreamStore -> ReadingDirection -> StreamDetails -> MessageCount -> AsyncResult<List<StreamMessage>, string> =
4358
fun store readingDirection streamDetails msgCount ->
4459
Read.readFromStreamAsync store readingDirection streamDetails msgCount
4560
|> Async.bind (fun readStreamPage ->

src/Types.fs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ type StreamDetails =
1313
{ streamName: string
1414
version: Version }
1515

16+
type StartPositionInclusive = int64
17+
type MessageCount = int
18+
1619
module Helpers =
1720
let toVersion: Version -> int =
1821
function

0 commit comments

Comments
 (0)