Skip to content

Commit c4da175

Browse files
committed
👍 Add accumulate() function for automatic RPC batching
Add accumulate() function that automatically batches multiple RPC calls that occur at the same timing during microtask processing. This enables parallel RPC execution with automatic batching and proper error handling.
1 parent d8bfd9d commit c4da175

File tree

5 files changed

+1574
-1
lines changed

5 files changed

+1574
-1
lines changed

batch/accumulate.ts

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
import { nextTick } from "node:process";
2+
import type { Call, Context, Denops, Dispatcher, Meta } from "@denops/core";
3+
import { BatchError } from "@denops/core";
4+
import { AccumulateCancelledError } from "./error.ts";
5+
6+
const errorProp = Symbol("AccumulateErrorResult");
7+
8+
type ErrorResult = {
9+
[errorProp]:
10+
| { type: "error"; message: string; cause: unknown }
11+
| { type: "cancel"; cause: unknown }
12+
| { type: "unknown"; message: string; cause: unknown };
13+
};
14+
15+
class AccumulateHelper implements Denops {
16+
readonly #denops: Denops;
17+
readonly #calls: Call[] = [];
18+
readonly #results: unknown[] = [];
19+
readonly #disposer = Promise.withResolvers<void>();
20+
#closed = false;
21+
#resolvedWaiter = Promise.withResolvers<void>();
22+
23+
constructor(denops: Denops) {
24+
this.#denops = denops;
25+
}
26+
27+
static close(helper: AccumulateHelper): void {
28+
helper.#closed = true;
29+
helper.#disposer.promise.catch(() => {/* prevent unhandled rejection */});
30+
helper.#disposer.reject();
31+
}
32+
33+
get name(): string {
34+
return this.#denops.name;
35+
}
36+
37+
get meta(): Meta {
38+
return this.#denops.meta;
39+
}
40+
41+
get interrupted(): AbortSignal | undefined {
42+
return this.#denops.interrupted;
43+
}
44+
45+
get context(): Record<string | number | symbol, unknown> {
46+
return this.#denops.context;
47+
}
48+
49+
get dispatcher(): Dispatcher {
50+
return this.#denops.dispatcher;
51+
}
52+
53+
set dispatcher(dispatcher: Dispatcher) {
54+
this.#denops.dispatcher = dispatcher;
55+
}
56+
57+
async redraw(force?: boolean): Promise<void> {
58+
return await this.#denops.redraw(force);
59+
}
60+
61+
async call(fn: string, ...args: unknown[]): Promise<unknown> {
62+
this.#ensureAvailable();
63+
const call: Call = [fn, ...args];
64+
const [result] = await this.#waitResolved([call]);
65+
66+
if (isErrorResult(result)) {
67+
const error = result[errorProp];
68+
if (error.type === "error") {
69+
throw new Error(error.message, { cause: error.cause });
70+
} else if (error.type === "cancel") {
71+
const repr = `['${fn}', ...]`;
72+
throw new AccumulateCancelledError(
73+
`Call was cancelled due to another error in parallel execution: ${repr}`,
74+
{ calls: [call], cause: error.cause },
75+
);
76+
} else {
77+
throw new Error(error.message, { cause: error.cause });
78+
}
79+
}
80+
81+
return result;
82+
}
83+
84+
async batch(...calls: Call[]): Promise<unknown[]> {
85+
this.#ensureAvailable();
86+
if (calls.length === 0) {
87+
return [];
88+
}
89+
const results = await this.#waitResolved(calls);
90+
91+
const errorIndex = results.findIndex(isErrorResult);
92+
if (errorIndex >= 0) {
93+
const { [errorProp]: error } = results[errorIndex] as ErrorResult;
94+
if (error.type === "error") {
95+
throw new BatchError(error.message, results.slice(0, errorIndex));
96+
} else if (error.type === "cancel") {
97+
const [[fn]] = calls;
98+
const repr = `[['${fn}', ...], ... total ${calls.length} calls]`;
99+
throw new AccumulateCancelledError(
100+
`Batch calls were cancelled due to another error in parallel execution: ${repr}`,
101+
{ calls, cause: error.cause },
102+
);
103+
} else {
104+
throw new Error(error.message, { cause: error.cause });
105+
}
106+
}
107+
108+
return results;
109+
}
110+
111+
async cmd(cmd: string, ctx: Context = {}): Promise<void> {
112+
await this.call("denops#api#cmd", cmd, ctx);
113+
}
114+
115+
async eval(expr: string, ctx: Context = {}): Promise<unknown> {
116+
return await this.call("denops#api#eval", expr, ctx);
117+
}
118+
119+
async dispatch(
120+
name: string,
121+
fn: string,
122+
...args: unknown[]
123+
): Promise<unknown> {
124+
return await this.#denops.dispatch(name, fn, ...args);
125+
}
126+
127+
#ensureAvailable(): void {
128+
if (this.#closed) {
129+
throw new TypeError(
130+
"AccumulateHelper instance is not available outside of 'accumulate' block",
131+
);
132+
}
133+
}
134+
135+
async #waitResolved(calls: Call[]): Promise<unknown[]> {
136+
const start = this.#calls.length;
137+
this.#calls.push(...calls);
138+
const end = this.#calls.length;
139+
nextTick(() => {
140+
if (end === this.#calls.length) {
141+
this.#resolvePendingCalls();
142+
}
143+
});
144+
try {
145+
await Promise.race([
146+
this.#disposer.promise,
147+
this.#resolvedWaiter.promise,
148+
]);
149+
} catch {
150+
// Rethrow the error if the disposer is rejected.
151+
this.#ensureAvailable();
152+
}
153+
return this.#results.slice(start, end);
154+
}
155+
156+
async #resolvePendingCalls(): Promise<void> {
157+
const resultIndex = this.#results.length;
158+
const calls = this.#calls.slice(resultIndex);
159+
this.#results.length = this.#calls.length;
160+
const { resolve } = this.#resolvedWaiter;
161+
this.#resolvedWaiter = Promise.withResolvers();
162+
if (!this.#closed) {
163+
const results = await this.#resolveCalls(calls);
164+
this.#results.splice(resultIndex, results.length, ...results);
165+
}
166+
resolve();
167+
}
168+
169+
async #resolveCalls(calls: Call[]): Promise<unknown[]> {
170+
try {
171+
return await this.#denops.batch(...calls);
172+
} catch (error: unknown) {
173+
if (isBatchError(error)) {
174+
const { results, message } = error;
175+
const errorResult = {
176+
[errorProp]: { type: "error", message, cause: error },
177+
};
178+
const cancelledResults = calls.slice(results.length + 1)
179+
.map(() => ({
180+
[errorProp]: { type: "cancel", cause: error },
181+
}));
182+
return [...results, errorResult, ...cancelledResults];
183+
} else {
184+
const message = error instanceof Error ? error.message : String(error);
185+
const unknownErrors = calls.map(() => ({
186+
[errorProp]: { type: "unknown", message, cause: error },
187+
}));
188+
return unknownErrors;
189+
}
190+
}
191+
}
192+
}
193+
194+
function isBatchError(obj: unknown): obj is BatchError {
195+
return obj instanceof Error && obj.name === "BatchError";
196+
}
197+
198+
function isErrorResult(obj: unknown): obj is ErrorResult {
199+
return obj != null && Object.hasOwn(obj, errorProp);
200+
}
201+
202+
/**
203+
* Runs an `executor` function while automatically batching multiple RPCs.
204+
*
205+
* `accumulate()` allows you to write normal async functions while automatically
206+
* batching multiple RPCs that occur at the same timing (during microtask
207+
* processing) into a single RPC call.
208+
*
209+
* Note that RPC calls with side effects should be avoided, and if you do, the
210+
* order in which you call them should be carefully considered.
211+
*
212+
* @example
213+
* ```typescript
214+
* import { assertType, IsExact } from "jsr:@std/testing/types";
215+
* import type { Entrypoint } from "jsr:@denops/std";
216+
* import * as fn from "jsr:@denops/std/function";
217+
* import { accumulate } from "jsr:@denops/std/batch";
218+
*
219+
* export const main: Entrypoint = async (denops) => {
220+
* const results = await accumulate(denops, async (denops) => {
221+
* const lines = await fn.getline(denops, 1, "$");
222+
* return await Promise.all(lines.map(async (line, index) => {
223+
* const keyword = await fn.matchstr(denops, line, "\\k\\+");
224+
* const len = await fn.len(denops, keyword);
225+
* return {
226+
* lnum: index + 1,
227+
* keyword,
228+
* len,
229+
* };
230+
* }));
231+
* });
232+
*
233+
* assertType<
234+
* IsExact<
235+
* typeof results,
236+
* { lnum: number; keyword: string; len: number; }[]
237+
* >
238+
* >(true);
239+
* }
240+
* ```
241+
*
242+
* In the case of the example, the following 3 RPCs are called.
243+
*
244+
* 1. RPC call to `getline`.
245+
* 2. Multiple `matchstr` calls in one RPC.
246+
* 3. Multiple `len` calls in one RPC.
247+
*
248+
* @remarks
249+
* The `denops` instance passed as the argument to the `executor` function is
250+
* only valid within the `accumulate()` block. Attempting to use it outside the
251+
* block will result in an error when calling `denops.call()`, `denops.batch()`,
252+
* `denops.cmd()`, or `denops.eval()`.
253+
*/
254+
export async function accumulate<T extends unknown>(
255+
denops: Denops,
256+
executor: (helper: Denops) => T,
257+
): Promise<Awaited<T>> {
258+
const helper = new AccumulateHelper(denops);
259+
try {
260+
return await executor(helper);
261+
} finally {
262+
AccumulateHelper.close(helper);
263+
}
264+
}

0 commit comments

Comments
 (0)