Skip to content

Commit 6519b87

Browse files
committed
Fix and test handling of partially-read objects
In `slice`, `transform[Async]` and `readToEnd`, properly handle (non-)stream objects that were partially read (e.g. peeked at) before. Also, add tests of the various cases.
1 parent 51418c1 commit 6519b87

File tree

3 files changed

+155
-22
lines changed

3 files changed

+155
-22
lines changed

lib/streams.js

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,9 @@ function transform(input, process = () => undefined, finish = () => undefined, q
235235
if (isStream(input)) {
236236
return _transformStream(input, process, finish, queuingStrategy);
237237
}
238+
if (input[externalBuffer]) {
239+
input = concat(input[externalBuffer]);
240+
}
238241
const result1 = process(input);
239242
const result2 = finish();
240243
if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
@@ -261,6 +264,9 @@ async function transformAsync(
261264
if (isStream(input)) {
262265
return _transformStream(input, process, finish, queuingStrategy);
263266
}
267+
if (input[externalBuffer]) {
268+
input = concat(input[externalBuffer]);
269+
}
264270
const result1 = await process(input);
265271
const result2 = await finish();
266272
if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
@@ -292,7 +298,7 @@ function _transformStream(input, process, finish, queuingStrategy) {
292298
let allDone = false;
293299
return new ReadableStream({
294300
start() {
295-
reader = input.getReader();
301+
reader = getReader(input);
296302
},
297303
async pull(controller) {
298304
if (allDone) {
@@ -507,7 +513,7 @@ function slice(input, begin=0, end=Infinity) {
507513
let bytesRead = 0;
508514
return new ReadableStream({
509515
start() {
510-
reader = input.getReader();
516+
reader = getReader(input);
511517
},
512518
async pull(controller) {
513519
try {
@@ -568,7 +574,7 @@ function slice(input, begin=0, end=Infinity) {
568574
return fromAsync(async () => slice(await readToEnd(input), begin, end));
569575
}
570576
if (input[externalBuffer]) {
571-
input = concat(input[externalBuffer].concat([input]));
577+
input = concat(input[externalBuffer]);
572578
}
573579
if (isUint8Array(input)) {
574580
return input.subarray(begin, end === Infinity ? input.length : end);
@@ -590,6 +596,9 @@ async function readToEnd(input, join=concat) {
590596
if (isStream(input)) {
591597
return getReader(input).readToEnd(join);
592598
}
599+
if (input[externalBuffer]) {
600+
return join(input[externalBuffer]);
601+
}
593602
return input;
594603
}
595604

lib/writer.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { externalBuffer } from './reader.js';
2+
13
const doneWritingPromise = Symbol('doneWritingPromise');
24
const doneWritingResolve = Symbol('doneWritingResolve');
35
const doneWritingReject = Symbol('doneWritingReject');
@@ -35,7 +37,7 @@ ArrayStream.prototype.getReader = function() {
3537

3638
ArrayStream.prototype.readToEnd = async function(join) {
3739
await this[doneWritingPromise];
38-
const result = join(this.slice(this[readingIndex]));
40+
const result = join((this[externalBuffer] || []).concat(this.slice(this[readingIndex])));
3941
this.length = 0;
4042
return result;
4143
};

test/common.test.ts

Lines changed: 140 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,83 @@
11
import { expect } from 'chai';
22
// @ts-expect-error Missing type definitions
3-
import { toStream, toArrayStream, readToEnd, slice, pipe, ArrayStream, transform, transformAsync } from '@openpgp/web-stream-tools';
3+
import { toStream, toArrayStream, getReader, readToEnd, slice, pipe, ArrayStream, transform, transformAsync } from '@openpgp/web-stream-tools';
44

55
describe('Common integration tests', () => {
6-
it('toStream/readToEnd', async () => {
7-
const input = 'chunk';
8-
const streamedData = toStream('chunk');
9-
expect(await readToEnd(streamedData)).to.equal(input);
6+
it('readToEnd non-stream', async () => {
7+
const input = 'chunk\nanother chunk';
8+
expect(await readToEnd(input)).to.equal('chunk\nanother chunk');
9+
});
10+
11+
it('readToEnd arraystream', async () => {
12+
const input = 'chunk\nanother chunk';
13+
const inputStream = toArrayStream(input);
14+
expect(await readToEnd(inputStream)).to.equal('chunk\nanother chunk');
15+
});
16+
17+
it('readToEnd stream', async () => {
18+
const input = 'chunk\nanother chunk';
19+
const inputStream = toStream(input);
20+
expect(await readToEnd(inputStream)).to.equal('chunk\nanother chunk');
21+
});
22+
23+
it('readToEnd partially-read non-stream', async () => {
24+
const input = new String('chunk\nanother chunk');
25+
const reader = getReader(input);
26+
await reader.readLine();
27+
reader.releaseLock();
28+
// @ts-expect-error Passing String instead of string
29+
expect(await readToEnd(input)).to.equal('another chunk');
30+
});
31+
32+
it('readToEnd partially-read arraystream', async () => {
33+
const input = 'chunk\nanother chunk';
34+
const inputStream = toArrayStream(input);
35+
const reader = getReader(inputStream);
36+
await reader.readLine();
37+
reader.releaseLock();
38+
expect(await readToEnd(inputStream)).to.equal('another chunk');
1039
});
1140

12-
it('slice', async () => {
13-
const input = 'another chunk';
14-
const streamedData = toStream(input);
15-
const slicedStream = slice(streamedData, 8);
16-
expect(await readToEnd(slicedStream)).to.equal('chunk');
41+
it('readToEnd partially-read stream', async () => {
42+
const input = 'chunk\nanother chunk';
43+
const inputStream = toStream(input);
44+
const reader = getReader(inputStream);
45+
await reader.readLine();
46+
reader.releaseLock();
47+
expect(await readToEnd(inputStream)).to.equal('another chunk');
48+
});
49+
50+
it('slice non-stream', async () => {
51+
const input = 'chunk\nanother chunk';
52+
const sliced = slice(input, 6);
53+
expect(sliced).to.equal('another chunk');
54+
});
55+
56+
it('slice stream', async () => {
57+
const input = 'chunk\nanother chunk';
58+
const inputStream = toStream(input);
59+
const sliced = slice(inputStream, 6);
60+
expect(await readToEnd(sliced)).to.equal('another chunk');
61+
});
62+
63+
it('slice partially-read non-stream', async () => {
64+
const input = new String('chunk\nanother chunk');
65+
const reader = getReader(input);
66+
await reader.readLine();
67+
reader.releaseLock();
68+
// @ts-expect-error Passing String instead of string
69+
const sliced = slice(input, 8);
70+
expect(sliced).to.equal('chunk');
71+
});
72+
73+
it('slice partially-read stream', async () => {
74+
const input = 'chunk\nanother chunk';
75+
const inputStream = toStream(input);
76+
const reader = getReader(inputStream);
77+
await reader.readLine();
78+
reader.releaseLock();
79+
const sliced = slice(inputStream, 8);
80+
expect(await readToEnd(sliced)).to.equal('chunk');
1781
});
1882

1983
it('pipe from stream to stream', async () => {
@@ -54,37 +118,95 @@ describe('Common integration tests', () => {
54118
expect(transformed).to.equal('CHUNK');
55119
});
56120

121+
it('transform partially-read non-stream', async () => {
122+
const input = new String('chunk\nanother chunk');
123+
const reader = getReader(input);
124+
await reader.readLine();
125+
reader.releaseLock();
126+
const transformed = transform(input, (str: string) => str.toUpperCase());
127+
expect(transformed).to.equal('ANOTHER CHUNK');
128+
});
129+
57130
it('transform arraystream', async () => {
58131
const input = 'chunk';
59-
const streamedData = toArrayStream(input);
60-
const transformed = transform(streamedData, (str: string) => str.toUpperCase());
132+
const inputStream = toArrayStream(input);
133+
const transformed = transform(inputStream, (str: string) => str.toUpperCase());
61134
expect(await readToEnd(transformed)).to.equal('CHUNK');
62135
});
63136

137+
it('transform partially-read arraystream', async () => {
138+
const input = 'chunk\nanother chunk';
139+
const inputStream = toArrayStream(input);
140+
const reader = getReader(inputStream);
141+
await reader.readLine();
142+
reader.releaseLock();
143+
const transformed = transform(inputStream, (str: string) => str.toUpperCase());
144+
expect(await readToEnd(transformed)).to.equal('ANOTHER CHUNK');
145+
});
146+
64147
it('transform stream', async () => {
65148
const input = 'chunk';
66-
const streamedData = toStream(input);
67-
const transformed = transform(streamedData, (str: string) => str.toUpperCase());
149+
const inputStream = toStream(input);
150+
const transformed = transform(inputStream, (str: string) => str.toUpperCase());
68151
expect(await readToEnd(transformed)).to.equal('CHUNK');
69152
});
70153

154+
it('transform partially-read stream', async () => {
155+
const input = 'chunk\nanother chunk';
156+
const inputStream = toStream(input);
157+
const reader = getReader(inputStream);
158+
await reader.readLine();
159+
reader.releaseLock();
160+
const transformed = transform(inputStream, (str: string) => str.toUpperCase());
161+
expect(await readToEnd(transformed)).to.equal('ANOTHER CHUNK');
162+
});
163+
71164
it('transformAsync non-stream', async () => {
72165
const input = 'chunk';
73166
const transformed = await transformAsync(input, async (str: string) => str.toUpperCase());
74167
expect(transformed).to.equal('CHUNK');
75168
});
76169

170+
it('transformAsync partially-read non-stream', async () => {
171+
const input = new String('chunk\nanother chunk');
172+
const reader = getReader(input);
173+
await reader.readLine();
174+
reader.releaseLock();
175+
const transformed = await transformAsync(input, async (str: string) => str.toUpperCase());
176+
expect(transformed).to.equal('ANOTHER CHUNK');
177+
});
178+
77179
it('transformAsync arraystream', async () => {
78180
const input = 'chunk';
79-
const streamedData = toArrayStream(input);
80-
const transformed = await transformAsync(streamedData, async (str: string) => str.toUpperCase());
181+
const inputStream = toArrayStream(input);
182+
const transformed = await transformAsync(inputStream, async (str: string) => str.toUpperCase());
81183
expect(await readToEnd(transformed)).to.equal('CHUNK');
82184
});
83185

186+
it('transformAsync partially-read arraystream', async () => {
187+
const input = 'chunk\nanother chunk';
188+
const inputStream = toArrayStream(input);
189+
const reader = getReader(inputStream);
190+
await reader.readLine();
191+
reader.releaseLock();
192+
const transformed = await transformAsync(inputStream, async (str: string) => str.toUpperCase());
193+
expect(await readToEnd(transformed)).to.equal('ANOTHER CHUNK');
194+
});
195+
84196
it('transformAsync stream', async () => {
85197
const input = 'chunk';
86-
const streamedData = toStream(input);
87-
const transformed = await transformAsync(streamedData, async (str: string) => str.toUpperCase());
198+
const inputStream = toStream(input);
199+
const transformed = await transformAsync(inputStream, async (str: string) => str.toUpperCase());
88200
expect(await readToEnd(transformed)).to.equal('CHUNK');
89201
});
202+
203+
it('transformAsync partially-read stream', async () => {
204+
const input = 'chunk\nanother chunk';
205+
const inputStream = toStream(input);
206+
const reader = getReader(inputStream);
207+
await reader.readLine();
208+
reader.releaseLock();
209+
const transformed = await transformAsync(inputStream, async (str: string) => str.toUpperCase());
210+
expect(await readToEnd(transformed)).to.equal('ANOTHER CHUNK');
211+
});
90212
});

0 commit comments

Comments
 (0)