Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit 05e19f8

Browse files
committed
chore(core): move actor liveness check to setInterval (#1202)
Fixes KIT-246
1 parent 831f4d9 commit 05e19f8

File tree

4 files changed

+69
-85
lines changed

4 files changed

+69
-85
lines changed

packages/core/src/actor/instance.ts

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ import type {
2828
PersistedActor,
2929
PersistedConn,
3030
PersistedScheduleEvent,
31+
PersistedScheduleEventKind,
3132
} from "./persisted";
3233
import { processMessage } from "./protocol/message/mod";
3334
import { CachedSerializer } from "./protocol/serde";
34-
import { Schedule, type ScheduledEvent } from "./schedule";
35+
import { Schedule } from "./schedule";
3536
import { DeadlineError, deadline } from "./utils";
3637

3738
/**
@@ -173,6 +174,7 @@ export class ActorInstance<
173174

174175
#connections = new Map<ConnId, Conn<S, CP, CS, V, I, AD, DB>>();
175176
#subscriptionIndex = new Map<string, Set<Conn<S, CP, CS, V, I, AD, DB>>>();
177+
#checkConnLivenessInterval?: NodeJS.Timeout;
176178

177179
#sleepTimeout?: NodeJS.Timeout;
178180

@@ -340,26 +342,27 @@ export class ActorInstance<
340342
// Must be called after setting `#ready` or else it will not schedule sleep
341343
this.#resetSleepTimer();
342344

343-
this.#scheduleLivenessCheck();
345+
// Start conn liveness interval
346+
//
347+
// Check for liveness immediately since we may have connections that
348+
// were in `reconnecting` state when the actor went to sleep that we
349+
// need to purge.
350+
//
351+
// We don't use alarms for connection liveness since alarms require
352+
// durability & are expensive. Connection liveness is safe to assume
353+
// it only needs to be ran while the actor is awake and does not need
354+
// to manually wake the actor. The only case this is not true is if the
355+
// connection liveness timeout is greater than the actor sleep timeout
356+
// OR if the actor is manually put to sleep. In this case, the connections
357+
// will be stuck in a `reconnecting` state until the actor is awaken again.
358+
this.#checkConnLivenessInterval = setInterval(
359+
this.#checkConnectionsLiveness.bind(this),
360+
this.#config.options.connectionLivenessInterval,
361+
);
362+
this.#checkConnectionsLiveness();
344363
}
345364

346-
async #scheduleEventInner(timestamp: number, event: ScheduledEvent) {
347-
// Build event
348-
let newEvent: PersistedScheduleEvent;
349-
if ("checkConnectionLiveness" in event) {
350-
newEvent = {
351-
ccl: event.checkConnectionLiveness,
352-
t: timestamp,
353-
};
354-
} else {
355-
newEvent = {
356-
e: crypto.randomUUID(),
357-
t: timestamp,
358-
a: event.fn,
359-
ar: event.args,
360-
};
361-
}
362-
365+
async #scheduleEventInner(newEvent: PersistedScheduleEvent) {
363366
this.actorContext.log.info("scheduling event", newEvent);
364367

365368
// remove old ccl event
@@ -385,7 +388,7 @@ export class ActorInstance<
385388
// - this is the only event (i.e. the only event in the array)
386389
if (insertIndex === 0 || this.#persist.e.length === 1) {
387390
this.actorContext.log.info("setting alarm", {
388-
timestamp,
391+
timestamp: newEvent.t,
389392
eventCount: this.#persist.e.length,
390393
});
391394
await this.#queueSetAlarm(newEvent.t);
@@ -449,29 +452,33 @@ export class ActorInstance<
449452
this.actorContext.log.info("running action for event", {
450453
event: event.e,
451454
timestamp: event.t,
452-
action: event.a,
453-
args: event.ar,
455+
action: event.k.g.a,
456+
args: event.k.g.ar,
454457
});
455458

456459
// Look up function
457-
const fn: unknown = this.#config.actions[event.a];
460+
const fn: unknown = this.#config.actions[event.k.g.a];
458461

459-
if (!fn) throw new Error(`Missing action for alarm ${event.a}`);
462+
if (!fn) throw new Error(`Missing action for alarm ${event.k.g.a}`);
460463
if (typeof fn !== "function")
461464
throw new Error(
462-
`Alarm function lookup for ${event.a} returned ${typeof fn}`,
465+
`Alarm function lookup for ${event.k.g.a} returned ${typeof fn}`,
463466
);
464467

465468
// Call function
466469
try {
467-
await fn.call(undefined, this.actorContext, ...(event.ar || []));
470+
await fn.call(
471+
undefined,
472+
this.actorContext,
473+
...(event.k.g.ar || []),
474+
);
468475
} catch (error) {
469476
this.actorContext.log.error("error while running event", {
470477
error: stringifyError(error),
471478
event: event.e,
472479
timestamp: event.t,
473-
action: event.a,
474-
args: event.ar,
480+
action: event.k.g.a,
481+
args: event.k.g.ar,
475482
});
476483
}
477484
}
@@ -486,10 +493,14 @@ export class ActorInstance<
486493

487494
async scheduleEvent(
488495
timestamp: number,
489-
fn: string,
496+
action: string,
490497
args: unknown[],
491498
): Promise<void> {
492-
return this.#scheduleEventInner(timestamp, { fn, args });
499+
return this.#scheduleEventInner({
500+
e: crypto.randomUUID(),
501+
t: timestamp,
502+
k: { g: { a: action, ar: args } },
503+
});
493504
}
494505

495506
get stateEnabled() {
@@ -1081,35 +1092,18 @@ export class ActorInstance<
10811092
});
10821093
continue;
10831094
}
1095+
10841096
// Connection is dead, remove it
10851097
logger().warn("connection is dead, removing", {
10861098
connId: conn.id,
10871099
lastSeen,
10881100
});
1089-
// we might disconnect the connection here?
1090-
// conn.disconnect("liveness check failed");
1091-
this.__removeConn(conn);
1092-
}
1093-
}
10941101

1095-
this.#scheduleLivenessCheck();
1096-
}
1102+
// TODO: Do we need to force disconnect the connection here?
10971103

1098-
/**
1099-
* Schedule a liveness check for the connections.
1100-
* This will check if the liveness check is already scheduled and skip scheduling if it is.
1101-
* @internal
1102-
*/
1103-
#scheduleLivenessCheck() {
1104-
if (this.isStopping) {
1105-
logger().debug("actor is stopping, skipping liveness check");
1106-
return;
1104+
this.__removeConn(conn);
1105+
}
11071106
}
1108-
1109-
this.#scheduleEventInner(
1110-
Date.now() + this.#config.options.connectionLivenessInterval,
1111-
{ checkConnectionLiveness: true },
1112-
);
11131107
}
11141108

11151109
/**
@@ -1613,12 +1607,6 @@ export class ActorInstance<
16131607
}
16141608
}
16151609

1616-
// Wait for any background tasks to finish, with timeout
1617-
await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout);
1618-
1619-
// Write state
1620-
await this.saveState({ immediate: true, allowStoppingState: true });
1621-
16221610
// Disconnect existing connections
16231611
const promises: Promise<unknown>[] = [];
16241612
for (const connection of this.#connections.values()) {
@@ -1627,6 +1615,18 @@ export class ActorInstance<
16271615
// TODO: Figure out how to abort HTTP requests on shutdown
16281616
}
16291617

1618+
// Wait for any background tasks to finish, with timeout
1619+
await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout);
1620+
1621+
// Clear timeouts
1622+
if (this.#pendingSaveTimeout) clearTimeout(this.#pendingSaveTimeout);
1623+
if (this.#sleepTimeout) clearTimeout(this.#sleepTimeout);
1624+
if (this.#checkConnLivenessInterval)
1625+
clearInterval(this.#checkConnLivenessInterval);
1626+
1627+
// Write state
1628+
await this.saveState({ immediate: true, allowStoppingState: true });
1629+
16301630
// Await all `close` event listeners with 1.5 second timeout
16311631
const res = Promise.race([
16321632
Promise.all(promises).then(() => false),
@@ -1640,6 +1640,12 @@ export class ActorInstance<
16401640
"timed out waiting for connections to close, shutting down anyway",
16411641
);
16421642
}
1643+
1644+
// Wait for queues to finish
1645+
if (this.#persistWriteQueue.runningDrainLoop)
1646+
await this.#persistWriteQueue.runningDrainLoop;
1647+
if (this.#alarmWriteQueue.runningDrainLoop)
1648+
await this.#alarmWriteQueue.runningDrainLoop;
16431649
}
16441650

16451651
/** Abort signal that fires when the actor is stopping. */

packages/core/src/actor/persisted.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import type { ConnectionDriver } from "./connection";
2-
import type { ScheduledLivenessCheckEvent } from "./schedule";
32

43
/** State object that gets automatically persisted to storage. */
54
export interface PersistedActor<S, CP, CS, I> {
@@ -43,22 +42,18 @@ export interface PersistedSubscription {
4342
}
4443

4544
export interface GenericPersistedScheduleEvent {
46-
// Event ID
47-
e: string;
48-
// Timestamp
49-
t: number;
5045
// Action name
5146
a: string;
5247
// Arguments
5348
ar?: unknown[];
5449
}
5550

56-
export interface ScheduleLivenessCheckEvent {
57-
ccl: ScheduledLivenessCheckEvent["checkConnectionLiveness"];
51+
export type PersistedScheduleEventKind = { g: GenericPersistedScheduleEvent };
52+
53+
export interface PersistedScheduleEvent {
54+
// Event ID
55+
e: string;
5856
// Timestamp
5957
t: number;
58+
k: PersistedScheduleEventKind;
6059
}
61-
62-
export type PersistedScheduleEvent =
63-
| GenericPersistedScheduleEvent
64-
| ScheduleLivenessCheckEvent;

packages/core/src/actor/schedule.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,3 @@ export class Schedule {
1515
await this.#actor.scheduleEvent(timestamp, fn, args);
1616
}
1717
}
18-
19-
export interface GenericScheduledEvent {
20-
fn: string;
21-
args: unknown[];
22-
id?: string;
23-
}
24-
export interface ScheduledLivenessCheckEvent {
25-
checkConnectionLiveness: {};
26-
}
27-
28-
export type ScheduledEvent =
29-
| GenericScheduledEvent
30-
| ScheduledLivenessCheckEvent;

packages/core/src/drivers/file-system/global-state.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,6 @@ export class FileSystemGlobalState {
191191
id: actorId,
192192
genericConnGlobalState: new GenericConnGlobalState(),
193193
removed: false,
194-
stateWriteQueue: new SinglePromiseQueue(),
195-
alarmWriteQueue: new SinglePromiseQueue(),
196194
};
197195
this.#actors.set(actorId, entry);
198196
return entry;
@@ -313,8 +311,6 @@ export class FileSystemGlobalState {
313311
// Wait for actor to fully start before stopping it to avoid race conditions
314312
if (actor.loadPromise) await actor.loadPromise.catch();
315313
if (actor.startPromise?.promise) await actor.startPromise.promise.catch();
316-
if (actor.stateWriteQueue.runningDrainLoop)
317-
await actor.stateWriteQueue.runningDrainLoop.catch();
318314

319315
// Mark as removed
320316
actor.removed = true;

0 commit comments

Comments
 (0)