diff --git a/packages/core/src/actor/action.ts b/packages/core/src/actor/action.ts index 7ec77c1d4..1104a0305 100644 --- a/packages/core/src/actor/action.ts +++ b/packages/core/src/actor/action.ts @@ -156,10 +156,17 @@ export class ActionContext< } /** - * Runs a promise in the background. + * Prevents the actor from sleeping until promise is complete. */ - runInBackground(promise: Promise): void { - this.#actorContext.runInBackground(promise); + waitUntil(promise: Promise): void { + this.#actorContext.waitUntil(promise); + } + + /** + * AbortSignal that fires when the actor is stopping. + */ + get abortSignal(): AbortSignal { + return this.#actorContext.abortSignal; } /** diff --git a/packages/core/src/actor/config.ts b/packages/core/src/actor/config.ts index ac93d69e2..69ffbd957 100644 --- a/packages/core/src/actor/config.ts +++ b/packages/core/src/actor/config.ts @@ -68,6 +68,8 @@ export const ActorConfigSchema = z onStopTimeout: z.number().positive().default(5000), stateSaveInterval: z.number().positive().default(10_000), actionTimeout: z.number().positive().default(60_000), + // Max time to wait for waitUntil background promises during shutdown + waitUntilTimeout: z.number().positive().default(15_000), connectionLivenessTimeout: z.number().positive().default(2500), connectionLivenessInterval: z.number().positive().default(5000), noSleep: z.boolean().default(false), diff --git a/packages/core/src/actor/context.ts b/packages/core/src/actor/context.ts index 703196aca..1c447c472 100644 --- a/packages/core/src/actor/context.ts +++ b/packages/core/src/actor/context.ts @@ -145,12 +145,17 @@ export class ActorContext< } /** - * Runs a promise in the background. - * - * @param promise - The promise to run in the background. + * Prevents the actor from sleeping until promise is complete. + */ + waitUntil(promise: Promise): void { + this.#actor._waitUntil(promise); + } + + /** + * AbortSignal that fires when the actor is stopping. */ - runInBackground(promise: Promise): void { - this.#actor._runInBackground(promise); + get abortSignal(): AbortSignal { + return this.#actor.abortSignal; } /** diff --git a/packages/core/src/actor/instance.ts b/packages/core/src/actor/instance.ts index 35ebb4109..5b627050a 100644 --- a/packages/core/src/actor/instance.ts +++ b/packages/core/src/actor/instance.ts @@ -42,6 +42,8 @@ export interface SaveStateOptions { * Forces the state to be saved immediately. This function will return when the state has saved successfully. */ immediate?: boolean; + /** Bypass ready check for stopping. */ + allowStoppingState?: boolean; } /** Actor type alias with all `any` types. Used for `extends` in classes referencing this actor. */ @@ -158,6 +160,7 @@ export class ActorInstance< #vars?: V; #backgroundPromises: Promise[] = []; + #abortController = new AbortController(); #config: ActorConfig; #connectionDrivers!: ConnectionDriversMap; #actorDriver!: ActorDriver; @@ -757,7 +760,7 @@ export class ActorInstance< const connIdx = this.#persist.c.findIndex((c) => c.i === conn.id); if (connIdx !== -1) { this.#persist.c.splice(connIdx, 1); - this.saveState({ immediate: true }); + this.saveState({ immediate: true, allowStoppingState: true }); } else { logger().warn("could not find persisted connection to remove", { connId: conn.id, @@ -1048,8 +1051,12 @@ export class ActorInstance< } } - #assertReady() { + #assertReady(allowStoppingState: boolean = false) { if (!this.#ready) throw new errors.InternalError("Actor not ready"); + if (!allowStoppingState && this.#sleepCalled) + throw new errors.InternalError("Actor is going to sleep"); + if (!allowStoppingState && this.#stopCalled) + throw new errors.InternalError("Actor is stopping"); } /** @@ -1443,24 +1450,24 @@ export class ActorInstance< } /** - * Runs a promise in the background. + * Prevents the actor from sleeping until promise is complete. * * This allows the actor runtime to ensure that a promise completes while * returning from an action request early. * * @param promise - The promise to run in the background. */ - _runInBackground(promise: Promise) { + _waitUntil(promise: Promise) { this.#assertReady(); // TODO: Should we force save the state? // Add logging to promise and make it non-failable const nonfailablePromise = promise .then(() => { - logger().debug("background promise complete"); + logger().debug("wait until promise complete"); }) .catch((error) => { - logger().error("background promise failed", { + logger().error("wait until promise failed", { error: stringifyError(error), }); }); @@ -1476,7 +1483,7 @@ export class ActorInstance< * @param opts - Options for saving the state. */ async saveState(opts: SaveStateOptions) { - this.#assertReady(); + this.#assertReady(opts.allowStoppingState); if (this.#persistChanged) { if (opts.immediate) { @@ -1552,7 +1559,7 @@ export class ActorInstance< return true; } - /** Puts an actor to sleep. */ + /** Puts an actor to sleep. This should just start the sleep sequence, most shutdown logic should be in _stop (which is called by the ActorDriver when sleeping). */ async _sleep() { invariant(this.#sleepingSupported, "sleeping not supported"); invariant(this.#actorDriver.sleep, "no sleep on driver"); @@ -1581,6 +1588,11 @@ export class ActorInstance< logger().info("actor stopping"); + // Abort any listeners waiting for shutdown + try { + this.#abortController.abort(); + } catch {} + // Call onStop lifecycle hook if defined if (this.#config.onStop) { try { @@ -1601,8 +1613,11 @@ export class ActorInstance< } } + // Wait for any background tasks to finish, with timeout + await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout); + // Write state - await this.saveState({ immediate: true }); + await this.saveState({ immediate: true, allowStoppingState: true }); // Disconnect existing connections const promises: Promise[] = []; @@ -1625,8 +1640,39 @@ export class ActorInstance< "timed out waiting for connections to close, shutting down anyway", ); } + } + + /** Abort signal that fires when the actor is stopping. */ + get abortSignal(): AbortSignal { + return this.#abortController.signal; + } + + /** Wait for background waitUntil promises with a timeout. */ + async #waitBackgroundPromises(timeoutMs: number) { + const pending = this.#backgroundPromises; + if (pending.length === 0) { + logger().debug("no background promises"); + return; + } + + // Race promises with timeout to determine if pending promises settled fast enough + const timedOut = await Promise.race([ + Promise.allSettled(pending).then(() => false), + new Promise((resolve) => + setTimeout(() => resolve(true), timeoutMs), + ), + ]); - // TODO: - //Deno.exit(0); + if (timedOut) { + logger().error( + "timed out waiting for background tasks, background promises may have leaked", + { + count: pending.length, + timeoutMs, + }, + ); + } else { + logger().debug("background promises finished"); + } } }