Merged
22 commits
5ba154a
Extract dataloader changes from run-queue-3; merge lazy resolution in…
rmosolgo Aug 26, 2025
e79e04b
fix file endings
rmosolgo Sep 2, 2025
4d57c4d
Handle ExecutionErrors raised from leaf coercion + dataloader
rmosolgo Sep 2, 2025
43e8eb3
Fix stale references to lazies_at_depth; correctly re-enter execution…
rmosolgo Sep 2, 2025
e9dffaa
Quick test fixes
rmosolgo Sep 2, 2025
bb3f2da
Add deprecations to Execution::Resolve; better lazy/Dataloader integr…
rmosolgo Sep 2, 2025
e901a86
Debug mutation test
rmosolgo Sep 2, 2025
e9f8dc2
Update spec for non-depth-first behavior with FlatDataloader
rmosolgo Sep 2, 2025
4fb6307
Fix context for argument errors
rmosolgo Sep 2, 2025
c2257b8
Update specs for resolution order
rmosolgo Sep 2, 2025
fc40677
Add Lazy handling to AsyncDataloader
rmosolgo Sep 2, 2025
ba2712d
Fix initialization; rebuild snapshot
rmosolgo Sep 2, 2025
3493129
Remove steps_to_rerun_after_lazy since it's not used yet
rmosolgo Sep 2, 2025
78bd98f
Clean up lazies_at_depth, DRY FlatDataloader
rmosolgo Sep 2, 2025
d34262b
Update assertions
rmosolgo Sep 2, 2025
5c43bcc
Update more specs
rmosolgo Sep 2, 2025
2b9f736
Put back missing require
rmosolgo Sep 2, 2025
889bcf6
Add Lazy resolution to NullDataloader and remove FlatDataloader
rmosolgo Sep 3, 2025
d406d39
Use a fresh NullDataloader instance for run_isolated to support froze…
rmosolgo Sep 3, 2025
7834e2b
Revert some changes to test output
rmosolgo Sep 3, 2025
f5a87f3
Merge branch 'master' into merge-dataloader-and-lazy
rmosolgo Sep 3, 2025
add47cc
Update breadth-first spec
rmosolgo Sep 3, 2025
98 changes: 75 additions & 23 deletions lib/graphql/dataloader.rb
@@ -64,6 +64,7 @@ def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.cl
@nonblocking = nonblocking
end
@fiber_limit = fiber_limit
@lazies_at_depth = Hash.new { |h, k| h[k] = [] }
end

# @return [Integer, nil]
@@ -140,10 +141,10 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
end

# @api private Nothing to see here
def append_job(&job)
def append_job(callable = nil, &job)
# Given a block, queue it up to be worked through when `#run` is called.
# (If the dataloader is already running, than a Fiber will pick this up later.)
@pending_jobs.push(job)
# (If the dataloader is already running, then a Fiber will pick this up later.)
@pending_jobs.push(callable || job)
nil
end

@@ -160,6 +161,10 @@ def clear_cache
def run_isolated
prev_queue = @pending_jobs
prev_pending_keys = {}
prev_lazies_at_depth = @lazies_at_depth
@lazies_at_depth = @lazies_at_depth.dup.clear
# Clear pending loads but keep already-cached records
# in case they are useful to the given block.
@source_cache.each do |source_class, batched_sources|
batched_sources.each do |batch_args, batched_source_instance|
if batched_source_instance.pending?
@@ -179,6 +184,7 @@ def run_isolated
res
ensure
@pending_jobs = prev_queue
@lazies_at_depth = prev_lazies_at_depth
prev_pending_keys.each do |source_instance, pending|
pending.each do |key, value|
if !source_instance.results.key?(key)
@@ -188,7 +194,8 @@ def run_isolated
end
end

def run
# @param trace_query_lazy [nil, Execution::Multiplex]
def run(trace_query_lazy: nil)
trace = Fiber[:__graphql_current_multiplex]&.current_trace
jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
job_fibers = []
@@ -201,26 +208,13 @@ def run
while first_pass || !job_fibers.empty?
first_pass = false

while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
end
end
join_queues(job_fibers, next_job_fibers)

while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
if f.alive?
finished = run_fiber(f)
if !finished
next_source_fibers << f
end
end
run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)

if !@lazies_at_depth.empty?
with_trace_query_lazy(trace_query_lazy) do
run_next_pending_lazies(job_fibers, trace)
run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
end
join_queues(source_fibers, next_source_fibers)
end
end

@@ -248,6 +242,11 @@ def run_fiber(f)
f.resume
end

# @api private
def lazy_at_depth(depth, lazy)
@lazies_at_depth[depth] << lazy
end

def spawn_fiber
fiber_vars = get_fiber_variables
Fiber.new(blocking: !@nonblocking) {
@@ -275,6 +274,59 @@ def merge_records(records, index_by: :id)

private

def run_next_pending_lazies(job_fibers, trace)
smallest_depth = nil
@lazies_at_depth.each_key do |depth_key|
smallest_depth ||= depth_key
if depth_key < smallest_depth
smallest_depth = depth_key
end
end

if smallest_depth
lazies = @lazies_at_depth.delete(smallest_depth)
if !lazies.empty?
lazies.each_with_index do |l, idx|
append_job { l.value }
end
job_fibers.unshift(spawn_job_fiber(trace))
end
end
end

def run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
end
end
join_queues(job_fibers, next_job_fibers)

while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
if f.alive?
finished = run_fiber(f)
if !finished
next_source_fibers << f
end
end
end
join_queues(source_fibers, next_source_fibers)
end
end

def with_trace_query_lazy(multiplex_or_nil, &block)
if (multiplex = multiplex_or_nil)
query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
multiplex.current_trace.execute_query_lazy(query: query, multiplex: multiplex, &block)
else
yield
end
end

def calculate_fiber_limit
total_fiber_limit = @fiber_limit || Float::INFINITY
if total_fiber_limit < 4
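
Taken together, the diff above adds a `@lazies_at_depth` registry plus a `lazy_at_depth` hook, lets `append_job` accept a plain callable, and teaches `#run` to resolve the shallowest pending lazies between passes of job and source fibers. A minimal standalone sketch of that call pattern (illustrative only, not part of this diff; the `puts` output is a placeholder):

require "graphql"

dataloader = GraphQL::Dataloader.new

# New: a pre-built callable can be queued directly, no block needed.
dataloader.append_job(-> { puts "queued job ran" })

# New: execution code hands lazy values to the dataloader along with their
# query depth; #run drains the smallest registered depth first (breadth-first).
dataloader.lazy_at_depth(1, GraphQL::Execution::Lazy.new { puts "lazy resolved" })

dataloader.run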
33 changes: 22 additions & 11 deletions lib/graphql/dataloader/async_dataloader.rb
@@ -14,7 +14,7 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
nil
end

def run
def run(trace_query_lazy: nil)
trace = Fiber[:__graphql_current_multiplex]&.current_trace
jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
job_fibers = []
@@ -29,16 +29,7 @@ def run
first_pass = false
fiber_vars = get_fiber_variables

while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
end
end
job_fibers.concat(next_job_fibers)
next_job_fibers.clear
run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)

Sync do |root_task|
set_fiber_variables(fiber_vars)
@@ -54,6 +45,13 @@ def run
next_source_tasks.clear
end
end

if !@lazies_at_depth.empty?
with_trace_query_lazy(trace_query_lazy) do
run_next_pending_lazies(job_fibers, trace)
run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
end
end
end
trace&.end_dataloader(self)
end
@@ -69,6 +67,19 @@ def run

private

def run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
end
end
job_fibers.concat(next_job_fibers)
next_job_fibers.clear
end

def spawn_source_task(parent_task, condition, trace)
pending_sources = nil
@source_cache.each_value do |source_by_batch_params|
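
With `AsyncDataloader` now draining `@lazies_at_depth` the same way (via the shared `run_next_pending_lazies` plus its own `run_pending_steps`), lazy objects can be combined with fiber-based sources. A hypothetical schema configuration sketch (assumes the `async` gem for AsyncDataloader and a promise library registered via `lazy_resolve`; `QueryType` is a placeholder):

class MySchema < GraphQL::Schema
  query QueryType

  # Fiber-based batch loading:
  use GraphQL::Dataloader::AsyncDataloader

  # Lazy objects (eg. promises) are now gathered per depth and resolved
  # inside AsyncDataloader#run as well:
  lazy_resolve(Promise, :sync)
end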
54 changes: 44 additions & 10 deletions lib/graphql/dataloader/null_dataloader.rb
@@ -2,24 +2,58 @@

module GraphQL
class Dataloader
# The default implementation of dataloading -- all no-ops.
# GraphQL-Ruby uses this when Dataloader isn't enabled.
#
# The Dataloader interface isn't public, but it enables
# simple internal code while adding the option to add Dataloader.
# It runs execution code inline and gathers lazy objects (eg. Promises)
# and resolves them during {#run}.
class NullDataloader < Dataloader
# These are all no-ops because code was
# executed synchronously.
def initialize(*)
@lazies_at_depth = Hash.new { |h,k| h[k] = [] }
end

def freeze
@lazies_at_depth.default_proc = nil
@lazies_at_depth.freeze
super
end

def run(trace_query_lazy: nil)
with_trace_query_lazy(trace_query_lazy) do
while !@lazies_at_depth.empty?
smallest_depth = nil
@lazies_at_depth.each_key do |depth_key|
smallest_depth ||= depth_key
if depth_key < smallest_depth
smallest_depth = depth_key
end
end

if smallest_depth
lazies = @lazies_at_depth.delete(smallest_depth)
lazies.each(&:value) # resolve these Lazy instances
end
end
end
end

def run_isolated
new_dl = self.class.new
res = nil
new_dl.append_job {
res = yield
}
new_dl.run
res
end

def initialize(*); end
def run; end
def run_isolated; yield; end
def clear_cache; end

def yield(_source)
raise GraphQL::Error, "GraphQL::Dataloader is not running -- add `use GraphQL::Dataloader` to your schema to use Dataloader sources."
end

def append_job
yield
def append_job(callable = nil)
callable ? callable.call : yield
nil
end
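
Because `NullDataloader#run` now drains `@lazies_at_depth` smallest depth first until the hash is empty, lazies registered for shallow fields settle before deeper ones even without a real Dataloader. A small sketch of that ordering (illustrative, not from the diff):

require "graphql"

dataloader = GraphQL::Dataloader::NullDataloader.new
order = []

# Registered out of order; depth 1 still resolves before depth 2:
dataloader.lazy_at_depth(2, GraphQL::Execution::Lazy.new { order << :child })
dataloader.lazy_at_depth(1, GraphQL::Execution::Lazy.new { order << :parent })

dataloader.run
order # => [:parent, :child]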

14 changes: 2 additions & 12 deletions lib/graphql/execution/interpreter.rb
@@ -42,7 +42,6 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
trace.execute_multiplex(multiplex: multiplex) do
schema = multiplex.schema
queries = multiplex.queries
lazies_at_depth = Hash.new { |h, k| h[k] = [] }
multiplex_analyzers = schema.multiplex_analyzers
if multiplex.max_complexity
multiplex_analyzers += [GraphQL::Analysis::MaxQueryComplexity]
@@ -73,7 +72,7 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
# Although queries in a multiplex _share_ an Interpreter instance,
# they also have another item of state, which is private to that query
# in particular, assign it here:
runtime = Runtime.new(query: query, lazies_at_depth: lazies_at_depth)
runtime = Runtime.new(query: query)
query.context.namespace(:interpreter_runtime)[:runtime] = runtime

query.current_trace.execute_query(query: query) do
@@ -88,16 +87,7 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
}
end

multiplex.dataloader.run

# Then, work through lazy results in a breadth-first way
multiplex.dataloader.append_job {
query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
multiplex.current_trace.execute_query_lazy(multiplex: multiplex, query: query) do
Interpreter::Resolve.resolve_each_depth(lazies_at_depth, multiplex.dataloader)
end
}
multiplex.dataloader.run
multiplex.dataloader.run(trace_query_lazy: multiplex)

# Then, find all errors and assign the result to the query object
results.each_with_index do |data_result, idx|
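
The interpreter no longer queues its own lazy-resolution job; passing `trace_query_lazy: multiplex` lets the dataloader emit the `execute_query_lazy` trace event itself (see `with_trace_query_lazy` above), so existing tracers keep seeing it. A hedged sketch of a custom trace module observing that event under the new flow (module name and timing output are illustrative, not from this PR):

module LazyPhaseTiming
  def execute_query_lazy(query:, multiplex:)
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    super
  ensure
    elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at) * 1000
    warn "execute_query_lazy took #{elapsed_ms.round(2)}ms"
  end
end

# Attach with the usual tracing API, eg: MySchema.trace_with(LazyPhaseTiming)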
20 changes: 7 additions & 13 deletions lib/graphql/execution/interpreter/resolve.rb
@@ -6,12 +6,17 @@ class Interpreter
module Resolve
# Continue field results in `results` until there's nothing else to continue.
# @return [void]
# @deprecated Call `dataloader.run` instead
def self.resolve_all(results, dataloader)
warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"
dataloader.append_job { resolve(results, dataloader) }
nil
end

# @deprecated Call `dataloader.run` instead
def self.resolve_each_depth(lazies_at_depth, dataloader)
warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"

smallest_depth = nil
lazies_at_depth.each_key do |depth_key|
smallest_depth ||= depth_key
@@ -34,20 +39,9 @@ def self.resolve_each_depth(lazies_at_depth, dataloader)
nil
end

# After getting `results` back from an interpreter evaluation,
# continue it until you get a response-ready Ruby value.
#
# `results` is one level of _depth_ of a query or multiplex.
#
# Resolve all lazy values in that depth before moving on
# to the next level.
#
# It's assumed that the lazies will
# return {Lazy} instances if there's more work to be done,
# or return {Hash}/{Array} if the query should be continued.
#
# @return [void]
# @deprecated Call `dataloader.run` instead
def self.resolve(results, dataloader)
warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"
# There might be pending jobs here that _will_ write lazies
# into the result hash. We should run them out, so we
# can be sure that all lazies will be present in the result hashes.
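
For code that called these helpers directly, the migration is what the new warnings say; a sketch (the `results` and `dataloader` variables stand for whatever was previously passed in):

# Before -- now emits a deprecation warning:
GraphQL::Execution::Interpreter::Resolve.resolve_all(results, dataloader)

# After -- the dataloader runs queued jobs and per-depth lazies itself:
dataloader.run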