51 changes: 41 additions & 10 deletions lib/streams.js
@@ -66,7 +66,7 @@
* @returns {ReadableStream} Concatenated list
*/
function concatStream(list) {
list = list.map(toStream);

[GitHub Actions / ESLint warning on line 69 in lib/streams.js: Assignment to function parameter 'list']
const transform = transformWithCancel(async function(reason) {
await Promise.all(transforms.map(stream => cancel(stream, reason)));
});
@@ -111,7 +111,7 @@
preventCancel = false
} = {}) {
if (isStream(input) && !isArrayStream(input) && !isArrayStream(target)) {
input = toStream(input);

[GitHub Actions / ESLint warning on line 114 in lib/streams.js: Assignment to function parameter 'input']
try {
if (input[externalBuffer]) {
const writer = getWriter(target);
@@ -130,7 +130,7 @@
return;
}
if (!isStream(input)) {
input = toArrayStream(input);

[GitHub Actions / ESLint warning on line 133 in lib/streams.js: Assignment to function parameter 'input']
}
const reader = getReader(input);
const writer = getWriter(target);
@@ -224,25 +224,58 @@
* Transform a stream using helper functions which are called on each chunk, and on stream close, respectively.
* Takes an optional queuing strategy for the resulting readable stream;
* see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#queuingstrategy.
* By default, the queueing strategy is non-buffering. When the `process`
* function is asynchronous, it may be useful to pass a buffering
* queuing strategy to enable multiple chunks to be processed in parallel;
* e.g. pass `{ highWaterMark: 4 }` to process up to 4 chunks in parallel.
* By default, the queueing strategy is non-buffering.
* @param {ReadableStream|Uint8array|String} input
* @param {Function} process
* @param {Function} finish
* @param {Object} queuingStrategy
* @returns {ReadableStream|Uint8array|String}
*/
function transform(input, process = () => undefined, finish = () => undefined, queuingStrategy = { highWaterMark: 0 }) {
if (isStream(input)) {
return _transformStream(input, process, finish, queuingStrategy);
}
const result1 = process(input);
const result2 = finish();
if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
return result1 !== undefined ? result1 : result2;
}
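
For readers skimming the diff: a minimal usage sketch of the now-synchronous `transform` path on non-stream input, assuming only the package's public exports (not part of this diff):

```ts
import { transform } from '@openpgp/web-stream-tools';

// Non-stream input: `process` runs once on the whole value and the
// result is returned synchronously; no stream is created.
const upper = transform('chunk', (str: string) => str.toUpperCase());
console.log(upper); // 'CHUNK'
```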

/**
* Transform a stream using helper functions which are called on each chunk, and on stream close, respectively.
* Takes an optional queuing strategy for the resulting readable stream;
* see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#queuingstrategy.
* By default, the queueing strategy is to buffer one chunk.
* @param {ReadableStream|Uint8array|String} input
* @param {Function} process
* @param {Function} finish
* @param {Object} queuingStrategy
* @returns {ReadableStream|Uint8array|String}
*/
async function transformAsync(
input,
process = async () => undefined,
finish = async () => undefined,
queuingStrategy = { highWaterMark: 1 }
) {
if (isStream(input)) {
return _transformStream(input, process, finish, queuingStrategy);
}
const result1 = await process(input);
const result2 = await finish();
if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
return result1 !== undefined ? result1 : result2;
}
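
And a hedged sketch of the new `transformAsync` on stream input: the default `{ highWaterMark: 1 }` buffers a single chunk, and, per the doc comment moved out of `transform` above, a larger `highWaterMark` lets several async `process` calls be in flight at once (the value 4 below is purely illustrative):

```ts
import { toStream, readToEnd, transformAsync } from '@openpgp/web-stream-tools';

(async () => {
  // Buffer up to 4 chunks so multiple async `process` calls can overlap.
  const transformed = await transformAsync(
    toStream('chunk'),
    async (str: string) => str.toUpperCase(),
    async () => undefined, // `finish` contributes nothing here
    { highWaterMark: 4 }
  );
  console.log(await readToEnd(transformed)); // 'CHUNK'
})();
```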

function _transformStream(input, process, finish, queuingStrategy) {
if (isArrayStream(input)) {
const output = new ArrayStream();
(async () => {
const writer = getWriter(output);
try {
const data = await readToEnd(input);
const result1 = process(data);
const result2 = finish();
const result1 = await process(data);
const result2 = await finish();
let result;
if (result1 !== undefined && result2 !== undefined) result = concat([result1, result2]);
else result = result1 !== undefined ? result1 : result2;
@@ -297,10 +330,7 @@
}
}, queuingStrategy);
}
const result1 = process(input);
const result2 = finish();
if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
return result1 !== undefined ? result1 : result2;
throw new Error('Unreachable');
}

/**
@@ -331,7 +361,7 @@
fn(incoming.readable, outgoing.writable);
return outgoing.readable;
}
input = toArrayStream(input);

[GitHub Actions / ESLint warning on line 364 in lib/streams.js: Assignment to function parameter 'input']
const output = new ArrayStream();
fn(input, output);
return output;
@@ -538,7 +568,7 @@
return fromAsync(async () => slice(await readToEnd(input), begin, end));
}
if (input[externalBuffer]) {
input = concat(input[externalBuffer].concat([input]));

[GitHub Actions / ESLint warning on line 571 in lib/streams.js: Assignment to function parameter 'input']
}
if (isUint8Array(input)) {
return input.subarray(begin, end === Infinity ? input.length : end);
@@ -635,6 +665,7 @@
pipe,
transformRaw,
transform,
transformAsync,
transformPair,
parse,
clone,
42 changes: 41 additions & 1 deletion test/common.test.ts
@@ -1,6 +1,6 @@
import { expect } from 'chai';
// @ts-expect-error Missing type definitions
import { toStream, toArrayStream, readToEnd, slice, pipe, ArrayStream } from '@openpgp/web-stream-tools';
import { toStream, toArrayStream, readToEnd, slice, pipe, ArrayStream, transform, transformAsync } from '@openpgp/web-stream-tools';

describe('Common integration tests', () => {
it('toStream/readToEnd', async () => {
@@ -47,4 +47,44 @@ describe('Common integration tests', () => {
pipe(inputStream, outputStream);
expect(await readToEnd(outputStream)).to.equal('chunk');
});

it('transform non-stream', async () => {
const input = 'chunk';
const transformed = transform(input, (str: string) => str.toUpperCase());
expect(transformed).to.equal('CHUNK');
});

it('transform arraystream', async () => {
const input = 'chunk';
const streamedData = toArrayStream(input);
const transformed = transform(streamedData, (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('CHUNK');
});

it('transform stream', async () => {
const input = 'chunk';
const streamedData = toStream(input);
const transformed = transform(streamedData, (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('CHUNK');
});

it('transformAsync non-stream', async () => {
const input = 'chunk';
const transformed = await transformAsync(input, async (str: string) => str.toUpperCase());
expect(transformed).to.equal('CHUNK');
});

it('transformAsync arraystream', async () => {
const input = 'chunk';
const streamedData = toArrayStream(input);
const transformed = await transformAsync(streamedData, async (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('CHUNK');
});

it('transformAsync stream', async () => {
const input = 'chunk';
const streamedData = toStream(input);
const transformed = await transformAsync(streamedData, async (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('CHUNK');
});
});