Skip to content

Commit 105b0c0

Browse files
twisslarabr
authored andcommitted
Add optional queuingStrategy parameter to transform
1 parent 10af204 commit 105b0c0

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

lib/streams.js

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,19 @@ function transformWithCancel(customCancel) {
222222

223223
/**
224224
* Transform a stream using helper functions which are called on each chunk, and on stream close, respectively.
225+
* Takes an optional queuing strategy for the resulting readable stream;
226+
* see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#queuingstrategy.
227+
* By default, the queueing strategy is non-buffering. When the `process`
228+
* function is asynchronous, it may be useful to pass a buffering
229+
* queuing strategy to enable multiple chunks to be processed in parallel;
230+
* e.g. pass `{ highWaterMark: 4 }` to process up to 4 chunks in parallel.
225231
* @param {ReadableStream|Uint8array|String} input
226232
* @param {Function} process
227233
* @param {Function} finish
234+
* @param {Object} queuingStrategy
228235
* @returns {ReadableStream|Uint8array|String}
229236
*/
230-
function transform(input, process = () => undefined, finish = () => undefined) {
237+
function transform(input, process = () => undefined, finish = () => undefined, queuingStrategy = { highWaterMark: 0 }) {
231238
if (isArrayStream(input)) {
232239
const output = new ArrayStream();
233240
(async () => {
@@ -288,7 +295,7 @@ function transform(input, process = () => undefined, finish = () => undefined) {
288295
async cancel(reason) {
289296
await reader.cancel(reason);
290297
}
291-
}, { highWaterMark: 0 });
298+
}, queuingStrategy);
292299
}
293300
const result1 = process(input);
294301
const result2 = finish();

0 commit comments

Comments
 (0)