@@ -12,6 +12,7 @@ import { addPath, pathToArray } from '../jsutils/Path.js';
1212import { promiseForObject } from '../jsutils/promiseForObject.js' ;
1313import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js' ;
1414import { promiseReduce } from '../jsutils/promiseReduce.js' ;
15+ import { Publisher } from '../jsutils/Publisher.js' ;
1516
1617import type { GraphQLFormattedError } from '../error/GraphQLError.js' ;
1718import { GraphQLError } from '../error/GraphQLError.js' ;
@@ -121,7 +122,10 @@ export interface ExecutionContext {
121122 typeResolver : GraphQLTypeResolver < any , any > ;
122123 subscribeFieldResolver : GraphQLFieldResolver < any , any > ;
123124 errors : Array < GraphQLError > ;
124- subsequentPayloads : Set < IncrementalDataRecord > ;
125+ publisher : Publisher <
126+ IncrementalDataRecord ,
127+ SubsequentIncrementalExecutionResult
128+ > ;
125129}
126130
127131/**
@@ -351,43 +355,44 @@ function executeImpl(
351355 // Errors from sub-fields of a NonNull type may propagate to the top level,
352356 // at which point we still log the error and null the parent field, which
353357 // in this case is the entire response.
358+ const { publisher, errors } = exeContext ;
354359 try {
355360 const result = executeOperation ( exeContext ) ;
356361 if ( isPromise ( result ) ) {
357362 return result . then (
358363 ( data ) => {
359- const initialResult = buildResponse ( data , exeContext . errors ) ;
360- if ( exeContext . subsequentPayloads . size > 0 ) {
364+ const initialResult = buildResponse ( data , errors ) ;
365+ if ( publisher . hasNext ( ) ) {
361366 return {
362367 initialResult : {
363368 ...initialResult ,
364369 hasNext : true ,
365370 } ,
366- subsequentResults : yieldSubsequentPayloads ( exeContext ) ,
371+ subsequentResults : publisher . subscribe ( ) ,
367372 } ;
368373 }
369374 return initialResult ;
370375 } ,
371376 ( error ) => {
372- exeContext . errors . push ( error ) ;
373- return buildResponse ( null , exeContext . errors ) ;
377+ errors . push ( error ) ;
378+ return buildResponse ( null , errors ) ;
374379 } ,
375380 ) ;
376381 }
377- const initialResult = buildResponse ( result , exeContext . errors ) ;
378- if ( exeContext . subsequentPayloads . size > 0 ) {
382+ const initialResult = buildResponse ( result , errors ) ;
383+ if ( publisher . hasNext ( ) ) {
379384 return {
380385 initialResult : {
381386 ...initialResult ,
382387 hasNext : true ,
383388 } ,
384- subsequentResults : yieldSubsequentPayloads ( exeContext ) ,
389+ subsequentResults : publisher . subscribe ( ) ,
385390 } ;
386391 }
387392 return initialResult ;
388393 } catch ( error ) {
389- exeContext . errors . push ( error ) ;
390- return buildResponse ( null , exeContext . errors ) ;
394+ errors . push ( error ) ;
395+ return buildResponse ( null , errors ) ;
391396 }
392397}
393398
@@ -503,7 +508,7 @@ export function buildExecutionContext(
503508 fieldResolver : fieldResolver ?? defaultFieldResolver ,
504509 typeResolver : typeResolver ?? defaultTypeResolver ,
505510 subscribeFieldResolver : subscribeFieldResolver ?? defaultFieldResolver ,
506- subsequentPayloads : new Set ( ) ,
511+ publisher : new Publisher ( getIncrementalResult , returnStreamIterators ) ,
507512 errors : [ ] ,
508513 } ;
509514}
@@ -515,7 +520,7 @@ function buildPerEventExecutionContext(
515520 return {
516521 ...exeContext ,
517522 rootValue : payload ,
518- subsequentPayloads : new Set ( ) ,
523+ // no need to update publisher, incremental delivery is not supported for subscriptions
519524 errors : [ ] ,
520525 } ;
521526}
@@ -2098,7 +2103,8 @@ function filterSubsequentPayloads(
20982103 currentIncrementalDataRecord : IncrementalDataRecord | undefined ,
20992104) : void {
21002105 const nullPathArray = pathToArray ( nullPath ) ;
2101- exeContext . subsequentPayloads . forEach ( ( incrementalDataRecord ) => {
2106+ const publisher = exeContext . publisher ;
2107+ publisher . getPending ( ) . forEach ( ( incrementalDataRecord ) => {
21022108 if ( incrementalDataRecord === currentIncrementalDataRecord ) {
21032109 // don't remove payload from where error originates
21042110 return ;
@@ -2118,24 +2124,30 @@ function filterSubsequentPayloads(
21182124 // ignore error
21192125 } ) ;
21202126 }
2121- exeContext . subsequentPayloads . delete ( incrementalDataRecord ) ;
2127+ publisher . delete ( incrementalDataRecord ) ;
21222128 } ) ;
21232129}
21242130
2125- function getCompletedIncrementalResults (
2126- exeContext : ExecutionContext ,
2127- ) : Array < IncrementalResult > {
2131+ function getIncrementalResult (
2132+ pending : ReadonlySet < IncrementalDataRecord > ,
2133+ publisher : Publisher <
2134+ IncrementalDataRecord ,
2135+ SubsequentIncrementalExecutionResult
2136+ > ,
2137+ ) : SubsequentIncrementalExecutionResult | undefined {
21282138 const incrementalResults : Array < IncrementalResult > = [ ] ;
2129- for ( const incrementalDataRecord of exeContext . subsequentPayloads ) {
2139+ let encounteredCompletedAsyncIterator = false ;
2140+ for ( const incrementalDataRecord of pending ) {
21302141 const incrementalResult : IncrementalResult = { } ;
21312142 if ( ! incrementalDataRecord . isCompleted ) {
21322143 continue ;
21332144 }
2134- exeContext . subsequentPayloads . delete ( incrementalDataRecord ) ;
2145+ publisher . delete ( incrementalDataRecord ) ;
21352146 if ( isStreamItemsRecord ( incrementalDataRecord ) ) {
21362147 const items = incrementalDataRecord . items ;
21372148 if ( incrementalDataRecord . isCompletedAsyncIterator ) {
21382149 // async iterable resolver just finished but there may be pending payloads
2150+ encounteredCompletedAsyncIterator = true ;
21392151 continue ;
21402152 }
21412153 ( incrementalResult as IncrementalStreamResult ) . items = items ;
@@ -2153,80 +2165,27 @@ function getCompletedIncrementalResults(
21532165 }
21542166 incrementalResults . push ( incrementalResult ) ;
21552167 }
2156- return incrementalResults ;
2157- }
2158-
2159- function yieldSubsequentPayloads (
2160- exeContext : ExecutionContext ,
2161- ) : AsyncGenerator < SubsequentIncrementalExecutionResult , void , void > {
2162- let isDone = false ;
2163-
2164- async function next ( ) : Promise <
2165- IteratorResult < SubsequentIncrementalExecutionResult , void >
2166- > {
2167- if ( isDone ) {
2168- return { value : undefined , done : true } ;
2169- }
2170-
2171- await Promise . race (
2172- Array . from ( exeContext . subsequentPayloads ) . map ( ( p ) => p . promise ) ,
2173- ) ;
21742168
2175- if ( isDone ) {
2176- // a different call to next has exhausted all payloads
2177- return { value : undefined , done : true } ;
2178- }
2179-
2180- const incremental = getCompletedIncrementalResults ( exeContext ) ;
2181- const hasNext = exeContext . subsequentPayloads . size > 0 ;
2182-
2183- if ( ! incremental . length && hasNext ) {
2184- return next ( ) ;
2185- }
2169+ return incrementalResults . length
2170+ ? { incremental : incrementalResults , hasNext : publisher . hasNext ( ) }
2171+ : encounteredCompletedAsyncIterator && ! publisher . hasNext ( )
2172+ ? { hasNext : false }
2173+ : undefined ;
2174+ }
21862175
2187- if ( ! hasNext ) {
2188- isDone = true ;
2176+ async function returnStreamIterators (
2177+ pendingRecords : ReadonlySet < IncrementalDataRecord > ,
2178+ ) : Promise < void > {
2179+ const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
2180+ pendingRecords . forEach ( ( incrementalDataRecord ) => {
2181+ if (
2182+ isStreamItemsRecord ( incrementalDataRecord ) &&
2183+ incrementalDataRecord . asyncIterator ?. return
2184+ ) {
2185+ promises . push ( incrementalDataRecord . asyncIterator . return ( ) ) ;
21892186 }
2190-
2191- return {
2192- value : incremental . length ? { incremental, hasNext } : { hasNext } ,
2193- done : false ,
2194- } ;
2195- }
2196-
2197- function returnStreamIterators ( ) {
2198- const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
2199- exeContext . subsequentPayloads . forEach ( ( incrementalDataRecord ) => {
2200- if (
2201- isStreamItemsRecord ( incrementalDataRecord ) &&
2202- incrementalDataRecord . asyncIterator ?. return
2203- ) {
2204- promises . push ( incrementalDataRecord . asyncIterator . return ( ) ) ;
2205- }
2206- } ) ;
2207- return Promise . all ( promises ) ;
2208- }
2209-
2210- return {
2211- [ Symbol . asyncIterator ] ( ) {
2212- return this ;
2213- } ,
2214- next,
2215- async return ( ) : Promise <
2216- IteratorResult < SubsequentIncrementalExecutionResult , void >
2217- > {
2218- await returnStreamIterators ( ) ;
2219- isDone = true ;
2220- return { value : undefined , done : true } ;
2221- } ,
2222- async throw (
2223- error ?: unknown ,
2224- ) : Promise < IteratorResult < SubsequentIncrementalExecutionResult , void > > {
2225- await returnStreamIterators ( ) ;
2226- isDone = true ;
2227- return Promise . reject ( error ) ;
2228- } ,
2229- } ;
2187+ } ) ;
2188+ await Promise . all ( promises ) ;
22302189}
22312190
22322191class DeferredFragmentRecord {
@@ -2252,7 +2211,7 @@ class DeferredFragmentRecord {
22522211 this . parentContext = opts . parentContext ;
22532212 this . errors = [ ] ;
22542213 this . _exeContext = opts . exeContext ;
2255- this . _exeContext . subsequentPayloads . add ( this ) ;
2214+ this . _exeContext . publisher . add ( this ) ;
22562215 this . isCompleted = false ;
22572216 this . data = null ;
22582217 this . promise = new Promise < ObjMap < unknown > | null > ( ( resolve ) => {
@@ -2303,7 +2262,7 @@ class StreamItemsRecord {
23032262 this . asyncIterator = opts . asyncIterator ;
23042263 this . errors = [ ] ;
23052264 this . _exeContext = opts . exeContext ;
2306- this . _exeContext . subsequentPayloads . add ( this ) ;
2265+ this . _exeContext . publisher . add ( this ) ;
23072266 this . isCompleted = false ;
23082267 this . items = null ;
23092268 this . promise = new Promise < Array < unknown > | null > ( ( resolve ) => {
0 commit comments