Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions lib/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
* @returns {ReadableStream} Concatenated list
*/
function concatStream(list) {
list = list.map(toStream);

Check warning on line 69 in lib/streams.js

View workflow job for this annotation

GitHub Actions / ESLint

Assignment to function parameter 'list'
const transform = transformWithCancel(async function(reason) {
await Promise.all(transforms.map(stream => cancel(stream, reason)));
});
Expand Down Expand Up @@ -111,7 +111,7 @@
preventCancel = false
} = {}) {
if (isStream(input) && !isArrayStream(input) && !isArrayStream(target)) {
input = toStream(input);

Check warning on line 114 in lib/streams.js

View workflow job for this annotation

GitHub Actions / ESLint

Assignment to function parameter 'input'
try {
if (input[externalBuffer]) {
const writer = getWriter(target);
Expand All @@ -130,7 +130,7 @@
return;
}
if (!isStream(input)) {
input = toArrayStream(input);

Check warning on line 133 in lib/streams.js

View workflow job for this annotation

GitHub Actions / ESLint

Assignment to function parameter 'input'
}
const reader = getReader(input);
const writer = getWriter(target);
Expand Down Expand Up @@ -235,6 +235,9 @@
if (isStream(input)) {
return _transformStream(input, process, finish, queuingStrategy);
}
if (input[externalBuffer]) {
input = concat(input[externalBuffer]);

Check warning on line 239 in lib/streams.js

View workflow job for this annotation

GitHub Actions / ESLint

Assignment to function parameter 'input'
}
const result1 = process(input);
const result2 = finish();
if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
Expand All @@ -261,6 +264,9 @@
if (isStream(input)) {
return _transformStream(input, process, finish, queuingStrategy);
}
if (input[externalBuffer]) {
input = concat(input[externalBuffer]);

Check warning on line 268 in lib/streams.js

View workflow job for this annotation

GitHub Actions / ESLint

Assignment to function parameter 'input'
}
const result1 = await process(input);
const result2 = await finish();
if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
Expand Down Expand Up @@ -292,7 +298,7 @@
let allDone = false;
return new ReadableStream({
start() {
reader = input.getReader();
reader = getReader(input);
},
async pull(controller) {
if (allDone) {
Expand Down Expand Up @@ -361,7 +367,7 @@
fn(incoming.readable, outgoing.writable);
return outgoing.readable;
}
input = toArrayStream(input);

Check warning on line 370 in lib/streams.js

View workflow job for this annotation

GitHub Actions / ESLint

Assignment to function parameter 'input'
const output = new ArrayStream();
fn(input, output);
return output;
Expand Down Expand Up @@ -507,7 +513,7 @@
let bytesRead = 0;
return new ReadableStream({
start() {
reader = input.getReader();
reader = getReader(input);
},
async pull(controller) {
try {
Expand Down Expand Up @@ -568,7 +574,7 @@
return fromAsync(async () => slice(await readToEnd(input), begin, end));
}
if (input[externalBuffer]) {
input = concat(input[externalBuffer].concat([input]));
input = concat(input[externalBuffer]);

Check warning on line 577 in lib/streams.js

View workflow job for this annotation

GitHub Actions / ESLint

Assignment to function parameter 'input'
}
if (isUint8Array(input)) {
return input.subarray(begin, end === Infinity ? input.length : end);
Expand All @@ -590,6 +596,9 @@
if (isStream(input)) {
return getReader(input).readToEnd(join);
}
if (input[externalBuffer]) {
return join(input[externalBuffer]);
}
return input;
}

Expand Down
4 changes: 3 additions & 1 deletion lib/writer.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { externalBuffer } from './reader.js';

const doneWritingPromise = Symbol('doneWritingPromise');
const doneWritingResolve = Symbol('doneWritingResolve');
const doneWritingReject = Symbol('doneWritingReject');
Expand Down Expand Up @@ -35,7 +37,7 @@ ArrayStream.prototype.getReader = function() {

ArrayStream.prototype.readToEnd = async function(join) {
await this[doneWritingPromise];
const result = join(this.slice(this[readingIndex]));
const result = join((this[externalBuffer] || []).concat(this.slice(this[readingIndex])));
this.length = 0;
return result;
};
Expand Down
158 changes: 140 additions & 18 deletions test/common.test.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,83 @@
import { expect } from 'chai';
// @ts-expect-error Missing type definitions
import { toStream, toArrayStream, readToEnd, slice, pipe, ArrayStream, transform, transformAsync } from '@openpgp/web-stream-tools';
import { toStream, toArrayStream, getReader, readToEnd, slice, pipe, ArrayStream, transform, transformAsync } from '@openpgp/web-stream-tools';

describe('Common integration tests', () => {
it('toStream/readToEnd', async () => {
const input = 'chunk';
const streamedData = toStream('chunk');
expect(await readToEnd(streamedData)).to.equal(input);
it('readToEnd non-stream', async () => {
const input = 'chunk\nanother chunk';
expect(await readToEnd(input)).to.equal('chunk\nanother chunk');
});

it('readToEnd arraystream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toArrayStream(input);
expect(await readToEnd(inputStream)).to.equal('chunk\nanother chunk');
});

it('readToEnd stream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toStream(input);
expect(await readToEnd(inputStream)).to.equal('chunk\nanother chunk');
});

it('readToEnd partially-read non-stream', async () => {
const input = new String('chunk\nanother chunk');
const reader = getReader(input);
await reader.readLine();
reader.releaseLock();
// @ts-expect-error Passing String instead of string
expect(await readToEnd(input)).to.equal('another chunk');
});

it('readToEnd partially-read arraystream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toArrayStream(input);
const reader = getReader(inputStream);
await reader.readLine();
reader.releaseLock();
expect(await readToEnd(inputStream)).to.equal('another chunk');
});

it('slice', async () => {
const input = 'another chunk';
const streamedData = toStream(input);
const slicedStream = slice(streamedData, 8);
expect(await readToEnd(slicedStream)).to.equal('chunk');
it('readToEnd partially-read stream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toStream(input);
const reader = getReader(inputStream);
await reader.readLine();
reader.releaseLock();
expect(await readToEnd(inputStream)).to.equal('another chunk');
});

it('slice non-stream', async () => {
const input = 'chunk\nanother chunk';
const sliced = slice(input, 6);
expect(sliced).to.equal('another chunk');
});

it('slice stream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toStream(input);
const sliced = slice(inputStream, 6);
expect(await readToEnd(sliced)).to.equal('another chunk');
});

it('slice partially-read non-stream', async () => {
const input = new String('chunk\nanother chunk');
const reader = getReader(input);
await reader.readLine();
reader.releaseLock();
// @ts-expect-error Passing String instead of string
const sliced = slice(input, 8);
expect(sliced).to.equal('chunk');
});

it('slice partially-read stream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toStream(input);
const reader = getReader(inputStream);
await reader.readLine();
reader.releaseLock();
const sliced = slice(inputStream, 8);
expect(await readToEnd(sliced)).to.equal('chunk');
});

it('pipe from stream to stream', async () => {
Expand Down Expand Up @@ -54,37 +118,95 @@ describe('Common integration tests', () => {
expect(transformed).to.equal('CHUNK');
});

it('transform partially-read non-stream', async () => {
const input = new String('chunk\nanother chunk');
const reader = getReader(input);
await reader.readLine();
reader.releaseLock();
const transformed = transform(input, (str: string) => str.toUpperCase());
expect(transformed).to.equal('ANOTHER CHUNK');
});

it('transform arraystream', async () => {
const input = 'chunk';
const streamedData = toArrayStream(input);
const transformed = transform(streamedData, (str: string) => str.toUpperCase());
const inputStream = toArrayStream(input);
const transformed = transform(inputStream, (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('CHUNK');
});

it('transform partially-read arraystream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toArrayStream(input);
const reader = getReader(inputStream);
await reader.readLine();
reader.releaseLock();
const transformed = transform(inputStream, (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('ANOTHER CHUNK');
});

it('transform stream', async () => {
const input = 'chunk';
const streamedData = toStream(input);
const transformed = transform(streamedData, (str: string) => str.toUpperCase());
const inputStream = toStream(input);
const transformed = transform(inputStream, (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('CHUNK');
});

it('transform partially-read stream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toStream(input);
const reader = getReader(inputStream);
await reader.readLine();
reader.releaseLock();
const transformed = transform(inputStream, (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('ANOTHER CHUNK');
});

it('transformAsync non-stream', async () => {
const input = 'chunk';
const transformed = await transformAsync(input, async (str: string) => str.toUpperCase());
expect(transformed).to.equal('CHUNK');
});

it('transformAsync partially-read non-stream', async () => {
const input = new String('chunk\nanother chunk');
const reader = getReader(input);
await reader.readLine();
reader.releaseLock();
const transformed = await transformAsync(input, async (str: string) => str.toUpperCase());
expect(transformed).to.equal('ANOTHER CHUNK');
});

it('transformAsync arraystream', async () => {
const input = 'chunk';
const streamedData = toArrayStream(input);
const transformed = await transformAsync(streamedData, async (str: string) => str.toUpperCase());
const inputStream = toArrayStream(input);
const transformed = await transformAsync(inputStream, async (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('CHUNK');
});

it('transformAsync partially-read arraystream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toArrayStream(input);
const reader = getReader(inputStream);
await reader.readLine();
reader.releaseLock();
const transformed = await transformAsync(inputStream, async (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('ANOTHER CHUNK');
});

it('transformAsync stream', async () => {
const input = 'chunk';
const streamedData = toStream(input);
const transformed = await transformAsync(streamedData, async (str: string) => str.toUpperCase());
const inputStream = toStream(input);
const transformed = await transformAsync(inputStream, async (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('CHUNK');
});

it('transformAsync partially-read stream', async () => {
const input = 'chunk\nanother chunk';
const inputStream = toStream(input);
const reader = getReader(inputStream);
await reader.readLine();
reader.releaseLock();
const transformed = await transformAsync(inputStream, async (str: string) => str.toUpperCase());
expect(await readToEnd(transformed)).to.equal('ANOTHER CHUNK');
});
});