Skip to content

Commit 0158bf4

Browse files
authored
Add mergeIterables() (#30)
Fixes #18
1 parent 5c458e0 commit 0158bf4

File tree

3 files changed

+125
-44
lines changed

3 files changed

+125
-44
lines changed

README.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,16 @@ for await (const item of mappedIterator) {
6969

7070
## API
7171

72-
### `bufferedAsyncMap(input, callback[, { bufferSize=6 }]) => AsyncIterableIterator`
72+
### bufferedAsyncMap()
7373

7474
Iterates and applies the `callback` to up to `bufferSize` items from `input` yielding values as they resolve.
7575

76+
#### Syntax
77+
78+
`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator`
79+
80+
#### Arguments
81+
7682
* `input` – either an async iterable, an ordinare iterable or an array
7783
* `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer)
7884

@@ -81,6 +87,22 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie
8187
* `bufferSize`_optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer.
8288
* `ordered`_optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered
8389

90+
### mergeIterables()
91+
92+
Merges all given (async) iterables in parallel, returning the values as they resolve
93+
94+
#### Syntax
95+
96+
`mergeIterables(input[, { bufferSize=6 }]) => AsyncIterableIterator`
97+
98+
#### Arguments
99+
100+
* `input` – either an async iterable, an ordinare iterable or an array
101+
102+
#### Options
103+
104+
* `bufferSize`_optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer.
105+
84106
## Similar modules
85107

86108
* [`hwp`](https://github.com/mcollina/hwp) – similar module by [@mcollina](https://github.com/mcollina)

index.js

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,30 @@
55
// TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose
66
// TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values
77
// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js
8-
// TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385
98

109
import { findLeastTargeted } from './lib/find-least-targeted.js';
1110
import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js';
1211
import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js';
1312

13+
/**
14+
* @template T
15+
* @param {AsyncIterable<T> | Iterable<T> | T[]} item
16+
* @returns {AsyncIterable<T>}
17+
*/
18+
async function * yieldIterable (item) {
19+
yield * item;
20+
}
21+
22+
/**
23+
* @template T
24+
* @param {Array<AsyncIterable<T> | Iterable<T> | T[]>} input
25+
* @param {{ bufferSize?: number|undefined }} [options]
26+
* @returns {AsyncIterable<T>}
27+
*/
28+
export async function * mergeIterables (input, { bufferSize } = {}) {
29+
yield * bufferedAsyncMap(input, yieldIterable, { bufferSize });
30+
}
31+
1432
/**
1533
* @template T
1634
* @template R

test/values.spec.js

Lines changed: 83 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import sinonChai from 'sinon-chai';
66

77
import {
88
bufferedAsyncMap,
9+
mergeIterables,
910
} from '../index.js';
1011

1112
import {
@@ -193,62 +194,60 @@ describe('bufferedAsyncMap() values', () => {
193194
sinon.restore();
194195
});
195196

196-
describe('main', () => {
197-
it('should return all values from the original AsyncIterable when looped over', async () => {
198-
// Create the promise first, then have it be fully executed using clock.runAllAsync()
199-
const promisedResult = (async () => {
200-
/** @type {number[]} */
201-
const rawResult = [];
197+
it('should return all values from the original AsyncIterable when looped over', async () => {
198+
// Create the promise first, then have it be fully executed using clock.runAllAsync()
199+
const promisedResult = (async () => {
200+
/** @type {number[]} */
201+
const rawResult = [];
202202

203-
for await (const value of bufferedAsyncMap(baseAsyncIterable, async (item) => item)) {
204-
rawResult.push(value);
205-
}
203+
for await (const value of bufferedAsyncMap(baseAsyncIterable, async (item) => item)) {
204+
rawResult.push(value);
205+
}
206206

207-
/** @type {[number[], number]} */
208-
const result = [rawResult, Date.now()];
207+
/** @type {[number[], number]} */
208+
const result = [rawResult, Date.now()];
209209

210-
return result;
211-
})();
210+
return result;
211+
})();
212212

213-
await clock.runAllAsync();
213+
await clock.runAllAsync();
214214

215-
const [result, duration] = await promisedResult;
215+
const [result, duration] = await promisedResult;
216216

217-
result.should.deep.equal(expectedResult);
218-
duration.should.equal(6300);
219-
});
217+
result.should.deep.equal(expectedResult);
218+
duration.should.equal(6300);
219+
});
220220

221-
it('should return all values from the original AsyncIterable when accessed directly', async () => {
222-
// Create the promise first, then have it be fully executed using clock.runAllAsync()
223-
const promisedResult = (async () => {
224-
const asyncIterable = bufferedAsyncMap(baseAsyncIterable, async (item) => item);
225-
const asyncIterator = asyncIterable[Symbol.asyncIterator]();
221+
it('should return all values from the original AsyncIterable when accessed directly', async () => {
222+
// Create the promise first, then have it be fully executed using clock.runAllAsync()
223+
const promisedResult = (async () => {
224+
const asyncIterable = bufferedAsyncMap(baseAsyncIterable, async (item) => item);
225+
const asyncIterator = asyncIterable[Symbol.asyncIterator]();
226226

227-
/** @type {Promise<IteratorResult<number, void>>[]} */
228-
const iterations = [];
227+
/** @type {Promise<IteratorResult<number, void>>[]} */
228+
const iterations = [];
229229

230-
for (let i = 0; i < count; i++) {
231-
iterations.push(asyncIterator.next());
232-
}
230+
for (let i = 0; i < count; i++) {
231+
iterations.push(asyncIterator.next());
232+
}
233233

234-
const rawResult = await Promise.all(iterations);
234+
const rawResult = await Promise.all(iterations);
235235

236-
/** @type {[(number|void)[], number]} */
237-
const result = [
238-
rawResult.map(item => item.value),
239-
Date.now(),
240-
];
236+
/** @type {[(number|void)[], number]} */
237+
const result = [
238+
rawResult.map(item => item.value),
239+
Date.now(),
240+
];
241241

242-
return result;
243-
})();
242+
return result;
243+
})();
244244

245-
await clock.runAllAsync();
245+
await clock.runAllAsync();
246246

247-
const [result, duration] = await promisedResult;
247+
const [result, duration] = await promisedResult;
248248

249-
result.should.deep.equal(expectedResult);
250-
duration.should.equal(4300);
251-
});
249+
result.should.deep.equal(expectedResult);
250+
duration.should.equal(4300);
252251
});
253252

254253
it('should return all values from the original AsyncIterable when given as an array', async () => {
@@ -280,7 +279,7 @@ describe('bufferedAsyncMap() values', () => {
280279
duration.should.equal(2000);
281280
});
282281

283-
it('should handle chained async generator values from the original AsyncIterable when looped over', async () => {
282+
it('should handle nested async generator values from the original AsyncIterable when looped over', async () => {
284283
// Create the promise first, then have it be fully executed using clock.runAllAsync()
285284
const promisedResult = (async () => {
286285
/** @type {string[]} */
@@ -890,4 +889,46 @@ describe('bufferedAsyncMap() values', () => {
890889
duration.should.equal(111900);
891890
});
892891
});
892+
893+
describe('mergeIterables', () => {
894+
it('should process iterables in parallel', async () => {
895+
// Create the promise first, then have it be fully executed using clock.runAllAsync()
896+
const promisedResult = (async () => {
897+
/** @type {string[]} */
898+
const rawResult = [];
899+
900+
for await (const value of mergeIterables([
901+
yieldValuesOverTimeWithPrefix(6, (i) => i % 2 === 1 ? 2000 : 100, 'first-'),
902+
yieldValuesOverTimeWithPrefix(6, (i) => i % 2 === 1 ? 2000 : 100, 'second-'),
903+
])) {
904+
rawResult.push(value);
905+
}
906+
907+
/** @type {[string[], number]} */
908+
const result = [rawResult, Date.now()];
909+
910+
return result;
911+
})();
912+
913+
await clock.runAllAsync();
914+
915+
const [result, duration] = await promisedResult;
916+
917+
result.should.deep.equal([
918+
'first-0',
919+
'second-0',
920+
'second-1',
921+
'first-1',
922+
'second-2',
923+
'first-2',
924+
'second-3',
925+
'first-3',
926+
'second-4',
927+
'first-4',
928+
'second-5',
929+
'first-5',
930+
]);
931+
duration.should.equal(6300);
932+
});
933+
});
893934
});

0 commit comments

Comments
 (0)