Skip to content
Open
28 changes: 28 additions & 0 deletions modern-async.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,29 @@ declare module "reflectAsyncStatus" {
export default reflectAsyncStatus;
function reflectAsyncStatus<T>(fct: () => Promise<T> | T): Promise<PromiseSettledResult<T>>;
}
declare module "generatorEntries" {
export default generatorEntries;
function generatorEntries(obj: object): Iterable<[any, any]>;
}
declare module "asyncFromEntries" {
export default asyncFromEntries;
function asyncFromEntries<K, V>(iterable: Iterable<Promise<[K, V]> | [K, V]> | AsyncIterable<[K, V]>): Promise<object>;
}
declare module "asyncMapEntries" {
export default asyncMapEntries;
function asyncMapEntries<K, V>(obj: object, iteratee: (value: V, key: K, obj: object) => Promise<[any, any]> | [any, any], queueOrConcurrency?: Queue | number): Promise<object>;
import Queue from "Queue";
}
declare module "asyncMapKeys" {
export default asyncMapKeys;
function asyncMapKeys<K, V>(obj: object, iteratee: (value: V, key: K, obj: object) => Promise<any> | any, queueOrConcurrency?: Queue | number): Promise<object>;
import Queue from "Queue";
}
declare module "asyncMapValues" {
export default asyncMapValues;
function asyncMapValues<K, V>(obj: object, iteratee: (value: V, key: K, obj: object) => Promise<any> | any, queueOrConcurrency?: Queue | number): Promise<object>;
import Queue from "Queue";
}
declare module "modern-async" {
export { default as asyncIterableWrap } from "asyncIterableWrap";
export { default as asyncRoot } from "asyncRoot";
Expand Down Expand Up @@ -200,4 +223,9 @@ declare module "modern-async" {
export { default as asyncTimeoutPrecise } from "asyncTimeoutPrecise";
export { default as asyncIterableToArray } from "asyncIterableToArray";
export { default as reflectAsyncStatus } from "reflectAsyncStatus";
export { default as generatorEntries } from "generatorEntries";
export { default as asyncFromEntries } from "asyncFromEntries";
export { default as asyncMapEntries } from "asyncMapEntries";
export { default as asyncMapKeys } from "asyncMapKeys";
export { default as asyncMapValues } from "asyncMapValues";
}
1 change: 0 additions & 1 deletion src/Queue.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ class Queue {
* `false` in any other case.
*/
execCancellable (fct, priority = 0) {
assert(typeof fct === 'function', 'fct must be a function')
assert(typeof priority === 'number', 'priority must be a number')
const deferred = new Deferred()
let i = this._iqueue.length
Expand Down
8 changes: 3 additions & 5 deletions src/asyncEvery.mjs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@

import Queue from './Queue.mjs'
import asyncWrap from './asyncWrap.mjs'
import assert from 'nanoassert'
import asyncFindIndex from './asyncFindIndex.mjs'

/**
* Returns `true` if all elements of an iterable pass a truth test and `false` otherwise.
*
* The calls to `iteratee` will be performed in a queue to limit the concurrency of these calls.
* The calls to `iteratee` will be performed asynchronously in a {@link Queue}, allowing control over the concurrency of those calls.
* If any truth test returns `false` the promise is immediately resolved.
*
* Whenever a test returns `false`, all the remaining tasks will be cancelled as long
Expand All @@ -22,8 +21,8 @@ import asyncFindIndex from './asyncFindIndex.mjs'
* * `value`: The current value to process
* * `index`: The index in the iterable. Will start from 0.
* * `iterable`: The iterable on which the operation is being performed.
* @param {Queue | number} [queueOrConcurrency] If a queue is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a Queue that will be created
* @param {Queue | number} [queueOrConcurrency] If a {@link Queue} is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a {@link Queue} that will be created
* implicitly for the same purpose. Defaults to `1`.
* @returns {Promise<boolean>} A promise that will be resolved to `true` if all values pass the truth test and `false`
* if a least one of them doesn't pass it. That promise will be rejected if one of the truth test throws
Expand Down Expand Up @@ -70,7 +69,6 @@ import asyncFindIndex from './asyncFindIndex.mjs'
* // total processing time should be ~ 10ms
*/
async function asyncEvery (iterable, iteratee, queueOrConcurrency = 1) {
assert(typeof iteratee === 'function', 'iteratee must be a function')
iteratee = asyncWrap(iteratee)
const index = await asyncFindIndex(iterable, async (value, index, iterable) => {
return !(await iteratee(value, index, iterable))
Expand Down
6 changes: 3 additions & 3 deletions src/asyncFilter.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import asyncGeneratorFilter from './asyncGeneratorFilter.mjs'
/**
* Returns an array of all the values in `iterable` which pass an asynchronous truth test.
*
* The calls to `iteratee` will be performed in a queue to limit the concurrency of these calls.
* The calls to `iteratee` will be performed asynchronously in a {@link Queue}, allowing control over the concurrency of those calls.
* The results will be in the same order than in `iterable`.
*
* If any of the calls to `iteratee` throws an exception the returned promise will be rejected and the remaining
Expand All @@ -18,8 +18,8 @@ import asyncGeneratorFilter from './asyncGeneratorFilter.mjs'
* * `value`: The current value to process
* * `index`: The index in the iterable. Will start from 0.
* * `iterable`: The iterable on which the operation is being performed.
* @param {Queue | number} [queueOrConcurrency] If a queue is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a Queue that will be created
* @param {Queue | number} [queueOrConcurrency] If a {@link Queue} is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a {@link Queue} that will be created
* implicitly for the same purpose. Defaults to `1`.
* @returns {Promise<any[]>} A promise that will be resolved with an array containing all the values that passed
* the truth test. This promise will be rejected if any of the `iteratee` calls throws an exception.
Expand Down
20 changes: 20 additions & 0 deletions src/asyncFilterObject.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

import Queue from './Queue.mjs'
import asyncFromEntries from './asyncFromEntries.mjs'
import asyncGeneratorFilter from './asyncGeneratorFilter.mjs'
import generatorEntries from './generatorEntries.mjs'
import asyncWrap from './asyncWrap.mjs'

/**
* @param obj
* @param iteratee
* @param queueOrConcurrency
*/
async function asyncFilterObject (obj, iteratee, queueOrConcurrency = 1) {
iteratee = asyncWrap(iteratee)
return await asyncFromEntries(asyncGeneratorFilter(generatorEntries(obj), async ([k, v]) => {
return await iteratee(v, k, obj)
}, queueOrConcurrency))
}

export default asyncFilterObject
6 changes: 3 additions & 3 deletions src/asyncFind.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Queue from './Queue.mjs'
/**
* Returns the first element of an iterable that passes an asynchronous truth test.
*
* The calls to `iteratee` will be performed in a queue to limit the concurrency of these calls.
* The calls to `iteratee` will be performed asynchronously in a {@link Queue}, allowing control over the concurrency of those calls.
*
* Whenever a result is found, all the remaining tasks will be cancelled as long
* as they didn't started already. In case of exception in one of the `iteratee` calls the promise
Expand All @@ -19,8 +19,8 @@ import Queue from './Queue.mjs'
* * `value`: The current value to process
* * `index`: The index in the iterable. Will start from 0.
* * `iterable`: The iterable on which the operation is being performed.
* @param {Queue | number} [queueOrConcurrency] If a queue is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a Queue that will be created
* @param {Queue | number} [queueOrConcurrency] If a {@link Queue} is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a {@link Queue} that will be created
* implicitly for the same purpose. Defaults to `1`.
* @param {boolean} [ordered] If true this function will return on the first element in the iterable
* order for which `iteratee` returned true. If false it will be the first in time.
Expand Down
6 changes: 3 additions & 3 deletions src/asyncFindIndex.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Queue from './Queue.mjs'
/**
* Returns the index of the first element of an iterable that passes an asynchronous truth test.
*
* The calls to `iteratee` will be performed in a queue to limit the concurrency of these calls.
* The calls to `iteratee` will be performed asynchronously in a {@link Queue}, allowing control over the concurrency of those calls.
*
* Whenever a result is found, all the remaining tasks will be cancelled as long
* as they didn't started already. In case of exception in one of the iteratee calls the promise
Expand All @@ -19,8 +19,8 @@ import Queue from './Queue.mjs'
* * `value`: The current value to process
* * `index`: The index in the iterable. Will start from 0.
* * `iterable`: The iterable on which the operation is being performed.
* @param {Queue | number} [queueOrConcurrency] If a queue is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a Queue that will be created
* @param {Queue | number} [queueOrConcurrency] If a {@link Queue} is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a {@link Queue} that will be created
* implicitly for the same purpose. Defaults to `1`.
* @param {boolean} [ordered] If true this function will return on the first element in the iterable
* order for which `iteratee` returned true. If false it will be the first in time.
Expand Down
1 change: 0 additions & 1 deletion src/asyncFindInternal.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import reflectAsyncStatus from './reflectAsyncStatus.mjs'
* @returns {*} ignore
*/
async function asyncFindInternal (iterable, iteratee, queueOrConcurrency, ordered) {
assert(typeof iteratee === 'function', 'iteratee must be a function')
iteratee = asyncWrap(iteratee)
const it = asyncIterableWrap(iterable)
const queue = getQueue(queueOrConcurrency)
Expand Down
22 changes: 22 additions & 0 deletions src/asyncFindKey.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

import asyncFindInternal from './asyncFindInternal.mjs'
import generatorEntries from './generatorEntries.mjs'
import asyncWrap from './asyncWrap.mjs'
import Queue from './Queue.mjs'

/**
* @param obj
* @param iteratee
* @param queueOrConcurrency
* @param ordered
*/
async function asyncFindKey (obj, iteratee, queueOrConcurrency = 1, ordered = false) {
iteratee = asyncWrap(iteratee)
// eslint-disable-next-line no-unused-vars
const [k, _] = (await asyncFindInternal(generatorEntries(obj), async ([k, v]) => {
return await iteratee(v, k, obj)
}, queueOrConcurrency, ordered))[1]
return k
}

export default asyncFindKey
8 changes: 4 additions & 4 deletions src/asyncForEach.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Queue from './Queue.mjs'
/**
* Calls a function on each element of iterable.
*
* The calls to `iteratee` will be performed in a queue to limit the concurrency of these calls.
* The calls to `iteratee` will be performed asynchronously in a {@link Queue}, allowing control over the concurrency of those calls.
*
* If any of the calls to iteratee throws an exception the returned promise will be rejected and the remaining
* pending tasks will be cancelled.
Expand All @@ -16,8 +16,8 @@ import Queue from './Queue.mjs'
* * `value`: The current value to process
* * `index`: The index in the iterable. Will start from 0.
* * `iterable`: The iterable on which the operation is being performed.
* @param {Queue | number} [queueOrConcurrency] If a queue is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a Queue that will be created
* @param {Queue | number} [queueOrConcurrency] If a {@link Queue} is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a {@link Queue} that will be created
* implicitly for the same purpose. Defaults to `1`.
* @returns {Promise} A promise that will be resolved when all the calls to `iteratee` have been done.
* This promise will be rejected if any call to `iteratee` throws an exception.
Expand Down Expand Up @@ -59,7 +59,7 @@ import Queue from './Queue.mjs'
*/
async function asyncForEach (iterable, iteratee, queueOrConcurrency = 1) {
// eslint-disable-next-line no-unused-vars
for await (const _el of asyncGeneratorMap(iterable, iteratee, queueOrConcurrency)) {
for await (const _el of asyncGeneratorMap(iterable, iteratee, queueOrConcurrency, false)) {
// do nothing
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/asyncForEachObject.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

import asyncGeneratorMap from './asyncGeneratorMap.mjs'
import generatorEntries from './generatorEntries.mjs'
import Queue from './Queue.mjs'
import asyncWrap from './asyncWrap.mjs'

/**
* @param obj
* @param iteratee
* @param queueOrConcurrency
*/
async function asyncForEachObject (obj, iteratee, queueOrConcurrency = 1) {
iteratee = asyncWrap(iteratee)
// eslint-disable-next-line no-unused-vars
for await (const _el of asyncGeneratorMap(generatorEntries(obj), async ([k, v]) => {
await iteratee(v, k, obj)
}, queueOrConcurrency, false)) {
// do nothing
}
}

export default asyncForEachObject
51 changes: 51 additions & 0 deletions src/asyncFromEntries.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

import asyncIterableWrap from './asyncIterableWrap.mjs'

/**
* Fully consumes an iterable or async iterable containing key-value pairs an returns
* a new object built with those key-value pairs.
*
* This function is an alternative to standard `Object.fromPairs` but accepting an async
* iterable.
*
* @param {Iterable | AsyncIterable} iterable An iterable or async iterable yielding key-value pairs.
* Key value-pairs must be tuples containing two objects:
* * The key
* * The value
* @returns {Promise<object>} A promise that will be resolved with a new object built with the
* key-value pairs of the iterable.
* @see {@link generatorEntries} to convert an object to an iterable of key-value pairs
* @example
* // Example using a synchronous iterable
* import { asyncFromEntries } from 'modern-async'
*
* const entries = [["a", 1], ["b", 2], ["c", 3]]
*
* const obj = await asyncFromEntries(entries)
* console.log(obj) // prints Object { a: 1, b: 2, c: 3 }
* @example
* // Example using an asynchronous iterable
* import { asyncFromEntries, asyncSleep } from 'modern-async'
*
* async function * asyncEntryGenerator() {
* await asyncSleep(10) // waits 10ms
* yield ["a", 1]
* await asyncSleep(10) // waits 10ms
* yield ["b", 2]
* await asyncSleep(10) // waits 10ms
* yield ["c", 3]
* }
*
* const obj = await asyncFromEntries(asyncEntryGenerator())
* console.log(obj) // prints Object { a: 1, b: 2, c: 3 }
*/
async function asyncFromEntries (iterable) {
const it = asyncIterableWrap(iterable)
const obj = {}
for await (const [key, value] of it) {
obj[key] = value
}
return obj
}

export default asyncFromEntries
31 changes: 31 additions & 0 deletions src/asyncFromEntries.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

import { expect, test } from '@jest/globals'
import asyncFromEntries from './asyncFromEntries.mjs'

test('asyncFromEntries base sync', async () => {
const entries = [['a', 1], ['b', 2], ['c', 3]]

const obj = await asyncFromEntries(entries)

expect(obj).toEqual({
a: 1,
b: 2,
c: 3
})
})

test('asyncFromEntries base async', async () => {
const asyncEntryGenerator = async function * () {
yield ['a', 1]
yield ['b', 2]
yield ['c', 3]
}

const obj = await asyncFromEntries(asyncEntryGenerator())

expect(obj).toEqual({
a: 1,
b: 2,
c: 3
})
})
22 changes: 10 additions & 12 deletions src/asyncGeneratorFilter.mjs
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@

import asyncGeneratorMap from './asyncGeneratorMap.mjs'
import assert from 'nanoassert'
import Queue from './Queue.mjs'
import asyncWrap from './asyncWrap.mjs'

/**
* Produces a an async iterator that will return each value or `iterable` which pass an asynchronous truth test.
* Produces a an async iterable that will return each value or `iterable` which pass an asynchronous truth test.
*
* The iterator will perform the calls to `iteratee` in a queue to limit the concurrency of
* these calls. The iterator will consume values from `iterable` only if slots are available in the
* The iterable will perform the calls to `iteratee` asynchronously in a {@link Queue} to limit the concurrency of
* these calls. The iterable will consume values from `iterable` only if slots are available in the
* queue.
*
* If the returned iterator is not fully consumed it will stop consuming new values from `iterable` and scheduling
* If the returned iterable is not fully consumed it will stop consuming new values from `iterable` and scheduling
* new calls to `iteratee` in the queue, but already scheduled tasks will still be executed.
*
* If `iterable` or any of the calls to `iteratee` throws an exception all pending tasks will be cancelled and the
* returned async iterator will throw that exception.
* returned async iterable will throw that exception.
*
* @param {Iterable | AsyncIterable} iterable An iterable or async iterable object.
* @param {Function} iteratee A function that will be called with each member of the iterable. It will receive
* three arguments:
* * `value`: The current value to process
* * `index`: The index in the iterable. Will start from 0.
* * `iterable`: The iterable on which the operation is being performed.
* @param {Queue | number} [queueOrConcurrency] If a queue is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a Queue that will be created
* @param {Queue | number} [queueOrConcurrency] If a {@link Queue} is specified it will be used to schedule the calls to
* `iteratee`. If a number is specified it will be used as the concurrency of a {@link Queue} that will be created
* implicitly for the same purpose. Defaults to `1`.
* @param {boolean} [ordered] If true the results will be yielded in the same order as in the source
* iterable, regardless of which calls to iteratee returned first. If false the the results will be yielded as soon
Expand All @@ -33,22 +32,21 @@ import asyncWrap from './asyncWrap.mjs'
* @example
* import {asyncGeneratorFilter, asyncSleep} from 'modern-async'
*
* const iterator = function * () {
* const generator = function * () {
* for (let i = 0; i < 10000; i += 1) {
* yield i
* }
* }
* const filterIterator = asyncGeneratorFilter(iterator(), async (v) => {
* const filterGenerator = asyncGeneratorFilter(generator(), async (v) => {
* await asyncSleep(1000)
* return v % 3 === 0
* })
* for await (const el of filterIterator) {
* for await (const el of filterGenerator) {
* console.log(el)
* }
* // will print "0", "3", "6", etc... Only one number will be printed every 3 seconds.
*/
async function * asyncGeneratorFilter (iterable, iteratee, queueOrConcurrency = 1, ordered = true) {
assert(typeof iteratee === 'function', 'iteratee must be a function')
iteratee = asyncWrap(iteratee)
for await (const [value, pass] of asyncGeneratorMap(iterable, async (v, i, t) => {
return [v, await iteratee(v, i, t)]
Expand Down
Loading