2626 from graphql .execution .types import (
2727 DeferredFragmentRecord ,
2828 DeferredGroupedFieldSetRecord ,
29- DeferredGroupedFieldSetResult ,
3029 IncrementalDataRecord ,
3130 IncrementalDataRecordResult ,
3231 ReconcilableDeferredGroupedFieldSetResult ,
3332 StreamItemsRecord ,
34- StreamItemsResult ,
3533 SubsequentResultRecord ,
3634 )
3735
@@ -51,20 +49,17 @@ class DeferredFragmentNode:
5149 "deferred_fragment_record" ,
5250 "deferred_grouped_field_set_records" ,
5351 "reconcilable_results" ,
54- "results" ,
5552 )
5653
5754 deferred_fragment_record : DeferredFragmentRecord
5855 deferred_grouped_field_set_records : dict [DeferredGroupedFieldSetRecord , None ]
59- results : list [DeferredGroupedFieldSetResult ]
6056 reconcilable_results : dict [ReconcilableDeferredGroupedFieldSetResult , None ]
6157 children : list [DeferredFragmentNode ]
6258
6359 def __init__ (self , deferred_fragment_record : DeferredFragmentRecord ) -> None :
6460 """Initialize the DeferredFragmentNode."""
6561 self .deferred_fragment_record = deferred_fragment_record
6662 self .deferred_grouped_field_set_records = {}
67- self .results = []
6863 self .reconcilable_results = {}
6964 self .children = []
7065
@@ -95,6 +90,7 @@ class IncrementalGraph:
9590 _pending : dict [SubsequentResultNode , None ]
9691 _deferred_fragment_nodes : dict [DeferredFragmentRecord , DeferredFragmentNode ]
9792 _new_pending : dict [SubsequentResultNode , None ]
93+ _new_incremental_data_records : dict [IncrementalDataRecord , None ]
9894 _completed_queue : list [IncrementalDataRecordResult ]
9995 _next_queue : list [Future [Iterable [IncrementalDataRecordResult ]]]
10096
@@ -105,6 +101,7 @@ def __init__(self) -> None:
105101 self ._pending = {}
106102 self ._deferred_fragment_nodes = {}
107103 self ._new_pending = {}
104+ self ._new_incremental_data_records = {}
108105 self ._completed_queue = []
109106 self ._next_queue = []
110107 self ._tasks = set ()
@@ -115,49 +112,10 @@ def add_incremental_data_records(
115112 """Add incremental data records."""
116113 for incremental_data_record in incremental_data_records :
117114 if is_deferred_grouped_field_set_record (incremental_data_record ):
118- for deferred_fragment_record in (
119- incremental_data_record .deferred_fragment_records
120- ): # pragma: no branch
121- deferred_fragment_node = self ._add_deferred_fragment_node (
122- deferred_fragment_record
123- )
124- deferred_fragment_node .deferred_grouped_field_set_records [
125- incremental_data_record
126- ] = None
127-
128- deferred_result = incremental_data_record .result
129- if is_awaitable (deferred_result ):
130-
131- async def enqueue_deferred (
132- deferred_result : Awaitable [DeferredGroupedFieldSetResult ],
133- ) -> None :
134- self ._enqueue_completed_deferred_grouped_field_set (
135- await deferred_result
136- )
137-
138- self ._add_task (enqueue_deferred (deferred_result ))
139- else :
140- self ._enqueue_completed_deferred_grouped_field_set (
141- deferred_result , # type: ignore
142- )
143- continue
144-
145- incremental_data_record = cast ("StreamItemsRecord" , incremental_data_record )
146- stream_record = incremental_data_record .stream_record
147- if stream_record .id is None :
148- self ._new_pending [stream_record ] = None
149-
150- stream_result = incremental_data_record .result
151- if is_awaitable (stream_result ):
152-
153- async def enqueue_stream (
154- stream_result : Awaitable [StreamItemsResult ],
155- ) -> None :
156- self ._enqueue (await stream_result )
157-
158- self ._add_task (enqueue_stream (stream_result ))
115+ self ._add_deferred_grouped_field_set_record (incremental_data_record )
159116 else :
160- self ._enqueue (stream_result ) # type: ignore
117+ stream_items_record = cast ("StreamItemsRecord" , incremental_data_record )
118+ self ._add_stream_items_record (stream_items_record )
161119
162120 def add_completed_reconcilable_deferred_grouped_field_set (
163121 self , reconcilable_result : ReconcilableDeferredGroupedFieldSetResult
@@ -178,6 +136,7 @@ def add_completed_reconcilable_deferred_grouped_field_set(
178136 def get_new_pending (self ) -> list [SubsequentResultRecord ]:
179137 """Get new pending subsequent result records."""
180138 _pending , _new_pending = self ._pending , self ._new_pending
139+ _new_incremental_data_records = self ._new_incremental_data_records
181140 new_pending : list [SubsequentResultRecord ] = []
182141 add_result = new_pending .append
183142 # avoid iterating over a changing dict
@@ -189,13 +148,34 @@ def get_new_pending(self) -> list[SubsequentResultRecord]:
189148 _pending [node ] = None
190149 add_result (node )
191150 elif node .deferred_grouped_field_set_records : # type: ignore
151+ records = node .deferred_grouped_field_set_records # type: ignore
152+ for deferred_grouped_field_set_node in records : # pragma: no branch
153+ _new_incremental_data_records [deferred_grouped_field_set_node ] = (
154+ None
155+ )
192156 _pending [node ] = None
193157 add_result (node .deferred_fragment_record ) # type: ignore
194158 else :
195159 for child in node .children : # type: ignore
196160 _new_pending [child ] = None
197161 add_iteration (child )
198162 _new_pending .clear ()
163+
164+ enqueue = self ._enqueue
165+ for incremental_data_record in _new_incremental_data_records :
166+ result = incremental_data_record .result
167+ if is_awaitable (result ):
168+
169+ async def enqueue_incremental (
170+ result : Awaitable [IncrementalDataRecordResult ],
171+ ) -> None :
172+ enqueue (await result )
173+
174+ self ._add_task (enqueue_incremental (result ))
175+ else :
176+ enqueue (result ) # type: ignore
177+ _new_incremental_data_records .clear ()
178+
199179 return new_pending
200180
201181 async def completed_incremental_data (
@@ -247,8 +227,6 @@ def complete_deferred_fragment(
247227 new_pending = self ._new_pending
248228 for child in deferred_fragment_node .children :
249229 new_pending [child ] = None
250- for result in child .results :
251- self ._enqueue (result )
252230 return reconcilable_results
253231
254232 def remove_deferred_fragment (
@@ -277,6 +255,29 @@ def _remove_pending(self, subsequent_result_node: SubsequentResultNode) -> None:
277255 if not self ._pending :
278256 self .stop_incremental_data ()
279257
258+ def _add_deferred_grouped_field_set_record (
259+ self , deferred_grouped_field_set_record : DeferredGroupedFieldSetRecord
260+ ) -> None :
261+ """Add a deferred grouped field set record."""
262+ pending = self ._pending
263+ new_incremental_data_records = self ._new_incremental_data_records
264+ add = self ._add_deferred_fragment_node
265+ records = deferred_grouped_field_set_record .deferred_fragment_records
266+ for deferred_fragment_record in records : # pragma: no branch
267+ deferred_fragment_node = add (deferred_fragment_record )
268+ if deferred_fragment_node in pending :
269+ new_incremental_data_records [deferred_grouped_field_set_record ] = None
270+ deferred_fragment_node .deferred_grouped_field_set_records [
271+ deferred_grouped_field_set_record
272+ ] = None
273+
274+ def _add_stream_items_record (self , stream_items_record : StreamItemsRecord ) -> None :
275+ """Add a stream items record."""
276+ stream_record = stream_items_record .stream_record
277+ if stream_record not in self ._pending :
278+ self ._new_pending [stream_record ] = None
279+ self ._new_incremental_data_records [stream_items_record ] = None
280+
280281 def _add_deferred_fragment_node (
281282 self , deferred_fragment_record : DeferredFragmentRecord
282283 ) -> DeferredFragmentNode :
@@ -298,24 +299,6 @@ def _add_deferred_fragment_node(
298299 parent_node .children .append (deferred_fragment_node )
299300 return deferred_fragment_node
300301
301- def _enqueue_completed_deferred_grouped_field_set (
302- self , result : DeferredGroupedFieldSetResult
303- ) -> None :
304- """Enqueue completed deferred grouped field set result."""
305- is_pending = False
306- nodes = self ._deferred_fragment_nodes
307- record = result .deferred_grouped_field_set_record
308- for deferred_fragment_record in record .deferred_fragment_records :
309- try :
310- deferred_fragment_node = nodes [deferred_fragment_record ]
311- except KeyError : # pragma: no cover
312- continue
313- if deferred_fragment_node in self ._pending :
314- is_pending = True
315- deferred_fragment_node .results .append (result )
316- if is_pending :
317- self ._enqueue (result )
318-
319302 def _add_task (self , awaitable : Awaitable [Any ]) -> None :
320303 """Add the given task to the tasks set for later execution."""
321304 tasks = self ._tasks
0 commit comments