Skip to content

Commit 5d990cd

Browse files
committed
Advanced API to do better batching of certain queries.
1 parent 50319d6 commit 5d990cd

File tree

3 files changed

+92
-1
lines changed

3 files changed

+92
-1
lines changed

src/Rezoom.SQL.Mapping/CommandBatch.fs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ open System.Data.Common
55
open System.Collections.Generic
66
open System.Text
77
open System.Threading
8+
open System.Threading.Tasks
89
open FSharp.Control.Tasks.ContextInsensitive
910
open Rezoom.SQL
1011

@@ -278,9 +279,12 @@ module private CommandBatchUtilities =
278279
open CommandBatchUtilities
279280

280281
type AsyncCommandBatch(conn : DbConnection, tran : DbTransaction) =
282+
let deferred = Queue<unit -> unit>()
281283
let builders = ResizeArray<CommandBatchBuilder>()
282284
let evaluation =
283285
lazy
286+
while deferred.Count > 0 do
287+
deferred.Dequeue()()
284288
task {
285289
let arr = Array.zeroCreate builders.Count
286290
for i = 0 to builders.Count - 1 do
@@ -290,7 +294,21 @@ type AsyncCommandBatch(conn : DbConnection, tran : DbTransaction) =
290294
}
291295
do
292296
builders.Add(CommandBatchBuilder(conn, tran))
293-
member __.Batch(cmd : #Command<'a>) =
297+
member this.Batch(f : unit -> #Command<'a>) : (CancellationToken -> 'a Task) =
298+
let mutable eventuallyBatched = None
299+
deferred.Enqueue(fun () ->
300+
let cmd = f()
301+
let batched = this.Batch(cmd)
302+
eventuallyBatched <- Some batched)
303+
fun (token : CancellationToken) ->
304+
task {
305+
let! _ = evaluation.Value
306+
match eventuallyBatched with
307+
| None -> return failwith "BUG: deferred batch didn't work"
308+
| Some batched ->
309+
return! batched token
310+
}
311+
member __.Batch(cmd : #Command<'a>) : (CancellationToken -> 'a Task) =
294312
let inline retrieveResult builderIndex resultsIndex =
295313
fun (_ : CancellationToken) ->
296314
task {

src/Rezoom.SQL.Mapping/Plans.fs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ open System.Data.Common
77
open Rezoom
88
open Rezoom.SQL
99
open Rezoom.SQL.Mapping
10+
open FSharp.Control.Tasks.ContextInsensitive
11+
open System.Threading
1012

1113
type private ExecutionLocalConnections(provider : ConnectionProvider) =
1214
let connections = Dictionary()
@@ -99,6 +101,73 @@ type private CommandErrand<'a>(command : Command<'a>) =
99101
let truncate = 80
100102
if all.Length < truncate then all else all.Substring(0, truncate - 3) + "..."
101103

104+
type private SharedCommandStepState<'id, 'a when 'id : equality>(factory : SharedCommandFactory<'id, 'a>, batch : AsyncCommandBatch) =
105+
let ids = ResizeArray<'id>()
106+
// defer the command-building till the last possible moment before the batch executes
107+
let bulkTask = batch.Batch(fun () -> factory.BuildCommand(ids))
108+
let lazyResults =
109+
lazy
110+
task {
111+
let! resultSet = bulkTask CancellationToken.None
112+
let dict = Dictionary()
113+
for resultRow in resultSet do
114+
let id = factory.Selector(resultRow)
115+
let succ, found = dict.TryGetValue(id)
116+
let found =
117+
if succ then found else
118+
let it = ResizeArray()
119+
dict.[id] <- it
120+
it
121+
found.Add(resultRow)
122+
return dict
123+
}
124+
member this.PrepareId(id : 'id) =
125+
ids.Add(id)
126+
fun (_ : CancellationToken) ->
127+
task {
128+
let! results = lazyResults.Value
129+
let succ, found = results.TryGetValue(id)
130+
return
131+
if succ then found :> 'a IReadOnlyList
132+
else [||] :> 'a IReadOnlyList
133+
}
134+
135+
and private SharedCommandStepStateLookup<'id, 'a when 'id : equality>() =
136+
let idsByFactory = Dictionary<obj, SharedCommandStepState<'id, 'a>>()
137+
member this.ByFactory(factory : SharedCommandFactory<'id, 'a>, batch : AsyncCommandBatch) =
138+
let succ, found = idsByFactory.TryGetValue(factory)
139+
if succ then found else
140+
let state = SharedCommandStepState<'id, 'a>(factory, batch)
141+
idsByFactory.[factory] <- state
142+
state
143+
144+
and private SharedCommandStepStateLookupFactory<'id, 'a when 'id : equality>() =
145+
inherit ServiceFactory<SharedCommandStepStateLookup<'id, 'a>>()
146+
override __.ServiceLifetime = ServiceLifetime.StepLocal
147+
override __.CreateService(_) = SharedCommandStepStateLookup<'id, 'a>()
148+
override __.DisposeService(_, _) = ()
149+
150+
and SharedCommandFactory<'id, 'a when 'id : equality>(buildCommand : 'id seq -> Command<'a IReadOnlyList>, selector : 'a -> 'id) =
151+
let templateCommand = buildCommand Seq.empty
152+
let connectionName = templateCommand.ConnectionName
153+
let cacheArgument = CommandErrandArgument(templateCommand.Parameters)
154+
member internal __.BuildCommand = buildCommand
155+
member internal __.Selector = selector
156+
member factory.ErrandForKey(id : 'id) =
157+
let cacheArg = box (id, cacheArgument)
158+
{ new AsynchronousErrand<'a IReadOnlyList>() with
159+
override __.CacheInfo = templateCommand.CacheInfo
160+
override __.CacheArgument = cacheArg
161+
override __.SequenceGroup = null
162+
override __.ToString() =
163+
templateCommand.ToString() + " (Arg = " + string (box id) + ")"
164+
override __.Prepare(cxt) =
165+
let batches = cxt.GetService<StepLocalBatchesFactory, _>()
166+
let batch = batches.GetBatch(connectionName)
167+
let subErrands = cxt.GetService<SharedCommandStepStateLookupFactory<'id, 'a>, _>().ByFactory(factory, batch)
168+
subErrands.PrepareId(id)
169+
} :> Errand<'a IReadOnlyList>
170+
102171
// Have to use a C#-style extension method to support the scalar constraint.
103172

104173
[<Extension>]

src/Rezoom.SQL.Mapping/Plans.fsi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,7 @@ type ScalarCommandExtensions = class
2727
static member Scalar : cmd : Command<#IScalar<'a>> -> Plan<'a>
2828
end
2929

30+
type SharedCommandFactory<'id, 'a when 'id : equality> = class
31+
new : buildCommand : ('id seq -> Command<'a IReadOnlyList>) * selector : ('a -> 'id) -> SharedCommandFactory<'id, 'a>
32+
member ErrandForKey : key : 'id -> Errand<'a IReadOnlyList>
33+
end

0 commit comments

Comments
 (0)