Skip to content

Commit 2f1ed1b

Browse files
twisslarabr
authored andcommitted
Eliminate buffering in transform
A `TransformStream` currently always requires an internal queue (until whatwg/streams#1158 is resolved). Therefore, don't use `TransformStream` in `transform` anymore, but create a new `ReadableStream` with the transformed chunks directly, and with a `highWaterMark` of 0, such that the internal queue is always empty.
1 parent aaa4ae6 commit 2f1ed1b

File tree

1 file changed

+34
-13
lines changed

1 file changed

+34
-13
lines changed

lib/streams.js

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -248,24 +248,45 @@ function transform(input, process = () => undefined, finish = () => undefined) {
248248
return output;
249249
}
250250
if (isStream(input)) {
251-
return transformRaw(input, {
252-
async transform(value, controller) {
253-
try {
254-
const result = await process(value);
255-
if (result !== undefined) controller.enqueue(result);
256-
} catch(e) {
257-
controller.error(e);
258-
}
251+
let reader;
252+
let allDone = false;
253+
return new ReadableStream({
254+
start() {
255+
reader = input.getReader();
259256
},
260-
async flush(controller) {
257+
async pull(controller) {
258+
if (allDone) {
259+
controller.close();
260+
return;
261+
}
261262
try {
262-
const result = await finish();
263-
if (result !== undefined) controller.enqueue(result);
264-
} catch(e) {
263+
// Read repeatedly until we have a chunk to enqueue or until
264+
// we can close the stream, as `pull` won't get called again
265+
// until we call `enqueue` or `close`.
266+
while (true) {
267+
const { value, done } = await reader.read();
268+
allDone = done;
269+
const result = await (done ? finish : process)(value);
270+
if (result !== undefined) {
271+
controller.enqueue(result);
272+
return; // `pull` will get called again
273+
}
274+
if (done) {
275+
// If `finish` didn't return a chunk to enqueue, call
276+
// `close` here. Otherwise, it will get called in the
277+
// next call to `pull`, above (since `allDone == true`).
278+
controller.close();
279+
return;
280+
}
281+
}
282+
} catch (e) {
265283
controller.error(e);
266284
}
285+
},
286+
async cancel(reason) {
287+
await reader.cancel(reason);
267288
}
268-
});
289+
}, { highWaterMark: 0 });
269290
}
270291
const result1 = process(input);
271292
const result2 = finish();

0 commit comments

Comments
 (0)