Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ export function createActorDurableObject(

// Load the actor instance and trigger alarm
const actor = await actorDriver.loadActor(actorId);
await actor._onAlarm();
await actor.onAlarm();
}
};
}
6 changes: 3 additions & 3 deletions rivetkit-typescript/packages/rivetkit/src/actor/config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { z } from "zod";
import type { UniversalWebSocket } from "@/common/websocket-interface";
import type { ActionContext } from "./action";
import type { Conn } from "./conn";
import type { ActorContext } from "./context";
import type { Conn } from "./conn/mod";
import type { ActionContext } from "./contexts/action";
import type { ActorContext } from "./contexts/actor";
import type { AnyDatabaseProvider } from "./database";

export type InitContext = ActorContext<
Expand Down
171 changes: 0 additions & 171 deletions rivetkit-typescript/packages/rivetkit/src/actor/conn-drivers.ts

This file was deleted.

This file was deleted.

45 changes: 45 additions & 0 deletions rivetkit-typescript/packages/rivetkit/src/actor/conn/driver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { AnyConn } from "@/actor/conn/mod";
import type { AnyActorInstance } from "@/actor/instance/mod";
import type { CachedSerializer } from "@/actor/protocol/serde";
import type * as protocol from "@/schemas/client-protocol/mod";

export enum DriverReadyState {
UNKNOWN = -1,
CONNECTING = 0,
OPEN = 1,
CLOSING = 2,
CLOSED = 3,
}

export interface ConnDriver {
requestId: string;
requestIdBuf: ArrayBuffer | undefined;
hibernatable: boolean;

sendMessage?(
actor: AnyActorInstance,
conn: AnyConn,
message: CachedSerializer<protocol.ToClient>,
): void;

/**
* This returns a promise since we commonly disconnect at the end of a program, and not waiting will cause the socket to not close cleanly.
*/
disconnect(
actor: AnyActorInstance,
conn: AnyConn,
reason?: string,
): Promise<void>;

/** Terminates the connection without graceful handling. */
terminate?(actor: AnyActorInstance, conn: AnyConn): void;

/**
* Returns the ready state of the connection.
* This is used to determine if the connection is ready to send messages, or if the connection is stale.
*/
getConnectionReadyState(
actor: AnyActorInstance,
conn: AnyConn,
): DriverReadyState | undefined;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { type ConnDriver, DriverReadyState } from "../driver";

export type ConnHttpState = Record<never, never>;

export function createHttpSocket(): ConnDriver {
return {
requestId: crypto.randomUUID(),
requestIdBuf: undefined,
hibernatable: false,
getConnectionReadyState(_actor, _conn) {
// TODO: This might not be the correct logic
return DriverReadyState.OPEN;
},
disconnect: async () => {
// Noop
// TODO: Configure with abort signals to abort the request
},
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import type { WSContext } from "hono/ws";
import type { AnyConn } from "@/actor/conn/mod";
import type { AnyActorInstance } from "@/actor/instance/mod";
import type { CachedSerializer, Encoding } from "@/actor/protocol/serde";
import type * as protocol from "@/schemas/client-protocol/mod";
import { type ConnDriver, DriverReadyState } from "../driver";

export type ConnDriverWebSocketState = {};

export function createWebSocketSocket(
requestId: string,
requestIdBuf: ArrayBuffer | undefined,
hibernatable: boolean,
encoding: Encoding,
websocket: WSContext,
closePromise: Promise<void>,
): ConnDriver {
return {
requestId,
requestIdBuf,
hibernatable,
sendMessage: (
actor: AnyActorInstance,
conn: AnyConn,
message: CachedSerializer<protocol.ToClient>,
) => {
if (websocket.readyState !== DriverReadyState.OPEN) {
actor.rLog.warn({
msg: "attempting to send message to closed websocket, this is likely a bug in RivetKit",
connId: conn.id,
wsReadyState: websocket.readyState,
});
return;
}

const serialized = message.serialize(encoding);

actor.rLog.debug({
msg: "sending websocket message",
encoding: encoding,
dataType: typeof serialized,
isUint8Array: serialized instanceof Uint8Array,
isArrayBuffer: serialized instanceof ArrayBuffer,
dataLength:
(serialized as any).byteLength ||
(serialized as any).length,
});

// Convert Uint8Array to ArrayBuffer for proper transmission
if (serialized instanceof Uint8Array) {
const buffer = serialized.buffer.slice(
serialized.byteOffset,
serialized.byteOffset + serialized.byteLength,
);
// Handle SharedArrayBuffer case
if (buffer instanceof SharedArrayBuffer) {
const arrayBuffer = new ArrayBuffer(buffer.byteLength);
new Uint8Array(arrayBuffer).set(new Uint8Array(buffer));
actor.rLog.debug({
msg: "converted SharedArrayBuffer to ArrayBuffer",
byteLength: arrayBuffer.byteLength,
});
websocket.send(arrayBuffer);
} else {
actor.rLog.debug({
msg: "sending ArrayBuffer",
byteLength: buffer.byteLength,
});
websocket.send(buffer);
}
} else {
actor.rLog.debug({
msg: "sending string data",
length: (serialized as string).length,
});
websocket.send(serialized);
}
},

disconnect: async (
_actor: AnyActorInstance,
_conn: AnyConn,
reason?: string,
) => {
// Close socket
websocket.close(1000, reason);

// Create promise to wait for socket to close gracefully
await closePromise;
},

terminate: () => {
(websocket as any).terminate();
},

getConnectionReadyState: (
_actor: AnyActorInstance,
_conn: AnyConn,
): DriverReadyState | undefined => {
return websocket.readyState;
},
};
}
Loading
Loading