Skip to content

Commit 878a416

Browse files
authored
Merge pull request #60
Eliminate buffering in `stream.transform()` and `slice()`
2 parents 8efe113 + de59619 commit 878a416

File tree

1 file changed

+82
-23
lines changed

1 file changed

+82
-23
lines changed

lib/streams.js

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,19 @@ function transformWithCancel(customCancel) {
222222

223223
/**
224224
* Transform a stream using helper functions which are called on each chunk, and on stream close, respectively.
225+
* Takes an optional queuing strategy for the resulting readable stream;
226+
* see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#queuingstrategy.
227+
* By default, the queueing strategy is non-buffering. When the `process`
228+
* function is asynchronous, it may be useful to pass a buffering
229+
* queuing strategy to enable multiple chunks to be processed in parallel;
230+
* e.g. pass `{ highWaterMark: 4 }` to process up to 4 chunks in parallel.
225231
* @param {ReadableStream|Uint8array|String} input
226232
* @param {Function} process
227233
* @param {Function} finish
234+
* @param {Object} queuingStrategy
228235
* @returns {ReadableStream|Uint8array|String}
229236
*/
230-
function transform(input, process = () => undefined, finish = () => undefined) {
237+
function transform(input, process = () => undefined, finish = () => undefined, queuingStrategy = { highWaterMark: 0 }) {
231238
if (isArrayStream(input)) {
232239
const output = new ArrayStream();
233240
(async () => {
@@ -248,24 +255,47 @@ function transform(input, process = () => undefined, finish = () => undefined) {
248255
return output;
249256
}
250257
if (isStream(input)) {
251-
return transformRaw(input, {
252-
async transform(value, controller) {
253-
try {
254-
const result = await process(value);
255-
if (result !== undefined) controller.enqueue(result);
256-
} catch(e) {
257-
controller.error(e);
258-
}
258+
let reader;
259+
let allDone = false;
260+
return new ReadableStream({
261+
start() {
262+
reader = input.getReader();
259263
},
260-
async flush(controller) {
264+
async pull(controller) {
265+
if (allDone) {
266+
controller.close();
267+
input.releaseLock();
268+
return;
269+
}
261270
try {
262-
const result = await finish();
263-
if (result !== undefined) controller.enqueue(result);
264-
} catch(e) {
271+
// Read repeatedly until we have a chunk to enqueue or until
272+
// we can close the stream, as `pull` won't get called again
273+
// until we call `enqueue` or `close`.
274+
while (true) { // eslint-disable-line no-constant-condition
275+
const { value, done } = await reader.read();
276+
allDone = done;
277+
const result = await (done ? finish : process)(value);
278+
if (result !== undefined) {
279+
controller.enqueue(result);
280+
return; // `pull` will get called again
281+
}
282+
if (done) {
283+
// If `finish` didn't return a chunk to enqueue, call
284+
// `close` here. Otherwise, it will get called in the
285+
// next call to `pull`, above (since `allDone == true`).
286+
controller.close();
287+
input.releaseLock();
288+
return;
289+
}
290+
}
291+
} catch (e) {
265292
controller.error(e);
266293
}
294+
},
295+
async cancel(reason) {
296+
await reader.cancel(reason);
267297
}
268-
});
298+
}, queuingStrategy);
269299
}
270300
const result1 = process(input);
271301
const result2 = finish();
@@ -443,19 +473,48 @@ function slice(input, begin=0, end=Infinity) {
443473
}
444474
if (isStream(input)) {
445475
if (begin >= 0 && end >= 0) {
476+
let reader;
446477
let bytesRead = 0;
447-
return transformRaw(input, {
448-
transform(value, controller) {
449-
if (bytesRead < end) {
450-
if (bytesRead + value.length >= begin) {
451-
controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead));
478+
return new ReadableStream({
479+
start() {
480+
reader = input.getReader();
481+
},
482+
async pull(controller) {
483+
try {
484+
// Read repeatedly until we have a chunk to enqueue or until
485+
// we can close the stream, as `pull` won't get called again
486+
// until we call `enqueue` or `close`.
487+
while (true) { // eslint-disable-line no-constant-condition
488+
if (bytesRead < end) {
489+
const { value, done } = await reader.read();
490+
if (done) {
491+
controller.close();
492+
input.releaseLock();
493+
return;
494+
}
495+
let valueToEnqueue;
496+
if (bytesRead + value.length >= begin) {
497+
valueToEnqueue = slice(value, Math.max(begin - bytesRead, 0), end - bytesRead);
498+
}
499+
bytesRead += value.length;
500+
if (valueToEnqueue) {
501+
controller.enqueue(valueToEnqueue);
502+
return; // `pull` will get called again
503+
}
504+
} else {
505+
controller.close();
506+
input.releaseLock();
507+
return;
508+
}
452509
}
453-
bytesRead += value.length;
454-
} else {
455-
controller.terminate();
510+
} catch (e) {
511+
controller.error(e);
456512
}
513+
},
514+
async cancel(reason) {
515+
await reader.cancel(reason);
457516
}
458-
});
517+
}, { highWaterMark: 0 });
459518
}
460519
if (begin < 0 && (end < 0 || end === Infinity)) {
461520
let lastBytes = [];

0 commit comments

Comments
 (0)