diff --git a/batch/accumulate.ts b/batch/accumulate.ts new file mode 100644 index 0000000..f3a1e16 --- /dev/null +++ b/batch/accumulate.ts @@ -0,0 +1,264 @@ +import { nextTick } from "node:process"; +import type { Call, Context, Denops, Dispatcher, Meta } from "@denops/core"; +import { BatchError } from "@denops/core"; +import { AccumulateCancelledError } from "./error.ts"; + +const errorProp = Symbol("AccumulateErrorResult"); + +type ErrorResult = { + [errorProp]: + | { type: "error"; message: string; cause: unknown } + | { type: "cancel"; cause: unknown } + | { type: "unknown"; message: string; cause: unknown }; +}; + +class AccumulateHelper implements Denops { + readonly #denops: Denops; + readonly #calls: Call[] = []; + readonly #results: unknown[] = []; + readonly #disposer = Promise.withResolvers(); + #closed = false; + #resolvedWaiter = Promise.withResolvers(); + + constructor(denops: Denops) { + this.#denops = denops; + } + + static close(helper: AccumulateHelper): void { + helper.#closed = true; + helper.#disposer.promise.catch(() => {/* prevent unhandled rejection */}); + helper.#disposer.reject(); + } + + get name(): string { + return this.#denops.name; + } + + get meta(): Meta { + return this.#denops.meta; + } + + get interrupted(): AbortSignal | undefined { + return this.#denops.interrupted; + } + + get context(): Record { + return this.#denops.context; + } + + get dispatcher(): Dispatcher { + return this.#denops.dispatcher; + } + + set dispatcher(dispatcher: Dispatcher) { + this.#denops.dispatcher = dispatcher; + } + + async redraw(force?: boolean): Promise { + return await this.#denops.redraw(force); + } + + async call(fn: string, ...args: unknown[]): Promise { + this.#ensureAvailable(); + const call: Call = [fn, ...args]; + const [result] = await this.#waitResolved([call]); + + if (isErrorResult(result)) { + const error = result[errorProp]; + if (error.type === "error") { + throw new Error(error.message, { cause: error.cause }); + } else if (error.type === "cancel") { + const repr = `['${fn}', ...]`; + throw new AccumulateCancelledError( + `Call was cancelled due to another error in parallel execution: ${repr}`, + { calls: [call], cause: error.cause }, + ); + } else { + throw new Error(error.message, { cause: error.cause }); + } + } + + return result; + } + + async batch(...calls: Call[]): Promise { + this.#ensureAvailable(); + if (calls.length === 0) { + return []; + } + const results = await this.#waitResolved(calls); + + const errorIndex = results.findIndex(isErrorResult); + if (errorIndex >= 0) { + const { [errorProp]: error } = results[errorIndex] as ErrorResult; + if (error.type === "error") { + throw new BatchError(error.message, results.slice(0, errorIndex)); + } else if (error.type === "cancel") { + const [[fn]] = calls; + const repr = `[['${fn}', ...], ... total ${calls.length} calls]`; + throw new AccumulateCancelledError( + `Batch calls were cancelled due to another error in parallel execution: ${repr}`, + { calls, cause: error.cause }, + ); + } else { + throw new Error(error.message, { cause: error.cause }); + } + } + + return results; + } + + async cmd(cmd: string, ctx: Context = {}): Promise { + await this.call("denops#api#cmd", cmd, ctx); + } + + async eval(expr: string, ctx: Context = {}): Promise { + return await this.call("denops#api#eval", expr, ctx); + } + + async dispatch( + name: string, + fn: string, + ...args: unknown[] + ): Promise { + return await this.#denops.dispatch(name, fn, ...args); + } + + #ensureAvailable(): void { + if (this.#closed) { + throw new TypeError( + "AccumulateHelper instance is not available outside of 'accumulate' block", + ); + } + } + + async #waitResolved(calls: Call[]): Promise { + const start = this.#calls.length; + this.#calls.push(...calls); + const end = this.#calls.length; + nextTick(() => { + if (end === this.#calls.length) { + this.#resolvePendingCalls(); + } + }); + try { + await Promise.race([ + this.#disposer.promise, + this.#resolvedWaiter.promise, + ]); + } catch { + // Rethrow the error if the disposer is rejected. + this.#ensureAvailable(); + } + return this.#results.slice(start, end); + } + + async #resolvePendingCalls(): Promise { + const resultIndex = this.#results.length; + const calls = this.#calls.slice(resultIndex); + this.#results.length = this.#calls.length; + const { resolve } = this.#resolvedWaiter; + this.#resolvedWaiter = Promise.withResolvers(); + if (!this.#closed) { + const results = await this.#resolveCalls(calls); + this.#results.splice(resultIndex, results.length, ...results); + } + resolve(); + } + + async #resolveCalls(calls: Call[]): Promise { + try { + return await this.#denops.batch(...calls); + } catch (error: unknown) { + if (isBatchError(error)) { + const { results, message } = error; + const errorResult = { + [errorProp]: { type: "error", message, cause: error }, + }; + const cancelledResults = calls.slice(results.length + 1) + .map(() => ({ + [errorProp]: { type: "cancel", cause: error }, + })); + return [...results, errorResult, ...cancelledResults]; + } else { + const message = error instanceof Error ? error.message : String(error); + const unknownErrors = calls.map(() => ({ + [errorProp]: { type: "unknown", message, cause: error }, + })); + return unknownErrors; + } + } + } +} + +function isBatchError(obj: unknown): obj is BatchError { + return obj instanceof Error && obj.name === "BatchError"; +} + +function isErrorResult(obj: unknown): obj is ErrorResult { + return obj != null && Object.hasOwn(obj, errorProp); +} + +/** + * Runs an `executor` function while automatically batching multiple RPCs. + * + * `accumulate()` allows you to write normal async functions while automatically + * batching multiple RPCs that occur at the same timing (during microtask + * processing) into a single RPC call. + * + * Note that RPC calls with side effects should be avoided, and if you do, the + * order in which you call them should be carefully considered. + * + * @example + * ```typescript + * import { assertType, IsExact } from "jsr:@std/testing/types"; + * import type { Entrypoint } from "jsr:@denops/std"; + * import * as fn from "jsr:@denops/std/function"; + * import { accumulate } from "jsr:@denops/std/batch"; + * + * export const main: Entrypoint = async (denops) => { + * const results = await accumulate(denops, async (denops) => { + * const lines = await fn.getline(denops, 1, "$"); + * return await Promise.all(lines.map(async (line, index) => { + * const keyword = await fn.matchstr(denops, line, "\\k\\+"); + * const len = await fn.len(denops, keyword); + * return { + * lnum: index + 1, + * keyword, + * len, + * }; + * })); + * }); + * + * assertType< + * IsExact< + * typeof results, + * { lnum: number; keyword: string; len: number; }[] + * > + * >(true); + * } + * ``` + * + * In the case of the example, the following 3 RPCs are called. + * + * 1. RPC call to `getline`. + * 2. Multiple `matchstr` calls in one RPC. + * 3. Multiple `len` calls in one RPC. + * + * @remarks + * The `denops` instance passed as the argument to the `executor` function is + * only valid within the `accumulate()` block. Attempting to use it outside the + * block will result in an error when calling `denops.call()`, `denops.batch()`, + * `denops.cmd()`, or `denops.eval()`. + */ +export async function accumulate( + denops: Denops, + executor: (helper: Denops) => T, +): Promise> { + const helper = new AccumulateHelper(denops); + try { + return await executor(helper); + } finally { + AccumulateHelper.close(helper); + } +} diff --git a/batch/accumulate_test.ts b/batch/accumulate_test.ts new file mode 100644 index 0000000..4a51df9 --- /dev/null +++ b/batch/accumulate_test.ts @@ -0,0 +1,1245 @@ +import { flushPromises, peekPromiseState } from "@core/asyncutil"; +import { delay } from "@std/async"; +import { + assertEquals, + assertInstanceOf, + assertRejects, + assertStrictEquals, + assertStringIncludes, +} from "@std/assert"; +import { assertType, type IsExact } from "@std/testing/types"; +import { + assertSpyCallArgs, + assertSpyCalls, + resolvesNext, + spy, + stub, +} from "@std/testing/mock"; +import { DisposableStack } from "@nick/dispose"; +import { BatchError, type Denops } from "@denops/core"; +import { batch, collect } from "./mod.ts"; +import { DenopsStub, test } from "@denops/test"; + +import { accumulate } from "./accumulate.ts"; +import { AccumulateCancelledError } from "./error.ts"; + +function preventUnhandledRejection(promise: Promise): void { + promise.catch(() => {/* noop */}); +} + +Deno.test("accumulate() resolves", async (t) => { + const mocked_denops = new DenopsStub(); + const stubBatch = (...values: unknown[]) => + stub( + mocked_denops, + "batch", + (...calls) => { + if (calls.length > values.length) { + return Promise.reject(new Error("Too few values")); + } + return Promise.resolve(values.splice(0, calls.length)); + }, + ); + + await t.step("undefined", async () => { + using denops_batch = stubBatch(undefined); + const actual = await accumulate(mocked_denops, (_helper) => { + return undefined; + }); + assertType>(true); + assertEquals(actual, undefined); + assertSpyCalls(denops_batch, 0); + }); + await t.step("null", async () => { + using denops_batch = stubBatch(null); + const actual = await accumulate(mocked_denops, (helper) => { + return helper.call("eval", "v:none") as Promise; + }); + assertType>(true); + assertEquals(actual, null); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["eval", "v:none"]], + ]); + }); + await t.step("number", async () => { + using denops_batch = stubBatch(42); + const actual = await accumulate(mocked_denops, (helper) => { + return helper.call("strlen", "foo") as Promise; + }); + assertType>(true); + assertEquals(actual, 42); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["strlen", "foo"]], + ]); + }); + await t.step("string", async () => { + using denops_batch = stubBatch("foo"); + const actual = await accumulate(mocked_denops, (helper) => { + return helper.call("matchstr", "foo", ".*") as Promise; + }); + assertType>(true); + assertEquals(actual, "foo"); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["matchstr", "foo", ".*"]], + ]); + }); + await t.step("boolean", async () => { + using denops_batch = stubBatch(true); + const actual = await accumulate(mocked_denops, (helper) => { + return helper.call("eval", "v:true") as Promise; + }); + assertType>(true); + assertEquals(actual, true); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["eval", "v:true"]], + ]); + }); + await t.step("bigint", async () => { + using denops_batch = stubBatch(42); + const actual = await accumulate(mocked_denops, async (helper) => { + return BigInt(await helper.call("strlen", "foo") as number); + }); + assertType>(true); + assertEquals(actual, 42n); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [["strlen", "foo"]], + ]); + }); + await t.step("Object", async () => { + using denops_batch = stubBatch(42, "a", true); + const actual = await accumulate(mocked_denops, async (helper) => { + const [a, b, c] = await Promise.all([ + helper.call("strlen", "foo") as Promise, + helper.call("matchstr", "bar", "a") as Promise, + helper.call("eval", "v:true") as Promise, + ]); + return { a, b, c }; + }); + assertType< + IsExact + >(true); + assertEquals(actual, { a: 42, b: "a", c: true }); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["matchstr", "bar", "a"], + ["eval", "v:true"], + ], + ]); + }); + await t.step("Tuple", async () => { + using denops_batch = stubBatch(42, "a", true); + const actual = await accumulate(mocked_denops, (helper) => { + return Promise.all([ + helper.call("strlen", "foo") as Promise, + helper.call("matchstr", "bar", "a") as Promise, + helper.call("eval", "v:true") as Promise, + ]); + }); + assertType>(true); + assertEquals(actual, [42, "a", true]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["matchstr", "bar", "a"], + ["eval", "v:true"], + ], + ]); + }); + await t.step("Array", async () => { + using denops_batch = stubBatch(42, 123, 39); + const actual = await accumulate(mocked_denops, (helper) => { + const items = ["foo", "bar", "baz"]; + return Promise.all(items.map( + (item) => helper.call("strlen", item) as Promise, + )); + }); + assertType>(true); + assertEquals(actual, [42, 123, 39]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ["strlen", "baz"], + ], + ]); + }); + await t.step("Set", async () => { + using denops_batch = stubBatch(42, 123, 39); + const actual = await accumulate(mocked_denops, async (helper) => { + return new Set( + await Promise.all([ + helper.call("strlen", "foo") as Promise, + helper.call("matchstr", "bar", "a") as Promise, + helper.call("strlen", "baz") as Promise, + ]), + ); + }); + assertType>>(true); + assertEquals(actual, new Set([42, 123, 39])); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["matchstr", "bar", "a"], + ["strlen", "baz"], + ], + ]); + }); + await t.step("Map values", async () => { + using denops_batch = stubBatch(42, 123, 39); + const actual = await accumulate(mocked_denops, async (helper) => { + const items = ["foo", "bar", "baz"]; + return new Map( + await Promise.all( + items.map(async (item) => + [item, await helper.call("strlen", item) as number] as const + ), + ), + ); + }); + assertType>>(true); + assertEquals( + actual, + new Map([ + ["foo", 42], + ["bar", 123], + ["baz", 39], + ]), + ); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ["strlen", "baz"], + ], + ]); + }); + await t.step("chained Promise", async () => { + using denops_batch = stubBatch(42, 39, 123, 456); + const actual = await accumulate(mocked_denops, async (helper) => { + const [a, b] = await Promise.all([ + helper.call("strlen", "foo"), + helper.call("strlen", "bar"), + ]); + return await Promise.all([ + helper.call("stridx", "bar", "a", a) as Promise, + helper.call("stridx", "baz", "b", b) as Promise, + ]); + }); + assertType>(true); + assertEquals(actual, [123, 456]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ], + [ + ["stridx", "bar", "a", 42], + ["stridx", "baz", "b", 39], + ], + ]); + }); + await t.step("delayed Promise", async () => { + const values = [42, 123, 39]; + using denops_batch = stub( + mocked_denops, + "batch", + async (...calls) => { + if (calls.length > values.length) { + return Promise.reject(new Error("Too few values")); + } + const results = values.splice(0, calls.length); + await delay(50); + return results; + }, + ); + const actual = await accumulate(mocked_denops, async (helper) => { + return await Promise.all( + [ + helper.call("strlen", "foo") as Promise, + (async () => { + const b = helper.call("strlen", "bar") as Promise; + await delay(100); + const c = helper.call("strlen", "baz") as Promise; + return Promise.all([b, c]); + })(), + ] as const, + ); + }); + assertType< + IsExact + >(true); + assertEquals(actual, [42, [123, 39]]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ], + [["strlen", "baz"]], + ]); + }); + await t.step("0 delayed Promise", async () => { + const values = [42, 123, 39]; + using denops_batch = stub( + mocked_denops, + "batch", + async (...calls) => { + if (calls.length > values.length) { + return Promise.reject(new Error("Too few values")); + } + const results = values.splice(0, calls.length); + await delay(50); + return results; + }, + ); + const actual = await accumulate(mocked_denops, async (helper) => { + return await Promise.all( + [ + helper.call("strlen", "foo") as Promise, + (async () => { + const b = helper.call("strlen", "bar") as Promise; + await delay(0); + const c = helper.call("strlen", "baz") as Promise; + return Promise.all([b, c]); + })(), + ] as const, + ); + }); + assertType< + IsExact + >(true); + assertEquals(actual, [42, [123, 39]]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "bar"], + ], + [["strlen", "baz"]], + ]); + }); + await t.step("nested 'accumulate()'", async () => { + using denops_batch = stubBatch(1, 2, 3, 4); + const actual = await accumulate(mocked_denops, (helper) => { + return Promise.all([ + helper.call("strlen", "foo") as Promise, + accumulate(helper, (innerHelper) => { + return Promise.all([ + innerHelper.call("stridx", "bar", "a") as Promise, + innerHelper.call("stridx", "baz", "z") as Promise, + ]); + }), + helper.call("strlen", "quux") as Promise, + ]); + }); + assertType< + IsExact + >(true); + assertEquals(actual, [1, [3, 4], 2]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "quux"], + ["stridx", "bar", "a"], + ["stridx", "baz", "z"], + ], + ]); + }); + await t.step("nested 'batch()'", async () => { + using denops_batch = stubBatch(1, 2, 3, 4); + const actual = await accumulate(mocked_denops, (helper) => { + return Promise.all([ + helper.call("strlen", "foo") as Promise, + batch(helper, async (batchHelper) => { + await batchHelper.call("stridx", "bar", "a"); + await batchHelper.call("stridx", "baz", "z"); + }), + helper.call("strlen", "quux") as Promise, + ]); + }); + assertType>(true); + assertEquals(actual, [1, undefined, 2]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "quux"], + ["stridx", "bar", "a"], + ["stridx", "baz", "z"], + ], + ]); + }); + await t.step("nested 'collect()'", async () => { + using denops_batch = stubBatch(1, 2, 3, 4); + const actual = await accumulate(mocked_denops, (helper) => { + return Promise.all([ + helper.call("strlen", "foo") as Promise, + collect(helper, (collectHelper) => [ + collectHelper.call("stridx", "bar", "a") as Promise, + collectHelper.call("stridx", "baz", "z") as Promise, + ]), + helper.call("strlen", "quux") as Promise, + ]); + }); + assertType< + IsExact + >(true); + assertEquals(actual, [1, [3, 4], 2]); + assertEquals(denops_batch.calls.map((c) => c.args), [ + [ + ["strlen", "foo"], + ["strlen", "quux"], + ["stridx", "bar", "a"], + ["stridx", "baz", "z"], + ], + ]); + }); +}); + +test({ + mode: "all", + name: "accumulate()", + fn: async (denops, t) => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + "function! TestFn(...) abort", + " call add(g:test_fn_call_args, a:000->copy())", + "endfunction", + ]); + + await t.step("when the executor is not callable", async (t) => { + await t.step("rejects an error", async () => { + await assertRejects( + // deno-lint-ignore no-explicit-any + () => accumulate(denops, null as any), + TypeError, + ); + }); + }); + await t.step("when the executor resolves", async (t) => { + using denops_batch = spy(denops, "batch"); + let helperPromise: Promise; + await accumulate(denops, (helper) => { + helperPromise = helper.call("strlen", "foo"); + preventUnhandledRejection(helperPromise); + }); + + await t.step("rejects pending batch 'calls' immediately", async () => { + await flushPromises(); + assertEquals(await peekPromiseState(helperPromise), "rejected"); + await assertRejects( + () => helperPromise, + TypeError, + "not available outside", + ); + }); + await t.step("does not call underlying 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step("when the executor throws", async (t) => { + using denops_batch = spy(denops, "batch"); + const error = new Error("test error"); + let helperPromise: Promise; + const accumulatePromise = accumulate(denops, (helper) => { + helperPromise = helper.call("strlen", "foo"); + preventUnhandledRejection(helperPromise); + throw error; + }); + preventUnhandledRejection(accumulatePromise); + await flushPromises(); + + await t.step("the helper method rejects immediately", async () => { + assertEquals(await peekPromiseState(helperPromise), "rejected"); + await assertRejects( + () => helperPromise, + TypeError, + "not available outside", + ); + }); + await t.step("rejects the actual error immediately", async () => { + assertEquals(await peekPromiseState(accumulatePromise), "rejected"); + const actual = await assertRejects(() => accumulatePromise); + assertStrictEquals(actual, error); + }); + await t.step("does not call underlying 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step("when the executor rejects", async (t) => { + using denops_batch = spy(denops, "batch"); + const error = new Error("test error"); + let helperPromise: Promise; + const accumulatePromise = accumulate(denops, (helper) => { + helperPromise = helper.call("strlen", "foo"); + preventUnhandledRejection(helperPromise); + return Promise.reject(error); + }); + preventUnhandledRejection(accumulatePromise); + await flushPromises(); + + await t.step("the helper method rejects immediately", async () => { + assertEquals(await peekPromiseState(helperPromise), "rejected"); + await assertRejects( + () => helperPromise, + TypeError, + "not available outside", + ); + }); + await t.step("rejects the actual error immediately", async () => { + assertEquals(await peekPromiseState(accumulatePromise), "rejected"); + const actual = await assertRejects(() => accumulatePromise); + assertStrictEquals(actual, error); + }); + await t.step("does not call underlying 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step( + "when the executor rejects while underlying 'denops.batch()' executing", + async (t) => { + using stack = new DisposableStack(); + const underlyingDenopsWaiter = stack.adopt( + Promise.withResolvers(), + (t) => t.reject(), + ); + const denops_batch = stack.use(stub( + denops, + "batch", + () => underlyingDenopsWaiter.promise, + )); + const error = new Error("test error"); + let helperPromise: Promise; + const accumulatePromise = accumulate(denops, async (helper) => { + helperPromise = helper.call("strlen", "foo"); + preventUnhandledRejection(helperPromise); + await delay(0); // Ensure underlying batch is executing + return Promise.reject(error); + }); + await flushPromises(); + + await t.step("the helper method rejects immediately", async () => { + assertEquals(await peekPromiseState(helperPromise), "rejected"); + await assertRejects( + () => helperPromise, + TypeError, + "not available outside", + ); + }); + await t.step("rejects the actual error immediately", async () => { + assertEquals(await peekPromiseState(accumulatePromise), "rejected"); + const actual = await assertRejects(() => accumulatePromise); + assertStrictEquals(actual, error); + }); + await t.step("calls underlying 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 1); + assertSpyCallArgs(denops_batch, 0, [["strlen", "foo"]]); + }); + await t.step("the underlying rejection should handled", async () => { + underlyingDenopsWaiter.reject( + new Error("This error should be ignored"), + ); + await flushPromises(); + }); + }, + ); + await t.step("AccumulateHelper", async (t) => { + await t.step(".redraw()", async (t) => { + await t.step("call underlying 'denops.redraw()'", async () => { + using denops_redraw = stub( + denops, + "redraw", + (): Promise => denops_redraw.original.call(denops), + ); + await accumulate(denops, async (helper) => { + await helper.redraw(); + }); + assertSpyCalls(denops_redraw, 1); + }); + await t.step("when underlying 'denops.redraw()' rejects", async (t) => { + await t.step("rejects an error", async (t) => { + using _denops_redraw = stub( + denops, + "redraw", + (): Promise => Promise.reject(new Error("Network error")), + ); + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.redraw(), + Error, + "Network error", + ); + }); + await t.step("with the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.redraw", + ); + }); + }); + }); + }); + await t.step(".call()", async (t) => { + await t.step("calls Vim function", async () => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + ]); + await accumulate(denops, async (helper) => { + await helper.call("TestFn", "foo", 1, true); + await Promise.all([ + helper.call("TestFn", "a"), + helper.call("TestFn", "b"), + helper.call("TestFn", "c"), + ]); + }); + const actual = await denops.eval("g:test_fn_call_args"); + assertEquals(actual, [ + ["foo", 1, true], + ["a"], + ["b"], + ["c"], + ]); + }); + await t.step("resolves a result of Vim function", async () => { + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.call("range", 2, 4); + }); + assertEquals(actual, [2, 3, 4]); + }); + await t.step("rejects an error which Vim throws", async (t) => { + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.call("notexistsfn"), + Error, + "Unknown function: notexistsfn", + ); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + }); + await t.step( + "when an error occurs during parallel execution", + async (t) => { + let actual: PromiseSettledResult[]; + await accumulate(denops, async (helper) => { + actual = await Promise.allSettled([ + helper.call("range", 1, 3), + helper.call("range", 2, 4), + helper.call("notexistsfn"), + helper.call("range", 0, 2), + helper.call("range", 3, 5), + ]); + }); + await t.step("calls before the error resolves", () => { + assertEquals(actual[0].status, "fulfilled"); + assertEquals(actual[1].status, "fulfilled"); + assertEquals( + (actual[0] as PromiseFulfilledResult).value, + [1, 2, 3], + ); + assertEquals( + (actual[1] as PromiseFulfilledResult).value, + [2, 3, 4], + ); + }); + await t.step("the invalid call rejects the actual error", () => { + assertEquals(actual[2].status, "rejected"); + const error = (actual[2] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertStringIncludes( + error.message, + "Unknown function: notexistsfn", + ); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + await t.step( + "calls after the error rejects an AccumulateCancelledError", + () => { + assertEquals(actual[3].status, "rejected"); + assertEquals(actual[4].status, "rejected"); + const error1 = (actual[3] as PromiseRejectedResult).reason; + const error2 = (actual[4] as PromiseRejectedResult).reason; + assertInstanceOf(error1, AccumulateCancelledError); + assertInstanceOf(error2, AccumulateCancelledError); + assertStringIncludes(error1.message, "['range', ...]"); + assertStringIncludes(error2.message, "['range', ...]"); + assertEquals(error1.calls, [["range", 0, 2]]); + assertEquals(error2.calls, [["range", 3, 5]]); + assertStringIncludes( + error1.stack ?? "", + "AccumulateHelper.call", + ); + assertStringIncludes( + error2.stack ?? "", + "AccumulateHelper.call", + ); + }, + ); + }, + ); + await t.step( + "when underlying 'denops.batch()' throws a non-BatchError", + async (t) => { + const underlyingError = new Error("Network error"); + using denops_batch = stub( + denops, + "batch", + resolvesNext([underlyingError]), + ); + let actual: PromiseSettledResult[]; + await accumulate(denops, async (helper) => { + actual = await Promise.allSettled([ + helper.call("range", 0, 2), + helper.call("range", 3, 5), + ]); + }); + assertSpyCalls(denops_batch, 1); + await t.step("the first call rejects the actual error", () => { + assertEquals(actual[0].status, "rejected"); + const error = (actual[0] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertEquals(error.message, "Network error"); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + await t.step("the second call rejects the actual error", () => { + assertEquals(actual[1].status, "rejected"); + const error = (actual[1] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertEquals(error.message, "Network error"); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + }, + ); + await t.step( + "when underlying 'denops.batch()' throws a non-Error", + async (t) => { + const underlyingError = 42; + using denops_batch = stub( + denops, + "batch", + () => Promise.reject(underlyingError), + ); + let actual: PromiseSettledResult[]; + await accumulate(denops, async (helper) => { + actual = await Promise.allSettled([ + helper.call("range", 0, 2), + helper.call("range", 3, 5), + ]); + }); + assertSpyCalls(denops_batch, 1); + await t.step("the first call rejects the actual error", () => { + assertEquals(actual[0].status, "rejected"); + const error = (actual[0] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertEquals(error.message, "42"); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + await t.step("the second call rejects the actual error", () => { + assertEquals(actual[1].status, "rejected"); + const error = (actual[1] as PromiseRejectedResult).reason; + assertInstanceOf(error, Error); + assertEquals(error.message, "42"); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.call", + ); + }); + }, + ); + await t.step( + "when new calls are made while underlying 'denops.batch()' executing", + async (t) => { + using denops_batch = stub( + denops, + "batch", + async (...calls): Promise => { + await delay(10); + return await denops_batch.original.apply(denops, calls); + }, + ); + let preceding: Promise; + let delayed: Promise; + await accumulate(denops, async (helper) => { + preceding = helper.call("range", 0, 2); + delayed = (async () => { + await delay(0); // Ensure underlying batch is executing + return helper.call("range", 1, 3); + })(); + await Promise.allSettled([preceding, delayed]); + }); + assertSpyCalls(denops_batch, 2); + await t.step("the preceding call resolves", async () => { + assertEquals(await preceding, [0, 1, 2]); + }); + await t.step("the delayed call resolves", async () => { + assertEquals(await delayed, [1, 2, 3]); + }); + }, + ); + }); + await t.step(".cmd()", async (t) => { + await t.step("executes Vim command", async () => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + ]); + await accumulate(denops, async (helper) => { + await helper.cmd("call TestFn('foo', 1, v:true)"); + await Promise.all([ + helper.cmd("call TestFn('a')"), + helper.cmd("call TestFn(value)", { value: "b" }), + helper.cmd("call TestFn(value)", { value: "c" }), + ]); + }); + const actual = await denops.eval("g:test_fn_call_args"); + assertEquals(actual, [ + ["foo", 1, true], + ["a"], + ["b"], + ["c"], + ]); + }); + await t.step("rejects an error which Vim throws", async (t) => { + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.cmd("call notexistsfn()"), + Error, + "Unknown function: notexistsfn", + ); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.cmd", + ); + }); + }); + }); + await t.step(".eval()", async (t) => { + await t.step("evaluates Vim expression", async () => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + ]); + await accumulate(denops, async (helper) => { + await helper.eval("TestFn('foo', 1, v:true)"); + await Promise.all([ + helper.eval("TestFn('a')"), + helper.eval("TestFn(value)", { value: "b" }), + helper.eval("TestFn(value)", { value: "c" }), + ]); + }); + const actual = await denops.eval("g:test_fn_call_args"); + assertEquals(actual, [ + ["foo", 1, true], + ["a"], + ["b"], + ["c"], + ]); + }); + await t.step("resolves a result of Vim expression", async () => { + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.eval("range(2, 4)"); + }); + assertEquals(actual, [2, 3, 4]); + }); + await t.step("rejects an error which Vim throws", async (t) => { + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.eval("notexistsfn()"), + Error, + "Unknown function: notexistsfn", + ); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.eval", + ); + }); + }); + }); + await t.step(".batch()", async (t) => { + await t.step("calls Vim functions", async () => { + await denops.call("execute", [ + "let g:test_fn_call_args = []", + ]); + await accumulate(denops, async (helper) => { + await helper.batch( + ["TestFn", "foo", 1, true], + ["TestFn", "a"], + ["TestFn", "b"], + ["TestFn", "c"], + ); + }); + const actual = await denops.eval("g:test_fn_call_args"); + assertEquals(actual, [ + ["foo", 1, true], + ["a"], + ["b"], + ["c"], + ]); + }); + await t.step("resolves results of Vim functions", async () => { + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.batch( + ["range", 0, 2], + ["range", 2, 4], + ["matchstr", "hello", "el*"], + ); + }); + assertEquals(actual, [ + [0, 1, 2], + [2, 3, 4], + "ell", + ]); + }); + await t.step("resolves an empty array if no arguments", async () => { + using denops_batch = spy(denops, "batch"); + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.batch(); + }); + assertEquals(actual, []); + assertSpyCalls(denops_batch, 0); + }); + await t.step("rejects a BatchError which Vim throws", async (t) => { + let error: BatchError; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => + helper.batch( + ["range", 3], + ["range", 2, 4], + ["notexistsfn"], + ["range", 3], + ), + BatchError, + "Unknown function: notexistsfn", + ); + assertEquals(error.results, [[0, 1, 2], [2, 3, 4]]); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.batch", + ); + }); + }); + await t.step( + "when an error occurs during parallel execution", + async (t) => { + let actual: PromiseSettledResult[]; + await accumulate(denops, async (helper) => { + actual = await Promise.allSettled([ + helper.batch( + ["range", 1, 3], + ["range", 2, 4], + ), + helper.batch( + ["range", 1, 3], + ["notexistsfn"], + ["range", 2, 4], + ), + helper.batch( + ["range", 0, 2], + ["range", 3, 5], + ), + ]); + }); + await t.step("calls before the error resolves", () => { + assertEquals(actual[0].status, "fulfilled"); + assertEquals( + (actual[0] as PromiseFulfilledResult).value, + [[1, 2, 3], [2, 3, 4]], + ); + }); + await t.step("the invalid call rejects a BatchError", () => { + assertEquals(actual[1].status, "rejected"); + const error = (actual[1] as PromiseRejectedResult).reason; + assertInstanceOf(error, BatchError); + assertStringIncludes( + error.message, + "Unknown function: notexistsfn", + ); + assertEquals(error.results, [[1, 2, 3]]); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.batch", + ); + }); + await t.step( + "calls after the error rejects an AccumulateCancelledError", + () => { + assertEquals(actual[2].status, "rejected"); + const error = (actual[2] as PromiseRejectedResult).reason; + assertInstanceOf(error, AccumulateCancelledError); + assertStringIncludes( + error.message, + "[['range', ...], ... total 2 calls]", + ); + assertEquals(error.calls, [["range", 0, 2], ["range", 3, 5]]); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.batch", + ); + }, + ); + }, + ); + await t.step( + "when underlying 'denops.batch()' throws a non-BatchError", + async (t) => { + const underlyingError = new Error("Network error"); + using denops_batch = stub( + denops, + "batch", + resolvesNext([underlyingError]), + ); + let actual: Promise; + await accumulate(denops, async (helper) => { + actual = helper.batch( + ["range", 1, 3], + ["range", 2, 4], + ); + await Promise.allSettled([actual]); + }); + assertSpyCalls(denops_batch, 1); + await t.step("rejects the actual error", async () => { + const error = await assertRejects( + () => actual, + Error, + "Network error", + ); + assertStrictEquals(error.cause, underlyingError); + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.batch", + ); + }); + }, + ); + }); + await t.step(".dispatch()", async (t) => { + await t.step("calls 'denops.dispatch()'", async () => { + using denops_dispatch = stub( + denops, + "dispatch", + resolvesNext(["one", "two", "three"]), + ); + await accumulate(denops, async (helper) => { + await helper.dispatch("pluginA", "foo", "bar", 42, false); + await Promise.all([ + helper.dispatch("pluginA", "baz", 1), + helper.dispatch("pluginB", "qux", 2), + ]); + }); + assertEquals(denops_dispatch.calls.map((c) => c.args), [ + ["pluginA", "foo", "bar", 42, false], + ["pluginA", "baz", 1], + ["pluginB", "qux", 2], + ]); + }); + await t.step("resolves a result of 'denops.dispatch()'", async () => { + using _denops_dispatch = stub( + denops, + "dispatch", + resolvesNext(["one"]), + ); + let actual: unknown; + await accumulate(denops, async (helper) => { + actual = await helper.dispatch("pluginA", "foo", "bar"); + }); + assertEquals(actual, "one"); + }); + await t.step( + "rejects an error which the 'denops.dispatch()' rejects", + async (t) => { + using _denops_dispatch = stub( + denops, + "dispatch", + () => { + throw new Error("test plugin error"); + }, + ); + let error: Error; + await accumulate(denops, async (helper) => { + error = await assertRejects( + () => helper.dispatch("pluginA", "foo", "bar"), + Error, + "test plugin error", + ); + }); + await t.step("and the stack trace contains the call site", () => { + assertStringIncludes( + error.stack ?? "", + "AccumulateHelper.dispatch", + ); + }); + }, + ); + }); + await t.step(".name", async (t) => { + await t.step("getter returns 'denops.name'", async () => { + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.name; + }); + assertStrictEquals(actual, denops.name); + }); + }); + await t.step(".meta", async (t) => { + await t.step("getter returns 'denops.meta'", async () => { + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.meta; + }); + assertStrictEquals(actual, denops.meta); + }); + }); + await t.step(".interrupted", async (t) => { + await t.step("getter returns 'denops.interrupted'", async () => { + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.interrupted; + }); + assertStrictEquals(actual, denops.interrupted); + }); + }); + await t.step(".context", async (t) => { + await t.step("getter returns 'denops.context'", async () => { + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.context; + }); + assertStrictEquals(actual, denops.context); + }); + }); + await t.step(".dispatcher", async (t) => { + const MY_DISPATCHER = { + foo: () => {}, + }; + + await t.step("setter sets to 'denops.dispatcher'", async () => { + using stack = new DisposableStack(); + stack.adopt(denops.dispatcher, (saved) => { + denops.dispatcher = saved; + }); + await accumulate(denops, (helper) => { + helper.dispatcher = MY_DISPATCHER; + }); + assertStrictEquals(denops.dispatcher, MY_DISPATCHER); + }); + await t.step("getter returns 'denops.dispatcher'", async () => { + using stack = new DisposableStack(); + stack.adopt(denops.dispatcher, (saved) => { + denops.dispatcher = saved; + }); + denops.dispatcher = MY_DISPATCHER; + let actual: unknown; + await accumulate(denops, (helper) => { + actual = helper.dispatcher; + }); + assertStrictEquals(actual, MY_DISPATCHER); + }); + }); + await t.step("when outside of the 'accumulate()' block", async (t) => { + await t.step(".call()", async (t) => { + using denops_batch = spy(denops, "batch"); + let helper_outside: Denops; + await accumulate(denops, (helper) => { + helper_outside = helper; + }); + + await t.step("rejects an error", async () => { + await assertRejects( + () => helper_outside.call("range", 0), + TypeError, + "not available outside", + ); + }); + await t.step("does not call 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step(".cmd()", async (t) => { + using denops_batch = spy(denops, "batch"); + let helper_outside: Denops; + await accumulate(denops, (helper) => { + helper_outside = helper; + }); + + await t.step("rejects an error", async () => { + await assertRejects( + () => helper_outside.cmd("echo 'hello'"), + TypeError, + "not available outside", + ); + }); + await t.step("does not call 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step(".eval()", async (t) => { + using denops_batch = spy(denops, "batch"); + let helper_outside: Denops; + await accumulate(denops, (helper) => { + helper_outside = helper; + }); + + await t.step("rejects an error", async () => { + await assertRejects( + () => helper_outside.eval("123"), + TypeError, + "not available outside", + ); + }); + await t.step("does not call 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + await t.step(".batch()", async (t) => { + using denops_batch = spy(denops, "batch"); + let helper_outside: Denops; + await accumulate(denops, (helper) => { + helper_outside = helper; + }); + + await t.step("rejects an error", async () => { + await assertRejects( + () => helper_outside.batch(["range", 0]), + TypeError, + "not available outside", + ); + }); + await t.step("does not call 'denops.batch()'", () => { + assertSpyCalls(denops_batch, 0); + }); + }); + }); + }); + }, +}); diff --git a/batch/error.ts b/batch/error.ts new file mode 100644 index 0000000..530ee98 --- /dev/null +++ b/batch/error.ts @@ -0,0 +1,43 @@ +import type { Call } from "@denops/core"; + +/** + * Options for creating an {@linkcode AccumulateCancelledError}. + */ +export interface AccumulateCancelledErrorOptions extends ErrorOptions { + /** + * Information about the cancelled Vim/Neovim function calls. + */ + calls?: readonly Call[]; +} + +/** + * Error thrown when a Vim/Neovim function call is cancelled due to another + * error in a parallel execution within the same batch. + * + * This error occurs when multiple Vim/Neovim function calls are executed in + * parallel using `Promise.all()` or similar constructs within an `accumulate()` + * block, and one of the calls fails, causing the remaining calls in the same + * batch to be cancelled. + */ +export class AccumulateCancelledError extends Error { + static { + this.prototype.name = "AccumulateCancelledError"; + } + + /** + * Information about the cancelled Vim/Neovim function calls. + */ + readonly calls?: readonly Call[]; + + /** + * Creates a new {@linkcode AccumulateCancelledError}. + * + * @param message - The error message describing why the call was cancelled. + * @param options - Additional options for the error. + */ + constructor(message?: string, options: AccumulateCancelledErrorOptions = {}) { + const { calls, ...errorOptions } = options; + super(message, errorOptions); + this.calls = calls; + } +} diff --git a/batch/mod.ts b/batch/mod.ts index 67d092f..3b63149 100644 --- a/batch/mod.ts +++ b/batch/mod.ts @@ -3,7 +3,8 @@ * * ```typescript * import type { Entrypoint } from "jsr:@denops/std"; - * import { batch, collect } from "jsr:@denops/std/batch"; + * import * as fn from "jsr:@denops/std/function"; + * import { accumulate, batch, collect } from "jsr:@denops/std/batch"; * * export const main: Entrypoint = async (denops) => { * // Call multiple denops functions sequentially in a single RPC call @@ -20,10 +21,29 @@ * denops.eval("&filetype"), * ]); * // results contains the value of modifiable, modified, and filetype + * + * // Automatically batch multiple denops calls while writing regular async code + * // In this example, only 3 RPC calls are made: + * // 1. fn.getline, 2. batched fn.matchstr calls, 3. batched fn.len calls + * const results2 = await accumulate(denops, async (denops) => { + * const lines = await fn.getline(denops, 1, "$"); + * return await Promise.all(lines.map(async (line, index) => { + * const keyword = await fn.matchstr(denops, line, "\\k\\+"); + * const len = await fn.len(denops, keyword); + * return { + * lnum: index + 1, + * keyword, + * len, + * }; + * })); + * }); + * // results2 contains an array of objects with lnum, keyword, and len * } * ``` * * @module */ +export * from "./accumulate.ts"; export * from "./batch.ts"; export * from "./collect.ts"; +export * from "./error.ts"; diff --git a/deno.jsonc b/deno.jsonc index 266a132..efda116 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -44,6 +44,7 @@ "@lambdalisue/unreachable": "jsr:@lambdalisue/unreachable@^1.0.1", "@nick/dispose": "jsr:@nick/dispose@^1.1.0", "@std/assert": "jsr:@std/assert@^1.0.14", + "@std/async": "jsr:@std/async@^1.0.15", "@std/collections": "jsr:@std/collections@^1.1.3", "@std/fs": "jsr:@std/fs@^1.0.19", "@std/path": "jsr:@std/path@^1.1.2",