Skip to content

Commit 7776805

Browse files
authored
Support ordered iteration (#29)
* Initial basic ordered support * Support subiterators * Remove completed TODO
1 parent f77358b commit 7776805

File tree

7 files changed

+352
-37
lines changed

7 files changed

+352
-37
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
</div>
88

99

10-
Buffered processing of async iterables / generators in parallel to achieve comparable performance to `Promise.all()`
10+
Buffered parallel processing of async iterables / generators.
1111

1212
[![npm version](https://img.shields.io/npm/v/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable)
1313
[![npm downloads](https://img.shields.io/npm/dm/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable)
@@ -16,7 +16,6 @@ Buffered processing of async iterables / generators in parallel to achieve compa
1616
[![js-semistandard-style](https://img.shields.io/badge/code%20style-semistandard-brightgreen.svg)](https://github.com/voxpelli/eslint-config)
1717
[![Follow @voxpelli@mastodon.social](https://img.shields.io/mastodon/follow/109247025527949675?domain=https%3A%2F%2Fmastodon.social&style=social)](https://mastodon.social/@voxpelli)
1818

19-
**WORK IN PROGRESS – early prerelease**
2019

2120
## Usage
2221

@@ -80,6 +79,7 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie
8079
#### Options
8180

8281
* `bufferSize`_optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer.
82+
* `ordered`_optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered
8383

8484
## Similar modules
8585

index.js

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,25 @@
55
// TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose
66
// TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values
77
// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js
8-
// TODO: Have option to persist order? To not use Promise.race()?
98
// TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385
109

1110
import { findLeastTargeted } from './lib/find-least-targeted.js';
12-
import { makeIterableAsync } from './lib/misc.js';
13-
import { isAsyncIterable, isIterable, isPartOfSet } from './lib/type-checks.js';
11+
import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js';
12+
import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js';
1413

1514
/**
1615
* @template T
1716
* @template R
1817
* @param {AsyncIterable<T> | Iterable<T> | T[]} input
1918
* @param {(item: T) => (Promise<R>|AsyncIterable<R>)} callback
20-
* @param {{ bufferSize?: number|undefined }} [options]
19+
* @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options]
2120
* @returns {AsyncIterableIterator<R> & { return: NonNullable<AsyncIterableIterator<R>["return"]>, throw: NonNullable<AsyncIterableIterator<R>["throw"]> }}
2221
*/
2322
export function bufferedAsyncMap (input, callback, options) {
2423
/** @typedef {Promise<IteratorResult<R|AsyncIterable<R>> & { bufferPromise: BufferPromise, fromSubIterator?: boolean, isSubIterator?: boolean, err?: unknown }>} BufferPromise */
2524
const {
2625
bufferSize = 6,
26+
ordered = false,
2727
} = options || {};
2828

2929
/** @type {AsyncIterable<T>} */
@@ -39,11 +39,11 @@ export function bufferedAsyncMap (input, callback, options) {
3939
/** @type {AsyncIterator<T, unknown>} */
4040
const asyncIterator = asyncIterable[Symbol.asyncIterator]();
4141

42-
/** @type {Set<AsyncIterator<R, unknown>>} */
43-
const subIterators = new Set();
42+
/** @type {AsyncIterator<R, unknown>[]} */
43+
const subIterators = [];
4444

45-
/** @type {Set<BufferPromise>} */
46-
const bufferedPromises = new Set();
45+
/** @type {BufferPromise[]} */
46+
const bufferedPromises = [];
4747

4848
/** @type {WeakMap<BufferPromise, AsyncIterator<T>|AsyncIterator<R>>} */
4949
const promisesToSourceIteratorMap = new WeakMap();
@@ -76,8 +76,8 @@ export function bufferedAsyncMap (input, callback, options) {
7676
);
7777

7878
// TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10
79-
bufferedPromises.clear();
80-
subIterators.clear();
79+
bufferedPromises.splice(0, bufferedPromises.length);
80+
subIterators.splice(0, subIterators.length);
8181

8282
if (throwAnyError && hasError) {
8383
throw hasError;
@@ -90,14 +90,20 @@ export function bufferedAsyncMap (input, callback, options) {
9090
const fillQueue = () => {
9191
if (hasError || isDone) return;
9292

93-
// Check which iterator that has the least amount of queued promises right now
94-
const iterator = findLeastTargeted(
95-
mainReturnedDone ? subIterators : [...subIterators, asyncIterator],
96-
bufferedPromises,
97-
promisesToSourceIteratorMap
98-
);
93+
/** @type {AsyncIterator<R, unknown>|undefined} */
94+
let currentSubIterator;
9995

100-
const currentSubIterator = isPartOfSet(iterator, subIterators) ? iterator : undefined;
96+
if (ordered) {
97+
currentSubIterator = subIterators[0];
98+
} else {
99+
const iterator = findLeastTargeted(
100+
mainReturnedDone ? subIterators : [...subIterators, asyncIterator],
101+
bufferedPromises,
102+
promisesToSourceIteratorMap
103+
);
104+
105+
currentSubIterator = isPartOfArray(iterator, subIterators) ? iterator : undefined;
106+
}
101107

102108
/** @type {BufferPromise} */
103109
const bufferPromise = currentSubIterator
@@ -110,7 +116,7 @@ export function bufferedAsyncMap (input, callback, options) {
110116
throw new TypeError('Expected an object value');
111117
}
112118
if ('err' in result || result.done) {
113-
subIterators.delete(currentSubIterator);
119+
arrayDeleteInPlace(subIterators, currentSubIterator);
114120
}
115121

116122
/** @type {Awaited<BufferPromise>} */
@@ -174,29 +180,43 @@ export function bufferedAsyncMap (input, callback, options) {
174180
});
175181

176182
promisesToSourceIteratorMap.set(bufferPromise, currentSubIterator || asyncIterator);
177-
bufferedPromises.add(bufferPromise);
178183

179-
if (bufferedPromises.size < bufferSize) {
184+
if (ordered && currentSubIterator) {
185+
let i = 0;
186+
187+
while (promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) {
188+
i += 1;
189+
}
190+
191+
bufferedPromises.splice(i, 0, bufferPromise);
192+
} else {
193+
bufferedPromises.push(bufferPromise);
194+
}
195+
196+
if (bufferedPromises.length < bufferSize) {
180197
fillQueue();
181198
}
182199
};
183200

184201
/** @type {AsyncIterator<R>["next"]} */
185202
const nextValue = async () => {
186-
if (bufferedPromises.size === 0) return markAsEnded(true);
203+
const nextBufferedPromise = bufferedPromises[0];
204+
205+
if (!nextBufferedPromise) return markAsEnded(true);
187206
if (isDone) return { done: true, value: undefined };
188207

208+
/** @type {Awaited<BufferPromise>} */
209+
const resolvedPromise = await (ordered ? nextBufferedPromise : Promise.race(bufferedPromises));
210+
arrayDeleteInPlace(bufferedPromises, resolvedPromise.bufferPromise);
211+
189212
// Wait for some of the current promises to be finished
190213
const {
191-
bufferPromise,
192214
done,
193215
err,
194216
fromSubIterator,
195217
isSubIterator,
196218
value,
197-
} = await Promise.race(bufferedPromises);
198-
199-
bufferedPromises.delete(bufferPromise);
219+
} = resolvedPromise;
200220

201221
// We are mandated by the spec to always do this return if the iterator is done
202222
if (isDone) {
@@ -206,16 +226,16 @@ export function bufferedAsyncMap (input, callback, options) {
206226
hasError = err instanceof Error ? err : new Error('Unknown error');
207227
}
208228

209-
if (fromSubIterator || subIterators.size !== 0) {
229+
if (fromSubIterator || subIterators.length > 0) {
210230
fillQueue();
211231
}
212232

213-
return bufferedPromises.size === 0
233+
return bufferedPromises.length === 0
214234
? markAsEnded(true)
215235
: nextValue();
216236
} else if (isSubIterator && isAsyncIterable(value)) {
217237
// TODO: Handle possible error here? Or too obscure?
218-
subIterators.add(value[Symbol.asyncIterator]());
238+
subIterators.unshift(value[Symbol.asyncIterator]());
219239
fillQueue();
220240
return nextValue();
221241
} else {

lib/find-least-targeted.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/**
22
* @template Target
33
* @template {object} Item
4-
* @param {Set<Item>} items
4+
* @param {Iterable<Item> | Item[]} items
55
* @param {WeakMap<Item, Target>} itemTargets
66
* @returns {Map<Target, number>}
77
*/
@@ -25,7 +25,7 @@ function countTargets (items, itemTargets) {
2525
* @template Target
2626
* @template {object} Item
2727
* @param {Iterable<Target> | Target[]} targets
28-
* @param {Set<Item>} items
28+
* @param {Iterable<Item> | Item[]} items
2929
* @param {WeakMap<Item, Target>} itemTargets
3030
* @returns {Target|undefined}
3131
*/

lib/misc.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,17 @@ export async function * makeIterableAsync (input) {
88
yield value;
99
}
1010
}
11+
12+
/**
13+
* Similar to the .delete() on a set
14+
*
15+
* @template T
16+
* @param {T[]} list
17+
* @param {T} value
18+
*/
19+
export function arrayDeleteInPlace (list, value) {
20+
const index = list.indexOf(value);
21+
if (index !== -1) {
22+
list.splice(index, 1);
23+
}
24+
}

lib/type-checks.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ export const isIterable = (value) => Boolean(value && typeof value === 'object'
1111
export const isAsyncIterable = (value) => Boolean(value && typeof value === 'object' && Symbol.asyncIterator in value);
1212

1313
/**
14-
* @template SetValue
14+
* @template Values
1515
* @param {unknown} value
16-
* @param {Set<SetValue>} set
17-
* @returns {value is SetValue}
16+
* @param {Values[]} list
17+
* @returns {value is Values}
1818
*/
19-
export const isPartOfSet = (value, set) => set.has(/** @type {SetValue} */ (value));
19+
export const isPartOfArray = (value, list) => list.includes(/** @type {Values} */ (value));

test/utils.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,17 @@ export async function * yieldValuesOverTimeWithPrefix (count, wait, prefix) {
4242
await promisableTimeout(waitCallback(i));
4343
}
4444
}
45+
46+
/**
47+
* @param {number} count
48+
* @param {number|((i: number) => number)} wait
49+
* @param {(i: number) => AsyncGenerator<string>} nested
50+
* @returns {AsyncIterable<string>}
51+
*/
52+
export async function * nestedYieldValuesOverTime (count, wait, nested) {
53+
const waitCallback = typeof wait === 'number' ? () => wait : wait;
54+
for (let i = 0; i < count; i++) {
55+
yield * nested(i);
56+
await promisableTimeout(waitCallback(i));
57+
}
58+
}

0 commit comments

Comments
 (0)