Skip to content

Commit af67d4d

Browse files
committed
incremental: disable early execution by default
Replicates graphql/graphql-js@75dca3d
1 parent 379363f commit af67d4d

File tree

7 files changed

+534
-139
lines changed

7 files changed

+534
-139
lines changed

src/graphql/execution/execute.py

Lines changed: 121 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ class ExecutionContext(IncrementalPublisherContext):
208208
field_resolver: GraphQLFieldResolver
209209
type_resolver: GraphQLTypeResolver
210210
subscribe_field_resolver: GraphQLFieldResolver
211+
enable_early_execution: bool
211212
errors: list[GraphQLError] | None
212213
cancellable_streams: set[CancellableStreamRecord] | None
213214
middleware_manager: MiddlewareManager | None
@@ -227,7 +228,8 @@ def __init__(
227228
field_resolver: GraphQLFieldResolver,
228229
type_resolver: GraphQLTypeResolver,
229230
subscribe_field_resolver: GraphQLFieldResolver,
230-
middleware_manager: MiddlewareManager | None,
231+
enable_early_execution: bool = False,
232+
middleware_manager: MiddlewareManager | None = None,
231233
is_awaitable: Callable[[Any], TypeGuard[Awaitable]] | None = None,
232234
) -> None:
233235
self.schema = schema
@@ -239,6 +241,7 @@ def __init__(
239241
self.field_resolver = field_resolver
240242
self.type_resolver = type_resolver
241243
self.subscribe_field_resolver = subscribe_field_resolver
244+
self.enable_early_execution = enable_early_execution
242245
self.middleware_manager = middleware_manager
243246
self.is_awaitable = is_awaitable or default_is_awaitable
244247
self.errors = None
@@ -260,6 +263,7 @@ def build(
260263
field_resolver: GraphQLFieldResolver | None = None,
261264
type_resolver: GraphQLTypeResolver | None = None,
262265
subscribe_field_resolver: GraphQLFieldResolver | None = None,
266+
enable_early_execution: bool = False,
263267
middleware: Middleware | None = None,
264268
is_awaitable: Callable[[Any], TypeGuard[Awaitable]] | None = None,
265269
**custom_args: Any,
@@ -333,6 +337,7 @@ def build(
333337
field_resolver or default_field_resolver,
334338
type_resolver or default_type_resolver,
335339
subscribe_field_resolver or default_field_resolver,
340+
enable_early_execution,
336341
middleware_manager,
337342
is_awaitable,
338343
**custom_args,
@@ -1623,60 +1628,67 @@ def execute_deferred_grouped_field_sets(
16231628
defer_map: RefMap[DeferUsage, DeferredFragmentRecord],
16241629
) -> list[DeferredGroupedFieldSetRecord]:
16251630
"""Execute deferred grouped field sets."""
1631+
is_awaitable = self.is_awaitable
16261632
new_deferred_grouped_field_set_records: list[DeferredGroupedFieldSetRecord] = []
16271633
append_record = new_deferred_grouped_field_set_records.append
16281634
for defer_usage_set, grouped_field_set in new_grouped_field_sets.items():
16291635
deferred_fragment_records = get_deferred_fragment_records(
16301636
defer_usage_set, defer_map
16311637
)
16321638

1633-
deferred_grouped_field_set_record = DeferredGroupedFieldSetRecord(
1639+
deferred_record = DeferredGroupedFieldSetRecord(
16341640
deferred_fragment_records,
16351641
cast("BoxedAwaitableOrValue[DeferredGroupedFieldSetResult]", None),
16361642
)
16371643

1638-
if should_defer(parent_defer_usages, defer_usage_set):
1639-
1640-
async def executor(
1641-
deferred_grouped_field_set_record: DeferredGroupedFieldSetRecord,
1642-
grouped_field_set: GroupedFieldSet,
1643-
defer_usage_set: DeferUsageSet,
1644-
) -> DeferredGroupedFieldSetResult:
1645-
result = self.execute_deferred_grouped_field_set(
1646-
deferred_grouped_field_set_record,
1647-
parent_type,
1648-
source_value,
1649-
path,
1650-
grouped_field_set,
1651-
IncrementalContext(defer_usage_set),
1652-
defer_map,
1653-
)
1654-
if self.is_awaitable(result):
1655-
return await result
1656-
return cast("DeferredGroupedFieldSetResult", result)
1657-
1658-
deferred_grouped_field_set_record.result = BoxedAwaitableOrValue(
1659-
executor(
1660-
deferred_grouped_field_set_record,
1661-
grouped_field_set,
1662-
defer_usage_set,
1663-
)
1664-
)
1665-
else:
1666-
executed = self.execute_deferred_grouped_field_set(
1667-
deferred_grouped_field_set_record,
1644+
def executor(
1645+
deferred_record: DeferredGroupedFieldSetRecord = deferred_record,
1646+
grouped_field_set: GroupedFieldSet = grouped_field_set,
1647+
defer_usage_set: DeferUsageSet = defer_usage_set,
1648+
) -> AwaitableOrValue[DeferredGroupedFieldSetResult]:
1649+
return self.execute_deferred_grouped_field_set(
1650+
deferred_record,
16681651
parent_type,
16691652
source_value,
16701653
path,
16711654
grouped_field_set,
16721655
IncrementalContext(defer_usage_set),
16731656
defer_map,
16741657
)
1675-
deferred_grouped_field_set_record.result = BoxedAwaitableOrValue(
1676-
executed
1677-
)
16781658

1679-
append_record(deferred_grouped_field_set_record)
1659+
should_defer_this_defer_usage_set = should_defer(
1660+
parent_defer_usages, defer_usage_set
1661+
)
1662+
1663+
if should_defer_this_defer_usage_set:
1664+
if self.enable_early_execution:
1665+
1666+
async def execute_async(
1667+
executor: Callable[
1668+
[], AwaitableOrValue[DeferredGroupedFieldSetResult]
1669+
] = executor,
1670+
) -> DeferredGroupedFieldSetResult:
1671+
result = executor()
1672+
if is_awaitable(result):
1673+
return await result
1674+
return result # type: ignore
1675+
1676+
deferred_record.result = BoxedAwaitableOrValue(execute_async())
1677+
else:
1678+
1679+
def execute_sync(
1680+
executor: Callable[
1681+
[], AwaitableOrValue[DeferredGroupedFieldSetResult]
1682+
] = executor,
1683+
) -> BoxedAwaitableOrValue[DeferredGroupedFieldSetResult]:
1684+
return BoxedAwaitableOrValue(executor())
1685+
1686+
deferred_record.result = execute_sync
1687+
1688+
else:
1689+
deferred_record.result = BoxedAwaitableOrValue(executor())
1690+
1691+
append_record(deferred_record)
16801692

16811693
return new_deferred_grouped_field_set_records
16821694

@@ -1746,9 +1758,13 @@ def build_sync_stream_item_queue(
17461758
) -> list[StreamItemRecord]:
17471759
"""Build sync stream item queue."""
17481760
is_awaitable = self.is_awaitable
1761+
enable_early_execution = self.enable_early_execution
17491762
complete_stream_item = self.complete_stream_item
17501763

1751-
async def get_stream_item_result() -> StreamItemResult:
1764+
stream_item_queue: list[StreamItemRecord] = []
1765+
append_stream_item = stream_item_queue.append
1766+
1767+
def first_executor() -> StreamItemResult:
17521768
initial_path = stream_path.add_key(initial_index)
17531769

17541770
first_stream_item: BoxedAwaitableOrValue[StreamItemResult] = (
@@ -1763,24 +1779,39 @@ async def get_stream_item_result() -> StreamItemResult:
17631779
)
17641780
)
17651781
)
1782+
17661783
current_index = initial_index + 1
1767-
current_stream_item = first_stream_item
1784+
current_stream_item: (
1785+
BoxedAwaitableOrValue[StreamItemResult]
1786+
| Callable[[], BoxedAwaitableOrValue[StreamItemResult]]
1787+
) = first_stream_item
17681788
for item in iterator:
1769-
result = current_stream_item.value
1770-
if not is_awaitable(result) and result.errors:
1771-
break
1789+
if isinstance(current_stream_item, BoxedAwaitableOrValue):
1790+
result = current_stream_item.value
1791+
if not is_awaitable(result) and result.errors:
1792+
break
17721793

17731794
item_path = stream_path.add_key(current_index)
1774-
current_stream_item = BoxedAwaitableOrValue(
1775-
complete_stream_item(
1795+
1796+
def current_executor(
1797+
item: Any = item, item_path: Path = item_path
1798+
) -> AwaitableOrValue[StreamItemResult]:
1799+
return complete_stream_item(
17761800
item_path,
17771801
item,
17781802
IncrementalContext(),
17791803
field_group,
17801804
info,
17811805
item_type,
17821806
)
1807+
1808+
current_stream_item = (
1809+
BoxedAwaitableOrValue(current_executor())
1810+
if enable_early_execution
1811+
else lambda executor=current_executor: # type: ignore
1812+
BoxedAwaitableOrValue(executor())
17831813
)
1814+
17841815
append_stream_item(current_stream_item)
17851816

17861817
current_index = initial_index + 1
@@ -1789,10 +1820,14 @@ async def get_stream_item_result() -> StreamItemResult:
17891820

17901821
return first_stream_item.value
17911822

1792-
stream_item_queue: list[StreamItemRecord] = [
1793-
BoxedAwaitableOrValue(get_stream_item_result())
1794-
]
1795-
append_stream_item = stream_item_queue.append
1823+
if enable_early_execution:
1824+
1825+
async def await_first_stream_item() -> StreamItemResult:
1826+
return first_executor()
1827+
1828+
append_stream_item(BoxedAwaitableOrValue(await_first_stream_item()))
1829+
else:
1830+
append_stream_item(lambda: BoxedAwaitableOrValue(first_executor()))
17961831

17971832
return stream_item_queue
17981833

@@ -1806,20 +1841,25 @@ def build_async_stream_item_queue(
18061841
item_type: GraphQLOutputType,
18071842
) -> list[StreamItemRecord]:
18081843
"""Build async stream item queue."""
1844+
1845+
def executor() -> AwaitableOrValue[StreamItemResult]:
1846+
return self.get_next_async_stream_item_result(
1847+
stream_item_queue,
1848+
stream_path,
1849+
initial_index,
1850+
async_iterator,
1851+
field_group,
1852+
info,
1853+
item_type,
1854+
)
1855+
18091856
stream_item_queue: list[StreamItemRecord] = []
18101857
stream_item_queue.append(
1811-
BoxedAwaitableOrValue(
1812-
self.get_next_async_stream_item_result(
1813-
stream_item_queue,
1814-
stream_path,
1815-
initial_index,
1816-
async_iterator,
1817-
field_group,
1818-
info,
1819-
item_type,
1820-
)
1821-
)
1858+
BoxedAwaitableOrValue(executor())
1859+
if self.enable_early_execution
1860+
else lambda: BoxedAwaitableOrValue(executor())
18221861
)
1862+
18231863
return stream_item_queue
18241864

18251865
async def get_next_async_stream_item_result(
@@ -1855,18 +1895,21 @@ async def get_next_async_stream_item_result(
18551895
item_type,
18561896
)
18571897

1898+
def executor() -> AwaitableOrValue[StreamItemResult]:
1899+
return self.get_next_async_stream_item_result(
1900+
stream_item_queue,
1901+
stream_path,
1902+
index,
1903+
async_iterator,
1904+
field_group,
1905+
info,
1906+
item_type,
1907+
)
1908+
18581909
stream_item_queue.append(
1859-
BoxedAwaitableOrValue(
1860-
self.get_next_async_stream_item_result(
1861-
stream_item_queue,
1862-
stream_path,
1863-
index,
1864-
async_iterator,
1865-
field_group,
1866-
info,
1867-
item_type,
1868-
)
1869-
),
1910+
BoxedAwaitableOrValue(executor())
1911+
if self.enable_early_execution
1912+
else lambda: BoxedAwaitableOrValue(executor())
18701913
)
18711914

18721915
if self.is_awaitable(result):
@@ -2075,6 +2118,7 @@ def execute(
20752118
field_resolver: GraphQLFieldResolver | None = None,
20762119
type_resolver: GraphQLTypeResolver | None = None,
20772120
subscribe_field_resolver: GraphQLFieldResolver | None = None,
2121+
enable_early_execution: bool = False,
20782122
middleware: Middleware | None = None,
20792123
execution_context_class: type[ExecutionContext] | None = None,
20802124
is_awaitable: Callable[[Any], TypeGuard[Awaitable]] | None = None,
@@ -2108,6 +2152,7 @@ def execute(
21082152
field_resolver,
21092153
type_resolver,
21102154
subscribe_field_resolver,
2155+
enable_early_execution,
21112156
middleware,
21122157
execution_context_class,
21132158
is_awaitable,
@@ -2137,6 +2182,7 @@ def experimental_execute_incrementally(
21372182
field_resolver: GraphQLFieldResolver | None = None,
21382183
type_resolver: GraphQLTypeResolver | None = None,
21392184
subscribe_field_resolver: GraphQLFieldResolver | None = None,
2185+
enable_early_execution: bool = False,
21402186
middleware: Middleware | None = None,
21412187
execution_context_class: type[ExecutionContext] | None = None,
21422188
is_awaitable: Callable[[Any], TypeGuard[Awaitable]] | None = None,
@@ -2167,6 +2213,7 @@ def experimental_execute_incrementally(
21672213
field_resolver,
21682214
type_resolver,
21692215
subscribe_field_resolver,
2216+
enable_early_execution,
21702217
middleware,
21712218
is_awaitable,
21722219
**custom_context_args,
@@ -2222,6 +2269,7 @@ def execute_sync(
22222269
field_resolver,
22232270
type_resolver,
22242271
None,
2272+
False,
22252273
middleware,
22262274
execution_context_class,
22272275
is_awaitable,
@@ -2444,6 +2492,7 @@ def subscribe(
24442492
field_resolver: GraphQLFieldResolver | None = None,
24452493
type_resolver: GraphQLTypeResolver | None = None,
24462494
subscribe_field_resolver: GraphQLFieldResolver | None = None,
2495+
enable_early_execution: bool = False,
24472496
execution_context_class: type[ExecutionContext] | None = None,
24482497
middleware: MiddlewareManager | None = None,
24492498
**custom_context_args: Any,
@@ -2486,6 +2535,7 @@ def subscribe(
24862535
field_resolver,
24872536
type_resolver,
24882537
subscribe_field_resolver,
2538+
enable_early_execution,
24892539
middleware=middleware,
24902540
**custom_context_args,
24912541
)
@@ -2522,6 +2572,7 @@ def create_source_event_stream(
25222572
field_resolver: GraphQLFieldResolver | None = None,
25232573
type_resolver: GraphQLTypeResolver | None = None,
25242574
subscribe_field_resolver: GraphQLFieldResolver | None = None,
2575+
enable_early_execution: bool = False,
25252576
execution_context_class: type[ExecutionContext] | None = None,
25262577
**custom_context_args: Any,
25272578
) -> AwaitableOrValue[AsyncIterable[Any] | ExecutionResult]:
@@ -2560,6 +2611,7 @@ def create_source_event_stream(
25602611
field_resolver,
25612612
type_resolver,
25622613
subscribe_field_resolver,
2614+
enable_early_execution,
25632615
**custom_context_args,
25642616
)
25652617

0 commit comments

Comments
 (0)