|
| 1 | +import { nextTick } from "node:process"; |
| 2 | +import type { Call, Context, Denops, Dispatcher, Meta } from "@denops/core"; |
| 3 | +import { BatchError } from "@denops/core"; |
| 4 | +import { AccumulateCancelledError } from "./error.ts"; |
| 5 | + |
| 6 | +const errorProp = Symbol("AccumulateErrorResult"); |
| 7 | + |
| 8 | +type ErrorResult = { |
| 9 | + [errorProp]: |
| 10 | + | { type: "error"; message: string; cause: unknown } |
| 11 | + | { type: "cancel"; cause: unknown } |
| 12 | + | { type: "unknown"; message: string; cause: unknown }; |
| 13 | +}; |
| 14 | + |
| 15 | +class AccumulateHelper implements Denops { |
| 16 | + readonly #denops: Denops; |
| 17 | + readonly #calls: Call[] = []; |
| 18 | + readonly #results: unknown[] = []; |
| 19 | + readonly #disposer = Promise.withResolvers<void>(); |
| 20 | + #closed = false; |
| 21 | + #resolvedWaiter = Promise.withResolvers<void>(); |
| 22 | + |
| 23 | + constructor(denops: Denops) { |
| 24 | + this.#denops = denops; |
| 25 | + } |
| 26 | + |
| 27 | + static close(helper: AccumulateHelper): void { |
| 28 | + helper.#closed = true; |
| 29 | + helper.#disposer.promise.catch(() => {/* prevent unhandled rejection */}); |
| 30 | + helper.#disposer.reject(); |
| 31 | + } |
| 32 | + |
| 33 | + get name(): string { |
| 34 | + return this.#denops.name; |
| 35 | + } |
| 36 | + |
| 37 | + get meta(): Meta { |
| 38 | + return this.#denops.meta; |
| 39 | + } |
| 40 | + |
| 41 | + get interrupted(): AbortSignal | undefined { |
| 42 | + return this.#denops.interrupted; |
| 43 | + } |
| 44 | + |
| 45 | + get context(): Record<string | number | symbol, unknown> { |
| 46 | + return this.#denops.context; |
| 47 | + } |
| 48 | + |
| 49 | + get dispatcher(): Dispatcher { |
| 50 | + return this.#denops.dispatcher; |
| 51 | + } |
| 52 | + |
| 53 | + set dispatcher(dispatcher: Dispatcher) { |
| 54 | + this.#denops.dispatcher = dispatcher; |
| 55 | + } |
| 56 | + |
| 57 | + async redraw(force?: boolean): Promise<void> { |
| 58 | + return await this.#denops.redraw(force); |
| 59 | + } |
| 60 | + |
| 61 | + async call(fn: string, ...args: unknown[]): Promise<unknown> { |
| 62 | + this.#ensureAvailable(); |
| 63 | + const call: Call = [fn, ...args]; |
| 64 | + const [result] = await this.#waitResolved([call]); |
| 65 | + |
| 66 | + if (isErrorResult(result)) { |
| 67 | + const error = result[errorProp]; |
| 68 | + if (error.type === "error") { |
| 69 | + throw new Error(error.message, { cause: error.cause }); |
| 70 | + } else if (error.type === "cancel") { |
| 71 | + const repr = `['${fn}', ...]`; |
| 72 | + throw new AccumulateCancelledError( |
| 73 | + `Call was cancelled due to another error in parallel execution: ${repr}`, |
| 74 | + { calls: [call], cause: error.cause }, |
| 75 | + ); |
| 76 | + } else { |
| 77 | + throw new Error(error.message, { cause: error.cause }); |
| 78 | + } |
| 79 | + } |
| 80 | + |
| 81 | + return result; |
| 82 | + } |
| 83 | + |
| 84 | + async batch(...calls: Call[]): Promise<unknown[]> { |
| 85 | + this.#ensureAvailable(); |
| 86 | + if (calls.length === 0) { |
| 87 | + return []; |
| 88 | + } |
| 89 | + const results = await this.#waitResolved(calls); |
| 90 | + |
| 91 | + const errorIndex = results.findIndex(isErrorResult); |
| 92 | + if (errorIndex >= 0) { |
| 93 | + const { [errorProp]: error } = results[errorIndex] as ErrorResult; |
| 94 | + if (error.type === "error") { |
| 95 | + throw new BatchError(error.message, results.slice(0, errorIndex)); |
| 96 | + } else if (error.type === "cancel") { |
| 97 | + const [[fn]] = calls; |
| 98 | + const repr = `[['${fn}', ...], ... total ${calls.length} calls]`; |
| 99 | + throw new AccumulateCancelledError( |
| 100 | + `Batch calls were cancelled due to another error in parallel execution: ${repr}`, |
| 101 | + { calls, cause: error.cause }, |
| 102 | + ); |
| 103 | + } else { |
| 104 | + throw new Error(error.message, { cause: error.cause }); |
| 105 | + } |
| 106 | + } |
| 107 | + |
| 108 | + return results; |
| 109 | + } |
| 110 | + |
| 111 | + async cmd(cmd: string, ctx: Context = {}): Promise<void> { |
| 112 | + await this.call("denops#api#cmd", cmd, ctx); |
| 113 | + } |
| 114 | + |
| 115 | + async eval(expr: string, ctx: Context = {}): Promise<unknown> { |
| 116 | + return await this.call("denops#api#eval", expr, ctx); |
| 117 | + } |
| 118 | + |
| 119 | + async dispatch( |
| 120 | + name: string, |
| 121 | + fn: string, |
| 122 | + ...args: unknown[] |
| 123 | + ): Promise<unknown> { |
| 124 | + return await this.#denops.dispatch(name, fn, ...args); |
| 125 | + } |
| 126 | + |
| 127 | + #ensureAvailable(): void { |
| 128 | + if (this.#closed) { |
| 129 | + throw new TypeError( |
| 130 | + "AccumulateHelper instance is not available outside of 'accumulate' block", |
| 131 | + ); |
| 132 | + } |
| 133 | + } |
| 134 | + |
| 135 | + async #waitResolved(calls: Call[]): Promise<unknown[]> { |
| 136 | + const start = this.#calls.length; |
| 137 | + this.#calls.push(...calls); |
| 138 | + const end = this.#calls.length; |
| 139 | + nextTick(() => { |
| 140 | + if (end === this.#calls.length) { |
| 141 | + this.#resolvePendingCalls(); |
| 142 | + } |
| 143 | + }); |
| 144 | + try { |
| 145 | + await Promise.race([ |
| 146 | + this.#disposer.promise, |
| 147 | + this.#resolvedWaiter.promise, |
| 148 | + ]); |
| 149 | + } catch { |
| 150 | + // Rethrow the error if the disposer is rejected. |
| 151 | + this.#ensureAvailable(); |
| 152 | + } |
| 153 | + return this.#results.slice(start, end); |
| 154 | + } |
| 155 | + |
| 156 | + async #resolvePendingCalls(): Promise<void> { |
| 157 | + const resultIndex = this.#results.length; |
| 158 | + const calls = this.#calls.slice(resultIndex); |
| 159 | + this.#results.length = this.#calls.length; |
| 160 | + const { resolve } = this.#resolvedWaiter; |
| 161 | + this.#resolvedWaiter = Promise.withResolvers(); |
| 162 | + if (!this.#closed) { |
| 163 | + const results = await this.#resolveCalls(calls); |
| 164 | + this.#results.splice(resultIndex, results.length, ...results); |
| 165 | + } |
| 166 | + resolve(); |
| 167 | + } |
| 168 | + |
| 169 | + async #resolveCalls(calls: Call[]): Promise<unknown[]> { |
| 170 | + try { |
| 171 | + return await this.#denops.batch(...calls); |
| 172 | + } catch (error: unknown) { |
| 173 | + if (isBatchError(error)) { |
| 174 | + const { results, message } = error; |
| 175 | + const errorResult = { |
| 176 | + [errorProp]: { type: "error", message, cause: error }, |
| 177 | + }; |
| 178 | + const cancelledResults = calls.slice(results.length + 1) |
| 179 | + .map(() => ({ |
| 180 | + [errorProp]: { type: "cancel", cause: error }, |
| 181 | + })); |
| 182 | + return [...results, errorResult, ...cancelledResults]; |
| 183 | + } else { |
| 184 | + const message = error instanceof Error ? error.message : String(error); |
| 185 | + const unknownErrors = calls.map(() => ({ |
| 186 | + [errorProp]: { type: "unknown", message, cause: error }, |
| 187 | + })); |
| 188 | + return unknownErrors; |
| 189 | + } |
| 190 | + } |
| 191 | + } |
| 192 | +} |
| 193 | + |
| 194 | +function isBatchError(obj: unknown): obj is BatchError { |
| 195 | + return obj instanceof Error && obj.name === "BatchError"; |
| 196 | +} |
| 197 | + |
| 198 | +function isErrorResult(obj: unknown): obj is ErrorResult { |
| 199 | + return obj != null && Object.hasOwn(obj, errorProp); |
| 200 | +} |
| 201 | + |
| 202 | +/** |
| 203 | + * Runs an `executor` function while automatically batching multiple RPCs. |
| 204 | + * |
| 205 | + * `accumulate()` allows you to write normal async functions while automatically |
| 206 | + * batching multiple RPCs that occur at the same timing (during microtask |
| 207 | + * processing) into a single RPC call. |
| 208 | + * |
| 209 | + * Note that RPC calls with side effects should be avoided, and if you do, the |
| 210 | + * order in which you call them should be carefully considered. |
| 211 | + * |
| 212 | + * @example |
| 213 | + * ```typescript |
| 214 | + * import { assertType, IsExact } from "jsr:@std/testing/types"; |
| 215 | + * import type { Entrypoint } from "jsr:@denops/std"; |
| 216 | + * import * as fn from "jsr:@denops/std/function"; |
| 217 | + * import { accumulate } from "jsr:@denops/std/batch"; |
| 218 | + * |
| 219 | + * export const main: Entrypoint = async (denops) => { |
| 220 | + * const results = await accumulate(denops, async (denops) => { |
| 221 | + * const lines = await fn.getline(denops, 1, "$"); |
| 222 | + * return await Promise.all(lines.map(async (line, index) => { |
| 223 | + * const keyword = await fn.matchstr(denops, line, "\\k\\+"); |
| 224 | + * const len = await fn.len(denops, keyword); |
| 225 | + * return { |
| 226 | + * lnum: index + 1, |
| 227 | + * keyword, |
| 228 | + * len, |
| 229 | + * }; |
| 230 | + * })); |
| 231 | + * }); |
| 232 | + * |
| 233 | + * assertType< |
| 234 | + * IsExact< |
| 235 | + * typeof results, |
| 236 | + * { lnum: number; keyword: string; len: number; }[] |
| 237 | + * > |
| 238 | + * >(true); |
| 239 | + * } |
| 240 | + * ``` |
| 241 | + * |
| 242 | + * In the case of the example, the following 3 RPCs are called. |
| 243 | + * |
| 244 | + * 1. RPC call to `getline`. |
| 245 | + * 2. Multiple `matchstr` calls in one RPC. |
| 246 | + * 3. Multiple `len` calls in one RPC. |
| 247 | + * |
| 248 | + * @remarks |
| 249 | + * The `denops` instance passed as the argument to the `executor` function is |
| 250 | + * only valid within the `accumulate()` block. Attempting to use it outside the |
| 251 | + * block will result in an error when calling `denops.call()`, `denops.batch()`, |
| 252 | + * `denops.cmd()`, or `denops.eval()`. |
| 253 | + */ |
| 254 | +export async function accumulate<T extends unknown>( |
| 255 | + denops: Denops, |
| 256 | + executor: (helper: Denops) => T, |
| 257 | +): Promise<Awaited<T>> { |
| 258 | + const helper = new AccumulateHelper(denops); |
| 259 | + try { |
| 260 | + return await executor(helper); |
| 261 | + } finally { |
| 262 | + AccumulateHelper.close(helper); |
| 263 | + } |
| 264 | +} |
0 commit comments