@@ -162,6 +162,12 @@ export interface FormattedCompletedResult {
162162 errors ?: ReadonlyArray < GraphQLError > ;
163163}
164164
165+ interface IncrementalAggregate {
166+ newPendingSources : Set < DeferredFragmentRecord | StreamRecord > ;
167+ incrementalResults : Array < IncrementalResult > ;
168+ completedResults : Array < CompletedResult > ;
169+ }
170+
165171/**
166172 * This class is used to publish incremental results to the client, enabling semi-concurrent
167173 * execution while preserving result order.
@@ -399,20 +405,28 @@ export class IncrementalPublisher {
399405 return { value : undefined , done : true } ;
400406 }
401407
402- for ( const item of this . _released ) {
403- this . _pending . delete ( item ) ;
404- }
405- const released = this . _released ;
406- this . _released = new Set ( ) ;
408+ if ( this . _released . size > 0 ) {
409+ let aggregate = this . _incrementalInitializer ( ) ;
410+ do {
411+ for ( const item of this . _released ) {
412+ this . _pending . delete ( item ) ;
413+ }
414+ const released = this . _released ;
415+ this . _released = new Set ( ) ;
407416
408- const result = this . _getIncrementalResult ( released ) ;
417+ aggregate = this . _incrementalReducer ( aggregate , released ) ;
418+ } while ( this . _released . size > 0 ) ;
409419
410- if ( this . _pending . size === 0 ) {
411- isDone = true ;
412- }
420+ const hasNext = this . _pending . size > 0 ;
421+
422+ if ( ! hasNext ) {
423+ isDone = true ;
424+ }
413425
414- if ( result !== undefined ) {
415- return { value : result , done : false } ;
426+ return {
427+ value : this . _incrementalFinalizer ( aggregate ) ,
428+ done : false ,
429+ } ;
416430 }
417431
418432 // eslint-disable-next-line no-await-in-loop
@@ -494,37 +508,20 @@ export class IncrementalPublisher {
494508 this . _trigger ( ) ;
495509 }
496510
497- private _getIncrementalResult (
498- completedRecords : ReadonlySet < SubsequentResultRecord > ,
499- ) : SubsequentIncrementalExecutionResult | undefined {
500- const { pending, incremental, completed } =
501- this . _processPending ( completedRecords ) ;
502-
503- const hasNext = this . _pending . size > 0 ;
504- if ( incremental . length === 0 && completed . length === 0 && hasNext ) {
505- return undefined ;
506- }
507-
508- const result : SubsequentIncrementalExecutionResult = { hasNext } ;
509- if ( pending . length ) {
510- result . pending = pending ;
511- }
512- if ( incremental . length ) {
513- result . incremental = incremental ;
514- }
515- if ( completed . length ) {
516- result . completed = completed ;
517- }
518-
519- return result ;
511+ private _incrementalInitializer ( ) : IncrementalAggregate {
512+ return {
513+ newPendingSources : new Set < DeferredFragmentRecord | StreamRecord > ( ) ,
514+ incrementalResults : [ ] ,
515+ completedResults : [ ] ,
516+ } ;
520517 }
521518
522- private _processPending (
519+ private _incrementalReducer (
520+ aggregate : IncrementalAggregate ,
523521 completedRecords : ReadonlySet < SubsequentResultRecord > ,
524- ) : IncrementalUpdate {
525- const newPendingSources = new Set < DeferredFragmentRecord | StreamRecord > ( ) ;
526- const incrementalResults : Array < IncrementalResult > = [ ] ;
527- const completedResults : Array < CompletedResult > = [ ] ;
522+ ) : IncrementalAggregate {
523+ const { newPendingSources, incrementalResults, completedResults } =
524+ aggregate ;
528525 for ( const subsequentResultRecord of completedRecords ) {
529526 for ( const child of subsequentResultRecord . children ) {
530527 if ( child . filtered ) {
@@ -585,11 +582,30 @@ export class IncrementalPublisher {
585582 }
586583 }
587584
588- return {
589- pending : this . pendingSourcesToResults ( newPendingSources ) ,
590- incremental : incrementalResults ,
591- completed : completedResults ,
585+ return aggregate ;
586+ }
587+
588+ private _incrementalFinalizer (
589+ aggregate : IncrementalAggregate ,
590+ ) : SubsequentIncrementalExecutionResult {
591+ const { newPendingSources, incrementalResults, completedResults } =
592+ aggregate ;
593+ const pendingResults = this . pendingSourcesToResults ( newPendingSources ) ;
594+
595+ const result : SubsequentIncrementalExecutionResult = {
596+ hasNext : this . _pending . size > 0 ,
592597 } ;
598+ if ( pendingResults . length ) {
599+ result . pending = pendingResults ;
600+ }
601+ if ( incrementalResults . length ) {
602+ result . incremental = incrementalResults ;
603+ }
604+ if ( completedResults . length ) {
605+ result . completed = completedResults ;
606+ }
607+
608+ return result ;
593609 }
594610
595611 private _completedRecordToResult (
0 commit comments