@@ -169,6 +169,12 @@ export interface FormattedCompletedResult {
169169 errors ?: ReadonlyArray < GraphQLError > ;
170170}
171171
172+ interface IncrementalAggregate {
173+ newPendingSources : Set < DeferredFragmentRecord | StreamRecord > ;
174+ incrementalResults : Array < IncrementalResult > ;
175+ completedResults : Array < CompletedResult > ;
176+ }
177+
172178/**
173179 * This class is used to publish incremental results to the client, enabling semi-concurrent
174180 * execution while preserving result order.
@@ -482,20 +488,28 @@ export class IncrementalPublisher {
482488 return { value : undefined , done : true } ;
483489 }
484490
485- for ( const item of this . _released ) {
486- this . _pending . delete ( item ) ;
487- }
488- const released = this . _released ;
489- this . _released = new Set ( ) ;
491+ if ( this . _released . size > 0 ) {
492+ let aggregate = this . _incrementalInitializer ( ) ;
493+ do {
494+ for ( const item of this . _released ) {
495+ this . _pending . delete ( item ) ;
496+ }
497+ const released = this . _released ;
498+ this . _released = new Set ( ) ;
490499
491- const result = this . _getIncrementalResult ( released ) ;
500+ aggregate = this . _incrementalReducer ( aggregate , released ) ;
501+ } while ( this . _released . size > 0 ) ;
492502
493- if ( this . _pending . size === 0 ) {
494- isDone = true ;
495- }
503+ const hasNext = this . _pending . size > 0 ;
504+
505+ if ( ! hasNext ) {
506+ isDone = true ;
507+ }
496508
497- if ( result !== undefined ) {
498- return { value : result , done : false } ;
509+ return {
510+ value : this . _incrementalFinalizer ( aggregate ) ,
511+ done : false ,
512+ } ;
499513 }
500514
501515 // eslint-disable-next-line no-await-in-loop
@@ -577,37 +591,20 @@ export class IncrementalPublisher {
577591 this . _trigger ( ) ;
578592 }
579593
580- private _getIncrementalResult (
581- completedRecords : ReadonlySet < SubsequentResultRecord > ,
582- ) : SubsequentIncrementalExecutionResult | undefined {
583- const { pending, incremental, completed } =
584- this . _processPending ( completedRecords ) ;
585-
586- const hasNext = this . _pending . size > 0 ;
587- if ( incremental . length === 0 && completed . length === 0 && hasNext ) {
588- return undefined ;
589- }
590-
591- const result : SubsequentIncrementalExecutionResult = { hasNext } ;
592- if ( pending . length ) {
593- result . pending = pending ;
594- }
595- if ( incremental . length ) {
596- result . incremental = incremental ;
597- }
598- if ( completed . length ) {
599- result . completed = completed ;
600- }
601-
602- return result ;
594+ private _incrementalInitializer ( ) : IncrementalAggregate {
595+ return {
596+ newPendingSources : new Set < DeferredFragmentRecord | StreamRecord > ( ) ,
597+ incrementalResults : [ ] ,
598+ completedResults : [ ] ,
599+ } ;
603600 }
604601
605- private _processPending (
602+ private _incrementalReducer (
603+ aggregate : IncrementalAggregate ,
606604 completedRecords : ReadonlySet < SubsequentResultRecord > ,
607- ) : IncrementalUpdate {
608- const newPendingSources = new Set < DeferredFragmentRecord | StreamRecord > ( ) ;
609- const incrementalResults : Array < IncrementalResult > = [ ] ;
610- const completedResults : Array < CompletedResult > = [ ] ;
605+ ) : IncrementalAggregate {
606+ const { newPendingSources, incrementalResults, completedResults } =
607+ aggregate ;
611608 for ( const subsequentResultRecord of completedRecords ) {
612609 for ( const child of subsequentResultRecord . children ) {
613610 if ( child . filtered ) {
@@ -673,11 +670,30 @@ export class IncrementalPublisher {
673670 }
674671 }
675672
676- return {
677- pending : this . pendingSourcesToResults ( newPendingSources ) ,
678- incremental : incrementalResults ,
679- completed : completedResults ,
673+ return aggregate ;
674+ }
675+
676+ private _incrementalFinalizer (
677+ aggregate : IncrementalAggregate ,
678+ ) : SubsequentIncrementalExecutionResult {
679+ const { newPendingSources, incrementalResults, completedResults } =
680+ aggregate ;
681+ const pendingResults = this . pendingSourcesToResults ( newPendingSources ) ;
682+
683+ const result : SubsequentIncrementalExecutionResult = {
684+ hasNext : this . _pending . size > 0 ,
680685 } ;
686+ if ( pendingResults . length ) {
687+ result . pending = pendingResults ;
688+ }
689+ if ( incrementalResults . length ) {
690+ result . incremental = incrementalResults ;
691+ }
692+ if ( completedResults . length ) {
693+ result . completed = completedResults ;
694+ }
695+
696+ return result ;
681697 }
682698
683699 private _completedRecordToResult (
0 commit comments