Skip to content

Commit 97a4277

Browse files
HertebySimon Herteby
andauthored
Wrap all code that uses Tasks with async.Delay, so that it behaves properly and doesn't execute until the Async computation is run. (#5)
Co-authored-by: Simon Herteby <simon.herteby@insurello.se>
1 parent 30484ab commit 97a4277

File tree

3 files changed

+61
-52
lines changed

3 files changed

+61
-52
lines changed

src/AppendRaw.fs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,23 @@ module AppendRaw =
2626
(appendVersion: AppendVersion)
2727
(messageDetails: MessageDetails)
2828
: Async<AppendResult> =
29-
store.AppendToStream
30-
(StreamId(streamName),
31-
fromAppendVersion appendVersion,
32-
[| newStreamMessageFromMessageDetails messageDetails |])
33-
|> Async.AwaitTask
29+
async.Delay(fun () ->
30+
store.AppendToStream
31+
(StreamId(streamName),
32+
fromAppendVersion appendVersion,
33+
[| newStreamMessageFromMessageDetails messageDetails |])
34+
|> Async.AwaitTask)
3435

3536
let appendNewMessages (store: SqlStreamStore.IStreamStore)
3637
(streamName: string)
3738
(appendVersion: AppendVersion)
3839
(messages: MessageDetails list)
3940
: Async<AppendResult> =
40-
store.AppendToStream
41-
(StreamId(streamName),
42-
fromAppendVersion appendVersion,
43-
messages
44-
|> List.map newStreamMessageFromMessageDetails
45-
|> List.toArray)
46-
|> Async.AwaitTask
41+
async.Delay(fun () ->
42+
store.AppendToStream
43+
(StreamId(streamName),
44+
fromAppendVersion appendVersion,
45+
messages
46+
|> List.map newStreamMessageFromMessageDetails
47+
|> List.toArray)
48+
|> Async.AwaitTask)

src/Postgres.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ module Postgres =
6161
new SqlStreamStore.PostgresStreamStore(SqlStreamStore.PostgresStreamStoreSettings(config))
6262

6363
let createSchemaRaw (store: SqlStreamStore.PostgresStreamStore): Async<unit> =
64-
store.CreateSchemaIfNotExists() |> Async.AwaitTask
64+
async.Delay(fun () -> store.CreateSchemaIfNotExists() |> Async.AwaitTask)
6565

6666
let createSchema (store: SqlStreamStore.PostgresStreamStore): Async<Result<unit, string>> =
6767
createSchemaRaw store

src/ReadRaw.fs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,37 +21,41 @@ module ReadRaw =
2121
(startPositionInclusive: StartPosition)
2222
(msgCount: int)
2323
: Async<ReadAllPage> =
24-
match readingDirection with
25-
| ReadingDirection.Forward -> store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
26-
| ReadingDirection.Backward ->
27-
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
28-
|> Async.AwaitTask
24+
async.Delay(fun () ->
25+
match readingDirection with
26+
| ReadingDirection.Forward ->
27+
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
28+
| ReadingDirection.Backward ->
29+
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
30+
|> Async.AwaitTask)
2931

3032
let readFromStream (store: SqlStreamStore.IStreamStore)
3133
(readingDirection: ReadingDirection)
3234
(streamName: string)
3335
(readVersion: ReadVersion)
3436
(msgCount: int)
3537
: Async<ReadStreamPage> =
36-
match readingDirection with
37-
| ReadingDirection.Forward ->
38-
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
39-
| ReadingDirection.Backward ->
40-
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
41-
|> Async.AwaitTask
38+
async.Delay(fun () ->
39+
match readingDirection with
40+
| ReadingDirection.Forward ->
41+
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
42+
| ReadingDirection.Backward ->
43+
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
44+
|> Async.AwaitTask)
4245

4346
let readFromAllStream' (store: SqlStreamStore.IStreamStore)
4447
(readingDirection: ReadingDirection)
4548
(startPositionInclusive: StartPosition)
4649
(msgCount: int)
4750
(prefetchJson: bool)
4851
: Async<ReadAllPage> =
49-
match readingDirection with
50-
| ReadingDirection.Forward ->
51-
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
52-
| ReadingDirection.Backward ->
53-
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
54-
|> Async.AwaitTask
52+
async.Delay(fun () ->
53+
match readingDirection with
54+
| ReadingDirection.Forward ->
55+
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
56+
| ReadingDirection.Backward ->
57+
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson)
58+
|> Async.AwaitTask)
5559

5660
let readFromStream' (store: SqlStreamStore.IStreamStore)
5761
(readingDirection: ReadingDirection)
@@ -60,12 +64,13 @@ module ReadRaw =
6064
(msgCount: int)
6165
(prefetchJson: bool)
6266
: Async<ReadStreamPage> =
63-
match readingDirection with
64-
| ReadingDirection.Forward ->
65-
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
66-
| ReadingDirection.Backward ->
67-
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
68-
|> Async.AwaitTask
67+
async.Delay(fun () ->
68+
match readingDirection with
69+
| ReadingDirection.Forward ->
70+
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
71+
| ReadingDirection.Backward ->
72+
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson)
73+
|> Async.AwaitTask)
6974

7075
let readFromAllStream'' (store: SqlStreamStore.IStreamStore)
7176
(readingDirection: ReadingDirection)
@@ -74,14 +79,15 @@ module ReadRaw =
7479
(prefetchJson: bool)
7580
(cancellationToken: CancellationToken)
7681
: Async<ReadAllPage> =
77-
match readingDirection with
78-
| ReadingDirection.Forward ->
79-
store.ReadAllForwards
80-
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
81-
| ReadingDirection.Backward ->
82-
store.ReadAllBackwards
83-
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
84-
|> Async.AwaitTask
82+
async.Delay(fun () ->
83+
match readingDirection with
84+
| ReadingDirection.Forward ->
85+
store.ReadAllForwards
86+
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
87+
| ReadingDirection.Backward ->
88+
store.ReadAllBackwards
89+
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
90+
|> Async.AwaitTask)
8591

8692
let readFromStream'' (store: SqlStreamStore.IStreamStore)
8793
(readingDirection: ReadingDirection)
@@ -91,11 +97,12 @@ module ReadRaw =
9197
(prefetchJson: bool)
9298
(cancellationToken: CancellationToken)
9399
: Async<ReadStreamPage> =
94-
match readingDirection with
95-
| ReadingDirection.Forward ->
96-
store.ReadStreamForwards
97-
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
98-
| ReadingDirection.Backward ->
99-
store.ReadStreamBackwards
100-
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
101-
|> Async.AwaitTask
100+
async.Delay(fun () ->
101+
match readingDirection with
102+
| ReadingDirection.Forward ->
103+
store.ReadStreamForwards
104+
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
105+
| ReadingDirection.Backward ->
106+
store.ReadStreamBackwards
107+
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
108+
|> Async.AwaitTask)

0 commit comments

Comments
 (0)