Skip to content

Commit 47cb55f

Browse files
committed
chore(rivetkit): split ActorInstance logic in to multiple classes
1 parent eb0e6f2 commit 47cb55f

File tree

31 files changed

+2793
-2661
lines changed

31 files changed

+2793
-2661
lines changed

rivetkit-typescript/packages/cloudflare-workers/src/actor-handler-do.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ export function createActorDurableObject(
203203

204204
// Load the actor instance and trigger alarm
205205
const actor = await actorDriver.loadActor(actorId);
206-
await actor._onAlarm();
206+
await actor.onAlarm();
207207
}
208208
};
209209
}

rivetkit-typescript/packages/rivetkit/src/actor/config.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { z } from "zod";
22
import type { UniversalWebSocket } from "@/common/websocket-interface";
3-
import type { ActionContext } from "./action";
4-
import type { Conn } from "./conn";
5-
import type { ActorContext } from "./context";
3+
import type { Conn } from "./conn/mod";
4+
import type { ActionContext } from "./contexts/action";
5+
import type { ActorContext } from "./contexts/actor";
66
import type { AnyDatabaseProvider } from "./database";
77

88
export type InitContext = ActorContext<

rivetkit-typescript/packages/rivetkit/src/actor/conn-drivers.ts

Lines changed: 0 additions & 171 deletions
This file was deleted.

rivetkit-typescript/packages/rivetkit/src/actor/conn-socket.ts

Lines changed: 0 additions & 9 deletions
This file was deleted.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import type { AnyConn } from "@/actor/conn/mod";
2+
import type { AnyActorInstance } from "@/actor/instance/mod";
3+
import type { CachedSerializer } from "@/actor/protocol/serde";
4+
import type * as protocol from "@/schemas/client-protocol/mod";
5+
6+
export enum DriverReadyState {
7+
UNKNOWN = -1,
8+
CONNECTING = 0,
9+
OPEN = 1,
10+
CLOSING = 2,
11+
CLOSED = 3,
12+
}
13+
14+
export interface ConnDriver {
15+
requestId: string;
16+
requestIdBuf: ArrayBuffer | undefined;
17+
hibernatable: boolean;
18+
19+
sendMessage?(
20+
actor: AnyActorInstance,
21+
conn: AnyConn,
22+
message: CachedSerializer<protocol.ToClient>,
23+
): void;
24+
25+
/**
26+
* This returns a promise since we commonly disconnect at the end of a program, and not waiting will cause the socket to not close cleanly.
27+
*/
28+
disconnect(
29+
actor: AnyActorInstance,
30+
conn: AnyConn,
31+
reason?: string,
32+
): Promise<void>;
33+
34+
/** Terminates the connection without graceful handling. */
35+
terminate?(actor: AnyActorInstance, conn: AnyConn): void;
36+
37+
/**
38+
* Returns the ready state of the connection.
39+
* This is used to determine if the connection is ready to send messages, or if the connection is stale.
40+
*/
41+
getConnectionReadyState(
42+
actor: AnyActorInstance,
43+
conn: AnyConn,
44+
): DriverReadyState | undefined;
45+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { type ConnDriver, DriverReadyState } from "../driver";
2+
3+
export type ConnHttpState = Record<never, never>;
4+
5+
export function createHttpSocket(): ConnDriver {
6+
return {
7+
requestId: crypto.randomUUID(),
8+
requestIdBuf: undefined,
9+
hibernatable: false,
10+
getConnectionReadyState(_actor, _conn) {
11+
// TODO: This might not be the correct logic
12+
return DriverReadyState.OPEN;
13+
},
14+
disconnect: async () => {
15+
// Noop
16+
// TODO: Configure with abort signals to abort the request
17+
},
18+
};
19+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import type { WSContext } from "hono/ws";
2+
import type { AnyConn } from "@/actor/conn/mod";
3+
import type { AnyActorInstance } from "@/actor/instance/mod";
4+
import type { CachedSerializer, Encoding } from "@/actor/protocol/serde";
5+
import type * as protocol from "@/schemas/client-protocol/mod";
6+
import { type ConnDriver, DriverReadyState } from "../driver";
7+
8+
export type ConnDriverWebSocketState = {};
9+
10+
export function createWebSocketSocket(
11+
requestId: string,
12+
requestIdBuf: ArrayBuffer | undefined,
13+
hibernatable: boolean,
14+
encoding: Encoding,
15+
websocket: WSContext,
16+
closePromise: Promise<void>,
17+
): ConnDriver {
18+
return {
19+
requestId,
20+
requestIdBuf,
21+
hibernatable,
22+
sendMessage: (
23+
actor: AnyActorInstance,
24+
conn: AnyConn,
25+
message: CachedSerializer<protocol.ToClient>,
26+
) => {
27+
if (websocket.readyState !== DriverReadyState.OPEN) {
28+
actor.rLog.warn({
29+
msg: "attempting to send message to closed websocket, this is likely a bug in RivetKit",
30+
connId: conn.id,
31+
wsReadyState: websocket.readyState,
32+
});
33+
return;
34+
}
35+
36+
const serialized = message.serialize(encoding);
37+
38+
actor.rLog.debug({
39+
msg: "sending websocket message",
40+
encoding: encoding,
41+
dataType: typeof serialized,
42+
isUint8Array: serialized instanceof Uint8Array,
43+
isArrayBuffer: serialized instanceof ArrayBuffer,
44+
dataLength:
45+
(serialized as any).byteLength ||
46+
(serialized as any).length,
47+
});
48+
49+
// Convert Uint8Array to ArrayBuffer for proper transmission
50+
if (serialized instanceof Uint8Array) {
51+
const buffer = serialized.buffer.slice(
52+
serialized.byteOffset,
53+
serialized.byteOffset + serialized.byteLength,
54+
);
55+
// Handle SharedArrayBuffer case
56+
if (buffer instanceof SharedArrayBuffer) {
57+
const arrayBuffer = new ArrayBuffer(buffer.byteLength);
58+
new Uint8Array(arrayBuffer).set(new Uint8Array(buffer));
59+
actor.rLog.debug({
60+
msg: "converted SharedArrayBuffer to ArrayBuffer",
61+
byteLength: arrayBuffer.byteLength,
62+
});
63+
websocket.send(arrayBuffer);
64+
} else {
65+
actor.rLog.debug({
66+
msg: "sending ArrayBuffer",
67+
byteLength: buffer.byteLength,
68+
});
69+
websocket.send(buffer);
70+
}
71+
} else {
72+
actor.rLog.debug({
73+
msg: "sending string data",
74+
length: (serialized as string).length,
75+
});
76+
websocket.send(serialized);
77+
}
78+
},
79+
80+
disconnect: async (
81+
_actor: AnyActorInstance,
82+
_conn: AnyConn,
83+
reason?: string,
84+
) => {
85+
// Close socket
86+
websocket.close(1000, reason);
87+
88+
// Create promise to wait for socket to close gracefully
89+
await closePromise;
90+
},
91+
92+
terminate: () => {
93+
(websocket as any).terminate();
94+
},
95+
96+
getConnectionReadyState: (
97+
_actor: AnyActorInstance,
98+
_conn: AnyConn,
99+
): DriverReadyState | undefined => {
100+
return websocket.readyState;
101+
},
102+
};
103+
}

0 commit comments

Comments
 (0)