Skip to content

Commit a57ba0b

Browse files
authored
Parallelize xet uploads (network) (#1715)
See #1704. This parallelizes at least the network calls; a complementary PR would put the wasm module into a worker to parallelize CPU work as well. cc @assafvayner @jsulz @Kakulukian
1 parent aff2314 commit a57ba0b

File tree

4 files changed

+211
-35
lines changed

4 files changed

+211
-35
lines changed

packages/hub/src/lib/commit.ts

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import { base64FromBytes } from "../utils/base64FromBytes";
2323
import { isFrontend } from "../utils/isFrontend";
2424
import { createBlobs } from "../utils/createBlobs";
2525
import { uploadShards } from "../utils/uploadShards";
26+
import { splitAsyncGenerator } from "../utils/splitAsyncGenerator";
27+
import { mergeAsyncGenerators } from "../utils/mergeAsyncGenerators";
2628

2729
const CONCURRENT_SHAS = 5;
2830
const CONCURRENT_LFS_UPLOADS = 5;
@@ -323,43 +325,46 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr
323325
};
324326
}
325327
}
326-
for await (const event of uploadShards(
327-
(async function* () {
328-
for (const obj of json.objects) {
329-
const op = shaToOperation.get(obj.oid);
330-
if (!op || !obj.actions?.upload) {
331-
continue;
332-
}
333-
abortSignal?.throwIfAborted();
334-
335-
yield { content: op.content, path: op.path, sha256: obj.oid };
328+
const source = (async function* () {
329+
for (const obj of json.objects) {
330+
const op = shaToOperation.get(obj.oid);
331+
if (!op || !obj.actions?.upload) {
332+
continue;
336333
}
337-
})(),
338-
{
339-
fetch: params.fetch,
340-
accessToken,
341-
hubUrl: params.hubUrl ?? HUB_URL,
342-
repo: repoId,
343-
// todo: maybe leave empty if PR?
344-
rev: params.branch ?? "main",
345-
}
346-
)) {
347-
if (event.event === "file") {
348-
yield {
349-
event: "fileProgress",
350-
path: event.path,
351-
progress: 1,
352-
state: "uploading",
353-
};
354-
} else if (event.event === "fileProgress") {
355-
yield {
356-
event: "fileProgress",
357-
path: event.path,
358-
progress: event.progress,
359-
state: "uploading",
360-
};
334+
abortSignal?.throwIfAborted();
335+
336+
yield { content: op.content, path: op.path, sha256: obj.oid };
361337
}
362-
}
338+
})();
339+
const sources = splitAsyncGenerator(source, 5);
340+
yield* mergeAsyncGenerators(
341+
sources.map(async function* (source) {
342+
for await (const event of uploadShards(source, {
343+
fetch: params.fetch,
344+
accessToken,
345+
hubUrl: params.hubUrl ?? HUB_URL,
346+
repo: repoId,
347+
// todo: maybe leave empty if PR?
348+
rev: params.branch ?? "main",
349+
})) {
350+
if (event.event === "file") {
351+
yield {
352+
event: "fileProgress" as const,
353+
path: event.path,
354+
progress: 1,
355+
state: "uploading" as const,
356+
};
357+
} else if (event.event === "fileProgress") {
358+
yield {
359+
event: "fileProgress" as const,
360+
path: event.path,
361+
progress: event.progress,
362+
state: "uploading" as const,
363+
};
364+
}
365+
}
366+
})
367+
);
363368
} else {
364369
yield* eventToGenerator<CommitProgressEvent, void>((yieldCallback, returnCallback, rejectCallback) => {
365370
return promisesQueueStreaming(
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { describe, expect, it } from "vitest";
import { mergeAsyncGenerators } from "./mergeAsyncGenerators";
import { splitAsyncGenerator } from "./splitAsyncGenerator";

describe("mergeAsyncGenerators", () => {
	const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
	it("should merge multiple async generators", async () => {
		// Yields 1 and 2 immediately, then 3 only after 250ms.
		const generator1 = (async function* () {
			yield 1;
			yield 2;
			await sleep(250);
			yield 3;
		})();
		// Yields 4, 5, 6 in a burst after 100ms.
		const generator2 = (async function* () {
			await sleep(100);
			yield 4;
			yield 5;
			yield 6;
		})();
		// Yields 7, 8, 9 in a burst after 200ms.
		const generator3 = (async function* () {
			await sleep(200);
			yield 7;
			yield 8;
			yield 9;
		})();

		const results: number[] = [];

		for await (const result of mergeAsyncGenerators([generator1, generator2, generator3])) {
			results.push(result);
		}
		// Values arrive in settle order, not generator order: 1,2 right away,
		// the 100ms burst, the 200ms burst, and finally 3 at ~250ms.
		// NOTE(review): this ordering relies on real timers with generous gaps;
		// should be stable, but it is timing-based by construction.
		expect(results).toEqual([1, 2, 4, 5, 6, 7, 8, 9, 3]);
	});

	it("should merge multiple async generators from a single source", async () => {
		// A single source of nine values, fanned out to three consumers that
		// each pull at different paces.
		const source = (async function* () {
			yield 1;
			yield 2;
			yield 3;
			yield 4;
			yield 5;
			yield 6;
			yield 7;
			yield 8;
			yield 9;
		})();
		const sources = splitAsyncGenerator(source, 3);

		// Consumer 1: starts immediately, 100ms between pulls.
		const generator1 = (async function* () {
			for await (const result of sources[0]) {
				yield { result, gen: 1 };
				await sleep(100);
			}
		})();

		// Consumer 2: starts at 50ms, 100ms between pulls.
		const generator2 = (async function* () {
			await sleep(50);
			for await (const result of sources[1]) {
				yield { result, gen: 2 };
				await sleep(100);
			}
		})();

		// Consumer 3: starts at 80ms, pulls back-to-back, but exits after two
		// values — exercising early return() of a split generator without
		// closing the shared source for the others.
		const generator3 = (async function* () {
			await sleep(80);
			let count = 0;
			for await (const result of sources[2]) {
				yield { result, gen: 3 };
				count++;

				if (count >= 2) {
					return;
				}
			}
		})();

		const results: { result: number; gen: number }[] = [];
		for await (const result of mergeAsyncGenerators([generator1, generator2, generator3])) {
			results.push(result);
		}
		// All nine source values must be consumed exactly once across consumers.
		expect(results.length).toBe(9);
		// gen 3 grabs 3 and 4 back-to-back at ~80ms then retires; gens 1 and 2
		// alternate over the remainder on their 100ms cadence.
		// NOTE(review): timing-based expectation, same caveat as above.
		expect(results).toEqual([
			{ result: 1, gen: 1 },
			{ result: 2, gen: 2 },
			{ result: 3, gen: 3 },
			{ result: 4, gen: 3 },
			{ result: 5, gen: 1 },
			{ result: 6, gen: 2 },
			{ result: 7, gen: 1 },
			{ result: 8, gen: 2 },
			{ result: 9, gen: 1 },
		]);
	});
});
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Merge outputs of multiple async generators.
3+
*/
4+
export async function* mergeAsyncGenerators<T>(generators: AsyncGenerator<T>[]): AsyncGenerator<T> {
5+
const executing: Promise<{ result: IteratorResult<T>; gen: AsyncGenerator<T> }>[] = [];
6+
7+
const generatorSymbol = Symbol("generator");
8+
9+
for (const gen of generators) {
10+
const p = gen.next().then((result) => ({ result, gen }));
11+
Object.defineProperty(p, generatorSymbol, {
12+
value: gen,
13+
});
14+
executing.push(p);
15+
}
16+
17+
while (executing.length > 0) {
18+
const next = await Promise.race(executing);
19+
const { result, gen } = next;
20+
21+
const index = executing.findIndex((p) => Object.getOwnPropertyDescriptor(p, generatorSymbol)?.value === gen);
22+
23+
if (result.done) {
24+
executing.splice(index, 1);
25+
continue;
26+
}
27+
28+
yield result.value;
29+
executing[index] = gen.next().then((result) => ({ result, gen }));
30+
Object.defineProperty(executing[index], generatorSymbol, {
31+
value: gen,
32+
});
33+
}
34+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Split an async generator into multiple async generators, all drawing from the same source.
3+
*/
4+
export function splitAsyncGenerator<T>(source: AsyncGenerator<T>, n: number): Array<AsyncGenerator<T>> {
5+
if (n <= 0) {
6+
return [];
7+
}
8+
9+
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
10+
11+
let takenIndex: number | null = null;
12+
const generators: AsyncGenerator<T>[] = [];
13+
let remaining = n;
14+
for (let i = 0; i < n; i++) {
15+
generators.push({
16+
next: async () => {
17+
while (takenIndex !== null) {
18+
await sleep(1);
19+
}
20+
takenIndex = i;
21+
return source.next().then((r) => {
22+
takenIndex = null;
23+
return r;
24+
});
25+
},
26+
return: async () => {
27+
remaining--;
28+
if (remaining === 0) {
29+
return source.return(undefined);
30+
}
31+
return {
32+
done: true,
33+
value: undefined,
34+
};
35+
},
36+
throw: async (error) => {
37+
return source.throw(error);
38+
},
39+
[Symbol.asyncIterator]: () => generators[i],
40+
});
41+
}
42+
return generators;
43+
}

0 commit comments

Comments
 (0)