Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/execution/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ export class Queue<T> {
}

private _nextBatch(): Promise<Generator<T> | undefined> {
if (this._stopped) {
return Promise.resolve(undefined);
}
if (this._items.length) {
return Promise.resolve(this.batch());
}
if (this._stopped) {
return Promise.resolve(undefined);
}
const { promise, resolve } = promiseWithResolvers<
Generator<T> | undefined
>();
Expand All @@ -73,8 +73,10 @@ export class Queue<T> {
}

private _push(item: T): void {
this._items.push(item);
this._resolve(this.batch());
if (!this._stopped) {
this._items.push(item);
this._resolve(this.batch());
}
}

private _resolve(maybeIterable: Generator<T> | undefined): void {
Expand Down
19 changes: 13 additions & 6 deletions src/execution/__tests__/Queue-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import { describe, it } from 'mocha';

import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';

import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';

import { Queue } from '../Queue.js';

describe('Queue', () => {
Expand Down Expand Up @@ -91,6 +89,17 @@ describe('Queue', () => {
});

it('should allow the executor to indicate completion', async () => {
const queue = new Queue<number>((push, stop) => {
push(1);
stop();
});

const sub = queue.subscribe((batch) => Array.from(batch));
expect(await sub.next()).to.deep.equal({ done: false, value: [1] });
expect(await sub.next()).to.deep.equal({ done: true, value: undefined });
});

it('should allow the executor to indicate completion prior to any push calls', async () => {
const queue = new Queue<number>((push, stop) => {
stop();
push(1); // should be ignored
Expand All @@ -101,10 +110,8 @@ describe('Queue', () => {
});

it('should allow a consumer to abort a pending call to next', async () => {
const queue = new Queue<number>(async () => {
const { promise } = promiseWithResolvers();
// wait forever
await promise;
const queue = new Queue<number>(() => {
// no pushes
});

const sub = queue.subscribe((batch) => batch);
Expand Down
Loading