Skip to content

Commit de59619

Browse files
twisslarabr
authored andcommitted
Eliminate buffering in slice
1 parent c7325f8 commit de59619

File tree

1 file changed

+39
-10
lines changed

1 file changed

+39
-10
lines changed

lib/streams.js

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ function transform(input, process = () => undefined, finish = () => undefined, q
271271
// Read repeatedly until we have a chunk to enqueue or until
272272
// we can close the stream, as `pull` won't get called again
273273
// until we call `enqueue` or `close`.
274-
while (true) {
274+
while (true) { // eslint-disable-line no-constant-condition
275275
const { value, done } = await reader.read();
276276
allDone = done;
277277
const result = await (done ? finish : process)(value);
@@ -473,19 +473,48 @@ function slice(input, begin=0, end=Infinity) {
473473
}
474474
if (isStream(input)) {
475475
if (begin >= 0 && end >= 0) {
476+
let reader;
476477
let bytesRead = 0;
477-
return transformRaw(input, {
478-
transform(value, controller) {
479-
if (bytesRead < end) {
480-
if (bytesRead + value.length >= begin) {
481-
controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead));
478+
return new ReadableStream({
479+
start() {
480+
reader = input.getReader();
481+
},
482+
async pull(controller) {
483+
try {
484+
// Read repeatedly until we have a chunk to enqueue or until
485+
// we can close the stream, as `pull` won't get called again
486+
// until we call `enqueue` or `close`.
487+
while (true) { // eslint-disable-line no-constant-condition
488+
if (bytesRead < end) {
489+
const { value, done } = await reader.read();
490+
if (done) {
491+
controller.close();
492+
input.releaseLock();
493+
return;
494+
}
495+
let valueToEnqueue;
496+
if (bytesRead + value.length >= begin) {
497+
valueToEnqueue = slice(value, Math.max(begin - bytesRead, 0), end - bytesRead);
498+
}
499+
bytesRead += value.length;
500+
if (valueToEnqueue) {
501+
controller.enqueue(valueToEnqueue);
502+
return; // `pull` will get called again
503+
}
504+
} else {
505+
controller.close();
506+
input.releaseLock();
507+
return;
508+
}
482509
}
483-
bytesRead += value.length;
484-
} else {
485-
controller.terminate();
510+
} catch (e) {
511+
controller.error(e);
486512
}
513+
},
514+
async cancel(reason) {
515+
await reader.cancel(reason);
487516
}
488-
});
517+
}, { highWaterMark: 0 });
489518
}
490519
if (begin < 0 && (end < 0 || end === Infinity)) {
491520
let lastBytes = [];

0 commit comments

Comments
 (0)