From 7bdc82ecf3bbf7bee903a40cc0a07054243ab05b Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 3 Nov 2025 21:28:42 +0200 Subject: [PATCH] fix(Queue): calling stop should not drop pushed values --- src/execution/Queue.ts | 12 +++++++----- src/execution/__tests__/Queue-test.ts | 19 +++++++++++++------ 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts index c582bd52e7..bd9a9a076d 100644 --- a/src/execution/Queue.ts +++ b/src/execution/Queue.ts @@ -59,12 +59,12 @@ export class Queue { } private _nextBatch(): Promise | undefined> { - if (this._stopped) { - return Promise.resolve(undefined); - } if (this._items.length) { return Promise.resolve(this.batch()); } + if (this._stopped) { + return Promise.resolve(undefined); + } const { promise, resolve } = promiseWithResolvers< Generator | undefined >(); @@ -73,8 +73,10 @@ export class Queue { } private _push(item: T): void { - this._items.push(item); - this._resolve(this.batch()); + if (!this._stopped) { + this._items.push(item); + this._resolve(this.batch()); + } } private _resolve(maybeIterable: Generator | undefined): void { diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index 2096c9f28e..405deaa6b1 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -3,8 +3,6 @@ import { describe, it } from 'mocha'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; -import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js'; - import { Queue } from '../Queue.js'; describe('Queue', () => { @@ -91,6 +89,17 @@ describe('Queue', () => { }); it('should allow the executor to indicate completion', async () => { + const queue = new Queue((push, stop) => { + push(1); + stop(); + }); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('should allow the executor to indicate completion prior to any push calls', async () => { const queue = new Queue((push, stop) => { stop(); push(1); // should be ignored @@ -101,10 +110,8 @@ describe('Queue', () => { }); it('should allow a consumer to abort a pending call to next', async () => { - const queue = new Queue(async () => { - const { promise } = promiseWithResolvers(); - // wait forever - await promise; + const queue = new Queue(() => { + // no pushes }); const sub = queue.subscribe((batch) => batch);