@@ -109,6 +109,12 @@ export interface FormattedCompletedResult {
109109 errors ?: ReadonlyArray < GraphQLError > ;
110110}
111111
112+ interface IncrementalAggregate {
113+ newPendingSources : Set < DeferredFragmentRecord | StreamRecord > ;
114+ incrementalResults : Array < IncrementalResult > ;
115+ completedResults : Array < CompletedResult > ;
116+ }
117+
112118/**
113119 * This class is used to publish incremental results to the client, enabling semi-concurrent
114120 * execution while preserving result order.
@@ -179,20 +185,28 @@ export class IncrementalPublisher {
179185 return { value : undefined , done : true } ;
180186 }
181187
182- for ( const item of this . _released ) {
183- this . _pending . delete ( item ) ;
184- }
185- const released = this . _released ;
186- this . _released = new Set ( ) ;
188+ if ( this . _released . size > 0 ) {
189+ let aggregate = this . _incrementalInitializer ( ) ;
190+ do {
191+ for ( const item of this . _released ) {
192+ this . _pending . delete ( item ) ;
193+ }
194+ const released = this . _released ;
195+ this . _released = new Set ( ) ;
187196
188- const result = this . _getIncrementalResult ( released ) ;
197+ aggregate = this . _incrementalReducer ( aggregate , released ) ;
198+ } while ( this . _released . size > 0 ) ;
189199
190- if ( ! this . hasNext ( ) ) {
191- isDone = true ;
192- }
200+ const hasNext = this . hasNext ( ) ;
201+
202+ if ( ! hasNext ) {
203+ isDone = true ;
204+ }
193205
194- if ( result !== undefined ) {
195- return { value : result , done : false } ;
206+ return {
207+ value : this . _incrementalFinalizer ( aggregate ) ,
208+ done : false ,
209+ } ;
196210 }
197211
198212 // eslint-disable-next-line no-await-in-loop
@@ -532,37 +546,20 @@ export class IncrementalPublisher {
532546 this . _trigger ( ) ;
533547 }
534548
535- private _getIncrementalResult (
536- completedRecords : ReadonlySet < SubsequentResultRecord > ,
537- ) : SubsequentIncrementalExecutionResult | undefined {
538- const { pending, incremental, completed } =
539- this . _processPending ( completedRecords ) ;
540-
541- const hasNext = this . hasNext ( ) ;
542- if ( incremental . length === 0 && completed . length === 0 && hasNext ) {
543- return undefined ;
544- }
545-
546- const result : SubsequentIncrementalExecutionResult = { hasNext } ;
547- if ( pending . length ) {
548- result . pending = pending ;
549- }
550- if ( incremental . length ) {
551- result . incremental = incremental ;
552- }
553- if ( completed . length ) {
554- result . completed = completed ;
555- }
556-
557- return result ;
549+ private _incrementalInitializer ( ) : IncrementalAggregate {
550+ return {
551+ newPendingSources : new Set < DeferredFragmentRecord | StreamRecord > ( ) ,
552+ incrementalResults : [ ] ,
553+ completedResults : [ ] ,
554+ } ;
558555 }
559556
560- private _processPending (
557+ private _incrementalReducer (
558+ aggregate : IncrementalAggregate ,
561559 completedRecords : ReadonlySet < SubsequentResultRecord > ,
562- ) : IncrementalUpdate {
563- const newPendingSources = new Set < DeferredFragmentRecord | StreamRecord > ( ) ;
564- const incrementalResults : Array < IncrementalResult > = [ ] ;
565- const completedResults : Array < CompletedResult > = [ ] ;
560+ ) : IncrementalAggregate {
561+ const { newPendingSources, incrementalResults, completedResults } =
562+ aggregate ;
566563 for ( const subsequentResultRecord of completedRecords ) {
567564 for ( const child of subsequentResultRecord . children ) {
568565 const pendingSource = isStreamItemsRecord ( child )
@@ -625,11 +622,30 @@ export class IncrementalPublisher {
625622 }
626623 }
627624
628- return {
629- pending : this . pendingSourcesToResults ( newPendingSources ) ,
630- incremental : incrementalResults ,
631- completed : completedResults ,
625+ return aggregate ;
626+ }
627+
628+ private _incrementalFinalizer (
629+ aggregate : IncrementalAggregate ,
630+ ) : SubsequentIncrementalExecutionResult {
631+ const { newPendingSources, incrementalResults, completedResults } =
632+ aggregate ;
633+ const pendingResults = this . pendingSourcesToResults ( newPendingSources ) ;
634+
635+ const result : SubsequentIncrementalExecutionResult = {
636+ hasNext : this . hasNext ( ) ,
632637 } ;
638+ if ( pendingResults . length ) {
639+ result . pending = pendingResults ;
640+ }
641+ if ( incrementalResults . length ) {
642+ result . incremental = incrementalResults ;
643+ }
644+ if ( completedResults . length ) {
645+ result . completed = completedResults ;
646+ }
647+
648+ return result ;
633649 }
634650
635651 private _completedRecordToResult (
0 commit comments