-
Notifications
You must be signed in to change notification settings - Fork 44
chore(core): move actor liveness check to setInterval #1202
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -28,10 +28,11 @@ import type { | |||||||||||||||||||||
| PersistedActor, | ||||||||||||||||||||||
| PersistedConn, | ||||||||||||||||||||||
| PersistedScheduleEvent, | ||||||||||||||||||||||
| PersistedScheduleEventKind, | ||||||||||||||||||||||
| } from "./persisted"; | ||||||||||||||||||||||
| import { processMessage } from "./protocol/message/mod"; | ||||||||||||||||||||||
| import { CachedSerializer } from "./protocol/serde"; | ||||||||||||||||||||||
| import { Schedule, type ScheduledEvent } from "./schedule"; | ||||||||||||||||||||||
| import { Schedule } from "./schedule"; | ||||||||||||||||||||||
| import { DeadlineError, deadline } from "./utils"; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /** | ||||||||||||||||||||||
|
|
@@ -173,6 +174,7 @@ export class ActorInstance< | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| #connections = new Map<ConnId, Conn<S, CP, CS, V, I, AD, DB>>(); | ||||||||||||||||||||||
| #subscriptionIndex = new Map<string, Set<Conn<S, CP, CS, V, I, AD, DB>>>(); | ||||||||||||||||||||||
| #checkConnLivenessInterval?: NodeJS.Timeout; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| #sleepTimeout?: NodeJS.Timeout; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -340,26 +342,27 @@ export class ActorInstance< | |||||||||||||||||||||
| // Must be called after setting `#ready` or else it will not schedule sleep | ||||||||||||||||||||||
| this.#resetSleepTimer(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| this.#scheduleLivenessCheck(); | ||||||||||||||||||||||
| // Start conn liveness interval | ||||||||||||||||||||||
| // | ||||||||||||||||||||||
| // Check for liveness immediately since we may have connections that | ||||||||||||||||||||||
| // were in `reconnecting` state when the actor went to sleep that we | ||||||||||||||||||||||
| // need to purge. | ||||||||||||||||||||||
| // | ||||||||||||||||||||||
| // We don't use alarms for connection liveness since alarms require | ||||||||||||||||||||||
| // durability & are expensive. Connection liveness is safe to assume | ||||||||||||||||||||||
| // it only needs to be ran while the actor is awake and does not need | ||||||||||||||||||||||
| // to manually wake the actor. The only case this is not true is if the | ||||||||||||||||||||||
| // connection liveness timeout is greater than the actor sleep timeout | ||||||||||||||||||||||
| // OR if the actor is manually put to sleep. In this case, the connections | ||||||||||||||||||||||
| // will be stuck in a `reconnecting` state until the actor is awaken again. | ||||||||||||||||||||||
| this.#checkConnLivenessInterval = setInterval( | ||||||||||||||||||||||
| this.#checkConnectionsLiveness.bind(this), | ||||||||||||||||||||||
| this.#config.options.connectionLivenessInterval, | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
| this.#checkConnectionsLiveness(); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async #scheduleEventInner(timestamp: number, event: ScheduledEvent) { | ||||||||||||||||||||||
| // Build event | ||||||||||||||||||||||
| let newEvent: PersistedScheduleEvent; | ||||||||||||||||||||||
| if ("checkConnectionLiveness" in event) { | ||||||||||||||||||||||
| newEvent = { | ||||||||||||||||||||||
| ccl: event.checkConnectionLiveness, | ||||||||||||||||||||||
| t: timestamp, | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| newEvent = { | ||||||||||||||||||||||
| e: crypto.randomUUID(), | ||||||||||||||||||||||
| t: timestamp, | ||||||||||||||||||||||
| a: event.fn, | ||||||||||||||||||||||
| ar: event.args, | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async #scheduleEventInner(newEvent: PersistedScheduleEvent) { | ||||||||||||||||||||||
| this.actorContext.log.info("scheduling event", newEvent); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // remove old ccl event | ||||||||||||||||||||||
|
|
@@ -385,7 +388,7 @@ export class ActorInstance< | |||||||||||||||||||||
| // - this is the only event (i.e. the only event in the array) | ||||||||||||||||||||||
| if (insertIndex === 0 || this.#persist.e.length === 1) { | ||||||||||||||||||||||
| this.actorContext.log.info("setting alarm", { | ||||||||||||||||||||||
| timestamp, | ||||||||||||||||||||||
| timestamp: newEvent.t, | ||||||||||||||||||||||
| eventCount: this.#persist.e.length, | ||||||||||||||||||||||
| }); | ||||||||||||||||||||||
| await this.#queueSetAlarm(newEvent.t); | ||||||||||||||||||||||
|
|
@@ -449,29 +452,33 @@ export class ActorInstance< | |||||||||||||||||||||
| this.actorContext.log.info("running action for event", { | ||||||||||||||||||||||
| event: event.e, | ||||||||||||||||||||||
| timestamp: event.t, | ||||||||||||||||||||||
| action: event.a, | ||||||||||||||||||||||
| args: event.ar, | ||||||||||||||||||||||
| action: event.k.g.a, | ||||||||||||||||||||||
| args: event.k.g.ar, | ||||||||||||||||||||||
| }); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Look up function | ||||||||||||||||||||||
| const fn: unknown = this.#config.actions[event.a]; | ||||||||||||||||||||||
| const fn: unknown = this.#config.actions[event.k.g.a]; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if (!fn) throw new Error(`Missing action for alarm ${event.a}`); | ||||||||||||||||||||||
| if (!fn) throw new Error(`Missing action for alarm ${event.k.g.a}`); | ||||||||||||||||||||||
| if (typeof fn !== "function") | ||||||||||||||||||||||
| throw new Error( | ||||||||||||||||||||||
| `Alarm function lookup for ${event.a} returned ${typeof fn}`, | ||||||||||||||||||||||
| `Alarm function lookup for ${event.k.g.a} returned ${typeof fn}`, | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Call function | ||||||||||||||||||||||
| try { | ||||||||||||||||||||||
| await fn.call(undefined, this.actorContext, ...(event.ar || [])); | ||||||||||||||||||||||
| await fn.call( | ||||||||||||||||||||||
| undefined, | ||||||||||||||||||||||
| this.actorContext, | ||||||||||||||||||||||
| ...(event.k.g.ar || []), | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
|
Comment on lines
+470
to
+474
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code accesses
Suggested change
Spotted by Diamond |
||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||
| this.actorContext.log.error("error while running event", { | ||||||||||||||||||||||
| error: stringifyError(error), | ||||||||||||||||||||||
| event: event.e, | ||||||||||||||||||||||
| timestamp: event.t, | ||||||||||||||||||||||
| action: event.a, | ||||||||||||||||||||||
| args: event.ar, | ||||||||||||||||||||||
| action: event.k.g.a, | ||||||||||||||||||||||
| args: event.k.g.ar, | ||||||||||||||||||||||
|
Comment on lines
+480
to
+481
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error logging code accesses
Suggested change
Spotted by Diamond |
||||||||||||||||||||||
| }); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
@@ -486,10 +493,14 @@ export class ActorInstance< | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| async scheduleEvent( | ||||||||||||||||||||||
| timestamp: number, | ||||||||||||||||||||||
| fn: string, | ||||||||||||||||||||||
| action: string, | ||||||||||||||||||||||
| args: unknown[], | ||||||||||||||||||||||
| ): Promise<void> { | ||||||||||||||||||||||
| return this.#scheduleEventInner(timestamp, { fn, args }); | ||||||||||||||||||||||
| return this.#scheduleEventInner({ | ||||||||||||||||||||||
| e: crypto.randomUUID(), | ||||||||||||||||||||||
| t: timestamp, | ||||||||||||||||||||||
| k: { g: { a: action, ar: args } }, | ||||||||||||||||||||||
| }); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| get stateEnabled() { | ||||||||||||||||||||||
|
|
@@ -1081,35 +1092,18 @@ export class ActorInstance< | |||||||||||||||||||||
| }); | ||||||||||||||||||||||
| continue; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Connection is dead, remove it | ||||||||||||||||||||||
| logger().warn("connection is dead, removing", { | ||||||||||||||||||||||
| connId: conn.id, | ||||||||||||||||||||||
| lastSeen, | ||||||||||||||||||||||
| }); | ||||||||||||||||||||||
| // we might disconnect the connection here? | ||||||||||||||||||||||
| // conn.disconnect("liveness check failed"); | ||||||||||||||||||||||
| this.__removeConn(conn); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| this.#scheduleLivenessCheck(); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| // TODO: Do we need to force disconnect the connection here? | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /** | ||||||||||||||||||||||
| * Schedule a liveness check for the connections. | ||||||||||||||||||||||
| * This will check if the liveness check is already scheduled and skip scheduling if it is. | ||||||||||||||||||||||
| * @internal | ||||||||||||||||||||||
| */ | ||||||||||||||||||||||
| #scheduleLivenessCheck() { | ||||||||||||||||||||||
| if (this.isStopping) { | ||||||||||||||||||||||
| logger().debug("actor is stopping, skipping liveness check"); | ||||||||||||||||||||||
| return; | ||||||||||||||||||||||
| this.__removeConn(conn); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| this.#scheduleEventInner( | ||||||||||||||||||||||
| Date.now() + this.#config.options.connectionLivenessInterval, | ||||||||||||||||||||||
| { checkConnectionLiveness: true }, | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /** | ||||||||||||||||||||||
|
|
@@ -1613,12 +1607,6 @@ export class ActorInstance< | |||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Wait for any background tasks to finish, with timeout | ||||||||||||||||||||||
| await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Write state | ||||||||||||||||||||||
| await this.saveState({ immediate: true, allowStoppingState: true }); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Disconnect existing connections | ||||||||||||||||||||||
| const promises: Promise<unknown>[] = []; | ||||||||||||||||||||||
| for (const connection of this.#connections.values()) { | ||||||||||||||||||||||
|
|
@@ -1627,6 +1615,18 @@ export class ActorInstance< | |||||||||||||||||||||
| // TODO: Figure out how to abort HTTP requests on shutdown | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Wait for any background tasks to finish, with timeout | ||||||||||||||||||||||
| await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Clear timeouts | ||||||||||||||||||||||
| if (this.#pendingSaveTimeout) clearTimeout(this.#pendingSaveTimeout); | ||||||||||||||||||||||
| if (this.#sleepTimeout) clearTimeout(this.#sleepTimeout); | ||||||||||||||||||||||
| if (this.#checkConnLivenessInterval) | ||||||||||||||||||||||
| clearInterval(this.#checkConnLivenessInterval); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Write state | ||||||||||||||||||||||
| await this.saveState({ immediate: true, allowStoppingState: true }); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Await all `close` event listeners with 1.5 second timeout | ||||||||||||||||||||||
| const res = Promise.race([ | ||||||||||||||||||||||
| Promise.all(promises).then(() => false), | ||||||||||||||||||||||
|
|
@@ -1640,6 +1640,12 @@ export class ActorInstance< | |||||||||||||||||||||
| "timed out waiting for connections to close, shutting down anyway", | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Wait for queues to finish | ||||||||||||||||||||||
| if (this.#persistWriteQueue.runningDrainLoop) | ||||||||||||||||||||||
| await this.#persistWriteQueue.runningDrainLoop; | ||||||||||||||||||||||
| if (this.#alarmWriteQueue.runningDrainLoop) | ||||||||||||||||||||||
| await this.#alarmWriteQueue.runningDrainLoop; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /** Abort signal that fires when the actor is stopping. */ | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The property access pattern has changed from direct access (
event.a,event.ar) to nested access (event.k.g.a,event.k.g.ar), but there's no null/undefined checking for these nested properties. This could cause runtime errors when processing persisted events that don't match the new structure.Consider adding defensive checks before accessing these nested properties:
Or use optional chaining with fallbacks:
This would ensure backward compatibility with any persisted events in the old format.
Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.