From 60532e40d22402c32bc83f26496375e69d7d3ab7 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Wed, 10 Sep 2025 12:29:03 -0700 Subject: [PATCH] chore(core): move actor liveness check to setInterval --- packages/core/src/actor/instance.ts | 120 +++++++++--------- packages/core/src/actor/persisted.ts | 17 +-- packages/core/src/actor/schedule.ts | 13 -- .../src/drivers/file-system/global-state.ts | 4 - 4 files changed, 69 insertions(+), 85 deletions(-) diff --git a/packages/core/src/actor/instance.ts b/packages/core/src/actor/instance.ts index 5b627050a..ef93d8b70 100644 --- a/packages/core/src/actor/instance.ts +++ b/packages/core/src/actor/instance.ts @@ -28,10 +28,11 @@ import type { PersistedActor, PersistedConn, PersistedScheduleEvent, + PersistedScheduleEventKind, } from "./persisted"; import { processMessage } from "./protocol/message/mod"; import { CachedSerializer } from "./protocol/serde"; -import { Schedule, type ScheduledEvent } from "./schedule"; +import { Schedule } from "./schedule"; import { DeadlineError, deadline } from "./utils"; /** @@ -173,6 +174,7 @@ export class ActorInstance< #connections = new Map>(); #subscriptionIndex = new Map>>(); + #checkConnLivenessInterval?: NodeJS.Timeout; #sleepTimeout?: NodeJS.Timeout; @@ -340,26 +342,27 @@ export class ActorInstance< // Must be called after setting `#ready` or else it will not schedule sleep this.#resetSleepTimer(); - this.#scheduleLivenessCheck(); + // Start conn liveness interval + // + // Check for liveness immediately since we may have connections that + // were in `reconnecting` state when the actor went to sleep that we + // need to purge. + // + // We don't use alarms for connection liveness since alarms require + // durability & are expensive. Connection liveness is safe to assume + // it only needs to be ran while the actor is awake and does not need + // to manually wake the actor. The only case this is not true is if the + // connection liveness timeout is greater than the actor sleep timeout + // OR if the actor is manually put to sleep. In this case, the connections + // will be stuck in a `reconnecting` state until the actor is awaken again. + this.#checkConnLivenessInterval = setInterval( + this.#checkConnectionsLiveness.bind(this), + this.#config.options.connectionLivenessInterval, + ); + this.#checkConnectionsLiveness(); } - async #scheduleEventInner(timestamp: number, event: ScheduledEvent) { - // Build event - let newEvent: PersistedScheduleEvent; - if ("checkConnectionLiveness" in event) { - newEvent = { - ccl: event.checkConnectionLiveness, - t: timestamp, - }; - } else { - newEvent = { - e: crypto.randomUUID(), - t: timestamp, - a: event.fn, - ar: event.args, - }; - } - + async #scheduleEventInner(newEvent: PersistedScheduleEvent) { this.actorContext.log.info("scheduling event", newEvent); // remove old ccl event @@ -385,7 +388,7 @@ export class ActorInstance< // - this is the only event (i.e. the only event in the array) if (insertIndex === 0 || this.#persist.e.length === 1) { this.actorContext.log.info("setting alarm", { - timestamp, + timestamp: newEvent.t, eventCount: this.#persist.e.length, }); await this.#queueSetAlarm(newEvent.t); @@ -449,29 +452,33 @@ export class ActorInstance< this.actorContext.log.info("running action for event", { event: event.e, timestamp: event.t, - action: event.a, - args: event.ar, + action: event.k.g.a, + args: event.k.g.ar, }); // Look up function - const fn: unknown = this.#config.actions[event.a]; + const fn: unknown = this.#config.actions[event.k.g.a]; - if (!fn) throw new Error(`Missing action for alarm ${event.a}`); + if (!fn) throw new Error(`Missing action for alarm ${event.k.g.a}`); if (typeof fn !== "function") throw new Error( - `Alarm function lookup for ${event.a} returned ${typeof fn}`, + `Alarm function lookup for ${event.k.g.a} returned ${typeof fn}`, ); // Call function try { - await fn.call(undefined, this.actorContext, ...(event.ar || [])); + await fn.call( + undefined, + this.actorContext, + ...(event.k.g.ar || []), + ); } catch (error) { this.actorContext.log.error("error while running event", { error: stringifyError(error), event: event.e, timestamp: event.t, - action: event.a, - args: event.ar, + action: event.k.g.a, + args: event.k.g.ar, }); } } @@ -486,10 +493,14 @@ export class ActorInstance< async scheduleEvent( timestamp: number, - fn: string, + action: string, args: unknown[], ): Promise { - return this.#scheduleEventInner(timestamp, { fn, args }); + return this.#scheduleEventInner({ + e: crypto.randomUUID(), + t: timestamp, + k: { g: { a: action, ar: args } }, + }); } get stateEnabled() { @@ -1081,35 +1092,18 @@ export class ActorInstance< }); continue; } + // Connection is dead, remove it logger().warn("connection is dead, removing", { connId: conn.id, lastSeen, }); - // we might disconnect the connection here? - // conn.disconnect("liveness check failed"); - this.__removeConn(conn); - } - } - this.#scheduleLivenessCheck(); - } + // TODO: Do we need to force disconnect the connection here? - /** - * Schedule a liveness check for the connections. - * This will check if the liveness check is already scheduled and skip scheduling if it is. - * @internal - */ - #scheduleLivenessCheck() { - if (this.isStopping) { - logger().debug("actor is stopping, skipping liveness check"); - return; + this.__removeConn(conn); + } } - - this.#scheduleEventInner( - Date.now() + this.#config.options.connectionLivenessInterval, - { checkConnectionLiveness: true }, - ); } /** @@ -1613,12 +1607,6 @@ export class ActorInstance< } } - // Wait for any background tasks to finish, with timeout - await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout); - - // Write state - await this.saveState({ immediate: true, allowStoppingState: true }); - // Disconnect existing connections const promises: Promise[] = []; for (const connection of this.#connections.values()) { @@ -1627,6 +1615,18 @@ export class ActorInstance< // TODO: Figure out how to abort HTTP requests on shutdown } + // Wait for any background tasks to finish, with timeout + await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout); + + // Clear timeouts + if (this.#pendingSaveTimeout) clearTimeout(this.#pendingSaveTimeout); + if (this.#sleepTimeout) clearTimeout(this.#sleepTimeout); + if (this.#checkConnLivenessInterval) + clearInterval(this.#checkConnLivenessInterval); + + // Write state + await this.saveState({ immediate: true, allowStoppingState: true }); + // Await all `close` event listeners with 1.5 second timeout const res = Promise.race([ Promise.all(promises).then(() => false), @@ -1640,6 +1640,12 @@ export class ActorInstance< "timed out waiting for connections to close, shutting down anyway", ); } + + // Wait for queues to finish + if (this.#persistWriteQueue.runningDrainLoop) + await this.#persistWriteQueue.runningDrainLoop; + if (this.#alarmWriteQueue.runningDrainLoop) + await this.#alarmWriteQueue.runningDrainLoop; } /** Abort signal that fires when the actor is stopping. */ diff --git a/packages/core/src/actor/persisted.ts b/packages/core/src/actor/persisted.ts index 3a886a504..688d6ac51 100644 --- a/packages/core/src/actor/persisted.ts +++ b/packages/core/src/actor/persisted.ts @@ -1,5 +1,4 @@ import type { ConnectionDriver } from "./connection"; -import type { ScheduledLivenessCheckEvent } from "./schedule"; /** State object that gets automatically persisted to storage. */ export interface PersistedActor { @@ -43,22 +42,18 @@ export interface PersistedSubscription { } export interface GenericPersistedScheduleEvent { - // Event ID - e: string; - // Timestamp - t: number; // Action name a: string; // Arguments ar?: unknown[]; } -export interface ScheduleLivenessCheckEvent { - ccl: ScheduledLivenessCheckEvent["checkConnectionLiveness"]; +export type PersistedScheduleEventKind = { g: GenericPersistedScheduleEvent }; + +export interface PersistedScheduleEvent { + // Event ID + e: string; // Timestamp t: number; + k: PersistedScheduleEventKind; } - -export type PersistedScheduleEvent = - | GenericPersistedScheduleEvent - | ScheduleLivenessCheckEvent; diff --git a/packages/core/src/actor/schedule.ts b/packages/core/src/actor/schedule.ts index cef76e720..8948e8ccf 100644 --- a/packages/core/src/actor/schedule.ts +++ b/packages/core/src/actor/schedule.ts @@ -15,16 +15,3 @@ export class Schedule { await this.#actor.scheduleEvent(timestamp, fn, args); } } - -export interface GenericScheduledEvent { - fn: string; - args: unknown[]; - id?: string; -} -export interface ScheduledLivenessCheckEvent { - checkConnectionLiveness: {}; -} - -export type ScheduledEvent = - | GenericScheduledEvent - | ScheduledLivenessCheckEvent; diff --git a/packages/core/src/drivers/file-system/global-state.ts b/packages/core/src/drivers/file-system/global-state.ts index 505b99f25..2c3146d8b 100644 --- a/packages/core/src/drivers/file-system/global-state.ts +++ b/packages/core/src/drivers/file-system/global-state.ts @@ -191,8 +191,6 @@ export class FileSystemGlobalState { id: actorId, genericConnGlobalState: new GenericConnGlobalState(), removed: false, - stateWriteQueue: new SinglePromiseQueue(), - alarmWriteQueue: new SinglePromiseQueue(), }; this.#actors.set(actorId, entry); return entry; @@ -313,8 +311,6 @@ export class FileSystemGlobalState { // Wait for actor to fully start before stopping it to avoid race conditions if (actor.loadPromise) await actor.loadPromise.catch(); if (actor.startPromise?.promise) await actor.startPromise.promise.catch(); - if (actor.stateWriteQueue.runningDrainLoop) - await actor.stateWriteQueue.runningDrainLoop.catch(); // Mark as removed actor.removed = true;