Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit cf30278

Browse files
committed
feat(core): add abort controller & implement waitUntil (#1200)
1 parent a4af2b1 commit cf30278

File tree

4 files changed

+79
-19
lines changed

4 files changed

+79
-19
lines changed

packages/core/src/actor/action.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,17 @@ export class ActionContext<
156156
}
157157

158158
/**
159-
* Runs a promise in the background.
159+
* Prevents the actor from sleeping until promise is complete.
160160
*/
161-
runInBackground(promise: Promise<void>): void {
162-
this.#actorContext.runInBackground(promise);
161+
waitUntil(promise: Promise<void>): void {
162+
this.#actorContext.waitUntil(promise);
163+
}
164+
165+
/**
166+
* AbortSignal that fires when the actor is stopping.
167+
*/
168+
get abortSignal(): AbortSignal {
169+
return this.#actorContext.abortSignal;
163170
}
164171

165172
/**

packages/core/src/actor/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ export const ActorConfigSchema = z
6868
onStopTimeout: z.number().positive().default(5000),
6969
stateSaveInterval: z.number().positive().default(10_000),
7070
actionTimeout: z.number().positive().default(60_000),
71+
// Max time to wait for waitUntil background promises during shutdown
72+
waitUntilTimeout: z.number().positive().default(15_000),
7173
connectionLivenessTimeout: z.number().positive().default(2500),
7274
connectionLivenessInterval: z.number().positive().default(5000),
7375
noSleep: z.boolean().default(false),

packages/core/src/actor/context.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,17 @@ export class ActorContext<
145145
}
146146

147147
/**
148-
* Runs a promise in the background.
149-
*
150-
* @param promise - The promise to run in the background.
148+
* Prevents the actor from sleeping until promise is complete.
149+
*/
150+
waitUntil(promise: Promise<void>): void {
151+
this.#actor._waitUntil(promise);
152+
}
153+
154+
/**
155+
* AbortSignal that fires when the actor is stopping.
151156
*/
152-
runInBackground(promise: Promise<void>): void {
153-
this.#actor._runInBackground(promise);
157+
get abortSignal(): AbortSignal {
158+
return this.#actor.abortSignal;
154159
}
155160

156161
/**

packages/core/src/actor/instance.ts

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ export interface SaveStateOptions {
4242
* Forces the state to be saved immediately. This function will return when the state has saved successfully.
4343
*/
4444
immediate?: boolean;
45+
/** Bypass ready check for stopping. */
46+
allowStoppingState?: boolean;
4547
}
4648

4749
/** Actor type alias with all `any` types. Used for `extends` in classes referencing this actor. */
@@ -158,6 +160,7 @@ export class ActorInstance<
158160
#vars?: V;
159161

160162
#backgroundPromises: Promise<void>[] = [];
163+
#abortController = new AbortController();
161164
#config: ActorConfig<S, CP, CS, V, I, AD, DB>;
162165
#connectionDrivers!: ConnectionDriversMap;
163166
#actorDriver!: ActorDriver;
@@ -757,7 +760,7 @@ export class ActorInstance<
757760
const connIdx = this.#persist.c.findIndex((c) => c.i === conn.id);
758761
if (connIdx !== -1) {
759762
this.#persist.c.splice(connIdx, 1);
760-
this.saveState({ immediate: true });
763+
this.saveState({ immediate: true, allowStoppingState: true });
761764
} else {
762765
logger().warn("could not find persisted connection to remove", {
763766
connId: conn.id,
@@ -1048,8 +1051,12 @@ export class ActorInstance<
10481051
}
10491052
}
10501053

1051-
#assertReady() {
1054+
#assertReady(allowStoppingState: boolean = false) {
10521055
if (!this.#ready) throw new errors.InternalError("Actor not ready");
1056+
if (!allowStoppingState && this.#sleepCalled)
1057+
throw new errors.InternalError("Actor is going to sleep");
1058+
if (!allowStoppingState && this.#stopCalled)
1059+
throw new errors.InternalError("Actor is stopping");
10531060
}
10541061

10551062
/**
@@ -1443,24 +1450,24 @@ export class ActorInstance<
14431450
}
14441451

14451452
/**
1446-
* Runs a promise in the background.
1453+
* Prevents the actor from sleeping until promise is complete.
14471454
*
14481455
* This allows the actor runtime to ensure that a promise completes while
14491456
* returning from an action request early.
14501457
*
14511458
* @param promise - The promise to run in the background.
14521459
*/
1453-
_runInBackground(promise: Promise<void>) {
1460+
_waitUntil(promise: Promise<void>) {
14541461
this.#assertReady();
14551462

14561463
// TODO: Should we force save the state?
14571464
// Add logging to promise and make it non-failable
14581465
const nonfailablePromise = promise
14591466
.then(() => {
1460-
logger().debug("background promise complete");
1467+
logger().debug("wait until promise complete");
14611468
})
14621469
.catch((error) => {
1463-
logger().error("background promise failed", {
1470+
logger().error("wait until promise failed", {
14641471
error: stringifyError(error),
14651472
});
14661473
});
@@ -1476,7 +1483,7 @@ export class ActorInstance<
14761483
* @param opts - Options for saving the state.
14771484
*/
14781485
async saveState(opts: SaveStateOptions) {
1479-
this.#assertReady();
1486+
this.#assertReady(opts.allowStoppingState);
14801487

14811488
if (this.#persistChanged) {
14821489
if (opts.immediate) {
@@ -1552,7 +1559,7 @@ export class ActorInstance<
15521559
return true;
15531560
}
15541561

1555-
/** Puts an actor to sleep. */
1562+
/** Puts an actor to sleep. This should just start the sleep sequence, most shutdown logic should be in _stop (which is called by the ActorDriver when sleeping). */
15561563
async _sleep() {
15571564
invariant(this.#sleepingSupported, "sleeping not supported");
15581565
invariant(this.#actorDriver.sleep, "no sleep on driver");
@@ -1581,6 +1588,11 @@ export class ActorInstance<
15811588

15821589
logger().info("actor stopping");
15831590

1591+
// Abort any listeners waiting for shutdown
1592+
try {
1593+
this.#abortController.abort();
1594+
} catch {}
1595+
15841596
// Call onStop lifecycle hook if defined
15851597
if (this.#config.onStop) {
15861598
try {
@@ -1601,8 +1613,11 @@ export class ActorInstance<
16011613
}
16021614
}
16031615

1616+
// Wait for any background tasks to finish, with timeout
1617+
await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout);
1618+
16041619
// Write state
1605-
await this.saveState({ immediate: true });
1620+
await this.saveState({ immediate: true, allowStoppingState: true });
16061621

16071622
// Disconnect existing connections
16081623
const promises: Promise<unknown>[] = [];
@@ -1625,8 +1640,39 @@ export class ActorInstance<
16251640
"timed out waiting for connections to close, shutting down anyway",
16261641
);
16271642
}
1643+
}
1644+
1645+
/** Abort signal that fires when the actor is stopping. */
1646+
get abortSignal(): AbortSignal {
1647+
return this.#abortController.signal;
1648+
}
1649+
1650+
/** Wait for background waitUntil promises with a timeout. */
1651+
async #waitBackgroundPromises(timeoutMs: number) {
1652+
const pending = this.#backgroundPromises;
1653+
if (pending.length === 0) {
1654+
logger().debug("no background promises");
1655+
return;
1656+
}
1657+
1658+
// Race promises with timeout to determine if pending promises settled fast enough
1659+
const timedOut = await Promise.race([
1660+
Promise.allSettled(pending).then(() => false),
1661+
new Promise<true>((resolve) =>
1662+
setTimeout(() => resolve(true), timeoutMs),
1663+
),
1664+
]);
16281665

1629-
// TODO:
1630-
//Deno.exit(0);
1666+
if (timedOut) {
1667+
logger().error(
1668+
"timed out waiting for background tasks, background promises may have leaked",
1669+
{
1670+
count: pending.length,
1671+
timeoutMs,
1672+
},
1673+
);
1674+
} else {
1675+
logger().debug("background promises finished");
1676+
}
16311677
}
16321678
}

0 commit comments

Comments
 (0)