Skip to content

Commit 929b3eb

Browse files
committed
fix(rivetkit): only trigger onStateChange and inspector updates for actor state changes (#3393)
1 parent bebb8ae commit 929b3eb

File tree

3 files changed

+115
-24
lines changed

3 files changed

+115
-24
lines changed

rivetkit-typescript/packages/rivetkit/src/actor/instance.ts

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ import type {
5353
import { processMessage } from "./protocol/old";
5454
import { CachedSerializer } from "./protocol/serde";
5555
import { Schedule } from "./schedule";
56-
import { DeadlineError, deadline } from "./utils";
56+
import { DeadlineError, deadline, isConnStatePath, isStatePath } from "./utils";
5757

5858
export const PERSIST_SYMBOL = Symbol("persist");
5959

@@ -698,7 +698,6 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
698698
// Set raw persist object
699699
this.#persistRaw = target;
700700

701-
// TODO: Only validate this for conn state
702701
// TODO: Allow disabling in production
703702
// If this can't be proxied, return raw value
704703
if (target === null || typeof target !== "object") {
@@ -732,35 +731,46 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
732731
_previousValue: any,
733732
_applyData: any,
734733
) => {
735-
if (path !== "state" && !path.startsWith("state.")) {
736-
return;
734+
const actorStatePath = isStatePath(path);
735+
const connStatePath = isConnStatePath(path);
736+
737+
// Validate CBOR serializability for state changes
738+
if (actorStatePath || connStatePath) {
739+
let invalidPath = "";
740+
if (
741+
!isCborSerializable(
742+
value,
743+
(invalidPathPart) => {
744+
invalidPath = invalidPathPart;
745+
},
746+
"",
747+
)
748+
) {
749+
throw new errors.InvalidStateType({
750+
path: path + (invalidPath ? `.${invalidPath}` : ""),
751+
});
752+
}
737753
}
738754

739-
let invalidPath = "";
740-
if (
741-
!isCborSerializable(
742-
value,
743-
(invalidPathPart) => {
744-
invalidPath = invalidPathPart;
745-
},
746-
"",
747-
)
748-
) {
749-
throw new errors.InvalidStateType({
750-
path: path + (invalidPath ? `.${invalidPath}` : ""),
751-
});
752-
}
755+
this.#rLog.debug({
756+
msg: "onChange triggered, setting persistChanged=true",
757+
path,
758+
});
753759
this.#persistChanged = true;
754760

755-
// Inform the inspector about state changes
756-
this.inspector.emitter.emit(
757-
"stateUpdated",
758-
this.#persist.state,
759-
);
761+
// Inform the inspector about state changes (only for state path)
762+
if (actorStatePath) {
763+
this.inspector.emitter.emit(
764+
"stateUpdated",
765+
this.#persist.state,
766+
);
767+
}
760768

761769
// Call onStateChange if it exists
770+
//
762771
// Skip if we're already inside onStateChange to prevent infinite recursion
763772
if (
773+
actorStatePath &&
764774
this.#config.onStateChange &&
765775
this.#ready &&
766776
!this.#isInOnStateChange
@@ -806,6 +816,8 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
806816
this.#rLog.info({
807817
msg: "actor restoring",
808818
connections: persistData.connections.length,
819+
hibernatableWebSockets:
820+
persistData.hibernatableWebSocket.length,
809821
});
810822

811823
// Set initial state
@@ -1868,6 +1880,13 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
18681880
async saveState(opts: SaveStateOptions) {
18691881
this.#assertReady(opts.allowStoppingState);
18701882

1883+
this.#rLog.debug({
1884+
msg: "saveState called",
1885+
persistChanged: this.#persistChanged,
1886+
allowStoppingState: opts.allowStoppingState,
1887+
immediate: opts.immediate,
1888+
});
1889+
18711890
if (this.#persistChanged) {
18721891
if (opts.immediate) {
18731892
// Save immediately
@@ -2075,7 +2094,8 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
20752094
for (const connection of this.#connections.values()) {
20762095
promises.push(connection.disconnect());
20772096

2078-
// TODO: Figure out how to abort HTTP requests on shutdown
2097+
// TODO: Figure out how to abort HTTP requests on shutdown. This
2098+
// might already be handled by the engine runner tunnel shutdown.
20792099
}
20802100

20812101
// Wait for any background tasks to finish, with timeout
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { describe, expect, test } from "vitest";
2+
import { isConnStatePath, isStatePath } from "./utils";
3+
4+
describe("isStatePath", () => {
5+
test("matches exact state", () => {
6+
expect(isStatePath("state")).toBe(true);
7+
});
8+
9+
test("matches nested state paths", () => {
10+
expect(isStatePath("state.foo")).toBe(true);
11+
expect(isStatePath("state.foo.bar")).toBe(true);
12+
});
13+
14+
test("does not match other paths", () => {
15+
expect(isStatePath("connections")).toBe(false);
16+
expect(isStatePath("stateX")).toBe(false);
17+
expect(isStatePath("mystate")).toBe(false);
18+
});
19+
});
20+
21+
describe("isConnStatePath", () => {
22+
test("matches connection state paths", () => {
23+
expect(isConnStatePath("connections.0.state")).toBe(true);
24+
expect(isConnStatePath("connections.123.state")).toBe(true);
25+
});
26+
27+
test("matches nested connection state paths", () => {
28+
expect(isConnStatePath("connections.0.state.foo")).toBe(true);
29+
expect(isConnStatePath("connections.5.state.bar.baz")).toBe(true);
30+
});
31+
32+
test("does not match non-state connection paths", () => {
33+
expect(isConnStatePath("connections.0.params")).toBe(false);
34+
expect(isConnStatePath("connections.0.token")).toBe(false);
35+
expect(isConnStatePath("connections.0")).toBe(false);
36+
});
37+
38+
test("does not match other paths", () => {
39+
expect(isConnStatePath("state")).toBe(false);
40+
expect(isConnStatePath("connections")).toBe(false);
41+
expect(isConnStatePath("other.0.state")).toBe(false);
42+
});
43+
44+
test("does not match malformed paths", () => {
45+
expect(isConnStatePath("connections.state")).toBe(false);
46+
expect(isConnStatePath("connections.0.stateX")).toBe(false);
47+
});
48+
});

rivetkit-typescript/packages/rivetkit/src/actor/utils.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,26 @@ export function generateRandomString(length = 32) {
104104
}
105105
return result;
106106
}
107+
108+
/**
109+
* Checks if a path is an actor state path within the persisted actor data.
110+
*/
111+
export function isStatePath(path: string): boolean {
112+
return path === "state" || path.startsWith("state.");
113+
}
114+
115+
/**
116+
* Checks if a path is a connection state path within the persisted actor data.
117+
*/
118+
export function isConnStatePath(path: string): boolean {
119+
if (!path.startsWith("connections.")) {
120+
return false;
121+
}
122+
const stateIndex = path.indexOf(".state", 12); // Start after "connections."
123+
if (stateIndex === -1) {
124+
return false;
125+
}
126+
const afterState = stateIndex + 6; // ".state".length = 6
127+
// Check if ".state" is followed by end of string or "."
128+
return path.length === afterState || path[afterState] === ".";
129+
}

0 commit comments

Comments
 (0)