From 804497fb65890509ef8e1c5b4a36d22e5818b8d6 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 17 Aug 2025 11:47:52 -0700 Subject: [PATCH] feat(core): actor sleeping --- CLAUDE.md | 1 - biome.json | 8 +- clients/openapi/openapi.json | 6 +- .../driver-test-suite/action-timeout.ts | 12 +- .../driver-test-suite/conn-liveness.ts | 6 +- .../driver-test-suite/error-handling.ts | 9 +- .../fixtures/driver-test-suite/registry.ts | 13 + .../core/fixtures/driver-test-suite/sleep.ts | 186 ++++++++ packages/core/package.json | 2 +- packages/core/src/actor/action.ts | 11 + packages/core/src/actor/config.ts | 57 ++- packages/core/src/actor/connection.ts | 5 +- packages/core/src/actor/context.ts | 12 +- packages/core/src/actor/driver.ts | 7 +- .../core/src/actor/generic-conn-driver.ts | 10 +- packages/core/src/actor/instance.ts | 248 +++++++++-- packages/core/src/client/actor-conn.ts | 2 +- packages/core/src/driver-test-suite/mod.ts | 8 +- .../src/driver-test-suite/tests/actor-conn.ts | 3 +- .../driver-test-suite/tests/actor-driver.ts | 11 + .../driver-test-suite/tests/actor-schedule.ts | 18 +- .../driver-test-suite/tests/actor-sleep.ts | 413 ++++++++++++++++++ .../core/src/drivers/engine/actor-driver.ts | 11 +- .../core/src/drivers/file-system/actor.ts | 17 +- .../src/drivers/file-system/global-state.ts | 258 +++++++++-- packages/core/src/drivers/file-system/mod.ts | 11 +- packages/core/src/utils.ts | 97 ++++ ...e-engine.test.ts => driver-engine.test.ts} | 0 ...tem.test.ts => driver-file-system.test.ts} | 2 + ...e-memory.test.ts => driver-memory.test.ts} | 2 + packages/core/tests/set-long-timeout.test.ts | 244 +++++++++++ pnpm-lock.yaml | 33 +- 32 files changed, 1563 insertions(+), 160 deletions(-) create mode 100644 packages/core/fixtures/driver-test-suite/sleep.ts create mode 100644 packages/core/src/driver-test-suite/tests/actor-sleep.ts rename packages/core/tests/{driver-test-suite-engine.test.ts => driver-engine.test.ts} (100%) rename packages/core/tests/{driver-test-suite-file-system.test.ts => driver-file-system.test.ts} (84%) rename packages/core/tests/{driver-test-suite-memory.test.ts => driver-memory.test.ts} (82%) create mode 100644 packages/core/tests/set-long-timeout.test.ts diff --git a/CLAUDE.md b/CLAUDE.md index dde4c7a82..90b2dc61b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -194,4 +194,3 @@ Always include a README.md. The `README.md` should always follow this structure: ## Test Notes - Using setTimeout in tests & test actors will not work unless you call `await waitFor(driverTestConfig, )` -- Do not use setTimeout in tests or in actors used in tests unless you explictily use `await vi.advanceTimersByTimeAsync(time)` diff --git a/biome.json b/biome.json index 719ba3e43..fe7c8bb09 100644 --- a/biome.json +++ b/biome.json @@ -1,7 +1,13 @@ { "$schema": "https://biomejs.dev/schemas/2.1.1/schema.json", "files": { - "includes": ["**/*.json", "**/*.ts", "**/*.js", "!examples/snippets/**"], + "includes": [ + "**/*.json", + "**/*.ts", + "**/*.js", + "!examples/snippets/**", + "!clients/openapi/openapi.json" + ], "ignoreUnknown": true }, "vcs": { diff --git a/clients/openapi/openapi.json b/clients/openapi/openapi.json index 8d0c1be0c..2188e1ee2 100644 --- a/clients/openapi/openapi.json +++ b/clients/openapi/openapi.json @@ -14,7 +14,9 @@ "example": "actor-123" } }, - "required": ["i"] + "required": [ + "i" + ] }, "ResolveQuery": { "type": "object", @@ -676,4 +678,4 @@ } } } -} +} \ No newline at end of file diff --git a/packages/core/fixtures/driver-test-suite/action-timeout.ts b/packages/core/fixtures/driver-test-suite/action-timeout.ts index 73e7e8180..090ce3021 100644 --- a/packages/core/fixtures/driver-test-suite/action-timeout.ts +++ b/packages/core/fixtures/driver-test-suite/action-timeout.ts @@ -5,9 +5,7 @@ export const shortTimeoutActor = actor({ onAuth: () => {}, state: { value: 0 }, options: { - action: { - timeout: 50, // 50ms timeout - }, + actionTimeout: 50, // 50ms timeout }, actions: { quickAction: async (c) => { @@ -26,9 +24,7 @@ export const longTimeoutActor = actor({ onAuth: () => {}, state: { value: 0 }, options: { - action: { - timeout: 200, // 200ms timeout - }, + actionTimeout: 200, // 200ms timeout }, actions: { delayedAction: async (c) => { @@ -56,9 +52,7 @@ export const syncTimeoutActor = actor({ onAuth: () => {}, state: { value: 0 }, options: { - action: { - timeout: 50, // 50ms timeout - }, + actionTimeout: 50, // 50ms timeout }, actions: { syncAction: (c) => { diff --git a/packages/core/fixtures/driver-test-suite/conn-liveness.ts b/packages/core/fixtures/driver-test-suite/conn-liveness.ts index fe77595cc..bfa64a4cc 100644 --- a/packages/core/fixtures/driver-test-suite/conn-liveness.ts +++ b/packages/core/fixtures/driver-test-suite/conn-liveness.ts @@ -7,10 +7,8 @@ export const connLivenessActor = actor({ acceptingConnections: true, }, options: { - lifecycle: { - connectionLivenessInterval: 5_000, - connectionLivenessTimeout: 2_500, - }, + connectionLivenessInterval: 5_000, + connectionLivenessTimeout: 2_500, }, onConnect: (c, conn) => { if (!c.state.acceptingConnections) { diff --git a/packages/core/fixtures/driver-test-suite/error-handling.ts b/packages/core/fixtures/driver-test-suite/error-handling.ts index d816f8517..ecf65ef95 100644 --- a/packages/core/fixtures/driver-test-suite/error-handling.ts +++ b/packages/core/fixtures/driver-test-suite/error-handling.ts @@ -69,10 +69,7 @@ export const errorHandlingActor = actor({ }, }, options: { - // Set a short timeout for this actor's actions - action: { - timeout: 500, // 500ms timeout for actions - }, + actionTimeout: 500, // 500ms timeout for actions }, }); @@ -90,8 +87,6 @@ export const customTimeoutActor = actor({ }, }, options: { - action: { - timeout: 200, // 200ms timeout - }, + actionTimeout: 200, // 200ms timeout }, }); diff --git a/packages/core/fixtures/driver-test-suite/registry.ts b/packages/core/fixtures/driver-test-suite/registry.ts index d58b2384a..e935e0662 100644 --- a/packages/core/fixtures/driver-test-suite/registry.ts +++ b/packages/core/fixtures/driver-test-suite/registry.ts @@ -51,6 +51,13 @@ import { import { requestAccessActor } from "./request-access"; import { requestAccessAuthActor } from "./request-access-auth"; import { scheduled } from "./scheduled"; +import { + sleep, + sleepWithLongRpc, + sleepWithNoSleepOption, + sleepWithRawHttp, + sleepWithRawWebSocket, +} from "./sleep"; import { driverCtxActor, dynamicVarActor, @@ -68,6 +75,12 @@ export const registry = setup({ counterWithLifecycle, // From scheduled.ts scheduled, + // From sleep.ts + sleep, + sleepWithLongRpc, + sleepWithRawHttp, + sleepWithRawWebSocket, + sleepWithNoSleepOption, // From error-handling.ts errorHandlingActor, customTimeoutActor, diff --git a/packages/core/fixtures/driver-test-suite/sleep.ts b/packages/core/fixtures/driver-test-suite/sleep.ts new file mode 100644 index 000000000..b5357306f --- /dev/null +++ b/packages/core/fixtures/driver-test-suite/sleep.ts @@ -0,0 +1,186 @@ +import { actor, type UniversalWebSocket } from "@rivetkit/core"; + +export const SLEEP_TIMEOUT = 500; + +export const sleep = actor({ + onAuth: () => {}, + state: { startCount: 0, sleepCount: 0 }, + onStart: (c) => { + c.state.startCount += 1; + }, + onStop: (c) => { + c.state.sleepCount += 1; + }, + actions: { + triggerSleep: (c) => { + c.sleep(); + }, + getCounts: (c) => { + return { startCount: c.state.startCount, sleepCount: c.state.sleepCount }; + }, + setAlarm: async (c, duration: number) => { + await c.schedule.after(duration, "onAlarm"); + }, + onAlarm: (c) => { + c.log.info("alarm called"); + }, + }, + options: { + sleepTimeout: SLEEP_TIMEOUT, + }, +}); + +export const sleepWithLongRpc = actor({ + onAuth: () => {}, + state: { startCount: 0, sleepCount: 0 }, + createVars: () => ({}) as { longRunningResolve: PromiseWithResolvers }, + onStart: (c) => { + c.state.startCount += 1; + }, + onStop: (c) => { + c.state.sleepCount += 1; + }, + actions: { + getCounts: (c) => { + return { startCount: c.state.startCount, sleepCount: c.state.sleepCount }; + }, + longRunningRpc: async (c) => { + c.log.info("starting long running rpc"); + c.vars.longRunningResolve = Promise.withResolvers(); + c.broadcast("waiting"); + await c.vars.longRunningResolve.promise; + c.log.info("finished long running rpc"); + }, + finishLongRunningRpc: (c) => c.vars.longRunningResolve?.resolve(), + }, + options: { + sleepTimeout: SLEEP_TIMEOUT, + }, +}); + +export const sleepWithRawHttp = actor({ + onAuth: () => {}, + state: { startCount: 0, sleepCount: 0, requestCount: 0 }, + onStart: (c) => { + c.state.startCount += 1; + }, + onStop: (c) => { + c.state.sleepCount += 1; + }, + onFetch: async (c, request) => { + c.state.requestCount += 1; + const url = new URL(request.url); + + if (url.pathname === "/long-request") { + const duration = parseInt(url.searchParams.get("duration") || "1000"); + c.log.info("starting long fetch request", { duration }); + await new Promise((resolve) => setTimeout(resolve, duration)); + c.log.info("finished long fetch request"); + return new Response(JSON.stringify({ completed: true }), { + headers: { "Content-Type": "application/json" }, + }); + } + + return new Response("Not Found", { status: 404 }); + }, + actions: { + getCounts: (c) => { + return { + startCount: c.state.startCount, + sleepCount: c.state.sleepCount, + requestCount: c.state.requestCount, + }; + }, + }, + options: { + sleepTimeout: SLEEP_TIMEOUT, + }, +}); + +export const sleepWithRawWebSocket = actor({ + onAuth: () => {}, + state: { startCount: 0, sleepCount: 0, connectionCount: 0 }, + onStart: (c) => { + c.state.startCount += 1; + }, + onStop: (c) => { + c.state.sleepCount += 1; + }, + onWebSocket: (c, websocket: UniversalWebSocket, opts) => { + c.state.connectionCount += 1; + c.log.info("websocket connected", { + connectionCount: c.state.connectionCount, + }); + + websocket.send( + JSON.stringify({ + type: "connected", + connectionCount: c.state.connectionCount, + }), + ); + + websocket.addEventListener("message", (event: any) => { + const data = event.data; + if (typeof data === "string") { + try { + const parsed = JSON.parse(data); + if (parsed.type === "getCounts") { + websocket.send( + JSON.stringify({ + type: "counts", + startCount: c.state.startCount, + sleepCount: c.state.sleepCount, + connectionCount: c.state.connectionCount, + }), + ); + } else if (parsed.type === "keepAlive") { + // Just acknowledge to keep connection alive + websocket.send(JSON.stringify({ type: "ack" })); + } + } catch { + // Echo non-JSON messages + websocket.send(data); + } + } + }); + + websocket.addEventListener("close", () => { + c.state.connectionCount -= 1; + c.log.info("websocket disconnected", { + connectionCount: c.state.connectionCount, + }); + }); + }, + actions: { + getCounts: (c) => { + return { + startCount: c.state.startCount, + sleepCount: c.state.sleepCount, + connectionCount: c.state.connectionCount, + }; + }, + }, + options: { + sleepTimeout: SLEEP_TIMEOUT, + }, +}); + +export const sleepWithNoSleepOption = actor({ + onAuth: () => {}, + state: { startCount: 0, sleepCount: 0 }, + onStart: (c) => { + c.state.startCount += 1; + }, + onStop: (c) => { + c.state.sleepCount += 1; + }, + actions: { + getCounts: (c) => { + return { startCount: c.state.startCount, sleepCount: c.state.sleepCount }; + }, + }, + options: { + sleepTimeout: SLEEP_TIMEOUT, + noSleep: true, + }, +}); diff --git a/packages/core/package.json b/packages/core/package.json index 653745921..fdc70cfbd 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -167,7 +167,7 @@ "on-change": "^5.0.1", "p-retry": "^6.2.1", "zod": "^3.25.76", - "@rivetkit/engine-runner": "https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner@f1c054d" + "@rivetkit/engine-runner": "https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@664a377" }, "devDependencies": { "@hono/node-server": "^1.18.2", diff --git a/packages/core/src/actor/action.ts b/packages/core/src/actor/action.ts index a6be1f765..7ec77c1d4 100644 --- a/packages/core/src/actor/action.ts +++ b/packages/core/src/actor/action.ts @@ -161,4 +161,15 @@ export class ActionContext< runInBackground(promise: Promise): void { this.#actorContext.runInBackground(promise); } + + /** + * Forces the actor to sleep. + * + * Not supported on all drivers. + * + * @experimental + */ + sleep() { + this.#actorContext.sleep(); + } } diff --git a/packages/core/src/actor/config.ts b/packages/core/src/actor/config.ts index 42419919c..ac93d69e2 100644 --- a/packages/core/src/actor/config.ts +++ b/packages/core/src/actor/config.ts @@ -43,6 +43,7 @@ export const ActorConfigSchema = z onAuth: z.function().optional(), onCreate: z.function().optional(), onStart: z.function().optional(), + onStop: z.function().optional(), onStateChange: z.function().optional(), onBeforeConnect: z.function().optional(), onConnect: z.function().optional(), @@ -60,28 +61,17 @@ export const ActorConfigSchema = z createVars: z.function().optional(), options: z .object({ - lifecycle: z - .object({ - createVarsTimeout: z.number().positive().default(5000), - createConnStateTimeout: z.number().positive().default(5000), - onConnectTimeout: z.number().positive().default(5000), - connectionLivenessTimeout: z.number().positive().default(2500), - connectionLivenessInterval: z.number().positive().default(5000), - }) - .strict() - .default({}), - state: z - .object({ - saveInterval: z.number().positive().default(10_000), - }) - .strict() - .default({}), - action: z - .object({ - timeout: z.number().positive().default(60_000), - }) - .strict() - .default({}), + createVarsTimeout: z.number().positive().default(5000), + createConnStateTimeout: z.number().positive().default(5000), + onConnectTimeout: z.number().positive().default(5000), + // This must be less than ACTOR_STOP_THRESHOLD_MS + onStopTimeout: z.number().positive().default(5000), + stateSaveInterval: z.number().positive().default(10_000), + actionTimeout: z.number().positive().default(60_000), + connectionLivenessTimeout: z.number().positive().default(2500), + connectionLivenessInterval: z.number().positive().default(5000), + noSleep: z.boolean().default(false), + sleepTimeout: z.number().positive().default(30_000), }) .strict() .default({}), @@ -327,6 +317,28 @@ interface BaseActorConfig< >, ) => void | Promise; + /** + * Called when the actor is stopping or sleeping. + * + * Use this hook to clean up resources, save state, or perform + * any shutdown operations before the actor sleeps or stops. + * + * Not supported on all platforms. + * + * @returns Void or a Promise that resolves when shutdown is complete + */ + onStop?: ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TAuthData, + TDatabase + >, + ) => void | Promise; + /** * Called when the actor's state changes. * @@ -641,6 +653,7 @@ export type ActorConfigInput< | "onAuth" | "onCreate" | "onStart" + | "onStop" | "onStateChange" | "onBeforeConnect" | "onConnect" diff --git a/packages/core/src/actor/connection.ts b/packages/core/src/actor/connection.ts index 4ad315431..6ab59d50a 100644 --- a/packages/core/src/actor/connection.ts +++ b/packages/core/src/actor/connection.ts @@ -209,10 +209,7 @@ export class Conn { * @internal */ [CONNECTION_CHECK_LIVENESS_SYMBOL]() { - const readyState = this.#driver.getConnectionReadyState?.( - this.#actor, - this, - ); + const readyState = this.#driver.getConnectionReadyState(this.#actor, this); const isConnectionClosed = readyState === ConnectionReadyState.CLOSED || diff --git a/packages/core/src/actor/context.ts b/packages/core/src/actor/context.ts index b6238359d..703196aca 100644 --- a/packages/core/src/actor/context.ts +++ b/packages/core/src/actor/context.ts @@ -151,6 +151,16 @@ export class ActorContext< */ runInBackground(promise: Promise): void { this.#actor._runInBackground(promise); - return; + } + + /** + * Forces the actor to sleep. + * + * Not supported on all drivers. + * + * @experimental + */ + sleep() { + this.#actor._sleep(); } } diff --git a/packages/core/src/actor/driver.ts b/packages/core/src/actor/driver.ts index 4696da5c2..78c76292e 100644 --- a/packages/core/src/actor/driver.ts +++ b/packages/core/src/actor/driver.ts @@ -27,9 +27,12 @@ export interface ActorDriver { getContext(actorId: string): unknown; readPersistedData(actorId: string): Promise; + + /** ActorInstance ensure that only one instance of writePersistedData is called in parallel at a time. */ writePersistedData(actorId: string, data: Uint8Array): Promise; // Schedule + /** ActorInstance ensure that only one instance of setAlarm is called in parallel at a time. */ setAlarm(actor: AnyActorInstance, timestamp: number): Promise; // Database @@ -39,6 +42,8 @@ export interface ActorDriver { */ getDatabase(actorId: string): Promise; + sleep?(actorId: string): void; + shutdown?(immediate: boolean): Promise; } @@ -72,7 +77,7 @@ export interface ConnDriver { * Returns the ready state of the connection. * This is used to determine if the connection is ready to send messages, or if the connection is stale. */ - getConnectionReadyState?( + getConnectionReadyState( actor: AnyActorInstance, conn: AnyConn, ): ConnectionReadyState | undefined; diff --git a/packages/core/src/actor/generic-conn-driver.ts b/packages/core/src/actor/generic-conn-driver.ts index 4cb5d2f91..f7980b5a3 100644 --- a/packages/core/src/actor/generic-conn-driver.ts +++ b/packages/core/src/actor/generic-conn-driver.ts @@ -160,7 +160,9 @@ export interface GenericSseDriverState { encoding: Encoding; } -export function createGenericSseDriver(globalState: GenericConnGlobalState) { +export function createGenericSseDriver( + globalState: GenericConnGlobalState, +): ConnDriver { return { sendMessage: ( _actor: AnyActorInstance, @@ -219,8 +221,12 @@ export function createGenericSseDriver(globalState: GenericConnGlobalState) { // MARK: HTTP export type GenericHttpDriverState = Record; -export function createGenericHttpDriver() { +export function createGenericHttpDriver(): ConnDriver { return { + getConnectionReadyState(_actor, conn) { + // TODO: This might not be the correct logic + return ConnectionReadyState.OPEN; + }, disconnect: async () => { // Noop }, diff --git a/packages/core/src/actor/instance.ts b/packages/core/src/actor/instance.ts index 15f598a2e..35ebb4109 100644 --- a/packages/core/src/actor/instance.ts +++ b/packages/core/src/actor/instance.ts @@ -10,6 +10,7 @@ import { isCborSerializable, stringifyError } from "@/common/utils"; import type { UniversalWebSocket } from "@/common/websocket-interface"; import { ActorInspector } from "@/inspector/actor"; import type { Registry } from "@/mod"; +import { SinglePromiseQueue } from "@/utils"; import type { ActionContext } from "./action"; import type { ActorConfig, OnConnectOptions } from "./config"; import { @@ -31,7 +32,7 @@ import type { import { processMessage } from "./protocol/message/mod"; import { CachedSerializer } from "./protocol/serde"; import { Schedule, type ScheduledEvent } from "./schedule"; -import { DeadlineError, deadline, Lock } from "./utils"; +import { DeadlineError, deadline } from "./utils"; /** * Options for the `_saveState` method. @@ -129,7 +130,12 @@ export class ActorInstance< > { // Shared actor context for this instance actorContext: ActorContext; - isStopping = false; + #sleepCalled = false; + #stopCalled = false; + + get isStopping() { + return this.#stopCalled || this.#sleepCalled; + } #persistChanged = false; @@ -143,7 +149,8 @@ export class ActorInstance< /** Raw state without the proxy wrapper */ #persistRaw!: PersistedActor; - #writePersistLock = new Lock(void 0); + #persistWriteQueue = new SinglePromiseQueue(); + #alarmWriteQueue = new SinglePromiseQueue(); #lastSaveTime = 0; #pendingSaveTimeout?: NodeJS.Timeout; @@ -164,6 +171,12 @@ export class ActorInstance< #connections = new Map>(); #subscriptionIndex = new Map>>(); + #sleepTimeout?: NodeJS.Timeout; + + // Track active raw requests so sleep logic can account for them + #activeRawFetchCount = 0; + #activeRawWebSockets = new Set(); + #schedule!: Schedule; #db!: InferDatabaseClient; @@ -222,6 +235,10 @@ export class ActorInstance< return this.#inspector; } + get #sleepingSupported(): boolean { + return this.#actorDriver.sleep !== undefined; + } + /** * This constructor should never be used directly. * @@ -276,7 +293,7 @@ export class ActorInstance< if (dataOrPromise instanceof Promise) { vars = await deadline( dataOrPromise, - this.#config.options.lifecycle.createVarsTimeout, + this.#config.options.createVarsTimeout, ); } else { vars = dataOrPromise; @@ -311,12 +328,15 @@ export class ActorInstance< // Set alarm for next scheduled event if any exist after finishing initiation sequence if (this.#persist.e.length > 0) { - await this.#actorDriver.setAlarm(this, this.#persist.e[0].t); + await this.#queueSetAlarm(this.#persist.e[0].t); } logger().info("actor ready"); this.#ready = true; + // Must be called after setting `#ready` or else it will not schedule sleep + this.#resetSleepTimer(); + this.#scheduleLivenessCheck(); } @@ -361,21 +381,44 @@ export class ActorInstance< // - this is the newest event (i.e. at beginning of array) or // - this is the only event (i.e. the only event in the array) if (insertIndex === 0 || this.#persist.e.length === 1) { - this.actorContext.log.info("setting alarm", { timestamp }); - await this.#actorDriver.setAlarm(this, newEvent.t); + this.actorContext.log.info("setting alarm", { + timestamp, + eventCount: this.#persist.e.length, + }); + await this.#queueSetAlarm(newEvent.t); } } - async onAlarm() { + async _onAlarm() { const now = Date.now(); this.actorContext.log.debug("alarm triggered", { now, events: this.#persist.e.length, }); + // Update sleep + // + // Do this before any async logic + this.#resetSleepTimer(); + // Remove events from schedule that we're about to run const runIndex = this.#persist.e.findIndex((x) => x.t <= now); if (runIndex === -1) { + // No events are due yet. This will happen if timers fire slightly early. + // Ensure we reschedule the alarm for the next upcoming event to avoid losing it. + logger().warn("no events are due yet, time may have broken"); + if (this.#persist.e.length > 0) { + const nextTs = this.#persist.e[0].t; + this.actorContext.log.warn( + "alarm fired early, rescheduling for next event", + { + now, + nextTs, + delta: nextTs - now, + }, + ); + await this.#queueSetAlarm(nextTs); + } this.actorContext.log.debug("no events to run", { now }); return; } @@ -386,7 +429,12 @@ export class ActorInstance< // Set alarm for next event if (this.#persist.e.length > 0) { - await this.#actorDriver.setAlarm(this, this.#persist.e[0].t); + const nextTs = this.#persist.e[0].t; + this.actorContext.log.info("setting next alarm", { + nextTs, + remainingEvents: this.#persist.e.length, + }); + await this.#queueSetAlarm(nextTs); } // Iterate by event key in order to ensure we call the events in order @@ -472,7 +520,7 @@ export class ActorInstance< #savePersistThrottled() { const now = Date.now(); const timeSinceLastSave = now - this.#lastSaveTime; - const saveInterval = this.#config.options.state.saveInterval; + const saveInterval = this.#config.options.stateSaveInterval; // If we're within the throttle window and not already scheduled, schedule the next save. if (timeSinceLastSave < saveInterval) { @@ -494,10 +542,7 @@ export class ActorInstance< this.#lastSaveTime = Date.now(); if (this.#persistChanged) { - // Use a lock in order to avoid race conditions with multiple - // parallel promises writing to KV. This should almost never happen - // unless there are abnormally high latency in KV writes. - await this.#writePersistLock.lock(async () => { + const finished = this.#persistWriteQueue.enqueue(async () => { logger().debug("saving persist"); // There might be more changes while we're writing, so we set this @@ -512,6 +557,8 @@ export class ActorInstance< logger().debug("persist saved"); }); + + await finished; } this.#onPersistSavedPromise?.resolve(); @@ -521,6 +568,12 @@ export class ActorInstance< } } + async #queueSetAlarm(timestamp: number): Promise { + await this.#alarmWriteQueue.enqueue(async () => { + await this.#actorDriver.setAlarm(this, timestamp); + }); + } + /** * Creates proxy for `#persist` that handles automatically flagging when state needs to be updated. */ @@ -737,6 +790,9 @@ export class ActorInstance< }); } } + + // Update sleep + this.#resetSleepTimer(); } async prepareConn( @@ -777,7 +833,7 @@ export class ActorInstance< if (dataOrPromise instanceof Promise) { connState = await deadline( dataOrPromise, - this.#config.options.lifecycle.createConnStateTimeout, + this.#config.options.createConnStateTimeout, ); } else { connState = dataOrPromise; @@ -840,6 +896,11 @@ export class ActorInstance< ); this.#connections.set(conn.id, conn); + // Update sleep + // + // Do this immediately after adding connection & before any async logic in order to avoid race conditions with sleep timeouts + this.#resetSleepTimer(); + // Add to persistence & save immediately this.#persist.c.push(persist); this.saveState({ immediate: true }); @@ -849,15 +910,14 @@ export class ActorInstance< try { const result = this.#config.onConnect(this.actorContext, conn); if (result instanceof Promise) { - deadline( - result, - this.#config.options.lifecycle.onConnectTimeout, - ).catch((error) => { - logger().error("error in `onConnect`, closing socket", { - error, - }); - conn?.disconnect("`onConnect` failed"); - }); + deadline(result, this.#config.options.onConnectTimeout).catch( + (error) => { + logger().error("error in `onConnect`, closing socket", { + error, + }); + conn?.disconnect("`onConnect` failed"); + }, + ); } } catch (error) { logger().error("error in `onConnect`", { @@ -995,7 +1055,6 @@ export class ActorInstance< /** * Check the liveness of all connections. * Sets up a recurring check based on the configured interval. - * @internal */ #checkConnectionsLiveness() { logger().debug("checking connections liveness"); @@ -1007,10 +1066,7 @@ export class ActorInstance< } else { const lastSeen = liveness.lastSeen; const sinceLastSeen = Date.now() - lastSeen; - if ( - sinceLastSeen < - this.#config.options.lifecycle.connectionLivenessTimeout - ) { + if (sinceLastSeen < this.#config.options.connectionLivenessTimeout) { logger().debug("connection might be alive, will check later", { connId: conn.id, lastSeen, @@ -1044,7 +1100,7 @@ export class ActorInstance< } this.#scheduleEventInner( - Date.now() + this.#config.options.lifecycle.connectionLivenessInterval, + Date.now() + this.#config.options.connectionLivenessInterval, { checkConnectionLiveness: true }, ); } @@ -1112,7 +1168,7 @@ export class ActorInstance< output = await deadline( outputOrPromise, - this.#config.options.action.timeout, + this.#config.options.actionTimeout, ); // Log that async action completed @@ -1190,6 +1246,10 @@ export class ActorInstance< throw new errors.FetchHandlerNotDefined(); } + // Track active raw fetch while handler runs + this.#activeRawFetchCount++; + this.#resetSleepTimer(); + try { const response = await this.#config.onFetch( this.actorContext, @@ -1206,6 +1266,9 @@ export class ActorInstance< }); throw error; } finally { + // Decrement active raw fetch counter and re-evaluate sleep + this.#activeRawFetchCount = Math.max(0, this.#activeRawFetchCount - 1); + this.#resetSleepTimer(); this.#savePersistThrottled(); } } @@ -1227,6 +1290,26 @@ export class ActorInstance< // Set up state tracking to detect changes during WebSocket handling const stateBeforeHandler = this.#persistChanged; + // Track active websocket until it fully closes + this.#activeRawWebSockets.add(websocket); + this.#resetSleepTimer(); + + // Track socket close + const onSocketClosed = () => { + // Remove listener and socket from tracking + try { + websocket.removeEventListener("close", onSocketClosed); + websocket.removeEventListener("error", onSocketClosed); + } catch {} + this.#activeRawWebSockets.delete(websocket); + this.#resetSleepTimer(); + }; + try { + websocket.addEventListener("close", onSocketClosed); + websocket.addEventListener("error", onSocketClosed); + } catch {} + + // Handle WebSocket await this.#config.onWebSocket(this.actorContext, websocket, opts); // If state changed during the handler, save it @@ -1414,12 +1497,109 @@ export class ActorInstance< } } - async stop() { - if (this.isStopping) { + // MARK: Sleep + /** + * Reset timer from the last actor interaction that allows it to be put to sleep. + * + * This should be called any time a sleep-related event happens: + * - Connection opens (will clear timer) + * - Connection closes (will schedule timer if there are no open connections) + * - Alarm triggers (will reset timer) + * + * We don't need to call this on events like individual action calls, since there will always be a connection open for these. + **/ + #resetSleepTimer() { + if (this.#config.options.noSleep || !this.#sleepingSupported) return; + + const canSleep = this.#canSleep(); + + logger().debug("resetting sleep timer", { + canSleep, + existingTimeout: !!this.#sleepTimeout, + }); + + if (this.#sleepTimeout) { + clearTimeout(this.#sleepTimeout); + this.#sleepTimeout = undefined; + } + + if (canSleep) { + this.#sleepTimeout = setTimeout(() => { + this._sleep().catch((error) => { + logger().error("error during sleep", { + error: stringifyError(error), + }); + }); + }, this.#config.options.sleepTimeout); + } + } + + /** If this actor can be put in a sleeping state. */ + #canSleep(): boolean { + if (!this.#ready) return false; + + // Check for active conns. This will also cover active actions, since all actions have a connection. + for (const conn of this.#connections.values()) { + if (conn.status === "connected") return false; + } + + // Do not sleep if raw fetches are in-flight + if (this.#activeRawFetchCount > 0) return false; + + // Do not sleep if there are raw websockets open + if (this.#activeRawWebSockets.size > 0) return false; + + return true; + } + + /** Puts an actor to sleep. */ + async _sleep() { + invariant(this.#sleepingSupported, "sleeping not supported"); + invariant(this.#actorDriver.sleep, "no sleep on driver"); + + if (this.#sleepCalled) { + logger().warn("already sleeping actor"); + return; + } + this.#sleepCalled = true; + + logger().info("actor sleeping"); + + // The actor driver should call stop when ready to stop + // + // This will call _stop once Pegboard responds with the new status + this.#actorDriver.sleep(this.#actorId); + } + + // MARK: Stop + async _stop() { + if (this.#stopCalled) { logger().warn("already stopping actor"); return; } - this.isStopping = true; + this.#stopCalled = true; + + logger().info("actor stopping"); + + // Call onStop lifecycle hook if defined + if (this.#config.onStop) { + try { + logger().debug("calling onStop"); + const result = this.#config.onStop(this.actorContext); + if (result instanceof Promise) { + await deadline(result, this.#config.options.onStopTimeout); + } + logger().debug("onStop completed"); + } catch (error) { + if (error instanceof DeadlineError) { + logger().error("onStop timed out"); + } else { + logger().error("error in onStop", { + error: stringifyError(error), + }); + } + } + } // Write state await this.saveState({ immediate: true }); diff --git a/packages/core/src/client/actor-conn.ts b/packages/core/src/client/actor-conn.ts index 167cd2bcc..3b73f57dc 100644 --- a/packages/core/src/client/actor-conn.ts +++ b/packages/core/src/client/actor-conn.ts @@ -745,7 +745,7 @@ enc } this.#disposed = true; - logger().debug("disposing actor"); + logger().debug("disposing actor conn"); // Clear interval so NodeJS process can exit clearInterval(this.#keepNodeAliveInterval); diff --git a/packages/core/src/driver-test-suite/mod.ts b/packages/core/src/driver-test-suite/mod.ts index 0f857b5a3..388a53f11 100644 --- a/packages/core/src/driver-test-suite/mod.ts +++ b/packages/core/src/driver-test-suite/mod.ts @@ -13,7 +13,10 @@ import { runActionFeaturesTests } from "./tests/action-features"; import { runActorAuthTests } from "./tests/actor-auth"; import { runActorConnTests } from "./tests/actor-conn"; import { runActorConnStateTests } from "./tests/actor-conn-state"; -import { runActorDriverTests } from "./tests/actor-driver"; +import { + runActorDriverTests, + runActorDriverTestsWithTransport, +} from "./tests/actor-driver"; import { runActorErrorHandlingTests } from "./tests/actor-error-handling"; import { runActorHandleTests } from "./tests/actor-handle"; import { runActorInlineClientTests } from "./tests/actor-inline-client"; @@ -30,6 +33,7 @@ import { runRequestAccessTests } from "./tests/request-access"; export interface SkipTests { schedule?: boolean; + sleep?: boolean; } export interface DriverTestConfig { @@ -94,6 +98,8 @@ export function runDriverTests( runActorConnStateTests({ ...driverTestConfig, transport }); runRequestAccessTests({ ...driverTestConfig, transport }); + + runActorDriverTestsWithTransport({ ...driverTestConfig, transport }); }); } diff --git a/packages/core/src/driver-test-suite/tests/actor-conn.ts b/packages/core/src/driver-test-suite/tests/actor-conn.ts index 8d6c88130..98c42eaee 100644 --- a/packages/core/src/driver-test-suite/tests/actor-conn.ts +++ b/packages/core/src/driver-test-suite/tests/actor-conn.ts @@ -259,7 +259,8 @@ export function runActorConnTests(driverTestConfig: DriverTestConfig) { }); describe("Connection Liveness", () => { - test("should return correct liveness status for connections", async (c) => { + // TODO: KIT-242 + test.skip("should return correct liveness status for connections", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); // Create actor and connection diff --git a/packages/core/src/driver-test-suite/tests/actor-driver.ts b/packages/core/src/driver-test-suite/tests/actor-driver.ts index 7626cd0d0..d51720ee4 100644 --- a/packages/core/src/driver-test-suite/tests/actor-driver.ts +++ b/packages/core/src/driver-test-suite/tests/actor-driver.ts @@ -1,6 +1,7 @@ import { describe } from "vitest"; import type { DriverTestConfig } from "../mod"; import { runActorScheduleTests } from "./actor-schedule"; +import { runActorSleepTests } from "./actor-sleep"; import { runActorStateTests } from "./actor-state"; export function runActorDriverTests(driverTestConfig: DriverTestConfig) { @@ -12,3 +13,13 @@ export function runActorDriverTests(driverTestConfig: DriverTestConfig) { runActorScheduleTests(driverTestConfig); }); } + +/** Actor driver tests that need to be tested for all transport mechanisms. */ +export function runActorDriverTestsWithTransport( + driverTestConfig: DriverTestConfig, +) { + describe("Actor Driver Tests", () => { + // Run actor sleep tests + runActorSleepTests(driverTestConfig); + }); +} diff --git a/packages/core/src/driver-test-suite/tests/actor-schedule.ts b/packages/core/src/driver-test-suite/tests/actor-schedule.ts index 2f3c547d2..55d1fc676 100644 --- a/packages/core/src/driver-test-suite/tests/actor-schedule.ts +++ b/packages/core/src/driver-test-suite/tests/actor-schedule.ts @@ -18,7 +18,7 @@ export function runActorScheduleTests(driverTestConfig: DriverTestConfig) { await scheduled.scheduleTaskAt(timestamp); // Wait for longer than the scheduled time - await waitFor(driverTestConfig, 150); + await waitFor(driverTestConfig, 200); // Verify the scheduled task ran const lastRun = await scheduled.getLastRun(); @@ -38,7 +38,7 @@ export function runActorScheduleTests(driverTestConfig: DriverTestConfig) { await scheduled.scheduleTaskAfter(100); // Wait for longer than the scheduled time - await waitFor(driverTestConfig, 150); + await waitFor(driverTestConfig, 200); // Verify the scheduled task ran const lastRun = await scheduled.getLastRun(); @@ -56,7 +56,7 @@ export function runActorScheduleTests(driverTestConfig: DriverTestConfig) { await scheduled.scheduleTaskAfter(200); // Wait a little so the schedule is stored but hasn't triggered yet - await waitFor(driverTestConfig, 50); + await waitFor(driverTestConfig, 100); // Get a new reference to simulate actor restart const newInstance = client.scheduled.getOrCreate(); @@ -83,22 +83,22 @@ export function runActorScheduleTests(driverTestConfig: DriverTestConfig) { await scheduled.clearHistory(); // Schedule multiple tasks with different delays - await scheduled.scheduleTaskAfterWithId("first", 50); - await scheduled.scheduleTaskAfterWithId("second", 150); - await scheduled.scheduleTaskAfterWithId("third", 250); + await scheduled.scheduleTaskAfterWithId("first", 100); + await scheduled.scheduleTaskAfterWithId("second", 300); + await scheduled.scheduleTaskAfterWithId("third", 500); // Wait for first task only - await waitFor(driverTestConfig, 100); + await waitFor(driverTestConfig, 200); const history1 = await scheduled.getTaskHistory(); expect(history1).toEqual(["first"]); // Wait for second task - await waitFor(driverTestConfig, 100); + await waitFor(driverTestConfig, 200); const history2 = await scheduled.getTaskHistory(); expect(history2).toEqual(["first", "second"]); // Wait for third task - await waitFor(driverTestConfig, 100); + await waitFor(driverTestConfig, 200); const history3 = await scheduled.getTaskHistory(); expect(history3).toEqual(["first", "second", "third"]); }); diff --git a/packages/core/src/driver-test-suite/tests/actor-sleep.ts b/packages/core/src/driver-test-suite/tests/actor-sleep.ts new file mode 100644 index 000000000..c2279cdf3 --- /dev/null +++ b/packages/core/src/driver-test-suite/tests/actor-sleep.ts @@ -0,0 +1,413 @@ +import { describe, expect, test } from "vitest"; +import { SLEEP_TIMEOUT } from "../../../fixtures/driver-test-suite/sleep"; +import type { DriverTestConfig } from "../mod"; +import { setupDriverTest, waitFor } from "../utils"; + +// TODO: These tests are broken with fake timers because `_sleep` requires +// background async promises that have a race condition with calling +// `getCounts` +// +// To fix this, we need to imeplment some event system to be able to check for +// when an actor has slept. OR we can expose an HTTP endpoint on the manager +// for `.test` that checks if na actor is sleeping that we can poll. +export function runActorSleepTests(driverTestConfig: DriverTestConfig) { + describe.skipIf(driverTestConfig.skip?.sleep)("Actor Sleep Tests", () => { + test("actor sleep persists state", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor + const sleepActor = client.sleep.getOrCreate(); + + // Verify initial sleep count + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Trigger sleep + await sleepActor.triggerSleep(); + + // HACK: Wait for sleep to finish in background + await waitFor(driverTestConfig, 100); + + // Get sleep count after restore + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(1); + expect(startCount).toBe(2); + } + }); + + test("actor sleep persists state with connect", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor with persistent connection + const sleepActor = client.sleep.getOrCreate().connect(); + + // Verify initial sleep count + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Trigger sleep + await sleepActor.triggerSleep(); + + // Disconnect to allow reconnection + await sleepActor.dispose(); + + // HACK: Wait for sleep to finish in background + await waitFor(driverTestConfig, 100); + + // Reconnect to get sleep count after restore + const sleepActor2 = client.sleep.getOrCreate(); + { + const { startCount, sleepCount } = await sleepActor2.getCounts(); + expect(sleepCount).toBe(1); + expect(startCount).toBe(2); + } + }); + + test("actor automatically sleeps after timeout", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor + const sleepActor = client.sleep.getOrCreate(); + + // Verify initial sleep count + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Wait for sleep + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Get sleep count after restore + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(1); + expect(startCount).toBe(2); + } + }); + + test("actor automatically sleeps after timeout with connect", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor with persistent connection + const sleepActor = client.sleep.getOrCreate().connect(); + + // Verify initial sleep count + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Disconnect to allow actor to sleep + await sleepActor.dispose(); + + // Wait for sleep + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Reconnect to get sleep count after restore + const sleepActor2 = client.sleep.getOrCreate(); + { + const { startCount, sleepCount } = await sleepActor2.getCounts(); + expect(sleepCount).toBe(1); + expect(startCount).toBe(2); + } + }); + + test("rpc calls keep actor awake", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor + const sleepActor = client.sleep.getOrCreate(); + + // Verify initial state + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Wait almost until sleep timeout, then make RPC call + await waitFor(driverTestConfig, SLEEP_TIMEOUT - 100); + + // RPC call should reset the sleep timer + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); // Haven't slept yet + expect(startCount).toBe(1); // Still the same instance + } + + // Wait another partial timeout period - actor should still be awake + await waitFor(driverTestConfig, SLEEP_TIMEOUT - 100); + + // Actor should still be awake + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); // Still haven't slept + expect(startCount).toBe(1); // Still the same instance + } + + // Now wait for full timeout without any RPC calls + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Actor should have slept and restarted + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(1); // Slept once + expect(startCount).toBe(2); // New instance after sleep + } + }); + + test("alarms keep actor awake", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor + const sleepActor = client.sleep.getOrCreate(); + + // Verify initial state + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Set an alarm to keep the actor awake + await sleepActor.setAlarm(SLEEP_TIMEOUT - 100); + + // Wait until after SLEEPT_IMEOUT to validate the actor did not sleep + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Actor should not have slept + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + }); + + test("alarms wake actors", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor + const sleepActor = client.sleep.getOrCreate(); + + // Verify initial state + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Set an alarm to keep the actor awake + await sleepActor.setAlarm(SLEEP_TIMEOUT + 100); + + // Wait until after SLEEPT_IMEOUT to validate the actor did not sleep + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 200); + + // Actor should not have slept + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(1); + expect(startCount).toBe(2); + } + }); + + test("long running rpcs keep actor awake", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor + const sleepActor = client.sleepWithLongRpc.getOrCreate().connect(); + + // Verify initial state + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Start a long-running RPC that takes longer than the sleep timeout + const waitPromise = new Promise((resolve) => + sleepActor.once("waiting", resolve), + ); + const longRunningPromise = sleepActor.longRunningRpc(); + await waitPromise; + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + await sleepActor.finishLongRunningRpc(); + await longRunningPromise; + + // Actor should still be the same instance (didn't sleep during RPC) + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); // Hasn't slept + expect(startCount).toBe(1); // Same instance + } + await sleepActor.dispose(); + + // Now wait for the sleep timeout + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Actor should have slept after the timeout + const sleepActor2 = client.sleepWithLongRpc.getOrCreate(); + { + const { startCount, sleepCount } = await sleepActor2.getCounts(); + expect(sleepCount).toBe(1); // Slept once + expect(startCount).toBe(2); // New instance after sleep + } + }); + + test("active raw websockets keep actor awake", async (c) => { + const { client, endpoint: baseUrl } = await setupDriverTest( + c, + driverTestConfig, + ); + + // Create actor + const sleepActor = client.sleepWithRawWebSocket.getOrCreate(); + + // Verify initial state + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Connect WebSocket + const ws = await sleepActor.websocket(); + + await new Promise((resolve, reject) => { + ws.onopen = () => resolve(); + ws.onerror = reject; + }); + + // Wait for connection message + await new Promise((resolve) => { + ws.onmessage = (event) => { + const data = JSON.parse(event.data); + if (data.type === "connected") { + resolve(); + } + }; + }); + + // Wait longer than sleep timeout while keeping WebSocket connected + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Send a message to check if actor is still alive + ws.send(JSON.stringify({ type: "getCounts" })); + + const counts = await new Promise((resolve) => { + ws.onmessage = (event) => { + const data = JSON.parse(event.data); + if (data.type === "counts") { + resolve(data); + } + }; + }); + + // Actor should still be the same instance (didn't sleep while WebSocket connected) + expect(counts.sleepCount).toBe(0); + expect(counts.startCount).toBe(1); + + // Close WebSocket + ws.close(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Wait for sleep timeout after WebSocket closed + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Actor should have slept after WebSocket closed + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(1); // Slept once + expect(startCount).toBe(2); // New instance after sleep + } + }); + + test("active raw fetch requests keep actor awake", async (c) => { + const { client, endpoint: baseUrl } = await setupDriverTest( + c, + driverTestConfig, + ); + + // Create actor + const sleepActor = client.sleepWithRawHttp.getOrCreate(); + + // Verify initial state + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Start a long-running fetch request + const fetchDuration = SLEEP_TIMEOUT + 100; + const fetchPromise = sleepActor.fetch( + `long-request?duration=${fetchDuration}`, + ); + + // Wait for the fetch to complete + const response = await fetchPromise; + const result = (await response.json()) as { completed: boolean }; + expect(result.completed).toBe(true); + { + const { startCount, sleepCount, requestCount } = + await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + expect(requestCount).toBe(1); + } + + // Wait for sleep timeout + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Actor should have slept after timeout + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(1); // Slept once + expect(startCount).toBe(2); // New instance after sleep + } + }); + + test("noSleep option disables sleeping", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + + // Create actor with noSleep option + const sleepActor = client.sleepWithNoSleepOption.getOrCreate(); + + // Verify initial state + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); + expect(startCount).toBe(1); + } + + // Wait longer than sleep timeout + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Actor should NOT have slept due to noSleep option + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); // Never slept + expect(startCount).toBe(1); // Still the same instance + } + + // Wait even longer to be sure + await waitFor(driverTestConfig, SLEEP_TIMEOUT + 100); + + // Actor should still not have slept + { + const { startCount, sleepCount } = await sleepActor.getCounts(); + expect(sleepCount).toBe(0); // Never slept + expect(startCount).toBe(1); // Still the same instance + } + }); + }); +} diff --git a/packages/core/src/drivers/engine/actor-driver.ts b/packages/core/src/drivers/engine/actor-driver.ts index 9c2e2ee71..484d49eac 100644 --- a/packages/core/src/drivers/engine/actor-driver.ts +++ b/packages/core/src/drivers/engine/actor-driver.ts @@ -183,11 +183,6 @@ export class EngineActorDriver implements ActorDriver { // TODO: Set timeout // TODO: Use alarm on sleep // TODO: Send alarm to runner - - const delay = Math.max(timestamp - Date.now(), 0); - setTimeout(() => { - actor.onAlarm(); - }, delay); } async getDatabase(_actorId: string): Promise { @@ -261,7 +256,7 @@ export class EngineActorDriver implements ActorDriver { const handler = this.#actors.get(actorId); if (handler?.actor) { - await handler.actor.stop(); + await handler.actor._stop(); this.#actors.delete(actorId); } @@ -354,6 +349,10 @@ export class EngineActorDriver implements ActorDriver { }); } + sleep(actorId: string) { + this.#runner.sleepActor(actorId); + } + async shutdown(immediate: boolean): Promise { logger().info("stopping engine actor driver"); await this.#runner.shutdown(immediate); diff --git a/packages/core/src/drivers/file-system/actor.ts b/packages/core/src/drivers/file-system/actor.ts index 0a546256d..683fb9df6 100644 --- a/packages/core/src/drivers/file-system/actor.ts +++ b/packages/core/src/drivers/file-system/actor.ts @@ -64,21 +64,22 @@ export class FileSystemActorDriver implements ActorDriver { } async writePersistedData(actorId: string, data: Uint8Array): Promise { - const entry = await this.#state.loadActorStateOrError(actorId); - entry.persistedData = data; + const state = await this.#state.loadActorStateOrError(actorId); + state.persistedData = data; - // Save state to disk - await this.#state.writeActor(actorId); + // Save state to disk (pass state to avoid race with sleep/removal) + await this.#state.writeActor(actorId, state); } async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { - const delay = Math.max(0, timestamp - Date.now()); - setTimeout(() => { - actor.onAlarm(); - }, delay); + await this.#state.setActorAlarm(actor.id, timestamp); } getDatabase(actorId: string): Promise { return this.#state.createDatabase(actorId); } + + sleep(actorId: string): void { + this.#state.sleepActor(actorId); + } } diff --git a/packages/core/src/drivers/file-system/global-state.ts b/packages/core/src/drivers/file-system/global-state.ts index 1f2df7107..505b99f25 100644 --- a/packages/core/src/drivers/file-system/global-state.ts +++ b/packages/core/src/drivers/file-system/global-state.ts @@ -20,6 +20,12 @@ import { } from "@/driver-helpers/mod"; import type { RegistryConfig } from "@/registry/config"; import type { RunConfig } from "@/registry/run-config"; +import { + type LongTimeoutHandle, + SinglePromiseQueue, + setLongTimeout, + stringifyError, +} from "@/utils"; import { logger } from "./log"; import { ensureDirectoryExists, @@ -42,8 +48,15 @@ interface ActorEntry { genericConnGlobalState: GenericConnGlobalState; - /** Promise for ongoing write operations to prevent concurrent writes */ - writePromise?: Promise; + alarmTimeout?: LongTimeoutHandle; + /** The timestamp currently scheduled for this actor's alarm (ms since epoch). */ + alarmTimestamp?: number; + + /** Resolver for pending write operations that need to be notified when any write completes */ + pendingWriteResolver?: PromiseWithResolvers; + + /** If the actor has been removed by destroy or sleep. */ + removed: boolean; } /** @@ -64,11 +77,19 @@ export class FileSystemGlobalState { #storagePath: string; #stateDir: string; #dbsDir: string; + #alarmsDir: string; #persist: boolean; #actors = new Map(); #actorCountOnStartup: number = 0; + #runnerParams?: { + registryConfig: RegistryConfig; + runConfig: RunConfig; + inlineClient: AnyClient; + actorDriver: ActorDriver; + }; + get storagePath() { return this.#storagePath; } @@ -82,11 +103,13 @@ export class FileSystemGlobalState { this.#storagePath = persist ? getStoragePath(customPath) : "/tmp"; this.#stateDir = path.join(this.#storagePath, "state"); this.#dbsDir = path.join(this.#storagePath, "databases"); + this.#alarmsDir = path.join(this.#storagePath, "alarms"); if (this.#persist) { // Ensure storage directories exist synchronously during initialization ensureDirectoryExistsSync(this.#stateDir); ensureDirectoryExistsSync(this.#dbsDir); + ensureDirectoryExistsSync(this.#alarmsDir); try { const actorIds = fsSync.readdirSync(this.#stateDir); @@ -119,6 +142,10 @@ export class FileSystemGlobalState { return path.join(this.#dbsDir, `${actorId}.db`); } + getActorAlarmPath(actorId: string): string { + return path.join(this.#alarmsDir, actorId); + } + async *getActorsIterator(params: { cursor?: string; }): AsyncGenerator { @@ -163,6 +190,9 @@ export class FileSystemGlobalState { entry = { id: actorId, genericConnGlobalState: new GenericConnGlobalState(), + removed: false, + stateWriteQueue: new SinglePromiseQueue(), + alarmWriteQueue: new SinglePromiseQueue(), }; this.#actors.set(actorId, entry); return entry; @@ -177,6 +207,8 @@ export class FileSystemGlobalState { key: ActorKey, input: unknown | undefined, ): Promise { + // TODO: Does not check if actor already exists on fs + if (this.#actors.has(actorId)) { throw new ActorAlreadyExists(name, key); } @@ -188,7 +220,7 @@ export class FileSystemGlobalState { key, persistedData: serializeEmptyPersistData(input), }; - await this.writeActor(actorId); + await this.writeActor(actorId, entry.state); return entry; } @@ -264,47 +296,75 @@ export class FileSystemGlobalState { key, persistedData: serializeEmptyPersistData(input), }; - await this.writeActor(actorId); + await this.writeActor(actorId, entry.state); } return entry; } + async sleepActor(actorId: string) { + invariant( + this.#persist, + "cannot sleep actor with memory driver, must use file system driver", + ); + + const actor = this.#actors.get(actorId); + invariant(actor, `tried to sleep ${actorId}, does not exist`); + + // Wait for actor to fully start before stopping it to avoid race conditions + if (actor.loadPromise) await actor.loadPromise.catch(); + if (actor.startPromise?.promise) await actor.startPromise.promise.catch(); + if (actor.stateWriteQueue.runningDrainLoop) + await actor.stateWriteQueue.runningDrainLoop.catch(); + + // Mark as removed + actor.removed = true; + + // Stop actor + invariant(actor.actor, "actor should be loaded"); + await actor.actor._stop(); + + // Remove from map after stop is complete + this.#actors.delete(actorId); + } + /** - * Save actor state to disk + * Save actor state to disk. */ - async writeActor(actorId: string): Promise { + async writeActor(actorId: string, state: ActorState): Promise { if (!this.#persist) { return; } const entry = this.#actors.get(actorId); - invariant(entry?.state, "missing actor state"); - const state = entry.state; - - // Get the current write promise for this actor (or resolved promise if none) - const currentWrite = entry.writePromise || Promise.resolve(); - - // Chain our write after the current one - const newWrite = currentWrite - .then(() => this.#performWrite(actorId, state)) - .catch((err) => { - // Log but don't prevent future writes - logger().error("write failed", { actorId, error: err }); - throw err; - }); + invariant(entry, "actor entry does not exist"); - // Update the actor's write promise - entry.writePromise = newWrite; + await this.#performWrite(actorId, state); + } - // Wait for our write to complete - try { - await newWrite; - } finally { - // Clean up if we're the last write - if (entry.writePromise === newWrite) { - entry.writePromise = undefined; + async setActorAlarm(actorId: string, timestamp: number) { + const entry = this.#actors.get(actorId); + invariant(entry, "actor entry does not exist"); + + // Persist alarm to disk + if (this.#persist) { + const alarmPath = this.getActorAlarmPath(actorId); + const tempPath = `${alarmPath}.tmp.${crypto.randomUUID()}`; + try { + await ensureDirectoryExists(path.dirname(alarmPath)); + const data = cbor.encode(timestamp); + await fs.writeFile(tempPath, data); + await fs.rename(tempPath, alarmPath); + } catch (error) { + try { + await fs.unlink(tempPath); + } catch {} + logger().error("failed to write alarm", { actorId, error }); + throw new Error(`Failed to write alarm: ${error}`); } } + + // Schedule timeout + this.#scheduleAlarmTimeout(actorId, timestamp); } /** @@ -335,6 +395,40 @@ export class FileSystemGlobalState { } } + /** + * Call this method after the actor driver has been initiated. + * + * This will trigger all initial alarms from the file system. + * + * This needs to be sync since DriverConfig.actor is sync + */ + onRunnerStart( + registryConfig: RegistryConfig, + runConfig: RunConfig, + inlineClient: AnyClient, + actorDriver: ActorDriver, + ) { + if (this.#runnerParams) { + logger().warn("already called onRunnerStart"); + return; + } + + // Save runner params for future use + this.#runnerParams = { + registryConfig, + runConfig, + inlineClient, + actorDriver, + }; + + // Load alarms from disk and schedule timeouts + try { + this.#loadAlarmsSync(); + } catch (err) { + logger().error("failed to load alarms on startup", { error: err }); + } + } + async startActor( registryConfig: RegistryConfig, runConfig: RunConfig, @@ -413,6 +507,112 @@ export class FileSystemGlobalState { return this.getActorDbPath(actorId); } + /** + * Load all persisted alarms from disk and schedule their timers. + */ + #loadAlarmsSync(): void { + try { + const files = fsSync.existsSync(this.#alarmsDir) + ? fsSync.readdirSync(this.#alarmsDir) + : []; + for (const file of files) { + // Skip temp files + if (file.includes(".tmp.")) continue; + const fullPath = path.join(this.#alarmsDir, file); + try { + const buf = fsSync.readFileSync(fullPath); + const timestamp = cbor.decode(buf) as number; + if (typeof timestamp === "number" && Number.isFinite(timestamp)) { + this.#scheduleAlarmTimeout(file, timestamp); + } else { + logger().debug("invalid alarm file contents", { file }); + } + } catch (err) { + logger().error("failed to read alarm file", { + file, + error: stringifyError(err), + }); + } + } + } catch (err) { + logger().error("failed to list alarms directory", { error: err }); + } + } + + /** + * Schedule an alarm timer for an actor without writing to disk. + */ + #scheduleAlarmTimeout(actorId: string, timestamp: number) { + const entry = this.#upsertEntry(actorId); + + // If there's already an earlier alarm scheduled, do not override it. + if ( + entry.alarmTimestamp !== undefined && + timestamp >= entry.alarmTimestamp + ) { + logger().debug("skipping alarm schedule (later than existing)", { + actorId, + timestamp, + current: entry.alarmTimestamp, + }); + return; + } + + logger().debug("scheduling alarm", { actorId, timestamp }); + + // Cancel existing timeout and update the current scheduled timestamp + entry.alarmTimeout?.abort(); + entry.alarmTimestamp = timestamp; + + const delay = Math.max(0, timestamp - Date.now()); + entry.alarmTimeout = setLongTimeout(async () => { + // Clear currently scheduled timestamp as this alarm is firing now + entry.alarmTimestamp = undefined; + // On trigger: remove persisted alarm file + if (this.#persist) { + try { + await fs.unlink(this.getActorAlarmPath(actorId)); + } catch (err: any) { + if (err?.code !== "ENOENT") { + logger().debug("failed to remove alarm file", { + actorId, + error: stringifyError(err), + }); + } + } + } + + try { + logger().debug("triggering alarm", { actorId, timestamp }); + + // Ensure actor state exists and start actor if needed + const loaded = await this.loadActor(actorId); + if (!loaded.state) throw new Error(`Actor does not exist: ${actorId}`); + + // Start actor if not already running + const runnerParams = this.#runnerParams; + invariant(runnerParams, "missing runner params"); + if (!loaded.actor) { + await this.startActor( + runnerParams.registryConfig, + runnerParams.runConfig, + runnerParams.inlineClient, + runnerParams.actorDriver, + actorId, + ); + } + + invariant(loaded.actor, "actor should be loaded after wake"); + await loaded.actor._onAlarm(); + } catch (err) { + logger().error("failed to handle alarm", { + actorId, + error: stringifyError(err), + }); + } + }, delay); + } + getOrCreateInspectorAccessToken(): string { const tokenPath = path.join(this.#storagePath, "inspector-token"); if (fsSync.existsSync(tokenPath)) { diff --git a/packages/core/src/drivers/file-system/mod.ts b/packages/core/src/drivers/file-system/mod.ts index b8e833bf8..a8b5b0e6d 100644 --- a/packages/core/src/drivers/file-system/mod.ts +++ b/packages/core/src/drivers/file-system/mod.ts @@ -22,14 +22,19 @@ export function createFileSystemOrMemoryDriver( state, driverConfig, ), - actor: (registryConfig, runConfig, managerDriver, inlineClient) => - new FileSystemActorDriver( + actor: (registryConfig, runConfig, managerDriver, inlineClient) => { + const actorDriver = new FileSystemActorDriver( registryConfig, runConfig, managerDriver, inlineClient, state, - ), + ); + + state.onRunnerStart(registryConfig, runConfig, inlineClient, actorDriver); + + return actorDriver; + }, }; return driverConfig; } diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 52e0978d9..bb79214d6 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -66,3 +66,100 @@ export function toUint8Array(data: ArrayBuffer | ArrayBufferView): Uint8Array { throw new TypeError("Input must be ArrayBuffer or ArrayBufferView"); } } + +// Long timeouts +// +// JavaScript timers use a signed 32-bit integer for delays, so values above 2^31-1 (~24.8 days) +// are not reliable and may fire immediately or overflow. +// +// https://developer.mozilla.org/en-US/docs/Web/API/Window/setTimeout#maximum_delay_value +const TIMEOUT_MAX = 2147483647; // 2^31-1 + +export type LongTimeoutHandle = { abort: () => void }; + +export function setLongTimeout( + listener: () => void, + after: number, +): LongTimeoutHandle { + let timeout: ReturnType | undefined; + + function start(remaining: number) { + if (remaining <= TIMEOUT_MAX) { + timeout = setTimeout(listener, remaining); + } else { + timeout = setTimeout(() => { + start(remaining - TIMEOUT_MAX); + }, TIMEOUT_MAX); + } + } + + start(after); + + return { + abort: () => { + if (timeout !== undefined) clearTimeout(timeout); + }, + }; +} + +/** + * A tiny utility that coalesces/enqueues async operations so only the latest + * queued task runs per cycle, while callers receive a promise that resolves + * when the task for the cycle they joined has completed. + */ +export class SinglePromiseQueue { + /** Next operation to execute in the queue. If attempting to enqueue another op, it will override the existing op. */ + #queuedOp?: () => Promise; + + /** The currently running promise of #drainLoop. Do not await this, instead await `pending` to await the current cycle. */ + runningDrainLoop?: Promise; + + /** Pending resolver fro the currently queued entry. */ + #pending?: PromiseWithResolvers; + + /** Queue the next operation and return a promise that resolves when it flushes. */ + enqueue(op: () => Promise): Promise { + // Replace any previously queued operation with the latest one + this.#queuedOp = op; + + // Ensure a shared resolver exists for all callers in this cycle + if (!this.#pending) { + this.#pending = Promise.withResolvers(); + } + + const waitForThisCycle = this.#pending.promise; + + // Start runner if not already running + if (!this.runningDrainLoop) { + this.runningDrainLoop = this.#drainLoop(); + } + + return waitForThisCycle; + } + + /** Drain queued operations sequentially until there is nothing left. */ + async #drainLoop(): Promise { + try { + while (this.#queuedOp) { + // Capture current cycle resolver then reset for the next cycle + const resolver = this.#pending; + this.#pending = undefined; + + // Capture and clear the currently queued operation + const op = this.#queuedOp; + this.#queuedOp = undefined; + + try { + await op(); + } catch { + // Swallow errors: callers only await cycle completion, not success + } + + // Notify all waiters for this cycle + resolver?.resolve(); + } + } finally { + this.runningDrainLoop = undefined; + } + } +} diff --git a/packages/core/tests/driver-test-suite-engine.test.ts b/packages/core/tests/driver-engine.test.ts similarity index 100% rename from packages/core/tests/driver-test-suite-engine.test.ts rename to packages/core/tests/driver-engine.test.ts diff --git a/packages/core/tests/driver-test-suite-file-system.test.ts b/packages/core/tests/driver-file-system.test.ts similarity index 84% rename from packages/core/tests/driver-test-suite-file-system.test.ts rename to packages/core/tests/driver-file-system.test.ts index 6dd26cd68..6fca8683d 100644 --- a/packages/core/tests/driver-test-suite-file-system.test.ts +++ b/packages/core/tests/driver-file-system.test.ts @@ -3,6 +3,8 @@ import { createTestRuntime, runDriverTests } from "@/driver-test-suite/mod"; import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod"; runDriverTests({ + // TODO: Remove this once timer issues are fixed in actor-sleep.ts + useRealTimers: true, async start(projectPath: string) { return await createTestRuntime( join(projectPath, "registry.ts"), diff --git a/packages/core/tests/driver-test-suite-memory.test.ts b/packages/core/tests/driver-memory.test.ts similarity index 82% rename from packages/core/tests/driver-test-suite-memory.test.ts rename to packages/core/tests/driver-memory.test.ts index 0322adc13..ee1af12c4 100644 --- a/packages/core/tests/driver-test-suite-memory.test.ts +++ b/packages/core/tests/driver-memory.test.ts @@ -3,6 +3,8 @@ import { createTestRuntime, runDriverTests } from "@/driver-test-suite/mod"; import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod"; runDriverTests({ + // TODO: Remove this once timer issues are fixed in actor-sleep.ts + useRealTimers: true, async start(projectPath: string) { return await createTestRuntime( join(projectPath, "registry.ts"), diff --git a/packages/core/tests/set-long-timeout.test.ts b/packages/core/tests/set-long-timeout.test.ts new file mode 100644 index 000000000..86f9bfce0 --- /dev/null +++ b/packages/core/tests/set-long-timeout.test.ts @@ -0,0 +1,244 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { setLongTimeout } from "../src/utils"; + +describe("setLongTimeout", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + test("executes callback for short timeouts", () => { + const callback = vi.fn(); + const handle = setLongTimeout(callback, 100); + + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(99); + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(callback).toHaveBeenCalledTimes(1); + }); + + test("executes callback for zero timeout", () => { + const callback = vi.fn(); + setLongTimeout(callback, 0); + + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(0); + expect(callback).toHaveBeenCalledTimes(1); + }); + + test("executes callback for negative timeout (treated as 0)", () => { + const callback = vi.fn(); + setLongTimeout(callback, -100); + + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(0); + expect(callback).toHaveBeenCalledTimes(1); + }); + + test("handles timeout at exactly TIMEOUT_MAX", () => { + const callback = vi.fn(); + const TIMEOUT_MAX = 2147483647; + setLongTimeout(callback, TIMEOUT_MAX); + + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(TIMEOUT_MAX - 1); + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(callback).toHaveBeenCalledTimes(1); + }); + + test("handles timeout larger than TIMEOUT_MAX", () => { + const callback = vi.fn(); + const TIMEOUT_MAX = 2147483647; + const longTimeout = TIMEOUT_MAX + 1000; + + setLongTimeout(callback, longTimeout); + + expect(callback).not.toHaveBeenCalled(); + + // Advance to just before TIMEOUT_MAX + vi.advanceTimersByTime(TIMEOUT_MAX - 1); + expect(callback).not.toHaveBeenCalled(); + + // Advance past TIMEOUT_MAX - should trigger intermediate timeout + vi.advanceTimersByTime(1); + expect(callback).not.toHaveBeenCalled(); + + // Advance the remaining time + vi.advanceTimersByTime(999); + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(callback).toHaveBeenCalledTimes(1); + }); + + test("handles very large timeout requiring multiple chunks", () => { + const callback = vi.fn(); + const TIMEOUT_MAX = 2147483647; + const veryLongTimeout = TIMEOUT_MAX * 2 + 5000; + + setLongTimeout(callback, veryLongTimeout); + + expect(callback).not.toHaveBeenCalled(); + + // First chunk + vi.advanceTimersByTime(TIMEOUT_MAX); + expect(callback).not.toHaveBeenCalled(); + + // Second chunk + vi.advanceTimersByTime(TIMEOUT_MAX); + expect(callback).not.toHaveBeenCalled(); + + // Final remainder + vi.advanceTimersByTime(4999); + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(callback).toHaveBeenCalledTimes(1); + }); + + test("abort cancels short timeout", () => { + const callback = vi.fn(); + const handle = setLongTimeout(callback, 100); + + vi.advanceTimersByTime(50); + expect(callback).not.toHaveBeenCalled(); + + handle.abort(); + + vi.advanceTimersByTime(100); + expect(callback).not.toHaveBeenCalled(); + }); + + test("abort cancels long timeout during first chunk", () => { + const callback = vi.fn(); + const TIMEOUT_MAX = 2147483647; + const handle = setLongTimeout(callback, TIMEOUT_MAX + 1000); + + vi.advanceTimersByTime(1000); + expect(callback).not.toHaveBeenCalled(); + + handle.abort(); + + vi.advanceTimersByTime(TIMEOUT_MAX + 1000); + expect(callback).not.toHaveBeenCalled(); + }); + + test("abort cancels long timeout after first chunk", () => { + const callback = vi.fn(); + const TIMEOUT_MAX = 2147483647; + const handle = setLongTimeout(callback, TIMEOUT_MAX + 1000); + + // Advance past first chunk + vi.advanceTimersByTime(TIMEOUT_MAX); + expect(callback).not.toHaveBeenCalled(); + + // Abort during second chunk + handle.abort(); + + vi.advanceTimersByTime(1000); + expect(callback).not.toHaveBeenCalled(); + }); + + test("multiple abort calls are safe", () => { + const callback = vi.fn(); + const handle = setLongTimeout(callback, 100); + + handle.abort(); + handle.abort(); // Second abort should not throw + + vi.advanceTimersByTime(100); + expect(callback).not.toHaveBeenCalled(); + }); + + test("abort after timeout has fired is safe", () => { + const callback = vi.fn(); + const handle = setLongTimeout(callback, 100); + + vi.advanceTimersByTime(100); + expect(callback).toHaveBeenCalledTimes(1); + + // Abort after callback has executed should not throw + handle.abort(); + + vi.advanceTimersByTime(100); + expect(callback).toHaveBeenCalledTimes(1); // Still only called once + }); + + test("handles multiple concurrent timeouts", () => { + const callback1 = vi.fn(); + const callback2 = vi.fn(); + const callback3 = vi.fn(); + + setLongTimeout(callback1, 50); + setLongTimeout(callback2, 100); + setLongTimeout(callback3, 150); + + vi.advanceTimersByTime(49); + expect(callback1).not.toHaveBeenCalled(); + expect(callback2).not.toHaveBeenCalled(); + expect(callback3).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); // 50ms total + expect(callback1).toHaveBeenCalledTimes(1); + expect(callback2).not.toHaveBeenCalled(); + expect(callback3).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(50); // 100ms total + expect(callback1).toHaveBeenCalledTimes(1); + expect(callback2).toHaveBeenCalledTimes(1); + expect(callback3).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(50); // 150ms total + expect(callback1).toHaveBeenCalledTimes(1); + expect(callback2).toHaveBeenCalledTimes(1); + expect(callback3).toHaveBeenCalledTimes(1); + }); + + test("real-world alarm scenario - 100ms delay", () => { + const callback = vi.fn(); + const now = Date.now(); + const futureTimestamp = now + 100; + const delay = Math.max(0, futureTimestamp - now); + + expect(delay).toBe(100); + + setLongTimeout(callback, delay); + + vi.advanceTimersByTime(99); + expect(callback).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(callback).toHaveBeenCalledTimes(1); + }); + + test("handles async callbacks", async () => { + const asyncCallback = vi.fn(async () => { + await Promise.resolve(); + return "done"; + }); + + setLongTimeout(asyncCallback, 100); + + vi.advanceTimersByTime(100); + expect(asyncCallback).toHaveBeenCalledTimes(1); + }); + + test("callback receives no arguments", () => { + const callback = vi.fn(); + setLongTimeout(callback, 100); + + vi.advanceTimersByTime(100); + expect(callback).toHaveBeenCalledWith(); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9b0aa7669..868d6ca50 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -965,8 +965,8 @@ importers: specifier: ^0.19.10 version: 0.19.10(hono@4.8.3)(zod@3.25.76) '@rivetkit/engine-runner': - specifier: https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner@f1c054d - version: https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner@f1c054d + specifier: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@664a377 + version: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@664a377 '@rivetkit/fast-json-patch': specifier: ^3.1.2 version: 3.1.2 @@ -2473,16 +2473,16 @@ packages: '@rivet-gg/actor-core@25.2.0': resolution: {integrity: sha512-4K72XcDLVAz44Ae6G6GuyzWyxQZOLN8jM/W+sVKm6fHr70X8FNCSC5+/9hFIxz/OH9E6q6Wi3V/UN/k6immUBQ==} - '@rivetkit/engine-runner-protocol@https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner-protocol@f1c054db74707f06454ea6072da81cfbdc31873d': - resolution: {tarball: https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner-protocol@f1c054db74707f06454ea6072da81cfbdc31873d} + '@rivetkit/engine-runner-protocol@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@664a377784e8296d6c3f5617a1ec45ab977dc9f8': + resolution: {tarball: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@664a377784e8296d6c3f5617a1ec45ab977dc9f8} version: 1.0.0 - '@rivetkit/engine-runner@https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner@f1c054d': - resolution: {tarball: https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner@f1c054d} + '@rivetkit/engine-runner@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@664a377': + resolution: {tarball: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@664a377} version: 0.0.0 - '@rivetkit/engine-tunnel-protocol@https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-tunnel-protocol@f1c054db74707f06454ea6072da81cfbdc31873d': - resolution: {tarball: https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-tunnel-protocol@f1c054db74707f06454ea6072da81cfbdc31873d} + '@rivetkit/engine-tunnel-protocol@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-tunnel-protocol@664a377784e8296d6c3f5617a1ec45ab977dc9f8': + resolution: {tarball: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-tunnel-protocol@664a377784e8296d6c3f5617a1ec45ab977dc9f8} version: 1.0.0 '@rivetkit/fast-json-patch@3.1.2': @@ -4498,6 +4498,10 @@ packages: util-deprecate@1.0.2: resolution: {integrity: sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==} + uuid@12.0.0: + resolution: {integrity: sha512-USe1zesMYh4fjCA8ZH5+X5WIVD0J4V1Jksm1bFTVBX2F/cwSXt0RO5w/3UXbdLKmZX65MiWV+hwhSS8p6oBTGA==} + hasBin: true + vary@1.1.2: resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==} engines: {node: '>= 0.8'} @@ -5693,20 +5697,21 @@ snapshots: dependencies: zod: 3.25.76 - '@rivetkit/engine-runner-protocol@https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner-protocol@f1c054db74707f06454ea6072da81cfbdc31873d': + '@rivetkit/engine-runner-protocol@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@664a377784e8296d6c3f5617a1ec45ab977dc9f8': dependencies: '@bare-ts/lib': 0.4.0 - '@rivetkit/engine-runner@https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner@f1c054d': + '@rivetkit/engine-runner@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@664a377': dependencies: - '@rivetkit/engine-runner-protocol': https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-runner-protocol@f1c054db74707f06454ea6072da81cfbdc31873d - '@rivetkit/engine-tunnel-protocol': https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-tunnel-protocol@f1c054db74707f06454ea6072da81cfbdc31873d + '@rivetkit/engine-runner-protocol': https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@664a377784e8296d6c3f5617a1ec45ab977dc9f8 + '@rivetkit/engine-tunnel-protocol': https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-tunnel-protocol@664a377784e8296d6c3f5617a1ec45ab977dc9f8 + uuid: 12.0.0 ws: 8.18.3 transitivePeerDependencies: - bufferutil - utf-8-validate - '@rivetkit/engine-tunnel-protocol@https://pkg.pr.new/rivet-gg/rivet/@rivetkit/engine-tunnel-protocol@f1c054db74707f06454ea6072da81cfbdc31873d': + '@rivetkit/engine-tunnel-protocol@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-tunnel-protocol@664a377784e8296d6c3f5617a1ec45ab977dc9f8': dependencies: '@bare-ts/lib': 0.4.0 @@ -7893,6 +7898,8 @@ snapshots: util-deprecate@1.0.2: {} + uuid@12.0.0: {} + vary@1.1.2: {} vite-node@3.2.4(@types/node@20.19.9)(tsx@4.20.3)(yaml@2.8.0):