@@ -109,6 +109,12 @@ export interface FormattedCompletedResult {
109109 errors ?: ReadonlyArray < GraphQLError > ;
110110}
111111
112+ interface IncrementalAggregate {
113+ newPendingSources : Set < DeferredFragmentRecord | StreamRecord > ;
114+ incrementalResults : Array < IncrementalResult > ;
115+ completedResults : Array < CompletedResult > ;
116+ }
117+
112118/**
113119 * This class is used to publish incremental results to the client, enabling semi-concurrent
114120 * execution while preserving result order.
@@ -179,20 +185,28 @@ export class IncrementalPublisher {
179185 return { value : undefined , done : true } ;
180186 }
181187
182- for ( const item of this . _released ) {
183- this . _pending . delete ( item ) ;
184- }
185- const released = this . _released ;
186- this . _released = new Set ( ) ;
188+ if ( this . _released . size > 0 ) {
189+ let aggregate = this . _incrementalInitializer ( ) ;
190+ do {
191+ for ( const item of this . _released ) {
192+ this . _pending . delete ( item ) ;
193+ }
194+ const released = this . _released ;
195+ this . _released = new Set ( ) ;
187196
188- const result = this . _getIncrementalResult ( released ) ;
197+ aggregate = this . _incrementalReducer ( aggregate , released ) ;
198+ } while ( this . _released . size > 0 ) ;
189199
190- if ( ! this . hasNext ( ) ) {
191- isDone = true ;
192- }
200+ const hasNext = this . hasNext ( ) ;
201+
202+ if ( ! hasNext ) {
203+ isDone = true ;
204+ }
193205
194- if ( result !== undefined ) {
195- return { value : result , done : false } ;
206+ return {
207+ value : this . _incrementalFinalizer ( aggregate ) ,
208+ done : false ,
209+ } ;
196210 }
197211
198212 // eslint-disable-next-line no-await-in-loop
@@ -542,37 +556,20 @@ export class IncrementalPublisher {
542556 this . _trigger ( ) ;
543557 }
544558
545- private _getIncrementalResult (
546- completedRecords : ReadonlySet < SubsequentResultRecord > ,
547- ) : SubsequentIncrementalExecutionResult | undefined {
548- const { pending, incremental, completed } =
549- this . _processPending ( completedRecords ) ;
550-
551- const hasNext = this . hasNext ( ) ;
552- if ( incremental . length === 0 && completed . length === 0 && hasNext ) {
553- return undefined ;
554- }
555-
556- const result : SubsequentIncrementalExecutionResult = { hasNext } ;
557- if ( pending . length ) {
558- result . pending = pending ;
559- }
560- if ( incremental . length ) {
561- result . incremental = incremental ;
562- }
563- if ( completed . length ) {
564- result . completed = completed ;
565- }
566-
567- return result ;
559+ private _incrementalInitializer ( ) : IncrementalAggregate {
560+ return {
561+ newPendingSources : new Set < DeferredFragmentRecord | StreamRecord > ( ) ,
562+ incrementalResults : [ ] ,
563+ completedResults : [ ] ,
564+ } ;
568565 }
569566
570- private _processPending (
567+ private _incrementalReducer (
568+ aggregate : IncrementalAggregate ,
571569 completedRecords : ReadonlySet < SubsequentResultRecord > ,
572- ) : IncrementalUpdate {
573- const newPendingSources = new Set < DeferredFragmentRecord | StreamRecord > ( ) ;
574- const incrementalResults : Array < IncrementalResult > = [ ] ;
575- const completedResults : Array < CompletedResult > = [ ] ;
570+ ) : IncrementalAggregate {
571+ const { newPendingSources, incrementalResults, completedResults } =
572+ aggregate ;
576573 for ( const subsequentResultRecord of completedRecords ) {
577574 for ( const child of subsequentResultRecord . children ) {
578575 const pendingSource = isStreamItemsRecord ( child )
@@ -635,11 +632,30 @@ export class IncrementalPublisher {
635632 }
636633 }
637634
638- return {
639- pending : this . pendingSourcesToResults ( newPendingSources ) ,
640- incremental : incrementalResults ,
641- completed : completedResults ,
635+ return aggregate ;
636+ }
637+
638+ private _incrementalFinalizer (
639+ aggregate : IncrementalAggregate ,
640+ ) : SubsequentIncrementalExecutionResult {
641+ const { newPendingSources, incrementalResults, completedResults } =
642+ aggregate ;
643+ const pendingResults = this . pendingSourcesToResults ( newPendingSources ) ;
644+
645+ const result : SubsequentIncrementalExecutionResult = {
646+ hasNext : this . hasNext ( ) ,
642647 } ;
648+ if ( pendingResults . length ) {
649+ result . pending = pendingResults ;
650+ }
651+ if ( incrementalResults . length ) {
652+ result . incremental = incrementalResults ;
653+ }
654+ if ( completedResults . length ) {
655+ result . completed = completedResults ;
656+ }
657+
658+ return result ;
643659 }
644660
645661 private _completedRecordToResult (
0 commit comments