Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 63 additions & 57 deletions packages/core/src/actor/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import type {
PersistedActor,
PersistedConn,
PersistedScheduleEvent,
PersistedScheduleEventKind,
} from "./persisted";
import { processMessage } from "./protocol/message/mod";
import { CachedSerializer } from "./protocol/serde";
import { Schedule, type ScheduledEvent } from "./schedule";
import { Schedule } from "./schedule";
import { DeadlineError, deadline } from "./utils";

/**
Expand Down Expand Up @@ -173,6 +174,7 @@ export class ActorInstance<

#connections = new Map<ConnId, Conn<S, CP, CS, V, I, AD, DB>>();
#subscriptionIndex = new Map<string, Set<Conn<S, CP, CS, V, I, AD, DB>>>();
#checkConnLivenessInterval?: NodeJS.Timeout;

#sleepTimeout?: NodeJS.Timeout;

Expand Down Expand Up @@ -340,26 +342,27 @@ export class ActorInstance<
// Must be called after setting `#ready` or else it will not schedule sleep
this.#resetSleepTimer();

this.#scheduleLivenessCheck();
// Start conn liveness interval
//
// Check for liveness immediately since we may have connections that
// were in `reconnecting` state when the actor went to sleep that we
// need to purge.
//
// We don't use alarms for connection liveness since alarms require
// durability & are expensive. It is safe to assume that connection liveness
// only needs to be run while the actor is awake and does not need
// to manually wake the actor. The only case where this is not true is if the
// connection liveness timeout is greater than the actor sleep timeout
// OR if the actor is manually put to sleep. In this case, the connections
// will be stuck in a `reconnecting` state until the actor is awakened again.
this.#checkConnLivenessInterval = setInterval(
this.#checkConnectionsLiveness.bind(this),
this.#config.options.connectionLivenessInterval,
);
this.#checkConnectionsLiveness();
}

async #scheduleEventInner(timestamp: number, event: ScheduledEvent) {
// Build event
let newEvent: PersistedScheduleEvent;
if ("checkConnectionLiveness" in event) {
newEvent = {
ccl: event.checkConnectionLiveness,
t: timestamp,
};
} else {
newEvent = {
e: crypto.randomUUID(),
t: timestamp,
a: event.fn,
ar: event.args,
};
}

async #scheduleEventInner(newEvent: PersistedScheduleEvent) {
this.actorContext.log.info("scheduling event", newEvent);

// remove old ccl event
Expand All @@ -385,7 +388,7 @@ export class ActorInstance<
// - this is the only event (i.e. the only event in the array)
if (insertIndex === 0 || this.#persist.e.length === 1) {
this.actorContext.log.info("setting alarm", {
timestamp,
timestamp: newEvent.t,
eventCount: this.#persist.e.length,
});
await this.#queueSetAlarm(newEvent.t);
Expand Down Expand Up @@ -449,29 +452,33 @@ export class ActorInstance<
this.actorContext.log.info("running action for event", {
event: event.e,
timestamp: event.t,
action: event.a,
args: event.ar,
action: event.k.g.a,
args: event.k.g.ar,
Comment on lines +455 to +456
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The property access pattern has changed from direct access (event.a, event.ar) to nested access (event.k.g.a, event.k.g.ar), but there's no null/undefined checking for these nested properties. This could cause runtime errors when processing persisted events that don't match the new structure.

Consider adding defensive checks before accessing these nested properties:

if (!event.k?.g) {
  throw new Error(`Invalid event format: missing k.g structure in event ${event.e}`);
}

Or use optional chaining with fallbacks:

action: event.k?.g?.a ?? "(unknown action)",
args: event.k?.g?.ar ?? [],

This would ensure backward compatibility with any persisted events in the old format.

Suggested change
action: event.k.g.a,
args: event.k.g.ar,
action: event.k?.g?.a ?? event.a ?? "(unknown action)",
args: event.k?.g?.ar ?? event.ar ?? [],

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

});

// Look up function
const fn: unknown = this.#config.actions[event.a];
const fn: unknown = this.#config.actions[event.k.g.a];

if (!fn) throw new Error(`Missing action for alarm ${event.a}`);
if (!fn) throw new Error(`Missing action for alarm ${event.k.g.a}`);
if (typeof fn !== "function")
throw new Error(
`Alarm function lookup for ${event.a} returned ${typeof fn}`,
`Alarm function lookup for ${event.k.g.a} returned ${typeof fn}`,
);

// Call function
try {
await fn.call(undefined, this.actorContext, ...(event.ar || []));
await fn.call(
undefined,
this.actorContext,
...(event.k.g.ar || []),
);
Comment on lines +470 to +474
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code accesses event.k.g.ar without first verifying that event.k and event.k.g exist. If the event structure is malformed, this could lead to a runtime error. Consider adding a null check or using optional chaining (event.k?.g?.ar || []) to handle potentially invalid event structures gracefully.

Suggested change
await fn.call(
undefined,
this.actorContext,
...(event.k.g.ar || []),
);
await fn.call(
undefined,
this.actorContext,
...(event.k?.g?.ar || []),
);

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

} catch (error) {
this.actorContext.log.error("error while running event", {
error: stringifyError(error),
event: event.e,
timestamp: event.t,
action: event.a,
args: event.ar,
action: event.k.g.a,
args: event.k.g.ar,
Comment on lines +480 to +481
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error logging code accesses event.k.g.a and event.k.g.ar directly without checking if event.k or event.k.g exist. If the event structure is malformed, this will cause additional runtime errors during error reporting, potentially obscuring the original error. Consider adding null checks or using optional chaining (event.k?.g?.a) to make the error reporting more robust.

Suggested change
action: event.k.g.a,
args: event.k.g.ar,
action: event.k?.g?.a,
args: event.k?.g?.ar,

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

});
}
}
Expand All @@ -486,10 +493,14 @@ export class ActorInstance<

async scheduleEvent(
timestamp: number,
fn: string,
action: string,
args: unknown[],
): Promise<void> {
return this.#scheduleEventInner(timestamp, { fn, args });
return this.#scheduleEventInner({
e: crypto.randomUUID(),
t: timestamp,
k: { g: { a: action, ar: args } },
});
}

get stateEnabled() {
Expand Down Expand Up @@ -1081,35 +1092,18 @@ export class ActorInstance<
});
continue;
}

// Connection is dead, remove it
logger().warn("connection is dead, removing", {
connId: conn.id,
lastSeen,
});
// we might disconnect the connection here?
// conn.disconnect("liveness check failed");
this.__removeConn(conn);
}
}

this.#scheduleLivenessCheck();
}
// TODO: Do we need to force disconnect the connection here?

/**
* Schedule a liveness check for the connections.
* This will check if the liveness check is already scheduled and skip scheduling if it is.
* @internal
*/
#scheduleLivenessCheck() {
if (this.isStopping) {
logger().debug("actor is stopping, skipping liveness check");
return;
this.__removeConn(conn);
}
}

this.#scheduleEventInner(
Date.now() + this.#config.options.connectionLivenessInterval,
{ checkConnectionLiveness: true },
);
}

/**
Expand Down Expand Up @@ -1613,12 +1607,6 @@ export class ActorInstance<
}
}

// Wait for any background tasks to finish, with timeout
await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout);

// Write state
await this.saveState({ immediate: true, allowStoppingState: true });

// Disconnect existing connections
const promises: Promise<unknown>[] = [];
for (const connection of this.#connections.values()) {
Expand All @@ -1627,6 +1615,18 @@ export class ActorInstance<
// TODO: Figure out how to abort HTTP requests on shutdown
}

// Wait for any background tasks to finish, with timeout
await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout);

// Clear timeouts
if (this.#pendingSaveTimeout) clearTimeout(this.#pendingSaveTimeout);
if (this.#sleepTimeout) clearTimeout(this.#sleepTimeout);
if (this.#checkConnLivenessInterval)
clearInterval(this.#checkConnLivenessInterval);

// Write state
await this.saveState({ immediate: true, allowStoppingState: true });

// Await all `close` event listeners with 1.5 second timeout
const res = Promise.race([
Promise.all(promises).then(() => false),
Expand All @@ -1640,6 +1640,12 @@ export class ActorInstance<
"timed out waiting for connections to close, shutting down anyway",
);
}

// Wait for queues to finish
if (this.#persistWriteQueue.runningDrainLoop)
await this.#persistWriteQueue.runningDrainLoop;
if (this.#alarmWriteQueue.runningDrainLoop)
await this.#alarmWriteQueue.runningDrainLoop;
}

/** Abort signal that fires when the actor is stopping. */
Expand Down
17 changes: 6 additions & 11 deletions packages/core/src/actor/persisted.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { ConnectionDriver } from "./connection";
import type { ScheduledLivenessCheckEvent } from "./schedule";

/** State object that gets automatically persisted to storage. */
export interface PersistedActor<S, CP, CS, I> {
Expand Down Expand Up @@ -43,22 +42,18 @@ export interface PersistedSubscription {
}

export interface GenericPersistedScheduleEvent {
// Event ID
e: string;
// Timestamp
t: number;
// Action name
a: string;
// Arguments
ar?: unknown[];
}

export interface ScheduleLivenessCheckEvent {
ccl: ScheduledLivenessCheckEvent["checkConnectionLiveness"];
export type PersistedScheduleEventKind = { g: GenericPersistedScheduleEvent };

export interface PersistedScheduleEvent {
// Event ID
e: string;
// Timestamp
t: number;
k: PersistedScheduleEventKind;
}

export type PersistedScheduleEvent =
| GenericPersistedScheduleEvent
| ScheduleLivenessCheckEvent;
13 changes: 0 additions & 13 deletions packages/core/src/actor/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,3 @@ export class Schedule {
await this.#actor.scheduleEvent(timestamp, fn, args);
}
}

export interface GenericScheduledEvent {
fn: string;
args: unknown[];
id?: string;
}
export interface ScheduledLivenessCheckEvent {
checkConnectionLiveness: {};
}

export type ScheduledEvent =
| GenericScheduledEvent
| ScheduledLivenessCheckEvent;
4 changes: 0 additions & 4 deletions packages/core/src/drivers/file-system/global-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ export class FileSystemGlobalState {
id: actorId,
genericConnGlobalState: new GenericConnGlobalState(),
removed: false,
stateWriteQueue: new SinglePromiseQueue(),
alarmWriteQueue: new SinglePromiseQueue(),
};
this.#actors.set(actorId, entry);
return entry;
Expand Down Expand Up @@ -313,8 +311,6 @@ export class FileSystemGlobalState {
// Wait for actor to fully start before stopping it to avoid race conditions
if (actor.loadPromise) await actor.loadPromise.catch();
if (actor.startPromise?.promise) await actor.startPromise.promise.catch();
if (actor.stateWriteQueue.runningDrainLoop)
await actor.stateWriteQueue.runningDrainLoop.catch();

// Mark as removed
actor.removed = true;
Expand Down
Loading