@@ -64,6 +64,7 @@ def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.cl
6464 @nonblocking = nonblocking
6565 end
6666 @fiber_limit = fiber_limit
67+ @lazies_at_depth = Hash . new { |h , k | h [ k ] = [ ] }
6768 end
6869
6970 # @return [Integer, nil]
@@ -140,10 +141,10 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
140141 end
141142
# @api private Nothing to see here
def append_job(callable = nil, &job)
  # Enqueue either the explicit callable or the given block; it will be
  # worked through when `#run` is called. (If the dataloader is already
  # running, then a Fiber will pick this up later.)
  @pending_jobs << (callable || job)
  nil
end
149150
@@ -160,6 +161,10 @@ def clear_cache
160161 def run_isolated
161162 prev_queue = @pending_jobs
162163 prev_pending_keys = { }
164+ prev_lazies_at_depth = @lazies_at_depth
165+ @lazies_at_depth = @lazies_at_depth . dup . clear
166+ # Clear pending loads but keep already-cached records
167+ # in case they are useful to the given block.
163168 @source_cache . each do |source_class , batched_sources |
164169 batched_sources . each do |batch_args , batched_source_instance |
165170 if batched_source_instance . pending?
@@ -179,6 +184,7 @@ def run_isolated
179184 res
180185 ensure
181186 @pending_jobs = prev_queue
187+ @lazies_at_depth = prev_lazies_at_depth
182188 prev_pending_keys . each do |source_instance , pending |
183189 pending . each do |key , value |
184190 if !source_instance . results . key? ( key )
@@ -188,7 +194,8 @@ def run_isolated
188194 end
189195 end
190196
191- def run
197+ # @param trace_query_lazy [nil, Execution::Multiplex]
198+ def run ( trace_query_lazy : nil )
192199 trace = Fiber [ :__graphql_current_multiplex ] &.current_trace
193200 jobs_fiber_limit , total_fiber_limit = calculate_fiber_limit
194201 job_fibers = [ ]
@@ -201,26 +208,13 @@ def run
201208 while first_pass || !job_fibers . empty?
202209 first_pass = false
203210
204- while ( f = ( job_fibers . shift || ( ( ( next_job_fibers . size + job_fibers . size ) < jobs_fiber_limit ) && spawn_job_fiber ( trace ) ) ) )
205- if f . alive?
206- finished = run_fiber ( f )
207- if !finished
208- next_job_fibers << f
209- end
210- end
211- end
212- join_queues ( job_fibers , next_job_fibers )
213-
214- while ( !source_fibers . empty? || @source_cache . each_value . any? { |group_sources | group_sources . each_value . any? ( &:pending? ) } )
215- while ( f = source_fibers . shift || ( ( ( job_fibers . size + source_fibers . size + next_source_fibers . size + next_job_fibers . size ) < total_fiber_limit ) && spawn_source_fiber ( trace ) ) )
216- if f . alive?
217- finished = run_fiber ( f )
218- if !finished
219- next_source_fibers << f
220- end
221- end
211+ run_pending_steps ( trace , job_fibers , next_job_fibers , jobs_fiber_limit , source_fibers , next_source_fibers , total_fiber_limit )
212+
213+ if !@lazies_at_depth . empty?
214+ with_trace_query_lazy ( trace_query_lazy ) do
215+ run_next_pending_lazies ( job_fibers , trace )
216+ run_pending_steps ( trace , job_fibers , next_job_fibers , jobs_fiber_limit , source_fibers , next_source_fibers , total_fiber_limit )
222217 end
223- join_queues ( source_fibers , next_source_fibers )
224218 end
225219 end
226220
@@ -248,6 +242,11 @@ def run_fiber(f)
248242 f . resume
249243 end
250244
# @api private
# Register `lazy` to be resolved later, grouped under its query `depth`.
def lazy_at_depth(depth, lazy)
  @lazies_at_depth[depth].push(lazy)
end
249+
251250 def spawn_fiber
252251 fiber_vars = get_fiber_variables
253252 Fiber . new ( blocking : !@nonblocking ) {
@@ -275,6 +274,59 @@ def merge_records(records, index_by: :id)
275274
276275 private
277276
# Resolve the batch of lazies registered at the shallowest depth:
# each lazy becomes a pending job (calling `.value` on it), and a fresh
# job fiber is pushed to the FRONT of `job_fibers` so those jobs are
# picked up before other queued work.
#
# @param job_fibers [Array<Fiber>] the run-loop's queue of job fibers (mutated)
# @param trace the current trace object, forwarded to the spawned fiber
# @return [void]
def run_next_pending_lazies(job_fibers, trace)
  # Smallest key = shallowest depth; resolve that group first.
  # (Replaces a hand-rolled min-finding loop.)
  smallest_depth = @lazies_at_depth.each_key.min

  if smallest_depth
    lazies = @lazies_at_depth.delete(smallest_depth)
    if !lazies.empty?
      # `each` instead of `each_with_index` — the index was unused.
      lazies.each do |l|
        append_job { l.value }
      end
      job_fibers.unshift(spawn_job_fiber(trace))
    end
  end
end
296+
# Drain the job-fiber queue, then run source fibers until no source has
# pending keys. Extracted from `#run` so it can be invoked again after
# lazies are resolved.
#
# @param trace the current trace object, passed to newly spawned fibers
# @param job_fibers [Array<Fiber>] queue of runnable job fibers (mutated)
# @param next_job_fibers [Array<Fiber>] holding area for job fibers that paused (mutated)
# @param jobs_fiber_limit [Integer, Float] cap on concurrent job fibers
# @param source_fibers [Array<Fiber>] queue of runnable source fibers (mutated)
# @param next_source_fibers [Array<Fiber>] holding area for source fibers that paused (mutated)
# @param total_fiber_limit [Integer, Float] cap on all fibers combined
# @return [void]
def run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
  # Take the next queued job fiber, or spawn a new one while under the limit.
  # The loop ends when the queue is empty and spawning returns false/nil.
  while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
    if f.alive?
      finished = run_fiber(f)
      if !finished
        # Fiber yielded (waiting on a source); revisit it on the next pass.
        next_job_fibers << f
      end
    end
  end
  join_queues(job_fibers, next_job_fibers)

  # Keep running source fibers while any batched source still has pending keys.
  while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
    # Source fibers count against the TOTAL limit (jobs + sources, current and paused).
    while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
      if f.alive?
        finished = run_fiber(f)
        if !finished
          next_source_fibers << f
        end
      end
    end
    join_queues(source_fibers, next_source_fibers)
  end
end
320+
# Run `block` inside an `execute_query_lazy` trace event when a multiplex
# is given; otherwise just run the block directly.
#
# @param multiplex_or_nil [Execution::Multiplex, nil]
# @return the block's result (possibly wrapped by the tracer)
def with_trace_query_lazy(multiplex_or_nil, &block)
  return block.call unless multiplex_or_nil

  multiplex = multiplex_or_nil
  queries = multiplex.queries
  # The trace event receives the single query when there is exactly one,
  # otherwise `nil` (multiplex-wide lazy resolution).
  single_query = queries.length == 1 ? queries[0] : nil
  multiplex.current_trace.execute_query_lazy(query: single_query, multiplex: multiplex, &block)
end
329+
278330 def calculate_fiber_limit
279331 total_fiber_limit = @fiber_limit || Float ::INFINITY
280332 if total_fiber_limit < 4
0 commit comments